python ZIPLINE PIPELINE战略,没有大的因素

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python ZIPLINE PIPELINE战略,没有大的因素相关的知识,希望对你有一定的参考价值。

"""
This is a template algorithm on Zipline for you to adapt and fill in.
"""

from zipline.api import attach_pipeline, pipeline_output
from zipline import run_algorithm
from zipline.api import symbols, get_datetime, schedule_function
from zipline.utils.events import date_rules, time_rules
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import CustomFactor, Returns, DailyReturns
from zipline.pipeline.filters import StaticAssets
import pandas as pd
import numpy as np
from scipy.stats.mstats import winsorize
from sklearn import preprocessing
from datetime import datetime
import pytz

WINDOW_LENGTH = 5
WIN_LIMIT = 0


# flag used for first WINDOW_LENGTH days, where the algo is "only" innitialising buffers. One can avoid that using a second pipeline, which is call only at initialization and compute the alphas for the entire window... But I have not yet found a good solution for this!

def preprocess(a):
    a = a.astype(np.float64)
    a[np.isinf(a)] = np.nan
    a = np.nan_to_num(a - np.nanmean(a))
    a = winsorize(a, limits=[WIN_LIMIT, WIN_LIMIT])

    return preprocessing.scale(a)


def make_factor():
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True

        def compute(self, today, assets, out, open, close):
            p = (close - open) / close
            out[:] = preprocess(np.nansum(-p, axis=0))

    class mean_rev(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]
        window_length = 30
        window_safe = True

        def compute(self, today, assets, out, high, low, close):
            p = (high + low + close) / 3

            m = len(close[0, :])
            n = len(close[:, 0])

            b = np.zeros(m)
            a = np.zeros(m)

            for k in range(10, n + 1):
                price_rel = np.nanmean(p[-k:, :], axis=0) / p[-1, :]
                wt = np.nansum(price_rel)
                b += wt * price_rel
                price_rel = 1.0 / price_rel
                wt = np.nansum(price_rel)
                a += wt * price_rel

            out[:] = preprocess(b - a)

    factors = {
        'Direction': Direction,
        'mean_rev': mean_rev
    }

    return factors


class Factor_N_Days_Ago(CustomFactor):

    def compute(self, today, assets, out, input_factor):
        out[:] = input_factor[0]


def initialize(context):
    """
    Called once at the start of the algorithm.
    """

    context.alphas = pd.DataFrame()

    # Rebalance every day, 1 hour after market open.
    algo.schedule_function(
        rebalance,
        algo.date_rules.every_day(),
        algo.time_rules.market_open(hours=1),
    )

    # Record tracking variables at the end of each day.
    algo.schedule_function(
        record_vars,
        algo.date_rules.every_day(),
        algo.time_rules.market_close(),
    )

    # Create our dynamic stock selector.
    attach_pipeline(make_pipeline(), 'pipeline')
    attach_pipeline(make_pipeinit(), 'pipeinit')

    context.first_trading_day = True
    context.factor_name_list = make_factor().keys()


def make_pipeinit():
    factors = make_factor()

    pipeline_columns = {}
    for f in factors.keys():
        for days_ago in reversed(range(WINDOW_LENGTH)):
            pipeline_columns[f + '-' + str(days_ago)] = Factor_N_Days_Ago([factors[f](mask=QTradableStocksUS())],
                                                                          window_length=days_ago + 1,
                                                                          mask=QTradableStocksUS())

    pipe = Pipeline(columns=pipeline_columns,
                    screen=QTradableStocksUS())

    return pipe


def make_pipeline():
    all_factors = make_factor()
    factors = {a: all_factors[a]() for a in all_factors}
    pipe = Pipeline(
        columns=factors,
        screen=QTradableStocksUS()
    )
    return pipe


