FLINK 基于1.15.2的Java开发-在flink内如何使用log4j

Posted TGITCIC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-在flink内如何使用log4j相关的知识,希望对你有一定的参考价值。

背景

在代码中我们不要使用System.out.println,这个是不正规的。网上有很多使用log4j.properties的文章,也不对。

其实在flink内是直接支持log4j2的。因此我们要使用log4j2.

因此,我们在项目中需要加入slf4j的相关依赖如下,但一定要记得把flink-connector-redis带入的slf4j-log4j12给exclude掉,否则你的log输出不了的,会有冲突的。

flink内log4j2的正确pom依赖

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
<!-- redis特性-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

然后在项目的src/main/resources下写一个log4j2.xml文件

log4j2详细配置

<!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出 -->
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数 -->
<configuration status="ALL">
    <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
 
    <!--变量配置 -->
    <Properties>
        <!-- 格式化输出:%date表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符 -->
        <!-- %logger36 表示 Logger 名字最长36个字符 -->
        <property name="LOG_PATTERN"
            value="%dateHH:mm:ss.SSS [%thread] %-5level %logger36 - %msg%n" />
        <!-- 定义日志存储的路径 -->
        <property name="FILE_PATH" value="../logs" />
        <property name="FILE_NAME" value="FlinkKafka2Redis" />
    </Properties>
 
    <appenders>
 
        <console name="Console" target="SYSTEM_OUT">
            <!--输出日志的格式 -->
            <PatternLayout pattern="$LOG_PATTERN" />
            <!--控制台只输出level及其以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT"
                onMismatch="DENY" />
        </console>
 
        <!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,适合临时测试用 -->
        <File name="Filelog" fileName="$FILE_PATH/FlinkKafka2Redis.log"
            append="false">
            <PatternLayout pattern="$LOG_PATTERN" />
        </File>
 
        <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
        <RollingFile name="RollingFileInfo"
            fileName="$FILE_PATH/info.log"
            filePattern="$FILE_PATH/$FILE_NAME-INFO-%dyyyy-MM-dd_%i.log.gz">
            <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="$LOG_PATTERN" />
            <Policies>
                <!--interval属性用来指定多久滚动一次,默认是1 hour -->
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
            <DefaultRolloverStrategy max="15" />
        </RollingFile>
        <RollingFile name="RollingFileDebug"
            fileName="$FILE_PATH/FlinkKafka2Redis-debug.log"
            filePattern="$FILE_PATH/$FILE_NAME-INFO-%dyyyy-MM-dd_%i.log.gz">
            <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="debug" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="$LOG_PATTERN" />
            <Policies>
                <!--interval属性用来指定多久滚动一次,默认是1 hour -->
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
            <DefaultRolloverStrategy max="15" />
        </RollingFile>
 
        <!-- 这个会打印出所有的warn及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
        <RollingFile name="RollingFileWarn"
            fileName="$FILE_PATH/FlinkKafka2Redis-warn.log"
            filePattern="$FILE_PATH/$FILE_NAME-WARN-%dyyyy-MM-dd_%i.log.gz">
            <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="warn" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="$LOG_PATTERN" />
            <Policies>
                <!--interval属性用来指定多久滚动一次,默认是1 hour -->
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
            <DefaultRolloverStrategy max="15" />
        </RollingFile>
 
        <!-- 这个会打印出所有的error及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
        <RollingFile name="RollingFileError"
            fileName="$FILE_PATH/FlinkKafka2Redis-error.log"
            filePattern="$FILE_PATH/$FILE_NAME-ERROR-%dyyyy-MM-dd_%i.log.gz">
            <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="error" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="$LOG_PATTERN" />
            <Policies>
                <!--interval属性用来指定多久滚动一次,默认是1 hour -->
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 -->
            <DefaultRolloverStrategy max="15" />
        </RollingFile>
 
    </appenders>
 
    <!--Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。 -->
    <!--然后定义loggers,只有定义了logger并引入的appender,appender才会生效 -->
    <loggers>
 
        <!--过滤掉spring和mybatis的一些无用的DEBUG信息 -->
        <logger name="org.mybatis" level="info" additivity="false">
            <AppenderRef ref="Console" />
        </logger>
        <!--监控系统信息 -->
        <!--若是additivity设为false,则 子Logger 只会在自己的appender里输出,而不会在 父Logger 的appender里输出。 -->
        <Logger name="org.springframework" level="info"
            additivity="false">
            <AppenderRef ref="Console" />
        </Logger>
 
        <root level="INFO">
            <appender-ref ref="Console" />
            <appender-ref ref="Filelog" />
            <appender-ref ref="RollingFileInfo" />
            <appender-ref ref="RollingFileWarn" />
            <appender-ref ref="RollingFileDebug" />
            <appender-ref ref="RollingFileError" />
        </root>
    </loggers>
 
</configuration>

调用例子

然后,你就可以在代码任意处这样调用log4j了。

private final static Logger logger = LoggerFactory.getLogger(ProductBeanJSONDeSerializer.class);
 
logger.error(">>>>>>invoke redis operation to sink data into redis->" + this.keyName + " error: "
                + e.getMessage(), e);

flink在eclipse运行时就会在和项目平级的目录下自动新建一个logs目录,并在这个目录内按照文件名.warn,文件名.error,文件名.debug这样的方式输出正确的log了。并且项目在启动时也不会抛slf4j冲突的warning message了。

以上是关于FLINK 基于1.15.2的Java开发-在flink内如何使用log4j的主要内容,如果未能解决你的问题,请参考以下文章

FLINK 基于1.15.2的Java开发-自定义Redis Sink用于连接 Redis Sentinel模式

FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜

FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis

FLINK 基于1.15.2的Java开发-入门

FLINK 基于1.15.2的Java开发-自定义Source端

FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境