Source code for spotware_connect.client
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import clientFromString
from twisted.application.internet import ClientService
from twisted.internet import reactor
from .protocol import Protocol
from .protobuf import Protobuf
[docs]class Client(ClientService):
PROXY_DEMO = "demo.ctraderapi.com:5035"
PROXY_LIVE = "live.ctraderapi.com:5035"
EVENT_CONNECT_NAME = "connect"
EVENT_DISCONNECT_NAME = "disconnect"
EVENT_MESSAGE_NAME = "message"
[docs] class Protocol(Protocol):
client = None
[docs] def connectionMade(self):
super().connectionMade()
self.client.connect()
[docs] def connectionLost(self, reason):
super().connectionLost(reason)
self.client.disconnect()
[docs] def receive(self, message):
self.client.receive(message)
[docs] class Factory(Factory):
client = None
def __init__(self, *args, **kwargs):
super().__init__()
self.client = kwargs['client']
[docs] def buildProtocol(self, addr):
p = super().buildProtocol(addr)
p.client = self.client
return p
def __init__(self, live=False, retryPolicy=None,
clock=None, prepareConnection=None):
host = "ssl:" + (self.PROXY_LIVE if live else self.PROXY_DEMO)
endpoint = clientFromString(reactor, host)
factory = Client.Factory.forProtocol(Client.Protocol, client=self)
super().__init__(endpoint, factory, retryPolicy=retryPolicy,
clock=clock, prepareConnection=prepareConnection)
[docs] def start(self, timeout=None):
self.startService()
if timeout:
reactor.callLater(timeout, self.stop)
reactor.run()
[docs] def stop(self):
self.stopService()
if reactor.running:
reactor.stop()
[docs] def connect(self):
self.exec_events(self.EVENT_CONNECT_NAME)
[docs] def disconnect(self):
self.exec_events(self.EVENT_DISCONNECT_NAME)
[docs] def receive(self, message):
payload = Protobuf.extract(message)
kargs = dict(msg=message, msgid=message.clientMsgId,
msgtype=message.payloadType,
payload=payload,
**{fv[0].name: fv[1] for fv in payload.ListFields()})
if "ctidTraderAccountId" in kargs:
kargs["ctid"] = payload.ctidTraderAccountId
self.exec_events(self.EVENT_MESSAGE_NAME, **kargs)
[docs] def emit(self, message, msgid=None, **params):
if type(message) in [str, int]:
message = Protobuf.get(message, **params)
def protocol_send(protocol):
protocol.send(message, msgid=msgid)
con = self.whenConnected()
con.addCallback(protocol_send)
return con
_events = dict()
[docs] def event(self, name_or_func=None, **filters):
if not self._events: # lazy create
for e in [self.EVENT_CONNECT_NAME,
self.EVENT_DISCONNECT_NAME, self.EVENT_MESSAGE_NAME]:
self._events[e] = []
if callable(name_or_func): # callable append
name = name_or_func.__name__
self._events[name].append(name_or_func)
return name_or_func
def decorate(func): # decorate with args
evname = name_or_func
from functools import wraps
@wraps(func)
def func_wrap(*args, **kwargs):
for k, v in filters.items():
if k not in kwargs or kwargs[k] != v:
return
func(*args, **kwargs)
self._events[evname].append(func_wrap)
return func
return decorate
[docs] def message(self, **filters):
if 'msgtype' in filters and type(filters['msgtype']) in [str, int]:
filters['msgtype'] = Protobuf.get_type(filters['msgtype'])
return self.event(self.EVENT_MESSAGE_NAME, **filters)
[docs] def exec_events(self, name, *args, **kwargs):
if name not in self._events:
return
for f in self._events[name]:
f(*args, **kwargs)