python 在Quantopian中创建一个管道实例并以块的形式运行以避免内存过载。包括管道的常见导入。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 在Quantopian中创建一个管道实例并以块的形式运行以避免内存过载。包括管道的常见导入。相关的知识,希望对你有一定的参考价值。

def make_pipeline():
    """Create a pipeline instance in Quantopian"""
    # Base Universe
    base_universe = Q500US()
    # base_universe = Fundamentals.symbol.latest.element_of(['GS', 'AAPL', 'JPM', 'XOM'])
    closed_end_funds = Fundamentals.share_class_description.latest.startswith('CE')
    
    universe = base_universe & ~closed_end_funds

    # Momentum Factor Example
    momo_1_mo = Returns(inputs=[USEquityPricing.close], 
                        window_length=22,
                        mask=universe)
    
    return Pipeline(columns={'momentum_1_mo': momo_1_mo,
                               'sector_code': Sector()
                              },
                    screen=universe)

start_date = '2017-01-04' 
end_date = '2017-01-05'

# Uncomment to run pipeline in one go
result = run_pipeline(make_pipeline(), start_date, end_date)  
# Typical imports for use with Pipeline
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.research import run_pipeline

# Datasets
from quantopian.pipeline.data.builtin import USEquityPricing

# New way: Fundamentals.my_field: 
from quantopian.pipeline.data import Fundamentals  

# Factors, Classifiers, and Filters
# New way for classifiers: classifiers.fundamentals 
from quantopian.pipeline.factors import AverageDollarVolume, Returns
from quantopian.pipeline.classifiers.fundamentals import Sector 
from quantopian.pipeline.filters import QTradableStocksUS, Q500US

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import alphalens as al
def run_pipe_in_chunks(pipe, start_date, end_date, 
                       weeks = 25, days = 0):
    """Run pipeline in chunks to avoid memory overload
    --------
    Inputs:
    --------
    pipe: pipeline object
    start_date: string, Timestamp, or datetime object
        a date string or datetime object representing the start of the period
    end_date: string, Timestamp, or datetime object
        a date string or datetime object representing the end of the period
    weeks, days: int
        number of weeks and/or days in each chunk
    
    -------
    Return
    --------
    a multi-index dataframe:
         Index=(Date, asset)
         Columns = pipeline columns
    """
    
    # Convert start and end_date to timestamp
    start_date = pd.Timestamp(start_date)
    end_date = pd.Timestamp(end_date)
    
    # Set inital chunk date endpoints
    start = start_date
    step = pd.Timedelta(weeks = weeks, days = days)
    end = start + step
    
    # Initialize a list named chunks to store the pipeline results
    chunks  = []
    while start < end_date:
        # Run pipeline and append to chunks
        print "Running Pipeline for %s to %s" % (start, end)
        result = run_pipeline(pipe, start_date= start, end_date= end)
        chunks.append(result)
        
        # Update start and end dates
        end = result.index.get_level_values(0)[-1].tz_localize(None)
        start = end + pd.Timedelta(days = 1)
        end = start + step
        
        # If end is after last day of period, set to last day of period
        if end > end_date:
            end = end_date
    
    try:
        final_result = pd.concat(chunks, axis = 0)
        print "Pipeline Computations Complete"
    except:
        print 'Concat Failed: Returned List of Dataframes instead of one Dataframe'
        final_result = chunks
        
    return final_result

start_date = '2003-01-01' 
end_date = '2012-01-01'

# Run Pipeline in chunks
result = run_pipe_in_chunks(make_pipeline(), 
                            start_date, 
                            end_date, 
                            weeks=26, days=0)
  
# Pipeline API Documentation Link
https://www.quantopian.com/help#pipeline-title
class MyCustomFactor(CustomFactor):
    """This example will create a custom 
    volatility-adjusted momentum factor"""
    # Set constants - These will not necessarily be 
    # in every custom factor
    LAG = 21
    MOMO_PERIOD = 252
    
    # Default inputs and window_length
    inputs = [USEquityPricing.close]
    window_length = MOMO_PERIOD + LAG + 1
  
    def compute(self, today, asset_ids, out, close):
        daily_returns = close[1:] / close[:-1] - 1
        std = np.nanstd(daily_returns[:self.MOMO_PERIOD ], ddof=1, axis=0)
        out[:] = (close[-self.LAG-1] / close[0] - 1) / std

以上是关于python 在Quantopian中创建一个管道实例并以块的形式运行以避免内存过载。包括管道的常见导入。的主要内容,如果未能解决你的问题,请参考以下文章

python QUANTOPIAN进口管道

python QUANTOPIAN ALGO模板(无管道)

在 Python 中创建一个临时 FIFO(命名管道)?

如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道

subprocess库:Python中创建附加进程

subprocess库:Python中创建附加进程