The first Truly Concurrent Kamaelia Component
November 25, 2007 at 10:20 PM | categories: python, oldblog
OK, this is using a massively simplified version of the primitives needed for concurrency in Kamaelia, but the following is the first component that will happily run completely in parallel with the rest of the system.
class FirstProcessBasedComponent(SimplestProcessComponent):
    def main(self):
        while 1:
            yield 1
            time.sleep(0.3)
            if self.dataReady():
                print time.time(),"main : RECEIVE FROM CHANNEL", self.recv()
            else:
                print time.time(),"main: CHANNEL NOT READY"
As you can see, this is pretty much identical to the traditional Kamaelia model. Indeed, change the base class and you get a single-threaded component, though you'd probably want to change the time.sleep behaviour.
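To make the "change the base class" point concrete, here is a rough sketch of what a single-threaded equivalent could look like. It is not Kamaelia's actual base class: SimplestGeneratorComponent, FirstGeneratorBasedComponent, deliver, log and run_round_robin are all made-up names for illustration, and it is written for Python 3 rather than the Python 2 used in this post. The time.sleep call is swapped for a "yield until the interval has passed" check, since sleeping would stall every other component sharing the thread.

```python
import time


class SimplestGeneratorComponent:
    """Hypothetical single-threaded base class exposing dataReady()/recv()."""

    def __init__(self):
        self.inbound = []

    def deliver(self, message):
        # Stand-in for a channel: the caller drops messages straight in.
        self.inbound.append(message)

    def dataReady(self):
        return bool(self.inbound)

    def recv(self):
        return self.inbound.pop(0)

    def main(self):
        yield 1


class FirstGeneratorBasedComponent(SimplestGeneratorComponent):
    def __init__(self, interval=0.3):
        super().__init__()
        self.interval = interval
        self.log = []  # records events instead of printing them

    def main(self):
        next_check = time.monotonic()
        while True:
            yield 1
            # No time.sleep() here: keep yielding until the interval is up,
            # so other components in the same thread still get a turn.
            if time.monotonic() < next_check:
                continue
            next_check = time.monotonic() + self.interval
            if self.dataReady():
                self.log.append(("RECEIVE", self.recv()))
            else:
                self.log.append(("NOT READY", None))


def run_round_robin(components, steps):
    """Step every component's generator in turn, `steps` times over."""
    generators = [component.main() for component in components]
    for _ in range(steps):
        for generator in generators:
            next(generator)
```

The body of main() keeps the same shape as the process-based version; only the base class and the waiting strategy differ.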
The advantage here, of course, is that, given a bit more work, we should be able to take the entirety of Kamaelia's component set and simply parallelise it where it makes sense. The most obvious way is as a specialised Chassis. (Other chassis are Pipeline, Graphline and Carousel, which is a bit brain numbing :) )
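As a very rough idea of what such a specialised chassis might do, the sketch below wraps an ordinary generator-style component, forks a child process, steps the component's main() there, and relays messages over a pipe. It is only an illustration under several assumptions: it uses the standard library (os.fork and multiprocessing.Pipe) rather than pprocess, it is Python 3 and POSIX only, and ForkChassis, Doubler, inbox and outbox are hypothetical names, not Kamaelia's API.

```python
import os
import time
from multiprocessing import Pipe


class Doubler:
    """Hypothetical generator-style component with plain-list mailboxes."""

    def __init__(self):
        self.inbox = []
        self.outbox = []

    def main(self):
        while True:
            while self.inbox:
                self.outbox.append(self.inbox.pop(0) * 2)
            yield 1


class ForkChassis:
    """Hypothetical chassis: runs one component's main() in a forked child."""

    def __init__(self, component):
        self.component = component
        self.pid = None
        self.conn = None

    def activate(self):
        parent_end, child_end = Pipe()  # duplex pair of connections
        pid = os.fork()                 # POSIX only
        if pid == 0:
            # Child process: serve the component, then exit without ever
            # returning into the parent's code path.
            parent_end.close()
            try:
                self._serve(child_end)
            finally:
                os._exit(0)
        child_end.close()
        self.pid, self.conn = pid, parent_end
        return self

    def _serve(self, conn):
        for _ in self.component.main():
            # Pull in whatever has arrived, without blocking.
            while conn.poll(0):
                try:
                    self.component.inbox.append(conn.recv())
                except EOFError:
                    return  # parent closed its end: shut down
            # Push anything the component produced back to the parent.
            while self.component.outbox:
                conn.send(self.component.outbox.pop(0))
            time.sleep(0.01)  # avoid spinning flat out between steps

    def send(self, message):
        self.conn.send(message)

    def recv(self, timeout=5.0):
        if not self.conn.poll(timeout):
            raise TimeoutError("no reply from child process")
        return self.conn.recv()

    def stop(self):
        self.conn.close()
        os.waitpid(self.pid, 0)
```

Usage would be along the lines of chassis = ForkChassis(Doubler()).activate(), then chassis.send(...) and chassis.recv(), with chassis.stop() to shut the child down. The wrapped component itself needs no changes, which is the appeal of doing this at the chassis level.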
Full code, which is currently in /Sketches, looks like this:
import pprocess
import time

class SimplestProcessComponent(object):
    def __init__(self):
        self.exchange = pprocess.Exchange()
        self.channel = None
        self.inbound = []

    def activate(self):
        channel = pprocess.start(self.run, None, None, named1=None, named2=None)
        exchange = pprocess.Exchange()
        exchange.add(channel)
        return exchange

    def run(self, channel, arg1, arg2, named1=None, named2=None):
        self.exchange.add(channel)
        self.channel = channel
        for i in self.main():
            pass

    def dataReady(self):
        return self.exchange.ready(timeout=0)

    def recv(self):
        if self.dataReady():
            for ch in self.exchange.ready(timeout=0):
                D = ch.receive()
                self.inbound.append(D)
        return self.inbound.pop(0)

    def main(self):
        yield 1

class FirstProcessBasedComponent(SimplestProcessComponent):
    def main(self):
        while 1:
            yield 1
            time.sleep(0.3)
            if self.dataReady():
                print time.time(),"main : RECEIVE FROM CHANNEL", self.recv()
            else:
                print time.time(),"main: CHANNEL NOT READY"

exchange = FirstProcessBasedComponent().activate()

while 1:
    time.sleep(0.7)
    print time.time(),"__main__ : SENDING TO CHANNEL"
    if exchange.ready(timeout=0):
        for ch in exchange.ready():
            ch.send({"hello":"X"})

Personally, I find this idea of true, but simple concurrency really quite a nice, fun idea :-)