Flink 实践教程:入门:写入 Elasticsearch

Posted 小小的一朵云

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 实践教程:入门:写入 Elasticsearch相关的知识,希望对你有一定的参考价值。

作者:腾讯云流计算 Oceanus 团队

 

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。

 操作视频

 

前置准备

创建 流计算 Oceanus 集群

进入流计算 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档创建独享集群

创建 Elasticsearch 集群

进入Elasticsearch 控制台,点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问创建 Elasticsearch 集群

!创建流计算 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。

 

流计算 Oceanus 作业

1. 创建 Source

-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
 \'connector\' = \'datagen\',
 \'rows-per-second\'=\'1\',  -- 每秒产生的数据条数
     
 \'fields.f_sequence.kind\'=\'sequence\',   -- 有界序列(结束后自动停止输出)
 \'fields.f_sequence.start\'=\'1\',         -- 序列的起始值
 \'fields.f_sequence.end\'=\'10000\',       -- 序列的终止值
     
 \'fields.f_random.kind\'=\'random\',       -- 无界的随机数
 \'fields.f_random.min\'=\'1\',             -- 随机数的最小值
 \'fields.f_random.max\'=\'1000\',          -- 随机数的最大值
     
 \'fields.f_random_str.length\'=\'10\'      -- 随机字符串的长度
);

2. 创建 Sink

-- Elasticsearch 只能作为数据目的表(Sink)写入
-- 注意! 如果您启用了 Elasticsearch 的用户名密码鉴权功能, 目前只能使用 Flink 1.10 的旧语法。若无需鉴权, 则可以使用 Flink 1.11 的新语法。
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE Student (
  `user_id`   INT,
  `user_name` VARCHAR
) WITH (
   \'connector.type\' = \'elasticsearch\', -- 输出到 Elasticsearch

   \'connector.version\' = \'6\',            -- 指定 Elasticsearch 的版本, 例如 \'6\', \'7\'. 注意务必要和所选的内置 Connector 版本一致
   \'connector.hosts\' = \'http://10.0.0.175:9200\',  -- Elasticsearch 的连接地址
   \'connector.index\' = \'Student\',        -- Elasticsearch 的 Index 名
   \'connector.document-type\' = \'stu\',    -- Elasticsearch 的 Document 类型
   \'connector.username\' = \'elastic\',     -- 可选参数: 请替换为实际 Elasticsearch 用户名
   \'connector.password\' = \'xxxxxxxxxx\',  -- 可选参数: 请替换为实际 Elasticsearch 密码

   \'update-mode\' = \'append\',             -- 可选无主键的 \'append\' 模式,或有主键的 \'upsert\' 模式    
   \'connector.key-delimiter\' = \'$\',      -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
   \'connector.key-null-literal\' = \'n/a\',  -- 主键为 null 时的替代字符串,默认是 \'null\'
   \'connector.failure-handler\' = \'retry-rejected\',   -- 可选的错误处理。可选择 \'fail\' (抛出异常)、\'ignore\'(忽略任何错误)、\'retry-rejected\'(重试)

   \'connector.flush-on-checkpoint\' = \'true\',   -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
   \'connector.bulk-flush.max-actions\' = \'42\',  -- 可选参数, 每批次最多的条数
   \'connector.bulk-flush.max-size\' = \'42 mb\',  -- 可选参数, 每批次的累计最大大小 (只支持 mb)
   \'connector.bulk-flush.interval\' = \'60000\',  -- 可选参数, 批量写入的间隔 (ms)
   \'connector.connection-max-retry-timeout\' = \'300\',     -- 每次请求的最大超时时间 (ms)

   \'format.type\' = \'json\'        -- 输出数据格式, 目前只支持 \'json\'
);

3. 编写业务 SQL

INSERT INTO Student
SELECT
f_sequence   AS user_id,
f_random_str AS user_name
FROM random_source;

4. 选择 Connector

点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6,点击【保存】>【发布草稿】运行作业。

?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。

5. 数据查询

进入Elasticsearch 控制台,点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考通过 Kibana 访问集群

 

总结

本示例用 Datagen 连接器随机生成数据,经过 流计算 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。

 

 

关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~

Flink从入门到精通100篇(二十三)-基于Apache Flink的爱奇艺实时计算平台建设实践

前言

随着大数据的快速发展,行业大数据服务越来越重要。同时,对大数据实时计算的要求也越来越高。今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践。

今天的介绍会围绕下面三点展开:

  • Flink的现状与改进

  • 平台化的探索和实践:实时计算平台

  • Flink业务案例

01Flink的现状与改进

1. Flink现状

首先和大家分享下爱奇艺大数据服务的发展史。

 

  • 2012年搭建了第一个Hadoop集群,当时只有大概20几个节点,使用的计算框架是MapReduce和Hive等

  • 到2013,2014年,开始使用Hadoop 2.0,上线了Storm和Spark,由于Storm的使用性和稳定性不够好,被放弃使用,转而使用Spark

  • 2015年发布了第一个实时计算平台Europa,上线了Kafka

  • 2017年使用了Flink,同时我们基于Spark和Flink打造了流式计算引擎St

以上是关于Flink 实践教程:入门:写入 Elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章

Flink从入门到精通100篇(二十三)-基于Apache Flink的爱奇艺实时计算平台建设实践

Flink从入门到精通100篇(二十三)-Apache Flink在滴滴的应用与实践

2021年最新最全Flink系列教程__Flink综合案例

EMR-StarRocks 与 Flink 在汇量实时写入场景的最佳实践

Flink从入门到精通系列文章

2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(建议收藏!!)