Flink quickstart
Posted by 看0和1的故事
The most important part first!
Flink runs on Linux, Mac OS X, and Windows
Running it locally:
Demo code from the official website.
Scala version:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  def main(args: Array[String]): Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
Java version:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
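One detail worth noting: the listing above uses timeWindow(Time.seconds(5), Time.seconds(1)), i.e. a 5-second window that slides every second, while the pre-built example jar used in the test steps further down runs a plain 5-second tumbling processing-time window (you can see TumblingProcessingTimeWindows in the submission log). Below is a minimal sketch of that tumbling variant written against the Flink 1.3 Java API; it is an illustration under those assumptions, not the code shipped in the jar.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Sketch only: same word count, but with an explicit tumbling window assigner
// instead of the timeWindow(size, slide) shorthand used in the listing above.
public class TumblingSocketWordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read lines from the same netcat socket used in the test steps below
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            // tumbling processing-time window: one count per word every 5 seconds
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum("count");

        windowCounts.print().setParallelism(1);
        env.execute("Tumbling Socket Window WordCount");
    }

    // same POJO as above: public fields so keyBy("word") / sum("count") can address them
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}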
The next post will look at how to set up a Flink development environment.
If you are just experimenting on your own it is simple: create a new Maven project and pull in the dependencies directly:
<dependencies>
    <!-- core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-examples-batch_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-twitter_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>

    <!-- test dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.10</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
        <type>test-jar</type>
    </dependency>
</dependencies>
Replace ${project.version} with the Flink version you actually installed; the latest at the time of writing is 1.3.2.
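If you prefer not to repeat the version in every <dependency>, one common way is to define a Maven property in the pom.xml and reference it, as sketched below. The property name flink.version is only a convention used for this illustration, nothing in Flink requires it.

<!-- in pom.xml: pin the Flink version once as a property -->
<properties>
    <flink.version>1.3.2</flink.version>
</properties>

<!-- each dependency then points at the property instead of ${project.version} -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>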
Having direct internet access makes this easy, since the packages can be pulled straight from the central repository.
(I still have to download the jars on a machine with external access, upload them to the internal Nexus, and then integrate the Flink module into our existing project. Fortunately Flink is fairly self-contained, so far it has no ties to Spring, JPA and the like, so it is mostly a matter of working through the errors. Personally I also find Nexus not very stable as a private repository... but the real problem is that the company network is awful.)
Test steps:
Run the Example
Now, we are going to run this Flink application. It will read text from a socket and once every 5 seconds print the number of occurrences of each distinct word during the previous 5 seconds, i.e. a tumbling window of processing time, as long as words are floating in.
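The steps below assume a standalone Flink cluster is already running locally (that is the JobManager at 127.0.0.1:6123 shown in the submission log further down). With a Flink 1.3.x distribution this is typically started from the installation directory with the bundled script, for example:

$ ./bin/start-local.sh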
First of all, we use netcat to start a local server via
$ nc -l 9000
Submit the Flink program:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#297388688]
11/04/2016 14:04:50 Job execution switched to status RUNNING.
11/04/2016 14:04:50 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
11/04/2016 14:04:50 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
11/04/2016 14:04:50 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED
11/04/2016 14:04:51 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
11/04/2016 14:04:51 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
11/04/2016 14:04:51 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:
Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the JobManager's output file and write some text in nc (input is sent to Flink line by line after hitting Enter):

$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

The .out file will print the counts at the end of each time window as long as words are floating in, e.g.:

$ tail -f log/flink-*-jobmanager-*.out
lorem : 1
bye : 1
ipsum : 4
That is how a job is submitted in local mode.
So what about submitting to YARN or Mesos?