python 运行ZIPLINE PIPELINE(包括模拟输入)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 运行ZIPLINE PIPELINE(包括模拟输入)相关的知识,希望对你有一定的参考价值。

# Source: Quantopian forum thread on simulating Pipeline inputs in the research platform:
# https://www.quantopian.com/posts/possible-to-simulate-inputs-to-pipeline-in-the-research-platform

import numpy as np
import pandas as pd

from zipline.pipeline.data import Column  
from zipline.pipeline.data import DataSet

from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.utils.calendars import get_calendar
from zipline.data.bundles import register,load
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.engine import SimplePipelineEngine  
from zipline.pipeline.loaders.frame import DataFrameLoader

# NYSE trading calendar; its sessions are handed to the pipeline engine below.
trading_calendar = get_calendar('NYSE')
# Load the 'quandl' data bundle; its asset finder and bar/adjustment readers
# are used by the rest of the script.
# NOTE(review): presumably the bundle must already be ingested locally -- confirm.
bundle_data = load('quandl')


# Set up Custom Data Source for two sids for DataFrameLoader
class MyDataSet(DataSet):
    """Custom pipeline dataset with one float column and one bool column.

    Each column is served by its own DataFrameLoader from simulated frames
    built elsewhere in this script.
    """

    # Float-valued simulated input.
    column_A = Column(dtype=float)
    # Boolean-valued simulated input.
    column_B = Column(dtype=bool)

# Daily (calendar-day) UTC index spanning the period covered by the simulated data.
dates = pd.date_range('2014-01-01', '2017-01-01', tz='UTC')
# Look up AAPL and MSFT through the bundle's asset finder.
assets = bundle_data.asset_finder.lookup_symbols(['AAPL', 'MSFT'], as_of_date=None)
# The simulated frames are labelled by the integer sids of those assets.
# pd.Index(..., dtype='int64') replaces pd.Int64Index, which is deprecated
# since pandas 1.4 and removed in pandas 2.0; on older pandas it still yields
# an Int64Index, so behaviour is unchanged there.
sids = pd.Index([asset.sid for asset in assets], dtype='int64')

# The values for Column A are a (dates x assets) grid filled row by row with
# 0.0, 1.0, ..., len(dates) * len(assets) - 1 (np.arange starts at 0).
column_A_frame = pd.DataFrame(
    data=np.arange(len(dates) * len(assets), dtype=float).reshape(len(dates), len(assets)),
    index=dates,
    columns=sids,
)

# Column B is constant over time: True for the first sid, False for the second.
column_B_frame = pd.DataFrame(data={sids[0]: True, sids[1]: False}, index=dates)

# One DataFrameLoader per custom column, keyed by the column it serves.
loaders = {
    MyDataSet.column_A: DataFrameLoader(MyDataSet.column_A, column_A_frame),
    MyDataSet.column_B: DataFrameLoader(MyDataSet.column_B, column_B_frame),
}

def my_dispatcher(column):
    """Return the loader registered for ``column`` in the ``loaders`` mapping.

    Raises KeyError when ``column`` has no entry in ``loaders``.
    """
    loader = loaders[column]
    return loader


# Set up pipeline engine

# Loader for pricing: built from the bundle's daily bar reader and its
# adjustment reader, and returned by choose_loader for USEquityPricing columns.
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

def choose_loader(column):
    """Pick the loader for ``column``.

    Columns belonging to USEquityPricing get the pricing loader; every other
    column is delegated to ``my_dispatcher``.
    """
    is_pricing_column = column in USEquityPricing.columns
    return pipeline_loader if is_pricing_column else my_dispatcher(column)

# Engine that obtains a loader per column from choose_loader, uses all sessions
# of the trading calendar, and resolves assets through the bundle's asset finder.
engine = SimplePipelineEngine(
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
    get_loader=choose_loader,
)

# Pipeline restricted to the looked-up assets, reporting the latest close price
# next to the latest values of the two simulated columns.
p = Pipeline(
    screen=StaticAssets(assets),
    columns={
        'price': USEquityPricing.close.latest,
        'col_A': MyDataSet.column_A.latest,
        'col_B': MyDataSet.column_B.latest,
    },
)

# Run the pipeline for 2016-01-05 .. 2016-01-07 (UTC) and show the result.
df = engine.run_pipeline(
    p,
    start_date=pd.Timestamp('2016-01-05', tz='utc'),
    end_date=pd.Timestamp('2016-01-07', tz='utc'),
)

print(df)

以上是关于python 运行ZIPLINE PIPELINE(包括模拟输入)的主要内容。如果未能解决你的问题,请参考以下文章:

python ZIPLINE RUN SIMPLE PIPELINE

python ZIPLINE RUN PIPELINE

python ZIPLINE PIPELINE策略与捆绑

python ZIPLINE PIPELINE战略,没有大的因素

python ZIPLINE简单策略与管道因素

python 什么ZIPLINE数据看起来像