CCXT如何获取历史行情,为后面开发策略做准备?

Posted mossloo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CCXT如何获取历史行情,为后面开发策略做准备?相关的知识,希望对你有一定的参考价值。

文章目录


前言

工欲利其事,必先利其器。在量化交易中,数据质量的高低直接决定了策略可不可靠。本章主要是为后续开发量化策略打下基础,并且给出两个策略代码,一个代码可以获取比特币3年内行情数据,一个可以获取10年以上的数据。


一、CCXT

CCXT 库可用于世界各地的加密货币/山寨币交易所的连接和交易,以及转账支付处理服务。它提供了快速访问市场数据的途径,可用于存储数据,分析,可视化,指标开发,算法交易,策略回测,机器人程序,网上商店集成及其它相关的软件工程。

这个章节我们主要利用了ccxt中集成好的框架获取各个交易所的币种行情数据,保存到本地文件。

1.1 下载ccxt

pip install ccxt

1.2 获取行情数据

这块会有个bug,由于数字货币交易所都在国外,访问外网需要有一定条件(懂的都懂),需要在本地项目中设置代理。

# -*- coding: utf-8 -*-
# __file__name:binance-fetch-ohlcv-to-csv.py
import os
import time

import pandas as pd

os.environ["http_proxy"] = "http://127.0.0.1:1001"
os.environ["https_proxy"] = "http://127.0.0.1:1001"
# -----------------------------------------------------------------------------

root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import ccxt  # noqa: E402


# -----------------------------------------------------------------------------

def retry_fetch_ohlcv(exchange, max_retries, symbol, timeframe, since, limit):
    num_retries = 0
    try:
        num_retries += 1
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since, limit)
        # print('Fetched', len(ohlcv), symbol, 'candles from', exchange.iso8601 (ohlcv[0][0]), 'to', exchange.iso8601 (ohlcv[-1][0]))
        time.sleep(0.05)
        return ohlcv
    except Exception:
        if num_retries > max_retries:
            raise  # Exception('Failed to fetch', timeframe, symbol, 'OHLCV in', max_retries, 'attempts')


def scrape_ohlcv(exchange, max_retries, symbol, timeframe, since, limit):
    timeframe_duration_in_seconds = exchange.parse_timeframe(timeframe)
    timeframe_duration_in_ms = timeframe_duration_in_seconds * 1000
    timedelta = limit * timeframe_duration_in_ms
    now = exchange.milliseconds()
    all_ohlcv = []
    fetch_since = since
    while fetch_since < now:
        try:
            ohlcv = retry_fetch_ohlcv(exchange, max_retries, symbol, timeframe, fetch_since, limit)
            fetch_since = (ohlcv[-1][0] + 1) if len(ohlcv) else (fetch_since + timedelta)
            all_ohlcv = all_ohlcv + ohlcv
            if len(all_ohlcv):
                print(len(all_ohlcv), 'candles in total from', exchange.iso8601(all_ohlcv[0][0]), 'to',
                      exchange.iso8601(all_ohlcv[-1][0]))
            else:
                print(len(all_ohlcv), 'candles in total from', exchange.iso8601(fetch_since))
        except Exception as e:
            print(e)
    return exchange.filter_by_since_limit(all_ohlcv, since, None, key=0)


def write_to_csv(filename, data):
    df = pd.DataFrame(data, columns=["时间戳", "开盘价", "最高价", "最低价", "收盘价", "成交量"])
    df.to_csv(filename, index=False)


