Flink: Installing Redis and Using RedisSink
Maven dependency:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Redis stores data as key-value pairs.
Installing Redis:
Compile and install Redis into a directory of your choice.
Download: http://download.redis.io/releases/
1. Extract the source archive:
tar -zxvf redis-3.2.8.tar.gz -C /usr/local/download/
2. Install gcc (needed to compile Redis):
yum install -y gcc
3. Compile and install into the target directory:
cd /usr/local/download/redis-3.2.8
make PREFIX=/usr/local/software/redis-3.2.8 install
4. Create a symbolic link:
ln -s /usr/local/software/redis-3.2.8 /usr/local/software/redis
5. Configure environment variables by appending the following to the end of /etc/profile:
export REDIS_HOME=/usr/local/software/redis
export PATH=$PATH:$REDIS_HOME/bin
6. Reload the profile so the variables take effect:
source /etc/profile
Start the Redis server:
cd /usr/local/software/redis-3.2.8
redis-server &

Check that it is listening on the default port:
netstat -anop | grep 6379

Connect with the command-line client and try a write:
redis-cli -h localhost -p 6379
set name huitao
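The same write can also be issued from Java through the Jedis client (the Bahir connector talks to Redis via Jedis, so the library is pulled in transitively by flink-connector-redis). A minimal sketch, with host and port as placeholders for your own instance:

import redis.clients.jedis.Jedis;

public class RedisKeyValueDemo {
    public static void main(String[] args) {
        // Connect to the Redis instance started above (adjust host/port as needed)
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.set("name", "huitao");           // same as "set name huitao" in redis-cli
            System.out.println(jedis.get("name")); // prints: huitao
        }
    }
}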
Using Redis from Flink:
package Flink_API;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
public class TestRedis {

    // Demonstrates how to write a Flink stream into Redis with RedisSink
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamps carried by the records themselves (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        // Read raw JSON strings from Kafka
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));

        // Parse each JSON string into a UserBrowseLog
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        });

        // For each user, keep the browse record with the highest product price
        DataStream<UserBrowseLog> maxData = processData.keyBy("userID").maxBy("productPrice");
        maxData.print();

        // Configure the Redis connection and add the sink
        FlinkJedisConfigBase conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.208.200").setPort(6379).build();
        maxData.addSink(new RedisSink<>(conf, new MyRedisMapper()));

        // Submit the job
        env.execute("TestRedis");
    }
    public static class MyRedisMapper implements RedisMapper<UserBrowseLog> {

        /**
         * Specify which Redis command to use; here SET (a plain key-value write).
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        /**
         * Extract the Redis key from the incoming record.
         */
        @Override
        public String getKeyFromData(UserBrowseLog userBrowseLog) {
            return userBrowseLog.getUserID();
        }

        /**
         * Extract the Redis value from the incoming record.
         */
        @Override
        public String getValueFromData(UserBrowseLog userBrowseLog) {
            return String.valueOf(userBrowseLog.getProductPrice());
        }
    }
    // POJO describing a single browse event
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }

        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }

        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }

        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }

        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
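Besides plain SET, the same RedisMapper interface can target other Redis data structures. The sketch below is an illustrative variant (not part of the job above) that could be declared alongside MyRedisMapper inside TestRedis: it writes every user's maximum price into a single Redis hash via HSET, and the hash name "user:max:price" passed as the additional key is just an example.

    // Illustrative variant: collect all users' max prices in one Redis hash.
    public static class MyRedisHashMapper implements RedisMapper<UserBrowseLog> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            // For HSET the additional key is the name of the hash to write into
            return new RedisCommandDescription(RedisCommand.HSET, "user:max:price");
        }

        @Override
        public String getKeyFromData(UserBrowseLog userBrowseLog) {
            // Used as the field inside the hash
            return userBrowseLog.getUserID();
        }

        @Override
        public String getValueFromData(UserBrowseLog userBrowseLog) {
            return String.valueOf(userBrowseLog.getProductPrice());
        }
    }

Wiring it in is the same as before, e.g. maxData.addSink(new RedisSink<>(conf, new MyRedisHashMapper())); the result can then be inspected in redis-cli with HGETALL user:max:price.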