FLINK 基于1.15.2的Java开发-在flink内如何使用log4j
Posted TGITCIC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-在flink内如何使用log4j相关的知识,希望对你有一定的参考价值。
背景
在代码中我们不要使用System.out.println,这个是不正规的。网上有很多使用log4j.properties的文章,也不对。
其实在flink内是直接支持log4j2的。因此我们要使用log4j2.
因此,我们在项目中需要加入slf4j的相关依赖如下,但一定要记得把flink-connector-redis带入的slf4j-log4j12给exclude掉,否则你的log输出不了的,会有冲突的。
flink内log4j2的正确pom依赖
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>compile</scope>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>compile</scope>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>compile</scope>
<version>2.17.1</version>
</dependency>
<!-- redis特性-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
然后在项目的src/main/resources下写一个log4j2.xml文件
log4j2详细配置
<!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出 -->
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数 -->
<configuration status="ALL">
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!--变量配置 -->
<Properties>
<!-- 格式化输出:%date表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符 -->
<!-- %logger36 表示 Logger 名字最长36个字符 -->
<property name="LOG_PATTERN"
value="%dateHH:mm:ss.SSS [%thread] %-5level %logger36 - %msg%n" />
<!-- 定义日志存储的路径 -->
<property name="FILE_PATH" value="../logs" />
<property name="FILE_NAME" value="FlinkKafka2Redis" />
</Properties>
<appenders>
<console name="Console" target="SYSTEM_OUT">
<!--输出日志的格式 -->
<PatternLayout pattern="$LOG_PATTERN" />
<!--控制台只输出level及其以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
<ThresholdFilter level="DEBUG" onMatch="ACCEPT"
onMismatch="DENY" />
</console>
<!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,适合临时测试用 -->
<File name="Filelog" fileName="$FILE_PATH/FlinkKafka2Redis.log"
append="false">
<PatternLayout pattern="$LOG_PATTERN" />
</File>
<!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
<RollingFile name="RollingFileInfo"
fileName="$FILE_PATH/info.log"
filePattern="$FILE_PATH/$FILE_NAME-INFO-%dyyyy-MM-dd_%i.log.gz">
<!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
<ThresholdFilter level="info" onMatch="ACCEPT"
onMismatch="DENY" />
<PatternLayout pattern="$LOG_PATTERN" />
<Policies>
<!--interval属性用来指定多久滚动一次,默认是1 hour -->
<TimeBasedTriggeringPolicy interval="1" />
<SizeBasedTriggeringPolicy size="10MB" />
</Policies>
<!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
<DefaultRolloverStrategy max="15" />
</RollingFile>
<RollingFile name="RollingFileDebug"
fileName="$FILE_PATH/FlinkKafka2Redis-debug.log"
filePattern="$FILE_PATH/$FILE_NAME-INFO-%dyyyy-MM-dd_%i.log.gz">
<!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
<ThresholdFilter level="debug" onMatch="ACCEPT"
onMismatch="DENY" />
<PatternLayout pattern="$LOG_PATTERN" />
<Policies>
<!--interval属性用来指定多久滚动一次,默认是1 hour -->
<TimeBasedTriggeringPolicy interval="1" />
<SizeBasedTriggeringPolicy size="10MB" />
</Policies>
<!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
<DefaultRolloverStrategy max="15" />
</RollingFile>
<!-- 这个会打印出所有的warn及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
<RollingFile name="RollingFileWarn"
fileName="$FILE_PATH/FlinkKafka2Redis-warn.log"
filePattern="$FILE_PATH/$FILE_NAME-WARN-%dyyyy-MM-dd_%i.log.gz">
<!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
<ThresholdFilter level="warn" onMatch="ACCEPT"
onMismatch="DENY" />
<PatternLayout pattern="$LOG_PATTERN" />
<Policies>
<!--interval属性用来指定多久滚动一次,默认是1 hour -->
<TimeBasedTriggeringPolicy interval="1" />
<SizeBasedTriggeringPolicy size="10MB" />
</Policies>
<!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
<DefaultRolloverStrategy max="15" />
</RollingFile>
<!-- 这个会打印出所有的error及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
<RollingFile name="RollingFileError"
fileName="$FILE_PATH/FlinkKafka2Redis-error.log"
filePattern="$FILE_PATH/$FILE_NAME-ERROR-%dyyyy-MM-dd_%i.log.gz">
<!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
<ThresholdFilter level="error" onMatch="ACCEPT"
onMismatch="DENY" />
<PatternLayout pattern="$LOG_PATTERN" />
<Policies>
<!--interval属性用来指定多久滚动一次,默认是1 hour -->
<TimeBasedTriggeringPolicy interval="1" />
<SizeBasedTriggeringPolicy size="10MB" />
</Policies>
<!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
<DefaultRolloverStrategy max="15" />
</RollingFile>
</appenders>
<!--Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。 -->
<!--然后定义loggers,只有定义了logger并引入的appender,appender才会生效 -->
<loggers>
<!--过滤掉spring和mybatis的一些无用的DEBUG信息 -->
<logger name="org.mybatis" level="info" additivity="false">
<AppenderRef ref="Console" />
</logger>
<!--监控系统信息 -->
<!--若是additivity设为false,则 子Logger 只会在自己的appender里输出,而不会在 父Logger 的appender里输出。 -->
<Logger name="org.springframework" level="info"
additivity="false">
<AppenderRef ref="Console" />
</Logger>
<root level="INFO">
<appender-ref ref="Console" />
<appender-ref ref="Filelog" />
<appender-ref ref="RollingFileInfo" />
<appender-ref ref="RollingFileWarn" />
<appender-ref ref="RollingFileDebug" />
<appender-ref ref="RollingFileError" />
</root>
</loggers>
</configuration>
调用例子
然后,你就可以在代码任意处这样调用log4j了。
private final static Logger logger = LoggerFactory.getLogger(ProductBeanJSONDeSerializer.class);
logger.error(">>>>>>invoke redis operation to sink data into redis->" + this.keyName + " error: "
+ e.getMessage(), e);
flink在eclipse运行时就会在和项目平级的目录下自动新建一个logs目录,并在这个目录内按照文件名.warn,文件名.error,文件名.debug这样的方式输出正确的log了。并且项目在启动时也不会抛slf4j冲突的warning message了。
以上是关于FLINK 基于1.15.2的Java开发-在flink内如何使用log4j的主要内容,如果未能解决你的问题,请参考以下文章
FLINK 基于1.15.2的Java开发-自定义Redis Sink用于连接 Redis Sentinel模式
FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜
FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis