如何将参数从谷歌作曲家传递到数据流模板
Posted
技术标签:
【中文标题】如何将参数从谷歌作曲家传递到数据流模板【英文标题】:How to pass parameters from google composer to dataflow template 【发布时间】:2022-01-12 07:05:55 【问题描述】:我正在尝试按以下方式将参数从 google composer 传递到数据流模板中,但它不起作用。
# composer code
trigger_dataflow = DataflowTemplateOperator(
task_id="trigger_dataflow",
template="gs://mybucket/my_template",
dag=dag,
job_name='appsflyer_events_daily',
parameters=
"input": f'gs://my_bucket/' + " ds " + "/*.gz"
)
# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my_bucket/*.gz',
help='path of input file')
def main():
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
p = beam.Pipeline(options=pipeline_options)
lines = (
p
| MatchFiles(user_options.input)
)
【问题讨论】:
您使用的是哪个版本的 Airflow? @vdolez 我正在使用版本:1.10.10+composer 另外,你能提供一些错误日志吗?可能模板没有正确创建,或者 Composer 错过了访问权限。 日志不多@vdolez。管道只是不匹配任何文件并完成没有任何错误。如果我将文件模式硬编码到模板中,一切正常。 可以在没有 Composer 的情况下运行模板吗?例如从用户界面?您是否按照described in documentation 的步骤进行操作?上传了您的元数据文件等 【参考方案1】:你可以像下面这样通过。
DataflowTemplateOperator(,
task_id="task1",
template=get_variable_value("template"),
on_failure_callback=update_job_message,
parameters=
"fileBucket": get_variable_value("file_bucket"),
"basePath": get_variable_value("path_input"),
"Day": " json.loads(ti.xcom_pull(key=run_id))['day'] ",
,
)
我们正在使用 Java,在 Dataflow 作业中,我们有如下选项类获取和设置
public interface MyOptions extends CommonOptions
@Description("The output bucket")
@Validation.Required
ValueProvider<String> getFileBucket();
void setFileBucket(ValueProvider<String> value);
我们需要为此数据流作业创建模板,该模板将由 composer dag 触发。
【讨论】:
【参考方案2】:从数据流经典模板迁移到弹性模板解决了这个问题。
【讨论】:
以上是关于如何将参数从谷歌作曲家传递到数据流模板的主要内容,如果未能解决你的问题,请参考以下文章
有啥方法可以将数据从谷歌电子表格传递到 clickhouse?
如何从谷歌文本到 PHP 中的语音 API 中的 URL 参数获取性别声音?