Apache Beam - 将 BigQuery TableRow 写入 Cassandra
Posted
技术标签:
【中文标题】Apache Beam - 将 BigQuery TableRow 写入 Cassandra【英文标题】:Apache Beam - Write BigQuery TableRow to Cassandra 【发布时间】:2020-12-21 07:52:01 【问题描述】:我正在尝试从 BigQuery(使用 TableRow)读取数据并将输出写入 Cassandra。该怎么做?
这是我尝试过的。这有效:
/* Read BQ */
PCollection<CxCpmMapProfile> data = p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, CxCpmMapProfile>()
public CxCpmMapProfile apply(SchemaAndRecord record)
GenericRecord r = record.getRecord();
return new CxCpmMapProfile((String) r.get("channel_no").toString(), (String) r.get("channel_name").toString());
).fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`").usingStandardSql().withoutValidation());
/* Write to Cassandra */
data.apply(CassandraIO.<CxCpmMapProfile>write()
.withHosts(Arrays.asList("<IP addr1>", "<IP addr2>"))
.withPort(9042)
.withUsername("cassandra_user").withPassword("cassandra_password").withKeyspace("cassandra_keyspace")
.withEntity(CxCpmMapProfile.class));
但是当我像这样使用 TableRow 更改 Read BQ 部分时:
/* Read from BQ using readTableRow */
PCollection<TableRow> data = p.apply(BigQueryIO.readTableRows()
.fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`")
.usingStandardSql().withoutValidation());
在 写入 Cassandra 时出现以下错误
The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (CassandraIO.Write<CxCpmMacProfile>)
【问题讨论】:
【参考方案1】:错误是由于输入 PCollection 包含 TableRow
元素,而 CassandraIO 读取需要 CxCpmMacProfile
元素。您需要将 BigQuery 中的元素读取为 CxCpmMacProfile
元素。 BigQueryIO documentation 有一个从表中读取行并将其解析为自定义类型的示例,通过 read(SerializableFunction)
方法完成。
【讨论】:
谢谢,所以在这种情况下,唯一的选择似乎是自定义类型,而不是 TableRow以上是关于Apache Beam - 将 BigQuery TableRow 写入 Cassandra的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表
Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery
在 BigQuery Apache Beam 中访问 TableRow 列