Python金融量化

Posted Hear7

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python金融量化相关的知识,希望对你有一定的参考价值。

Python股票数据分析

最近在学习基于python的股票数据分析,其中主要用到了tushare和seaborn。tushare是一款财经类数据接口包,国内的股票数据还是比较全的

官网地址:http://tushare.waditu.com/index.html#id5。seaborn则是一款绘图库,通过seaborn可以轻松地画出简洁漂亮的图表,而且库本身具有一定的统计功能。

  导入的模块:

import matplotlib.pyplot as plt

  import seaborn as sns

  import seaborn.linearmodels as snsl

from datetime import datetime

  import tushare as ts

代码部分:

  股票收盘价走势曲线

  sns.set_style("whitegrid")

  end = datetime.today() #开始时间结束时间,选取最近一年的数据

  start = datetime(end.year-1,end.month,end.day)

  end = str(end)[0:10]

  start = str(start)[0:10]

stock = ts.get_hist_data(\'300104\',start,end)#选取一支股票

  stock[\'close\'].plot(legend=True ,figsize=(10,4))

  plt.show()

股票日线

同理,可以做出5日均线、10日均线以及20日均线

  stock[[\'close\',\'ma5\',\'ma10\',\'ma20\']].plot(legend=True ,figsize=(10,4))

日线、5日均线、10日均线、20日均线

股票每日涨跌幅度

  stock[\'Daily Return\'] = stock[\'close\'].pct_change()

  stock[\'Daily Return\'].plot(legend=True,figsize=(10,4))

每日涨跌幅

核密度估计

  sns.kdeplot(stock[\'Daily Return\'].dropna())

核密度估计

核密度估计+统计柱状图

  sns.distplot(stock[\'Daily Return\'].dropna(),bins=100)

核密度+柱状图

两支股票的皮尔森相关系数

  sns.jointplot(stock[\'Daily Return\'],stock[\'Daily Return\'],alpha=0.2)

皮尔森相关系数

多只股票相关性计算

  stock_lis=[\'300113\',\'300343\',\'300295\',\'300315`] #随便选取了四支互联网相关的股票

  df=pd.DataFrame()

  for stock in stock_lis: closing_df = ts.get_hist_data(stock,start,end)[\'close\'] df = df.join(pd.DataFrame({stock:closing_df}),how=\'outer\')

  tech_rets = df.pct_change()

  snsl.corrplot(tech_rets.dropna())

  相关性

简单地计算股票的收益与风险,衡量股票收益与风险的数值分别为股票涨跌的平均值以及标准差,平均值为正则说明收益是正的,标准差越大则说明股票波动大,风险也大。

  rets = tech_rets.dropna()

  plt.scatter(rets.mean(),rets.std())

  plt.xlabel(\'Excepted Return\')

  plt.ylabel(\'Risk\')

  for label,x,y in zip(rets.columns,rets.mean(),rets.std()):#添加标注 plt.annotate( label, xy =(x,y),xytext=(15,15), textcoords = \'offset points\', arrowprops = dict(arrowstyle = \'-\',connectionstyle = \'arc3,rad=-0.3\'))

声明:本文由入驻搜狐公众平台的作者撰写,除搜狐官方账号外,观点仅代表作者本人,不代表搜狐立场。

 

 

用Python分析公开数据选出高送转预期股票

根据以往的经验,每年年底都会有一波高送转预期行情。今天,米哥就带大家实践一下如何利用tushare实现高送转预期选股。

本文主要是讲述选股的思路方法,选股条件和参数大家可以根据米哥提供的代码自行修改。

1. 选股原理

一般来说,具备高送转预期的个股,都具有总市值低、每股公积金高、每股收益大,流通股本少的特点。当然,也还有其它的因素,比如当前股价、经营收益变动情况、以及以往分红送股习惯等等。

这里我们暂时只考虑每股公积金、每股收益、流通股本和总市值四个因素,将公积金大于等于5元,每股收益大于等于5毛,流通股本在3亿以下,总市值在100亿以内作为高送转预期目标(这些参数大家可根据自己的经验随意调整)。

2. 数据准备

首先要导入tushare:

import tushare as ts

调取股票基本面数据和行情数据

# 基本面数据
basic = ts.get_stock_basics()

# 行情和市值数据
hq = ts.get_today_all()

3. 数据清洗整理

对获取到的数据进行清洗和整理,只保留需要的字段。(其它字段及含义,请参考 http:// tushare.org 文档)

#当前股价,如果停牌则设置当前价格为上一个交易日股价
hq[\'trade\'] = hq.apply(lambda x:x.settlement if x.trade==0 else x.trade, axis=1)

#分别选取流通股本,总股本,每股公积金,每股收益
basedata = basic[[\'outstanding\', \'totals\', \'reservedPerShare\', \'esp\']]

#选取股票代码,名称,当前价格,总市值,流通市值
hqdata = hq[[\'code\', \'name\', \'trade\', \'mktcap\', \'nmc\']]

#设置行情数据code为index列
hqdata = hqdata.set_index(\'code\')

#合并两个数据表
data = basedata.merge(hqdata, left_index=True, right_index=True)

4. 选股条件

根据上文提到的选股参数和条件,我们对数据进一步处理。

将总市值和流通市值换成亿元单位
data[\'mktcap\'] = data[\'mktcap\'] / 10000
data[\'nmc\'] = data[\'nmc\'] / 10000

设置参数和过滤值(此次各自调整)

#每股公积金>=5
res = data.reservedPerShare >= 5
#流通股本<=3亿
out = data.outstanding <= 30000
#每股收益>=5毛
eps = data.esp >= 0.5
#总市值<100亿
mktcap = data.mktcap <= 100

取并集结果:

allcrit = res & out & eps & mktcap
selected = data[allcrit]

具有高送转预期股票的结果呈现:

以上字段的含义分别为:股票名称、收盘价格、每股公积金、流通股本、每股收益(应该为eps,之前发布笔误)、总市值和流通市值。

https://zhuanlan.zhihu.com/p/23829205

 

 

 

Python 金叉判定

View Code

 

Python 过滤次新股、停牌、涨跌停

#过滤次新股、是否涨跌停、是否停牌等条件

def filcon(context,bar_dict,tar_list):

    def zdt_trade(stock, context, bar_dict):

        yesterday = history(2,\'1d\', \'close\')[stock].values[-1]

        zt = round(1.10 * yesterday,2)

        dt = round(0.99 * yesterday,2)

        #last最后交易价

        return dt < bar_dict[stock].last < zt

    filstock = []

    for stock in tar_list:

        con1 = ipo_days(stock,context.now) > 60

        con2 = bar_dict[stock].is_trading

        con3 = zdt_trade(stock,context,bar_dict)

        if con1 & con2 & con3:

                filstock.append(stock)

    return filstock
View Code

 

Python 按平均持仓市值调仓

# 按平均持仓市值调仓
def for_balance(context, bar_dict):
    #mvalues = context.portfolio.market_value
    #avalues = context.portfolio.portfolio_value
    #per = mvalues / avalues
    hlist = []
    for stock in context.portfolio.positions:
        #获取股票及对应持仓市值 
        hlist.append([stock,bar_dict[stock].last * context.portfolio.positions[stock].quantity])
    
    if hlist:
        #按持仓市值由大到小排序
        hlist = sorted(hlist,key=lambda x:x[1], reverse=True)
        temp = 0
        for li in hlist:
            #计算持仓总市值
            temp += li[1]
        for li in hlist:
            #平均各股持仓市值
            if bar_dict[li[0]].is_trading:
                order_target_value(li[0], temp/len(hlist))
    return
View Code

 

Python PCA主成分分析算法

Python主成分分析算法的作用是提取样本的主要特征向量,从而实现数据降维的目的。

# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 10:04:26 2016
PCA source code
@author: liudiwei
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

#计算均值,要求输入数据为numpy的矩阵格式,行表示样本数,列表示特征    
def meanX(dataX):
    return np.mean(dataX,axis=0)#axis=0表示按照列来求均值,如果输入list,则axis=1


#计算方差,传入的是一个numpy的矩阵格式,行表示样本数,列表示特征    
def variance(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))    
    X1 = X - muAll
    variance = 1./m * np.diag(X1.T * X1)
    return variance

#标准化,传入的是一个numpy的矩阵格式,行表示样本数,列表示特征    
def normalize(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))    
    X1 = X - muAll
    X2 = np.tile(np.diag(X.T * X), (m, 1))
    XNorm = X1/X2
    return XNorm

"""
参数:
    - XMat:传入的是一个numpy的矩阵格式,行表示样本数,列表示特征    
    - k:表示取前k个特征值对应的特征向量
返回值:
    - finalData:参数一指的是返回的低维矩阵,对应于输入参数二
    - reconData:参数二对应的是移动坐标轴后的矩阵
"""  
def pca(XMat, k):
    average = meanX(XMat) 
    m, n = np.shape(XMat)
    data_adjust = []
    avgs = np.tile(average, (m, 1))
    data_adjust = XMat - avgs
    covX = np.cov(data_adjust.T)   #计算协方差矩阵
    featValue, featVec=  np.linalg.eig(covX)  #求解协方差矩阵的特征值和特征向量
    index = np.argsort(-featValue) #按照featValue进行从大到小排序
    finalData = []
    if k > n:
        print "k must lower than feature number"
        return
    else:
        #注意特征向量时列向量,而numpy的二维矩阵(数组)a[m][n]中,a[1]表示第1行值
        selectVec = np.matrix(featVec.T[index[:k]]) #所以这里需要进行转置
        finalData = data_adjust * selectVec.T 
        reconData = (finalData * selectVec) + average  
    return finalData, reconData

def loaddata(datafile):
    return np.array(pd.read_csv(datafile,sep="\\t",header=-1)).astype(np.float)


def plotBestFit(data1, data2):    
    dataArr1 = np.array(data1)
    dataArr2 = np.array(data2)
    
    m = np.shape(dataArr1)[0]
    axis_x1 = []
    axis_y1 = []
    axis_x2 = []
    axis_y2 = []
    for i in range(m):
        axis_x1.append(dataArr1[i,0])
        axis_y1.append(dataArr1[i,1])
        axis_x2.append(dataArr2[i,0]) 
        axis_y2.append(dataArr2[i,1])                
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(axis_x1, axis_y1, s=50, c=\'red\', marker=\'s\')
    ax.scatter(axis_x2, axis_y2, s=50, c=\'blue\')
    plt.xlabel(\'x1\'); plt.ylabel(\'x2\');
    plt.savefig("outfile.png")
    plt.show() 

#简单测试
#数据来源:http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html
def test():
    X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],
         [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]
    XMat = np.matrix(X).T  
    k = 2
    return pca(XMat, k)

#根据数据集data.txt
def main():    
    datafile = "data.txt"
    XMat = loaddata(datafile)
    k = 2
    return pca(XMat, k)
    
if __name__ == "__main__":
    finalData, reconMat = main()
    plotBestFit(finalData, reconMat)
View Code

经过主成分降维的数据如红色图案所示,蓝色的是恢复的原始数据。可以看到经过降维的数据样本差异更加明显。

 

Python KNN最近邻分类算法

KNN最近邻算法:利用向量之间的距离来分类。
步骤:
第一步:计算新样本与已知分类样本之间的距离。
第二步:将所求距离按从小到大排列。
第三步:选取距离最近的k个样本。
第四步:将新样本归为以上k个样本大多数中的一类。
以下为KNN最近邻分类算法的python代码:
第一部分:KNN分类代码
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    #定义k的值
    def __init__(self, k=3):
        self._k = k

    #计算新样本与已知分类样本的距离并从小到大排列    
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m,1)) - dataset
        sqDiffMat = diffMat**2 #每个元素平方
        sqDistances = sqDiffMat.sum(axis = 1)  #求和
        distances = sqDistances ** 0.5 #开根号
        return distances.argsort()  #按距离的从小到达排列的下标值

    
    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]                  
        diffMat = np.tile(inX, (dataSetSize,1)) - dataSet      
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)                  
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()            
        classCount={}                                      
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
        sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]
        
    #对一个样本进行分类
    def _classify(self, sample, train_X, train_y):
        #数据类型检测
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \\
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]] #获取最近的第i个点的类别
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),\\
                                    key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as",sortedClassCount[0][0]    
        return sortedClassCount[0][0]
    
    
    def classify(self, test_X, train_X, train_y):
        results = [] 
        #数据类型检测
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \\
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results
        
        
if __name__=="__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k = 3)
    sample = [[1,2,0,1,0],[1,2,0,1,1]]
    result = clf.classify(sample, train_X, train_y)
View Code

 

第二部分:KNN测试代码

# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    #定义k的值
    def __init__(self, k=3):
        self._k = k

    #计算新样本与已知分类样本的距离并从小到大排列    
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m,1)) - dataset
        sqDiffMat = diffMat**2 #每个元素平方
        sqDistances = sqDiffMat.sum(axis = 1)  #求和
        distances = sqDistances ** 0.5 #开根号
        return distances.argsort()  #按距离的从小到达排列的下标值

    
    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]                  
        diffMat = np.tile(inX, (dataSetSize,1)) - dataSet      
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)                  
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()            
        classCount={}                                      
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
        sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]
        
    #对一个样本进行分类
    def _classify(self, sample, train_X, train_y):
        #数据类型检测
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \\
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]] #获取最近的第i个点的类别
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),\\
                                    key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as",sortedClassCount[0][0]    
        return sortedClassCount[0][0]
    
    
    def classify(self, test_X, train_X, train_y):
        results = [] 
        #数据类型检测
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \\
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results
        
        
if __name__=="__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k = 3)
    sample = [[1,2,0,1,0],[1,2,0,1,1]]
    result = clf.classify(sample, train_X, train_y)
View Code

 

Python 决策树算法(ID3 &C4.5)

决策树(Decision Tree)算法:按照样本的属性逐步进行分类,为了能够使分类更快、更有效。每一个新分类属性的选择依据可以是信息增益IG和信息增益率IGR,前者为最基本的ID3算法,后者为改进后的C4.5算法。

以ID3为例,其训练过程的编程思路如下:

(1)输入x、y(x为样本,y为label),行为样本,列为样本特征。

(2)计算信息增益IG,获取使IG最大的特征。

(3)获得删除最佳分类特征后的样本阵列。

(4)按照最佳分类特征的属性值将更新后的样本进行归类。

属性值1(x1,y1)    属性值2(x2,y2)    属性值(x3,y3)

(5)分别对以上类别重复以上操作直至到达叶节点(递归调用)。

叶节点的特征:

(1)所有的标签值y都一样。

(2)没有特征可以继续划分。

测试过程的编程思路如下:

(1)读取训练好的决策树。

(2)从根节点开始递归遍历整个决策树直到到达叶节点为止。

以下为具体代码,训练后的决策树结构为递归套用的字典,其是由特征值组成的索引加上label组成的。

 

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 07 09:06:37 2016

@author: yehx
"""
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 21 12:17:10 2016
Decision Tree Source Code
@author: liudiwei
"""
import os
import numpy as np

class DecitionTree():
    """This is a decision tree classifier. """
    
    def __init__(self, criteria=\'ID3\'):
        self._tree = None
        if criteria == \'ID3\' or criteria == \'C4.5\':
            self._criteria = criteria
        else:
            raise Exception("criterion should be ID3 or C4.5")
    
    def _calEntropy(slef, y):
        \'\'\'
        功能:_calEntropy用于计算香农熵 e=-sum(pi*log pi)
        参数:其中y为数组array
        输出:信息熵entropy
        \'\'\'
        n = y.shape[0]  
        labelCounts = {}
        for label in y:
            if label not in labelCounts.keys():
                labelCounts[label] = 1
            else:
                labelCounts[label] += 1
        entropy = 0.0
        for key in labelCounts:
            prob = float(labelCounts[key])/n
            entropy -= prob * np.log2(prob)
        return entropy
    
    def _splitData(self, X, y, axis, cutoff):
        """
        参数:X为特征,y为label,axis为某个特征的下标,cutoff是下标为axis特征取值值
        输出:返回数据集中特征下标为axis,特征值等于cutoff的子数据集
        先将特征列从样本矩阵里除去,然后将属性值为cutoff的数据归为一类
        """
        ret = []
        featVec = X[:,axis]
        n = X.shape[1]      #特征个数
        #除去第axis列特征后的样本矩阵
        X = X[:,[i for i in range(n) if i!=axis]]
        for i in range(len(featVec)):
            if featVec[i] == cutoff:
                ret.append(i)
        return X[ret, :], y[ret]   
            
    def _chooseBestSplit(self, X, y):
        """ID3 & C4.5
        参数:X为特征,y为label
        功能:根据信息增益或者信息增益率来获取最好的划分特征
        输出:返回最好划分特征的下标
        """
        numFeat = X.shape[1]
        baseEntropy = self._calEntropy(y)
        bestSplit = 0.0
        best_idx  = -1
        for i in range(numFeat):
            featlist = X[:,i]   #得到第i个特征对应的特征列
            uniqueVals = set(featlist)
            curEntropy = 0.0
            splitInfo = 0.0
            for value in uniqueVals:
                sub_x, sub_y = self._splitData(X, y, i, value)
                prob = len(sub_y)/float(len(y))      #计算某个特征的某个值的概率
                curEntropy += prob * self._calEntropy(sub_y)    #迭代计算条件熵
                splitInfo -=  prob * np.log2(prob) #分裂信息,用于计算信息增益率
            IG = baseEntropy - curEntropy
            if self._criteria=="ID3":
                以上是关于Python金融量化的主要内容,如果未能解决你的问题,请参考以下文章

在量化金融中15个最流行的Python数据分析库

大数据项目实战之Python金融应用编程(数据分析定价与量化投资)

Python金融量化

day32 Python与金融量化分析

Python金融量化必备资源及工具,一键获取

Python&R&量化 金融之路