pyspark:使用从 kafka 检索到的数据训练 kmeans 流式传输

Posted

技术标签:

【中文标题】pyspark:使用从 kafka 检索到的数据训练 kmeans 流式传输【英文标题】:pyspark: train kmeans streaming with data retrieved from kafka 【发布时间】:2017-07-20 16:02:14 【问题描述】:

我想用来自 kafka 主题的数据训练一个流式 kmeans 模型。

我的问题是如何呈现 kmeans streamig 模型的数据

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", topic: 1)
lines = kvs.map(lambda x: x[1])

lines.pprint()

这个输出(这些是我的特征,用'|'分隔):

1.0|2.0|0.0|21.0|2.0

1.0|2.0|0.0|21.0|2.0

那我就想做这个

model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
model.trainOn(lines)

如果我将两段代码结合起来,我会得到错误:

TypeError: Cannot convert type <type 'unicode'> into Vector

【问题讨论】:

您能否分享一个端到端示例以及您正在使用的所有软件模块的版本(Kafka 和 spark 版本)? 错误日志不完整。可以分享完整版吗? 感谢@MedAli。我刚刚找到了解决方案。我会在下面发布答案 【参考方案1】:

第一个问题是格式化从 kafka 提取的流。这是管道分隔数据的工作原理

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", topic: 1)

raw = kvs.flatMap(lambda kafkaS: [kafkaS])
lines = raw.map(lambda xs: xs[1].split("|"))

lines = lines.map(lambda x: DenseVector(x))

第二个问题是数据的维度setRandomCenters的第一个参数(它应该与特征数相同)

【讨论】:

以上是关于pyspark:使用从 kafka 检索到的数据训练 kmeans 流式传输的主要内容,如果未能解决你的问题,请参考以下文章

我可以使用 spark 2.3.0 和 pyspark 从 Kafka 进行流处理吗?

pyspark.sql.utils.AnalysisException:找不到数据源:kafka

我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?

IndexedRowMatrix().columnSimilarities() 检索到的 PySpark 相似性无法访问:INFO ExternalSorter: Thread *spilling i

将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时

从 Docker 容器将 PySpark 连接到 Kafka