Python Stock Data Analysis
I have recently been studying stock data analysis in Python, mainly with tushare and seaborn. tushare is a financial data API package with fairly complete coverage of Chinese A-share data;
its official site is http://tushare.waditu.com/index.html#id5. seaborn is a plotting library that makes it easy to produce clean, attractive charts, and it also has some built-in statistical functionality.
Modules to import:
import matplotlib.pyplot as plt
import pandas as pd  # needed for the multi-stock DataFrame below
import seaborn as sns
import seaborn.linearmodels as snsl  # removed in seaborn >= 0.9; see the heatmap alternative below
from datetime import datetime
import tushare as ts
Code:
Closing-price curve
sns.set_style("whitegrid")
end = datetime.today()                        # start/end dates: the most recent year of data
start = datetime(end.year - 1, end.month, end.day)
end = str(end)[0:10]
start = str(start)[0:10]
stock = ts.get_hist_data('300104', start, end)   # pick one stock
stock['close'].plot(legend=True, figsize=(10, 4))
plt.show()
(Figure: daily closing price)
The 5-, 10-, and 20-day moving averages can be plotted the same way:
stock[['close', 'ma5', 'ma10', 'ma20']].plot(legend=True, figsize=(10, 4))
(Figure: daily close with 5-, 10-, and 20-day moving averages)
Daily percentage change
stock['Daily Return'] = stock['close'].pct_change()
stock['Daily Return'].plot(legend=True, figsize=(10, 4))
(Figure: daily returns)
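pct_change divides each close by the previous close and subtracts 1. A minimal sketch with made-up prices (not from the article) to show the semantics:

import pandas as pd

close = pd.Series([10.0, 11.0, 9.9])
print(close.pct_change())
# 0     NaN   (no previous close)
# 1    0.10   (11.0 / 10.0 - 1)
# 2   -0.10   ( 9.9 / 11.0 - 1)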
Kernel density estimate
sns.kdeplot(stock['Daily Return'].dropna())
(Figure: kernel density estimate)
Kernel density estimate plus histogram
sns.distplot(stock['Daily Return'].dropna(), bins=100)  # distplot is deprecated in newer seaborn; use sns.histplot(..., kde=True)
(Figure: KDE with histogram)
Pearson correlation between two stocks
# note: plotting a series against itself always gives a perfect diagonal;
# substitute two different return series for a real comparison
sns.jointplot(stock['Daily Return'], stock['Daily Return'], alpha=0.2)
(Figure: joint plot of daily returns)
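For an actual two-stock comparison, fetch a second series and align the dates first. A sketch assuming a second ticker such as '300113' (any stock with overlapping history will do), using the same positional jointplot API as the article:

stock2 = ts.get_hist_data('300113', start, end)          # assumed second ticker
stock2['Daily Return'] = stock2['close'].pct_change()
# align the two return series on their common dates
returns = pd.concat([stock['Daily Return'], stock2['Daily Return']],
                    axis=1, keys=['300104', '300113']).dropna()
sns.jointplot(returns['300104'], returns['300113'], alpha=0.2)
plt.show()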
Computing correlations across several stocks
stock_lis = ['300113', '300343', '300295', '300315']  # four internet-related stocks, chosen arbitrarily
df = pd.DataFrame()
for stock in stock_lis:
    closing_df = ts.get_hist_data(stock, start, end)['close']
    df = df.join(pd.DataFrame({stock: closing_df}), how='outer')
tech_rets = df.pct_change()
snsl.corrplot(tech_rets.dropna())  # corrplot was removed in seaborn >= 0.9; see the heatmap version below
(Figure: correlation matrix)
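Since seaborn.linearmodels (and corrplot with it) was removed in seaborn 0.9, a drop-in alternative on current seaborn is a heatmap of the correlation matrix:

corr = tech_rets.dropna().corr()
sns.heatmap(corr, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
plt.show()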
A simple way to quantify a stock's return and risk is to use the mean and the standard deviation of its daily returns: a positive mean indicates a positive average return, while a larger standard deviation indicates larger swings and hence higher risk.
rets = tech_rets.dropna()
plt.scatter(rets.mean(), rets.std())
plt.xlabel('Expected Return')
plt.ylabel('Risk')
for label, x, y in zip(rets.columns, rets.mean(), rets.std()):  # annotate each point
    plt.annotate(
        label,
        xy=(x, y), xytext=(15, 15),
        textcoords='offset points',
        arrowprops=dict(arrowstyle='-', connectionstyle='arc3,rad=-0.3'))
Screening Public Data in Python for High Bonus-Share ("高送转") Candidates
Historically, stocks with high bonus-share (高送转) expectations tend to rally toward the end of each year. Here, Mige (the tushare author) walks through how to implement such a screen with tushare.
This article focuses on the screening approach; feel free to modify the conditions and parameters in the code below.
1. Screening rationale
Generally, stocks with high bonus-share expectations share these traits: low total market cap, high capital reserves per share, high earnings per share, and a small float. Other factors also matter, such as the current price, changes in operating income, and the company's past dividend and bonus-share habits.
For now we consider only four factors: capital reserves per share, earnings per share, float, and total market cap. The screen targets reserves per share >= 5 yuan, EPS >= 0.5 yuan, a float below 300 million shares, and total market cap under 10 billion yuan (adjust these parameters to your own experience).
2. Data preparation
First import tushare:
import tushare as ts
Fetch fundamentals and quote data:
# fundamentals
basic = ts.get_stock_basics()
# quotes and market-cap data
hq = ts.get_today_all()
3. Data cleaning
Clean the fetched data and keep only the fields we need. (For the other fields and their meanings, see the documentation at http://tushare.org.)
# current price; if trading is suspended, fall back to the previous session's settlement price
hq['trade'] = hq.apply(lambda x: x.settlement if x.trade == 0 else x.trade, axis=1)
# keep float shares, total shares, reserves per share, and EPS
# (note: 'esp' is tushare's actual field name for EPS -- a typo in the upstream API)
basedata = basic[['outstanding', 'totals', 'reservedPerShare', 'esp']]
# keep code, name, current price, total market cap, and float market cap
hqdata = hq[['code', 'name', 'trade', 'mktcap', 'nmc']]
# use the stock code as the index of the quote table
hqdata = hqdata.set_index('code')
# merge the two tables
data = basedata.merge(hqdata, left_index=True, right_index=True)
4. Screening conditions
We now apply the parameters and conditions described above. First convert total and float market cap to units of 100 million yuan (亿元):
data[\'mktcap\'] = data[\'mktcap\'] / 10000
data[\'nmc\'] = data[\'nmc\'] / 10000
Set the parameters and filters (adjust these to taste):
# capital reserves per share >= 5 yuan
res = data.reservedPerShare >= 5
# float below 300 million shares
out = data.outstanding <= 30000
# EPS >= 0.5 yuan
eps = data.esp >= 0.5
# total market cap under 10 billion yuan (100 亿)
mktcap = data.mktcap <= 100
Take the intersection of all four conditions:
allcrit = res & out & eps & mktcap
selected = data[allcrit]
The resulting list of high bonus-share candidates:
The fields are: stock name, closing price, reserves per share, float, EPS (the field should be 'eps'; 'esp' was a typo in an earlier tushare release), total market cap, and float market cap.
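The article shows the result as a screenshot; to inspect it yourself, something like the following works (the column order is just a choice):

print(selected[['name', 'trade', 'reservedPerShare', 'outstanding',
                'esp', 'mktcap', 'nmc']].head(20))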
https://zhuanlan.zhihu.com/p/23829205
Python: golden-cross detection
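The original gives no code under this heading. A minimal pandas sketch of one common definition -- the 5-day moving average crossing above the 10-day -- using the same get_hist_data frame as earlier (the ticker and window lengths are assumptions):

stock = ts.get_hist_data('300104', start, end).sort_index()  # get_hist_data returns newest-first
ma_fast = stock['close'].rolling(5).mean()
ma_slow = stock['close'].rolling(10).mean()
# golden cross: the fast MA was at or below the slow MA yesterday and is above it today
golden = (ma_fast.shift(1) <= ma_slow.shift(1)) & (ma_fast > ma_slow)
print(stock.index[golden])   # dates on which a golden cross occurred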
Python: filtering recent IPOs, suspensions, and limit-up/limit-down stocks
# filter out recent IPOs, stocks at the daily price limit, and suspended stocks
# (history, ipo_days, and bar_dict are the backtesting platform's API, e.g. RiceQuant)
def filcon(context, bar_dict, tar_list):
    def zdt_trade(stock, context, bar_dict):
        yesterday = history(2, '1d', 'close')[stock].values[-1]
        zt = round(1.10 * yesterday, 2)   # limit-up price (+10%)
        dt = round(0.90 * yesterday, 2)   # limit-down price (-10%); the source had 0.99, presumably a typo
        # 'last' is the latest traded price
        return dt < bar_dict[stock].last < zt
    filstock = []
    for stock in tar_list:
        con1 = ipo_days(stock, context.now) > 60   # listed for more than 60 days
        con2 = bar_dict[stock].is_trading          # not suspended
        con3 = zdt_trade(stock, context, bar_dict) # not at a price limit
        if con1 and con2 and con3:
            filstock.append(stock)
    return filstock
Python: rebalancing to the average position value
# rebalance so that every position holds the average market value
def for_balance(context, bar_dict):
    # mvalues = context.portfolio.market_value
    # avalues = context.portfolio.portfolio_value
    # per = mvalues / avalues
    hlist = []
    for stock in context.portfolio.positions:
        # collect each stock and its current position market value
        hlist.append([stock,
                      bar_dict[stock].last * context.portfolio.positions[stock].quantity])
    if hlist:
        # sort by position value, largest first
        hlist = sorted(hlist, key=lambda x: x[1], reverse=True)
        temp = 0
        for li in hlist:   # total market value of all positions
            temp += li[1]
        for li in hlist:   # set each position to the average value
            if bar_dict[li[0]].is_trading:
                order_target_value(li[0], temp / len(hlist))
    return
Python: PCA (principal component analysis)
PCA extracts the principal eigenvectors of a sample set and thereby reduces the dimensionality of the data.
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 10:04:26 2016
PCA source code
@author: liudiwei
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# column means; expects a numpy matrix with one sample per row, one feature per column
def meanX(dataX):
    return np.mean(dataX, axis=0)  # axis=0 averages down the rows, i.e. per column

# per-feature variance; expects a numpy matrix, samples in rows, features in columns
def variance(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))
    X1 = X - muAll
    variance = 1. / m * np.diag(X1.T * X1)
    return variance

# normalization; expects a numpy matrix, samples in rows, features in columns
def normalize(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))
    X1 = X - muAll
    X2 = np.tile(np.diag(X.T * X), (m, 1))
    XNorm = X1 / X2
    return XNorm

"""
Parameters:
- XMat: numpy matrix, one sample per row, one feature per column
- k: keep the eigenvectors of the k largest eigenvalues
Returns:
- finalData: the low-dimensional projection of the input
- reconData: the reconstruction after shifting the axes back
"""
def pca(XMat, k):
    average = meanX(XMat)
    m, n = np.shape(XMat)
    avgs = np.tile(average, (m, 1))
    data_adjust = XMat - avgs
    covX = np.cov(data_adjust.T)              # covariance matrix
    featValue, featVec = np.linalg.eig(covX)  # its eigenvalues and eigenvectors
    index = np.argsort(-featValue)            # sort eigenvalues, largest first
    if k > n:
        print("k must be lower than the feature number")
        return
    else:
        # eigenvectors are column vectors, while a[i] indexes rows in a 2-D array,
        # hence the transposes here
        selectVec = np.matrix(featVec.T[index[:k]])
        finalData = data_adjust * selectVec.T
        reconData = (finalData * selectVec) + average
    return finalData, reconData

def loaddata(datafile):
    return np.array(pd.read_csv(datafile, sep="\t", header=None)).astype(float)

def plotBestFit(data1, data2):
    dataArr1 = np.array(data1)
    dataArr2 = np.array(data2)
    m = np.shape(dataArr1)[0]
    axis_x1 = []; axis_y1 = []
    axis_x2 = []; axis_y2 = []
    for i in range(m):
        axis_x1.append(dataArr1[i, 0])
        axis_y1.append(dataArr1[i, 1])
        axis_x2.append(dataArr2[i, 0])
        axis_y2.append(dataArr2[i, 1])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(axis_x1, axis_y1, s=50, c='red', marker='s')
    ax.scatter(axis_x2, axis_y2, s=50, c='blue')
    plt.xlabel('x1')
    plt.ylabel('x2')
    plt.savefig("outfile.png")
    plt.show()

# quick test
# data source: http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html
def test():
    X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],
         [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]
    XMat = np.matrix(X).T
    k = 2
    return pca(XMat, k)

# run against the data set data.txt
def main():
    datafile = "data.txt"
    XMat = loaddata(datafile)
    k = 2
    return pca(XMat, k)

if __name__ == "__main__":
    finalData, reconMat = main()
    plotBestFit(finalData, reconMat)
In the plot, the red squares are the data after PCA projection and the blue dots are the reconstructed original data; the differences between samples are visibly more pronounced after the reduction.
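As a cross-check on the hand-rolled version, the same toy data can be run through scikit-learn's PCA (assuming scikit-learn is installed); up to the sign of each component, the projections should match:

import numpy as np
from sklearn.decomposition import PCA

X = np.array([[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],
              [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]).T
pca_skl = PCA(n_components=2)
projected = pca_skl.fit_transform(X)             # plays the role of finalData above
restored = pca_skl.inverse_transform(projected)  # plays the role of reconData
print(projected[:3])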
Python: KNN (k-nearest-neighbor) classification
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """A k-nearest-neighbor classifier."""
    def __init__(self, k=3):
        self._k = k

    # distances from a new sample to every known sample, returned as indices
    # sorted from nearest to farthest
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m, 1)) - dataset
        sqDiffMat = diffMat ** 2              # square each element
        sqDistances = sqDiffMat.sum(axis=1)   # sum over features
        distances = sqDistances ** 0.5        # square root
        return distances.argsort()            # indices ordered by increasing distance

    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]
        diffMat = np.tile(inX, (dataSetSize, 1)) - dataSet
        sqDiffMat = diffMat ** 2
        sqDistances = sqDiffMat.sum(axis=1)
        distances = sqDistances ** 0.5
        sortedDistIndicies = distances.argsort()
        classCount = {}
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel, 0) + 1
        sortedClassCount = sorted(classCount.items(),   # iteritems() in the Python 2 original
                                  key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]

    # classify a single sample
    def _classify(self, sample, train_X, train_y):
        # type checks
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and train_y")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]]   # label of the i-th nearest point
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.items(),
                                  key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]

    def classify(self, test_X, train_X, train_y):
        results = []
        # type checks
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and train_y")
        d = len(np.shape(test_X))
        if d == 1:                       # a single sample
            result = self._classify(test_X, train_X, train_y)
            results.append(result)
        else:                            # a batch of samples
            for i in range(len(test_X)):
                result = self._classify(test_X[i], train_X, train_y)
                results.append(result)
        return results

if __name__ == "__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k=3)
    sample = [[1, 2, 0, 1, 0], [1, 2, 0, 1, 1]]
    result = clf.classify(sample, train_X, train_y)
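For reference, the same toy problem through scikit-learn's KNeighborsClassifier (assuming scikit-learn is available) makes a handy cross-check on the class above:

from sklearn.neighbors import KNeighborsClassifier

train_X = [[1, 2, 0, 1, 0], [0, 1, 1, 0, 1], [1, 0, 0, 0, 1],
           [2, 1, 1, 0, 1], [1, 1, 0, 1, 1]]
train_y = [1, 1, 0, 0, 0]
clf = KNeighborsClassifier(n_neighbors=3)
clf.fit(train_X, train_y)
print(clf.predict([[1, 2, 0, 1, 0], [1, 2, 0, 1, 1]]))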
Python: decision trees (ID3 & C4.5)
A decision tree classifies samples by their attributes step by step, with the goal of making classification faster and more effective. Each new split attribute is chosen by information gain (IG) or by information gain ratio (IGR); the former gives the basic ID3 algorithm, the latter the improved C4.5.
Taking ID3 as an example, the training procedure is:
(1) Input x and y (x holds the samples, y the labels), with one sample per row and one feature per column.
(2) Compute the information gain IG of each feature and pick the feature that maximizes it (a short worked sketch of this step follows the leaf-node list below).
(3) Build the sample array with the chosen feature removed.
(4) Partition the updated samples by the values of the chosen feature:
attribute value 1 → (x1, y1), attribute value 2 → (x2, y2), attribute value 3 → (x3, y3)
(5) Recurse on each partition until a leaf node is reached.
A leaf node is reached when:
(1) all labels y are identical, or
(2) no features are left to split on.
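To make step (2) concrete, a small sketch of the entropy and information-gain arithmetic on a toy label set (the values are invented for illustration):

import numpy as np

def entropy(y):
    # Shannon entropy: -sum(p_i * log2(p_i)) over label frequencies
    _, counts = np.unique(y, return_counts=True)
    p = counts / counts.sum()
    return -(p * np.log2(p)).sum()

y = np.array([1, 1, 0, 0, 0])          # H(y) = -(2/5)log2(2/5) - (3/5)log2(3/5) ≈ 0.971
feature = np.array([0, 0, 1, 1, 1])    # a feature that happens to split y perfectly
# conditional entropy after splitting on the feature
cond = sum((feature == v).mean() * entropy(y[feature == v]) for v in set(feature))
print(entropy(y) - cond)               # IG ≈ 0.971: the split removes all uncertainty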
The testing procedure is:
(1) Load the trained decision tree.
(2) Starting at the root, traverse the tree recursively until a leaf node is reached.
The code follows. The trained tree is a recursively nested dictionary, indexed by feature values with the labels at the leaves.
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 21 12:17:10 2016
Decision Tree Source Code
@author: liudiwei  (edited Mon Nov 07 09:06:37 2016 by yehx)
"""
import os
import numpy as np

class DecitionTree():
    """This is a decision tree classifier."""
    def __init__(self, criteria='ID3'):
        self._tree = None
        if criteria == 'ID3' or criteria == 'C4.5':
            self._criteria = criteria
        else:
            raise Exception("criterion should be ID3 or C4.5")

    def _calEntropy(self, y):
        '''
        Computes the Shannon entropy e = -sum(pi * log pi)
        Parameter: y, an array of labels
        Returns: the entropy
        '''
        n = y.shape[0]
        labelCounts = {}
        for label in y:
            if label not in labelCounts.keys():
                labelCounts[label] = 1
            else:
                labelCounts[label] += 1
        entropy = 0.0
        for key in labelCounts:
            prob = float(labelCounts[key]) / n
            entropy -= prob * np.log2(prob)
        return entropy

    def _splitData(self, X, y, axis, cutoff):
        """
        Parameters: X features, y labels, axis a feature index,
                    cutoff a value of that feature
        Returns: the subset of the data whose feature `axis` equals `cutoff`,
                 with that feature column removed
        """
        ret = []
        featVec = X[:, axis]
        n = X.shape[1]                                # number of features
        X = X[:, [i for i in range(n) if i != axis]]  # drop column `axis`
        for i in range(len(featVec)):
            if featVec[i] == cutoff:
                ret.append(i)
        return X[ret, :], y[ret]

    def _chooseBestSplit(self, X, y):
        """ID3 & C4.5
        Parameters: X features, y labels
        Picks the best split feature by information gain (ID3)
        or gain ratio (C4.5) and returns its index.
        """
        numFeat = X.shape[1]
        baseEntropy = self._calEntropy(y)
        bestSplit = 0.0
        best_idx = -1
        for i in range(numFeat):
            featlist = X[:, i]                 # the i-th feature column
            uniqueVals = set(featlist)
            curEntropy = 0.0
            splitInfo = 0.0
            for value in uniqueVals:
                sub_x, sub_y = self._splitData(X, y, i, value)
                prob = len(sub_y) / float(len(y))             # P(feature i == value)
                curEntropy += prob * self._calEntropy(sub_y)  # conditional entropy
                splitInfo -= prob * np.log2(prob)             # split info, for the gain ratio
            IG = baseEntropy - curEntropy
            if self._criteria == "ID3":
                # (the source is truncated at this point; the remainder of the loop is
                # reconstructed to follow the standard ID3/C4.5 selection rule)
                if IG > bestSplit:
                    bestSplit = IG
                    best_idx = i
            else:  # C4.5 uses the gain ratio IG / splitInfo
                if splitInfo != 0 and IG / splitInfo > bestSplit:
                    bestSplit = IG / splitInfo
                    best_idx = i
        return best_idx