如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?
Posted
技术标签:
【中文标题】如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?【英文标题】:How to do feature selection and reduction on a LIBSVM file in Spark using Python? 【发布时间】:2015-07-05 18:13:49 【问题描述】:我有几个 LIBSVM 文件,我必须使用 python 在 spark 中实现集群。该文件以 空格 作为分隔符,第一列表示类型 [1 或 -1],其余均为格式 [1:2.566] 的特征。有很多这样的列,我想对此执行特征选择[最好实现 ChiSquareTest 模型],然后使用 PCA 或 SVD 执行特征减少过程。但是,我在 spark 中找不到像样的 python 教程来实现这些过程。
我在网上找到了一个link,它有一个示例脚本可以在 python 中实现 Chisqtest。我使用相同的逻辑来实现模型,但无法完成。在该链接的假设测试部门下,代码在传递给 ChiSqTest 模型之前并行化 RDD[LabeledPoint]。我以不同的方式尝试了相同的逻辑,但我得到了不同的错误。
data = MLUtils.loadLibSVMFile(sc, "PATH/FILENAME.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
obs = sc.parallelize(LabeledPoint(label,features))
这给了我一个错误说明 TypeError: float() argument must be a string or a number
然后,我使用 Normalizer() 对数据进行了标准化,并做了同样的事情,得到了同样的错误。所以,我写了一个返回标记点的函数
def parsepoint(line):
values = [float(x) for x in line.split(' ')]
return sc.parallelize(LabeledPoint(values[0],values[1:]))
parsedData = data.map(lambda x: parsepoint(x))
obs = sc.parallelize(parsedData)
这给了我一个错误,说明 Pipeline RDD is not iterable。我尝试了其他几种方法,但一切都以错误告终。有人可以告诉我哪里出错了吗?而且,对于使用 PCA 或 SVD 的特征缩减过程,我在 python 中找不到示例脚本。对此的任何投入都会对我很有帮助。
堆栈跟踪:
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-8d0164c0957d> in <module>()
10 sct = SparkContext()
11 data = MLUtils.loadLibSVMFile(sct, "PATH")
---> 12 print data.take(1)
13 #label = data.map(lambda x: x.label)
14 #features = data.map(lambda x: x.features)
SPARK_HOME\rdd.pyc in take(self, num)
1263
1264 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1265 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1266
1267 items += res
SPARK_HOME\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
879 mappedRDD = rdd.mapPartitions(partitionFunc)
880 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
--> 881 allowLocal)
882 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
883
SPARK\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
SPARK\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling 012.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.write(Unknown Source)
at java.io.DataOutputStream.write(Unknown Source)
at java.io.FilterOutputStream.write(Unknown Source)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$wr ite$1(PythonRDD.scala:413)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRD D.scala:248)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$ $failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler .scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAG Scheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala :1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
【问题讨论】:
【参考方案1】:MLUtils.loadLibSVMFile
返回RDD[LabeledPoint]
,因此您可以将输出直接传递给Statistics.chiSqTest
。使用示例数据:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.stat import Statistics
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
chiSqResults = Statistics.chiSqTest(data)
print chiSqResults[-1]
【讨论】:
我现在试过了。它不工作。代码:sc = SparkContext() endofline data = MLUtils.loadLibSVMFile(sc, "FILEPATH") endofline fitresult = Statistics.chiSqTest(data) endofline 遇到大错误: Py4JJavaError: 调用 z:org.apache.spark.api.python.PythonRDD.runJob 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0(TID 2,本地主机):java.net.SocketException:连接重置by peer:套接字写入错误 您能提供更多详细信息吗?你使用什么版本的 Spark?执行data.take(1)
时发生了什么?您可以在上面的示例输入上运行代码吗?
我使用的是 Spark 1.4。我正在使用 Ipython 执行脚本。而且,当我使用 data.take(1) 打印数据时,我得到了相同的 P4JJavaError。我曾经为 rdd.take() 函数获得正确的输出。但是,最近,我只是看不到该函数的输出。不知道我在做什么错误。而且,当我在示例数据上运行脚本时,我得到了输出
您确定输入文件格式正确吗?你是如何获得它的?您是否尝试将其加载到 Spark 外部?例如scikit.
我添加了输入文件内容的屏幕截图。我使用 scikit 库加载了文件,并且加载正确【参考方案2】:
试试这个为你解析函数。之后的代码打印第一个标签和第一个标签本身的特征。希望这会有所帮助!
def parsepoint(line):
values = line.split(" ")
return LabeledPoint(values[0], values[1:])
parsedData = map(parsepoint, data.take(1))
firstFeatures = parsedData[0].features
firstLabel = parsedData[0].label
print firstFeatures
print firstLabel
【讨论】:
以上是关于如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python 使用 Selenium 获取 <ul> 中的 <li> 元素列表?
Spark python如何使用特殊标记对RDD项目进行分组? [复制]
解释 Spark 中的聚合功能(使用 Python 和 Scala)