Flink quickstart

Posted by 看0和1的故事


Now for the most important part!

Flink runs on Linux, Mac OS X, and Windows  


Running locally:


Demo code from the official website:

In Scala:

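import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {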

    def main(args: Array[String]) : Unit = {

        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}

In Java:

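import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {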

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
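
Both listings pass a newline as the delimiter to socketTextStream, so the incoming stream is cut into one record per line. As a rough illustration of that idea only, here is a plain-Java sketch that does not use Flink and is not Flink's implementation; the class name DelimitedReaderSketch and the method splitRecords are made up for this example, and keeping a trailing record that has no delimiter is an assumption.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class DelimitedReaderSketch {

    // Hypothetical helper: reads everything from the reader and cuts it into
    // records at each occurrence of the delimiter. A trailing record without a
    // delimiter is kept as the last element if it is non-empty (an assumption).
    static List<String> splitRecords(Reader in, String delimiter) {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        char[] chunk = new char[4]; // deliberately tiny: chunk boundaries must not matter
        try {
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.append(chunk, 0, read);
                int pos;
                // Emit every complete record currently sitting in the buffer.
                while ((pos = buffer.indexOf(delimiter)) != -1) {
                    records.add(buffer.substring(0, pos));
                    buffer.delete(0, pos + delimiter.length());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        // A StringReader stands in for the socket used in the listings above.
        Reader in = new StringReader("lorem ipsum\nipsum ipsum ipsum\nbye\n");
        for (String record : splitRecords(in, "\n")) {
            System.out.println("[" + record + "]");
        }
    }
}
```

Each record produced this way corresponds to one String value handed to the flatMap step.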

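The next post will look at how to set up a Flink development environment.

If you are just experimenting on your own, it is simple: create a new Maven project and add the dependencies directly: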

<dependencies>
    <!-- core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-examples-batch_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-twitter_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
        <version>${project.version}</version>
    </dependency>

    <!-- test dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.10</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
        <type>test-jar</type>
    </dependency>
</dependencies>


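Replace ${project.version} with the version number of the Flink installation you deployed; the latest at the time of writing is 1.3.2.

Having external network access is great: you can pull the packages directly.


(I, on the other hand, have to download the packages from the internet, upload them to our internal private repository, and then integrate the Flink module into the existing project. Fortunately Flink is fairly self-contained and so far has no ties to nasty things like Spring or JPA. I am still heads-down debugging. Personally, I do not find Nexus all that stable as a private repository either... though the main culprit is the company's terrible network.)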

Test steps:

Run the Example

Now, we are going to run this Flink application. It will read text from a socket and once every 5 seconds print the number of occurrences of each distinct word during the previous 5 seconds, i.e. a tumbling window of processing time, as long as words are floating in.
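
The listings above pass both a size and a slide to timeWindow (5 seconds and 1 second), which in Flink defines a sliding window, whereas this description speaks of a tumbling 5-second window. The plain-Java sketch below does not use Flink and is not its implementation; it only illustrates how the two kinds of window differ, assuming windows are aligned to the epoch and timestamps are non-negative. The class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignmentSketch {

    // Start (in ms) of the single tumbling window of length sizeMs that
    // contains timestampMs. Assumes timestampMs >= 0 and epoch alignment.
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Starts (in ms) of every sliding window of length sizeMs, advancing every
    // slideMs, that contains timestampMs. Newest window first.
    static List<Long> slidingWindowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        // A window [start, start + sizeMs) contains the timestamp
        // as long as start > timestampMs - sizeMs.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long t = 12_300L; // an arbitrary example timestamp in milliseconds
        System.out.println(tumblingWindowStart(t, 5_000L));
        System.out.println(slidingWindowStarts(t, 5_000L, 1_000L));
    }
}
```

For the example timestamp of 12300 ms, the tumbling case yields one window starting at 10000 ms, while the sliding case yields five overlapping windows starting at 12000, 11000, 10000, 9000 and 8000 ms. With a sliding window, each word therefore contributes to several consecutive results instead of exactly one.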

  • First of all, we use netcat to start local server via

    $ nc -l 9000
  • Submit the Flink program:

    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
    
    Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
    Using address 127.0.0.1:6123 to connect to JobManager.
    JobManager web interface address http://127.0.0.1:8081
    Starting execution of program
    Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#297388688]
    11/04/2016 14:04:50     Job execution switched to status RUNNING.
    11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
    11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
    11/04/2016 14:04:50     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED
    11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
    11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
    11/04/2016 14:04:51     Source: Socket Stream -> Flat Map(1/1) switched to RUNNING

    The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected.


  • Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the JobManager’s output file and write some text in nc (input is sent to Flink line by line after hitting Enter):

    $ nc -l 9000
    lorem ipsum
    ipsum ipsum ipsum
    bye

    The .out file will print the counts at the end of each time window as long as words are floating in, e.g.:

    $ tail -f log/flink-*-jobmanager-*.out
    lorem : 1
    bye : 1
    ipsum : 4
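
To make the counts above easier to follow, here is a minimal plain-Java sketch of what the split, group and sum steps compute when all three sample lines fall into the same window. It does not use Flink; the class name WindowCountSketch and the method countWords are hypothetical, and skipping empty tokens is an addition that the original listings do not make.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowCountSketch {

    // Hypothetical helper: counts whitespace-separated words across the given
    // lines, treating them as the contents of a single window.
    static Map<String, Long> countWords(String... lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {
                if (word.isEmpty()) {
                    continue; // not in the original listings: ignore empty tokens
                }
                // Same effect as reducing WordWithCount values that share a key.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords("lorem ipsum", "ipsum ipsum ipsum", "bye");
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }
}
```

The sketch prints words in first-seen order, so its line order differs from the .out file shown above; the counts themselves (lorem 1, ipsum 4, bye 1) are the same.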



This is how a job is submitted in local mode.


So what about submitting to YARN or Mesos?
