flink 并行计数器实现

Posted asker009

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink 并行计数器实现相关的知识,希望对你有一定的参考价值。

1、Flink 计数器（累加器）的设计灵感来源于 Hadoop MapReduce 计算框架中的计数器理念。

Flink 通过实现 Accumulator 接口来完成并行计数：每个并行任务实例各自维护一个本地累加器，各实例结果的合并（merge）由 Flink 负责完成，用户只需实现累加与合并逻辑。

/**
 * Contract for a parallel counter/aggregator.
 *
 * <p>Each parallel task instance works on its own local value via {@code add}. The per-instance
 * results are then combined with {@code merge}.
 *
 * @param <V> type of the values added to the accumulator
 * @param <R> type of the accumulated result; must be serializable so it can be shipped back
 */
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {

    /** Adds {@code value} to this instance's local state. */
    void add(V value);

    /** Returns the local (not yet merged) value of this instance. */
    R getLocalValue();

    /** Resets the local value of this instance. */
    void resetLocal();

    /** Folds the local value of {@code other} into this instance. */
    void merge(Accumulator<V, R> other);

    /** Returns a copy of this accumulator. */
    Accumulator<V, R> clone();
}

计数的结果通过 JobExecutionResult 的 getAccumulatorResult 方法获取，参数是注册累加器时使用的名称。

2、示例：在正常业务处理流程中统计空字段的数量，空字段包括 null、空字符串以及只包含空格、TAB 等空白字符的字段。这种场景比较常见。

public class EmptyFieldsCountAccumulator 
    private static final String EMPTY_FIELD_ACCUMULATOR= "empty-fields";

    public static void main(String args[]) throws Exception
        final ParameterTool params = ParameterTool.fromArgs(args);

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get the data set
        final DataSet<StringTriple> file = getDataSet(env, params);

        // filter lines with empty fields
        final DataSet<StringTriple> filteredLines = file.filter(new EmptyFieldFilter());

        // Here, we could do further processing with the filtered lines...
        JobExecutionResult result;
        // output the filtered lines
        if (params.has("output")) 
            filteredLines.writeAsCsv(params.get("output"));
            // execute program
            result = env.execute("Accumulator example");
         else 
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            filteredLines.print();
            result = env.getLastJobExecutionResult();
        

        // get the accumulator result via its registration key
        final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
        System.out.format("Number of detected empty fields per column: %s\n", emptyFields);
    

    @SuppressWarnings("unchecked")
    private static DataSet<StringTriple> getDataSet(ExecutionEnvironment env, ParameterTool params) 
        if (params.has("input")) 
            return env.readCsvFile(params.get("input"))
                    .fieldDelimiter(";")
                    .pojoType(StringTriple.class);
         else 
            System.out.println("Executing EmptyFieldsCountAccumulator example with default input data set.");
            System.out.println("Use --input to specify file input.");
            return env.fromCollection(getExampleInputTuples());
        
    

    private static Collection<StringTriple> getExampleInputTuples() 
        Collection<StringTriple> inputTuples = new ArrayList<StringTriple>();
        inputTuples.add(new StringTriple("John", "Doe", "Foo Str."));
        inputTuples.add(new StringTriple("Joe", "Johnson", ""));
        inputTuples.add(new StringTriple(null, "Kate Morn", "Bar Blvd."));
        inputTuples.add(new StringTriple("Tim", "Rinny", ""));
        inputTuples.add(new StringTriple("Alicia", "Jackson", "  "));
        inputTuples.add(new StringTriple("Alicia", "Jackson", "  "));
        inputTuples.add(new StringTriple("Alicia", "Jackson", "  "));
        inputTuples.add(new StringTriple("Tom", "Jackson", "A"));
        inputTuples.add(new StringTriple("Amy", "li", "B  "));
        return inputTuples;
    

    /**
     * This function filters all incoming tuples that have one or more empty fields.
     * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under
     * @link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR).
     */
    public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> 

        // create a new accumulator in each filter function instance
        // accumulators can be merged later on
        private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();

        @Override
        public void open(final Configuration parameters) throws Exception 
            super.open(parameters);

            // register the accumulator instance
            getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
                    this.emptyFieldCounter);
        

        @Override
        public boolean filter(final StringTriple t) 
            boolean containsEmptyFields = false;

            // iterate over the tuple fields looking for empty ones
            for (int pos = 0; pos < t.getArity(); pos++) 

                final String field = t.getField(pos);
                if (field == null || field.trim().isEmpty()) 
                    containsEmptyFields = true;

                    // if an empty field is encountered, update the
                    // accumulator
                    this.emptyFieldCounter.add(pos);
                
            

            return !containsEmptyFields;
        
    

    /**
     * This accumulator maintains a vector of counts. Calling @link #add(Integer) increments the
     * <i>n</i>-th vector component. The size of the vector is automatically managed.
     * 这个向量计数器输入是整数,输出是List,并按字段位置计数,List里的索引就是字段计数位置,其值就是计数结果
     */
    public static class VectorAccumulator implements Accumulator<Integer,ArrayList<Integer>>
        //存储计数器向量
        private final ArrayList<Integer> resultVector;

        public VectorAccumulator() 
            this(new ArrayList<>());
        

        public VectorAccumulator(ArrayList<Integer> resultVector) 
            this.resultVector = resultVector;
        

        private void updateResultVector(int position,int delta)
            //如果给出的位置不够就扩充向量容器
            while (this.resultVector.size()<=position)
                this.resultVector.add(0);
            

            final int component = this.resultVector.get(position);
            this.resultVector.set(position,component+delta);
        

        //在指定位置加1
        @Override
        public void add(Integer position) 
            updateResultVector(position,1);
        

        @Override
        public ArrayList<Integer> getLocalValue() 
            return this.resultVector;
        

        @Override
        public void resetLocal() 
            this.resultVector.clear();
        

        @Override
        public void merge(Accumulator<Integer, ArrayList<Integer>> other) 
            //合并两个向量计数器容器,按容器的索引合并
            final ArrayList<Integer> otherVector = other.getLocalValue();
            for(int i=0;i<otherVector.size();i++)
                updateResultVector(i,otherVector.get(i));
            
        

        @Override
        public Accumulator<Integer, ArrayList<Integer>> clone() 
            return new VectorAccumulator(new ArrayList<>(this.resultVector));
        

        @Override
        public String toString() 
            return StringUtils.join(this.resultVector,‘:‘);
        
    


    public static class StringTriple extends Tuple3<String, String, String> 

        public StringTriple() 

        public StringTriple(String f0, String f1, String f2) 
            super(f0, f1, f2);
        

    

 

 

 

以上是关于flink 并行计数器实现的主要内容,如果未能解决你的问题,请参考以下文章

在并行位片代码中实现快速计数器

flink计数不同的问题

Flink 流式传输,如何进行计数?

OpenMP 嵌套循环任务并行性,计数器未给出正确结果

Flink 滑动计数窗口行为

学习笔记Flink—— Flink开发环境配置及运行实例(单词计数)