10-flink-1.10.1- flink Sink api 输出算子
Posted 逃跑的沙丁鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10-flink-1.10.1- flink Sink api 输出算子相关的知识,希望对你有一定的参考价值。
1 flink sink
2 file sink
package com.study.liucf.unbounded.sink
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
/**
* @Author liucf
* @Date 2021/9/13
*/
object FileSink {
def main(args: Array[String]): Unit = {
//创建flink执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//读取数据
val inputStream: DataStream[String] = env.readTextFile("src\\\\main\\\\resources\\\\sensor.txt")
//转换数据类型 string 类型转换成LiucfSensorReding,求最小值
val ds = inputStream.map(r=>{
val arr = r.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
//输出到控制台
ds.print()
//输出到文件
// ds
// .writeAsCsv("src\\\\main\\\\resources\\\\sensor.csv")
// .setParallelism(1)//默认会分布式并行执行根据多少并行度生成多少文件,这里我让它生成一个文件
ds.addSink(StreamingFileSink.forRowFormat(
new Path("src\\\\main\\\\resources\\\\sensor2.csv"),
new SimpleStringEncoder[LiucfSensorReding]()
).build())
//可见writeAsCSV已经被弃用了
//启动flink执行
env.execute("liucf sink api")
}
}
以上是关于10-flink-1.10.1- flink Sink api 输出算子的主要内容,如果未能解决你的问题,请参考以下文章
在一个 flink 作业中使用 collect() 和 env.execute()
Flink 1.7.1 无法使用 core-site.xml 对 s3a 进行身份验证
无线数传模块SI4463SI4438SI4432方案无线通信比对
超低功耗智能门锁Si522A/Si523/Si512--具有超低功耗自动载波侦测功能(ACD功能)的13.56MHz芯片