如何将参数从谷歌作曲家传递到数据流模板

Posted

技术标签:

【中文标题】如何将参数从谷歌作曲家传递到数据流模板【英文标题】:How to pass parameters from google composer to dataflow template 【发布时间】:2022-01-12 07:05:55 【问题描述】:

我正在尝试按以下方式将参数从 google composer 传递到数据流模板中,但它不起作用。

# composer code
trigger_dataflow = DataflowTemplateOperator(
     task_id="trigger_dataflow",
     template="gs://mybucket/my_template",
     dag=dag,
     job_name='appsflyer_events_daily',
     parameters=
         "input": f'gs://my_bucket/' + " ds " + "/*.gz"
     
)

# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        '--input',
        default='gs://my_bucket/*.gz',
        help='path of input file')

def main():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions) 
    p = beam.Pipeline(options=pipeline_options)
    lines = (
        p
        | MatchFiles(user_options.input)
    )

【问题讨论】:

您使用的是哪个版本的 Airflow? @vdolez 我正在使用版本:1.10.10+composer 另外,你能提供一些错误日志吗?可能模板没有正确创建,或者 Composer 错过了访问权限。 日志不多@vdolez。管道只是不匹配任何文件并完成没有任何错误。如果我将文件模式硬编码到模板中,一切正常。 可以在没有 Composer 的情况下运行模板吗?例如从用户界面?您是否按照described in documentation 的步骤进行操作?上传了您的元数据文件等 【参考方案1】:

你可以像下面这样通过。

DataflowTemplateOperator(,
         task_id="task1",
         template=get_variable_value("template"),
         on_failure_callback=update_job_message,
         parameters=
             "fileBucket": get_variable_value("file_bucket"),
             "basePath": get_variable_value("path_input"),         
             "Day": " json.loads(ti.xcom_pull(key=run_id))['day'] ",
         ,
    )

我们正在使用 Java,在 Dataflow 作业中,我们有如下选项类获取和设置

public interface MyOptions extends CommonOptions 
 
    @Description("The output bucket")
    @Validation.Required
    ValueProvider<String> getFileBucket();
    void setFileBucket(ValueProvider<String> value);
    

我们需要为此数据流作业创建模板,该模板将由 composer dag 触发。

【讨论】:

【参考方案2】:

从数据流经典模板迁移到弹性模板解决了这个问题。

【讨论】:

以上是关于如何将参数从谷歌作曲家传递到数据流模板的主要内容,如果未能解决你的问题,请参考以下文章

有啥方法可以将数据从谷歌电子表格传递到 clickhouse?

如何从谷歌文本到 PHP 中的语音 API 中的 URL 参数获取性别声音?

用于将数据从谷歌电子表格加载到 bigquery 的独立脚本

将数据从谷歌数据存储复制到 CSV

从谷歌分析到谷歌表格的数据 - 格式错误的数字

如何将数据传递到 GAE 基本模板?