4. Flink Custom Source and Sink
Posted by xiexiandong
1. Source
Code repository: https://gitee.com/nltxwz_xxd/abc_bigdata
1.1 Flink built-in data sources
1. File-based
env.readTextFile("file://path")
env.readFile(inputFormat, "file://path");
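readFile takes an explicit input format, while readTextFile is just its TextInputFormat shortcut. A minimal sketch of the explicit form, assuming a hypothetical file /tmp/input.txt and the env from the examples below:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.api.scala._

// Build a TextInputFormat for the (hypothetical) file and read it once as a stream of lines
val format = new TextInputFormat(new Path("/tmp/input.txt"))
val lines = env.readFile(format, "/tmp/input.txt")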
2. Socket-based
env.socketTextStream("localhost", 6666, ' ')
3. Collection-based
import org.apache.flink.api.scala._

env.fromCollection(List(1, 2, 3))
env.fromElements(1, 2, 3)
env.generateSequence(0, 1000)
1.2 Custom data sources
1. Implement SourceFunction
SourceFunction is non-parallel, so you cannot set a parallelism on the source, i.e. calling setParallelism(num) on it is not allowed.
SocketTextStreamFunction is an implementation of SourceFunction; the Flink source code contains a detailed example of its use.
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
// This implicit import is required, otherwise the flatMap call below will not compile
import org.apache.flink.api.scala._

// SourceFunction is non-parallel, so setParallelism(num) cannot be used on this source
class MySourceFunction extends SourceFunction[String] {
  var num: Long = 0
  var isCancel: Boolean = true

  // Executed on cancel; flips the flag that controls the loop in run()
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // run() emits data to downstream operators
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel) {
      ctx.collect(s"xxd ${num}")
      Thread.sleep(1000)
      num += 1
    }
  }
}

object SourceFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "3")
    // Create a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction)
    // Adding setParallelism here would throw an exception:
    // val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction).setParallelism(2)
    // Define the operators: split the lines, key by word, and aggregate with sum
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Print the result as the sink
    wordCountData.print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run
    env.execute("Socket Window WordCount")
  }
}
2. Implement ParallelSourceFunction
ParallelSourceFunction is a parallel source, so a parallelism can be set on it.
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

// ParallelSourceFunction is a parallel source, so a parallelism can be set on it
class MyParallelSource extends ParallelSourceFunction[String] {
  var num = 0
  var isCancel = true

  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel) {
      ctx.collect(s"xxd ${num}")
      Thread.sleep(1000)
      num += 1
    }
  }
}

object ParallelSourceWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Create a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source; a parallelism of 4 is allowed here
    val sourceDataStream: DataStream[String] = env.addSource(new MyParallelSource).setParallelism(4)
    // Define the operators: split the lines, key by word, and aggregate with sum
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Print the result as the sink
    wordCountData.print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run
    env.execute("Socket Window WordCount")
  }
}
3. Extend RichParallelSourceFunction
RichParallelSourceFunction not only implements ParallelSourceFunction but also extends AbstractRichFunction.
So besides running in parallel, it adds the open() and close() lifecycle methods and getRuntimeContext() on top of what ParallelSourceFunction offers.
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

// RichParallelSourceFunction is not only parallel,
// it also adds open()/close() and getRuntimeContext() on top of ParallelSourceFunction
class MyRichParallelSource extends RichParallelSourceFunction[String] {
  var num = 0
  var isCancel = true

  // Runs once when the source starts, e.g. a good place to open a MySQL connection
  override def open(parameters: Configuration): Unit = {
    println("open")
    num = 100
  }

  // Runs once when the source shuts down,
  // e.g. return a finished MySQL connection to the pool.
  // Wait until run() has marked itself finished before cleaning up.
  override def close(): Unit = {
    while (!isMysql) {
      Thread.sleep(1000)
      println("close sleep")
    }
    println("close")
    num = 0
  }

  // Triggered manually when the job is cancelled; flips the flag that controls the loop in run().
  // After cancel() returns, close() is invoked.
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // run() emits data downstream.
  // After a manual cancel, Flink does not wait for run() to finish before calling close(),
  // so a connection still in use inside run() could be closed underneath it.
  // The isMysql flag therefore records whether run() has finished, so close() knows when it is safe to proceed.
  var isMysql = false
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    import scala.util.control.Breaks._
    breakable {
      while (isCancel) {
        println(getRuntimeContext.getIndexOfThisSubtask) // index of this subtask
        ctx.collect(s"xxd ${num}")
        Thread.sleep(2000)
        num += 1
        if (num > 1200) {
          break()
        }
      }
    }
    isMysql = true
  }
}

object RichParallelSourceWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Create a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source with a parallelism of 4
    val sourceDataStream: DataStream[String] = env.addSource(new MyRichParallelSource).setParallelism(4)
    // Define the operators: split the lines, key by word, and aggregate with sum
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Print the result as the sink, in a dedicated slot sharing group
    wordCountData.slotSharingGroup("xxd").print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run
    env.execute("Socket Window WordCount")
  }
}
2. Sink
2.1 Built-in sinks
1. File-based
// Uses TextOutputFormat
stream.writeAsText("/path/to/file")
// Uses CsvOutputFormat
stream.writeAsCsv("/path/to/file")
2. Socket-based
stream.writeToSocket(host, port, SerializationSchema)
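For example, assuming `stream` is a DataStream[String] and a TCP server is already listening on the placeholder host/port (e.g. started with `nc -lk 9999`), a minimal sketch:

import org.apache.flink.api.common.serialization.SimpleStringSchema

// Each String record is serialized by SimpleStringSchema and written to the socket
stream.writeToSocket("localhost", 9999, new SimpleStringSchema())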
3. Standard output / standard error
Note: never use these in production; prefer sampled printing or logging instead (a sketch follows the calls below).
stream.print()
stream.printToErr()
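A sampled print can be as simple as a filter in front of print(); the 1% rate below is purely illustrative:

import scala.util.Random

// Forward only ~1% of records to the print sink
stream.filter(_ => Random.nextDouble() < 0.01).print()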
2.2 Custom sinks
1. Implement SinkFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MySinkFunction extends SinkFunction[(String, Int)] {
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}

object SinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Create a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom sink
    output.addSink(new MySinkFunction)
    env.execute()
  }
}
2. Extend RichSinkFunction
package com.xxd.flink.sink

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MyRichSinkFunction extends RichSinkFunction[(String, Int)] {
  // Runs once when the sink starts, e.g. a good place to open a MySQL connection
  override def open(parameters: Configuration): Unit = {
    println("open")
  }

  // Runs once when the sink shuts down,
  // e.g. return the MySQL connection to the pool
  override def close(): Unit = {
    println("close")
  }

  // invoke() is called for each record to emit it
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    // In a rich function, getRuntimeContext gives access to e.g. broadcast variables and accumulators
    // getRuntimeContext.getBroadcastVariable("")
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}

object RichSinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Create a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom rich sink
    output.addSink(new MyRichSinkFunction)
    env.execute()
  }
}
3. Implement a custom OutputFormat, then write with stream.writeUsingOutputFormat(yourOutputFormat)
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

class MyOutPutFormat extends OutputFormat[(String, Int)] {
  // Configure the OutputFormat
  override def configure(parameters: Configuration): Unit = {
    println("configure")
  }

  // Runs once when the sink starts, e.g. a good place to open a MySQL connection
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // taskNumber: index of this task; numTasks: total number of parallel tasks
    println(s"taskNumber:${taskNumber},numTasks:${numTasks}")
  }

  // writeRecord() is called for each record to emit it
  override def writeRecord(record: (String, Int)): Unit = {
    println(record)
  }

  // Runs once when the sink shuts down,
  // e.g. return the MySQL connection to the pool
  override def close(): Unit = {
    println("close")
  }
}

object OutputFormatWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Create a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom OutputFormat
    output.writeUsingOutputFormat(new MyOutPutFormat)
    env.execute()
  }
}