def before_trading_start(context, data):
    if context.first_trading_day == True:
        df = (pipeline_output("pipeinit")).astype('float32')
        df = df.stack()
        df.index.names = ['stock', 'alphas']
        df = df.reset_index(level=['alphas', 'stock'])
        alphaname = np.empty(df['alphas'].values.size, dtype='object')
        dayaname = np.empty(df['alphas'].values.size, dtype='int')
        for i, a in enumerate(df['alphas'].values):
            pos = a.find('-')
            alphaname[i] = a[:pos]
            dayaname[i] = a[pos + 1:]

        df['factor'] = pd.Series(alphaname, index=df.index)
        df['day'] = pd.Series(dayaname, index=df.index)
        df = df.drop('alphas', axis=1)
        df = df.set_index(['stock', 'factor', 'day'])
        df = df[0]
        df = df.unstack(level=2)
        context.alphas = df

        context.first_trading_day = False
    else:
        df = (pipeline_output("pipeline")).astype('float32')
        df = df.stack().to_frame()
        df.index.names = ['stock', 'factor']
        context.alphas = context.alphas.drop([4], axis=1)
        context.alphas.columns = [1, 2, 3, 4]
        context.alphas = pd.concat([df, context.alphas], axis=1)


def rebalance(context, data):
    """
    Execute orders according to our schedule_function() timing.
    """
    pass


def record_vars(context, data):
    """
    Plot variables at the end of each day.
    """
    pass


def handle_data(context, data):
    """
    Called every minute.
    """
    pass
    


