Building a Custom Zipline Data Bundle



01


Trading Calendars

Zipline ships with a number of built-in trading calendars, which makes it straightforward to backtest strategies on assets from those markets. In trading_calendars/calendar_utils.py the following factories are defined:

_default_calendar_factories = {
    # Exchange calendars.
    'ASEX': ASEXExchangeCalendar,
    'BVMF': BVMFExchangeCalendar,
    'CMES': CMESExchangeCalendar,
    'IEPA': IEPAExchangeCalendar,
    'XAMS': XAMSExchangeCalendar,
    'XASX': XASXExchangeCalendar,
    'XBKK': XBKKExchangeCalendar,
    'XBOG': XBOGExchangeCalendar,
    'XBOM': XBOMExchangeCalendar,
    'XBRU': XBRUExchangeCalendar,
    'XBUD': XBUDExchangeCalendar,
    'XBUE': XBUEExchangeCalendar,
    'XCBF': XCBFExchangeCalendar,
    'XCSE': XCSEExchangeCalendar,
    'XDUB': XDUBExchangeCalendar,
    'XFRA': XFRAExchangeCalendar,
    'XHEL': XHELExchangeCalendar,
    'XHKG': XHKGExchangeCalendar,
    'XICE': XICEExchangeCalendar,
    'XIDX': XIDXExchangeCalendar,
    'XIST': XISTExchangeCalendar,
    'XJSE': XJSEExchangeCalendar,
    'XKAR': XKARExchangeCalendar,
    'XKLS': XKLSExchangeCalendar,
    'XKRX': XKRXExchangeCalendar,
    'XLIM': XLIMExchangeCalendar,
    'XLIS': XLISExchangeCalendar,
    'XLON': XLONExchangeCalendar,
    'XMAD': XMADExchangeCalendar,
    'XMEX': XMEXExchangeCalendar,
    'XMIL': XMILExchangeCalendar,
    'XMOS': XMOSExchangeCalendar,
    'XNYS': XNYSExchangeCalendar,
    'XNZE': XNZEExchangeCalendar,
    'XOSL': XOSLExchangeCalendar,
    'XPAR': XPARExchangeCalendar,
    'XPHS': XPHSExchangeCalendar,
    'XPRA': XPRAExchangeCalendar,
    'XSES': XSESExchangeCalendar,
    'XSGO': XSGOExchangeCalendar,
    'XSHG': XSHGExchangeCalendar,
    'XSTO': XSTOExchangeCalendar,
    'XSWX': XSWXExchangeCalendar,
    'XTAI': XTAIExchangeCalendar,
    'XTKS': XTKSExchangeCalendar,
    'XTSE': XTSEExchangeCalendar,
    'XWAR': XWARExchangeCalendar,
    'XWBO': XWBOExchangeCalendar,
    # Miscellaneous calendars.
    'us_futures': QuantopianUSFuturesCalendar,
    '24/7': AlwaysOpenCalendar,
    '24/5': WeekdayCalendar,
}

In addition, since many markets share exactly the same trading calendar, zipline also defines a set of aliases:

_default_calendar_aliases = {
    'NYSE': 'XNYS',
    'NASDAQ': 'XNYS',
    'BATS': 'XNYS',
    'FWB': 'XFRA',
    'LSE': 'XLON',
    'TSX': 'XTSE',
    'BMF': 'BVMF',
    'CME': 'CMES',
    'CBOT': 'CMES',
    'COMEX': 'CMES',
    'NYMEX': 'CMES',
    'ICE': 'IEPA',
    'ICEUS': 'IEPA',
    'NYFE': 'IEPA',
    'CFE': 'XCBF',
    'JKT': 'XIDX',
}

default_calendar_names = sorted(_default_calendar_factories.keys())

All of the calendars above are defined under the trading_calendars package.
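Any of these calendars (or their aliases) can be obtained by name via get_calendar. A quick sketch (the date range here is arbitrary):

import pandas as pd
from trading_calendars import get_calendar

# Look up the Shanghai Stock Exchange calendar and list its sessions.
xshg = get_calendar('XSHG')
sessions = xshg.sessions_in_range(pd.Timestamp('2021-01-04', tz='utc'),
                                  pd.Timestamp('2021-02-26', tz='utc'))
print(sessions[:3])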

Take the 24/7 calendar used for cryptocurrencies as an example:

'24/7': AlwaysOpenCalendar

from datetime import time
from pytz import UTC
from trading_calendars import TradingCalendar

class AlwaysOpenCalendar(TradingCalendar):
    """A TradingCalendar for an exchange that's open every minute of every day.
    """
    name = '24/7'
    tz = UTC
    weekmask = '1111111'
    open_times = (
        (None, time(0)),
    )
    close_times = (
        (None, time(23, 59)),
    )
'24/5': WeekdayCalendar

class WeekdayCalendar(TradingCalendar):
    """
    A TradingCalendar for an exchange that is open every minute of every
    weekday. (In TradingCalendar the weekmask defaults to '1111100',
    i.e. Monday-Friday.)
    """
    name = '24/5'
    tz = UTC
    open_times = (
        (None, time(0)),
    )
    close_times = (
        (None, time(23, 59)),
    )
class XNYSExchangeCalendar(TradingCalendar):
    """
    Exchange calendar for the New York Stock Exchange (XNYS).

    Open Time: 9:31 AM, US/Eastern
    Close Time: 4:00 PM, US/Eastern

    NOTE: Until 1993, the standard early close time for the NYSE was 2:00 PM.
    From 1993 onward, it has been 1:00 PM.
    """
    regular_early_close = time(13)
    name = 'XNYS'
    tz = timezone('US/Eastern')
    open_times = (
        (None, time(9, 31)),
    )
    close_times = (
        (None, time(16)),
    )

For a custom calendar you need to account for the weekmask and recurring holidays (regular_holidays), as well as special_opens, special_closes, adhoc_holidays and special_closes_adhoc. A minimal sketch follows.
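The sketch below is only illustrative: the exchange code 'MYEX' and the holiday rules are made up, and HolidayCalendar is assumed to be importable from trading_calendars.trading_calendar, as zipline's own exchange calendar modules do:

from datetime import time

import pandas as pd
from pandas.tseries.holiday import GoodFriday, Holiday
from pytz import timezone
from trading_calendars import TradingCalendar, register_calendar
from trading_calendars.trading_calendar import HolidayCalendar


class MyExchangeCalendar(TradingCalendar):
    """A made-up exchange: Monday-Friday, 09:30-16:00 local time."""
    name = 'MYEX'                    # hypothetical exchange code
    tz = timezone('US/Eastern')
    open_times = ((None, time(9, 30)),)
    close_times = ((None, time(16)),)

    @property
    def regular_holidays(self):
        # Rule-based holidays observed every year.
        return HolidayCalendar([
            Holiday("New Year's Day", month=1, day=1),
            GoodFriday,
        ])

    @property
    def adhoc_holidays(self):
        # One-off closures that follow no rule.
        return [pd.Timestamp('2012-10-29', tz='UTC')]

    @property
    def special_closes(self):
        # Pairs of (early close time, calendar of days it applies to).
        return [(time(13), HolidayCalendar([
            Holiday('Christmas Eve', month=12, day=24),
        ]))]


register_calendar('MYEX', MyExchangeCalendar())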

02


Data Bundles

Zipline organizes data as Data Bundles, which makes it easy to integrate different data sources. To define a custom Data Bundle, all you need to implement is an ingest function.
The ingest function loads the data into memory and hands it to the various writers that persist it for zipline; the source can be an online API such as quandl, or local files, databases, and so on.
Zipline makes the ingestion process transactional, so a failure part-way through does not leave the data in an inconsistent state.

The ingest function has the following signature:

ingest(environ, asset_db_writer, minute_bar_writer, daily_bar_writer, adjustment_writer, calendar, start_session, end_session, cache, show_progress, output_dir)

A brief description of each parameter (a minimal ingest skeleton follows the list):

  • environ
    environ is a mapping of environment variables; whatever the ingestion needs from the environment is passed in here. For example, the quandl ingestion reads its api_key from it.

  • asset_db_writer
    asset_db_writer writes the metadata for each asset (the date range over which it exists and the mapping between ticker symbols and zipline's internal sids), usually together with the asset name, exchange and a few other fields. Put the data into a DataFrame and call its write() method.

    Taking Apple's stock as an example:

    In the equity_symbol_mappings table it is stored as:

    ['id', 'sid', 'symbol', 'company_symbol', 'share_class_symbol', 'start_date', 'end_date'] [(9, 8, 'AAPL', 'AAPL', '', 345427200000000000, 1522108800000000000)]
    And in the equities table as:
    ['sid', 'asset_name', 'start_date', 'end_date', 'first_traded', 'auto_close_date', 'exchange', 'exchange_full'] [(8, None, 345427200000000000, 1522108800000000000, -9223372036854775808, 1522195200000000000, 'QUANDL', None)]
  • minute_bar_writer
    minute_bar_writer converts bar data into the bcolz-format files zipline uses internally. If you have minute data, loop over the assets and write (sid, dataframe) pairs; the show_progress argument is forwarded to its write method.

  • daily_bar_writer
    daily_bar_writer writes daily (end-of-day) bars and works just like minute_bar_writer.

  • adjustment_writer
    adjustment_writer writes splits, mergers, dividends, bonus issues and similar corporate actions.

  • calendar
    calendar is the trading calendar, which affects various date calculations during ingestion.

  • start_session
    start_session is the first session (date) for which data should be ingested.

  • end_session
    end_session is the last session (date) for which data should be ingested.

  • cache
    cache avoids re-downloading data after a failed ingestion; on retry, data is read from the cache first.

  • show_progress
    show_progress indicates whether progress should be reported to the user.

  • output_dir
    output_dir is the directory to write into, given as a path relative to $ZIPLINE_ROOT.
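Putting the parameters together, a minimal ingest skeleton might look like the following. This is only a sketch: the single asset 'FAKE', its flat prices and the use of the NYSE exchange label are fabricated; a real implementation would load data from your own source.

import pandas as pd


def my_ingest(environ,
              asset_db_writer,
              minute_bar_writer,
              daily_bar_writer,
              adjustment_writer,
              calendar,
              start_session,
              end_session,
              cache,
              show_progress,
              output_dir):
    """Hypothetical ingest function writing one fabricated asset."""
    sessions = calendar.sessions_in_range(start_session, end_session)

    # 1. Asset metadata: one row per security; the frame's index becomes the sid.
    metadata = pd.DataFrame({
        'symbol': ['FAKE'],
        'start_date': [sessions[0]],
        'end_date': [sessions[-1]],
        'auto_close_date': [sessions[-1] + pd.Timedelta(days=1)],
        'exchange': ['NYSE'],
    })
    asset_db_writer.write(equities=metadata)

    # 2. Daily bars: an iterable of (sid, DataFrame) pairs, where each frame is
    #    indexed by session and has open/high/low/close/volume columns.
    def daily_bars():
        df = pd.DataFrame({'open': 1.0, 'high': 1.0, 'low': 1.0,
                           'close': 1.0, 'volume': 100.0}, index=sessions)
        yield 0, df

    daily_bar_writer.write(daily_bars(), show_progress=show_progress)

    # 3. No corporate actions in this sketch; write empty adjustments so the
    #    adjustments database still gets created.
    adjustment_writer.write()

Such a function would then be registered in ~/.zipline/extension.py with register('my-bundle', my_ingest), exactly as the csvdir example in the next section does.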


03

Example: A Data Bundle from CSV Files

Zipline ships with a bundle named csvdir that lets you import data directly from CSV files. The files must follow the OHLCV (Open, High, Low, Close, Volume) format and also carry the date, dividend and split columns; zipline/tests/resources/csvdir_samples contains sample templates. For example, AAPL.csv looks like this:

Date,open,high,low,close,volume,dividend,split
2012-01-03,58.485714,58.92857,58.42857,58.747143,75555200,0.0,1.0
2012-01-04,58.57143,59.240002,58.468571,59.062859,65005500,0.0,1.0
2012-01-05,59.278572,59.792858,58.952858,59.718571,67817400,0.0,1.0
2012-01-06,59.967144,60.392857,59.888573,60.342857,79573200,0.0,1.0
2012-01-09,60.785713,61.107143,60.192856,60.247143,98506100,0.0,1.0
2012-01-10,60.844284,60.857143,60.214287,60.462856,64549100,0.0,1.0
2012-01-11,60.382858,60.407143,59.901428,60.364285,53771200,0.0,1.0
2012-01-12,60.325714,60.414288,59.82143,60.19857,53146800,0.0,1.0

Once the CSV files are in the right format, edit ~/.zipline/extension.py; csvdir and pandas need to be imported:

import pandas as pd
from zipline.data.bundles import register
from zipline.data.bundles.csvdir import csvdir_equities

Use pandas to specify the start and end sessions:

start_session = pd.Timestamp('2021-01-04', tz='utc')
end_session = pd.Timestamp('2021-02-26', tz='utc')

Register the bundle with register():

register(
    'custom-csvdir-bundle',
    csvdir_equities(
        ['daily'],
        '/path/to/your/csvs',
    ),
    start_session=start_session,
    end_session=end_session
)
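register() also accepts a calendar_name argument (defaulting to 'NYSE'), which is where the calendars from section 01 come back in. To ingest CSVs for, say, Shanghai-listed stocks, the same bundle could be registered against the XSHG calendar (the bundle name below is just an example):

register(
    'custom-csvdir-bundle-cn',
    csvdir_equities(['daily'], '/path/to/your/csvs'),
    calendar_name='XSHG',
    start_session=start_session,
    end_session=end_session
)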
After registering, the new bundle shows up in the bundle list:

$ zipline bundles

csvdir <no ingestions>
custom-csvdir-bundle <no ingestions>
quandl 2021-01-26 09:56:51.314274
quantopian-quandl 2021-01-28 22:11:32.898555

Now the CSV data can be ingested:

$ zipline ingest -b custom-csvdir-bundle

Loading custom pricing data: [############------------------------] 33% | FAKE: sid 0
Loading custom pricing data: [########################------------] 66% | FAKE1: sid 1
Loading custom pricing data: [####################################] 100% | FAKE2: sid 2
Loading custom pricing data: [####################################] 100%
Merging daily equity files: [####################################]


# The path can also be supplied via an environment variable

$ CSVDIR=/path/to/your/csvs zipline ingest -b custom-csvdir-bundle

04


Data Validation

The ingested bcolz files can be inspected directly with bcolz (note that prices are stored as scaled integers, i.e. the price multiplied by 1,000):

>>> import bcolz
>>> csvuri = '/path/to/your/bcolz'
>>> f = bcolz.open(csvuri)
>>> df = f.todataframe()
>>> df

    open   high    low  close    volume         day  id
0  58485  58928  58428  58747  75555200  1325548800   0
1  58571  59240  58468  59062  65005500  1325635200   0
2  59278  59792  58952  59718  67817400  1325721600   0
3  59967  60392  59888  60342  79573200  1325808000   0
4  60785  61107  60192  60247  98506100  1326067200   0
5  60844  60857  60214  60462  64549100  1326153600   0
6  60382  60407  59901  60364  53771200  1326240000   0
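A higher-level check goes through zipline's own readers instead of opening the bcolz files by hand. A sketch, assuming the custom-csvdir-bundle above has been ingested and contains the symbol being looked up:

from zipline.data import bundles

# Load the most recent ingestion of the bundle.
bundle = bundles.load('custom-csvdir-bundle')

# Resolve a symbol you ingested (e.g. 'AAPL') and read back its daily closes.
asset = bundle.asset_finder.lookup_symbol('AAPL', as_of_date=None)
reader = bundle.equity_daily_bar_reader
sessions = reader.sessions
closes = reader.load_raw_arrays(['close'], sessions[0], sessions[-1],
                                [asset.sid])[0]
print(closes[:5])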


05


Reading the Source

Zipline's built-in csvdir bundle lives in zipline/data/bundles/csvdir.py.

# Imports at the top of csvdir.py (abridged):
import os

from logbook import Logger
from numpy import empty
from pandas import DataFrame, read_csv, Index, Timedelta, NaT
from trading_calendars import register_calendar_alias

from zipline.utils.cli import maybe_show_progress

from . import core as bundles

logger = Logger(__name__)


def csvdir_equities(tframes=None, csvdir=None):
    """
    Generate an ingest function for custom data bundle
    This function can be used in ~/.zipline/extension.py
    to register bundle with custom parameters, e.g. with
    a custom trading calendar.

    Parameters
    ----------
    tframes: tuple, optional
        The data time frames, supported timeframes: 'daily' and 'minute'
    csvdir : string, optional, default: CSVDIR environment variable
        The path to the directory of this structure:
        <directory>/<timeframe1>/<symbol1>.csv
        <directory>/<timeframe1>/<symbol2>.csv
        <directory>/<timeframe1>/<symbol3>.csv
        <directory>/<timeframe2>/<symbol1>.csv
        <directory>/<timeframe2>/<symbol2>.csv
        <directory>/<timeframe2>/<symbol3>.csv

    Returns
    -------
    ingest : callable
        The bundle ingest function

    Examples
    --------
    This code should be added to ~/.zipline/extension.py

    .. code-block:: python

        from zipline.data.bundles import csvdir_equities, register
        register('custom-csvdir-bundle',
                 csvdir_equities(["daily", "minute"],
                                 '/full/path/to/the/csvdir/directory'))
    """
    return CSVDIRBundle(tframes, csvdir).ingest


class CSVDIRBundle:
    """
    Wrapper class to call csvdir_bundle with provided
    list of time frames and a path to the csvdir directory
    """

    def __init__(self, tframes=None, csvdir=None):
        self.tframes = tframes
        self.csvdir = csvdir

    def ingest(self,
               environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir):

        csvdir_bundle(environ,
                      asset_db_writer,
                      minute_bar_writer,
                      daily_bar_writer,
                      adjustment_writer,
                      calendar,
                      start_session,
                      end_session,
                      cache,
                      show_progress,
                      output_dir,
                      self.tframes,
                      self.csvdir)


@bundles.register("csvdir")
def csvdir_bundle(environ,
                  asset_db_writer,
                  minute_bar_writer,
                  daily_bar_writer,
                  adjustment_writer,
                  calendar,
                  start_session,
                  end_session,
                  cache,
                  show_progress,
                  output_dir,
                  tframes=None,
                  csvdir=None):
    """
    Build a zipline data bundle from the directory with csv files.
    """
    if not csvdir:
        csvdir = environ.get('CSVDIR')
        if not csvdir:
            raise ValueError("CSVDIR environment variable is not set")

    if not os.path.isdir(csvdir):
        raise ValueError("%s is not a directory" % csvdir)

    if not tframes:
        tframes = set(["daily", "minute"]).intersection(os.listdir(csvdir))

        if not tframes:
            raise ValueError("'daily' and 'minute' directories "
                             "not found in '%s'" % csvdir)

    divs_splits = {'divs': DataFrame(columns=['sid', 'amount',
                                              'ex_date', 'record_date',
                                              'declared_date', 'pay_date']),
                   'splits': DataFrame(columns=['sid', 'ratio',
                                                'effective_date'])}
    for tframe in tframes:
        ddir = os.path.join(csvdir, tframe)

        symbols = sorted(item.split('.csv')[0]
                         for item in os.listdir(ddir)
                         if '.csv' in item)
        if not symbols:
            raise ValueError("no <symbol>.csv* files found in %s" % ddir)

        dtype = [('start_date', 'datetime64[ns]'),
                 ('end_date', 'datetime64[ns]'),
                 ('auto_close_date', 'datetime64[ns]'),
                 ('symbol', 'object')]
        metadata = DataFrame(empty(len(symbols), dtype=dtype))

        if tframe == 'minute':
            writer = minute_bar_writer
        else:
            writer = daily_bar_writer

        writer.write(_pricing_iter(ddir, symbols, metadata,
                                   divs_splits, show_progress),
                     show_progress=show_progress)

        # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
        # register "CSVDIR" to resolve to the NYSE calendar, because these
        # are all equities and thus can use the NYSE calendar.
        metadata['exchange'] = "CSVDIR"

        asset_db_writer.write(equities=metadata)

        divs_splits['divs']['sid'] = divs_splits['divs']['sid'].astype(int)
        divs_splits['splits']['sid'] = divs_splits['splits']['sid'].astype(int)
        adjustment_writer.write(splits=divs_splits['splits'],
                                dividends=divs_splits['divs'])


def _pricing_iter(csvdir, symbols, metadata, divs_splits, show_progress):
    with maybe_show_progress(symbols, show_progress,
                             label='Loading custom pricing data: ') as it:
        files = os.listdir(csvdir)
        for sid, symbol in enumerate(it):
            logger.debug('%s: sid %s' % (symbol, sid))

            try:
                fname = [fname for fname in files
                         if '%s.csv' % symbol in fname][0]
            except IndexError:
                raise ValueError("%s.csv file is not in %s" % (symbol, csvdir))

            dfr = read_csv(os.path.join(csvdir, fname),
                           parse_dates=[0],
                           infer_datetime_format=True,
                           index_col=0).sort_index()

            start_date = dfr.index[0]
            end_date = dfr.index[-1]

            # The auto_close date is the day after the last trade.
            ac_date = end_date + Timedelta(days=1)
            metadata.iloc[sid] = start_date, end_date, ac_date, symbol

            if 'split' in dfr.columns:
                tmp = 1. / dfr[dfr['split'] != 1.0]['split']
                split = DataFrame(data=tmp.index.tolist(),
                                  columns=['effective_date'])
                split['ratio'] = tmp.tolist()
                split['sid'] = sid

                splits = divs_splits['splits']
                index = Index(range(splits.shape[0],
                                    splits.shape[0] + split.shape[0]))
                split.set_index(index, inplace=True)
                divs_splits['splits'] = splits.append(split)

            if 'dividend' in dfr.columns:
                # ex_date  amount  sid  record_date  declared_date  pay_date
                tmp = dfr[dfr['dividend'] != 0.0]['dividend']
                div = DataFrame(data=tmp.index.tolist(), columns=['ex_date'])
                div['record_date'] = NaT
                div['declared_date'] = NaT
                div['pay_date'] = NaT
                div['amount'] = tmp.tolist()
                div['sid'] = sid

                divs = divs_splits['divs']
                ind = Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
                div.set_index(ind, inplace=True)
                divs_splits['divs'] = divs.append(div)

            yield sid, dfr


register_calendar_alias("CSVDIR", "NYSE")

