Flink Stream Processing Notes
Installing Flink on macOS:
$ brew install apache-flink
...
$ flink --version
$ brew upgrade
Starting Flink on macOS:
$ cd /usr/local/Cellar/apache-flink/1.6.0
$ ./libexec/bin/start-cluster.sh
Source of the SocketWindowWordCount example (Scala version):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.scala.examples.socket
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Implements a streaming windowed version of the "WordCount" program.
*
* This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the ''netcat'' tool via
* {{{
* nc -l 12345
* }}}
* and run this example with the hostname and the port as arguments.
*/
object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]) : Unit = {
    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0
    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l <port>' " +
          "and type the input text into the command line")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket; records are delimited by newlines
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }   // split each line on whitespace
      .map { w => WordWithCount(w, 1) }  // one record per word
      .keyBy("word")                     // group by the word field
      .timeWindow(Time.seconds(5))       // 5-second tumbling window
      .sum("count")                      // add up the counts within each window

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}
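To make the effect of keyBy, timeWindow and sum easier to picture, here is a minimal plain-Scala sketch with no Flink dependency. It assumes that each line carries an arrival time in milliseconds and that the 5-second windows are aligned to the epoch; it only imitates the grouping logic and is not how Flink implements windows internally. The names TumblingWindowSketch, windowStart and countPerWindow are hypothetical and not part of the Flink API.

```scala
// Plain-Scala imitation of "key by word, 5-second tumbling window, sum the counts".
// TumblingWindowSketch, windowStart and countPerWindow are illustrative names only.
object TumblingWindowSketch {

  /** Same shape as the case class used in the Flink example. */
  case class WordWithCount(word: String, count: Long)

  /** Start of the epoch-aligned window containing timestampMs (assumes timestampMs >= 0). */
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  /** Groups (arrivalTimeMs, line) pairs into windows and counts the words in each window. */
  def countPerWindow(lines: Seq[(Long, String)], sizeMs: Long): Map[Long, Map[String, Long]] =
    lines
      .groupBy { case (ts, _) => windowStart(ts, sizeMs) }        // assign each line to a window
      .map { case (start, batch) =>
        val counts = batch
          .flatMap { case (_, line) => line.split("\\s").toSeq }  // same split as the Flink job
          .map(w => WordWithCount(w, 1L))                         // one record per word
          .groupBy(_.word)                                        // plays the role of keyBy("word")
          .map { case (word, records) => word -> records.map(_.count).sum } // plays the role of sum("count")
        start -> counts
      }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      (1000L, "hello flink"),
      (4000L, "hello world"),
      (6000L, "hello again")
    )
    countPerWindow(input, sizeMs = 5000L).toSeq.sortBy(_._1).foreach { case (start, counts) =>
      println(s"window [$start, ${start + 5000L}): " + counts.toSeq.sorted.mkString(", "))
    }
    // prints:
    // window [0, 5000): (flink,1), (hello,2), (world,1)
    // window [5000, 10000): (again,1), (hello,1)
  }
}
```

The third line arrives after the 5-second boundary, so its "hello" is counted in a new window instead of being added to the earlier total.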
Open 127.0.0.1:8081 in a browser to see the Flink web UI.
Testing:
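// Start nc first. Once the job is running, the words typed into nc are counted.
$ nc -l 9002
// Open another terminal (for example, a second Xshell session) and submit the job
$ flink run /usr/local/Cellar/apache-flink/1.6.0/libexec/examples/streaming/SocketWindowWordCount.jar --port 9002
// The counts are written to an output file in the log directory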
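In this test nc acts as the text server and the Flink job connects to it as a client, reading newline-delimited lines. The sketch below imitates that arrangement in plain Scala with the standard java.net classes, so it can be run without nc or Flink. It is only an illustration of the client/server roles, not Flink's socket source; SocketLinesSketch, readLines and serveOnce are hypothetical names, and the server here sends a fixed list of lines instead of reading from the keyboard.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}

// Illustrative only: a tiny stand-in for "nc -l <port>" plus a line-reading client.
object SocketLinesSketch {

  /** Connects to host:port and returns every line received until the server closes the connection. */
  def readLines(host: String, port: Int): List[String] = {
    val socket = new Socket(host, port)
    try {
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    } finally socket.close()
  }

  /** Serves the given lines to the first client that connects, then closes that connection. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Thread = {
    val worker = new Thread(() => {
      val client = server.accept()
      try {
        val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
        lines.foreach(line => out.println(line))
      } finally client.close()
    })
    worker.start()
    worker
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(0) // port 0 lets the OS pick a free port
    try {
      val worker = serveOnce(server, Seq("hello flink", "hello world"))
      readLines("localhost", server.getLocalPort).foreach(println)
      worker.join()
    } finally server.close()
  }
}
```

With the real setup, the listener must already be running when the job is submitted, because the client side fails to connect if nothing is listening on the port.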
Installing on Linux:
1. Download a binary from the downloads page
2.
$ cd ~/Downloads # Go to download directory
$ tar xzf flink-*.tgz # Unpack the downloaded archive
$ cd flink-1.6.1
3.
$ ./bin/start-cluster.sh # Start Flink
Then open http://localhost:8081 in a browser.
Stopping Flink on macOS:
bogon:1.6.0 myhaspl$ ./libexec/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 11100) on host bogon.
Stopping standalonesession daemon (pid: 10689) on host bogon.