New Tool for helping debugging Kamaelia Systems
June 08, 2009 at 06:17 PM | categories: python, oldblog
Use in a system
This allows a variety of things, from a basic command line console:
StandaloneInterpreter().run()
Which you can run in the background on any console. For example you could do this:
ServerCore(protocol=myprotocol, port=1234).activate()
StandaloneInterpreter().run()
Alternatively, you can use an embeddable component that speaks to inbox/outbox rather than stdin/stdout. Crudely, you can do something like this:
Pipeline(
    ConsoleReader(),
    InterpreterTransformer(),
    ConsoleEchoer(),
).run()
But you can also put it inside a pygame application, reading & writing from a Textbox/TextDisplayer:
Pipeline(
    Textbox(size = (800, 300), position = (100,380)),
    InterpreterTransformer(),
    TextDisplayer(size = (800, 300), position = (100,40)),
).run()
This looks like this:
The interesting option this opens up, of course, is that you can add a networked interpreter (note: this is hideously insecure, incidentally - so it's only safe on a network you completely control!) into any application:
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Chassis.ConnectedServer import ServerCore
from Kamaelia.Util.PureTransformer import PureTransformer
from Kamaelia.Experimental.PythonInterpreter import InterpreterTransformer
def NetInterpreter(*args, **argv):
    return Pipeline(
        PureTransformer(lambda x: str(x).rstrip()),
        InterpreterTransformer(),
        PureTransformer(lambda x: str(x)+"\r\n>>> "),
    )
ServerCore(protocol=NetInterpreter, port=1236).run()
In one console:
# ./ServerBasedPythonInterpreter.py
In another:
~> telnet 127.0.0.1 1236
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
>>> self
Component
Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer_21 [ inboxes :
{'control': [], 'inbox': []} outboxes : {'outbox': [], 'signal': []}
>>> "hello world"
hello world
>>> Axon
<module 'Axon'
from '/usr/local/lib/python2.5/site-packages/Axon/__init__.pyc'>
>>> dir(self)
['Inbox', 'Inboxes', 'Outboxes', 'Usescomponents', '_AxonObject_...
>>> self.scheduler
Axon.Scheduler.scheduler_1 :1 :1 :0 :
>>> self.scheduler.threads
{<Kamaelia.Internet.TCPServer.TCPServer object at 0xb7b972cc>: <object object
at 0xb7c92468>,
<Kamaelia.Internet.ConnectedSocketAdapter.ConnectedSocketAdapter object at
0xb7ba4acc>: <object object at 0xb7c92468>,
<Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer object at
0xb7ba6bec>: <object object at 0xb7c92468>,
<Kamaelia.Chassis.Pipeline.Pipeline object at 0xb7bafc2c>: <object object at
0xb7c92468>, <Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer
object at 0xb7b9dc4c>: <object object at 0xb7c92468>,
<Kamaelia.Util.PureTransformer.PureTransformer object at 0xb7baf86c>: <object
object at 0xb7c92460>, <Kamaelia.Chassis.ConnectedServer.ServerCore object at
0xb7b8acec>: <object object at 0xb7c92468>,
<Kamaelia.Chassis.Pipeline.Pipeline object at 0xb7ba1d0c>: <object object at
0xb7c92468>, <Kamaelia.Util.PureTransformer.PureTransformer object at
0xb7ba192c>: <object object at 0xb7c92468>,
<Kamaelia.Util.PureTransformer.PureTransformer object at 0xb7b9d96c>: <object
object at 0xb7c92468>, <Kamaelia.Internet.Selector.Selector object at
0xb7b979cc>: <object object at 0xb7c92468>}
etc.
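The sessions above only show the interpreter from the outside. As a rough illustration of what a line-at-a-time interpreter transform has to do, here is a minimal sketch in plain modern Python, with no Kamaelia required. The names `interpret` and `interpreter_transformer` are hypothetical and this is not the InterpreterTransformer source; it simply assumes the behaviour visible in these sessions, where expressions come back as their `str()` and statements come back as "ok".

```python
def interpret(line, namespace):
    """Run one line of Python source against `namespace`, returning a reply string."""
    try:
        try:
            # Expressions (e.g. `1 + 1`) evaluate to a value we can echo back
            code_obj = compile(line, "<interpreter>", "eval")
        except SyntaxError:
            # Statements (assignments, imports, ...) are executed for effect only
            exec(compile(line, "<interpreter>", "exec"), namespace)
            return "ok"
        return str(eval(code_obj, namespace))
    except Exception as exc:
        # Report errors as text rather than killing the interpreter
        return "%s: %s" % (type(exc).__name__, exc)


def interpreter_transformer(lines):
    """Generator transform: one reply out for each line in, sharing one namespace."""
    namespace = {}
    for line in lines:
        yield interpret(line.rstrip(), namespace)
```

Feeding it `x = 21`, `x * 2` and `1/0` yields "ok", "42" and a one-line error string: the same request/response shape as the telnet session. Because everything reachable from the shared namespace can be inspected or changed, a networked copy is exactly as dangerous as the warning above suggests.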
Rummaging Around inside Running Systems
The idea behind this component really is to assist in debugging live running systems by directly rummaging around inside them, which is why this form is probably most useful:
if interactive_debug:
    StandaloneInterpreter().activate()
MyMainApplication().run()
And whilst the other forms naturally drop out as useful, this form is probably the safest of the bunch!
This form is also slightly more flexible, in that it allows this sort of thing:
~/> ./BasicPythonInterpreter.py
> from Kamaelia.UI.Pygame.Ticker import Ticker
ok
> X=Ticker()
ok
Component Kamaelia.UI.Pygame.Ticker.Ticker_7 [ inboxes : {'control':
[], '_displaycontrol': [], 'unpausebox': [], 'pausebox': [], 'inbox':
[], 'alphacontrol': []} outboxes : {'outbox': <>, 'signal':
<>, '_displaysignal': <>}
> X.activate()
ok
Component Kamaelia.UI.Pygame.Ticker.Ticker_7 [ inboxes : {'control':
[], '_displaycontrol': [], 'unpausebox': [], 'pausebox': [], 'inbox':
[''], 'alphacontrol': []} outboxes : {'outbox': <>, 'signal':
<>, '_displaysignal': <>}
> self.link((self, "outbox"), (X,"inbox"))
ok
Link( source:
[Kamaelia.Experimental.PythonInterpreter.StandaloneInterpreter_5,outbox],
sink:[Kamaelia.UI.Pygame.Ticker.Ticker_7,inbox] )
> self.send("Hello", "outbox")
ok
As you can guess, this then results in the Ticker displaying the text on screen.
As another example, if you're rummaging inside a system, you could start up the Axon visualiser in another console:
./AxonVisualiser.py --port 1500
And then in the embedded interpreter do this:
> from Kamaelia.Util.Introspector import Introspector
ok
> from Kamaelia.Chassis.Pipeline import Pipeline
ok
> from Kamaelia.Internet.TCPClient import TCPClient
ok
> Pipeline(
Introspector(),
TCPClient("127.0.0.1", 1500),
).activate()
> > >
ok
And that then adds an Axon visualisation/introspector to a running system - which looks like this:
Clearly this can aid in identifying when something has gone wrong.
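Outside Kamaelia, a rough standard-library analogue of the `if interactive_debug:` form is to run the application in a background thread and keep an interactive console bound to its live objects. This is only a sketch under stated assumptions: `App`, `ticks` and `running` are hypothetical, while `code.InteractiveConsole` is the real standard-library class. Unlike StandaloneInterpreter, nothing here is scheduled cooperatively; the two halves are ordinary threads sharing objects.

```python
import code
import threading
import time


class App:
    """Hypothetical long-running application with some live state worth inspecting."""

    def __init__(self):
        self.ticks = 0
        self.running = True

    def run(self):
        while self.running:
            self.ticks += 1          # stand-in for real work
            time.sleep(0.01)


app = App()
worker = threading.Thread(target=app.run, daemon=True)
worker.start()

# A console whose namespace holds a reference to the live application object.
console = code.InteractiveConsole(locals={"app": app})
# Interactively you would call console.interact(); push() feeds it one line at a time.
```

Typing `app.ticks` at such a console shows the counter still climbing, and `app.running = False` stops the application from the inside.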
Hope others find it useful too :-)
Going from generator coroutines to Kamaelia Components
May 17, 2009 at 02:09 AM | categories: python, oldblog
Pete then posted to the list a small collection of different ways of implementing...
#!/bin/sh
tail -f /var/log/system.log |grep pants
... in python.
This struck me as a nice example worth comparing to Kamaelia - specifically to compare the generator version. Indeed the Kamaelia version is fairly directly derived from the generator version, which is why I'm picking on it. The full generator version looks like this:
import time
import re
def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l
def printer(lines):
    for l in lines:
        print l.strip()
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass
Now, this is nice, but the core logic:
tail -f /var/log/system.log |grep pants
Has somehow become obfuscated at the end - which is a shame, because it doesn't need to be. Taking the Kamaelia version step by step, let's take that end part:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass
... and turn it into something which looks like what you'd do in Kamaelia:
from Kamaelia.Chassis.Pipeline import Pipeline
Pipeline(
    Follow('/var/log/system.log'),
    Grep(pattern=".*pants.*"),
    Printer(),
).run()
(incidentally, to actually use separate processes, ala shell, you'd use ProcessPipeline, not Pipeline)
So in order for this to be valid, we now need to adapt follow into Follow, grep into Grep, and printer into Printer.
Let's start by adapting follow:
def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l
First of all, we note that this follow source blocks - specifically, it calls time.sleep. Now there are other ways of doing this, but the simplest way of keeping this code structure is to just create a threaded component. We also need to capture the argument (fname), which isn't optional and has no logical default, so we need an __init__ method. So we grab that, store it somewhere handy, and call the superclass initialiser. Pretty standard stuff.
import time
import Axon
class Follow(Axon.ThreadedComponent.threadedcomponent):
    def __init__(self, fname, **argv):
        self.fname = fname
        super(Follow,self).__init__(**argv)
    def main(self):
        f = file(self.fname)
        f.seek(0,2) # go to the end
        while not self.dataReady("control"):
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                self.send(l, "outbox")
        f.close()
        self.send(self.recv("control"), "signal")
The main() method of this component now follows the same core logic as the follow() generator. The logic wrapped around it is simply control logic, which is fairly commonly used and looks like this:
while not self.dataReady("control"):
    <Body of generator>
self.send(self.recv("control"), "signal")
This allows someone to shut down our component cleanly. Other than that, the other major difference is that this:
yield l
Has been replaced with this:
self.send(l, "outbox")
ie rather than expecting to be called in a "call me & handle my values" loop, it just passes its results out of an outbox called "outbox" (much like tail -f spits its results out of stdout).
The next part to convert is grep(). As a reminder, it looks like this:
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l
With the Kamaelia equivalent looking like this:
import re
class Grep(Axon.Component.component):
    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."
    def main(self):
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")
Once again the overall logic is unchanged, and again the shutdown logic has been added around it.
However, we don't need to explicitly pass in a location for lines to enter this component. Specifically, we just expect to be passed lines from our standard inbox "inbox", in the same way normal grep expects data on stdin.
Also, because we have a logical default for the pattern (match everything using "."), we don't need to have an __init__ method. This is because the baseclass for all components does this in its __init__ method:
def __init__(self, *args, **argd):
    ..
    self.__dict__.update(argd)
So calling Grep(pattern=".*pants.*") will specialise the component's value of pattern. (This actively enables monkey-patching as reconfiguration, as you can see in this greylisting mail server.)
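To see why this works, here is a minimal plain-Python sketch of the same idea, assuming a base initialiser that behaves like the snippet above (the `..` part omitted). `Configurable` is a hypothetical stand-in for the component base class, not Axon's own code. Class attributes act as defaults and keyword arguments shadow them per instance; going by that snippet, positional arguments are accepted but discarded, so the pattern has to be passed by keyword.

```python
class Configurable:
    """Hypothetical stand-in for a component base class (not Axon's code)."""

    def __init__(self, *args, **argd):
        # Keyword arguments become instance attributes, shadowing class defaults.
        # Positional arguments are accepted but not stored anywhere.
        self.__dict__.update(argd)


class Grep(Configurable):
    pattern = "."  # class-level default: match everything


default_grep = Grep()
pants_grep = Grep(pattern=".*pants.*")
positional_grep = Grep(".*pants.*")  # silently falls back to the default
```

The same mechanism means any attribute, not just `pattern`, can be overridden at construction time without writing an `__init__`.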
Again, Grep is actually a special case of a filter, and as a result really just wants to take data from its std inbox and pass matching stuff to its std outbox. Which, if you look at the core logic, is precisely what it does:
for l in self.Inbox("inbox"):
    if regex.match(l):
        self.send(l, "outbox")
After clearing its inbox, it pauses, and releases control to the scheduler (Pausing is a component's way of letting the scheduler know "don't call me unless there's some data for me to work on").
Finally, we need to implement the Printer() component. Again we can adapt this generator based version, thus:
def printer(lines):
    for l in lines:
        print l.strip()
Which transforms into this:
class Printer(Axon.Component.component):
    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                print l.strip()
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")
Again the core logic is the same, with the control logic added around it. Again, rather than needing the explicit "lines" iterator, we have a standard place for data to come into the component - the inbox "inbox" - leaving the rest of the logic essentially unchanged.
Finally, in the same way the generator version puts the whole thing together:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass
We can put the Kamaelia components together:
Pipeline(
    Follow('/var/log/system.log'),
    Grep(pattern=".*pants.*"),
    Printer(),
).run()
This is very different. The generator version creates a chain of iterators, passing them in as the first parameter to the next one in the chain, with the last item in the chain (the for loop) being the one that pulls things along. The Kamaelia version instantiates a Pipeline component which is very clearly piping the outputs from one component into the inputs of the following one.
In fact the Pipeline component creates linkages between the components in the pipeline. These are conceptually similar to Stackless's channels (though inspired by unix pipes and occam). Implementation-wise, a linkage actually collapses an inbox into an outbox. As a result, when Follow sends a message to its outbox "outbox", that message is instantly delivered into the inbox "inbox" of Grep. Inboxes for generator-based components are just plain old lists, and for threaded components they're threadsafe queues.
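The "linkage collapses an outbox into an inbox" idea can be illustrated without Axon at all. Below is a minimal sketch in modern Python, assuming generator-based components whose boxes are plain lists, as described above. Every name here (`Component`, `link`, `pipeline`, `run`, `ListSource`, `Collector`) is hypothetical, and this toy `Grep` is not the Kamaelia one: linking simply makes the source's outbox the same list object as the sink's inbox, and the scheduler is a naive round robin.

```python
import re


class Component:
    """Toy component: named boxes are plain lists, main() is a generator."""

    def __init__(self):
        self.inboxes = {"inbox": [], "control": []}
        self.outboxes = {"outbox": [], "signal": []}

    def send(self, value, box="outbox"):
        self.outboxes[box].append(value)

    def data_ready(self, box="inbox"):
        return len(self.inboxes[box]) > 0

    def recv(self, box="inbox"):
        return self.inboxes[box].pop(0)


def link(source, sink, outbox="outbox", inbox="inbox"):
    # Collapse the outbox into the inbox: both names now refer to one list,
    # so source.send(...) lands directly in sink's inbox with no copying.
    source.outboxes[outbox] = sink.inboxes[inbox]


def pipeline(*components):
    for upstream, downstream in zip(components, components[1:]):
        link(upstream, downstream)                       # data path
        link(upstream, downstream, "signal", "control")  # shutdown path
    return components


def run(components):
    # Naive round robin: resume every live generator once per sweep.
    tasks = [c.main() for c in components]
    while tasks:
        for task in list(tasks):
            try:
                next(task)
            except StopIteration:
                tasks.remove(task)


class ListSource(Component):
    def __init__(self, items):
        super().__init__()
        self.items = items

    def main(self):
        for item in self.items:
            self.send(item)
            yield
        self.send("shutdown", "signal")  # tell downstream we are finished


class Grep(Component):
    def __init__(self, pattern="."):
        super().__init__()
        self.regex = re.compile(pattern)

    def main(self):
        while True:
            while self.data_ready("inbox"):  # drain before checking for shutdown
                line = self.recv("inbox")
                if self.regex.match(line):
                    self.send(line)
            if self.data_ready("control"):
                break
            yield
        self.send(self.recv("control"), "signal")  # pass shutdown along


class Collector(Component):
    def __init__(self):
        super().__init__()
        self.seen = []

    def main(self):
        while True:
            while self.data_ready("inbox"):
                self.seen.append(self.recv("inbox"))
            if self.data_ready("control"):
                break
            yield
```

Running `run(pipeline(ListSource([...]), Grep(pattern=".*pants.*"), Collector()))` leaves only the matching lines in the collector. This toy `Grep` drains its inbox before honouring shutdown, so no lines are lost if data and the shutdown message arrive in the same sweep; that ordering is a choice made for the sketch.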
The nice thing about this, is that if you wanted to use this in a network server, you could do this:
from Kamaelia.Chassis.ConnectedServer import ServerCore
def FollowPantsInMyLogs(*args, **argd):
    return Pipeline(
        Follow('/var/log/system.log'),
        Grep(pattern=".*pants.*"),
    )
ServerCore(protocol=FollowPantsInMyLogs, port=8000).run()
Though that opens a Follow & Grep component for every client. To use just one Follow component, you could follow the log once and publish the data to all clients:
from Kamaelia.Util.Backplane import *
Backplane("PANTSINMYLOGS").activate()
Pipeline(
    Follow('/var/log/system.log'),
    Grep(pattern=".*pants.*"),
    PublishTo("PANTSINMYLOGS"),
).activate()
def clientProtocol(*args, **argd):
    return SubscribeTo("PANTSINMYLOGS")
ServerCore(protocol=clientProtocol, port=8000).run()
You could also monitor this on the server console by adding in a local printer:
Backplane("PANTSINMYLOGS").activate()
Pipeline(
    Follow('/var/log/system.log'),
    Grep(pattern=".*pants.*"),
    PublishTo("PANTSINMYLOGS"),
).activate()
Pipeline(
    SubscribeTo("PANTSINMYLOGS"),
    Printer(),
).activate()
def clientProtocol(*args, **argd):
    return SubscribeTo("PANTSINMYLOGS")
ServerCore(protocol=clientProtocol, port=8000).run()
... which all fairly naturally describes the higher level co-ordination going on. Now, you can do all this from the ground up using plain old generators, but personally I find this approach easier to follow. Some people find other approaches simpler :)
Anyway, for the curious reader, I hope this is a useful/interesting comparison of a non-kamaelia based approach with a kamaelia based approach, with a visible demonstration at the end of why I prefer the latter :) ... and all this ignores graphlines too :)
Bar Camp Leeds UK
May 05, 2009 at 09:56 PM | categories: python, oldblog
- May 30th, May 31st
- Old Broadcasting House, Leeds, UK
- Website: http://barcampleeds.com/
Schedulers matter
March 04, 2009 at 10:26 AM | categories: python, oldblog
scheduler.install(self.messageLoop())
# self.messageLoop is a regular python generator
...
yield self.incrementCounter()
yield kickTo.messageQueue.push(self)
If you delve inside the fibra scheduler (which doesn't appear to be here unexpectedly) you see the following core:
def tick(self):
    """Iterates the scheduler, running all tasks and calling all
    handlers.
    """
    active = False
    for handler in self.handlers:
        active = handler.is_waiting() or active
    active = (len(self.tasks) > 0) or active
    tasks = []
    while True:
        try:
            task, send_value = self.tasks.pop(0)
        except IndexError, e:
            break
        try:
            if isinstance(send_value, Exception):
                r = task.throw(send_value)
            else:
                r = task.send(send_value)
        except StopIteration, e:
            r = e
        if r is None:
            tasks.append((task, None))
        else:
            try:
                handler = self.type_handlers[type(r)]
            except KeyError:
                raise RuntimeError("Don't know what to do with yielded type: %s" % type(r))
            handler.handle(r, task)
    self.tasks[:] = tasks
    return active
The core of this appears to be "when I'm done, do this later" next. If you think that's familiar, it should be - it's very similar (at least conceptually) to what Twisted does with deferreds. It's not identical, but similar. So what does this mean for the hackysack demo? Well, if we look at self.tasks after each run, by changing:
def run(self):
    while self.tick():
        pass
to:
def run(self):
    while self.tick():
        print "TASKS", [x[0] for x in self.tasks]
It becomes apparent what's happening (though it's fairly obvious from above):
TASKS [<generator object at 0xb7b04fcc>]
TASKS []
TASKS [<generator object at 0xb780f94c>]
TASKS []
TASKS [<generator object at 0xb7a1b04c>]
TASKS []
TASKS [<generator object at 0xb776fcac>]
TASKS []
TASKS [<generator object at 0xb79327ac>]
TASKS []
TASKS [<generator object at 0xb7b0ff2c>]
TASKS []
Fundamentally, it's quicker for these reasons:
- It always knows which generator is ready to run next.
- It also defaults to pausing the generator, unless it specifically asks to be run. ie the tasks default to being passive.
- The sender knows who the receiver is. This allows the sender to schedule the receiver explicitly.
By contrast, in Kamaelia:
- The scheduler is effectively a round robin scheduler - essentially for simplicity. The key question we wanted to ask was "does a component model based around inboxes/outboxes make it easier to write/reuse concurrent software", rather than "how can we build a faster, more responsive scheduler". As a result the round robin scheduler was always a compromise. There's a little bit of intelligence in there, but not much.
- Kamaelia's components default to active, rather than passive. That is they're expected to run continuously unless explicitly paused. This design decision impacts the scheduler as a whole.
- The sender does not know who the receiver is. This requires something different to happen in the scheduler. On the upside, it means that code can be tested more easily in isolation, or reused in situations it wasn't originally expected to be used within.
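The difference between "resume everyone each sweep" and "the sender wakes its receiver" can be made concrete by counting generator resumptions. This is a toy sketch under stated assumptions, not fibra's or Axon's scheduler: `player`, `make_game`, `round_robin` and `wake_receiver` are hypothetical, each player passes a single ball to the previous player in a ring (hackysack-style), and we count how many resumptions it takes to complete a given number of passes.

```python
from collections import deque


def player(me, inboxes, log):
    """Hackysack-style player: if holding the ball, pass it to the previous player."""
    n = len(inboxes)
    while True:
        if inboxes[me]:
            ball = inboxes[me].popleft()
            receiver = (me - 1) % n
            inboxes[receiver].append(ball)
            log.append(me)
            yield receiver  # report who now has work to do
        else:
            yield None      # nothing to do; resumed for no benefit


def make_game(n):
    inboxes = [deque() for _ in range(n)]
    inboxes[0].append("ball")
    log = []
    players = [player(i, inboxes, log) for i in range(n)]
    return players, log


def round_robin(n, passes):
    """Active model: every player is resumed on every sweep, busy or not."""
    players, log = make_game(n)
    resumes = 0
    while len(log) < passes:
        for p in players:
            next(p)
            resumes += 1
            if len(log) == passes:
                break
    return resumes


def wake_receiver(n, passes):
    """Passive model: only a player that has just been sent the ball is resumed."""
    players, log = make_game(n)
    runnable = deque([0])
    resumes = 0
    while runnable and len(log) < passes:
        receiver = next(players[runnable.popleft()])
        resumes += 1
        if receiver is not None:
            runnable.append(receiver)  # the sender knows, and schedules, its receiver
    return resumes
```

With 10 players and 100 passes, the passive version resumes exactly one generator per pass, while the round robin version spends most of its resumptions on players with nothing to do. That mirrors the trade-off in the lists above: the speed comes from the sender knowing its receiver.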
Beyond all that though, fibra looks neat :)
The 5 minute (version), not the full half hour...
February 26, 2009 at 11:15 PM | categories: python, oldblog
As you may know, Kamaelia is a project I'm rather familiar with (ok, understatement), but one of the key ideas behind it is about how to make concurrency usable for everyday problems (from greylisting and database modelling assistance through to learning to read & write). No one concurrency method suits everyone - as Jesse's recent series of posts shows (if you've not read them, they're well worth it :), but Kamaelia's approach fits my head, and hopefully yours too. There may be a few areas where nicer syntactic sugar would be appropriate and so on, and feedback is always welcome - preferably to the project's google group/mailing list.
OK, the point of this post is that recently at O'Reilly Ignite UK North I gave a 5 minute talk which was essentially "what we've learnt about practical concurrency", and by necessity of time it's a whistle stop tour. The slides are on slideshare, and the video is on blip.tv - along with the rest of the talks - and by the magic of the internet they're below as well.
Embracing Concurrency (slides)
Embracing Concurrency (video)
Many thanks to Imran Ali of Carbon Imagineering and Craig Smith of O'Reilly GMT for organising a great evening of interesting talks!
Actor Model vs Denial of Service
November 17, 2008 at 03:39 AM | categories: python, oldblog
Communications among actors occur asynchronously: that is, the sending actor does not wait until the message is received before proceeding with computation.
(this summary courtesy of the Wikipedia page on the Actor Model, but the literature supports this view as well)
The difference between Kamaelia and the Actor Model is in a few places, but possibly the most fundamental is this:
- With the actor model (as I understand it):
  - You have a mailbox where you receive messages. ("recv from inbox")
  - Communication defaults to unidirectional, though you can build bi-directional
  - When you send a message, you know the recipient and send it direct to the recipient. ("send to bob")
  - Message sending is asynchronous
  - ie sender knows receiver, receiver does not (necessarily) know sender
    - This potentially introduces coupling in ways that make co-ordination languages harder to build
- With Kamaelia's model:
  - You receive messages from multiple named inboxes (ala receiving data on stdin, or receiving signals)
  - Communication defaults to unidirectional, though you can build bi-directional
  - You send messages to named outboxes (ala sending to stdout, stderr)
  - A higher level co-ordination language (effectively) joins the dots from outboxes to inboxes (generally)
  - Message sending defaults to asynchronous, but you can define a "pipewidth" or "max number of messages in transit" to allow synchrony, where needed (such as a producer that produces data faster than the consumer can consume it)
  - ie sender does NOT know receiver, receiver does not (necessarily) know sender
    - Much like a CPU doesn't know whether it's plugged into a motherboard or a testing harness, for example.
    - This defaults to needing a co-ordination language - but this encourages greater reusability of components, through a dataflow model
  - I say "kamaelia's" model here, but this is pretty similar to hardware, unix pipes, etc.
Which raises a question about actor model implementations:
- Do they do the literal, and solely asynchronous, thing? (ie always return immediately without error when you send a message)
- Or do they introduce the ability to add in synchrony where necessary? (block or return an exception when a maximum inbox size is reached)
So, the question I really have is this: if you use or implement an actor model system, do you have any ability in your implementation to be able to say "maximum number of pending incoming messages" ? If you don't, then it is remarkably easy to write code that can break an actor based system by mistake, with problems in dealing with that - making code more complex, and less reusable.
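In Python terms, the question is about the difference between an unbounded queue and a bounded one. A minimal sketch using only the standard library's `queue.Queue` (not Kamaelia's or any actor library's API): a fast producer sends without blocking, and once `maxsize` messages are in transit the send fails with `queue.Full`, giving the producer a chance to back off instead of swamping the receiver. The `produce` helper is hypothetical.

```python
import queue


def produce(inbox, count):
    """A fast producer: send up to `count` messages without ever blocking."""
    sent = 0
    for n in range(count):
        try:
            inbox.put_nowait(n)  # raises queue.Full once maxsize is reached
        except queue.Full:
            break                # back-pressure: a real sender would pause and retry
        sent += 1
    return sent


unbounded = queue.Queue()         # maxsize=0: purely asynchronous, no limit
bounded = queue.Queue(maxsize=5)  # at most 5 messages "in transit"
```

Swapping `put_nowait` for a blocking `put(item, timeout=...)` gives the other option listed above: the sender blocks (or times out) rather than getting an exception.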
I'd really be interested in hearing both positives and negatives here...
Simple Chat Server in Kamaelia
November 13, 2008 at 11:00 AM | categories: python, oldblog
Well, the reason there wasn't an example of that up is because it's a really trivial example in Kamaelia. The most basic version, for example, looks like this:
from Kamaelia.Util.Backplane import Backplane, PublishTo, SubscribeTo
from Kamaelia.Chassis.ConnectedServer import ServerCore
from Kamaelia.Chassis.Pipeline import Pipeline
Backplane("CHAT").activate() # This handles the sharing of data between clients
def ChatClient(*argv, **argd):
    return Pipeline(
        PublishTo("CHAT"), # Take data from client and publish to all clients
        SubscribeTo("CHAT"), # Take data from other clients and send to our client
    )
print "Chat server running on port 1501"
ServerCore(protocol = ChatClient, port=1501).run()
A slightly more interesting version, which at least tells clients who they're talking to, and also has slightly better (more explicit) structure, is this:
#!/usr/bin/python
from Kamaelia.Util.Backplane import Backplane, PublishTo, SubscribeTo
from Kamaelia.Chassis.ConnectedServer import ServerCore
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.PureTransformer import PureTransformer
Backplane("CHAT").activate() # This handles the sharing of data between clients
def ChatClient(*argc, **argd):
    peer = argd["peer"]
    peerport = argd["peerport"]
    return Pipeline(
        PureTransformer(lambda x: " %s:%s says %s" % (str(peer), str(peerport), str(x))),
        PublishTo("CHAT"), # Take data from client and publish to all clients
        # ----------------------
        SubscribeTo("CHAT"), # Take data from other clients and send to our client
        PureTransformer(lambda x: "Chat:"+ str(x).strip()+"\n"),
    )
class ChatServer(ServerCore):
    protocol = ChatClient
print "Chat server running on port 1501"
ChatServer(port=1501).run()
To try this yourself:
- sudo easy_install Kamaelia
- curl -O http://kamaelia.googlecode.com/svn/trunk/Sketches/MPS/Examples/SimpleChatServer.py
- ./SimpleChatServer.py
A nice Pygame based client for this looks like this incidentally:
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.UI.Pygame.Text import Textbox, TextDisplayer
from Kamaelia.Internet.TCPClient import TCPClient
Pipeline(
    Textbox(size = (800, 300), position = (100,380)),
    TCPClient("127.0.0.1", 1501),
    TextDisplayer(size = (800, 300), position = (100,40))
).run()
To run that (assuming you have pygame installed):
- sudo easy_install Kamaelia
- curl -O http://kamaelia.googlecode.com/svn/trunk/Sketches/MPS/Examples/SimpleChatClient.py
- ./SimpleChatClient.py
Now there are clearly a lot of interesting directions you can take this, but as you can see, this is a relatively simple starting point. For something more complex, there is a basic P2P splitting radio system in our subversion tree. It's just over 100 lines long for the source side (ie capturing radio off air and serving it), and the client is a similar size (it has a playback component rather than a capture one). The code for those two examples is here:
The two examples actually contain a lot of common code, so we could extract the common code and have two smaller examples, but like this the files are standalone, which is pretty nice.
Anyway, the point of this post was "what sort of examples would you like to see?" and I'm really interested in any feedback :-)
Have fun :)
A post per day in November
November 02, 2008 at 02:38 PM | categories: python, oldblog
The reason for it being that form is because it is significantly clearer than text.
The world is big enough
October 30, 2008 at 02:35 PM | categories: python, oldblog
"I believe there is room within Python as a language, and CPython as an implementation of that language - for a virtual buffet of helpful libraries for concurrency and distributed systems."
He's absolutely right - if we only have hammers, every solution involves
I'd write more, but I'm fluey at the moment.
Jesse's comment did remind me of this though:
"The best way to have a good idea is to have a lot of ideas." Linus Pauling.
It also reminds me of my personal view on things like this - the world would be very boring if we all agreed and nobody ever did anything new or different.
Bar Camp Liverpool
October 26, 2008 at 11:42 PM | categories: python, oldblog
the people who attend, and Pythonistas are an interesting and varied lot, hence posting this here. Key info:
- http://www.barcampliverpool.com/
- 6th-7th December
If there's interest, and if it seems appropriate, I could hold a Kamaelia sprint that weekend?