将数据流中的 STRUCT 数组写入大查询

Posted

技术标签:

【中文标题】将数据流中的 STRUCT 数组写入大查询【英文标题】:Writing an array of STRUCT from Dataflow to big query 【发布时间】:2017-03-20 15:09:04 【问题描述】:

我正在尝试将数据流管道中的结构数组字段写入大查询,生成的表的架构是正确的,但字段中没有填充数据。

我的 DoFn 函数:

public class ProcessIpBlocks 

    public static class IpBlocksToIp extends DoFn<TableRow, TableRow> 

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws JSONException 

            TableRow row = c.element();
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Calendar cal = Calendar.getInstance();


            long startIp = 0L, endIp = 0L;
            if(row.get("start_ip") != null)
                startIp = Long.parseLong((String)row.get("start_ip"));

            if(row.get("end_ip") != null)
                endIp = Long.parseLong((String)row.get("end_ip"));

            for(long i= startIp; i<=endIp; i++)
            
                TableRow outputRow = new TableRow();
                outputRow.set("start_ip", startIp);
                outputRow.set("ip", i);

                if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty())

                    System.out.println("This is getting written to logs");
                    endIp = Long.parseLong((String)row.get("end_ip"));
                    JSONArray atrArray = new JSONArray();

                    JSONObject atr = new JSONObject();
                    atr.put("id", "zippostal_code");

                    JSONArray atrValueArray = new JSONArray();
                    atr.put("value", atrValueArray.put((String)row.get("postal_code")));


                    atr.put("pr", 0.5);
                    atr.put("dt", cal.getTime());
                    atrArray.put(atr);
                    outputRow.set("atr", atrArray);
                

                c.output(outputRow);
            
        
    


我的管道写入步骤:

iPBlocksToIPData.apply("Foo", ParDo.of(new ProcessIpBlocks.IpBlocksToIp()))
        .apply(BigQueryIO.Write
                .named("WriteIPs")
                .to(String.format("%1$s:%2$s.%3$s",projectId, eventDataset, ipBlocksToIpTable))
                .withSchema(schema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

【问题讨论】:

【参考方案1】:

以下解决方案有效,使用 TableRow 而不是 JSONArray

公共类 Foo

public static class Foo extends DoFn<TableRow, TableRow> 


    @Override
    public void processElement(ProcessContext c) throws JSONException 

        TableRow row = c.element();
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar cal = Calendar.getInstance();


        long startIp = 0L, endIp = 0L;
        if(row.get("start_ip") != null)
            startIp = Long.parseLong((String)row.get("start_ip"));

        if(row.get("end_ip") != null)
            endIp = Long.parseLong((String)row.get("end_ip"));

        for(long i= startIp; i<=endIp; i++)
        
            TableRow outputRow = new TableRow();
            outputRow.set("start_ip", startIp);
            outputRow.set("ip", i);

            if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty())

                endIp = Long.parseLong((String)row.get("end_ip"));

                TableRow atrRow = new TableRow();
                atrRow.set("id", "zippostal_code");
                atrRow.set("value", new String[] (String)row.get("postal_code"));



                outputRow.set("atr", atrRow);
            

            System.out.println(outputRow);

            c.output(outputRow);
        
    

【讨论】:

以上是关于将数据流中的 STRUCT 数组写入大查询的主要内容,如果未能解决你的问题,请参考以下文章

使用“struct.pack”将数据写入文件时出错

谷歌数据流写入大查询表性能

大查询删除嵌套在 STRUCT 对象中的列

数据类型和文件格式

C 语言文件操作 ( 学生管理系统 | 命令行接收数据填充结构体 | 结构体写出到文件中 | 查询文件中的结构体数据 )

Matlab 处理中的一个struct 数组怎么用code放入到excel里