以 libsvm 格式迭代保存数据帧
Posted
技术标签:
【中文标题】以 libsvm 格式迭代保存数据帧【英文标题】:save dataframe in libsvm format iteratively 【发布时间】:2017-10-11 02:54:56 【问题描述】:我需要以 libsvm 格式迭代地保存数据帧。我的代码是这样的
im_df = im_table.select("m_id", "fsz", "fnm")
all_recs_df = None
fake_df = None
firstRec = True
for eachRec in (im_df.rdd.zipWithIndex().map(lambda ((mi, fs, fn), i): (mi, fs, fn)).collect()):
m_id = eachRec[0]
fsz = eachRec[1]
fnm = eachRec[2]
volume_df = volume_table.select("id","m_id").filter(volume_table['m_id']==m_id)
m_bytes = 0
for eachVolRec in (volume_df.rdd.zipWithIndex().map(lambda ((id), i): (id)).collect()):
each_v_id = eachVolRec[0]
volume_m_id = eachVolRec[1]
vsnp_df = vsnp_table.select("v_id","ssb").filter(vsnp_table['v_id']==each_v_id)
vsnp_sum_df = vsnp_df.groupBy("v_id").agg(sum("ssb").alias("ssb_sum"))
v_bytes = vsnp_sum_df.rdd.zipWithIndex().map(lambda ((vi, vb), i): (vi, vb)).collect()[0][1]
print "\t total = %s" %(v_bytes)
m_bytes += v_bytes
print "im.fnm = %s, im.fsz = %s , total_snaphot_size_bytes: %s" %(fnm, fsz, m_bytes)
if firstRec:
firstRec = False
all_recs_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)]))
fake_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, 1000 * m_bytes), label=1.0)]))
all_recs_df = all_recs_df.unionAll(fake_df)
all_recs_df.registerTempTable("temp_table")
else:
each_rec_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)]))
all_recs_df = sqlContext.sql("select * from temp_table")
all_recs_df = all_recs_df.unionAll(each_rec_df)
all_recs_df.registerTempTable("temp_table")
现在运行命令all_recs_df = sqlContext.sql("select * from temp_table")
给出错误no such table temp_table
并运行命令all_recs_df.collect()
给出错误'NoneType' object has no attribute 'collect'
显然,一旦程序退出 for
循环,all_recs_df
和 temp_table
就会脱离上下文。
问题:那么的替代方法是什么
我尝试立即将数据帧保存到磁盘,但无法将数据附加到同一个文件中
MLUtils.saveAsLibSVMFile(d, "/tmp/test1")
这里的 d 是一个 LabeledPoint RDD。在for
循环中运行上述命令会得到Output directory file:/tmp/test1 already exists
问题:有没有办法将数据附加到现有的 libsvm 格式文件中
【问题讨论】:
【参考方案1】:我尝试立即将数据帧保存到磁盘,但无法将数据附加到同一个文件中
MLUtils.saveAsLibSVMFile(d, "/tmp/test1")
这里的 d 是一个 LabeledPoint RDD。在 for 循环中运行上述命令给出输出目录文件:/tmp/test1 已经存在
问题:有没有办法将数据附加到现有的 libsvm 格式文件中
您可以将文件保存和覆盖为here,但它们不由 MLUtils.saveAsLibSVMFile() 处理。
使用 MLUtils.saveAsLibSVMFile() 我认为您无法覆盖现有文件。
因此,以下代码不会将数据附加到现有的 libsvm 格式文件,而是一个循环,您可以将每个周期获得的数据与前一个周期获得的数据结合起来,因此最后您将保存单个文件:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils
yourRDD = sc.emptyRDD() # start with an empty RDD
for elem in xrange(0,3): # your loop
rdd_aux = sc.parallelize([LabeledPoint(elem,[elem*2,elem*3])]) #just an example
#store and overwrite your new data in an auxiliary RDD at every cycle
yourRDD = yourRDD.union(rdd_aux) # combine your RDD_aux with the RDD that you want to make longer at every cycle
#yourRDD.take(3)
#[LabeledPoint(0.0, [0.0,0.0]), LabeledPoint(1.0, [2.0,3.0]), LabeledPoint(2.0, [4.0,6.0])]
MLUtils.saveAsLibSVMFile(yourRDD,"/your/path")
通过这种方式,您可以将新的 RDD 附加到以前的 RDD,然后保存单个文件,而不是将新数据附加到现有文件中。
【讨论】:
以上是关于以 libsvm 格式迭代保存数据帧的主要内容,如果未能解决你的问题,请参考以下文章