Integrating Exchanges

antalla tries to make it easy to integrate with new exchanges. Several already-implemented exchanges can be found in the antalla/exchange_listeners directory. We currently offer a WebsocketListener base class, which makes it easy to connect to exchanges that expose a websocket interface.

The constructor of a WebsocketListener subclass must have the following signature and call its parent constructor as shown.

def __init__(self,
             exchange,
             on_event,
             # all arguments below should have a default value
             markets=LIST_OF_MARKETS,
             ws_url=WEBSOCKET_URL,
             session=db.session,
             event_type="orders"):
    super().__init__(exchange, on_event, markets, ws_url, session=session, event_type=event_type)

The following two methods must be implemented to integrate with a new exchange.

async def _setup_connection(self, websocket)

def _parse_message(self, message)

_setup_connection should send every message the websocket server needs in order to subscribe to all the markets in the markets list passed to the constructor, for the events given in event_type (or for all events if none was given). It should also call self._log_event(market, "connect", event_type) once for each market it subscribes to.

_parse_message must take a raw JSON message and return a list of actions to perform. For example, when a new order is received, _parse_message will most likely return a list containing one InsertAction with a single Order. It is the job of the ExchangeListener to transform the orders it receives into Order models.
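The return contract of _parse_message can be sketched in isolation. The snippet below is only an illustration, not antalla's implementation: Order and InsertAction are hypothetical stand-ins for models.Order and actions.InsertAction, the field names (type, id, price, size) belong to an imaginary exchange, and the message is assumed to arrive already decoded into a dict.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Order:
    """Hypothetical stand-in for antalla's models.Order."""
    exchange_order_id: str
    price: float
    size: float


@dataclass
class InsertAction:
    """Hypothetical stand-in for antalla's actions.InsertAction."""
    items: List[Order]


def parse_message(message: dict) -> list:
    """Return the list of actions for one decoded message (assumed field names)."""
    if message.get("type") != "new_order":
        # Messages the listener does not care about yield no actions.
        return []
    order = Order(
        exchange_order_id=str(message["id"]),
        price=float(message["price"]),
        size=float(message["size"]),
    )
    # One new order becomes one insert action wrapping a single Order.
    return [InsertAction([order])]


new_order_actions = parse_message(
    {"type": "new_order", "id": 7, "price": "0.031", "size": "2"}
)
heartbeat_actions = parse_message({"type": "heartbeat"})
```

Returning an empty list for messages that are not of interest keeps the caller simple: it can always iterate over the result without checking for None.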

Here is a minimal example of a custom exchange listener.

import json

import websockets

from antalla import actions
from antalla import db
from antalla import models
from antalla.websocket_listener import WebsocketListener
from antalla.exchange_listener import ExchangeListener

WS_URL = "ws://example.com/ws"
MARKETS = ["ETHBTC"]

@ExchangeListener.register("dummy")
class DummyListener(WebsocketListener):
    def __init__(self,
                 exchange,
                 on_event,
                 markets=MARKETS,
                 ws_url=WS_URL,
                 session=db.session,
                 event_type=None):
        super().__init__(exchange, on_event, markets, ws_url, session=session, event_type=event_type)

    async def _setup_connection(self, websocket):
        subscription_data = {
            "action": "subscribe",
            "markets": self.markets,
            "event": self.event_type
        }
        await websocket.send(json.dumps(subscription_data))
        for market in self.markets:
            self._log_event(market, "connect", self.event_type)

    def _parse_order(self, payload):
        # TODO: transform the payload into an Order,
        # e.g. order = models.Order(...), and return it
        raise NotImplementedError("build a models.Order from the payload")

    def _parse_message(self, message):
        payload = json.loads(message["payload"])
        if payload["action"] == self.event_type:
            order = self._parse_order(payload)
            return [actions.InsertAction([order])]
        return []
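
The subscription logic of a listener like the one above can be exercised without a network connection. The following is a rough sketch under stated assumptions: FakeWebsocket and SketchListener are hypothetical helpers, not antalla classes, and SketchListener only mirrors the _setup_connection body of the example so that the snippet runs on its own.

```python
import asyncio
import json


class FakeWebsocket:
    """Hypothetical test double that records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    async def send(self, data):
        self.sent.append(data)


class SketchListener:
    """Stand-in mirroring the example's _setup_connection; not an antalla class."""

    def __init__(self, markets, event_type):
        self.markets = markets
        self.event_type = event_type
        self.logged = []

    def _log_event(self, market, event, event_type):
        # Collect the calls so they can be inspected afterwards.
        self.logged.append((market, event, event_type))

    async def _setup_connection(self, websocket):
        subscription_data = {
            "action": "subscribe",
            "markets": self.markets,
            "event": self.event_type,
        }
        await websocket.send(json.dumps(subscription_data))
        for market in self.markets:
            self._log_event(market, "connect", self.event_type)


fake_ws = FakeWebsocket()
listener = SketchListener(["ETHBTC", "ETHUSD"], "orders")
asyncio.run(listener._setup_connection(fake_ws))
sent_message = json.loads(fake_ws.sent[0])
```

After the run, fake_ws.sent holds the single subscription message and listener.logged holds one "connect" entry per market, which is the behaviour expected of _setup_connection.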