Notes on Flink Stream Processing


Installing Flink on macOS:

$ brew install apache-flink
...
$ flink --version
$ brew upgrade

Starting Flink on macOS:

$ cd /usr/local/Cellar/apache-flink/1.6.0

$ ./libexec/bin/start-cluster.sh

Scala source of the SocketWindowWordCount example that ships with Flink:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.scala.examples.socket

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Implements a streaming windowed version of the "WordCount" program.
 * 
 * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the ``netcat`` tool via
 * {{{
 * nc -l 12345
 * }}}
 * and run this example with the hostname and the port as arguments.
 */
object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]) : Unit = {

    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l <port>' " +
          "and type the input text into the command line")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts 
    val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}
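
To make the windowing step above concrete, here is a minimal plain-Scala sketch (no Flink dependency) of what a 5-second tumbling window followed by a per-word sum computes. It is an illustration under stated assumptions, not how Flink implements windows internally: the names TumblingWordCountSketch, windowStart and countPerWindow are hypothetical, each line carries a made-up arrival time in milliseconds, and windows are assumed to be aligned to multiples of the window size.

```scala
object TumblingWordCountSketch {
  // Same shape as the case class in the example above.
  case class WordWithCount(word: String, count: Long)

  // Start of the tumbling window that contains a non-negative timestamp,
  // assuming windows are aligned to multiples of sizeMs.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Group (arrivalTimeMs, line) pairs by window, then count the words in each window.
  def countPerWindow(events: Seq[(Long, String)], sizeMs: Long): Map[Long, Map[String, Long]] =
    events
      .groupBy { case (ts, _) => windowStart(ts, sizeMs) }
      .map { case (start, batch) =>
        val counts = batch
          .flatMap { case (_, line) => line.split("\\s") } // split each line into words
          .map(w => WordWithCount(w, 1))                   // one record per word occurrence
          .groupBy(_.word)                                 // plays the role of keyBy("word")
          .map { case (word, items) => word -> items.map(_.count).sum } // plays the role of sum("count")
        start -> counts
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: three lines arriving at 1 s, 4 s and 6 s.
    val events = Seq((1000L, "hello flink"), (4000L, "hello world"), (6000L, "hello again"))
    countPerWindow(events, 5000L).toSeq.sortBy(_._1).foreach { case (start, counts) =>
      println(s"window starting at $start ms: $counts")
    }
  }
}
```

In this sketch the first two lines fall into the same window and the third into the next one, so "hello" is counted separately in each window. The Flink job likewise emits counts once per window rather than keeping a running total.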
Open http://127.0.0.1:8081 in a browser to view the Flink web dashboard.

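Testing:

// Start nc first; once the job is running, words typed into nc are counted.

$ nc -l 9002

// Open another terminal (for example, a second Xshell session) and run the job:

$ flink run /usr/local/Cellar/apache-flink/1.6.0/libexec/examples/streaming/SocketWindowWordCount.jar --port 9002

// The counts are written to an output file in the log directory.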

Installing Flink on Linux:

1. Download a binary from the downloads page.

2. Unpack the archive:

$ cd ~/Downloads        # Go to download directory
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.6.1

3. Start a local cluster:

$ ./bin/start-cluster.sh  # Start Flink

Then open http://localhost:8081 in a browser.

Stopping Flink on macOS:

bogon:1.6.0 myhaspl$ ./libexec/bin/stop-cluster.sh

Stopping taskexecutor daemon (pid: 11100) on host bogon.

Stopping standalonesession daemon (pid: 10689) on host bogon.
