Python Zipline 简单策略与 Pipeline 因子

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Python Zipline 简单策略与 Pipeline 因子的相关知识,希望对你有一定的参考价值。

"""
This is a template algorithm on Zipline for you to adapt and fill in.
"""

from zipline.api import attach_pipeline, pipeline_output, get_datetime
from zipline import run_algorithm
from zipline.api import symbols, get_datetime, schedule_function
from zipline.utils.events import date_rules, time_rules
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.filters import StaticAssets
from datetime import datetime, timezone
import pytz


def initialize(context):
    """
    One-time setup hook: registers the scheduled callbacks and the pipeline.

    ``context`` is the algorithm state object handed in by the caller; this
    function does not read or write any attribute on it.
    """
    # Run the rebalance callback at month end, one hour after market open.
    schedule_function(
        func=rebalance,
        date_rule=date_rules.month_end(),
        time_rule=time_rules.market_open(hours=1),
    )

    # Run the bookkeeping callback at the market close of every day.
    schedule_function(
        func=record_vars,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(),
    )

    # Build the pipeline and register it under the name 'pipeline'; the
    # prints bracket the call so progress is visible on the console.
    print('ATTACH PIPELINE')
    selector = make_pipeline()
    attach_pipeline(selector, 'pipeline')
    print('PIPELINE ATTACHED')


def make_pipeline():
    """
    Build the pipeline used as the stock selector.

    The pipeline is screened to a fixed list of nine tickers and exposes a
    single column, 'close', holding the latest close price known to the
    pipeline. Original reference for the pipeline API:
    https://www.quantopian.com/help#pipeline-title

    Returns the constructed ``Pipeline`` object.
    """
    tickers = (
        'XLY', 'XLP', 'XLE', 'XLF', 'XLV',
        'XLI', 'XLB', 'XLK', 'XLU',
    )

    # Restrict the pipeline to exactly the assets resolved from `tickers`.
    universe_filter = StaticAssets(symbols(*tickers))

    # Latest available close price (the original author describes this as
    # yesterday's close).
    latest_close = USEquityPricing.close.latest

    return Pipeline(
        columns={'close': latest_close},
        screen=universe_filter,
    )


def before_trading_start(context, data):
    """
    Pre-open hook: fetch the output of the 'pipeline' pipeline and cache it.

    Stores the pipeline result on ``context.output`` and its index on
    ``context.security_list``, printing each step for debugging.
    ``data`` is accepted for the hook signature but is not used here.
    """
    print('GET PIPELINE')
    frame = pipeline_output('pipeline')
    context.output = frame
    print('PIPELINE OUTPUT')
    print(frame)

    # The index of the pipeline output is the set of securities of interest.
    candidates = frame.index
    context.security_list = candidates
    print('SECURITY LIST : ', candidates)


def rebalance(context, data):
    """
    Scheduled rebalance callback.

    Currently a stub: it places no orders and only prints the current
    algorithm datetime. Neither ``context`` nor ``data`` is used.
    """
    now = get_datetime()
    print('REBALANCE - DATE', now)


def record_vars(context, data):
    """
    Placeholder for recording/plotting variables at the end of each day.

    Intentionally does nothing yet; both arguments are ignored and the
    function returns ``None``.
    """
    return None


def handle_data(context, data):
    """
    Placeholder for the per-bar handler (described by the author as being
    called every minute).

    Intentionally does nothing; both arguments are ignored and the function
    returns ``None``.
    """
    return None


if __name__ == "__main__":
    # Backtest window: midnight UTC on 2013-01-01 through 2013-01-10.
    start = datetime(2013, 1, 1, tzinfo=pytz.utc)
    end = datetime(2013, 1, 10, tzinfo=pytz.utc)
    # Alternative end date kept from the original: run up to today.
    #     end = datetime.today().replace(tzinfo=timezone.utc)
    capital_base = 100000

    # Run the backtest against the 'etfs_bundle' data bundle using the
    # hooks defined above.
    result = run_algorithm(
        start=start,
        end=end,
        initialize=initialize,
        capital_base=capital_base,
        before_trading_start=before_trading_start,
        bundle='etfs_bundle',
    )

    # Show the first three rows of the result.
    print(result[:3])
