FLINK 基于1.15.2的Java开发-如何使用外部配置文件
Posted TGITCIC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-如何使用外部配置文件相关的知识,希望对你有一定的参考价值。
前言
Flink在代码中写一个连接IP、端口,这不是我们需要的。
我们也希望Flink在取这些配置时有一个类似的properties文件如下所示:
#redis config
redis.host=localhost:27001,localhost:27002,localhost:27003
redis.sentinel.master=master1
redis.password=111111
jedis.pool.min-idle=25
jedis.pool.max-active=100
jedis.pool.max-idle=100
jedis.pool.max-wait=-1
jedis.pool.timeBetweenEvictionRunsMillis=5000
jedis.pool.testOnBorrow=true;
jedis.pool.testWhileIdle=true
connection.timeout=0
redis.selected.database=0
#kafka config
kafka.host=127.0.0.1
kafka.port=9092
kafka.bootstrapservers=127.0.0.1:9092
kafka.topic=test
然后在代码里就可以这样的形式来使用了
KafkaSource<ProductBean> source = KafkaSource.<ProductBean>builder()
.setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
.setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))
.build();
Flink是标准Java,你完全可以使用下面任意一种方式来读入外部properties文件内容:
- 通过外部配置注册中心读入key->value形式的properties;
- 也可以通过flink打包后内的properties文件,读入key->value键值;
- 可以通过外部注入一个properties文件,读入key->value键值;
这三种方式都可以。但是无论是哪种方式这都属于“外围”技术,flink的核心是通过Configuration这个数据结构实现的读取key->value键值的。我个人比较倾向于使用“通过外部注入一个properties文件”的方法。因为在实际生产运行环境下,我们经常会要去改properties文件中的内容。改完后我们不想再重新打包了,直接重新提交或者重启就可以了。
为了说明这个Cofiguration的用法,我们先用伪指令梳理出flink内使用Configuration的全过程,在这个过程下,你可以用我上述提到的任何方法去读入你想要的properties文件内容。
flink读取外部properties文件步骤伪指令
Flink是通过以下方式来实现的。
- 第一步,我们声明一个config.properties文件,在磁盘一个路径上;
- 第二步,flink运行时带上这个参数-config_path /Users/chrishu123126.com/opt/eclipse-workspace/FlinkKafka2Redis/config.properties;
- 第三步:通过以下三行代码把传入的外部配置文件中的key:value值读入ParameterTools对象内:
- ParameterTool.fromArgs(args);
- String propertiesFilePath = argParas.get("config_path");
- ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
- 第四步,通过org.apache.flink.configuration.Configuration,把这个读入的外部配置内容,一个个塞到这个Configuration对象里;
- 第五步,把这个Configuration通过:env.getConfig().setGlobalJobParameters(conf);设成一个Flink该Job全局变量;
- 在这个Flink Job运行的任何地方可以通过这样的代码来使用:
-
ExecutionConfig.GlobalJobParameters parameters = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-
Configuration globConf = (Configuration) parameters;
- globConf.getString(ConfigOptions.key("redis.host").stringType().noDefaultValue());或者是globConf.getInteger(ConfigOptions.key("redis.port").intType().noDefaultValue());
-
样例代码
下面给出样例代码:
config.properties
#redis config
redis.host=localhost:27001,localhost:27002,localhost:27003
redis.sentinel.master=master1
redis.password=111111
jedis.pool.min-idle=25
jedis.pool.max-active=100
jedis.pool.max-idle=100
jedis.pool.max-wait=-1
jedis.pool.timeBetweenEvictionRunsMillis=5000
jedis.pool.testOnBorrow=true;
jedis.pool.testWhileIdle=true
connection.timeout=0
redis.selected.database=0
#kafka config
kafka.host=127.0.0.1
kafka.port=9092
kafka.bootstrapservers=127.0.0.1:9092
kafka.topic=test
运行flink赋-config参数
读入config.properties代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool argParas = ParameterTool.fromArgs(args);
String propertiesFilePath = argParas.get("config_path");
logger.info(">>>>>>start to load properties from ", propertiesFilePath);
ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
使用Configuration来存储读入的properties文件内容
Configuration conf = new Configuration();
conf.setString("redis.host", paras.get("redis.host"));
conf.setString("redis.password", paras.get("redis.password"));
conf.setInteger("redis.selected.database", paras.getInt("redis.selected.database"));
conf.setString("redis.sentinel.master", paras.get("redis.sentinel.master"));
conf.setString("jedis.pool.min-idle", paras.get("jedis.pool.min-idle"));
conf.setString("jedis.pool.max-active", paras.get("jedis.pool.max-active"));
conf.setString("jedis.pool.max-idle", paras.get("jedis.pool.max-idle"));
conf.setString("jedis.pool.max-wait", paras.get("jedis.pool.max-wait"));
conf.setString("jedis.pool.timeBetweenEvictionRunsMillis",
paras.get("jedis.pool.timeBetweenEvictionRunsMillis"));
conf.setString("jedis.pool.testOnBorrow", paras.get("jedis.pool.testOnBorrow"));
conf.setString("jedis.pool.testWhileIdle", paras.get("jedis.pool.testWhileIdle"));
conf.setString("connection.timeout", paras.get("connection.timeout"));
env.getConfig().setGlobalJobParameters(conf);
然后我们就可以在需要使用的地方以getRuntimeContext().getExecutionConfig().getGlobalJobParameters()来使用外部变量了
ExecutionConfig.GlobalJobParameters parameters = getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
Configuration globConf = (Configuration) parameters;
//开始使用
String redisPassword=globConf.getString(ConfigOptions.key("redis.password").stringType().noDefaultValue()));
Integer port = globConf.getInteger(ConfigOptions.key("redis.port").intType().noDefaultValue());
以上是关于FLINK 基于1.15.2的Java开发-如何使用外部配置文件的主要内容,如果未能解决你的问题,请参考以下文章
FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis
FLINK 基于1.15.2的Java开发-自定义Source端
FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境