PySpark四: 机器学习


sklearn 官方网站:

API Reference — scikit-learn 1.0.2 documentation




package spark2.ml

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.ChiSquareTest
import org.apache.spark.sql.SparkSession

/**
  * Created by Administrator on 2020/6/28.
  *
  * Runs spark.ml's Pearson chi-squared independence test of every feature
  * against the label on a small in-memory dataset and prints the result.
  * The master URL is expected from spark-submit (e.g. --master local[*]).
  */
object MLChiSquareTest {
  def main(args: Array[String]): Unit = {
    // Set the log level so Spark's INFO logging does not drown the result.
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName("MLChiSquareTest")
      .config("spark.driver.maxResultSize", "2G")
      .getOrCreate()

    import spark.implicits._

    val data = Seq(
      (0.0, Vectors.dense(1.0, 1.0, 1.0)),
      (0.0, Vectors.dense(1.0, 1.0, 2.0)),
      (0.0, Vectors.dense(1.0, 1.0, 3.0)),
      (1.0, Vectors.dense(1.0, 3.0, 4.0)),
      (1.0, Vectors.dense(1.0, 3.0, 5.0)),
      (1.0, Vectors.dense(1.0, 3.0, 6.0))
    )
    val df = data.toDF("label", "features")

    // ChiSquareTest.test returns a single-row DataFrame whose fields are
    // pValues (Vector), degreesOfFreedom (Array[Int]) and statistics (Vector).
    val chi = ChiSquareTest.test(df, "features", "label").head
    println(s"pValues = ${chi.getAs[Vector](0)}")
    println(s"degreesOfFreedom = ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
    println(s"statistics = ${chi.getAs[Vector](2)}")

    spark.stop()
  }
}


决策树以及随机森林的示例见:pyspark在机器学习中实战小练 - 知乎


任务:预测一个人的年收入是否会超过5万美元



from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession; every later step reads data through `spark`.
spark = SparkSession.builder.appName("adult-income").getOrCreate()

df = spark.read.csv('adult.csv', inferSchema=True, header=True)  # read the CSV: first row is the header, column types are inferred
df.show(3)  # show the first 3 rows


cols = df.columns  # column names, just like pandas
df.printSchema()  # print column names and types

|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education-num: integer (nullable = true)
|-- marital-status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital-gain: integer (nullable = true)
|-- capital-loss: integer (nullable = true)
|-- hours-per-week: integer (nullable = true)
|-- native-country: string (nullable = true)
|-- income: string (nullable = true)

# Split the columns by type. 'income' is the label, so it must NOT be used as a
# feature -- keeping it would leak the label into the feature vector.
cat_features = [name for name, dtype in df.dtypes if dtype == 'string' and name != 'income']
num_features = [name for name, dtype in df.dtypes if dtype != 'string']

对于类别变量需要先进行编码:pyspark 提供了 StringIndexer(字符串→索引)和 OneHotEncoder(索引→独热向量),再用 VectorAssembler 把所有特征组装成一个向量列。

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Preprocessing stages, executed in order by the Pipeline below.
stages = []
for cat_col in cat_features:
    # string category -> numeric index
    indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + 'Index')
    # numeric index -> one-hot vector
    one_hot = OneHotEncoder(inputCols=[indexer.getOutputCol()],
                            outputCols=[cat_col + "_one_hot"])
    stages.extend([indexer, one_hot])

# Index the 'income' strings into the numeric 'label' column.
label_string_index = StringIndexer(inputCol='income', outputCol='label')
stages.append(label_string_index)

# Assemble the one-hot categorical vectors followed by the numeric columns into 'features'.
assembler_cols = [name + "_one_hot" for name in cat_features]
assembler_cols.extend(num_features)
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages.append(assembler)

# Fit all stages on df, transform it, and keep label/features ahead of the original columns.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
selected_cols = ["label", "features"] + cols
df = pipeline_model.transform(df).select(selected_cols)

pyspark 的 show() 输出类似 MySQL 的表格,不方便观看,因此转换为 pandas DataFrame 来显示:


import pandas as pd

# Render the first 20 rows as a pandas DataFrame, which displays more readably in a notebook.
preview_rows = df.take(20)
pd.DataFrame(preview_rows, columns=df.columns)


使用 VectorAssembler 将所有特征合并成了 features 这一列,因为 pyspark 做机器学习时要求特征先编码好并组装成一个向量列。


划分训练集和测试集(7:3)

# 70/30 train/test split; the fixed seed makes the split reproducible.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=2021)




from pyspark.ml.classification import LogisticRegression
# Logistic regression on the assembled feature vector, at most 10 iterations.
lr = LogisticRegression(maxIter=10, labelCol='label', featuresCol='features')
lr_model = lr.fit(train)



# Score the held-out rows; this appends rawPrediction, probability and prediction columns.
predictions = lr_model.transform(test)



|-- label: double (nullable = false)
|-- features: vector (nullable = true)
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education-num: integer (nullable = true)
|-- marital-status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital-gain: integer (nullable = true)
|-- capital-loss: integer (nullable = true)
|-- hours-per-week: integer (nullable = true)
|-- native-country: string (nullable = true)
|-- income: string (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)


selected = predictions.select(["label", "prediction", "probability", "age", "occupation"])


from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the model: area under the ROC curve, computed from the rawPrediction scores.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
test_auc = evaluator.evaluate(predictions)
print('AUC:', test_auc)

AUC: 0.9062153434371653


from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for cross-validation: 3 x 3 x 3 = 27 candidate settings.
# .build() turns the builder into the list of param maps CrossValidator expects.
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.5, 2.0])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .addGrid(lr.maxIter, [1, 5, 10])
              .build())
# 5-fold cross-validation: estimator, parameter grid, evaluator (AUC) and number of folds.
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
# Run cross-validation on the training data only; the best setting is refit on all of `train`.
cv_model = cv.fit(train)
# Score the untouched test set with the best model found by cross-validation.
predictions = cv_model.transform(test)
print('AUC:', evaluator.evaluate(predictions))

AUC: 0.9054096433333642




#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun  7 18:08:40 2018

@author: luogan

Decision-tree classification with spark.ml on Spark's LIBSVM sample data.
"""

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("dataFrame") \
    .getOrCreate()

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("/home/luogan/lg/softinstall/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing).
# No seed is given, so the split -- and the reported error -- varies between runs.
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

# The fitted tree is the third pipeline stage.
treeModel = model.stages[2]
# summary only
print(treeModel)
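+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[95,96,97,12...|
|       1.0|         1.0|(692,[100,101,102...|
|       1.0|         1.0|(692,[121,122,123...|
|       1.0|         1.0|(692,[123,124,125...|
|       1.0|         1.0|(692,[124,125,126...|
+----------+------------+--------------------+
only showing top 5 rows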

Test Error = 0.0285714 
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_49f5b151055db55ad5a5) of depth 1 with 3 nodes


注:单纯拿Pyspark练练手,可无需配置Pyspark集群,直接本地配置下单机Pyspark,也可以使用线上spark集群(如: community.cloud.databricks.com)。


#!/usr/bin/env python
# coding: utf-8
# Initialise the SparkSession.
from pyspark.sql import SparkSession
# Import only what is used: `import *` from pyspark.sql.functions shadows
# builtins such as sum, max, min, round, abs and filter.
from pyspark.sql.functions import avg, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.appName("Python Spark RF example").config("spark.some.config.option", "some-value").getOrCreate()

# Load the data: the first row is a header and column types are inferred.
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data.csv")

# Basic inspection of the data.
df.dtypes  # column names and data types
df.show()  # display the content of df
df.head()  # first row
df.first()  # first row
df.take(2)  # first 2 rows
df.schema  # schema of df
df.columns  # column names
df.count()  # number of rows
df.distinct().count()  # number of distinct rows
df.printSchema()  # print the schema
df.explain()  # print the (logical and physical) plans
df.describe().show()  # summary statistics
df.groupBy('Survived').agg(avg("Age"), avg("Fare")).show()  # mean Age and Fare per Survived value
# Selects Sex plus a boolean column (Survived == 1); this does not filter rows --
# use df.filter(df.Survived == 1) for that.
df.select(df.Sex, df.Survived == 1).show()
df.sort("Age", ascending=False).collect()  # all rows sorted by Age descending, collected to the driver

# Feature processing.
df = df.dropDuplicates()  # drop duplicate rows
df = df.na.fill(value=0)  # fill missing values in numeric columns with 0
# Alternative to filling: drop every row that has a missing value. Do not run
# both -- after the fill, na.drop() would still discard rows whose *string*
# columns are null.
# df = df.na.drop()
df = df.withColumn('isMale', when(df['Sex'] == 'male', 1).otherwise(0))  # new column: 1 for male, otherwise 0
df = df.drop('_c0', 'Name', 'Sex')  # drop the index, name and raw sex columns

# Feature / label columns.
# 'Survived' is the label and must never be a feature; 'PassengerId' (if
# present) is only an identifier.
ignore = ['Survived', 'PassengerId']
# VectorAssembler rejects string columns, so only non-string columns become features.
feature_cols = [name for name, dtype in df.dtypes if name not in ignore and dtype != 'string']
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
new_df = vectorAssembler.transform(df)
new_df = new_df.select(['features', 'Survived'])

# Train / test split (75% / 25%), reproducible via the seed.
train, test = new_df.randomSplit([0.75, 0.25], seed=12345)

# Train on the training split only; the test split must stay unseen until evaluation.
lr = LogisticRegression(featuresCol='features', labelCol='Survived')
lr_model = lr.fit(train)

# Evaluate on the held-out test split (area under ROC by default).
predictions = lr_model.transform(test)
auc = BinaryClassificationEvaluator().setLabelCol('Survived')
print('AUC of the model:' + str(auc.evaluate(predictions)))
print('features weights', lr_model.coefficientMatrix)

PySpark四: 机器学习_starry0001的博客-CSDN博客_pyspark 机器学习


PySpark入门二十三:ML--随机森林_Roc Huang的博客-CSDN博客_pyspark的随机森林


# -*- coding: UTF-8 -*-
from matplotlib.font_manager import FontProperties
import matplotlib.pyplot as plt
from math import log
import operator

def createDataSet():
    """Return the 15-row toy dataset and the names of its four feature columns.

    Each row holds four integer-coded feature values followed by the class
    label ('yes' or 'no'). Fresh lists are built on every call, so callers
    may mutate them freely.
    """
    featureNames = ['F1-AGE', 'F2-WORK', 'F3-HOME', 'F4-LOAN']
    rows = [
        [0, 0, 0, 0, 'no'],
        [0, 0, 0, 1, 'no'],
        [0, 1, 0, 1, 'yes'],
        [0, 1, 1, 0, 'yes'],
        [0, 0, 0, 0, 'no'],
        [1, 0, 0, 0, 'no'],
        [1, 0, 0, 1, 'no'],
        [1, 1, 1, 1, 'yes'],
        [1, 0, 1, 2, 'yes'],
        [1, 0, 1, 2, 'yes'],
        [2, 0, 1, 2, 'yes'],
        [2, 0, 1, 1, 'yes'],
        [2, 1, 0, 1, 'yes'],
        [2, 1, 0, 2, 'yes'],
        [2, 0, 0, 0, 'no'],
    ]
    return rows, featureNames

def createTree(dataset, labels, featLabels):
    """Recursively build an ID3 decision tree.

    Args:
        dataset: list of rows; each row holds the feature values followed by
            the class label in the last position.
        labels: feature names, one per feature column of ``dataset``. The
            caller's list is not modified.
        featLabels: output list; the name of each feature chosen for a split
            is appended to it, in the order the splits are made.

    Returns:
        A class label (leaf) or a nested dict
        ``{feature_name: {feature_value: subtree_or_label}}``.
    """
    classList = [example[-1] for example in dataset]
    # Every row has the same class: this node is a leaf.
    if classList.count(classList[0]) == len(classList):
        return classList[0]
    # Only the class column is left: fall back to the majority class.
    if len(dataset[0]) == 1:
        return majorityCnt(classList)
    bestFeat = chooseBestFeatureToSplit(dataset)
    # chooseBestFeatureToSplit returns -1 when no feature has positive
    # information gain; index -1 would otherwise split on the class column.
    if bestFeat == -1:
        return majorityCnt(classList)
    # Work on a copy so the caller's list of feature names stays intact.
    labels = labels[:]
    bestFeatLabel = labels[bestFeat]
    featLabels.append(bestFeatLabel)
    myTree = {bestFeatLabel: {}}
    # The chosen column is removed from the rows by splitDataSet, so drop its name too.
    del labels[bestFeat]
    uniqueVals = set(example[bestFeat] for example in dataset)
    for value in uniqueVals:
        # Each recursive call copies `labels` itself before modifying it.
        myTree[bestFeatLabel][value] = createTree(splitDataSet(dataset, bestFeat, value), labels, featLabels)
    return myTree

def majorityCnt(classList):
    """Return the most frequent value in ``classList``.

    Ties are broken in favour of the value that appears first. Raises
    IndexError when ``classList`` is empty.
    """
    from collections import Counter

    # most_common orders equal counts by first occurrence.
    return Counter(classList).most_common(1)[0][0]

def chooseBestFeatureToSplit(dataset):
    """Return the index of the feature column with the largest information gain.

    The last column of each row is the class label and is never considered.
    Returns -1 when no feature yields a strictly positive gain; on ties the
    lowest index wins.
    """
    n_rows = float(len(dataset))
    base_entropy = calcShannonEnt(dataset)
    best_gain, best_index = 0, -1
    for index in range(len(dataset[0]) - 1):
        # Conditional entropy: entropy of each subset, weighted by its share of rows.
        cond_entropy = 0
        for value in {row[index] for row in dataset}:
            subset = splitDataSet(dataset, index, value)
            cond_entropy += len(subset) / n_rows * calcShannonEnt(subset)
        gain = base_entropy - cond_entropy
        if gain > best_gain:
            best_gain, best_index = gain, index
    return best_index

def splitDataSet(dataset, axis, val):
    """Return the rows whose column ``axis`` equals ``val``, with that column removed.

    Args:
        dataset: list of rows (lists or tuples).
        axis: index of the column to test and remove.
        val: value the column must equal for a row to be kept.

    Returns:
        A new list of new rows; ``dataset`` and its rows are not modified.
    """
    return [featVec[:axis] + featVec[axis + 1:] for featVec in dataset if featVec[axis] == val]

def calcShannonEnt(dataset):
    """Return the Shannon entropy, in bits, of the class labels in ``dataset``.

    The class label is the last element of each row. An empty dataset has
    entropy 0.
    """
    from collections import Counter

    numexamples = len(dataset)
    labelCounts = Counter(featVec[-1] for featVec in dataset)
    shannonEnt = 0
    for count in labelCounts.values():
        prop = float(count) / numexamples
        shannonEnt -= prop * log(prop, 2)
    return shannonEnt

def getNumLeafs(myTree):
    """Return the number of leaf nodes in a tree built by createTree.

    ``myTree`` is a single-key dict ``{feature_name: {value: subtree_or_label}}``;
    any child that is not a dict counts as one leaf.
    """
    numLeafs = 0
    firstStr = next(iter(myTree))
    for child in myTree[firstStr].values():
        if isinstance(child, dict):
            numLeafs += getNumLeafs(child)
        else:
            numLeafs += 1
    return numLeafs

def getTreeDepth(myTree):
    """Return the number of decision levels in a tree built by createTree.

    A node whose children are all leaves has depth 1; each nested subtree
    adds one level.
    """
    maxDepth = 0
    firstStr = next(iter(myTree))
    for child in myTree[firstStr].values():
        thisDepth = 1 + getTreeDepth(child) if isinstance(child, dict) else 1
        maxDepth = max(maxDepth, thisDepth)
    return maxDepth

def plotNode(nodeTxt, centerPt, parentPt, nodeType):
    """Draw one tree node with text ``nodeTxt`` at ``centerPt``.

    An arrow is drawn from ``parentPt`` to the node, both given in axes
    fraction coordinates. Drawing happens on the axes stored in
    ``createPlot.ax1``; ``nodeType`` is the bbox style dict for the node box.
    """
    import os

    arrow_args = dict(arrowstyle="<-")
    font_path = r"c:\\windows\\fonts\\simsunb.ttf"
    # The hard-coded path is a Windows font location; when it does not exist,
    # keep the size but use matplotlib's default font family.
    if os.path.exists(font_path):
        font = FontProperties(fname=font_path, size=14)
    else:
        font = FontProperties(size=14)
    # Text properties are lower-case keyword names; `FontProperties=` is rejected
    # by current matplotlib versions.
    createPlot.ax1.annotate(nodeTxt, xy=parentPt, xycoords='axes fraction',
                            xytext=centerPt, textcoords='axes fraction',
                            va="center", ha="center", bbox=nodeType, arrowprops=arrow_args,
                            fontproperties=font)

def plotMidText(cntrPt, parentPt, txtString):
    """Write ``txtString`` halfway along the edge between ``parentPt`` and ``cntrPt``.

    The text is drawn on ``createPlot.ax1``, centred and rotated by 30 degrees.
    """
    childX, childY = cntrPt[0], cntrPt[1]
    parentX, parentY = parentPt[0], parentPt[1]
    createPlot.ax1.text((parentX - childX) / 2.0 + childX,
                        (parentY - childY) / 2.0 + childY,
                        txtString, va="center", ha="center", rotation=30)

def plotTree(myTree, parentPt, nodeTxt):
    """Recursively draw ``myTree`` below ``parentPt``, labelling the incoming edge with ``nodeTxt``.

    Layout state lives in function attributes initialised by createPlot:
    ``plotTree.totalW`` (leaf count), ``plotTree.totalD`` (tree depth) and the
    drawing cursor ``plotTree.xOff`` / ``plotTree.yOff``.
    """
    decisionNode = dict(boxstyle="sawtooth", fc="0.8")
    leafNode = dict(boxstyle="round4", fc="0.8")
    numLeafs = getNumLeafs(myTree)
    firstStr = next(iter(myTree))
    # Centre this decision node horizontally over the leaves it spans.
    cntrPt = (plotTree.xOff + (1.0 + float(numLeafs)) / 2.0 / plotTree.totalW, plotTree.yOff)
    plotMidText(cntrPt, parentPt, nodeTxt)
    plotNode(firstStr, cntrPt, parentPt, decisionNode)
    secondDict = myTree[firstStr]
    # Move the cursor down one level for the children.
    plotTree.yOff = plotTree.yOff - 1.0 / plotTree.totalD
    for key, child in secondDict.items():
        if isinstance(child, dict):
            plotTree(child, cntrPt, str(key))
        else:
            # Leaf: advance the x cursor by one leaf slot and draw it there.
            plotTree.xOff = plotTree.xOff + 1.0 / plotTree.totalW
            plotNode(child, (plotTree.xOff, plotTree.yOff), cntrPt, leafNode)
            plotMidText((plotTree.xOff, plotTree.yOff), cntrPt, str(key))
    # Restore the cursor level for the caller.
    plotTree.yOff = plotTree.yOff + 1.0 / plotTree.totalD

def createPlot(inTree):
    """Draw the decision tree ``inTree`` (as returned by createTree) and show the figure."""
    fig = plt.figure(1, facecolor='white')  # create the figure
    fig.clf()  # clear the figure
    axprops = dict(xticks=[], yticks=[])
    createPlot.ax1 = plt.subplot(111, frameon=False, **axprops)  # axes without frame or ticks
    plotTree.totalW = float(getNumLeafs(inTree))  # number of leaf nodes
    plotTree.totalD = float(getTreeDepth(inTree))  # number of tree levels
    plotTree.xOff = -0.5 / plotTree.totalW  # x cursor, half a leaf slot left of the axes
    plotTree.yOff = 1.0  # y cursor, at the top of the axes
    plotTree(inTree, (0.5, 1.0), '')  # draw the tree
    plt.show()

if __name__ == '__main__':
    # Build the tree from the toy dataset, print its dict form and plot it.
    dataset, labels = createDataSet()
    featLabels = []
    myTree = createTree(dataset, labels, featLabels)
    print(myTree)
    createPlot(myTree)


003 机器学习中的基础知识

机器学习算法之 k-means 聚类算法



