Flink 操作示例 —— 输入与输出
Posted lemos
tags:
篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了《Flink 操作示例 —— 输入与输出》相关的知识，希望对你有一定的参考价值。
输入
实现 SourceFunction[...]
/** Runs a streaming job that prints the values produced by [[CountSource]]. */
object SourceFunctionExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Attach the custom source and print every emitted value.
    val numbers = env.addSource(new CountSource)
    numbers.print()

    env.execute()
  }
}

/**
 * Source that emits consecutive `Long` values starting at 0.
 *
 * Emission stops when [[cancel]] is called or after `Long.MaxValue`
 * has been emitted.
 */
class CountSource extends SourceFunction[Long] {

  // cancel() is invoked from a different thread than the one executing run(),
  // so the flag must be volatile for the loop to reliably observe the update.
  @volatile var isRunning: Boolean = true

  /** Emits 0, 1, 2, ... through `ctx` while the source is still running. */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    // Start at -1 so that the first emitted value is 0.
    var cnt: Long = -1
    while (isRunning && cnt < Long.MaxValue) {
      cnt += 1
      ctx.collect(cnt)
    }
  }

  /** Signals the emit loop in [[run]] to terminate. */
  override def cancel(): Unit = isRunning = false
}
输出
实现 RichSinkFunction[...]
/**
 * Runs a streaming job that writes sensor readings to a socket on
 * localhost:9191 through [[SimpleSocketSink]].
 */
object SinkFunctionExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Parallelism 1: a single sink instance, hence a single socket connection.
    readings
      .addSink(new SimpleSocketSink("localhost", 9191))
      .setParallelism(1)

    env.execute()
  }
}

/**
 * Sink that writes the `toString` form of each record as one line to a TCP socket.
 *
 * @param host host name to connect to
 * @param port port to connect to
 */
class SimpleSocketSink(val host: String, val port: Int) extends RichSinkFunction[SensorReading] {

  // Both are created in open(); they remain null if open() never ran or failed.
  var socket: Socket = _
  var writer: PrintStream = _

  /** Connects to `host:port` and wraps the socket's output stream in a writer. */
  override def open(parameters: Configuration): Unit = {
    socket = new Socket(host, port)
    writer = new PrintStream(socket.getOutputStream)
  }

  /**
   * Releases the writer and the socket.
   *
   * Safe to call when open() did not complete: null resources are skipped,
   * and the socket is closed even if closing the writer throws.
   */
  override def close(): Unit = {
    try Option(writer).foreach(_.close())
    finally Option(socket).foreach(_.close())
  }

  /** Writes one record as a line and flushes immediately. */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // NOTE(review): PrintStream does not throw IOException on write failures;
    // errors are only visible through writer.checkError(). Confirm whether
    // silently dropping records on a broken connection is acceptable here.
    writer.println(value.toString)
    writer.flush()
  }
}
233
以上是关于《Flink 操作示例 —— 输入与输出》的主要内容，如果未能解决你的问题，请参考以下文章：
Flink 系例 之 Connectors 连接 Redis