How to read from local directory, kmeans streaming pyspark
Posted: 2017-11-02 04:28:47

Question: I need help reading from a local directory when running streaming k-means with PySpark. There are no good answers on this topic on ***.
Here is my code:
# Imports the snippet appears to rely on (not shown in the original post)
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans

if __name__ == "__main__":
    # sc is assumed to be an existing SparkContext
    ssc = StreamingContext(sc, 1)

    # prepare_data, parse1, parse2 and the *_DATA_SET constants are
    # user-defined helpers not shown in the question
    training_data_raw, training_data_df = prepare_data(TRAINING_DATA_SET)
    trainingData = parse2(training_data_raw)

    testing_data_raw, testing_data_df = prepare_data(TEST_DATA_SET)
    testingData = testing_data_raw.map(parse1)
    # print(testingData)

    trainingQueue = [trainingData]
    testingQueue = [testingData]

    trainingStream = ssc.queueStream(trainingQueue)
    testingStream = ssc.queueStream(testingQueue)

    # We create a model with random clusters and specify the number of clusters to find
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

    # Now register the streams for training and testing and start the job,
    # printing the predicted cluster assignments on new data points as they arrive.
    model.trainOn(trainingStream)
    result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
    result.pprint()

    # This file stream is created but never wired into the model
    ssc.textFileStream('file:///Users/userrname/PycharmProjects/MLtest/training/data/')

    ssc.start()
    ssc.awaitTermination()
Thanks!!
Answer 1:

For the training data, read from the directory with textFileStream and parse each line into a vector:

from pyspark.mllib.linalg import Vectors
trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
And for the testing examples:
from pyspark.mllib.regression import LabeledPoint

def parse(lp):
    # Parse lines of the form "(label,[x,y,z])" into LabeledPoints
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

testData = ssc.textFileStream("/testing/data/dir").map(parse)
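For context, here is a minimal end-to-end sketch of how these file streams could replace the queue streams in the original code. The app name, the testing directory path, the batch interval, and the assumption that training lines look like "[0.5,0.2,0.9]" while test lines look like "(1.0,[0.5,0.2,0.9])" are illustrative, not values from the post:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

def parse(lp):
    # Expects lines like "(1.0,[0.5,0.2,0.9])"
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

if __name__ == "__main__":
    sc = SparkContext(appName="StreamingKMeansExample")  # hypothetical app name
    ssc = StreamingContext(sc, 1)

    # Training lines are plain vectors, e.g. "[0.5,0.2,0.9]"
    trainingStream = ssc.textFileStream(
        "file:///Users/userrname/PycharmProjects/MLtest/training/data/"
    ).map(Vectors.parse)

    # Testing directory is a placeholder path, not taken from the question
    testingStream = ssc.textFileStream(
        "file:///Users/userrname/PycharmProjects/MLtest/testing/data/"
    ).map(parse)

    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
    model.trainOn(trainingStream)

    result = model.predictOnValues(
        testingStream.map(lambda lp: (lp.label, lp.features))
    )
    result.pprint()

    ssc.start()
    ssc.awaitTermination()

Note that textFileStream only picks up files that are moved or written into the directory after the streaming context starts, so new data files must arrive while the job is running.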