10-flink-1.10.1- flink Sink api 输出算子

Posted 逃跑的沙丁鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10-flink-1.10.1- flink Sink api 输出算子相关的知识,希望对你有一定的参考价值。

1 flink sink

 2 file sink

package com.study.liucf.unbounded.sink

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/13
 */
object FileSink {
  def main(args: Array[String]): Unit = {
    //创建flink执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //读取数据
    val inputStream: DataStream[String] = env.readTextFile("src\\\\main\\\\resources\\\\sensor.txt")
    //转换数据类型 string 类型转换成LiucfSensorReding,求最小值
    val ds = inputStream.map(r=>{
      val arr = r.split(",")
      LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
    })
    //输出到控制台
    ds.print()
    //输出到文件
//    ds
//      .writeAsCsv("src\\\\main\\\\resources\\\\sensor.csv")
//      .setParallelism(1)//默认会分布式并行执行根据多少并行度生成多少文件,这里我让它生成一个文件
    ds.addSink(StreamingFileSink.forRowFormat(
      new Path("src\\\\main\\\\resources\\\\sensor2.csv"),
      new SimpleStringEncoder[LiucfSensorReding]()
    ).build())
    //可见writeAsCSV已经被弃用了
    //启动flink执行
    env.execute("liucf sink api")
  }
}

以上是关于10-flink-1.10.1- flink Sink api 输出算子的主要内容,如果未能解决你的问题,请参考以下文章

在一个 flink 作业中使用 collect() 和 env.execute()

Flink 1.7.1 无法使用 core-site.xml 对 s3a 进行身份验证

Allegro Sigrity SI和PCB SI有啥不同

无线数传模块SI4463SI4438SI4432方案无线通信比对

请问PCB设计中的SI设计和SI仿真有啥关系

超低功耗智能门锁Si522A/Si523/Si512--具有超低功耗自动载波侦测功能(ACD功能)的13.56MHz芯片