从 google pubsub 到 spark 流的数据摄取速度很慢
Posted
技术标签:
【中文标题】从 google pubsub 到 spark 流的数据摄取速度很慢【英文标题】:Data ingestion from google pubsub to spark streaming is slow 【发布时间】:2019-11-30 09:36:36 【问题描述】:我正在使用谷歌云 Dataproc Spark 集群来运行 Spark 流式传输作业,该作业从多个 PubSub 订阅中读取数据并写入 BigQuery。 PubSub 有 500 万个元素,滑动窗口为 2 分钟,批次/窗口为 30 秒,我每批次仅获得大约 200,000 个元素。我希望在第一批中获得全部 500 万。每个元素的大小约为 140 字节,采用 Avro 消息格式。
我在 Dataflow 中实现了每秒 100 万个元素的速度,但我想对 Dataproc 做同样的事情。我尝试了 Dataproc 的自动缩放选项,还尝试了与 Dataflow 相同的 Beam 管道代码。如果我增加订阅数量,那么它可能会给我更多的吞吐量。是否有可能从单个订阅中获得 1M 元素/秒的吞吐量?
以下是我的 Scala 代码:
// Reading from multiple PubSub.
for (a <- 0 to Integer.parseInt(subs))
logger.info("SKCHECK : Creating stream : " + subscription + a)
val everysub = PubsubUtils.createStream(
ssc, projectId, None, subscription + a,
SparkGCPCredentials.builder.jsonServiceAccount(jsonPath).build(),
StorageLevel.MEMORY_ONLY_SER).map(message =>
// Method to send avro bytes message and get row
val row : Row = avroMsgToRow(message.getData())
row
)
我的build.sbt
看起来像:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
// "org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
// "org.apache.spark" %% "spark-hive" % sparkVersion,
"com.google.cloud" % "google-cloud-bigquery" % bigQueryVersion,
"com.google.apis" % "google-api-services-bigquery" % googleApiBigQueryVersion,
"com.google.cloud" % "google-cloud-nio" % gcsNioVersion,
"com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
)
// https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/bigquery-connector
libraryDependencies += "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.10.0-hadoop2"
// https://mvnrepository.com/artifact/com.spotify/spark-bigquery
libraryDependencies += "com.spotify" %% "spark-bigquery" % "0.2.2"
libraryDependencies += "com.google.apis" % "google-api-services-pubsub" % "v1-rev425-1.25.0"
// https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-pubsub
libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.3.0"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.0-M3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-avro
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"
如果您需要更多信息,请告诉我。
我希望通过单个 PubSub 订阅获得每秒 100 万个元素的数据摄取速度。
【问题讨论】:
面临与 bahir 库相同的问题。你能确定问题吗? 【参考方案1】:我认为您需要首先确定 Spark Streaming 作业的瓶颈。是 CPU 限制、内存限制、IO 限制还是因为 Spark 的某些参数导致它没有充分利用资源?我建议您先检查资源利用率,然后尝试不同的machine types。
【讨论】:
喜欢方向,但为什么不从检查资源利用率开始呢?以上是关于从 google pubsub 到 spark 流的数据摄取速度很慢的主要内容,如果未能解决你的问题,请参考以下文章
尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错
Google Cloud Platform:无法从 Container Engine 访问 Pubsub
Google Cloud Function - ImportError:无法从“google.cloud”(未知位置)导入名称“pubsub”
获取 Google Cloud PubSub 中单条消息的大小
Google PubSub / Gmail Webhook:发送电子邮件时始终从 PubSub 接收多个 POST 请求