from zipline.pipeline import CustomFactor, Pipeline, CustomFilter
from zipline import TradingAlgorithm
from zipline.api import symbols, attach_pipeline, schedule_function, pipeline_output
from zipline.utils.events import date_rules, time_rules
from zipline.pipeline.factors import Returns
import numpy as np
import pandas as pd
from datetime import datetime
from zipline import run_algorithm
from zipline.api import order, record, symbol
import pytz
 
class SecurityInList(CustomFactor):
    """
    Custom factor that flags which assets appear in ``securities``.

    ``securities`` is expected to be assigned by the caller after the factor
    is created; with the empty default every asset is flagged as absent.
    """

    inputs = []
    window_length = 1
    securities = []

    def compute(self, today, assets, out):
        """
        Fill ``out`` with the membership test of each entry of ``assets``
        against ``self.securities``. ``today`` is not used.
        """
        # NOTE(review): np.in1d is deprecated in NumPy 2.0 in favour of
        # np.isin - confirm the NumPy version in use before switching.
        membership = np.in1d(assets, self.securities)
        out[:] = membership
 
def initialize(context):
    """
    One-time setup hook: attaches the pipeline and schedules the rebalance.

    ``context`` is forwarded to ``make_pipeline`` and is not otherwise
    touched here.
    """
    # Author's note: in the research environment do NOT import `symbols`;
    # define the list like this instead:
    #     sec_list = [symbols('SPY'),symbols('VEU'), symbols('SHY'), symbols('TLT'), symbols('AGG')]
    sec_list = symbols('MDY', 'EFA')

    pipe = make_pipeline(sec_list, context)
    attach_pipeline(pipe, 'my_pipeline')

    # Call rebalance every day, 30 minutes after market open.
    schedule_function(
        func=rebalance,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_open(minutes=30),
    )
         
def make_pipeline(sec_list, context):
    """
    Build a pipeline restricted to ``sec_list`` with one returns column.

    sec_list: the securities to keep; assigned to the ``securities``
        attribute of a ``SecurityInList`` factor.
    context: accepted for the caller's convenience but not used here.

    Returns a ``Pipeline`` whose only column, 'yr_returns', is ``Returns``
    over a 252-row window, computed for and screened to the listed
    securities.
    """
    # Turn the membership factor into a filter by comparing it with 1.
    membership = SecurityInList()
    membership.securities = sec_list
    in_list = membership.eq(1)

    # Returns over a 252-row window, evaluated only where the filter passes.
    trailing_returns = Returns(window_length=252, mask=in_list)

    return Pipeline(
        columns={'yr_returns': trailing_returns},
        screen=in_list,
    )
 
def before_trading_start(context, data):
    """
    Pre-open hook: cache the 'my_pipeline' output on the context.

    Stores the pipeline result on ``context.output`` and prints it.
    ``data`` is not used.
    """
    daily_frame = pipeline_output('my_pipeline')
    context.output = daily_frame
    print(daily_frame)
    
def rebalance(context, data):
    """
    Scheduled rebalance callback.

    Currently a stub: it places no orders and only prints the current
    output of the 'my_pipeline' pipeline. Neither argument is used.
    """
    current = pipeline_output('my_pipeline')
    print(current)
 
# Backtest configuration: starting capital and a window running from
# midnight UTC on 2015-01-01 to midnight UTC on 2016-01-01.
capital_base = 10000
start = datetime(2015, 1, 1, tzinfo=pytz.utc)
end = datetime(2016, 1, 1, tzinfo=pytz.utc)

# Run the backtest against the 'etfs_bundle' data bundle. This executes at
# module level, i.e. as soon as the file is run or imported.
result = run_algorithm(
    start=start,
    end=end,
    initialize=initialize,
    capital_base=capital_base,
    before_trading_start=before_trading_start,
    bundle='etfs_bundle',
)

以上是关于 Python Zipline 简单策略与 Pipeline 因子的主要内容。如果未能解决你的问题,请参考以下文章:

python ZIPLINE PIPELINE策略与捆绑

python ZIPLINE PIPELINE战略,没有大的因素

json解码使用zipline进行回测

CtaAlgo vs PyAlgoTrade

自建zipline的databundle

python 其他拉链管道因素