Big Data Flink: Reading File Data and Writing It to Elasticsearch

Posted by 逆风飞翔的小叔

Preface

Elasticsearch is one of the essential middleware components for big data storage. With Flink you can read data from external sources such as log files and Kafka and then write it into ES. This article walks through the complete process with a working example.

I. Prerequisites

1. Set up and start an ES service in advance (this article uses an ES 7.6 instance running in Docker);

2. Set up and start a Kibana service in advance (it makes working with the ES index data easier);

3. Create a test index in advance:

PUT test_index
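The bare PUT above relies on ES dynamic mapping for whatever fields get written later. If you prefer explicit field types, the index could instead be created with a mapping such as the following (an optional sketch; the field names match the UserInfo object used later in the program):

PUT test_index
{
  "mappings": {
    "properties": {
      "id":   { "type": "keyword" },
      "name": { "type": "keyword" }
    }
  }
}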

Note:

With an ES instance set up via Docker, you may find that inserting data fails right after the index is created, with an error saying the operation is not permitted (the index is write-blocked). If this happens, run the snippet below first; otherwise the Flink job will hit the same error when it runs:

PUT _settings
{
  "index": {
    "blocks": {
      "read_only_allow_delete": "false"
    }
  }
}
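To confirm that the write block has actually been cleared, you can check the index settings afterwards (a quick sanity check against the test_index created above):

GET test_index/_settings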

II. Writing the Program

1. Add the basic pom dependencies

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <!-- Newly added dependencies -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <!-- Scala suffix aligned with the other Flink modules above (_2.12) -->
            <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

    </dependencies>

2. Prepare an external file for the program to read

The csv file (saved as userinfo.txt under the project's resources directory, as referenced in the code below) holds one comma-separated record per line.
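A minimal sample of the file, with hypothetical values matching the comma-separated id,name parsing done by the program, would look like this:

1001,zhangsan
1002,lisi
1003,wangwu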

3. Core program code

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkEs {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // read the source data from a local file
        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\userinfo.txt";
        DataStreamSource<String> inputStream = env.readTextFile(path);
        // parse each comma-separated line into a UserInfo object
        SingleOutputStreamOperator<UserInfo> dataStream = inputStream.map(new MapFunction<String, UserInfo>() {
            @Override
            public UserInfo map(String value) throws Exception {
                String[] fields = value.split(",");
                return new UserInfo(fields[0], fields[1]);
            }
        });
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("IP", 9200));

        ElasticsearchSink.Builder<UserInfo> result = new ElasticsearchSink.Builder<UserInfo>(httpHosts, new ElasticsearchSinkFunction<UserInfo>() {

            @Override
            public void process(UserInfo element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // build the document to be written to ES
                HashMap<String, String> dataSource = new HashMap<>();
                dataSource.put("id", element.getId());
                dataSource.put("name", element.getName());
                // create an IndexRequest as the write command sent to ES
                IndexRequest indexRequest = Requests.indexRequest().index("test_index").source(dataSource);
                // hand the request to the indexer
                requestIndexer.add(indexRequest);
            }
        });
        // flush after every single record so results show up immediately (fine for a demo)
        result.setBulkFlushMaxActions(1);
        dataStream.addSink(result.build());
        env.execute();
        System.out.println("data written to es successfully");

    }
}

The UserInfo object referenced in the code above:

public class UserInfo {

    private String id;
    private String name;

    public UserInfo() {
    }

    public UserInfo(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
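As a side note, fastjson is already on the classpath from the pom above, so the sink function could serialize the POJO directly instead of assembling a HashMap field by field. The variant below is only a sketch of that idea; the class name UserInfoSinkFunction is made up for illustration and is not part of the original code:

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Hypothetical alternative to the anonymous ElasticsearchSinkFunction in SinkEs:
// let fastjson turn the UserInfo POJO into a JSON document.
public class UserInfoSinkFunction implements ElasticsearchSinkFunction<UserInfo> {

    @Override
    public void process(UserInfo element, RuntimeContext ctx, RequestIndexer indexer) {
        // produces e.g. {"id":"1001","name":"zhangsan"} from the POJO's getters
        String json = JSON.toJSONString(element);
        IndexRequest request = Requests.indexRequest()
                .index("test_index")
                .source(json, XContentType.JSON);
        indexer.add(request);
    }
}

It would be wired in as new ElasticsearchSink.Builder<>(httpHosts, new UserInfoSinkFunction()) in place of the anonymous inner class above.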

Run the program above and observe the console output.

4. Query the results with Kibana

Execute the following query:

GET test_index/_search
{
  "query": {
    "match_all": {}
  }
}

Seeing the records from the file come back as hits confirms that the data was successfully written to ES.
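If you only want to confirm how many records arrived, a count query against the same index is a quicker check:

GET test_index/_count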

Summary of problems encountered while running the program

While writing this code to load data into ES, I ran into two problems. They are recorded here to help avoid the same pitfalls later.

1. The first error (originally shown as a screenshot)

Roughly, it means that the time characteristic did not match when the Flink program wrote to ES. The fix was to add the following line to the program:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

2. The second error (originally shown as a screenshot)

Roughly, it means Elasticsearch has put the index into "read-only" mode and only deletes are allowed. Some solutions online attribute this to running low on memory or disk space, but adjusting that did not seem to work for me. The final fix is the one mentioned at the beginning of this article: apply the setting below, i.e. set read_only_allow_delete to "false";

PUT _settings
{
  "index": {
    "blocks": {
      "read_only_allow_delete": "false"
    }
  }
}
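For reference, the setting that normally triggers this error is the disk flood-stage watermark: once a node's disk usage crosses it, ES automatically marks indices as read_only_allow_delete. The adjustment commonly suggested online (which, as noted above, did not solve it in my case) looks roughly like this:

PUT _cluster/settings
{
  "transient": {
    "cluster.routing.allocation.disk.watermark.flood_stage": "97%"
  }
}

Clearing the index-level block as shown above is only a workaround if the disk really is close to full.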
