Restarting Python Northwest. 24th Sept
September 14, 2009 at 11:36 PM | categories: python, oldblog | View Comments

Details:
- When: Thursday 24th September, 6pm
- Where: Rain Bar, Manchester, UK:
- http://www.rain-bar.co.uk/
- 80 Great Bridgewater Street, Manchester, M1 5JG
- map: http://tinyurl.com/mn4qnl
- Who: Who can come? If you're reading this YOU can (assuming you're sufficiently close :-)
More specifically anyone from beginners, the inexperienced through deeply experienced and all the way back to the plain py-curious.
- What: Suggestion is to start off with a social meet, and chat about stuff we've found interesting/useful/fun with python recently. Topics likely to include robots and audio generation, the recent unconference, and europython.
- Twitter: http://twitter.com/pynw / #pythonnorthwest / #pynw
- Mailing list: http://groups.google.com/group/python-north-west
How did this happen? I tweeted the idea, a couple of others seconded it, then David Jones pointed out that it's "easier to arrange for a specific 2 people to meet than it is to e-mail a vague cloud of people and get _any_ 2 to meet anywhere", so that's where we'll be.
If twitter feedback is anything to go by, we're hardly going to be alone, so please come along - the more the merrier :-) Better yet, please reply to this post saying you're coming along!
More generally, assuming this continues, pynw will probably be every third Thursday of the month, maybe alternating between technical meets and social ones. (probably a topic for discussion :-)
Please forward this to anyone you think may be interested!
See you there!
Traffic Server to be Open Source?!
July 07, 2009 at 12:02 PM | categories: python, oldblog | View Comments

That's pretty much made my day, that has.
Europython Videos Transcoding
July 07, 2009 at 01:02 AM | categories: python, oldblog | View Comments

Fortunately/obviously, I'm automating this, and it'll come as no shock to some that I'm automating it using Kamaelia. This automation needs to be stoppable, since for practical reasons I can only run it overnight.
Anyway, for those curious, this is the code I'm using to do the transcode & upload. You'll note that it saturates my CPU, keeping both cores busy. Also, it interleaves an IO-bound process (the ftp upload) with a CPU-bound one (the transcode).
import os
import re
import Axon
from Kamaelia.Chassis.Graphline import Graphline
from Kamaelia.Chassis.Pipeline import Pipeline

class Find(Axon.Component.component):
    path = "."
    walktype = "a"
    act_like_find = True
    def find(self, path = ".", walktype="a"):
        if walktype == "a":
            addfiles = True
            adddirs = True
        elif walktype == "f":
            addfiles = True
            adddirs = False
        elif walktype == "d":
            adddirs = True
            addfiles = False
        deque = []
        deque.insert(0, (os.path.join(path,x) for x in os.listdir(path)) )
        while len(deque)>0:
            try:
                fullentry = deque[0].next()
                if os.path.isdir(fullentry):
                    if adddirs:
                        yield fullentry
                    try:
                        X = [os.path.join(fullentry,x) for x in os.listdir(fullentry)]
                        deque.insert(0, iter(X))
                    except OSError:
                        if not self.act_like_find:
                            raise
                elif os.path.isfile(fullentry):
                    if addfiles:
                        yield fullentry
            except StopIteration:
                deque.pop(0)
    def main(self):
        gotShutdown = False
        for e in self.find(path = self.path, walktype=self.walktype):
            self.send(e, "outbox")
            yield 1
            if self.dataReady("control"):
                gotShutdown = True
                break
        if not gotShutdown:
            self.send(Axon.Ipc.producerFinished(), "signal")
        else:
            self.send(self.recv("control"), "signal")

class Sort(Axon.Component.component):
    def main(self):
        dataset = []
        while 1:
            for i in self.Inbox("inbox"):
                dataset.append(i)
            if self.dataReady("control"):
                break
            self.pause()
            yield 1
        dataset.sort()
        for i in dataset:
            self.send(i, "outbox")
            yield 1
        self.send(self.recv("control"), "signal")

class Grep(Axon.Component.component):
    pattern = "."
    invert = False
    def main(self):
        match = re.compile(self.pattern)
        while 1:
            for i in self.Inbox("inbox"):
                if match.search(i):
                    if not self.invert:
                        self.send(i, "outbox")
                else:
                    if self.invert:
                        self.send(i, "outbox")
            if self.dataReady("control"):
                break
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

class TwoWayBalancer(Axon.Component.component):
    Outboxes = ["outbox1", "outbox2", "signal1", "signal2"]
    def main(self):
        c = 0
        while 1:
            yield 1
            for job in self.Inbox("inbox"):
                if c == 0:
                    dest = "outbox1"
                else:
                    dest = "outbox2"
                c = (c + 1) % 2
                self.send(job, dest)
                job = None
            if not self.anyReady():
                self.pause()
            if self.dataReady("control"):
                break
        R = self.recv("control")
        self.send(R, "signal1")
        self.send(R, "signal2")

class Transcoder(Axon.ThreadedComponent.threadedcomponent):
    command = 'ffmpeg >transcode.log 2>&1 -i "%(SOURCEFILE)s" -s 640x360 -vcodec mpeg4 -acodec copy -vb 1500000 %(ENCODINGNAME)s'
    def main(self):
        while 1:
            for sourcefile in self.Inbox("inbox"):
                shortname = os.path.basename(sourcefile)
                encoding_name = shortname.replace(".mp4", ".avi")
                finalname = sourcefile.replace(".mp4", ".avi")
                # Do the actual transcode
                print "TRANSCODING", sourcefile, encoding_name
                os.system( self.command % {"SOURCEFILE": sourcefile, "ENCODINGNAME":encoding_name})
                # file is transcoded, move to done
                print "MOVING DONE FILE", sourcefile, os.path.join("done", sourcefile)
                os.rename(sourcefile, os.path.join("done", sourcefile))
                # Move encoded version to upload queue
                upload_name = os.path.join( "to_upload", encoding_name)
                print "MOVING TO UPLOAD QUEUE", encoding_name, upload_name
                os.rename(encoding_name, upload_name )
                # And tell the encoder to upload it please
                print "SETTING OFF UPLOAD", upload_name, finalname
                self.send( (upload_name, finalname), "outbox")
                print "-----------------"
            if self.dataReady("control"):
                break
        self.send(self.recv("control"), "signal")

class Uploader(Axon.ThreadedComponent.threadedcomponent):
    command = "ftpput --server=%(HOSTNAME)s --verbose --user=%(USERNAME)s --pass=%(PASSWORD)s --binary --passive %(UPLOADFILE)s"
    username = < editted :-) >
    password = < editted :-) >
    hostname = "ftp.blip.tv"
    def main(self):
        while 1:
            for (upload_name, finalname) in self.Inbox("inbox"):
                print "UPLOADING", upload_name
                os.system( self.command % {
                               "HOSTNAME": self.hostname,
                               "USERNAME": self.username,
                               "PASSWORD": self.password,
                               "UPLOADFILE": upload_name,
                          } )
                print "MOVING", upload_name, "TO", os.path.join("encoded", finalname)
                os.rename(upload_name, os.path.join("encoded", finalname))
                print "-----------------"
            if self.dataReady("control"):
                break
            if not self.anyReady():
                self.pause()
        self.send(self.recv("control"), "signal")

Graphline(
    FILES = Pipeline(
        Find(path=".", walktype="f"),
        Sort(),
        Grep(pattern="(done|encoded|unsorted|transcode.log|to_upload)",
             invert = True),
    ),
    SPLIT = TwoWayBalancer(), # Would probably be nicer as a customised PAR chassis
    CONSUME1 = Pipeline(
        Transcoder(),
        Uploader(),
    ),
    CONSUME2 = Pipeline(
        Transcoder(),
        Uploader(),
    ),
    linkages = {
        ("FILES","outbox"):("SPLIT","inbox"),
        ("SPLIT","outbox1"):("CONSUME1","inbox"),
        ("SPLIT","outbox2"):("CONSUME2","inbox"),

        ("FILES","signal"):("SPLIT","control"),
        ("SPLIT","signal1"):("CONSUME1","control"),
        ("SPLIT","signal2"):("CONSUME2","control"),
    }
).run()
It should be fairly clear that this will go as fast as it can, so please be patient :-)
Autoloading in python
June 21, 2009 at 04:14 PM | categories: python, oldblog | View Comments

If it seems chaotic, consider the Unix PATH variable. Any time you type a name, the shell looks in lots of locations and runs the first one it finds. That's effectively the same sort of idea as autoloading. Yes, you can do some really nasty magic if you want, but then you can do that with the shell too, and generally people get along fine.
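To make the analogy concrete, here's a tiny sketch (plain Python; the `which` helper and directory names are made up for illustration) of the shell's first-match PATH lookup - the same "search several places, take the first hit" behaviour that autoloading gives you for attributes:

```python
import os
import tempfile

def which(cmd, path_dirs):
    """First-match lookup, the way the shell resolves a name via PATH."""
    for d in path_dirs:
        candidate = os.path.join(d, cmd)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None  # nothing on the "path" provides this name

# Two directories both providing "tool"; the first on the path wins.
d1, d2 = tempfile.mkdtemp(), tempfile.mkdtemp()
for d in (d1, d2):
    p = os.path.join(d, "tool")
    open(p, "w").close()
    os.chmod(p, 0o755)

print(which("tool", [d1, d2]) == os.path.join(d1, "tool"))
```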
Anyway, vaguely curious about it I decided to do some digging around, and came across this post by Leif K Brookes, which suggests this:
"You could wrap it in an object, but that's a bit of a hack."

import sys

class Foo(object):
    def __init__(self, wrapped):
        self.wrapped = wrapped
    def __getattr__(self, name):
        try:
            return getattr(self.wrapped, name)
        except AttributeError:
            return 'default'

sys.modules[__name__] = Foo(sys.modules[__name__])

That looked reasonable, so I created a file mymod.py which looks like this:

import sys

def greet(greeting="Hello World"):
    print greeting

class mymod_proxy(object):
    def __init__(self):
        super(mymod_proxy, self).__init__()
        self.wrapped = sys.modules["mymod"]
    def __getattr__(self, name):
        try:
            return getattr(self.wrapped, name)
        except AttributeError:
            def f():
                greet(name)
            return f

sys.modules["mymod"] = mymod_proxy()

And tried using it like this - and as you can see, it seems to work as expected/desired:

~> python
Python 2.5.1 (r251:54863, Jan 10 2008, 18:01:57)
[GCC 4.2.1 (SUSE Linux)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import mymod
>>> mymod.hello()
hello
>>> from mymod import Hello_World
>>> Hello_World()
Hello_World
Now the reason I'd been thinking about this, is because I'd like to retain the hierarchy of components in Kamaelia that we have at the moment (it's useful for navigating what's where), but given we tend to use them in a similar way to Unix pipelines it's natural to want to be able to do something like:
from Kamaelia import Pipeline, ConsoleReader, ConsoleWriter

Pipeline(
    ConsoleReader(),
    ConsoleWriter(),
).run()

Rather than the more verbose form, specifically pulling them in from particular points. Likewise, we don't really want to import every single module in Kamaelia.py, because of the large number of components there (Kamaelia is really a toolbox IMO where things get wired together, and Axon is the tool for making new tools), the majority of which won't be used in every application!
Now, I haven't done this yet, and wouldn't do it lightly, but the fact that you can actually make autoload functionality work seems kinda cool, and a nice opportunity. But I'm also now wondering just how nasty this approach seems to people. After all, Leif describes it as "a bit of a hack", and whilst it's neat, I'm not certain the positive view is the right one. I'm interested in any views on better ways of doing autoload in Python, and also whether people view it as a nice thing at all. (One person's aesthetic is another person's pain after all...)
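For anyone who wants to play with the idea without touching Kamaelia, the sys.modules wrapping trick above can be tried self-contained; the module name demo_autoload and the AutoloadProxy class here are invented purely for the example:

```python
import sys
import types

# Hypothetical demo module created on the fly (stands in for mymod.py).
mod = types.ModuleType("demo_autoload")
mod.greeting = "Hello World"
sys.modules["demo_autoload"] = mod

class AutoloadProxy(object):
    """Wraps a module; unknown attributes get a default instead of AttributeError."""
    def __init__(self, wrapped):
        self.wrapped = wrapped
    def __getattr__(self, name):
        try:
            return getattr(self.wrapped, name)
        except AttributeError:
            return "default"

# Replace the module with the proxy - imports now go through __getattr__.
sys.modules["demo_autoload"] = AutoloadProxy(sys.modules["demo_autoload"])

import demo_autoload
print(demo_autoload.greeting)  # existing attribute passes through: Hello World
print(demo_autoload.missing)   # unknown attribute falls back to: default
```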
Europython
June 10, 2009 at 05:35 PM | categories: python, oldblog | View Comments

I'm giving a tutorial on Kamaelia at Europython '09 this year.
Europython details:
Where: Birmingham UK
When:
Tutorial days 28/29th June.
Main conference: 30th June - 2nd July
Kamaelia specifically: 28th June, 9am
http://www.europython.eu/talks/timetable/
Cost:
Tutorial days: £100
Conference days: £190
More info:
http://www.europython.eu/
http://www.europython.eu/talks/timetable/
Blurb for the Kamaelia tutorial:
Kamaelia: Pragmatic Concurrency

The structure of this in terms of time is 2 x 1.5 hour sessions, with a 15 minute break in the middle, so hopefully enough time to impart enough useful knowledge to help you get started with Kamaelia.
Tutorial, Half day (intermediate)
Why use concurrency? Since concurrency is viewed as an advanced topic by many developers, this question is often overlooked. However, many real world systems, including transportation, companies, electronics and Unix systems are highly concurrent and accessible by the majority of people. One motivation can be “many hands make light work” but in software this often appears to be false – in no small part due to the tools we use to create systems. Despite this, the need for concurrency often creeps into many systems.
Kamaelia is a toolset and mindset aimed at assisting in structuring your code such that you can focus on the problem you want to solve, but in a way that results in naturally reusable code that happens to be painlessly concurrent. It was designed originally to make maintenance of highly concurrent network systems simpler, but has general application in a wider variety of problem domains, including desktop applications, web backend systems (eg video transcode & SMS services), through to tools for teaching a child to read and write.
This is a highly practical tutorial, during which you will create your own version of Axon, your own components, and your first Kamaelia based system (bring a laptop!). The course expects a level of familiarity with Python, but no prior experience of concurrency is assumed. The tutorial will cover:
- A fast overview in the style of a lightning talk.
- Kamaelia's core – Axon – which provides the basic tools needed for concurrent systems, followed by a session on implementing your own core.
- Practical walk throughs of real world Kamaelia systems to demonstrate how to build and design systems of your own.
- More advanced concepts such as reusing centralised services and constrained data sharing, as well as ongoing open issues will be touched upon.
- Tips, tricks and rules of thumb when working with concurrent systems.
Also, if Kamaelia isn't interesting to you (sob :), Ali Afshar who hangs out on Kamaelia's IRC channel is also giving a tutorial there on PyGTK, along with lots of other people giving interesting tutorials and talks :-)
New Tool for helping debugging Kamaelia Systems
June 08, 2009 at 06:17 PM | categories: python, oldblog | View Comments

Use in a system

This allows a variety of things, from a basic command line console:

StandaloneInterpreter().run()

Which you can run in the background on any console. For example you could do this:

ServerCore(protocol=myprotocol, port=1234).activate()
StandaloneInterpreter().run()

Alternatively, you can use an embeddable component that speaks to inbox/outbox rather than stdin/stdout. Crudely you can do something like this:

Pipeline(
    ConsoleReader(),
    InterpreterTransformer(),
    ConsoleEchoer(),
).run()

But you can also put it inside a pygame application, reading & writing from a Textbox/TextDisplayer:

Pipeline(
    Textbox(size = (800, 300), position = (100,380)),
    InterpreterTransformer(),
    TextDisplayer(size = (800, 300), position = (100,40)),
).run()
The interesting option this opens up of course is the fact that you can add a networked interpreter (note: this is hideously insecure incidentally - so only safe on a network you completely control!) into any application. In one console:

# ./ServerBasedPythonInterpreter.py
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Chassis.ConnectedServer import ServerCore
from Kamaelia.Util.PureTransformer import PureTransformer
from Kamaelia.Experimental.PythonInterpreter import InterpreterTransformer

def NetInterpreter(*args, **argv):
    return Pipeline(
        PureTransformer(lambda x: str(x).rstrip()),
        InterpreterTransformer(),
        PureTransformer(lambda x: str(x)+"\r\n>>> "),
    )

ServerCore(protocol=NetInterpreter, port=1236).run()

In another:
~> telnet 127.0.0.1 1236
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
>>> self
Component
Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer_21 [ inboxes :
{'control': [], 'inbox': []} outboxes : {'outbox': [], 'signal': []}
>>> "hello world"
hello world
>>> Axon
<module 'Axon'
from '/usr/local/lib/python2.5/site-packages/Axon/__init__.pyc'>
>>> dir(self)
['Inbox', 'Inboxes', 'Outboxes', 'Usescomponents', '_AxonObject_...
>>> self.scheduler
Axon.Scheduler.scheduler_1 :1 :1 :0 :
>>> self.scheduler.threads
{<Kamaelia.Internet.TCPServer.TCPServer object at 0xb7b972cc>: <object object
at 0xb7c92468>,
<Kamaelia.Internet.ConnectedSocketAdapter.ConnectedSocketAdapter object at
0xb7ba4acc>: <object object at 0xb7c92468>,
<Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer object at
0xb7ba6bec>: <object object at 0xb7c92468>,
<Kamaelia.Chassis.Pipeline.Pipeline object at 0xb7bafc2c>: <object object at
0xb7c92468>, <Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer
object at 0xb7b9dc4c>: <object object at 0xb7c92468>,
<Kamaelia.Util.PureTransformer.PureTransformer object at 0xb7baf86c>: <object
object at 0xb7c92460>, <Kamaelia.Chassis.ConnectedServer.ServerCore object at
0xb7b8acec>: <object object at 0xb7c92468>,
<Kamaelia.Chassis.Pipeline.Pipeline object at 0xb7ba1d0c>: <object object at
0xb7c92468>, <Kamaelia.Util.PureTransformer.PureTransformer object at
0xb7ba192c>: <object object at 0xb7c92468>,
<Kamaelia.Util.PureTransformer.PureTransformer object at 0xb7b9d96c>: <object
object at 0xb7c92468>, <Kamaelia.Internet.Selector.Selector object at
0xb7b979cc>: <object object at 0xb7c92468>}
etc.
Rummaging Around inside Running Systems
The idea behind this component really is to assist in debugging live running systems by directly rummaging around inside them, which is why this form is probably most useful:

if interactive_debug:
    StandaloneInterpreter().activate()

MyMainApplication().run()

And whilst the other forms naturally drop out as useful, this form is probably the safest of the bunch!
This form is also slightly more flexible, in that it allows this sort of thing:
~/> ./BasicPythonInterpreter.py
> from Kamaelia.UI.Pygame.Ticker import Ticker
ok
> X=Ticker()
ok
Component Kamaelia.UI.Pygame.Ticker.Ticker_7 [ inboxes : {'control':
[], '_displaycontrol': [], 'unpausebox': [], 'pausebox': [], 'inbox':
[], 'alphacontrol': []} outboxes : {'outbox': <>, 'signal':
<>, '_displaysignal': <>}
> X.activate()
ok
Component Kamaelia.UI.Pygame.Ticker.Ticker_7 [ inboxes : {'control':
[], '_displaycontrol': [], 'unpausebox': [], 'pausebox': [], 'inbox':
[''], 'alphacontrol': []} outboxes : {'outbox': <>, 'signal':
<>, '_displaysignal': <>}
> self.link((self, "outbox"), (X,"inbox"))
ok
Link( source:
[Kamaelia.Experimental.PythonInterpreter.StandaloneInterpreter_5,outbox],
sink:[Kamaelia.UI.Pygame.Ticker.Ticker_7,inbox] )
> self.send("Hello", "outbox")
ok
As you can guess this then results the text displayer outputting to the display.
As another example, if you're rummaging inside a system, you could start up an introspector in another console:

./AxonVisualiser.py --port 1500

And then in the embedded interpreter do this:

> from Kamaelia.Util.Introspector import Introspector
ok
> from Kamaelia.Chassis.Pipeline import Pipeline
ok
> from Kamaelia.Internet.TCPClient import TCPClient
ok
> Pipeline(
    Introspector(),
    TCPClient("127.0.0.1", 1500),
  ).activate()
> > >
ok

And that then adds an Axon visualisation/introspector to a running system. Clearly this can aid in identifying when something has gone wrong.
Hope others find it useful too :-)
Going from generator coroutines to Kamaelia Components
May 17, 2009 at 02:09 AM | categories: python, oldblog | View Comments

Pete then posted to the list a small collection of different ways of implementing...

#!/bin/sh
tail -f /var/log/system.log | grep pants

... in python.

This struck me as a nice example worth comparing to Kamaelia - specifically to compare the generator version. Indeed the Kamaelia version is fairly directly derived from the generator version, which is why I'm picking on it. The full generator version looks like this:
import time
import re

def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

def printer(lines):
    for l in lines:
        print l.strip()

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

Now, this is nice, but the core logic:

tail -f /var/log/system.log | grep pants

Has somehow become obfuscated at the end - which is a shame, because it doesn't need to be. Taking the Kamaelia version step by step, let's take that end part:

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

... and turn it into something which looks like what you'd do in Kamaelia:
from Kamaelia.Chassis.Pipeline import Pipeline

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()

(incidentally, to actually use separate processes, a la shell, you'd use ProcessPipeline, not Pipeline)
So in order for this to be valid, we now need to adapt follow into Follow, grep into Grep, and printer into Printer.
Let's start by adapting follow:
def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

First of all, we note that this follow source blocks - specifically calling time.sleep. Now there are other ways of doing this, but the simplest way of keeping this code structure is to just create a threaded component. We also need to capture the argument (fname), which isn't optional and has no logical default, so we need to have an __init__ method. So we grab that, store it somewhere handy, and call the super class initialiser. Pretty standard stuff.

import Axon

class Follow(Axon.ThreadedComponent.threadedcomponent):
    def __init__(self, fname, **argv):
        self.fname = fname
        super(Follow, self).__init__(**argv)
    def main(self):
        f = file(self.fname)
        f.seek(0,2) # go to the end
        while not self.dataReady("control"):
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                self.send(l, "outbox")
        f.close()
        self.send(self.recv("control"), "signal")

The main() method of this component now follows the same core logic as the follow() generator. The logic added is simply control logic, which is fairly commonly used, and looks like this:
while not self.dataReady("control"):
    <Body of generator>
self.send(self.recv("control"), "signal")

This allows someone to shut down our component cleanly. Other than that, the other major difference is that this:

yield l

Has been replaced with this:

self.send(l, "outbox")

ie rather than expecting to be called in a "call me & handle my values" loop, it just passes its results out of an outbox called "outbox" (much like tail -f sends its results to stdout).
The next part to convert is grep(). Recall that it looks like this:

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

With the Kamaelia equivalent looking like this:

class Grep(Axon.Component.component):
    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."
    def main(self):
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")
Once again the overall logic is unchanged, with the shutdown logic added around it. However, we don't need to explicitly pass in a location for lines to enter this component. Instead we just expect to be passed lines from our standard inbox "inbox", in the same way normal grep expects data on stdin.
Also, because we have a logical default for the pattern (match everything using "."), we don't need to have an __init__ method. The reason for this is that the baseclass for all components does this in its __init__ method:

def __init__(self, *args, **argd):
    ...
    self.__dict__.update(argd)
So calling Grep(pattern=".*pants.*") will specialise the component's pattern value. (The reason for doing this is that it actively enables monkey-patching as reconfiguration, as you can see in this greylisting mail server.)
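As a cut-down illustration of that trick (this is just a sketch of the idiom, not the real Axon baseclass):

```python
class component(object):
    """Toy sketch of Axon's keyword-args-to-attributes idiom."""
    pattern = "."  # class attribute acts as the logical default
    def __init__(self, *args, **argd):
        # keyword arguments become instance attributes, shadowing the defaults
        self.__dict__.update(argd)

g = component(pattern=".*pants.*")
print(g.pattern)            # the override: .*pants.*
print(component().pattern)  # the class default: .
```

So no subclass needs an __init__ purely to accept configuration; any keyword becomes an attribute automatically.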
Again, Grep is actually a special case of a filter, and as a result really just wants to take data from its std inbox and pass matching stuff to its std outbox. Which, if you look at the core logic, is precisely what it does:

for l in self.Inbox("inbox"):
    if regex.match(l):
        self.send(l, "outbox")
After clearing its inbox, it pauses, and releases control to the scheduler (Pausing is a component's way of letting the scheduler know "don't call me unless there's some data for me to work on").
Finally, we need to implement the Printer() component. Again we can adapt the generator based version:

def printer(lines):
    for l in lines:
        print l.strip()

Which transforms into this:

class Printer(Axon.Component.component):
    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                print l.strip()
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

Again the core logic is the same, with control logic added around it. And again, rather than needing the explicit "lines" iterator, we have a standard place for data to come into the component - the inbox "inbox" - leaving the rest of the logic essentially unchanged.
Finally, in the same way the generator version puts the whole thing together:

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

We can put the Kamaelia components together:

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()

This is very different. The generator version creates a chain of iterators, passing each in as the first parameter to the next one in the chain, with the last item in the chain (the for loop) being the one that pulls things along. The Kamaelia version instantiates a Pipeline component which is very clearly piping the outputs from one component into the inputs of the following one.
In fact the Pipeline component creates linkages between the components in the pipeline. These are conceptually similar to Stackless's channels (though inspired by Unix pipes and occam). Implementation wise, a linkage actually collapses an inbox into an outbox. As a result, when Follow sends a message to its outbox "outbox", that message is instantly delivered into the inbox "inbox" of Grep. Inboxes for generator based components are just plain old lists, and for threaded components they're threadsafe queues.
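As a toy illustration of that collapsing idea (these classes are invented for the sketch and are nothing like Kamaelia's real implementation):

```python
class ToyComponent(object):
    """Toy component with one inbox and one outbox, each a plain list."""
    def __init__(self):
        self.boxes = {"inbox": [], "outbox": []}
    def send(self, msg, boxname="outbox"):
        self.boxes[boxname].append(msg)
    def recv(self, boxname="inbox"):
        return self.boxes[boxname].pop(0)

def link(source, sink):
    # "Collapse" the source's outbox into the sink's inbox: they become
    # the same list, so a send is an append and delivery costs nothing.
    source.boxes["outbox"] = sink.boxes["inbox"]

follower, grepper = ToyComponent(), ToyComponent()
link(follower, grepper)
follower.send("a log line")
print(grepper.recv())  # a log line - delivered with no copying step
```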
The nice thing about this is that if you wanted to use this in a network server, you could do this:

from Kamaelia.Chassis.ConnectedServer import ServerCore

def FollowPantsInMyLogs(*args):
    return Pipeline(
        Follow('/var/log/system.log'),
        Grep(".*pants.*"),
    )

ServerCore(protocol=FollowPantsInMyLogs, port=8000).run()

Though that opens a Follow & Grep component for every client. To use just one Follow component, you could follow and publish the data once, for all clients:
from Kamaelia.Util.Backplane import *

Backplane("PANTSINMYLOGS").activate()

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    PublishTo("PANTSINMYLOGS"),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()
You could also monitor this on the server console by adding in a local printer:

Backplane("PANTSINMYLOGS").activate()

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    PublishTo("PANTSINMYLOGS"),
).activate()

Pipeline(
    SubscribeTo("PANTSINMYLOGS"),
    Printer(),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()

... which all fairly naturally describes the higher level co-ordination going on. Now you can do all this from the ground up using plain old generators, but personally I find this approach easier to follow. Some people find other approaches simpler :)
Anyway, for the curious reader, I hope this is a useful/interesting comparison of a non-Kamaelia based approach with a Kamaelia based approach, with a visible demonstration at the end of why I prefer the latter :) ... and all this ignores Graphlines too :)
Bar Camp Leeds UK
May 05, 2009 at 09:56 PM | categories: python, oldblog | View Comments

- May 30th, May 31st
- Old Broadcasting House, Leeds, UK
- Website: http://barcampleeds.com/
Schedulers matter
March 04, 2009 at 10:26 AM | categories: python, oldblog | View Comments

scheduler.install(self.messageLoop())
# self.messageLoop is a regular python generator
...
yield self.incrementCounter()
yield kickTo.messageQueue.push(self)

If you delve inside the fibra scheduler (which doesn't appear to be here, unexpectedly) you see the following core:
def tick(self):
    """Iterates the scheduler, running all tasks and calling all
    handlers.
    """
    active = False
    for handler in self.handlers:
        active = handler.is_waiting() or active
    active = (len(self.tasks) > 0) or active
    tasks = []
    while True:
        try:
            task, send_value = self.tasks.pop(0)
        except IndexError, e:
            break
        try:
            if isinstance(send_value, Exception):
                r = task.throw(send_value)
            else:
                r = task.send(send_value)
        except StopIteration, e:
            r = e
        if r is None:
            tasks.append((task, None))
        else:
            try:
                handler = self.type_handlers[type(r)]
            except KeyError:
                raise RuntimeError("Don't know what to do with yielded type: %s" % type(r))
            handler.handle(r, task)
    self.tasks[:] = tasks
    return active

The core of this appears to be "when I'm done, run this one next". If you think that's familiar, it should be - it's very similar (at least conceptually) to what twisted does with deferreds. It's not identical, but similar. So what does this mean for the hacksack demo? Well, if we look at self.tasks after each run, by changing:

def run(self):
    while self.tick():
        pass

to:

def run(self):
    while self.tick():
        print "TASKS", [x[0] for x in self.tasks]

It becomes apparent what's happening (though it's fairly obvious from above):
TASKS [<generator object at 0xb7b04fcc>]
TASKS []
TASKS [<generator object at 0xb780f94c>]
TASKS []
TASKS [<generator object at 0xb7a1b04c>]
TASKS []
TASKS [<generator object at 0xb776fcac>]
TASKS []
TASKS [<generator object at 0xb79327ac>]
TASKS []
TASKS [<generator object at 0xb7b0ff2c>]
TASKS []

Fundamentally, fibra is quicker for a few reasons:
- It always knows which generator is ready to run next.
- It also defaults to pausing the generator, unless it specifically asks to be run. ie the tasks default to being passive.
- The sender knows who the receiver is. This allows the sender to schedule the receiver explicitly.
Kamaelia's scheduler, by contrast:

- Is effectively a round robin scheduler - essentially for simplicity. The key question we wanted to ask was "does a component model based around inboxes/outboxes make it easier to write/reuse concurrent software", rather than "how can we build a faster, more responsive scheduler". As a result the round robin scheduler was always a compromise. There's a little bit of intelligence in there, but not much.
- Kamaelia's components default to active, rather than passive. That is they're expected to run continuously unless explicitly paused. This design decision impacts the scheduler as a whole.
- The sender does not know who the receiver is. This requires something different to happen in the scheduler. On the upside, it means that code can be tested easier in isolation, or reused in situations it wasn't originally expected to be used within.
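For contrast, the round robin idea can be sketched in a few lines of plain Python (purely illustrative - this is not Kamaelia's actual scheduler, and `task`/`round_robin` are names invented for the sketch):

```python
from collections import deque

def task(name, n):
    # A trivial generator "component" that does n steps of work.
    for i in range(n):
        yield "%s:%d" % (name, i)

def round_robin(tasks):
    """Minimal round robin scheduler: cycle through generators until all finish."""
    ready = deque(tasks)
    results = []
    while ready:
        t = ready.popleft()
        try:
            results.append(next(t))
            ready.append(t)   # still alive: back of the queue
        except StopIteration:
            pass              # finished: drop it
    return results

print(round_robin([task("a", 2), task("b", 2)]))
# ['a:0', 'b:0', 'a:1', 'b:1']
```

Every live task gets a turn on every pass, whether or not it has anything to do - which is exactly the behaviour the bullets above describe, and exactly where a reactive scheduler like fibra's can win.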
Beyond all that though, fibra looks neat :)
The 5 minute (version), not the full half hour...
February 26, 2009 at 11:15 PM | categories: python, oldblog | View Comments

As you may know, Kamaelia is a project I'm rather familiar with (ok, understatement), but one of the key ideas behind it is about how to make concurrency usable for everyday problems (from greylisting and database modelling assistance, through to learning to read & write). No one concurrency method suits everyone - as Jesse's recent series of posts shows (if you've not read them they're well worth it :), but Kamaelia's approach fits my head, and hopefully yours too. There may be a few areas where nicer syntactic sugar would be appropriate and so on, and feedback is always welcome - preferably to the project's google group/mailing list.
OK, the point of this post is that recently at O'Reilly Ignite UK North I gave a 5 minute talk which was essentially "what we've learnt about practical concurrency", and by necessity of time it's a whistle stop tour. The slides are on slideshare, and the video is on blip.tv - along with the rest of the talks - and by the magic of the internet they're below as well.
Embracing Concurrency (slides)
Embracing Concurrency (video)
Many thanks to Imran Ali of Carbon Imagineering and Craig Smith of O'Reilly GMT for organising a great evening of interesting talks!