def scrape_candles_to_csv(filename, exchange_id, max_retries, symbol, timeframe, since, limit):
    # instantiate the exchange by id
    exchange = getattr(ccxt, exchange_id)()
    # convert since from string to milliseconds integer if needed
    if isinstance(since, str):
        since = exchange.parse8601(since)
    # preload all markets from the exchange
    exchange.load_markets()
    # fetch all candles
    ohlcv = scrape_ohlcv(exchange, max_retries, symbol, timeframe, since, limit)
    # save them to csv file
    write_to_csv(filename, ohlcv)
    print('Saved', len(ohlcv), 'candles from', exchange.iso8601(ohlcv[0][0]), 'to', exchange.iso8601(ohlcv[-1][0]),
          'to', filename)


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Binance's BTC/USDT candles start on 2017-08-17
    path = r'datas\\binance.csv'
    scrape_candles_to_csv(os.path.join(root, path), 'binance', 3, 'BTC/USDT', '1d', '2017-08-17T00:00:00Z', 1)

代码中需要强调几个地方:

  1. 代理主机

  2. 函数scrape_candles_to_csv()中

    1. timeframe:获取k线时间周期
    2. since开始时间,UTC格式
    3. limit:1 表示上下两个数据之间间隔1个timeframe
  3. 由于币安交易所限制,最多只能获取近5年的数据,而且每分钟最多请求1200个数据,平均1秒20个数据

1.3 交易所时间戳转换问题

在真实交易前面临的第一个问题就是时间戳,时间戳是一个不可避免的问题。时间戳指的是UNIX时间戳,即从UTC时间1970年1月1日0时0分0秒到现在所过去的时间数。

时间戳无论在交易还是数据研究中都具有重要意义。在实际交易中,当交易信号产生后,应检查本地时间戳与服务器时间戳的差异,若两个时间戳差距过大,应当停止当前交易,等待下一次交易信号产生。在回测数据的研究中,CCXT一般返回的都是13位的UNIX时间戳,因此也需要将时间戳转换成正常日期后,才方便进一步地回测研究。
引用:https://blog.csdn.net/c_morning/article/details/120679567

3.1 UNIX时间戳与ISO8601

Unix时间戳根据精度不同,大致有10位(秒级)、13位(毫秒级)、19位(纳秒级)。

​ 大多数交易所,包括CCXT的接口目前提供的是13位的UNIX时间戳,即毫秒级时间戳。本文将针对13位UNIX时间戳与ISO8601时间格式转换进行。

  1. ​ Unix毫秒级时间戳 = Unix秒级时间戳 * 1000

  2. ​ 交易所目前提供的ISO8601格式为 “2021-10-09T10:08:09.999Z”

3.2 时间戳在线转化

由于一直通不过审核,所以就把链接给下了

二、cryptocompare

可以根据api文档进行开发https://min-api.cryptocompare.com/documentation

2.1 代码获取日线数据

获取其他级别周期的数据可以参考文档开发

import os
import json
import requests
import pandas as pd
# 设置你的代理
os.environ["http_proxy"] = "http://127.0.0.1:1001"
os.environ["https_proxy"] = "http://127.0.0.1:1001"
def fetch_ohlcv(fsym,tsym,limit,toTs,api_key=None):
    url = f"https://min-api.cryptocompare.com/data/v2/histoday?fsym=fsym&tsym=tsym&limit=limit&toTs=toTs&api_key=api_key"
    response = requests.get(url)
    json_content = json.loads(response.content)
    data = json_content["Data"]["Data"]	# 转化为json格式
    df = pd.DataFrame(data,columns=["time","high","low","open","close","volumefrom"])
    return df
start_time = 1356998400	# 获取行情的起始时间
end_time = 1652918400	# 获取行情的终止时间
frames = []
while start_time<end_time:
    df = fetch_ohlcv("BTC","USD","1000",end_time,"your_api_key")
    end_time = df['time'].iloc[0]
    frames.append(df)
