Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .behaverc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[behave]
format=plain
paths=test/scenarios
3 changes: 3 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 120
exclude=test/*
18 changes: 12 additions & 6 deletions .github/workflows/pythonapp.yml
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Runner
name: AlgoRunner

on:
push:
branches: [ master ]
branches: [ master, develop ]
pull_request:
branches: [ master ]

jobs:
build:
ci:
strategy:
fail-fast: true
matrix:
python-version: [3.9] # Py3 only, and latest release UP.
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v1
- uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- uses: abatilo/actions-poetry@v2.0.0
with:
python-version: 3.8
poetry-version: 1.1.7
- name: Install dependencies
run: |
make deps
Expand Down
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM python:3.9-slim

RUN apt-get update && apt-get install build-essential -y

WORKDIR /algorunner
COPY poetry.lock pyproject.toml Makefile setup.sh /algorunner/

RUN make env-check

# @todo --no-dev --no-ansi
RUN poetry config virtualenvs.create false && make deps

COPY . /code
ENTRYPOINT [ "make" "run" ]
22 changes: 14 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
.PHONY: deps test
.PHONY: env-check build lint deps test run

env-check:
@sh setup.sh

build:
echo "build docker container"

lint:
flake8 ./lib --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 ./lib --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
poetry run flake8

deps:
pip install pandas
pip install python-binance
pip install flake8
poetry install --no-interaction

test:
python -m test.account
python -m test.runner
poetry run pytest
poetry run behave

run:
poetry run python run.py
52 changes: 15 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,51 +1,29 @@
# Runner ![Runner](https://github.com/FergusInLondon/Runner/workflows/Runner/badge.svg)
# ... @todo?

A massive WIP that may or may not be worth an actual README at this point in time. It has no tests, and the account functionality is still being baked in.
## Running

Currently it does invoke a strategy and provides it with real-time streamed data from Binance though.
@todo

### Note
This is *vaguely* related to my form of Enigma Catalyst, as (a) I really want to brush up on my Python, and (b) Binance seems like the best exchange to implement streaming trades on - so I'd like to get used to interacting with them.
### Configuration

## Example
@todo

Check `example.py` for a runnable version of this strategy:
## Development

```python
class ExampleStrategy(object):
"""
A simple example strategy that computes the average price change over
the previous 5 2000ms updates.
"""
### Docker
There's also a `Dockerfile` contained in this repository; this installs all the requirements to commence development.

def start(self, control):
self.series = pd.DataFrame()
self.control = control
### Finding Tasks

def process(self, kline):
self.series = self.series.append(kline)
The codebase is littered with `@todo` tags where low-hanging fruit is marked when discovered/encountered.

if self.series.shape[0] > 5:
print("Average price change over past 5 windows: ", pd.to_numeric(self.series[-5:]["PriceChange"]).mean())
```
➜ adapters git:(huge-refactor) ✗ grep -r '@todo' .
./binance/test_user_transformations.py: pass # @todo - transformation not implemented
➜ adapters git:(huge-refactor) ✗ ../..

When executed via the runner, this will calculate the average price change over the past 5 2000ms updates, and display it to the user.

```
python example.py
Average price change over past 5 windows: 26.694
Average price change over past 5 windows: 26.356
Average price change over past 5 windows: 26.444
Average price change over past 5 windows: 26.246000000000002
Average price change over past 5 windows: 26.272000000000002
Average price change over past 5 windows: 26.706
Average price change over past 5 windows: 27.142000000000003
Average price change over past 5 windows: 27.182
Average price change over past 5 windows: 27.562
Average price change over past 5 windows: 28.002
Average price change over past 5 windows: 28.246000000000002
Average price change over past 5 windows: 28.49
Average price change over past 5 windows: 28.754
➜ Runner git:(huge-refactor) ✗ grep -r '@todo' . | wc -l
17
```

---
Expand Down
Empty file added algorunner/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions algorunner/abstract/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from algorunner.abstract.strategy import Strategy # noqa: F401
from algorunner.abstract.calculator import Calculator # noqa: F401
11 changes: 11 additions & 0 deletions algorunner/abstract/calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from abc import ABC


class Calculator(ABC):
"""
A `Calculator` is responsible for determining whether a position should
be opened, and how those positions should be sized/placed.

@todo: interface TBD
"""
pass
24 changes: 24 additions & 0 deletions algorunner/abstract/strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from abc import ABC, abstractmethod

import pandas as pd


class Strategy(ABC):
"""
A `Strategy` is the container for an algorithm, it simply needs to respond
to incoming market payloads and be able to generate events for the `Account`
Actor.
"""
@abstractmethod
def process(self, tick: pd.DataFrame):
"""
@todo - accept Union[pd.DataFrame, RawMarketPayload]
where RawMarketPayload is a TypedDict w/ no pandas processing.
"""
pass

def dispatch(self):
"""
@todo - fire events to the Actor queue. Concrete implementation.
"""
pass
9 changes: 9 additions & 0 deletions algorunner/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from algorunner.adapters._binance import BinanceAdapter
from algorunner.adapters.base import ( # noqa: F401
Credentials,
InvalidPayloadRecieved
)

ADAPTERS = {
"binance": BinanceAdapter
}
169 changes: 169 additions & 0 deletions algorunner/adapters/_binance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from logging import getLogger
from typing import Tuple

from binance.client import Client
from binance import BinanceSocketManager
import pandas as pd


from algorunner.abstract.strategy import Strategy
from algorunner.adapters.base import (
Adapter, Credentials, InvalidPayloadRecieved
)
from algorunner.trader import Trader
from algorunner.events import (
AccountStatus,
BalanceUpdate,
Position,
PositionStatus,
UpdateEvent,
UpdateType
)


logger = getLogger()


class BinanceAdapter(Adapter):
""" """

class MarketStreamPandasTransformer:
def __call__(self, tick) -> pd.DataFrame:
"""Converts the inbound tick to something exchange-agnostic."""
df = pd.DataFrame([tick])
df.rename(columns=lambda col: {
'e': "24hrTicker",
'E': "EventTime",
's': "Symbol",
'p': "PriceChange",
'P': "PriceChangePercent",
'w': "WeightedAveragePrice",
'x': "FirstTradePrice",
'c': "LastPrice",
'Q': "LastQuantity",
'b': "BestBidPrice",
'B': "BestBidQuantity",
'a': "BestAskPrice",
'A': "BestAskQuantity",
'o': "OpenPrice",
'h': "HighPrice",
'l': "LowPrice",
'v': "TotalTradedBaseAssetVolume",
'q': "TotalTradedQuoteAssetVolume",
'O': "StatisticsOpenTime",
'C': "StatisticsCloseTime",
'F': "FirstTradeId",
'L': "LastTradeId",
'n': "TotalNumberOfTrades",
}[col],
inplace=True)
df.set_index('EventTime', inplace=True)
df.index = pd.to_datetime(df.index, unit='ms')
return df

class UserStreamEventTransformer:
""" """

def __call__(self, payload) -> Tuple[str, UpdateEvent]:
try:
message_map = {
"outboundAccountInfo": self.account_update,
"outboundAccountPosition": self.position_update,
"balanceUpdate": self.balance_update,
"executionReport": self.order_report
}

return message_map[payload["e"]](payload)
except KeyError:
msg = "unknown payload type {p}".format(p=payload.get("e"))
raise InvalidPayloadRecieved(msg)
except Exception as e:
raise Exception("unknown error occured in user stream", e)

def initial_rest_payload(self, payload) -> AccountStatus:
# @todo - there is so much data in this payload that we're missing
# out on, like commission rates etc.
return AccountStatus(
CanWithdraw=payload["canWithdraw"],
CanTrade=payload["canTrade"],
CanDeposit=payload["canDeposit"],
Positions={
balance["asset"]: Position(
Locked=float(balance["locked"]),
Free=float(balance["free"])
) for balance in payload["balances"]
}
)

def account_update(self, payload) -> Tuple[str, AccountStatus]:
return UpdateType.ACCOUNT, AccountStatus(
CanWithdraw=payload["W"],
CanTrade=payload["T"],
CanDeposit=payload["D"],
Positions={
balance["a"]: Position(
Locked=float(balance["l"]),
Free=float(balance["f"])
) for balance in payload["B"]
}
)

def balance_update(self, payload) -> Tuple[str, BalanceUpdate]:
return UpdateType.BALANCE, BalanceUpdate(
Asset=payload["a"], Update=float(payload["d"])
)

def position_update(self, payload) -> Tuple[str, PositionStatus]:
return UpdateType.POSITION, {
balance["a"]: Position(
Locked=float(balance["l"]),
Free=float(balance["f"])
) for balance in payload["B"]
}

def order_report(self, payload):
# @todo - never did work out how to handle these.
pass

def connect(self, creds: Credentials, trader: Trader):
self.trader = trader
self.client = Client(creds['key'], creds['secret'])
self.socket_manager = BinanceSocketManager(self.client)

self.user_transformer = self.UserStreamEventTransformer()
self.market_transformer = self.MarketStreamPandasTransformer()
update = self.transformer.initial_rest_payload(
self.client.get_account()
)
self.trader(UpdateType.ACCOUNT, update)

self.socket_manager.start_user_socket(self.handle_user_stream)

def run(self, strategy: Strategy, symbol: str):
self.strategy = strategy
self.socket_manager.start_symbol_ticker_socket(
symbol, self._handle_ticker
)

def _handle_ticker(self, tick):
"""Given an incoming payload from the market websocket stream,
prepare it for the `Strategy` and then execute the strategy
against it."""
try:
parsed_data = self.market_transformer(tick)
self.strategy.process(parsed_data)
except InvalidPayloadRecieved as e:
logger.warn(
"received exception when handling market stream. ignoring tick.",
e
)

def _handle_user_stream(self, payload):
try:
update_type, transformed = self.user_transformer(payload)
self.account(update_type, transformed)
except Exception as e:
logger.warn(
"recieved exception handling user stream payload. ignoring message.",
e
)
Loading