Flink: Installing Redis and Using RedisSink

Posted by 月疯


Maven dependency:

<dependency>
     <groupId>org.apache.bahir</groupId>
     <artifactId>flink-connector-redis_2.11</artifactId>
     <version>1.0</version>
</dependency>

Redis stores data as key-value pairs.

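Installing Redis:

Build Redis from source and install it into a chosen directory.
Download: http://download.redis.io/releases/
1. Unpack the source archive:
tar -zxvf redis-3.2.8.tar.gz -C /usr/local/download/
2. Install gcc, which the build requires:
yum install -y gcc
3. Build and install into the target prefix:
cd /usr/local/download/redis-3.2.8
make PREFIX=/usr/local/software/redis-3.2.8 install
4. Create a symbolic link:
ln -s /usr/local/software/redis-3.2.8  /usr/local/software/redis
5. Configure the environment variables by appending these lines to the end of /etc/profile:
export REDIS_HOME=/usr/local/software/redis
export PATH=$PATH:$REDIS_HOME/bin
6. Apply the environment variables:
source /etc/profile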

Start the Redis server:
cd /usr/local/software/redis-3.2.8
redis-server &

Check that the default port 6379 is listening:
netstat -anop | grep 6379
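
The same check can be made from Java before submitting the job. This is a minimal sketch, assuming the server listens on localhost:6379 as above; the class name PortCheck and the helper isPortOpen are illustrative names, not part of Redis or Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed host and port: a local Redis on its default port.
        System.out.println("Redis reachable: " + isPortOpen("localhost", 6379, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the listener is Redis.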

Start the CLI client:
redis-cli -h localhost -p 6379

Write a test key from the client, for example: set name huitao
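
For context, redis-cli sends each command to the server as a RESP array of bulk strings. The sketch below encodes `set name huitao` that way. RespEncoder and encode are hypothetical names used only for illustration; an application would normally use a client library such as Jedis rather than hand-encoding commands.

```java
import java.nio.charset.StandardCharsets;

public class RespEncoder {
    /** Encodes a command and its arguments as a RESP array of bulk strings. */
    public static String encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        // Array header: '*' followed by the number of elements.
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            // Bulk string length is counted in bytes, not characters.
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(part).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String wire = encode("SET", "name", "huitao");
        // Make the CR LF terminators visible when printing.
        System.out.println(wire.replace("\r\n", "\\r\\n"));
    }
}
```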

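Using Redis in Flink (besides the Redis connector, the example below needs the Flink Kafka 0.10 connector and fastjson on the classpath):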

package Flink_API;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestRedis {

    // Demonstrates how to write to Redis from Flink
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time: timestamps come from the records themselves
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // Kafka consumer settings (property names must be spelled exactly like this)
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        // SimpleStringSchema is a DeserializationSchema<String>; no cast is needed
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        // Parse each JSON string into a UserBrowseLog, skipping records that fail to parse
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        });

        // For each user, keep the browse record with the highest product price
        DataStream<UserBrowseLog> maxData = processData.keyBy("userID").maxBy("productPrice");
        maxData.print();

        // Configure the Redis connection
        FlinkJedisConfigBase conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.208.200").setPort(6379).build();
        maxData.addSink(new RedisSink<>(conf, new MyRedisMapper()));
        // Trigger job execution
        env.execute("TestRedis");
    }

    public static class MyRedisMapper implements RedisMapper<UserBrowseLog> {
        /**
         * Specifies which Redis command to use; here SET (write a string value).
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        /**
         * Extracts the Redis key from an incoming record.
         * @param userBrowseLog the incoming record
         * @return the user ID, used as the Redis key
         */
        @Override
        public String getKeyFromData(UserBrowseLog userBrowseLog) {
            return userBrowseLog.getUserID();
        }

        /**
         * Extracts the Redis value from an incoming record.
         * @param userBrowseLog the incoming record
         * @return the product price as a string, used as the Redis value
         */
        @Override
        public String getValueFromData(UserBrowseLog userBrowseLog) {
            return String.valueOf(userBrowseLog.getProductPrice());
        }
    }

    // Browse event record
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
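
To make the data flow concrete: keyBy("userID").maxBy("productPrice") emits, for every incoming record, the record with the highest price seen so far for that user, and the SET command overwrites the value stored under that user's key. The plain-Java sketch below mimics this with two maps and no Flink or Redis dependency. MaxBySetSketch, Event, replay, and the sample records are all invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxBySetSketch {
    /** Simplified stand-in for UserBrowseLog: only the key and the compared field. */
    public static final class Event {
        final String userID;
        final int productPrice;

        public Event(String userID, int productPrice) {
            this.userID = userID;
            this.productPrice = productPrice;
        }
    }

    /** Replays events and returns what a Redis SET per emitted record would leave behind. */
    public static Map<String, String> replay(Event... events) {
        Map<String, Event> runningMax = new HashMap<>(); // per-key state, like keyBy + maxBy
        Map<String, String> redis = new HashMap<>();     // stand-in for the Redis keyspace
        for (Event e : events) {
            Event current = runningMax.get(e.userID);
            if (current == null || e.productPrice > current.productPrice) {
                current = e;
                runningMax.put(e.userID, e);
            }
            // The running maximum is emitted for every input; SET overwrites the key.
            redis.put(current.userID, String.valueOf(current.productPrice));
        }
        return redis;
    }

    public static void main(String[] args) {
        Map<String, String> redis = replay(
                new Event("user_1", 10), new Event("user_2", 30),
                new Event("user_1", 25), new Event("user_1", 5));
        System.out.println(redis.get("user_1")); // prints 25
        System.out.println(redis.get("user_2")); // prints 30
    }
}
```

Once the real job has processed some records, running get with a user ID in redis-cli should return the highest price seen so far for that user.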
