2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql
Posted 大数据Manor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql相关的知识,希望对你有一定的参考价值。
前言
大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。
这次是上篇文章的续集,最新的Flink版本大大简化了之前复杂的写法~
之前的文章
首先准备模拟数据:
//1、准备配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Kafka的一系列配置,可以从官网直接copy过来@~@~
然后正式生产模拟数据:
//2、创建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
Random random = new Random();
while (true){
//随机生成分类和金额
int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
String category = categorys[index];//获取的随机分类
double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)
CategoryPojo categoryPojo = new CategoryPojo(category, price,System.currentTimeMillis());
String data = JSON.toJSONString(categoryPojo);
//3、发送数据
kafkaProducer.send(new ProducerRecord<String, String>("topicDemo",data));
System.out.println("数据是"+data);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这里的实体类用Lombok,比较简单:
这是之前写的Lombok用法文章
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CategoryPojo {
private String category;//分类名称
private double price;//该分类总销售额
private long time;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
}
有了数据写入Kafka,我们开始消费“她”:
设置一下Flink运行环境:
//TODO 1.设置环境env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1,表示不分区
env.setParallelism(1);
配置Kafka相关并从哪里开始读offset
//TODO 2设置Kafka相关参数
Properties props = new Properties();
//kafka的地址,消费组名
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.88.161:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");
//Flink设置kafka的offset,从最新的开始
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"myDemo",
new SimpleStringSchema(),
props
);
consumer.setStartFromLatest();
consumer.setCommitOffsetsOnCheckpoints(true);
第3步解析数据源并测试:
DataStreamSource<String> source = env.addSource(consumer);
SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
@Override
public Order map(String s) throws Exception {
JSONObject jsonObject = JSON.parseObject(s);
Order order = JSON.toJavaObject(jsonObject, Order.class);
return order;
}
});
//测试一下
mapDS.print();
success!
最后存入mysql
//sink输出到Mysql
result.addSink(JdbcSink.sink(
"INSERT INTO t_order(category,price,time) values(?,?,?)",
(ps,order)->{
ps.setString(1,order.category);
ps.setDouble(2,order.price);
ps.setLong(3,order.time);
},
//批处理
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") //jdbc
.withUsername("root") //配置用户名
.withPassword("123456") //密码
.withDriverName("com.mysql.jdbc.Driver") //驱动类
.build()
));
env.execute();
以上就是全部内容了,感谢您的阅读!
另外补充一些不成熟的代码:双流Join
//双流Join
SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());
//商品ID=订单ID
final DataStream<Order> result = order1watermark.join(order2watermark)
.where(o1 -> o1.category)
.equalTo(o2 -> o2.category)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply((o1, o2) -> {
Order order = new Order();
order.setCategory(o1.category);
order.setPrice(o2.price);
order.setTime(o2.time);
return order;
});
// result.print();
水印机制,简化了直接使用系统时间
//水印机制
public static class OrderItem2WaterMark implements WatermarkStrategy<Order>{
@Override
public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Order>() {
@Override
public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
@Override
public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element,recordTimestamp)->System.currentTimeMillis();
}
}
public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
@Override
public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Order>() {
@Override
public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
好了,终于完成了✅
双流join不怎么会写,慢慢来吧,
毕竟对于考60分的人,下一次考80分已经是极大的进步~~
总结
以上便是Flink数据写入Kafka+从Kafka存入Mysql(二)~
喜欢的小伙伴欢迎一键三连
!!!
我是manor
,一枚相信技术改变世界的码农,我们下期再见~
以上是关于2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql的主要内容,如果未能解决你的问题,请参考以下文章
2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(建议收藏!!)
2021年最新最全Flink系列教程__Flink高级API
2021年最新最全Flink系列教程_Flink流批一体API