Going from generator coroutines to Kamaelia Components
May 17, 2009 at 02:09 AM | categories: python, oldblog
Earlier this evening an announcement by Pete Fein regarding the formation of a python-concurrency mailing list (aka Python Concurrency Special Interest Group) bounced past my inbox on the unladen-swallow mailing list. Naturally, given my work on Axon (a concurrency library) and Kamaelia (a bunch of components that use it), it jumped out at me as interesting. (5 minute overview for those who don't know what Kamaelia is...)
Pete then posted to the list a small collection of different ways of implementing this:

    #!/bin/sh
    tail -f /var/log/system.log | grep pants

... in python.
This struck me as a nice example worth comparing to Kamaelia - specifically to compare the generator version. Indeed the Kamaelia version is fairly directly derived from the generator version, which is why I'm picking on it. The full generator version looks like this:
import time
import re

def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

def printer(lines):
    for l in lines:
        print l.strip()

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

Now, this is nice, but the core logic:

    tail -f /var/log/system.log | grep pants

has somehow become obfuscated at the end - which is a shame, because it doesn't need to be. Taking the Kamaelia version step by step, let's take that end part:

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

... and turn it into something which looks like what you'd do in Kamaelia:

from Kamaelia.Chassis.Pipeline import Pipeline

Pipeline(
    Follow('tail -f /var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()

(Incidentally, to actually use separate processes, a la shell, you'd use ProcessPipeline, not Pipeline.)
So in order for this to be valid, we now need to adapt follow into Follow, grep into Grep, and printer into Printer.
Let's start by adapting follow:
def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

First of all, we note that this follow source blocks - specifically by calling time.sleep. Now there are other ways of doing this, but the simplest way of keeping this code structure is to just create a threaded component. We also need to capture the argument (fname), which isn't optional and has no logical default, so we need to have an __init__ method. So we grab that, store it somewhere handy, and call the superclass initialiser. Pretty standard stuff.
import time
import Axon

class Follow(Axon.ThreadedComponent.threadedcomponent):
    def __init__(self, fname, **argv):
        self.fname = fname
        super(Follow, self).__init__(**argv)
    def main(self):
        f = file(self.fname)
        f.seek(0,2) # go to the end
        while not self.dataReady("control"):
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                self.send(l, "outbox")
        f.close()
        self.send(self.recv("control"), "signal")

The main() method of this component now follows the same core logic as the follow() generator. The added shutdown handling is simply control logic, and is fairly commonly used; it looks like this:

while not self.dataReady("control"):
    <Body of generator>
self.send(self.recv("control"), "signal")

This allows someone to shut down our component cleanly. Other than that, the other major difference is that this:

    yield l

has been replaced with this:

    self.send(l, "outbox")
i.e. rather than expecting to be called in a "call me & handle my values" loop, it just passes its results out of an outbox called "outbox" (much like tail -f spits its results out of stdout).
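That pull-versus-push difference can be sketched without Axon at all. The following is a toy illustration only - ToyComponent and ToyFollow are hypothetical names, and the real Axon API is richer than this - but it shows the shift from "yield to whoever iterates me" to "send to a named outbox":

```python
# Toy sketch (NOT the real Axon API): named boxes are just lists,
# and send() pushes a message out instead of yielding it.

class ToyComponent:
    def __init__(self):
        self.boxes = {"inbox": [], "outbox": []}

    def send(self, value, boxname):
        self.boxes[boxname].append(value)

class ToyFollow(ToyComponent):
    def __init__(self, lines):
        super().__init__()
        self.lines = lines

    def main(self):
        # Instead of "yield l" (pull), push each line out of "outbox".
        for l in self.lines:
            self.send(l, "outbox")

f = ToyFollow(["one\n", "two\n"])
f.main()
print(f.boxes["outbox"])  # both lines now sit in the outbox
```

Nothing here waits to be iterated over; whoever wires the outbox up decides where the data goes next.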
The next part to convert is grep(). Recall that it looks like this:
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

With the Kamaelia equivalent looking like this:
class Grep(Axon.Component.component):
    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."
    def main(self):
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")
Once again the overall logic is unchanged; the only addition is the same shutdown control logic as before.
However, we don't need to explicitly pass in a location for lines to enter this component. Instead we just expect to be passed lines from our standard inbox "inbox", in the same way normal grep expects data on stdin.
Also, because we have a logical default for the pattern (match everything using "."), we don't need an __init__ method. This is because the baseclass for all components does this in its __init__ method:
def __init__(self, *args, **argd):
    ...
    self.__dict__.update(argd)
So calling Grep(pattern=".*pants.*") will specialise the component's value of pattern. (The reason for this design is that it actively enables monkey-patching as reconfiguration, as you can see in this greylisting mail server.)
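The effect of that base-class trick is easy to demonstrate in isolation. Here's a minimal sketch - ToyBase and ToyGrep are made-up names standing in for Axon's component baseclass and the Grep component above:

```python
# Minimal sketch of "keyword arguments override class attributes",
# the same trick Axon's component baseclass uses in its __init__.

class ToyBase:
    def __init__(self, *args, **argd):
        # Any keyword argument becomes an instance attribute,
        # shadowing the class-level default of the same name.
        self.__dict__.update(argd)

class ToyGrep(ToyBase):
    pattern = "."   # class-level default, like Grep.pattern

g1 = ToyGrep()
g2 = ToyGrep(pattern=".*pants.*")

print(g1.pattern)  # "." - the class default
print(g2.pattern)  # ".*pants.*" - overridden for this instance
```

Note that the class default itself is untouched: only the instance that was given the keyword argument sees the new value.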
Again, Grep is actually a special case of a filter, and as a result really just wants to take data from its standard inbox and pass matching data to its standard outbox. Which, if you look at the core logic, is precisely what it does:
for l in self.Inbox("inbox"):
    if regex.match(l):
        self.send(l, "outbox")
After clearing its inbox, it pauses, and releases control to the scheduler (Pausing is a component's way of letting the scheduler know "don't call me unless there's some data for me to work on").
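The pause idea can be sketched with a toy round-robin scheduler. This is a deliberately simplified model - ToyTask, run_scheduler and worker are hypothetical, and Axon's real scheduler handles waking, shutdown and much more - but it shows why a paused component costs nothing:

```python
# Toy round-robin scheduler illustrating pause semantics: a paused
# task is skipped until it is woken with new data. A sketch of the
# concept only, NOT Axon's actual scheduler.

class ToyTask:
    def __init__(self):
        self.gen = None
        self.paused = False

def run_scheduler(tasks, steps):
    calls = 0
    for _ in range(steps):
        for task in list(tasks):
            if task.paused:
                continue              # "don't call me unless there's data"
            next(task.gen)
            calls += 1
    return calls

def worker(task, inbox, seen):
    while True:
        while inbox:
            seen.append(inbox.pop(0)) # "process" a message
        task.paused = True            # inbox empty: release control
        yield

inbox, seen = ["msg1", "msg2"], []
task = ToyTask()
task.gen = worker(task, inbox, seen)
calls = run_scheduler([task], steps=5)
print(calls, seen)  # the worker was only actually called once
```

After the first call drains the inbox, the task pauses itself and the remaining four scheduler rounds skip it entirely; delivering fresh data (and clearing the paused flag) is what would wake it again.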
Finally, we need to implement the Printer() component. Again we can adapt this generator based version, thus:
def printer(lines):
    for l in lines:
        print l.strip()

Which transforms into this:
class Printer(Axon.Component.component):
    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                print l.strip()
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

Again the core logic is the same, with the same control logic added. Again, rather than needing the explicit "lines" iterator, we have a standard place for data to come into the component - the inbox "inbox" - leaving the rest of the logic essentially unchanged.
Finally, in the same way the generator version puts the whole thing together:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

We can put the Kamaelia components together:

Pipeline(
    Follow('tail -f /var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()

This is very different. The generator version creates a chain of iterators, passing them in as the first parameter to the next one in the chain, with the last item in the chain (the for loop) being the one that pulls things along. The Kamaelia version instantiates a Pipeline component which is very clearly piping the outputs from one component into the inputs of the following one.
In fact the Pipeline component creates linkages between the components in the pipeline. These are conceptually similar to Stackless's channels (though inspired by unix pipes and occam). Implementation-wise, a linkage actually collapses an inbox into an outbox. As a result, when Follow sends a message to its outbox "outbox", that message is instantly delivered into the inbox "inbox" of Grep. Inboxes for generator based components are just plain old lists, and for threaded components they're threadsafe queues.
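That "collapsing" can be modelled in a few lines of plain Python. In this toy model - again, hypothetical names, not the real Axon implementation - linking two components makes the sender's outbox literally the same list as the receiver's inbox, which is why delivery is instant:

```python
# Toy model of a linkage: after linking, the source's "outbox" IS the
# sink's "inbox" (one shared list). A sketch of the idea of collapsing
# an inbox into an outbox, not real Axon code.

class ToyComponent:
    def __init__(self):
        self.boxes = {"inbox": [], "outbox": []}

    def send(self, value, boxname):
        self.boxes[boxname].append(value)

def link(source, sink):
    # Collapse: replace the source's outbox with the sink's inbox.
    source.boxes["outbox"] = sink.boxes["inbox"]

source, sink = ToyComponent(), ToyComponent()
link(source, sink)
source.send("some log line\n", "outbox")
print(sink.boxes["inbox"])  # the message arrived with no copying step
```

Because the two boxes are one object, there's no delivery loop to run: appending to the outbox is the delivery.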
The nice thing about this is that if you wanted to use this in a network server, you could do this:
from Kamaelia.Chassis.ConnectedServer import ServerCore

def FollowPantsInMyLogs(*args):
    return Pipeline(
        Follow('tail -f /var/log/system.log'),
        Grep(".*pants.*"),
    )

ServerCore(protocol=FollowPantsInMyLogs, port=8000).run()

Though that opens a Follow & Grep component for every client. To use just one Follow component, you could follow and publish the data once, for all clients:

from Kamaelia.Util.Backplane import *

Backplane("PANTSINMYLOGS").activate()

Pipeline(
    Follow('tail -f /var/log/system.log'),
    Grep(".*pants.*"),
    PublishTo("PANTSINMYLOGS"),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()
You could also monitor this on the server console by adding in a local printer:
Backplane("PANTSINMYLOGS").activate()

Pipeline(
    Follow('tail -f /var/log/system.log'),
    Grep(".*pants.*"),
    PublishTo("PANTSINMYLOGS"),
).activate()

Pipeline(
    SubscribeTo("PANTSINMYLOGS"),
    Printer(),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()

... which all fairly naturally describes the higher level co-ordination going on. Now, you can do all this from the ground up using plain old generators, but personally I find this approach easier to follow. Some people find other approaches simpler :)
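The publish/subscribe pattern behind Backplane can also be sketched in plain Python. ToyBackplane below is a made-up stand-in for the Backplane/PublishTo/SubscribeTo trio, just to show the fan-out: one publisher, and each subscriber gets its own copy:

```python
# Toy backplane: publishers send to a named channel; every subscriber
# registered on that channel receives the message in its own inbox.
# A sketch of the idea only, NOT Kamaelia's Backplane API.

class ToyBackplane:
    def __init__(self):
        self.channels = {}          # channel name -> list of subscriber inboxes

    def subscribe(self, name):
        inbox = []
        self.channels.setdefault(name, []).append(inbox)
        return inbox

    def publish(self, name, message):
        for inbox in self.channels.get(name, []):
            inbox.append(message)   # each subscriber gets a copy

plane = ToyBackplane()
console = plane.subscribe("PANTSINMYLOGS")  # the local printer
client = plane.subscribe("PANTSINMYLOGS")   # a network client
plane.publish("PANTSINMYLOGS", "found pants in the logs\n")
print(console)
print(client)
```

The follower publishes once, and adding the console printer (or a tenth network client) is just another subscribe - the publishing side never changes.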
Anyway, for the curious reader, I hope this is a useful/interesting comparison of a non-Kamaelia based approach with a Kamaelia based approach, with a visible demonstration at the end of why I prefer the latter :) ... and all this ignores graphlines too :)