Apache Beam - 将 BigQuery TableRow 写入 Cassandra

Posted

技术标签:

【中文标题】Apache Beam - 将 BigQuery TableRow 写入 Cassandra【英文标题】:Apache Beam - Write BigQuery TableRow to Cassandra 【发布时间】:2020-12-21 07:52:01 【问题描述】:

我正在尝试从 BigQuery(使用 TableRow)读取数据并将输出写入 Cassandra。该怎么做?

这是我尝试过的。这有效:

/* Read BQ */
PCollection<CxCpmMapProfile> data =  p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, CxCpmMapProfile>() 
    public CxCpmMapProfile apply(SchemaAndRecord record) 
        GenericRecord r = record.getRecord();
        return new CxCpmMapProfile((String) r.get("channel_no").toString(), (String) r.get("channel_name").toString());
    
).fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`").usingStandardSql().withoutValidation());

/* Write to Cassandra */
data.apply(CassandraIO.<CxCpmMapProfile>write()
    .withHosts(Arrays.asList("<IP addr1>", "<IP addr2>"))
    .withPort(9042)
    .withUsername("cassandra_user").withPassword("cassandra_password").withKeyspace("cassandra_keyspace")
    .withEntity(CxCpmMapProfile.class));

但是当我像这样使用 TableRow 更改 Read BQ 部分时:

/* Read from BQ using readTableRow */
PCollection<TableRow> data = p.apply(BigQueryIO.readTableRows()
    .fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`")
    .usingStandardSql().withoutValidation());

写入 Cassandra 时出现以下错误

The method apply(PTransform&lt;? super PCollection&lt;TableRow&gt;,OutputT&gt;) in the type PCollection&lt;TableRow&gt; is not applicable for the arguments (CassandraIO.Write&lt;CxCpmMacProfile&gt;)

【问题讨论】:

【参考方案1】:

错误是由于输入 PCollection 包含 TableRow 元素,而 CassandraIO 读取需要 CxCpmMacProfile 元素。您需要将 BigQuery 中的元素读取为 CxCpmMacProfile 元素。 BigQueryIO documentation 有一个从表中读取行并将其解析为自定义类型的示例,通过 read(SerializableFunction) 方法完成。

【讨论】:

谢谢,所以在这种情况下,唯一的选择似乎是自定义类型,而不是 TableRow

以上是关于Apache Beam - 将 BigQuery TableRow 写入 Cassandra的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery

在 BigQuery Apache Beam 中访问 TableRow 列

Apache Beam 数据流 BigQuery

结合 BigQuery 和 Pub/Sub Apache Beam

如何使用 BigQuery 和 Apache Beam 将 SQL 表转换为行序列列表?