df = pd.concat(frames)
df = df.sort_values(by=["time"])
df = df.sort_values(by=["time"])
df.set_index("time",inplace=True)
df[df.index>=start_time]  
df.to_csv("btc.csv")  # 保存文件
def fetch_ohlcv(start: datetime, end: datetime = None):
    """
    获取数据行情,只能获取到2010/7/16之后的数据
    :param start:数据获取开始时间
    :param end: 数据结束获取时间
    :return:
    """
    api_key = "your api"
    fsym: str = "BTC"
    tsym: str = "USD"
    limit: int = 2000   # 0-2000
    toTs: int = int(end.timestamp())
    df: DataFrame = DataFrame()
    while True:
        url = f"https://min-api.cryptocompare.com/data/v2/histoday?fsym=fsym&tsym=tsym&limit=limit&toTs=toTs&api_key=api_key"
        response = requests.get(url)
        res = json.loads(response.content)
        data = res["Data"]["Data"]
        tmp = pd.DataFrame(data, columns=["time", "high", "low", "open", "close", "volumefrom"])
        toTs = tmp.iloc[0].at["time"]
        df = pd.concat([df, tmp])
        if toTs <= start.timestamp():
            break
    df["time"] = df["time"].apply(lambda time: datetime.fromtimestamp(time))
    df.dropna(inplace=True)
    df.duplicated(subset="time", keep="first")
    df = df.set_index("time")
    df = df.sort_index()
    df = df[df.index > datetime(2010,7,17)]
    df.to_csv("your_path.csv")

2.2 pandas

在2.1给出的代码中含有部分对pandas的操作,对pandas不熟悉的朋友可以参考官方文档进行学习,在这里留一个小坑,后续我会出个系列教程,教大家如何入门pandas。

从Tushare获取历史行情数据

从Tushare获取历史行情数据,分为两种,一种是后复权(daily_hfq)数据,一种是不复权(daily)数据,获取到的数据存储在MongoDB数据库中,每个集合(collection)中,数据字段包含如下:

技术图片

抓取指数历史行情

流程图如下

技术图片


首先准备好数据库的连接,可查看python对MongoDB数据库的操作,这里在database文件中创建了对MongoDB数据的连接及指定存储的数据库
datebase.py文件

from pymongo import MongoClient

