Building Your Own zipline Data Bundle
Preface: This article was compiled by the editors of 小常识网 (cha138.com) and explains how to build your own zipline data bundle. We hope you find it useful.
01
—
Trading Calendars
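zipline ships with many built-in trading calendars, so you can backtest strategies on assets traded on those exchanges without extra setup. trading_calendars/calendar_utils.py defines: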
_default_calendar_factories = {
# Exchange calendars.
'ASEX': ASEXExchangeCalendar,
'BVMF': BVMFExchangeCalendar,
'CMES': CMESExchangeCalendar,
'IEPA': IEPAExchangeCalendar,
'XAMS': XAMSExchangeCalendar,
'XASX': XASXExchangeCalendar,
'XBKK': XBKKExchangeCalendar,
'XBOG': XBOGExchangeCalendar,
'XBOM': XBOMExchangeCalendar,
'XBRU': XBRUExchangeCalendar,
'XBUD': XBUDExchangeCalendar,
'XBUE': XBUEExchangeCalendar,
'XCBF': XCBFExchangeCalendar,
'XCSE': XCSEExchangeCalendar,
'XDUB': XDUBExchangeCalendar,
'XFRA': XFRAExchangeCalendar,
'XHEL': XHELExchangeCalendar,
'XHKG': XHKGExchangeCalendar,
'XICE': XICEExchangeCalendar,
'XIDX': XIDXExchangeCalendar,
'XIST': XISTExchangeCalendar,
'XJSE': XJSEExchangeCalendar,
'XKAR': XKARExchangeCalendar,
'XKLS': XKLSExchangeCalendar,
'XKRX': XKRXExchangeCalendar,
'XLIM': XLIMExchangeCalendar,
'XLIS': XLISExchangeCalendar,
'XLON': XLONExchangeCalendar,
'XMAD': XMADExchangeCalendar,
'XMEX': XMEXExchangeCalendar,
'XMIL': XMILExchangeCalendar,
'XMOS': XMOSExchangeCalendar,
'XNYS': XNYSExchangeCalendar,
'XNZE': XNZEExchangeCalendar,
'XOSL': XOSLExchangeCalendar,
'XPAR': XPARExchangeCalendar,
'XPHS': XPHSExchangeCalendar,
'XPRA': XPRAExchangeCalendar,
'XSES': XSESExchangeCalendar,
'XSGO': XSGOExchangeCalendar,
'XSHG': XSHGExchangeCalendar,
'XSTO': XSTOExchangeCalendar,
'XSWX': XSWXExchangeCalendar,
'XTAI': XTAIExchangeCalendar,
'XTKS': XTKSExchangeCalendar,
'XTSE': XTSEExchangeCalendar,
'XWAR': XWARExchangeCalendar,
'XWBO': XWBOExchangeCalendar,
# Miscellaneous calendars.
'us_futures': QuantopianUSFuturesCalendar,
'24/7': AlwaysOpenCalendar,
'24/5': WeekdayCalendar,
}
Many markets share exactly the same trading calendar, so zipline also defines a set of aliases:
_default_calendar_aliases = {
'NYSE': 'XNYS',
'NASDAQ': 'XNYS',
'BATS': 'XNYS',
'FWB': 'XFRA',
'LSE': 'XLON',
'TSX': 'XTSE',
'BMF': 'BVMF',
'CME': 'CMES',
'CBOT': 'CMES',
'COMEX': 'CMES',
'NYMEX': 'CMES',
'ICE': 'IEPA',
'ICEUS': 'IEPA',
'NYFE': 'IEPA',
'CFE': 'XCBF',
'JKT': 'XIDX',
}
default_calendar_names = sorted(_default_calendar_factories.keys())
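To resolve a name, you follow alias links until you reach a key of _default_calendar_factories. The sketch below is a hypothetical re-implementation for illustration, not trading_calendars' own code, and it uses only a subset of the tables above. An alias may point at another alias, as the "CSVDIR" → "NYSE" registration at the end of csvdir.py does.

```python
# Hypothetical alias resolution, for illustration only (not trading_calendars' code).
ALIASES = {
    "NYSE": "XNYS",
    "NASDAQ": "XNYS",
    "CME": "CMES",
    "CSVDIR": "NYSE",  # registered at the end of zipline's csvdir.py
}
FACTORY_NAMES = {"XNYS", "CMES", "24/7", "24/5"}


def resolve_alias(name, aliases=ALIASES):
    """Follow alias links until a non-alias name is reached.

    Raises ValueError if the aliases form a cycle.
    """
    seen = [name]
    while name in aliases:
        name = aliases[name]
        if name in seen:
            raise ValueError("cyclic alias: " + " -> ".join(seen + [name]))
        seen.append(name)
    return name


if __name__ == "__main__":
    for n in ("NASDAQ", "CSVDIR", "24/7"):
        canonical = resolve_alias(n)
        print(n, "->", canonical, canonical in FACTORY_NAMES)
```

Cycle detection matters because alias tables can be extended at runtime, as csvdir.py does.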
The calendar classes themselves are defined in modules of the trading_calendars package.
Take the 24/7 calendar used for cryptocurrencies as an example. The '24/7' entry maps to AlwaysOpenCalendar:
from datetime import time
from pytz import UTC
from trading_calendars import TradingCalendar

class AlwaysOpenCalendar(TradingCalendar):
    """A TradingCalendar for an exchange that's open every minute of every day.
    """
    name = '24/7'
    tz = UTC
    weekmask = '1111111'
    open_times = (
        (None, time(0)),
    )
    close_times = (
        (None, time(23, 59)),
    )
The '24/5' entry maps to WeekdayCalendar, which is open around the clock on weekdays only:
class WeekdayCalendar(TradingCalendar):
    """
    A TradingCalendar for an exchange that is open every minute of every
    weekday.
    """
    # No weekmask override: TradingCalendar's default weekmask is '1111100'
    # (i.e., Monday-Friday).
    name = '24/5'
    tz = UTC
    open_times = (
        (None, time(0)),
    )
    close_times = (
        (None, time(23, 59)),
    )
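The weekmask string follows the numpy/pandas business-day convention: seven characters, Monday first, where '1' marks an open day. The standard-library sketch below shows which dates each mask admits. The helper name weekmask_sessions is hypothetical, and holidays are deliberately ignored.

```python
from datetime import date, timedelta


def weekmask_sessions(start, end, weekmask="1111100"):
    """Return the dates in [start, end] allowed by a Monday-first weekmask.

    Holidays are ignored here; real calendars remove them afterwards.
    """
    if len(weekmask) != 7 or set(weekmask) - {"0", "1"}:
        raise ValueError("weekmask must be 7 characters of '0'/'1'")
    days = []
    current = start
    while current <= end:
        # date.weekday(): Monday == 0 ... Sunday == 6, matching the mask order
        if weekmask[current.weekday()] == "1":
            days.append(current)
        current += timedelta(days=1)
    return days


if __name__ == "__main__":
    jan_start, jan_end = date(2021, 1, 1), date(2021, 1, 31)
    print(len(weekmask_sessions(jan_start, jan_end)))             # 21 (24/5 mask)
    print(len(weekmask_sessions(jan_start, jan_end, "1111111")))  # 31 (24/7 mask)
```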
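For comparison, here is an excerpt of the New York Stock Exchange calendar (holiday rules omitted):
from datetime import time
from pytz import timezone
from trading_calendars import TradingCalendar

class XNYSExchangeCalendar(TradingCalendar):
    """
    Exchange calendar for the New York Stock Exchange (XNYS).
    Open Time: 9:31 AM, US/Eastern
    Close Time: 4:00 PM, US/Eastern
    NOTE: Until 1993, the standard early close time for the NYSE was 2:00 PM.
    From 1993 onward, it has been 1:00 PM.
    """
    regular_early_close = time(13)
    name = 'XNYS'
    tz = timezone('US/Eastern')
    open_times = (
        (None, time(9, 31)),
    )
    close_times = (
        (None, time(16)),
    )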
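open_times and close_times are tuples of (start_date, time) pairs. As we read the convention, each time applies from its start date until the next entry's start date, and None means "from the beginning". This lets a calendar record a change in trading hours. The hypothetical helper time_for_session below illustrates that lookup. The second schedule is invented for the example.

```python
from datetime import date, time


def time_for_session(session, schedule):
    """Pick the time that applies to `session` from (start_date, time) pairs.

    Entries are assumed sorted by start date; a start of None means
    "since the beginning", and each entry applies until the next one starts.
    """
    chosen = None
    for start, t in schedule:
        if start is None or start <= session:
            chosen = t
        else:
            break
    if chosen is None:
        raise ValueError("no time defined for %s" % session)
    return chosen


# The XNYS value from above, plus a hypothetical schedule that changes once.
XNYS_OPEN_TIMES = ((None, time(9, 31)),)
HYPOTHETICAL_CLOSE_TIMES = (
    (None, time(15)),
    (date(2000, 1, 3), time(16)),
)

if __name__ == "__main__":
    print(time_for_session(date(2021, 1, 4), XNYS_OPEN_TIMES))           # 09:31:00
    print(time_for_session(date(1999, 6, 1), HYPOTHETICAL_CLOSE_TIMES))  # 15:00:00
    print(time_for_session(date(2021, 1, 4), HYPOTHETICAL_CLOSE_TIMES))  # 16:00:00
```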
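A custom calendar has to define weekends and recurring holidays (regular_holidays). It must also define special opening and closing times (special_opens, special_closes) and one-off exceptions (adhoc_holidays, special_closes_adhoc).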
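Conceptually, the sessions are the weekmask days minus all holidays, and a special close overrides the regular close on its dates. The sketch below shows only that set logic, using plain date lists and hypothetical dates. In trading_calendars, regular_holidays is a pandas holiday calendar of recurring rules and adhoc_holidays is a list of one-off dates, so this is an illustration rather than the library's implementation.

```python
from datetime import date, time, timedelta


def build_sessions(start, end, regular_holidays=(), adhoc_holidays=(),
                   weekmask="1111100"):
    """Weekmask days in [start, end] minus regular and ad-hoc holidays."""
    closed = set(regular_holidays) | set(adhoc_holidays)
    sessions = []
    day = start
    while day <= end:
        if weekmask[day.weekday()] == "1" and day not in closed:
            sessions.append(day)
        day += timedelta(days=1)
    return sessions


def close_time(session, regular_close, special_closes=()):
    """Return a special (e.g. early) close if one applies, else the regular close."""
    for t, days in special_closes:
        if session in days:
            return t
    return regular_close


if __name__ == "__main__":
    # Hypothetical holidays and early close, for illustration only.
    sessions = build_sessions(date(2021, 1, 1), date(2021, 1, 8),
                              regular_holidays=[date(2021, 1, 1)],
                              adhoc_holidays=[date(2021, 1, 6)])
    early = [(time(13), {date(2021, 1, 8)})]
    for s in sessions:
        print(s, close_time(s, time(16), early))
```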
02
—
Datasets
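Zipline organizes data into Data Bundles, which makes it easy to integrate different data sources. To define a custom Data Bundle, you only need to implement an ingest function. ingest loads the data into memory and hands it to the various writers, which write it into zipline's storage. The data can come from an online API such as Quandl, or from local files, databases, and so on.
zipline keeps ingestion transactional: if an ingestion fails partway through, the stored data is not left in an inconsistent state.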
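One common way to get this all-or-nothing behaviour is to write into a temporary directory and rename it into place only on success. The stdlib sketch below illustrates that general pattern; atomic_output_dir is a hypothetical name, and zipline's own mechanism differs in its details.

```python
import contextlib
import os
import shutil
import tempfile


@contextlib.contextmanager
def atomic_output_dir(final_path):
    """Yield a temporary directory; move it to `final_path` only on success.

    If the body raises, the temporary directory is deleted and `final_path`
    is never created, so readers never see a half-written ingestion.
    `final_path` must not already exist.
    """
    parent = os.path.dirname(os.path.abspath(final_path))
    os.makedirs(parent, exist_ok=True)
    # Create the temp dir next to the target so the rename stays on one filesystem.
    tmp = tempfile.mkdtemp(dir=parent, prefix=".ingest-")
    try:
        yield tmp
    except BaseException:
        shutil.rmtree(tmp, ignore_errors=True)
        raise
    else:
        os.replace(tmp, final_path)


if __name__ == "__main__":
    base = tempfile.mkdtemp()
    target = os.path.join(base, "ingestion-1")
    with atomic_output_dir(target) as wd:
        with open(os.path.join(wd, "daily.txt"), "w") as fh:
            fh.write("bars")
    print(os.listdir(target))  # ['daily.txt']
```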
The ingest function has the following signature:
ingest(environ,
       asset_db_writer,
       minute_bar_writer,
       daily_bar_writer,
       adjustment_writer,
       calendar,
       start_session,
       end_session,
       cache,
       show_progress,
       output_dir)
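The toy ingest below makes the calling contract concrete. It keeps the signature above but runs against hypothetical RecordingWriter stand-ins rather than zipline's real writers, and plain lists stand in for the DataFrames zipline expects. It only illustrates the shape of the calls: bar writers receive an iterable of (sid, data) pairs, and then the asset metadata and adjustments are written.

```python
class RecordingWriter:
    """Hypothetical stand-in for a zipline writer: it records each write() call."""

    def __init__(self):
        self.calls = []

    def write(self, *args, **kwargs):
        # Materialise generators so the recorded data can be inspected later.
        args = [list(a) if hasattr(a, "__next__") else a for a in args]
        self.calls.append((args, kwargs))


def ingest(environ, asset_db_writer, minute_bar_writer, daily_bar_writer,
           adjustment_writer, calendar, start_session, end_session, cache,
           show_progress, output_dir):
    # Hypothetical in-memory source; a real bundle would read files or an API.
    source = {
        "AAPL": [("2012-01-03", 58.485714, 58.92857, 58.42857, 58.747143, 75555200)],
    }
    symbols = sorted(source)

    def pricing_iter():
        # zipline's bar writers consume (sid, DataFrame) pairs; lists stand in here.
        for sid, symbol in enumerate(symbols):
            yield sid, source[symbol]

    daily_bar_writer.write(pricing_iter(), show_progress=show_progress)
    asset_db_writer.write(equities=[{"sid": sid, "symbol": symbol}
                                    for sid, symbol in enumerate(symbols)])
    adjustment_writer.write(splits=[], dividends=[])


if __name__ == "__main__":
    assets, minute, daily, adjustments = (RecordingWriter() for _ in range(4))
    ingest({}, assets, minute, daily, adjustments,
           None, None, None, None, False, "/tmp/unused")
    print(daily.calls)   # one call holding [(0, [AAPL row])]
    print(minute.calls)  # [] because this source has no minute data
```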
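A brief description of each parameter:
environ: a mapping of environment variables that the ingestion needs. For example, ingesting from Quandl requires an API key passed this way.
asset_db_writer: writes the asset metadata, such as each asset's lifetime and the mapping from ticker symbols to zipline's internal sids. The metadata usually also includes the name, exchange, and other fields. Put the data into a DataFrame and call write(). For Apple (AAPL), for example, the equity_symbol_mappings table stores:
  columns: ['id', 'sid', 'symbol', 'company_symbol', 'share_class_symbol', 'start_date', 'end_date']
  values:  [(9, 8, 'AAPL', 'AAPL', '', 345427200000000000, 1522108800000000000)]
and the equities table stores:
  columns: ['sid', 'asset_name', 'start_date', 'end_date', 'first_traded', 'auto_close_date', 'exchange', 'exchange_full']
  values:  [(8, None, 345427200000000000, 1522108800000000000, -9223372036854775808, 1522195200000000000, 'QUANDL', None)]
  The dates are int64 nanoseconds since the Unix epoch, and -9223372036854775808 is pandas' NaT (missing) value.
minute_bar_writer: converts minute bars into zipline's internal bcolz format. If you have minute data, pass it (sid, DataFrame) pairs, typically produced by a generator that loops over the assets. The show_progress argument is forwarded to write().
daily_bar_writer: writes daily bars and is used the same way as minute_bar_writer.
adjustment_writer: writes corporate actions such as splits, mergers, dividends, and stock dividends.
calendar: the trading calendar, which affects certain date calculations.
start_session: the first session of data to ingest.
end_session: the last session of data to ingest.
cache: avoids reloading data after a failed ingestion; a retry reads from the cache first.
show_progress: whether to report progress to the user.
output_dir: the directory this ingestion is written to, located under $ZIPLINE_ROOT.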
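The asset tables store dates as int64 nanoseconds, so you can read the AAPL rows above by converting them by hand. The stdlib sketch below uses a hypothetical ns_to_utc helper.

```python
from datetime import datetime, timezone

NAT = -9223372036854775808  # int64 minimum: pandas' NaT sentinel


def ns_to_utc(ns):
    """Convert int64 nanoseconds since the epoch to a UTC datetime (None for NaT)."""
    if ns == NAT:
        return None
    seconds, remainder = divmod(ns, 10**9)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=remainder // 1000)


if __name__ == "__main__":
    # The equities row for AAPL shown above.
    row = (8, None, 345427200000000000, 1522108800000000000,
           NAT, 1522195200000000000, 'QUANDL', None)
    names = ("start_date", "end_date", "first_traded", "auto_close_date")
    for name, value in zip(names, row[2:6]):
        print(name, ns_to_utc(value))
```

For this row it prints 1980-12-12 for start_date, 2018-03-27 for end_date, None for first_traded, and 2018-03-28 for auto_close_date. The auto-close date falls one day after end_date.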
03
—
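Example: a Data Bundle Built from CSV Files
Zipline ships a bundle named csvdir that imports data directly from CSV files. Each file must provide OHLCV columns (Open, High, Low, Close, Volume) along with the date and the dividend and split information. Sample templates are in zipline/tests/resources/csvdir_samples. For example, AAPL.csv looks like this: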
date,open,high,low,close,volume,dividend,split
2012-01-03,58.485714,58.92857,58.42857,58.747143,75555200,0.0,1.0
2012-01-04,58.57143,59.240002,58.468571,59.062859,65005500,0.0,1.0
2012-01-05,59.278572,59.792858,58.952858,59.718571,67817400,0.0,1.0
2012-01-06,59.967144,60.392857,59.888573,60.342857,79573200,0.0,1.0
2012-01-09,60.785713,61.107143,60.192856,60.247143,98506100,0.0,1.0
2012-01-10,60.844284,60.857143,60.214287,60.462856,64549100,0.0,1.0
2012-01-11,60.382858,60.407143,59.901428,60.364285,53771200,0.0,1.0
2012-01-12,60.325714,60.414288,59.82143,60.19857,53146800,0.0,1.0
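A pre-flight check can catch malformed files before ingestion. The sketch below is a hypothetical check_csv that uses only the csv module. It parses the header-plus-rows layout shown above and rejects missing columns, inconsistent OHLC values, and out-of-order dates. These checks are stricter than zipline's own, since csvdir.py sorts by date itself and performs none of them.

```python
import csv
import io
from datetime import date

REQUIRED = ["date", "open", "high", "low", "close", "volume"]


def check_csv(text):
    """Parse a csvdir-style file and return row dicts; raise ValueError on bad data."""
    reader = csv.DictReader(io.StringIO(text))
    missing = [c for c in REQUIRED if c not in (reader.fieldnames or [])]
    if missing:
        raise ValueError("missing columns: %s" % missing)
    rows, previous = [], None
    for rec in reader:
        day = date.fromisoformat(rec["date"])
        op, hi, lo, cl = (float(rec[k]) for k in ("open", "high", "low", "close"))
        if not (lo <= min(op, cl) and max(op, cl) <= hi):
            raise ValueError("inconsistent OHLC on %s" % day)
        if previous is not None and day <= previous:
            raise ValueError("dates not strictly increasing at %s" % day)
        rows.append({"date": day, "open": op, "high": hi, "low": lo,
                     "close": cl, "volume": int(rec["volume"]),
                     # Optional columns default to "no dividend" / "no split".
                     "dividend": float(rec.get("dividend") or 0.0),
                     "split": float(rec.get("split") or 1.0)})
        previous = day
    return rows


SAMPLE = """date,open,high,low,close,volume,dividend,split
2012-01-03,58.485714,58.92857,58.42857,58.747143,75555200,0.0,1.0
2012-01-04,58.57143,59.240002,58.468571,59.062859,65005500,0.0,1.0
"""

if __name__ == "__main__":
    for row in check_csv(SAMPLE):
        print(row["date"], row["close"])
```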
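Once the CSV files are in the right format, place them in a daily subdirectory, for example /path/to/your/csvs/daily/AAPL.csv, because the bundle below is registered with the 'daily' time frame. Then edit ~/.zipline/extension.py and import register, csvdir_equities, and pandas: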
import pandas as pd
from zipline.data.bundles import register
from zipline.data.bundles.csvdir import csvdir_equities
Use pandas to set the start and end sessions:
start_session = pd.Timestamp('2021-01-04', tz='utc')
end_session = pd.Timestamp('2021-02-26', tz='utc')
Register the bundle with register():
register(
    'custom-csvdir-bundle',
    csvdir_equities(
        ['daily'],
        '/path/to/your/csvs',
    ),
    start_session=start_session,
    end_session=end_session,
)
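Before running ingest, you can check the layout with a standard-library mirror of the directory discovery that csvdir_bundle performs (its source is quoted in the Code Reading section). This discover helper is hypothetical. An explicit csvdir argument takes precedence over CSVDIR in environ. When no time frames are given, it uses whichever of daily/ and minute/ exist.

```python
import os
import tempfile


def discover(csvdir=None, tframes=None, environ=None):
    """Return {timeframe: [symbols]} using the same rules as csvdir_bundle."""
    environ = os.environ if environ is None else environ
    if not csvdir:
        csvdir = environ.get("CSVDIR")  # fallback only when no path is given
        if not csvdir:
            raise ValueError("CSVDIR environment variable is not set")
    if not os.path.isdir(csvdir):
        raise ValueError("%s is not a directory" % csvdir)
    if not tframes:
        tframes = {"daily", "minute"}.intersection(os.listdir(csvdir))
        if not tframes:
            raise ValueError("'daily' and 'minute' directories not found in %r"
                             % csvdir)
    layout = {}
    for tframe in sorted(tframes):
        ddir = os.path.join(csvdir, tframe)
        symbols = sorted(item.split(".csv")[0]
                         for item in os.listdir(ddir) if ".csv" in item)
        if not symbols:
            raise ValueError("no <symbol>.csv* files found in %s" % ddir)
        layout[tframe] = symbols
    return layout


if __name__ == "__main__":
    root = tempfile.mkdtemp()
    os.makedirs(os.path.join(root, "daily"))
    for sym in ("AAPL", "FAKE"):
        open(os.path.join(root, "daily", sym + ".csv"), "w").close()
    print(discover(root))                      # {'daily': ['AAPL', 'FAKE']}
    print(discover(environ={"CSVDIR": root}))  # same result, via the environment
```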
After registration, zipline bundles lists the new bundle:
$ zipline bundles
csvdir <no ingestions>
custom-csvdir-bundle <no ingestions>
quandl 2021-01-26 09:56:51.314274
quantopian-quandl 2021-01-28 22:11:32.898555
The CSV data can now be ingested:
$ zipline ingest -b custom-csvdir-bundle
Loading custom pricing data: [############------------------------] 33% | FAKE: sid 0
Loading custom pricing data: [########################------------] 66% | FAKE1: sid 1
Loading custom pricing data: [####################################] 100% | FAKE2: sid 2
Loading custom pricing data: [####################################] 100%
Merging daily equity files: [####################################]
# If the bundle is registered without a path (csvdir=None), the directory
# is read from the CSVDIR environment variable instead:
$ CSVDIR=/path/to/your/csvs zipline ingest -b custom-csvdir-bundle
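Daily bars end up in a bcolz table, and the next step prints its raw rows. Judging from that output, prices are kept as integers scaled by 1000 and dates as epoch seconds. The hypothetical decode_daily_row below reverses this encoding using only the standard library. The scale factor is inferred from the printed rows, not from zipline documentation.

```python
from datetime import datetime, timezone

PRICE_SCALE = 1000  # inferred from the verification output, not from zipline docs


def decode_daily_row(open_, high, low, close, volume, day, sid):
    """Convert one raw daily-bar row into readable values."""
    return {
        "sid": sid,
        "date": datetime.fromtimestamp(day, tz=timezone.utc).date(),
        "open": open_ / PRICE_SCALE,
        "high": high / PRICE_SCALE,
        "low": low / PRICE_SCALE,
        "close": close / PRICE_SCALE,
        "volume": volume,
    }


if __name__ == "__main__":
    raw = (58485, 58928, 58428, 58747, 75555200, 1325548800, 0)  # first row
    print(decode_daily_row(*raw))
```

The stored integers appear to be truncated rather than rounded (59.062859 is stored as 59062), so decoded prices are accurate only to 0.001.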
04
—
Data Verification
> import bcolz
> csvuri = '/path/to/your/bcolz'
> f = bcolz.open(csvuri)
> df = f.todataframe()
> df
open high low close volume day id
0 58485 58928 58428 58747 75555200 1325548800 0
1 58571 59240 58468 59062 65005500 1325635200 0
2 59278 59792 58952 59718 67817400 1325721600 0
3 59967 60392 59888 60342 79573200 1325808000 0
4 60785 61107 60192 60247 98506100 1326067200 0
5 60844 60857 60214 60462 64549100 1326153600 0
6 60382 60407 59901 60364 53771200 1326240000 0
Here id is the asset's sid. Prices are stored as integers equal to the price times 1000 (58.485714 appears as 58485), and day is the session date in seconds since the Unix epoch (1325548800 is 2012-01-03).
05
—
Code Reading
zipline's built-in csvdir bundle is implemented in zipline/data/bundles/csvdir.py:
# Module header: the imports the code below relies on.
import os
import logbook
from numpy import empty
from pandas import DataFrame, read_csv, Index, Timedelta, NaT
from trading_calendars import register_calendar_alias
from zipline.utils.cli import maybe_show_progress
from . import core as bundles
logger = logbook.Logger(__name__)

def csvdir_equities(tframes=None, csvdir=None):
    """
    Generate an ingest function for custom data bundle
    This function can be used in ~/.zipline/extension.py
    to register bundle with custom parameters, e.g. with
    a custom trading calendar.
    Parameters
    ----------
    tframes: tuple, optional
        The data time frames, supported timeframes: 'daily' and 'minute'
    csvdir : string, optional, default: CSVDIR environment variable
        The path to the directory of this structure:
        <directory>/<timeframe1>/<symbol1>.csv
        <directory>/<timeframe1>/<symbol2>.csv
        <directory>/<timeframe1>/<symbol3>.csv
        <directory>/<timeframe2>/<symbol1>.csv
        <directory>/<timeframe2>/<symbol2>.csv
        <directory>/<timeframe2>/<symbol3>.csv
    Returns
    -------
    ingest : callable
        The bundle ingest function
    Examples
    --------
    This code should be added to ~/.zipline/extension.py
    .. code-block:: python
       from zipline.data.bundles import csvdir_equities, register
       register('custom-csvdir-bundle',
                csvdir_equities(["daily", "minute"],
                '/full/path/to/the/csvdir/directory'))
    """
    return CSVDIRBundle(tframes, csvdir).ingest
class CSVDIRBundle:
    """
    Wrapper class to call csvdir_bundle with provided
    list of time frames and a path to the csvdir directory
    """
    def __init__(self, tframes=None, csvdir=None):
        self.tframes = tframes
        self.csvdir = csvdir

    def ingest(self,
               environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir):
        csvdir_bundle(environ,
                      asset_db_writer,
                      minute_bar_writer,
                      daily_bar_writer,
                      adjustment_writer,
                      calendar,
                      start_session,
                      end_session,
                      cache,
                      show_progress,
                      output_dir,
                      self.tframes,
                      self.csvdir)
@bundles.register("csvdir")
def csvdir_bundle(environ,
                  asset_db_writer,
                  minute_bar_writer,
                  daily_bar_writer,
                  adjustment_writer,
                  calendar,
                  start_session,
                  end_session,
                  cache,
                  show_progress,
                  output_dir,
                  tframes=None,
                  csvdir=None):
    """
    Build a zipline data bundle from the directory with csv files.
    """
    if not csvdir:
        csvdir = environ.get('CSVDIR')
        if not csvdir:
            raise ValueError("CSVDIR environment variable is not set")
    if not os.path.isdir(csvdir):
        raise ValueError("%s is not a directory" % csvdir)
    if not tframes:
        tframes = set(["daily", "minute"]).intersection(os.listdir(csvdir))
        if not tframes:
            raise ValueError("'daily' and 'minute' directories "
                             "not found in '%s'" % csvdir)
    divs_splits = {'divs': DataFrame(columns=['sid', 'amount',
                                              'ex_date', 'record_date',
                                              'declared_date', 'pay_date']),
                   'splits': DataFrame(columns=['sid', 'ratio',
                                                'effective_date'])}
    for tframe in tframes:
        ddir = os.path.join(csvdir, tframe)
        symbols = sorted(item.split('.csv')[0]
                         for item in os.listdir(ddir)
                         if '.csv' in item)
        if not symbols:
            raise ValueError("no <symbol>.csv* files found in %s" % ddir)
        dtype = [('start_date', 'datetime64[ns]'),
                 ('end_date', 'datetime64[ns]'),
                 ('auto_close_date', 'datetime64[ns]'),
                 ('symbol', 'object')]
        metadata = DataFrame(empty(len(symbols), dtype=dtype))
        if tframe == 'minute':
            writer = minute_bar_writer
        else:
            writer = daily_bar_writer
        writer.write(_pricing_iter(ddir, symbols, metadata,
                                   divs_splits, show_progress),
                     show_progress=show_progress)
        # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
        # register "CSVDIR" to resolve to the NYSE calendar, because these
        # are all equities and thus can use the NYSE calendar.
        metadata['exchange'] = "CSVDIR"
        asset_db_writer.write(equities=metadata)
        divs_splits['divs']['sid'] = divs_splits['divs']['sid'].astype(int)
        divs_splits['splits']['sid'] = divs_splits['splits']['sid'].astype(int)
        adjustment_writer.write(splits=divs_splits['splits'],
                                dividends=divs_splits['divs'])
def _pricing_iter(csvdir, symbols, metadata, divs_splits, show_progress):
    # Note: DataFrame.append (used below) was removed in pandas 2.0 and
    # infer_datetime_format is deprecated there; this code targets the
    # older pandas versions that zipline supported.
    with maybe_show_progress(symbols, show_progress,
                             label='Loading custom pricing data: ') as it:
        files = os.listdir(csvdir)
        for sid, symbol in enumerate(it):
            logger.debug('%s: sid %s' % (symbol, sid))
            try:
                fname = [fname for fname in files
                         if '%s.csv' % symbol in fname][0]
            except IndexError:
                raise ValueError("%s.csv file is not in %s" % (symbol, csvdir))
            dfr = read_csv(os.path.join(csvdir, fname),
                           parse_dates=[0],
                           infer_datetime_format=True,
                           index_col=0).sort_index()
            start_date = dfr.index[0]
            end_date = dfr.index[-1]
            # The auto_close date is the day after the last trade.
            ac_date = end_date + Timedelta(days=1)
            metadata.iloc[sid] = start_date, end_date, ac_date, symbol
            if 'split' in dfr.columns:
                tmp = 1. / dfr[dfr['split'] != 1.0]['split']
                split = DataFrame(data=tmp.index.tolist(),
                                  columns=['effective_date'])
                split['ratio'] = tmp.tolist()
                split['sid'] = sid
                splits = divs_splits['splits']
                index = Index(range(splits.shape[0],
                                    splits.shape[0] + split.shape[0]))
                split.set_index(index, inplace=True)
                divs_splits['splits'] = splits.append(split)
            if 'dividend' in dfr.columns:
                # ex_date   amount  sid record_date declared_date pay_date
                tmp = dfr[dfr['dividend'] != 0.0]['dividend']
                div = DataFrame(data=tmp.index.tolist(), columns=['ex_date'])
                div['record_date'] = NaT
                div['declared_date'] = NaT
                div['pay_date'] = NaT
                div['amount'] = tmp.tolist()
                div['sid'] = sid
                divs = divs_splits['divs']
                ind = Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
                div.set_index(ind, inplace=True)
                divs_splits['divs'] = divs.append(div)
            yield sid, dfr
register_calendar_alias("CSVDIR", "NYSE")
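The per-symbol bookkeeping in _pricing_iter can be summarised without pandas. The hypothetical summarise_symbol below works on date-sorted row dicts, the shape produced by parsing the CSV files above. It applies the same rules as the source: the auto-close date is the day after the last bar, a split row becomes a ratio of 1 / split, and every non-zero dividend becomes a record with unknown record, declared, and pay dates. Reading split as new shares per old share, so that a 2-for-1 split gives a ratio of 0.5, is our interpretation of the code; the article does not state it.

```python
from datetime import date, timedelta


def summarise_symbol(sid, rows):
    """Return (metadata, splits, dividends) for one symbol's date-sorted rows."""
    start, end = rows[0]["date"], rows[-1]["date"]
    metadata = {
        "sid": sid,
        "start_date": start,
        "end_date": end,
        # Same rule as csvdir: auto-close on the day after the last bar.
        "auto_close_date": end + timedelta(days=1),
    }
    # A split value other than 1.0 becomes an adjustment with ratio 1 / split.
    splits = [{"sid": sid, "effective_date": r["date"], "ratio": 1.0 / r["split"]}
              for r in rows if r["split"] != 1.0]
    # Only ex-date and amount are known; other dates stay empty (NaT in zipline).
    dividends = [{"sid": sid, "ex_date": r["date"], "amount": r["dividend"],
                  "record_date": None, "declared_date": None, "pay_date": None}
                 for r in rows if r["dividend"] != 0.0]
    return metadata, splits, dividends


if __name__ == "__main__":
    # Hypothetical rows: a 0.5 dividend on Jan 3 and a 2-for-1 split on Jan 6.
    rows = [
        {"date": date(2020, 1, 2), "dividend": 0.0, "split": 1.0},
        {"date": date(2020, 1, 3), "dividend": 0.5, "split": 1.0},
        {"date": date(2020, 1, 6), "dividend": 0.0, "split": 2.0},
    ]
    meta, splits, divs = summarise_symbol(0, rows)
    print(meta["auto_close_date"], splits, divs)
```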