为啥我的数据流作业使用很少的 cpu 而它却无法赶上消息积压
Posted
技术标签:
【中文标题】为啥我的数据流作业使用很少的 cpu 而它却无法赶上消息积压【英文标题】:Why does my dataflow job use very little cpu whereas it is not able to catch up the message backlog为什么我的数据流作业使用很少的 cpu 而它却无法赶上消息积压 【发布时间】:2021-11-28 03:45:04 【问题描述】:我对数据流很陌生,但我很难找到一份工作速度很快的工作。
我们正在测试以下设置:我们在 Pub/Sub 1 事件中进行流式传输,并有一个数据流管道应该读取、解析和写入 Bigquery。
CPU 使用率为 4/5%,似乎效率不高 处理似乎很长,但解析似乎很短 即使删除最后一步,它仍然处理 0.5 事件/秒这是我在 python 中的管道代码
class CustomParsing(beam.DoFn):
""" Custom ParallelDo class to apply a custom transformation """
@staticmethod
def if_exists_in_data(data, key):
if key in data.keys():
return data[key]
return None
def to_runner_api_parameter(self, unused_context):
return "beam:transforms:custom_parsing:custom_v0", None
def process(self, message: beam.io.PubsubMessage, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
data = json.loads(message.data.decode('utf-8'))
yield
"data": message.data,
"dataflow_timestamp": timestamp.to_rfc3339(),
"created_at": self.if_exists_in_data(data, 'createdAt')
def run():
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_subscription",
help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
)
parser.add_argument(
"--output_table", help="Output BigQuery Table"
)
known_args, pipeline_args = parser.parse_known_args()
# Creating pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True
# Defining our pipeline and its steps
p = beam.Pipeline(options=pipeline_options)
(
p
| "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
subscription=known_args.input_subscription, timestamp_attribute=None, with_attributes=True
)
| "CustomParse" >> beam.ParDo(CustomParsing())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
known_args.output_table,
schema=BIGQUERY_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
)
)
pipeline_result = p.run()
if __name__ == "__main__":
run()
然后我使用以下 bash 命令启动我的工作
poetry run python $filepath \
--streaming \
--update \
--runner DataflowRunner \
--project $1 \
--region xx \
--subnetwork regions/xx/subnetworks/$3 \
--temp_location gs://$4/temp \
--job_name $jobname \
--max_num_workers 10 \
--num_worker 2 \
--input_subscription projects/$1/subscriptions/$jobname \
--output_table $1:yy.$filename \
--autoscaling_algorithm=THROUGHPUT_BASED \
--dataflow_service_options=enable_prime
我正在使用 python 3.7.8 和 apache-beam==2.32.0。我看不到任何重要的错误日志,但我可能不是在寻找可以帮助我调试的日志。
您对在日志中查看什么或我是否做错了什么有任何线索吗?
【问题讨论】:
如果它只有 1event/s 那么也许你只需要 1 个工人。还要检查 bigquery 输出的 batch_size 参数。 感谢您的回答。 1event/s 用于测试目的,但在生产中我们将有 100event/s。我正在尝试减少 batch_size,因为如果我理解得很好,它会在写入之前等待 500 次插入? 不幸的是,减少批量大小并没有改变我们仍然处理约 0.5 个事件/秒的任何事情。 100 个事件/秒对于 BQ 流对我来说似乎有点贵。每 15 分钟执行一次批处理作业会更便宜,甚至可以从单个实例运行 Beam 作业。 即使我只保留第一步(阅读),它仍然很长。我在启动函数中缺少参数吗? 【参考方案1】:可能您的性能较低,因为 Dataflow 已将您的所有 DoFn 融合到一个步骤中。这意味着您的步骤一个接一个地按顺序运行,而不是并行运行(您将等待 pubsub 和 bq api 调用完成)。
为了防止融合,您可以在 ReadFromPubSub() 之后在管道中添加 Reshuffle() 步骤:
p = beam.Pipeline(options=pipeline_options)
(
p
| "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
subscription=known_args.input_subscription, timestamp_attribute=None, with_attributes=True
)
| "Prevent fusion" >> beam.transforms.util.Reshuffle()
| "CustomParse" >> beam.ParDo(CustomParsing())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
known_args.output_table,
schema=BIGQUERY_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
)
)
Google 有很多关于 preventing fusion 以及如何查看您正在运行的数据流作业是否有融合步骤的文档。
顺便说一句 - 您可以优化您的代码,删除与 dict.get() 具有相同功能的 if_exists_in_data
静态方法。
您可以摆脱从data.keys()
创建列表,只需使用 data.get('createdAt') 进行迭代(即 O(n) ):
def process(self, message: beam.io.PubsubMessage, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
data = json.loads(message.data.decode('utf-8'))
yield
"data": message.data,
"dataflow_timestamp": timestamp.to_rfc3339(),
"created_at": data.get['createdAt']
当您有比现在更多的事件时,这种优化会为您提供帮助;-)。
【讨论】:
非常感谢您的回答和 .get。不幸的是,只需复制粘贴您的代码,我没有任何改进(0.5 个事件/秒和低 cpu 使用率)。图表没有改变,只是多了一个步骤(防止融合) 这个问题似乎与订购有关。为了避免订购,有没有办法在自定义解析中获取发布的时间戳?时间戳参数是发布时间还是我需要指定一些东西以上是关于为啥我的数据流作业使用很少的 cpu 而它却无法赶上消息积压的主要内容,如果未能解决你的问题,请参考以下文章
为啥 LINQ 在我的查询中使用错误的数据类型,而它在 EF 架构中被正确声明?
为啥我的查询不会在我的代码中执行,而它在普通 sql 中执行得很好?
中国移动的哀伤,用户吐槽资费太贵,而它却未能获得相应的利润,到底谁赚走了用户的钱?...
为啥 CakePHP 无法为一个控制器创建夹具并返回 SQLSTATE[42000] 而它是为另一个控制器?