Flink Accumulators, Broadcast Variables / Broadcast Streams, and the Distributed Cache
Posted by 月疯
1. Accumulators
An Accumulator in Flink serves much the same purpose as a MapReduce counter: both give a good view of how data changes while a task runs. Accumulators can be used inside the operator functions of a Flink job, but the final result is only available after the job has finished. Counter is a concrete Accumulator implementation; the commonly used counters are IntCounter, LongCounter, and DoubleCounter.
Usage:
1: Create the accumulator
private IntCounter numLines = new IntCounter();
2: Register the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3: Use the accumulator
this.numLines.add(1);
4: Fetch the accumulator result
myJobExecutionResult.getAccumulatorResult("num-lines")
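Step 4 refers to a JobExecutionResult object; a minimal sketch of where it comes from (the job name and variable names here are illustrative):
// execute() returns a JobExecutionResult once the job has finished;
// only then is the accumulator value final.
JobExecutionResult myJobExecutionResult = env.execute("counterJob");
int numLines = myJobExecutionResult.getAccumulatorResult("num-lines");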
Example: count how many records the map operator processes.
package Flink_API;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
/**
 * Count how many records the map function processed.
 */
public class BatchCounterTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = env.fromElements("1", "2", "3", "4", "5");
        DataSet<String> map = dataSource.map(new RichMapFunction<String, String>() {
            // 1: create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2: register the accumulator
                getRuntimeContext().addAccumulator("num-lines", numLines);
            }

            @Override
            public String map(String s) throws Exception {
                // 3: use the accumulator
                numLines.add(1);
                return s;
            }
        }).setParallelism(5);
        map.print(); // print() already triggers execution, so no extra env.execute() call
        // 4: fetch the final accumulator value from the finished job
        JobExecutionResult result = env.getLastJobExecutionResult();
        System.out.println("num-lines: " + result.getAccumulatorResult("num-lines"));
    }
}
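Run as-is, the job prints the five records and then the accumulator total, which should be 5 here: each of the five input elements increments the counter exactly once.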
2. Broadcast variables: broadcasting distributes the variable to every TaskManager, where tasks can read it during processing.
Steps for using a broadcast variable:
1. Initialize the data
DataSet<Integer> toBroadcast = env.fromElements(1,2,3);
2. Broadcast the data (i.e. register it on the operator that needs it, by calling withBroadcastSet right after that operator)
operator.withBroadcastSet(toBroadcast, "broadcastSetName");
3. Fetch the data
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Example program: Flink can read only the user's name from the data source, but the output needs both the user's name and age.
package Flink_API;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class BatchBroadcastTest {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Prepare the data that will be broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("wtt", 29));
        broadData.add(new Tuple2<>("lidong", 30));
        broadData.add(new Tuple2<>("hengda", 40));
        DataSource<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
        // Convert the data set into Maps whose key is the user name and whose value is the age
        DataSet<HashMap<String, Integer>> toBroadCast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {
                HashMap<String, Integer> hashMap = new HashMap<>();
                hashMap.put(tuple.f0, tuple.f1);
                return hashMap;
            }
        }).setParallelism(3); // at this point the broadcast data is ready
        // Note: a RichMapFunction is required here to access the broadcast variable
        // The main data source carries only the user names
        DataSource<String> nameDataSource = env.fromElements("wtt", "lidong", "hengda");
        DataSet<String> data = nameDataSource.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 1. Like the setup method in MapReduce, this runs only once
             * 2. A good place for initialization work
             * 3. The broadcast variable can be fetched inside open()
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Fetch the broadcast data
                broadCastMap = getRuntimeContext().getBroadcastVariable("toBroadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map); // the final format is "name" -> age
                }
            }

            /**
             * Every map() call just looks the name up in allMap
             */
            @Override
            public String map(String s) throws Exception {
                return s + "," + allMap.get(s);
            }
        }).withBroadcastSet(toBroadCast, "toBroadCastMapName"); // register the broadcast set on the operator that uses it
        data.print();
    }
}
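Given the sample data, each map() call should find a matching age in the broadcast map, so the printed result should consist of the lines wtt,29, lidong,30, and hengda,40 (the order may vary).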
3. Broadcast streams: broadcast variables belong to batch processing; in stream processing the same role is played by broadcast streams.
package Flink_API;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
// Broadcast stream
public class FlinkBroadcastStream {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamps carried by the data itself
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);
        // 1. Get the first stream: the user browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. Get the user blacklist stream
        DataStream<BlackUser> blackUserDataStream = getUserBlackUserDataStream(env);
        // 1) Define a MapStateDescriptor describing the format of the broadcast data
        MapStateDescriptor<String, BlackUser> descriptor =
                new MapStateDescriptor<>("userBlackList", String.class, BlackUser.class);
        // 2) Register the configuration stream as a broadcast stream
        BroadcastStream<BlackUser> broadcastStream = blackUserDataStream.broadcast(descriptor);
        // 3) Connect the main stream with the broadcast stream
        DataStream<UserBrowseLog> filterDataStream = browseStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<UserBrowseLog, BlackUser, UserBrowseLog>() {
                    @Override
                    public void processElement(UserBrowseLog value, ReadOnlyContext readOnlyContext,
                                               Collector<UserBrowseLog> collector) throws Exception {
                        // Look this element's key up in the broadcast state
                        ReadOnlyBroadcastState<String, BlackUser> broadcastState =
                                readOnlyContext.getBroadcastState(descriptor);
                        BlackUser blackUser = broadcastState.get(value.getUserID());
                        if (blackUser != null) {
                            System.out.println("User " + value.getUserID() + " is blacklisted; dropping this browse event");
                        } else {
                            collector.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(BlackUser value, Context context,
                                                        Collector<UserBrowseLog> collector) throws Exception {
                        // Update the broadcast state in real time
                        BroadcastState<String, BlackUser> broadcastState = context.getBroadcastState(descriptor);
                        broadcastState.put(value.getUserID(), value);
                        System.out.println("------------> current broadcast state ------------>");
                        System.out.println(broadcastState);
                    }
                });
        filterDataStream.print();
        env.execute("FlinkBroadcastStream");
    }
    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // Watermarks would be assigned here (omitted)
        return processData;
    }
    private static DataStream<BlackUser> getUserBlackUserDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));
        DataStream<BlackUser> processData = dataStreamSource.process(new ProcessFunction<String, BlackUser>() {
            @Override
            public void processElement(String s, Context context, Collector<BlackUser> collector) throws Exception {
                try {
                    BlackUser blackUser = com.alibaba.fastjson.JSON.parseObject(s, BlackUser.class);
                    if (blackUser != null) {
                        collector.collect(blackUser);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse BlackUser JSON: " + e.getMessage());
                }
            }
        });
        return processData;
    }
    // Configuration class describing a blacklisted user
    public static class BlackUser implements Serializable {
        private String userID;
        private String userName;

        public BlackUser() {
        }

        public BlackUser(String userID, String userName) {
            this.userID = userID;
            this.userName = userName;
        }

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }
    }
    // Browse event class
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
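Both Kafka sources hand every raw message to fastjson, so the payloads must be flat JSON objects whose keys match the POJO field names. A quick standalone sanity check (e.g. run in a scratch main inside FlinkBroadcastStream), with hypothetical sample payloads whose field values are made up for illustration:
// Hypothetical sample messages; the keys must match the POJO fields above.
String browseJson = "{\"userID\":\"user_1\",\"eventTime\":\"2021-01-01 10:00:00\",\"eventType\":\"browse\",\"productID\":\"product_1\",\"productPrice\":30}";
String blackJson = "{\"userID\":\"user_2\",\"userName\":\"blockedUser\"}";
UserBrowseLog browse = com.alibaba.fastjson.JSON.parseObject(browseJson, UserBrowseLog.class);
BlackUser black = com.alibaba.fastjson.JSON.parseObject(blackJson, BlackUser.class);
System.out.println(browse); // prints via the toString() defined above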
4. The Flink Distributed Cache
- Flink provides a distributed cache, similar to Hadoop's, so that user code in parallel functions can conveniently read a local copy of a file. Flink places the file on each TaskManager node, which keeps tasks from pulling it repeatedly.
- The cache works as follows: a program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment under a name of its choosing. When the program runs, Flink automatically copies the file or directory to the local file system of every TaskManager node, exactly once. User code can then look it up by that name and access it from the TaskManager's local file system.
Registering:
// Get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1: register a file; an HDFS file works here as well as a local file for testing
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");
Using:
File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
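The registered path can also point to a remote file system such as HDFS; a sketch assuming a hypothetical namenode address:
// the second argument is the lookup name later passed to getFile()
env.registerCachedFile("hdfs://namenode:9000/path/to/file", "hdfsFile");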
Contents of a.txt:
hello flink hello FLINK
Full code:
package Flink_API;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1: register a file; an HDFS file works here as well as a local file for testing
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: use the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("Distributed cache content: " + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList is available here
                System.err.println("Using dataList: " + dataList + "------------" + value);
                // business logic
                return dataList + ":" + value;
            }
        });
        result.printToErr(); // printToErr() triggers execution
    }
}