学习笔记Flink—— Flink开发环境配置及运行实例(单词计数)

Posted 别呀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习笔记Flink—— Flink开发环境配置及运行实例(单词计数)相关的知识,希望对你有一定的参考价值。

一、Intellij IDEA 环境配置

1、创建Maven工程

1.1、开发环境

Maven && JDK


1.2、Pom配置

Compiler Configuration

在pom.xml添加:

<properties>
 	<maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
</properties>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

Flink Dependency
在pom.xml添加:

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

然后IDEA就会自动帮我们添加依赖


1.3、Log配置

log4j.properties
在resources下建立log4j.properties文件并添加(linux上flink安装目录conf下的log4j-console.properties内容):

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

1.4、添加Scala支持

在main文件下新建一个scala目录,并设置为source root

添加scala支持


二、案例:Flink单词计算本地实战

代码:

package demo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val host = "node110"
    val port = 9999
    val windowSeconds = 5

    //get env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //create data source
    val source = env.socketTextStream(host, port)
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(windowSeconds))
      .sum(1)
    //add sink
    counts.print()
    //execute
    env.execute("Window Stream Word Count with paremters")
  }
}

测试:
linux执行命令:nc -lk 9999


三、Flink集群运行实战

3.1、代码修改

if(args.length != 3){
  println("Usage: WindowWordCount <socketHost> <socketPort> <windowSeconds>")
  System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val windowSeconds = args(2).toInt

3.2、程序打包 & 上传

3.3、运行

先开启一个会话(注意先开启,否则执行会出现一堆东西)

再开一个会话


输入数据:

网页查看:


四、Dataset API实现(单词计数)

数据文件 input.txt:

I am a student
I love the world

代码:

package demo

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment

object WordCount {
  def main(args: Array[String]): Unit = {
    //get env
    val env = ExecutionEnvironment.getExecutionEnvironment
    //create data source
    val source = env.readTextFile("D:\\\\java test\\\\flink_test\\\\src\\\\main\\\\resources\\\\input.txt")
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0)
      .sum(1)
    //add sink
    counts.writeAsText("D:\\\\java test\\\\flink_test\\\\target\\\\output00")
    //execute
    env.execute("Batch Word Count")
  }
}

结果

以上是关于学习笔记Flink—— Flink开发环境配置及运行实例(单词计数)的主要内容,如果未能解决你的问题,请参考以下文章

Flink学习笔记:搭建Flink on Yarn环境并运行Flink应用

Flink学习笔记:Flink的最简安装

Flink学习笔记:Flink的最简安装

学习笔记Flink—— Flink安装启动与监控

学习笔记Flink—— Flink安装启动与监控

学习笔记Flink—— Flink DataStream API编程