Walkthrough of how to build a simple protocol using Kamaelia
October 07, 2008 at 02:23 AM | categories: python, oldblog
A question was raised on the Kamaelia list asking for a demo of how to nest protocols using Kamaelia. To keep things clear, and to avoid a protocol mismatch, I decided to build a protocol that is a composite of the following three things:
- Something clumping data into messages
- JSON data being forwarded in serialised form as those messages
- The serialised messages encrypted (naively) using DES encryption.
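The three layers listed above can be tried out without Kamaelia at all, as plain functions applied in order on the way out and in reverse on the way in. What follows is only a rough sketch under some loud assumptions: it is written for Python 3 with the standard library only, `xor_cipher` is a hypothetical stand-in for the DES step (it is not secure and is not DES), and the 4-byte length prefix used by `frame`/`deframe` is an illustrative choice that is not meant to reflect DataChunker's real wire format.

```python
import json
import struct

KEY = b"ABCD"  # hypothetical demo key, for illustration only


def xor_cipher(data, key=KEY):
    """Stand-in for the encryption layer: XOR with a repeating key.

    Applying it twice with the same key returns the original bytes.
    This is NOT DES and NOT secure; it only marks where encryption sits.
    """
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


def frame(payload):
    """Clump a payload into a message: 4-byte big-endian length, then data."""
    return struct.pack(">I", len(payload)) + payload


def deframe(stream):
    """Split a byte string of back-to-back frames into their payloads."""
    payloads = []
    offset = 0
    while offset < len(stream):
        (length,) = struct.unpack_from(">I", stream, offset)
        offset += 4
        payloads.append(stream[offset:offset + length])
        offset += length
    return payloads


def send_side(message):
    """Serialise to JSON, then encrypt, then frame."""
    serialised = json.dumps(message).encode("utf-8")
    return frame(xor_cipher(serialised))


def receive_side(stream):
    """Mirror image of send_side: deframe, then decrypt, then parse JSON."""
    return [json.loads(xor_cipher(p).decode("utf-8")) for p in deframe(stream)]


# Two messages sent back to back arrive as one byte stream.
wire = send_side({"hello": "world"}) + send_side({"world": [1, 2, 3]})
decoded = receive_side(wire)
```

The point of the sketch is the ordering: whatever is applied last on the sending side has to be undone first on the receiving side, which is why the framing layer sits outermost.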
If you were to use the final example in production, you'd probably want to take it a little further than I have, since (for example) DataChunker is designed to be used with larger and larger numbers of messages. It would be relatively simple to transform this into a pub/sub system or router for sending and receiving JavaScript fragments, but I wanted to keep the example relatively simple for now. I may come back to that as a pub/sub demo.
The final example looks like this:
#!/usr/bin/python

import cjson
import Axon

from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Chooser import Chooser
from Kamaelia.Util.Console import ConsoleEchoer
from Kamaelia.Internet.TCPClient import TCPClient
from Kamaelia.Chassis.ConnectedServer import ServerCore
from Kamaelia.Protocol.Framing import DataChunker, DataDeChunker
from Kamaelia.Apps.Grey.PeriodicWakeup import PeriodicWakeup

from Crypto.Cipher import DES

# Sample data for the server to send, one message per wakeup
messages = [ {"hello": "world" },
             {"hello": [1,2,3] },
             {"world": [1,2,3] },
             {"world": {"game":"over"} },
           ]*10

class MarshallJSON(Axon.Component.component):
    """Serialise each Python object arriving on "inbox" to a JSON string"""
    def main(self):
        while not self.dataReady("control"):
            for j in self.Inbox("inbox"):
                j_encoded = cjson.encode(j)
                self.send(j_encoded, "outbox")
            if not self.anyReady():
                self.pause()
            yield 1

class DeMarshallJSON(Axon.Component.component):
    """Parse each JSON string arriving on "inbox" back into a Python object"""
    def main(self):
        while not self.dataReady("control"):
            for j in self.Inbox("inbox"):
                j_decoded = cjson.decode(j)
                self.send(j_decoded, "outbox")
            if not self.anyReady():
                self.pause()
            yield 1

class Encoder(object):
    """Null encoder/base encoder - returns the same string
       for encode/decode"""
    def __init__(self, key, **argd):
        super(Encoder, self).__init__(**argd)
        self.__dict__.update(argd)
        self.key = key
    def encode(self, some_string):
        return some_string
    def decode(self, some_string):
        return some_string

class DES_CRYPT(Encoder):
    """Naive DES (ECB mode) encoder - for demonstration only"""
    def __init__(self, key, **argd):
        super(DES_CRYPT, self).__init__(key, **argd)
        # DES needs an 8 byte key: pad short keys, truncate long ones
        self.key = self.pad_eight(key)[:8]
        self.obj = DES.new(self.key, DES.MODE_ECB)
    def encode(self, some_string):
        padded = self.pad_eight(some_string)
        encrypted = self.obj.encrypt(padded)
        return encrypted
    def decode(self, some_string):
        padded = self.obj.decrypt(some_string)
        decoded = self.unpad(padded)
        return decoded
    def pad_eight(self, some_string):
        # Always add between 1 and 8 bytes, each holding the pad length,
        # so that unpad() can tell how many bytes to strip
        X = len(some_string)
        pad_needed = 8-(X % 8)
        PAD = pad_needed * chr(pad_needed)
        return some_string+PAD
    def unpad(self, some_string):
        x = ord(some_string[-1])
        return some_string[:-x]

class Encrypter(Axon.Component.component):
    key = "ABCD"
    def main(self):
        crypter = DES_CRYPT(self.key)
        while not self.dataReady("control"):
            for j in self.Inbox("inbox"):
                j_encoded = crypter.encode(j)
                self.send(j_encoded, "outbox")
            if not self.anyReady():
                self.pause()
            yield 1

class Decrypter(Axon.Component.component):
    key = "ABCD"
    def main(self):
        crypter = DES_CRYPT(self.key)
        while not self.dataReady("control"):
            for j in self.Inbox("inbox"):
                j_decoded = crypter.decode(j)
                self.send(j_decoded, "outbox")
            if not self.anyReady():
                self.pause()
            yield 1

# Server side: passed to ServerCore as the protocol handler factory
def protocol(*args,**argd):
    return Pipeline(
        PeriodicWakeup(message="NEXT", interval=1),
        Chooser(messages),
        MarshallJSON(),
        Encrypter(),      # Encrypt on the way out
        DataChunker(),
    )

# Client side: undo each layer in the reverse order
def json_client_prefab(ip, port):
    return Pipeline(
        TCPClient(ip, port=port),
        DataDeChunker(),
        Decrypter(),      # Decrypt on the way in
        DeMarshallJSON(),
        ConsoleEchoer(use_repr=True)
    )

ServerCore(protocol=protocol, port=2345).activate()
json_client_prefab("127.0.0.1", 2345).run()
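
The pad_eight and unpad methods in the listing above follow the usual PKCS#5/PKCS#7-style scheme for 8 byte blocks: always append between 1 and 8 bytes, each holding the pad length, so the receiver can strip exactly that many. As a way of checking the arithmetic in isolation, here is a small standalone sketch. It assumes Python 3 and works on bytes rather than the Python 2 strings used in the listing, and the validity check in `unpad` is an addition of mine that the original does not have.

```python
BLOCK = 8  # DES block size in bytes


def pad_eight(data):
    """Pad bytes up to a multiple of 8, always adding at least one byte."""
    pad_needed = BLOCK - (len(data) % BLOCK)
    return data + bytes([pad_needed]) * pad_needed


def unpad(data):
    """Strip the padding added by pad_eight, rejecting malformed input."""
    pad_len = data[-1]
    if not 1 <= pad_len <= BLOCK or data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("invalid padding")
    return data[:-pad_len]


short_key = pad_eight(b"ABCD")        # 4 bytes of input, 4 bytes of padding
full_block = pad_eight(b"12345678")   # already aligned, so a whole block is added
round_trip = unpad(pad_eight(b"hello"))
```

Adding a full block when the input is already aligned looks wasteful, but without it unpad could not tell a padded message from one whose last byte merely happens to look like a pad length.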