Flink 操作示例 —— 输入与输出

Posted lemos

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 操作示例 —— 输入与输出相关的知识,希望对你有一定的参考价值。

输入

实现 SourceFunction[...]

object SourceFunctionExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers = env.addSource(new CountSource)
    numbers.print()
    env.execute()
  }
}

class CountSource extends SourceFunction[Long] {

  var isRunning: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {

    var cnt: Long = -1
    while (isRunning && cnt < Long.MaxValue) {
      cnt += 1
      ctx.collect(cnt)
    }
  }

  override def cancel(): Unit = isRunning = false
}

 

输出

实现 RichSinkFunction[...]

object SinkFunctionExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    readings.addSink(new SimpleSocketSink("localhost", 9191))
      .setParallelism(1)

    env.execute()
  }
}

class SimpleSocketSink(val host: String, val port: Int) extends RichSinkFunction[SensorReading] {

  var socket: Socket = _
  var writer: PrintStream = _

  override def open(parameters: Configuration): Unit = {
    socket = new Socket(host, port)
    writer = new PrintStream(socket.getOutputStream)
  }

  override def close(): Unit = {
    writer.close()
    socket.close()
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    writer.println(value.toString)
    writer.flush()
  }
}

 

233

以上是关于Flink 操作示例 —— 输入与输出的主要内容,如果未能解决你的问题,请参考以下文章

Flink 系例 之 Connectors 连接 Redis

Flink 状态编程

Flink Table API & SQL 基本操作

Flink Table API & SQL 基本操作

片段(Java) | 机试题+算法思路+考点+代码解析 2023

如何使用Apache Flink阅读Cassandra?