Source code for spotware_connect.protocol

import sys
from collections import deque
from twisted.protocols.basic import Int32StringReceiver
from twisted.internet import task
from .messages.OpenApiCommonMessages_pb2 \
    import ProtoMessage, ProtoHeartbeatEvent


[docs]class Protocol(Int32StringReceiver): MAX_LENGTH = sys.maxsize // 2 _rps_limit = 5 _send_queue = deque([]) _send_task_interval = 1 _send_task = None
[docs] def connectionMade(self): super().connectionMade() if not self._send_task: self._send_task = task.LoopingCall(self._sendStrings) self._send_task.start(self._send_task_interval)
[docs] def connectionLost(self, reason): super().connectionLost(reason) if self._send_task.running: self._send_task.stop()
[docs] def heartbeat(self): self.send(ProtoHeartbeatEvent(), True)
[docs] def send(self, message, instant=False, msgid=None): data = b'' if isinstance(message, ProtoMessage): data = message.SerializeToString() if isinstance(message, bytes): data = message if isinstance(message, ProtoMessage.__base__): msg = ProtoMessage(payload=message.SerializeToString(), clientMsgId=msgid, payloadType=message.payloadType) data = msg.SerializeToString() if instant: self.sendString(data) else: self._send_queue.append(data)
def _sendStrings(self): size = len(self._send_queue) if not size: return # pragma: no cover for _ in range(min(size, self._rps_limit)): self.sendString(self._send_queue.popleft())
[docs] def stringReceived(self, data): msg = ProtoMessage() msg.ParseFromString(data) if msg.payloadType == ProtoHeartbeatEvent().payloadType: self.heartbeat() self.receive(msg) return data
[docs] def receive(self, message): pass # pragma: no cover