使用 python Spark 将大型 CSV 发送到 Kafka

Posted

技术标签:

【中文标题】使用 python Spark 将大型 CSV 发送到 Kafka【英文标题】:Sending Large CSV to Kafka using python Spark 【发布时间】:2015-11-26 00:59:20 【问题描述】:

我正在尝试将大型 CSV 发送到 kafka。基本结构是读取一行CSV并与标题一起压缩。

a = dict(zip(header, line.split(",")

然后将其转换为 json:

message = json.dumps(a)

然后我使用 kafka-python 库发送消息

from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)

使用 PYSPARK 我已经轻松地从 CSV 文件中创建了消息的 RDD

sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))

现在我想发送这些消息:我定义了一个函数

def sendkafka(message):
  kafka = KafkaClient("localhost:9092")
  producer = SimpleProducer(kafka)
  return producer.send_messages('topic',message)

然后我创建一个新的 RDD 来发送消息

sentRDD = messageRDD.map(lambda x: kafkasend(x))

然后我调用 sendRDD.count()

开始搅动和发送消息

不幸的是,这非常慢。它每秒发送 1000 条消息。这是在一个 10 个节点的集群上,每个集群有 4 个 CPU 和 8GB 内存。

相比之下,在 1000 万行 csv 上创建消息大约需要 7 秒。 ~ 约 2gb

我认为问题在于我在函数内部实例化了一个 kafka 生产者。但是,如果我不这样做,那么 spark 会抱怨即使我尝试在全球范围内定义生产者也不存在。

也许有人可以阐明如何解决这个问题。

谢谢,

【问题讨论】:

【参考方案1】:

您可以为每个分区创建一个生产者并使用mapPartitionsforeachPartition

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('topic', message)

sentRDD = messageRDD.mapPartitions(sendkafka)

如果仅靠上述方法没有帮助,您可以尝试使用 asynchronous producer 对其进行扩展。

在 Spark 2.x 中也可以使用 Kafka 数据源。您必须包含 spark-sql-kafka jar,匹配 Spark 和 Scala 版本(此处分别为 2.2.0 和 2.11):

spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

将数据转换为DataFrame(如果不是DataFrame):

messageDF = spark.createDataFrame(messageRDD, "string")

并使用DataFrameWriter

(messageDF.write
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .save())

【讨论】:

感谢 zero323。在 spark 之外使用 async 的单个生产者,我每秒可以获得 8000 个。所以我做了一些调整。我发现这个 csv 有 15 个分区,所以我给了这个工作 15 个核心。然后我使用异步选项,直到批量大小为 20000。这给了我每秒 225,000 的最大吞吐量。所以通过一些调整,我实际上得到了一个合理的比率。流式传输 1000 万行 csv 需要 45 秒。 @PhineasDashevsky,如果您能分享最终解决方案的代码,将会非常有帮助。 iabdb.me/2015/09/09/… 在这篇文章中,我有代码和更详细的说明。 我尝试这样做并得到 NameError: global name 'KafkaClient' is not defined。我认为它无法识别 foreachpartition 中的 kafkaclient。任何人都有同样的问题。

以上是关于使用 python Spark 将大型 CSV 发送到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章

动态构建大型数据框(spark 或 pandas)以导出到 csv 的方法

将大型 csv 加载到 mysql 等 RDB 的推荐方法

根据行值python将大型csv文件拆分为多个文件

使用 python 和 pandas 将错误创建的大型 csv 文件转换为制表符分隔文件

Apache Sqoop 和 Spark

Python Spark-如何将空 DataFrame 输出到 csv 文件(仅输出标头)?