用 Python 和币安 API 构建数字货币交易机器人
Posted Python中文社区
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用 Python 和币安 API 构建数字货币交易机器人相关的知识,希望对你有一定的参考价值。
交易数字货币或任何其他资产的首要任务是要有目标和策略来实现。在这里,我们不是在谈论交易策略,而只是构建一个简单而功能强大的数字货币交易机器人来应用您的策略。本系列更像是一个数字货币自动交易机器人框架。
我们将使用python 3.9(3.9.2)首先创建项目文件结构。
/exchanges
/strategies
/models
在这里,“ exchanges”文件夹存储了Exchange API包装器,为您的策略制定策略并为我们将要使用的业务对象建模。
模型
我们将为此项目定义几个业务对象,例如价格,货币类型或订单。首先,让我们从价格开始,将其用于我们随后将要创建的策略。
在我们为业务对象编写通用抽象层之前:
./models/model.py
from datetime import datetime
class AbstractModel:
created: datetime
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
然后是我们的第一个Price类:
./models/price
from models.model import AbstractModel
class Price(AbstractModel):
pair: str = ''
exchange: str = ''
current: float = 0
lowest: float = 0
highest: float = 0
currency: str = ''
asset: str = ''
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.pair = self.get_pair()
def get_pair(self):
return self.currency + '_' + self.asset
策略部分
这个系统的最大部分——策略将负责运行您的交易逻辑。有两种方法,一种是经典的利用时间间隔然后运行对外部交易所API的调用,或者运行内部webhooks、路由,以及 使用WebSocket的实时事件。
为此,我们将首先创建一个抽象策略类,该策略类将进行扩展。
./strategies/strategy.py
import json
import threading
import time
from datetime import datetime
from decouple import config
from models.price import Price
class Strategy(object):
price: Price
def __init__(self, exchange, interval=60, *args, **kwargs):
self._timer = None
self.interval = interval
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.portfolio = {}
self.exchange = exchange
# Load account portfolio for pair at load
self.get_portfolio()
def _run(self):
self.is_running = False
self.start()
self.run(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
print(datetime.now())
if self._timer is None:
self.next_call = time.time()
else:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
def get_portfolio(self):
self.portfolio = {'currency': self.exchange.get_asset_balance(self.exchange.currency),
'asset': self.exchange.get_asset_balance(self.exchange.asset)}
def get_price(self):
try:
self.price = self.exchange.symbol_ticker()
except Exception as e:
pass
...
在这里,我们的策略抽象层构造函数签名需要一个Exchange实例,我们将在稍后使用Binance API编写第一个包装器来完成这一部分。
我们在没有任何额外库的情况下定义了一个简单但功能强大的无限间隔运行器,请注意,每次运行都会在另一个线程上启动下一个调用,但是实际上,您的策略将永远使用不超过两个线程,如果您使用一个主线程和当前运行策略需要大量的外部调用或繁重的计算。每个线程消耗0.3%的RAM和0或0.1 的CPU使用率,这还涉及以下策略:获取报价单和订单,然后将价格和订单相关的数据存储在另一个内部API中。
虽然间隔运行精度可以在微秒上漂移一点,但是在秒级别上将保持稳定。
这是该层的简单用法,它是一种基本打印您的兑换账户投资组合和兑换价格的策略。我们提供了一种方法,该方法可以从以后要连接的外部交易所中检索交易品种代码,还可以找到一种方法来检索您在已连接的交易所中可用的当前投资组合。
./strategies/watcher.py
from exchanges.exchange import Exchange
from strategies.strategy import Strategy
class Watcher(Strategy):
def __init__(self, exchange: Exchange, timeout=60, *args, **kwargs):
super().__init__(exchange, timeout, *args, **kwargs)
def run(self):
self.get_price()
print('*******************************')
print('Exchange: ', self.exchange.name)
print('Pair: ', self.exchange.get_symbol())
print('Available: ', self.portfolio['currency'] + ' ' + self.exchange.currency)
print('Available: ', self.portfolio['asset'] + ' ' + self.exchange.asset)
print('Price: ', self.price.current)
在接下来的部分中,我们将通过使用币安官方API:
https://github.com/binance/binance/binance-spot-api-docs
和Python request库:
https://pypi.org/project/requests/
对一个简单的策略进行编码,来连接我们的第一个Exchange Binance。然后,对业务对象订单和货币进行编码,以分别存储您发送给交易所的订单和我们将要操作的法定货币或数字货币,然后将所有这些不同的货币放在一起。
感谢您的阅读,敬请期待下一部分。
欢迎添加下方二维码进群交流
Python数字货币量化交易技术
以上是关于用 Python 和币安 API 构建数字货币交易机器人的主要内容,如果未能解决你的问题,请参考以下文章