Writing an array of STRUCT from Dataflow to BigQuery
Posted: 2017-03-20 15:09:04
Question: I'm trying to write an array-of-STRUCT field from a Dataflow pipeline to BigQuery. The schema of the resulting table is correct, but the field is never populated with data.
My DoFn:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class ProcessIpBlocks {

    public static class IpBlocksToIp extends DoFn<TableRow, TableRow> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws JSONException {
            TableRow row = c.element();
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Calendar cal = Calendar.getInstance();
            long startIp = 0L, endIp = 0L;
            if (row.get("start_ip") != null)
                startIp = Long.parseLong((String) row.get("start_ip"));
            if (row.get("end_ip") != null)
                endIp = Long.parseLong((String) row.get("end_ip"));

            // Emit one output row per IP address in [startIp, endIp].
            for (long i = startIp; i <= endIp; i++) {
                TableRow outputRow = new TableRow();
                outputRow.set("start_ip", startIp);
                outputRow.set("ip", i);
                if (row.get("postal_code") != null && !((String) row.get("postal_code")).isEmpty()) {
                    System.out.println("This is getting written to logs");
                    endIp = Long.parseLong((String) row.get("end_ip"));
                    // The array-of-STRUCT field is built from org.json types;
                    // this is the field that ends up empty in BigQuery.
                    JSONArray atrArray = new JSONArray();
                    JSONObject atr = new JSONObject();
                    atr.put("id", "zippostal_code");
                    JSONArray atrValueArray = new JSONArray();
                    atr.put("value", atrValueArray.put((String) row.get("postal_code")));
                    atr.put("pr", 0.5);
                    atr.put("dt", cal.getTime());
                    atrArray.put(atr);
                    outputRow.set("atr", atrArray);
                }
                c.output(outputRow);
            }
        }
    }
}
My pipeline's write step:
iPBlocksToIPData.apply("Foo", ParDo.of(new ProcessIpBlocks.IpBlocksToIp()))
    .apply(BigQueryIO.Write
        .named("WriteIPs")
        .to(String.format("%1$s:%2$s.%3$s", projectId, eventDataset, ipBlocksToIpTable))
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Answer 1: The following solution works: build the nested STRUCT with TableRow instead of JSONArray/JSONObject.
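import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import java.util.Collections;

// The outer and nested classes were both named Foo, which does not compile in
// Java (a nested class cannot share its enclosing class's name); the names from
// the question are used instead so the DoFn drops into the same pipeline.
public class ProcessIpBlocks {

    public static class IpBlocksToIp extends DoFn<TableRow, TableRow> {

        @Override
        public void processElement(ProcessContext c) {
            TableRow row = c.element();
            long startIp = 0L, endIp = 0L;
            if (row.get("start_ip") != null)
                startIp = Long.parseLong((String) row.get("start_ip"));
            if (row.get("end_ip") != null)
                endIp = Long.parseLong((String) row.get("end_ip"));

            for (long i = startIp; i <= endIp; i++) {
                TableRow outputRow = new TableRow();
                outputRow.set("start_ip", startIp);
                outputRow.set("ip", i);
                if (row.get("postal_code") != null && !((String) row.get("postal_code")).isEmpty()) {
                    // The STRUCT is a nested TableRow, not a JSONObject.
                    TableRow atrRow = new TableRow();
                    atrRow.set("id", "zippostal_code");
                    // Repeated STRING value: a plain Java array, not a JSONArray.
                    atrRow.set("value", new String[] {(String) row.get("postal_code")});
                    // atr is an array of STRUCT (REPEATED RECORD), so the struct is wrapped in a list.
                    outputRow.set("atr", Collections.singletonList(atrRow));
                }
                System.out.println(outputRow);
                c.output(outputRow);
            }
        }
    }
}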
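To make the target shape concrete without the Dataflow or BigQuery client libraries, here is a minimal stdlib-only sketch. It assumes the table schema declares `atr` as a REPEATED RECORD and `atr.value` as a REPEATED STRING (the question's schema is not shown), uses `LinkedHashMap` as a stand-in for `TableRow` (which is itself a `Map<String, Object>`), and the class and method names are hypothetical. What it illustrates: each STRUCT is a nested map-like row, and each repeated field holds a `java.util.List` rather than an org.json type.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: LinkedHashMap stands in for TableRow. Assumed schema:
 * start_ip INTEGER, ip INTEGER, atr REPEATED RECORD (id STRING,
 * value REPEATED STRING, pr FLOAT).
 */
class AtrShapeSketch {

    /** Builds one output row; postalCode may be null or empty. */
    static Map<String, Object> buildOutputRow(long startIp, long ip, String postalCode) {
        Map<String, Object> outputRow = new LinkedHashMap<>();
        outputRow.put("start_ip", startIp);
        outputRow.put("ip", ip);

        if (postalCode != null && !postalCode.isEmpty()) {
            // One element of the array of STRUCT: a nested row.
            Map<String, Object> atr = new LinkedHashMap<>();
            atr.put("id", "zippostal_code");
            // REPEATED STRING field -> a java.util.List of strings.
            atr.put("value", Collections.singletonList(postalCode));
            atr.put("pr", 0.5);

            // REPEATED RECORD field -> a java.util.List of nested rows.
            List<Map<String, Object>> atrList = Collections.singletonList(atr);
            outputRow.put("atr", atrList);
        }
        return outputRow;
    }

    public static void main(String[] args) {
        System.out.println(buildOutputRow(10L, 11L, "94043"));
    }
}
```

Running `main` prints `{start_ip=10, ip=11, atr=[{id=zippostal_code, value=[94043], pr=0.5}]}`. In the actual DoFn the same nesting is built with `new TableRow()` in place of `LinkedHashMap` and `set(...)` in place of `put(...)`.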