if __name__ == "__main__":
    start = datetime(2013, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = datetime(2013, 1, 10, 0, 0, 0, 0, pytz.utc)
    #     end = datetime.today().replace(tzinfo=timezone.utc)
    capital_base = 100000

    result = run_algorithm(start=start, end=end, initialize=initialize, \
                           capital_base=capital_base, \
                           before_trading_start=before_trading_start,
                           bundle='etfs_bundle')

    print(result[-3:])
# https://www.quantopian.com/posts/alpha-combination-via-clustering
# adapted for Zipline

"""
This is a template algorithm on Zipline for you to adapt and fill in.
"""

from zipline.api import attach_pipeline, pipeline_output
from zipline import run_algorithm
from zipline.api import symbols, get_datetime, schedule_function
from zipline.utils.events import date_rules, time_rules
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import CustomFactor, Returns, DailyReturns
from zipline.pipeline.filters import StaticAssets
import pandas as pd
import numpy as np
from scipy.stats.mstats import winsorize
from sklearn import preprocessing
from datetime import datetime
import pytz

WINDOW_LENGTH = 5
WIN_LIMIT = 0


# flag used for first WINDOW_LENGTH days, where the algo is "only" innitialising buffers. One can avoid that using a second pipeline, which is call only at initialization and compute the alphas for the entire window... But I have not yet found a good solution for this!

def preprocess(a):
    a = a.astype(np.float64)
    a[np.isinf(a)] = np.nan
    a = np.nan_to_num(a - np.nanmean(a))
    a = winsorize(a, limits=[WIN_LIMIT, WIN_LIMIT])

    return preprocessing.scale(a)


def make_factor():
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True

        def compute(self, today, assets, out, open, close):
            p = (close - open) / close
            out[:] = preprocess(np.nansum(-p, axis=0))

    class mean_rev(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]
        window_length = 30
        window_safe = True

        def compute(self, today, assets, out, high, low, close):
            p = (high + low + close) / 3

            m = len(close[0, :])
            n = len(close[:, 0])

            b = np.zeros(m)
            a = np.zeros(m)

            for k in range(10, n + 1):
                price_rel = np.nanmean(p[-k:, :], axis=0) / p[-1, :]
                wt = np.nansum(price_rel)
                b += wt * price_rel
                price_rel = 1.0 / price_rel
                wt = np.nansum(price_rel)
                a += wt * price_rel

            out[:] = preprocess(b - a)

    factors = {
        'Direction': Direction,
        'mean_rev': mean_rev
    }

    return factors


class Factor_N_Days_Ago(CustomFactor):

    def compute(self, today, assets, out, input_factor):
        out[:] = input_factor[0]


def initialize(context):
    """
    Called once at the start of the algorithm.
    """

    c = context

    c.etf_universe = StaticAssets(symbols('XLY', 'XLP', 'XLE', 'XLF', 'XLV',
                                          'XLI', 'XLB', 'XLK', 'XLU'))
    c.alphas = pd.DataFrame()

    # Rebalance every day, 1 hour after market open.
    schedule_function(
        rebalance,
        date_rules.every_day(),
        time_rules.market_open(hours=1),
    )

    # Record tracking variables at the end of each day.
    schedule_function(
        record_vars,
        date_rules.every_day(),
        time_rules.market_close(),
    )

    # Create our dynamic stock selector.
    attach_pipeline(make_pipeline(context), 'pipeline')
    attach_pipeline(make_pipeinit(context), 'pipeinit')

    c.first_trading_day = True
    c.factor_name_list = make_factor().keys()


def make_pipeinit(context):
    universe = context.etf_universe
    factors = make_factor()

    pipeline_columns = {}
    for f in factors.keys():
        for days_ago in reversed(range(WINDOW_LENGTH)):
            pipeline_columns[f + '-' + str(days_ago)] = Factor_N_Days_Ago([factors[f](mask=universe)],
                                                                          window_length=days_ago + 1,
                                                                          mask=universe)

    pipe = Pipeline(columns=pipeline_columns,
                    screen=universe)

    return pipe


def make_pipeline(context):
    universe = context.etf_universe
    all_factors = make_factor()
    factors = {a: all_factors[a]() for a in all_factors}
    pipe = Pipeline(
        columns=factors,
        screen=universe
    )
    return pipe


def before_trading_start(context, data):
    if context.first_trading_day == True:
        df = (pipeline_output("pipeinit")).astype('float32')
        df = df.stack()
        df.index.names = ['stock', 'alphas']
        df = df.reset_index(level=['alphas', 'stock'])
        alphaname = np.empty(df['alphas'].values.size, dtype='object')
        dayaname = np.empty(df['alphas'].values.size, dtype='int')
        for i, a in enumerate(df['alphas'].values):
            pos = a.find('-')
            alphaname[i] = a[:pos]
            dayaname[i] = a[pos + 1:]

        df['factor'] = pd.Series(alphaname, index=df.index)
        df['day'] = pd.Series(dayaname, index=df.index)
        df = df.drop('alphas', axis=1)
        df = df.set_index(['stock', 'factor', 'day'])
        df = df[0]
        df = df.unstack(level=2)
        context.alphas = df

        context.first_trading_day = False
    else:
        df = (pipeline_output("pipeline")).astype('float32')
        df = df.stack().to_frame()
        df.index.names = ['stock', 'factor']
        context.alphas = context.alphas.drop([4], axis=1)
        context.alphas.columns = [1, 2, 3, 4]
        context.alphas = pd.concat([df, context.alphas], axis=1)


def rebalance(context, data):
    """
    Execute orders according to our schedule_function() timing.
    """
    pass


def record_vars(context, data):
    """
    Plot variables at the end of each day.
    """
    pass


def handle_data(context, data):
    """
    Called every minute.
    """
    pass


if __name__ == "__main__":
    start = datetime(2013, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = datetime(2013, 1, 10, 0, 0, 0, 0, pytz.utc)
    #     end = datetime.today().replace(tzinfo=timezone.utc)
    capital_base = 100000

    result = run_algorithm(start=start, end=end, initialize=initialize, \
                           capital_base=capital_base, \
                           before_trading_start=before_trading_start,
                           bundle='etfs_bundle')

    print(result[-3:])

以上是关于python ZIPLINE PIPELINE战略,没有大的因素的主要内容,如果未能解决你的问题,请参考以下文章

python ZIPLINE RUN PIPELINE

python 运行ZIPLINE PIPELINE(包括模拟输入)

python 用捆绑运行ZIPLINE PIPELINE

python ZIPLINE PIPELINE策略与捆绑

python ZIPLINE BUNDLES战略

python ZIPLINE简单策略与管道因素