如何从 Dataflow 中的 PCollection 读取 bigQuery
Posted
技术标签:
【中文标题】如何从 Dataflow 中的 PCollection 读取 bigQuery【英文标题】:How to read bigQuery from PCollection in Dataflow 【发布时间】:2018-11-08 06:22:35 【问题描述】:我有一个从 pubsub 获得的对象的 PCollection,比如说:
PCollection<Student> pStudent ;
在学生属性中,有一个属性,比如说学生ID; 我想用这个学生 id 从 BigQuery 中读取属性(class_code),并将我从 BQ 获得的 class_code 设置为 PColllcetion 中的学生对象
有人知道如何实现吗?
我知道在梁中有一个 BigQueryIO
但是如果我想在 BQ 中执行的查询字符串标准来自 PCollection 中的学生对象 (studentID) 我该怎么做从 BigQuery 的结果中将值设置为 PCollection?
【问题讨论】:
您不想在此用例中使用 BigQuery,因为 BigQuery 不是 OLTP 数据库,而且此用例似乎需要数据库中的事务属性。当然,您可以将 BigQuery 表作为映射表获取到您的数据流,但数据要么保持不变,要么您最终会多次查询 BigQuery,这可能会导致您不必要地付出代价。如果您想为此目的使用 GCP 中的某些内容,请使用 Cloud SQL 或 Cloud Datastore。 【参考方案1】:我考虑了两种选择来做到这一点。一种是使用BigQueryIO
读取整个表格并将其具体化为侧面输入,或者使用CoGroupByKey
连接所有数据。另一种可能性,我在这里实现的,是直接使用 Java 客户端库。
我创建了一些虚拟数据:
$ bq mk test.students name:STRING,grade:STRING
$ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'
看起来像这样:
然后,在管道中,我生成一些输入虚拟数据:
Create.of("Luke", "Leia", "Yoda", "Chewbacca")
对于这些“学生”中的每一个,我按照this example 中的方法在 BigQuery 表中获取相应的成绩。根据之前的评论,根据您的数据量、速率(配额)和成本考虑因素加以考虑。完整示例:
public class DynamicQueries
private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);
@SuppressWarnings("serial")
public static void main(String[] args)
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// create input dummy data
PCollection<String> students = p.apply("Read students data", Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));
// ParDo to map each student with the grade in BigQuery
PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery", ParDo.of(new DoFn<String, KV<String, String>>()
@ProcessElement
public void processElement(ProcessContext c) throws Exception
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(
"SELECT name, grade "
+ "FROM `PROJECT_ID.test.students` "
+ "WHERE name = "
+ "\"" + c.element() + "\" " // fetch the appropriate student
+ "LIMIT 1")
.setUseLegacySql(false) // Use standard SQL syntax for queries.
.build();
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
// Wait for the query to complete.
queryJob = queryJob.waitFor();
// Check for errors
if (queryJob == null)
throw new RuntimeException("Job no longer exists");
else if (queryJob.getStatus().getError() != null)
throw new RuntimeException(queryJob.getStatus().getError().toString());
// Get the results.
QueryResponse response = bigquery.getQueryResults(jobId)
TableResult result = queryJob.getQueryResults();
String mark = new String();
for (FieldValueList row : result.iterateAll())
mark = row.get("grade").getStringValue();
c.output(KV.of(c.element(), mark));
));
// log to check everything is right
marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>()
@ProcessElement
public void processElement(ProcessContext c) throws Exception
LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
c.output(c.element());
));
p.run();
输出是:
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Yoda A+
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Luke C-
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Chewbacca F
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Leia B+
(使用 BigQuery 1.22.0 和 2.5.0 Java SDK for Dataflow 测试)
【讨论】:
以上是关于如何从 Dataflow 中的 PCollection 读取 bigQuery的主要内容,如果未能解决你的问题,请参考以下文章
从 Dataflow 中的 GCS 读取时如何获取正在处理的文件名?
如何使用 python 将字典写入 Dataflow 中的 Bigquery
如何使用 Stream 为 Spring Cloud Dataflow 中的子任务设置全局属性 - Task-Launcher-Dataflow
Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象