Flink ParameterTool fromArgs源码分析

Posted 浅然言而信


篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink ParameterTool fromArgs源码分析相关的知识,希望对你有一定的参考价值。




/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345
 * </pre>
 * and run this example with the hostname and the port as arguments.
 */
public class SocketWindowWordCount 

   public static void main(String[] args) throws Exception 

      // the host and the port to connect to
      final String hostname;//主机名
      final int port;//端口
          * 参数解析
         final ParameterTool params = ParameterTool.fromArgs(args);
         hostname = params.has("hostname") ? params.get("hostname") : "localhost";
         port = params.getInt("port");
       catch (Exception e) 
         System.err.println("No port specified. Please run 'SocketWindowWordCount " +
            "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
            "and port is the address of the text server");
         System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
            "type the input text into the command line");

      // get the execution environment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // get input data by connecting to the socket
      DataStream<String> text = env.socketTextStream(hostname, port, "\\n");

      // parse the data, group it, window it, and aggregate the counts
      DataStream<WordWithCount> windowCounts = text

            .flatMap(new FlatMapFunction<String, WordWithCount>() 
               public void flatMap(String value, Collector<WordWithCount> out) 
                  for (String word : value.split("\\\\s")) 
                     out.collect(new WordWithCount(word, 1L));


            .reduce(new ReduceFunction<WordWithCount>() 
               public WordWithCount reduce(WordWithCount a, WordWithCount b) 
                  return new WordWithCount(a.word, a.count + b.count);

      // print the results with a single thread, rather than in parallel

      env.execute("Socket Window WordCount");

   // ------------------------------------------------------------------------

    * Data type for words with count.
   public static class WordWithCount 

      public String word;
      public long count;

      public WordWithCount() 

      public WordWithCount(String word, long count) 
         this.word = word;
         this.count = count;

      public String toString() 
         return word + " : " + count;


 // Parse the program arguments into a ParameterTool (the call analysed in this article).
final ParameterTool params = ParameterTool.fromArgs(args);


package org.apache.flink.api.java.utils;

/**
 * ParameterTool测试类 (ParameterTool test class)
 * linjie
 */
public class testParameter 
   public static void main(String[] args) 
      args = new String[]"-count","-1","-count2","-2";
      final ParameterTool params = ParameterTool.fromArgs(args);



/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.utils;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;

import org.apache.commons.lang3.math.NumberUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 这个类提供了用于从不同源读取和解析程序参数的简单实用方法
 * This class provides simple utility methods for reading and parsing program arguments from different sources.
 */
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable 
   private static final long serialVersionUID = 1L;

   protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
   protected static final String DEFAULT_UNDEFINED = "<undefined>";

   public static ParameterTool fromArgs(String[] args) 
      final Map<String, String> map = new HashMap<>(args.length / 2);//因为args是键值对形式,所以这里map大小是args的一半

      int i = 0;
      while (i < args.length) 

          * 获取key,key在args的索引是奇数
         final String key;
         //例子:args[0] = --input-topic
         if (args[i].startsWith("--")) 
            key = args[i].substring(2);
          else if (args[i].startsWith("-")) 
            key = args[i].substring(1);
            throw new IllegalArgumentException(
               String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
                  Arrays.toString(args), args[i]));

         if (key.isEmpty()) 
            throw new IllegalArgumentException(
               "The input " + Arrays.toString(args) + " contains an empty argument");

          * 获取value,value在args的索引是偶数,索引i+=1
         //NumberUtils.isNumber(args[i]):因为key是可能没有value的:new String[]"--input","--output",这样也是可以的
         //如果args是这样的:new String[]"-count","-1"
         //new String[]"-count","-1","-count2","-2"
         i += 1; // try to find the value

         if (i >= args.length) 
            map.put(key, NO_VALUE_KEY);
          else if (NumberUtils.isNumber(args[i])) 
            map.put(key, args[i]);
            i += 1;
          else if (args[i].startsWith("--") || args[i].startsWith("-")) 
            // the argument cannot be a negative number because we checked earlier
            // -> the next argument is a parameter name
            map.put(key, NO_VALUE_KEY);
            map.put(key, args[i]);
            i += 1;

      return fromMap(map);

以上是关于Flink ParameterTool fromArgs源码分析的主要内容,如果未能解决你的问题,请参考以下文章

flink任务使用ParameterTool加载配置报错:No data for required key ‘redis.port‘

flink任务使用ParameterTool加载配置报错:No data for required key ‘redis.port‘

flink任务使用ParameterTool加载配置报错:No data for required key ‘redis.port‘

Flink 如何解析与传递参数

Flink 如何解析与传递参数
