Apache Beam 根据前一行的值更新当前行的值

Posted

技术标签:

【中文标题】Apache Beam 根据前一行的值更新当前行的值【英文标题】:Apache Beam update current row values based on the values from previous row 【发布时间】:2021-12-16 13:11:50 【问题描述】:

Apache Beam 根据上一行的值更新值

我已将 CSV 文件中的值分组。在分组的行中,我们发现一些缺失值需要根据前一行的值进行更新。如果该行的第一列是空的,那么我们需要将其更新为 0。

我可以对记录进行分组,但无法找出更新值的逻辑,我该如何实现?

记录

customerId date amount
BS:89481 1/1/2012 100
BS:89482 1/1/2012
BS:89483 1/1/2012 300
BS:89481 1/2/2012 900
BS:89482 1/2/2012 200
BS:89483 1/2/2012

分组记录

customerId date amount
BS:89481 1/1/2012 100
BS:89481 1/2/2012 900
BS:89482 1/1/2012
BS:89482 1/2/2012 200
BS:89483 1/1/2012 300
BS:89483 1/2/2012

更新缺失值

customerId date amount
BS:89481 1/1/2012 100
BS:89481 1/2/2012 900
BS:89482 1/1/2012 000
BS:89482 1/2/2012 200
BS:89483 1/1/2012 300
BS:89483 1/2/2012 300

到目前为止的代码:

public class GroupByTest 
    public static void main(String[] args) throws IOException 
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that matches conditions 
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation
        //Convert row to KV
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("balance", Sum.ofDoubles(), "balances");

        final PCollection<Row> aggregagte = rows.apply(combine);

        PCollection<String> pOutput=aggregagte.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
        
                        
        
        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) 
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) 
            type = logicalType.getName();
        

        switch (type) 
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();

        default:
            return row.getString(columnName);

        
    




修改后的代码: 原代码

final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "balances");

按客户 ID 分组

class ToKV extends DoFn<Row, KV<String, Row>> 

    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;

    @ProcessElement
    public void processElement(ProcessContext context) 
        Row row = context.element();
        org.apache.beam.sdk.schemas.Schema schema = row.getSchema();
        context.output(KV.of(row.getValue(columnName1).toString(), row));
    

    public void setColumnName1(String columnName1) 
        this.columnName1 = columnName1;
    



按客户 ID 分组:

ToKV toKV = new ToKV();
toKV.setColumnName1("ID");
PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
    
    
PCollection<KV<String,Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String,Row>create());

// 尝试按日期分组

    PCollection<Row> outputRow = 
            groupedKVRows
            .apply(ParDo.of(new GroupByDate()))
            .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));

如何编写将 Iterable 转换为 pCollection 的逻辑,以便对日期进行排序。

class GroupByDate extends DoFn<KV<String,Iterable<Row>>, Row> 

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) 
        String strKey = context.element().getKey();
        Iterable<Row> rows = context.element().getValue();
        
    
        
        
    

Avro 架构:


  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  
    "name" : "customerId",
    "type" : [ "string", "null" ]
  , 
    "name" : "date",
    "type" : [ "string", "null" ],
    "logicalType": "date"
    
  , 
    "name" : "amount",
    "type" : [ "double", "null" ]
   ]

更新将 PCollection 转换为 Row[]

class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> 

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) 
        String strKey = context.element().getKey();
        List<Row> rowList = new ArrayList();
        Iterable<Row> rowValue = context.element().getValue();
        rowValue.forEach(data -> 
            rowList.add(data);

        );
        Row[] rowArray = new Row[rowList.size()-1];
        rowArray=rowList.toArray(rowArray);
        context.output(rowArray);
    

建议代码

Row[] rowArray = Iterables.toArray(rows, Row.class);

错误:

Iterables 类型中的 toArray(Iterable extends T>, Class) 方法不适用于参数 (PCollection, Class)

将可迭代对象转换为数组

Row[] rowArray =  groupedKVRows.apply(ParDo.of(new KVToRow()));

错误:

此行有多个标记 - 类型不匹配:无法从 PCollection 转换 到行[] - 1 行更改,2 行删除

【问题讨论】:

【参考方案1】:

Beam 不提供任何订单保证,因此您必须像以前那样对它们进行分组。

但据我所知,您需要按customerId 分组。之后,您可以应用 ParDo 之类的 PTransform 以按 date 对分组的行进行排序,并根据需要填充缺失值。

转换为数组的排序示例

static class SortAndForwardFillFn extends DoFn<KV<String, Iterable<Row>>> 

    @ProcessElement
    public void processElement(@Element KV<String, Iterable<Row>> element, OutputReceiver<KV<String, Iterable<Row>>> outputReceiver) 

        // Create a formatter for parsing dates
        DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");

        // Convert iterable to array
        Row[] rowArray = Iterables.toArray(rows, Row.class);

        // Sort array using dates
        Arrays
            .sort(
                rowArray,
                Comparator
                .comparingLong(row -> formatter.parseDateTime(row.getString("date")).getMillis())
        );

        // Store the last amount
        Double lastAmount = 0.0;

        // Create a List for storing sorted and filled rows
        List<Row> resultRows = new ArrayList<>(rowArray.length);

        // Iterate over the array and fill in the missing parts
        for (Row row : rowArray) 

            // Get current amount
            Double currentAmount = row.getDouble("amount");

            // If null, fill the previous value and add to results, 
            // otherwise add as it is
            resultRows.add(...);
        

        // Output using the output receiver
        outputReceiver
            .output(
                KV.of(element.getKey(), resultRows)
            )
        );
    

【讨论】:

在我的代码之前,我已经同时按 customerID 和 date 进行了分组,现在,根据您的建议,我仅按 customerID 对行进行分组,我正在努力按日期对记录进行分组。请帮我一个 sudo 代码! 我已更新问题以包含修改后的代码!! 您不必将值转换为 PCollection 进行排序。简单地说,以任何适合 Java 的方式(stream.sorted()Arrays.sort() 等)对Iterable&lt;Row&gt; 进行排序。 这个答案看起来相当不错,尽管为了清楚起见,我建议将其构建为DoFn。这将有助于澄清输入类型,尤其是。例如,从上下文来看,rows 应该是一个 Iterable&lt;Row&gt;,它是一个 DoFn 的元素输入。 (如果您愿意,我可以编辑帖子以执行此操作。) @User27854 PCollection 的返回类型是这种方法的预期返回类型。元素现在应该对前向填充的记录数组进行排序,然后您可以从那里进行任何您想要的后续工作。

以上是关于Apache Beam 根据前一行的值更新当前行的值的主要内容,如果未能解决你的问题,请参考以下文章

如何将当前行的负值转移到数据框中的前一行?

linux之 vim 常用命令

SQLAlchemy中所有行的高效更新

根据其他行的值更新列[关闭]

使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题

如何使用 Apache BEAM 在 BigQuery 中执行快速联接