Flink ParameterTool fromArgs源码分析
Posted 浅然言而信
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink ParameterTool fromArgs源码分析相关的知识,希望对你有一定的参考价值。
一、源码路径
java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
二、源码
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.socket;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Implements a streaming windowed version of the "WordCount" program.
*
* <p>This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the <i>netcat</i> tool via
* <pre>
* nc -l 12345
* </pre>
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWindowWordCount
public static void main(String[] args) throws Exception
//链接主机名和端口
// the host and the port to connect to
final String hostname;//主机名
final int port;//端口
try
/**
* 参数解析
*/
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
catch (Exception e)
System.err.println("No port specified. Please run 'SocketWindowWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>()
@Override
public void flatMap(String value, Collector<WordWithCount> out)
for (String word : value.split("\\\\s"))
out.collect(new WordWithCount(word, 1L));
)
.keyBy("word")//根据word字段分组(hash函数分组)
.timeWindow(Time.seconds(5))//时间窗口设为5秒,即非活动阈值。
.reduce(new ReduceFunction<WordWithCount>()
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b)
return new WordWithCount(a.word, a.count + b.count);
);
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount
public String word;
public long count;
public WordWithCount()
public WordWithCount(String word, long count)
this.word = word;
this.count = count;
@Override
public String toString()
return word + " : " + count;
三、源码分析
输入字符串参数的解析
/**
* Parse the program arguments into a ParameterTool.
*/
final ParameterTool params = ParameterTool.fromArgs(args);
编写ParameterTool测试类
package org.apache.flink.api.java.utils;
/**
* ParameterTool测试类
* linjie
*/
public class testParameter
public static void main(String[] args)
args = new String[]"-count","-1","-count2","-2";
final ParameterTool params = ParameterTool.fromArgs(args);
调试进入ParameterTool,笔者已在源码处编写详细注释,重点关注NumberUtils.isNumber(args[i])
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.utils;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* This class provides simple utility methods for reading and parsing program arguments from different sources.
*/
@Public
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable
// Serialization version of this Serializable class.
private static final long serialVersionUID = 1L;
// Placeholder value that fromArgs stores for a key that is not followed by a value.
protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
// NOTE(review): not used in the visible excerpt; presumably the text used for undefined/default values — confirm.
protected static final String DEFAULT_UNDEFINED = "<undefined>";
public static ParameterTool fromArgs(String[] args)
final Map<String, String> map = new HashMap<>(args.length / 2);//因为args是键值对形式,所以这里map大小是args的一半
//将args其中的key,value加入到map中的入口
int i = 0;
while (i < args.length)
/**
* 获取key,key在args的索引是奇数
*/
final String key;
//例子:args[0] = --input-topic
//startsWith("--"):判断参数前缀是否是--
//key=args[0]从第二个位置到结束的字符
if (args[i].startsWith("--"))
key = args[i].substring(2);
else if (args[i].startsWith("-"))
key = args[i].substring(1);
else
throw new IllegalArgumentException(
String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
Arrays.toString(args), args[i]));
if (key.isEmpty())
throw new IllegalArgumentException(
"The input " + Arrays.toString(args) + " contains an empty argument");
/**
* 获取value,value在args的索引是偶数,索引i+=1
*/
//关于NumberUtils.isNumber(args[i])对value值的判断目的如下:
//NumberUtils.isNumber(args[i]):因为key是可能没有value的:new String[]"--input","--output",这样也是可以的
//input=NO_VALUE_KEY
//output=NO_VALUE_KEY
//如果args是这样的:new String[]"-count","-1"
//如果没有NumberUtils.isNumber(args[i])判断
//count=NO_VALUE_KEY
//1=NO_VALUE_KEY
//但如果有NumberUtils.isNumber(args[i])判断
//结果就是count=-1
//new String[]"-count","-1","-count2","-2"
i += 1; // try to find the value
if (i >= args.length)
map.put(key, NO_VALUE_KEY);
else if (NumberUtils.isNumber(args[i]))
map.put(key, args[i]);
i += 1;
else if (args[i].startsWith("--") || args[i].startsWith("-"))
// the argument cannot be a negative number because we checked earlier
// -> the next argument is a parameter name
map.put(key, NO_VALUE_KEY);
else
map.put(key, args[i]);
i += 1;
return fromMap(map);
以上是关于Flink ParameterTool fromArgs源码分析的主要内容,如果未能解决你的问题,请参考以下文章
flink任务使用ParameterTool加载配置报错:No data for required key ‘redis.port’
flink任务使用ParameterTool加载配置报错:No data for required key ‘redis.port’
flink任务使用ParameterTool加载配置报错:No data for required key ‘redis.port’