#指定数据库的连接,quant_01是数据库名
DB_CONN = MongoClient(‘mongodb://127.0.0.1:27017‘)[‘quant_01‘]


在daily_crawler.py文件中完成初始化、数据的获取、储存等操作。

import tushare as ts 
from database import DB_CONN
from datetime import datetime
from pymongo import UpdateOne

class DailyCrawler:
    def __init__(self):
        #创建daily数据集(集合)
        self.daily = DB_CONN[‘daily‘]
        #创建daily_hfq数据集(集合)
        self.daily_hfq = DB_CONN[‘daily_hfq‘]

获取指数历史行情数据(index= true)

def crawl_index(self,begin_date = None,end_date=None):
        """
        抓取指数的日k数据
        指数行情的主要作用:
        1、用来生成交易日历
        2、回测时作为收益的对比基准
        :param begin_date:开始日期
        :param end_date:结束日期
        """
        #指定抓取的指数列表,可以增加和改变列表中的值
        index_codes = [‘000001‘,‘000300‘,‘399001‘,‘399006‘,‘399006‘]
        #当前日期
        now = datetime.now().strftime(‘%Y-%m-%d‘)
        #如果没有指定开始日期,则默认当前日期
        if begin_date is None:
            begin_date = now
        #如果没有指定结束日期,则默认当前日期
        if end_date is None:
            end_date = now
        #按指数的代码循环,抓取所有指数信息
        for code in index_codes:
            #抓取一个指数在一个时间区间的数据
            df_daily = ts.get_k_data(code,index=True,start=begin_date,end=end_date)
            #保存数据
            self.save_data(code,df_daily,self.daily,{‘index‘:True})

获取股票历史数据(index=False)

流程图如下:

技术图片

获取所有股票行情数据
调用tushare中get_stock_basics()获取所有股票的基本信息,然后将基本信息的索引列表转化为股票代码列表,就得到了所有股票代码
再调用get_k_data()获取不复权、后复权历史价格数据

def crawl(self,begin_date=None,end_date=None):
        ‘‘‘
        抓取股票的日k数据,主要包括不复权和后复权两种
        :param begin_date:开始日期
        :param end_date:结束日期
        ‘‘‘
        #通过tushare的基本信息API,获取股票的基本信息
        stock_df = ts.get_stock_basics()
        #将基本信息的索引列表转换为股票代码列表
        codes = list(stock_df.index)

        #当前日期
        now = datetime.now().strftime("%Y-%M-%D")

        #如果没有指定开始/结束日期,则默认为当前日期
        if begin_date is None:
            begin_date = now
        if end_date is None:
            end_date = now
        
        for code in codes:
            #不复权价格
            df_daily = ts.get_k_data(code,start=begin_date,end=end_date,autype=None)
            self.save_data(code,df_daily,self.daily,{‘index‘:False})
            #后复权价格
            df_daily_hfq = ts.get_k_data(code,start=begin_date,end=end_date,autype=‘hfq‘)
            self.save_data(code,df_daily_hfq,self.daily_hfq,{‘index‘:False})

这里曾经很好奇,为何(color{purple}{stock_df.index})就可以获得股票代码呢?
在get_stock_basics()实现源码中,作者将(color{purple}{code})设为了index,因此该语句才能有效的获取股票代码

技术图片

保存数据

流程图:

技术图片

随着数据量的增加,写入速度会变慢,因此需要创建索引,这里对code、date、index三个字段加上索引
创建索引的命令式如下:

db.daily.createIndex({‘code‘:1,‘date‘:1,‘index‘:1})

可通过db.daily.getIndexes()查看索引

技术图片

保存数据代码:

def save_data(self,code,df_daily,collection,extra_fields =None):
        ‘‘‘
        将从网上抓取的数据保存在本地MongoDB中
        :param code:股票代码
        :param df_daily:包含日线数据的DataFrame
        :param collection:储存的数据集
        :param extra_fields:除k线数据中保存的字段,需要额外保存的字段
        ‘‘‘
        #数据更新的请求列表
        update_requests = []

        #将DataFrame中的行情数据,生成更新数据的请求
        for df_index in df_daily.index:
            #将DataFrame中的一行数据转换成dict类型:
            doc = dict(df_daily.loc[df_index])
            #设置股票代码
            doc[‘code‘] = code

            #如果指定了其他字段,则更行dict
            if extra_fields is not None:
                doc.update(extra_fields)
            
            #生成一条数据库的更新请求
            #注意:
            #需要在code、date、index三个字段上增加索引,否则随着数据量的增加,写入速度会变慢
            #创建索引的命令式:
            #db.daily.createIndex({‘code‘:1,‘date‘:1,‘index‘:1})
            update_requests.append(
                UpdateOne(
                    {‘code‘:doc[‘code‘],‘date‘:doc[‘date‘],‘index‘:doc[‘index‘]},
                    {‘$set‘:doc},
                    upsert = True
                )
            )
            #如果写入的请求列表不为空,则都保存在数据库中
            if len(update_requests)>0:
                #批量写入到数据库中,批量写入可以降低网络IO,提高速度
                update_result = collection.bulk_write(update_requests,ordered=False)
                print(‘保存日线数据,代码:%s ,插入:%4d 条,更新:%4d 条‘%(code,update_result.upserted_count,update_result.modified_count),flush=True)

程序入口

if __name__ == "__main__":
    dc = DailyCrawler()
    dc.crawl_index(‘2015-01-01‘, ‘2015-01-06‘)
    dc.crawl(‘2015-01-01‘, ‘2015-01-06‘)

运行效果:

技术图片

查看有多少条数据:

技术图片

以上是关于CCXT如何获取历史行情,为后面开发策略做准备?的主要内容,如果未能解决你的问题,请参考以下文章

金融量化利用ccxt爬取交易所的交易历史烛线图数据信息

DolphinDB 历史数据回放功能应用:股票行情回放

请问怎样用Java获取股票行情历史数据?新浪、搜狐、百度财经都可以......

量化交易如何获取实时行情数据?

如何从tushare中调取十大股东数据?

5.6今日BTC行情分析:比特币(BTC)日内行情分析,后市操作策略分析