How to fit two numpy matrices with Pyspark's SVM?

Posted: 2016-07-02 06:28:50

【Question】:

I have two numpy matrices like these:

Features:
    (878049, 6)
    <type 'numpy.ndarray'>

Labels:
    (878049,)
    <type 'numpy.ndarray'>

I am curious whether Pyspark's random forests can be used to fit the matrices above. According to the documentation, the RF algorithm can be used as follows:

model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))

So my question is: do I need to convert the numpy arrays into an RDD, and if so, into which format do I need to convert the features and labels matrices so they can be fed to MLlib's RF implementation?

Update: Following @CafeFeed's answer, I tried the following:

# CV: split the data into training and test sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
import numpy as np

# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=np.unique(y))

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

However, I got this exception:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-27-ded4b074521b> in <module>()
      6 # Empty categoricalFeaturesInfo indicates all features are continuous.
      7 
----> 8 model = DecisionTree.trainClassifier(trainingData, numClasses=np.unique(y), categoricalFeaturesInfo={}, impurity='gini', maxDepth=5, maxBins=32)
      9 
     10 # Evaluate model on test instances and compute test error

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/tree.pyc in trainClassifier(cls, data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
    183         """
    184         return cls._train(data, "classification", numClasses, categoricalFeaturesInfo,
--> 185                           impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
    186 
    187     @classmethod

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/tree.pyc in _train(cls, data, type, numClasses, features, impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
    124         assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
    125         model = callMLlibFunc("trainDecisionTreeModel", data, type, numClasses, features,
--> 126                               impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
    127         return DecisionTreeModel(model)
    128 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
    128     sc = SparkContext._active_spark_context
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131 
    132 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
    120 def callJavaFunc(sc, func, *args):
    121     """ Call Java Function """
--> 122     args = [_py2java(sc, a) for a in args]
    123     return _java2py(sc, func(*args))
    124 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/common.pyc in _py2java(sc, obj)
     86     else:
     87         data = bytearray(PickleSerializer().dumps(obj))
---> 88         obj = sc._jvm.SerDe.loads(data)
     89     return obj
     90 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.mllib.api.python.SerDe.loads.
: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:701)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:171)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:85)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:98)
    at org.apache.spark.mllib.api.python.SerDe$.loads(PythonMLLibAPI.scala:1462)
    at org.apache.spark.mllib.api.python.SerDe.loads(PythonMLLibAPI.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

【Comments】:

【Answer 1】:

The documentation is quite clear. You need an RDD:

>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import RandomForest
>>> import numpy as np
>>>
>>> np.random.seed(1)
>>> features = np.random.random((100, 10))
>>> labels = np.random.choice([0, 1], 100)
>>> data = sc.parallelize(zip(labels, features)).map(lambda x: LabeledPoint(x[0], x[1])) 
>>> RandomForest.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={}, numTrees=2)
TreeEnsembleModel classifier with 2 trees
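
Applied to the data from the question, the same conversion might look like the sketch below. It assumes an active SparkContext `sc`, and that the (878049, 6) feature matrix and the (878049,) label vector are available under the hypothetical names `features` and `labels`. It also passes `numClasses` as a plain Python `int` and keeps numpy scalar types out of the `LabeledPoint`s, since handing raw numpy objects to the JVM is a plausible cause of the `net.razorvine.pickle.PickleException` shown above. This is a rough sketch under those assumptions, not a verified fix.

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
import numpy as np

# Hypothetical names: `features` is the (878049, 6) array, `labels` the (878049,) array.
# .tolist() converts numpy scalars to plain Python numbers before they reach LabeledPoint.
data = sc.parallelize(list(zip(labels.tolist(), features.tolist()))) \
         .map(lambda lf: LabeledPoint(lf[0], lf[1]))

(trainingData, testData) = data.randomSplit([0.7, 0.3])

# numClasses must be a plain int, not the array returned by np.unique(labels)
numClasses = int(len(np.unique(labels)))

model = DecisionTree.trainClassifier(trainingData, numClasses=numClasses,
                                     categoricalFeaturesInfo={}, impurity='gini',
                                     maxDepth=5, maxBins=32)

# Evaluate on the held-out split and compute the test error
predictions = model.predict(testData.map(lambda p: p.features))
labelsAndPredictions = testData.map(lambda p: p.label).zip(predictions)
testErr = (labelsAndPredictions.filter(lambda vp: vp[0] != vp[1]).count()
           / float(testData.count()))
print('Test Error = ' + str(testErr))

The same `data` RDD of `LabeledPoint`s can be passed to `RandomForest.trainClassifier` as in the snippet above; only the training call changes.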

【Discussion】:
