为啥我的数据流作业使用很少的 cpu 而它却无法赶上消息积压

Posted

技术标签:

【中文标题】为啥我的数据流作业使用很少的 cpu 而它却无法赶上消息积压【英文标题】:Why does my dataflow job use very little cpu whereas it is not able to catch up the message backlog为什么我的数据流作业使用很少的 cpu 而它却无法赶上消息积压 【发布时间】:2021-11-28 03:45:04 【问题描述】:

我对数据流很陌生,但我很难找到一份工作速度很快的工作。

我们正在测试以下设置:我们在 Pub/Sub 1 事件中进行流式传输,并有一个数据流管道应该读取、解析和写入 Bigquery。

CPU 使用率为 4/5%,似乎效率不高 处理似乎很长,但解析似乎很短 即使删除最后一步,它仍然处理 0.5 事件/秒

这是我在 python 中的管道代码

class CustomParsing(beam.DoFn):
    """ Custom ParallelDo class to apply a custom transformation """

    @staticmethod
    def if_exists_in_data(data, key):
        if key in data.keys():
            return data[key]
        return None

    def to_runner_api_parameter(self, unused_context):
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, message: beam.io.PubsubMessage, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        data = json.loads(message.data.decode('utf-8'))
        yield 
            "data": message.data,
            "dataflow_timestamp": timestamp.to_rfc3339(),
            "created_at": self.if_exists_in_data(data, 'createdAt')
        


def run():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table"
    )

    known_args, pipeline_args = parser.parse_known_args()

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    p = beam.Pipeline(options=pipeline_options)
    (
        p
        | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
            subscription=known_args.input_subscription, timestamp_attribute=None, with_attributes=True
        )
        | "CustomParse" >> beam.ParDo(CustomParsing())
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=BIGQUERY_SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
        )
    )
    pipeline_result = p.run()


if __name__ == "__main__":

    run()

然后我使用以下 bash 命令启动我的工作

poetry run python $filepath \
      --streaming \
      --update \
      --runner DataflowRunner \
      --project $1 \
      --region xx \
      --subnetwork regions/xx/subnetworks/$3 \
      --temp_location gs://$4/temp \
      --job_name $jobname  \
      --max_num_workers 10 \
      --num_worker 2 \
      --input_subscription projects/$1/subscriptions/$jobname \
      --output_table $1:yy.$filename \
      --autoscaling_algorithm=THROUGHPUT_BASED \
      --dataflow_service_options=enable_prime

我正在使用 python 3.7.8 和 apache-beam==2.32.0。我看不到任何重要的错误日志,但我可能不是在寻找可以帮助我调试的日志。

您对在日志中查看什么或我是否做错了什么有任何线索吗?

【问题讨论】:

如果它只有 1event/s 那么也许你只需要 1 个工人。还要检查 bigquery 输出的 batch_size 参数。 感谢您的回答。 1event/s 用于测试目的,但在生产中我们将有 100event/s。我正在尝试减少 batch_size,因为如果我理解得很好,它会在写入之前等待 500 次插入? 不幸的是,减少批量大小并没有改变我们仍然处理约 0.5 个事件/秒的任何事情。 100 个事件/秒对于 BQ 流对我来说似乎有点贵。每 15 分钟执行一次批处理作业会更便宜,甚至可以从单个实例运行 Beam 作业。 即使我只保留第一步(阅读),它仍然很长。我在启动函数中缺少参数吗? 【参考方案1】:

可能您的性能较低,因为 Dataflow 已将您的所有 DoFn 融合到一个步骤中。这意味着您的步骤一个接一个地按顺序运行,而不是并行运行(您将等待 pubsub 和 bq api 调用完成)。

为了防止融合,您可以在 ReadFromPubSub() 之后在管道中添加 Reshuffle() 步骤:

p = beam.Pipeline(options=pipeline_options)
    (
        p
        | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
            subscription=known_args.input_subscription, timestamp_attribute=None, with_attributes=True
        ) 
        | "Prevent fusion" >> beam.transforms.util.Reshuffle()
        | "CustomParse" >> beam.ParDo(CustomParsing())
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=BIGQUERY_SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
        )
    )

Google 有很多关于 preventing fusion 以及如何查看您正在运行的数据流作业是否有融合步骤的文档。

顺便说一句 - 您可以优化您的代码,删除与 dict.get() 具有相同功能的 if_exists_in_data 静态方法。

您可以摆脱从data.keys() 创建列表,只需使用 data.get('createdAt') 进行迭代(即 O(n) ):

def process(self, message: beam.io.PubsubMessage, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        data = json.loads(message.data.decode('utf-8'))
        yield 
            "data": message.data,
            "dataflow_timestamp": timestamp.to_rfc3339(),
            "created_at": data.get['createdAt']
        

当您有比现在更多的事件时,这种优化会为您提供帮助;-)。

【讨论】:

非常感谢您的回答和 .get。不幸的是,只需复制粘贴您的代码,我没有任何改进(0.5 个事件/秒和低 cpu 使用率)。图表没有改变,只是多了一个步骤(防止融合) 这个问题似乎与订购有关。为了避免订购,有没有办法在自定义解析中获取发布的时间戳?时间戳参数是发布时间还是我需要指定一些东西

以上是关于为啥我的数据流作业使用很少的 cpu 而它却无法赶上消息积压的主要内容,如果未能解决你的问题,请参考以下文章

为啥 LINQ 在我的查询中使用错误的数据类型,而它在 EF 架构中被正确声明?

为啥我的查询不会在我的代码中执行,而它在普通 sql 中执行得很好?

中国移动的哀伤,用户吐槽资费太贵,而它却未能获得相应的利润,到底谁赚走了用户的钱?...

为啥 CakePHP 无法为一个控制器创建夹具并返回 SQLSTATE[42000] 而它是为另一个控制器?

为啥 Android Studio 设计器显示我的自定义视图嵌套在自身内部,而它不是

为啥我的 gitlab-ci.yml 在 npm run test 上一直崩溃,而它在代码编辑器中工作?