如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?

Posted

技术标签:

【中文标题】如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?【英文标题】:How to do feature selection and reduction on a LIBSVM file in Spark using Python? 【发布时间】:2015-07-05 18:13:49 【问题描述】:

我有几个 LIBSVM 文件,我必须使用 python 在 spark 中实现集群。该文件以 空格 作为分隔符,第一列表示类型 [1 或 -1],其余均为格式 [1:2.566] 的特征。有很多这样的列,我想对此执行特征选择[最好实现 ChiSquareTest 模型],然后使用 PCA 或 SVD 执行特征减少过程。但是,我在 spark 中找不到像样的 python 教程来实现这些过程。

我在网上找到了一个link,它有一个示例脚本可以在 python 中实现 Chisqtest。我使用相同的逻辑来实现模型,但无法完成。在该链接的假设测试部门下,代码在传递给 ChiSqTest 模型之前并行化 RDD[LabeledPoint]。我以不同的方式尝试了相同的逻辑,但我得到了不同的错误。

data = MLUtils.loadLibSVMFile(sc, "PATH/FILENAME.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
obs = sc.parallelize(LabeledPoint(label,features))

这给了我一个错误说明 TypeError: float() argument must be a string or a number

然后,我使用 Normalizer() 对数据进行了标准化,并做了同样的事情,得到了同样的错误。所以,我写了一个返回标记点的函数

def parsepoint(line):
    values = [float(x) for x in line.split(' ')]
    return sc.parallelize(LabeledPoint(values[0],values[1:]))
parsedData = data.map(lambda x: parsepoint(x))
obs = sc.parallelize(parsedData)

这给了我一个错误,说明 Pipeline RDD is not iterable。我尝试了其他几种方法,但一切都以错误告终。有人可以告诉我哪里出错了吗?而且,对于使用 PCA 或 SVD 的特征缩减过程,我在 python 中找不到示例脚本。对此的任何投入都会对我很有帮助。

堆栈跟踪:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-8d0164c0957d> in <module>()
  10 sct = SparkContext()
  11 data = MLUtils.loadLibSVMFile(sct, "PATH")
  ---> 12 print data.take(1)
  13 #label = data.map(lambda x: x.label)
  14 #features = data.map(lambda x: x.features)

  SPARK_HOME\rdd.pyc in take(self, num)
  1263 
  1264 p = range(partsScanned, min(partsScanned + numPartsToTry,   totalParts))
 -> 1265 res = self.context.runJob(self, takeUpToNumLeft, p, True)
  1266 
  1267 items += res

  SPARK_HOME\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   879         mappedRDD = rdd.mapPartitions(partitionFunc)
   880         port = self._jvm.PythonRDD.runJob(self._jsc.sc(),   mappedRDD._jrdd, partitions,
   --> 881 allowLocal)
   882         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
   883 
      SPARK\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
   536 answer = self.gateway_client.send_command(command)
   537 return_value = get_return_value(answer, self.gateway_client,
   --> 538 self.target_id, self.name)
   539 
   540  for temp_arg in temp_args:
    SPARK\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
   298 raise Py4JJavaError(
   299 'An error occurred while calling 012.\n'.
   --> 300  format(target_id, '.', name), value)
   301   else:
   302   raise Py4JError(

  Py4JJavaError: An error occurred while calling       z:org.apache.spark.api.python.PythonRDD.runJob.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task   0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0   (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket      write error
   at java.net.SocketOutputStream.socketWrite0(Native Method)
   at java.net.SocketOutputStream.socketWrite(Unknown Source)
   at java.net.SocketOutputStream.write(Unknown Source)
   at java.io.BufferedOutputStream.write(Unknown Source)
   at java.io.DataOutputStream.write(Unknown Source)
   at java.io.FilterOutputStream.write(Unknown Source)
   at    org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$wr       ite$1(PythonRDD.scala:413)
   at   org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
   at  org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at   org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
   at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRD D.scala:248)
   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
   at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

  Driver stacktrace:
   at   org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$ $failJobAndIndependentStages(DAGScheduler.scala:1266)
  at  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler .scala:1257)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
  at   scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAG Scheduler.scala:730)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala :1411)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

【问题讨论】:

【参考方案1】:

MLUtils.loadLibSVMFile 返回RDD[LabeledPoint],因此您可以将输出直接传递给Statistics.chiSqTest。使用示例数据:

from pyspark.mllib.util import MLUtils
from pyspark.mllib.stat import Statistics

data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
chiSqResults = Statistics.chiSqTest(data)

print chiSqResults[-1]

【讨论】:

我现在试过了。它不工作。代码:sc = SparkContext() endofline data = MLUtils.loadLibSVMFile(sc, "FILEPATH") endofline fitresult = Statistics.chiSqTest(data) endofline 遇到大错误: Py4JJavaError: 调用 z:org.apache.spark.api.python.PythonRDD.runJob 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0(TID 2,本地主机):java.net.SocketException:连接重置by peer:套接字写入错误 您能提供更多详细信息吗?你使用什么版本的 Spark?执行data.take(1) 时发生了什么?您可以在上面的示例输入上运行代码吗? 我使用的是 Spark 1.4。我正在使用 Ipython 执行脚本。而且,当我使用 data.take(1) 打印数据时,我得到了相同的 P4JJavaError。我曾经为 rdd.take() 函数获得正确的输出。但是,最近,我只是看不到该函数的输出。不知道我在做什么错误。而且,当我在示例数据上运行脚本时,我得到了输出 您确定输入文件格式正确吗?你是如何获得它的?您是否尝试将其加载到 Spark 外部?例如scikit. 我添加了输入文件内容的屏幕截图。我使用 scikit 库加载了文件,并且加载正确【参考方案2】:

试试这个为你解析函数。之后的代码打印第一个标签和第一个标签本身的特征。希望这会有所帮助!

    def parsepoint(line):
        values = line.split(" ")
        return LabeledPoint(values[0], values[1:])

    parsedData = map(parsepoint, data.take(1))
    firstFeatures = parsedData[0].features
    firstLabel = parsedData[0].label
    print firstFeatures
    print firstLabel

【讨论】:

以上是关于如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Python 使用 Selenium 获取 <ul> 中的 <li> 元素列表?

Spark python如何使用特殊标记对RDD项目进行分组? [复制]

如何从键值对列表中创建 Spark Row

解释 Spark 中的聚合功能(使用 Python 和 Scala)

如何使用 spark(python)读取 zip 文件中的 CSV 文件的内容 [重复]

如何在 HDP 中的 zeppelin-spark2 中将库安装到 python