"""Small STOMP client built on Twisted and the ``stomper`` library.

Handlers are registered per destination with the ``route`` decorator (or
``Stomping.add_route``) and are invoked by ``Stomping.consume`` for every
received frame whose ``destination`` header matches.
"""
import functools

import stomper
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from twisted.internet import reactor


class StompProtocol(Protocol, stomper.Engine):
    """Twisted protocol that drives a ``stomper.Engine`` over a transport.

    Received data is accumulated in a ``StompBuffer``; each complete frame
    is passed to the engine via ``react`` and then to ``client.consume``.
    """

    def __init__(self, client, username='', password=''):
        """Store the owning client and the credentials used on connect.

        :param client: object exposing ``connected()`` and
            ``consume(message)``; in this file that is a ``Stomping``.
        :param username: login passed to ``stomper.connect``.
        :param password: passcode passed to ``stomper.connect``.
        """
        stomper.Engine.__init__(self)
        self.buffer = stomper.stompbuffer.StompBuffer()
        self.client = client
        self.username = username
        self.password = password

    def ack(self, message):
        """Return ``stomper.NO_REPONSE_NEEDED`` instead of building a reply.

        NOTE(review): the constant name is kept exactly as the original
        spelled it; confirm against the installed stomper version before
        "correcting" the spelling.
        """
        return stomper.NO_REPONSE_NEEDED

    def connected(self, message):
        """Run the engine's ``connected`` handling, then notify the client."""
        stomper.Engine.connected(self, message)
        self.client.connected()

    def connectionMade(self):
        """Write the frame built by ``stomper.connect`` to the transport."""
        self.transport.write(
            stomper.connect(self.username, self.password))

    def dataReceived(self, data):
        """Buffer ``data`` and process every complete message available.

        For each message the engine's ``react`` result is written back to
        the transport when it is truthy, and the message is then handed to
        ``client.consume``.
        """
        self.buffer.appendData(data)
        while True:
            message = self.buffer.getOneMessage()
            if message is None:
                # No further complete message in the buffer yet.
                break
            response = self.react(message)
            if response:
                self.transport.write(response)
            self.client.consume(message)

    def subscribe(self, destination, **headers):
        """Write a subscribe frame for ``destination`` to the transport.

        :param destination: destination name given to ``stomper.subscribe``.
        :param headers: extra headers merged into the frame's headers
            before it is packed.
        """
        frame = stomper.Frame()
        frame.unpack(stomper.subscribe(destination))
        frame.headers.update(headers)
        self.transport.write(frame.pack())


class Stomping(ReconnectingClientFactory):
    """Client factory that connects on construction and dispatches frames.

    Routes are stored on the class itself (``Stomping._destinations``), so
    they are shared by every instance.
    """

    username = None
    password = None
    proto = None
    frames = None
    # Mapping of destination -> list of handler callables; stays ``None``
    # until the first ``add_route`` call.
    _destinations = None

    def __init__(self, host='127.0.0.1', port=61613,
                 username='guest', password='guest'):
        """Remember the credentials and schedule the TCP connection.

        :param host: host passed to ``reactor.connectTCP``.
        :param port: port; converted with ``int`` before connecting.
        :param username: login handed to each ``StompProtocol``.
        :param password: passcode handed to each ``StompProtocol``.
        """
        self.username = username
        self.password = password
        # Frames passed to ``send`` before a protocol exists are queued here.
        self._frames = []
        reactor.connectTCP(host, int(port), self)

    @staticmethod
    def add_route(destination, function):
        """Register ``function`` as a handler for ``destination``.

        Registering the same function object twice for one destination is
        a no-op.
        """
        if Stomping._destinations is None:
            Stomping._destinations = {}
        handlers = Stomping._destinations.setdefault(destination, [])
        if function not in handlers:
            handlers.append(function)

    def buildProtocol(self, host):
        """Create, remember and return a new ``StompProtocol``."""
        self.protocol = StompProtocol(self, self.username, self.password)
        return self.protocol

    def clientConnectionFailed(self, connector, reason):
        """Delegate to ``ReconnectingClientFactory.clientConnectionFailed``."""
        ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason)

    def clientConnectionLost(self, connector, reason):
        """Do nothing when an established connection is lost.

        NOTE(review): the base-class handler is intentionally not called
        here in the original; confirm that skipping it is the intended
        behavior for dropped connections.
        """
        pass

    def connected(self):
        """Subscribe to every routed destination and flush queued frames.

        Called by ``StompProtocol.connected``. Finishes by calling the
        ``init`` hook.
        """
        # ``_destinations`` is None when no route was ever registered;
        # treat that as "nothing to subscribe to" rather than crashing.
        for destination in (self._destinations or ()):
            self.subscribe(destination)
        # Detach the queue before writing so that a later ``connected``
        # call does not send the same frames a second time.
        pending, self._frames = self._frames, []
        for frame in pending:
            self.protocol.transport.write(frame.pack())
        self.init()

    def consume(self, message):
        """Call every handler registered for the message's destination.

        :param message: mapping with a ``'headers'`` mapping; messages
            without a ``destination`` header, or with one that has no
            registered route, are ignored.
        """
        destination = message['headers'].get('destination')
        if not destination:
            return
        # Guard against ``_destinations`` still being None (no routes).
        routes = self._destinations or {}
        for function in routes.get(destination, ()):
            function(self, message)

    def init(self):
        """Hook invoked at the end of ``connected``; no-op by default."""
        pass

    def run(self):
        """Start the Twisted reactor."""
        reactor.run()

    def send(self, destination, message):
        """Send ``message`` to ``destination``, queueing it if needed.

        While no protocol has been built yet the frame is appended to the
        internal queue that ``connected`` flushes; otherwise it is written
        to the protocol's transport immediately.
        """
        frame = stomper.Frame()
        frame.unpack(stomper.send(destination, message))
        if not self.protocol:
            self._frames.append(frame)
        else:
            self.protocol.transport.write(frame.pack())

    def subscribe(self, destination):
        """Subscribe to ``destination`` if a protocol has been built."""
        if self.protocol:
            self.protocol.subscribe(destination)


def route(destination):
    """Return a decorator registering a handler for ``destination``.

    The decorated function is wrapped, the wrapper is registered through
    ``Stomping.add_route`` and returned in place of the original. Handlers
    are called as ``handler(client, message)``.
    """
    def wrap(function):
        # ``wraps`` keeps the handler's name and docstring on the wrapper.
        @functools.wraps(function)
        def wrap_function(self, message):
            # Propagate the handler's result instead of discarding it.
            return function(self, message)
        Stomping.add_route(destination, wrap_function)
        return wrap_function
    return wrap