Building a Data Platform with Doris + Flink
Posted by 独狐游清湖
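Doris Deployment
Doris is an open-source OLAP database with an MPP architecture, and it runs on most mainstream commodity servers.
Installation: see the Doris installation guide on the official site.
Points you must pay attention to:
1 Set the system's maximum number of open file handles
2 Linux operating system version requirements
3 Software requirements (Java, GCC)
4 Machine role assignment (the part underlined in the figure from the original post is the key point: it guards against split-brain!)
With the prerequisites in place, start deploying!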
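The first checklist item can be verified from a script before deploying. The sketch below is only an illustration: the threshold of 65536 is an assumption (a commonly recommended value for Doris nodes), and the helper name nofile_ok is made up for this example, so confirm the number against the installation guide for your version.

```python
import resource

# Assumed threshold, not taken from the post; check the official guide.
REQUIRED_NOFILE = 65536


def nofile_ok(soft_limit: int, required: int = REQUIRED_NOFILE) -> bool:
    """Return True when the soft open-file limit meets the requirement."""
    if soft_limit == resource.RLIM_INFINITY:
        return True
    return soft_limit >= required


# Read the limits of the current process (Unix only).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft} hard={hard} ok={nofile_ok(soft)}")
```

Run it as the user that will start the Doris processes, since limits are per user session.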
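My Doris installation process
My version: selectdb_doris-1.2.1.1-x86_64-avx2.tar.gz
Installation tool: the toolkit described later in this post
Installation reference: the Doris cluster installation document
Points to note:
1 First, write a cluster topology file that supplies the parameters for deploying the new cluster. The following command generates a minimal sample topology file, which you can then adjust as needed:
dorisctrl cluster template > doris-topo.yaml
The content is as follows: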
[root@node104 ~]# cat doris-topo.yaml
global:
  user: root
  ssh_port: 22
  deploy_dir: /opt/selectdb
frontends:
  - host: fe104
  - host: fe174
  - host: fe117
backends:
  - host: be118
  - host: be119
  - host: be173
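Before deploying, it can help to sanity-check the topology. The following is a minimal sketch, assuming the YAML has already been loaded into a plain dict (for example with a YAML parser) and assuming every frontend listed is a follower, in which case an odd count is what protects against split-brain. The function name validate_topology is hypothetical and not part of the toolkit.

```python
def validate_topology(topo: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    fe_hosts = [node["host"] for node in topo.get("frontends", [])]
    be_hosts = [node["host"] for node in topo.get("backends", [])]
    # Assumption: all frontends are followers, so the count should be odd.
    if len(fe_hosts) % 2 == 0:
        problems.append(f"even number of frontends ({len(fe_hosts)}); use an odd count")
    for role, hosts in (("frontend", fe_hosts), ("backend", be_hosts)):
        dupes = sorted({h for h in hosts if hosts.count(h) > 1})
        if dupes:
            problems.append(f"duplicate {role} hosts: {', '.join(dupes)}")
    if not be_hosts:
        problems.append("no backends defined")
    return problems


# The topology from the listing above, written out as a dict.
topology = {
    "global": {"user": "root", "ssh_port": 22, "deploy_dir": "/opt/selectdb"},
    "frontends": [{"host": "fe104"}, {"host": "fe174"}, {"host": "fe117"}],
    "backends": [{"host": "be118"}, {"host": "be119"}, {"host": "be173"}],
}
print(validate_topology(topology))  # prints []
```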
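2 Run the deploy command (spaces and similar details in this command matter a lot! The original post showed a screenshot of what happens when the command is mistyped). Here edudoris is the cluster name:
dorisctrl cluster deploy edudoris -v 1.2.1.1 -f doris-topo.yaml
3 Show the details of a given cluster
dorisctrl cluster display edudoris
4 List the clusters
dorisctrl cluster list
5 Start the cluster
dorisctrl cluster start edudoris
6 Stop the cluster
dorisctrl cluster stop edudoris
7 View the cluster metadata
cat edudoris/meta.yaml
8 Log in to the Doris web UI: fe104:8030/
9 Log in with a client: see the official documentation
This is mostly about changing passwords and creating user accounts, databases and tables
CREATE USER 'user_name' IDENTIFIED BY 'password';
GRANT ALL ON db_name TO user_name;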
toolkit
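SelectDB Distribution Toolkit is the cluster management and data development tool provided by SelectDB. It makes managing a Doris cluster simple and quick, and supports common operations such as cluster deployment, inspection, start/stop and scale-out.
My version: selectdb_distribution_toolkit-1.2.1-x86_64.tar.gz
Installation: see the toolkit installation guide on the official site
Flink Deployment
Here I only use the Flink CDC feature for testing, so the deployment mode is standalone; for high availability it can later be switched to Flink on YARN.
Installation reference: Flink 1.14.4 installation
Notes:
1 Change the slot count to 100 (each CDC table occupies one slot, so watch out for running short when you sync many tables!)
2 Flink web UI: flink1:8081
3 Submit the jobs (these two SQL files are generated by the Python script later in this post!)
-- create the Doris tables
ssh -t -t $FEIP
ssh $FEIP " mysql -h$FEIP -uroot -pPASSWORD -P9030 -e'source /root/sink2doris.sql;'"
-- submit the Flink job to start data synchronization
cd /flink/bin/
./sql-client.sh -f /flinksync/output/mysql2sink.sql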
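The slot note above comes down to simple arithmetic, which a small check can make explicit. This is a sketch under the post's rule of thumb (one slot per synchronized table). The function names and the sample table list are made up for illustration, and the total assumes every TaskManager offers the same number of slots.

```python
def total_slots(task_managers: int, slots_per_task_manager: int) -> int:
    """Slots available in a standalone cluster with uniform TaskManagers."""
    return task_managers * slots_per_task_manager


def slots_needed(tables: list) -> int:
    """One slot per distinct synchronized table, per the rule of thumb above."""
    return len(set(tables))


def has_enough_slots(tables: list, available: int) -> bool:
    return slots_needed(tables) <= available


# Hypothetical example: 120 tables against one TaskManager with 100 slots.
tables = [f"ods_table_{i}" for i in range(120)]
print(has_enough_slots(tables, total_slots(1, 100)))  # prints False
```

If the check fails, either raise the slot count or split the tables across several jobs.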
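Offline data synchronization scripts (provided officially by Doris, BUT they need modification!)
mysql to doris is mainly meant for automating the creation of Doris ODBC tables, and it is implemented mostly as shell scripts.
Official reference: mysql_to_doris batch multi-table operations
To see the official scripts, download mysql_to_doris.tar.gz and unpack it. Here are the parts I changed:
1 bin/run.sh: keep it as-is, do not touch!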
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
function usage() {
    echo "Usage: run.sh [option]"
    echo " -e, --create-external-table: create doris external table"
    echo " -o, --create-olap-table: create doris olap table"
    echo " -i, --insert-data: insert data into doris olap table from doris external table"
    echo " -d, --drop-external-table: drop doris external table"
    echo " -a, --auto-external-table: create doris external table and auto check mysql schema change"
    echo " --database: specify the database name to process all tables under the entire database, and separate multiple databases with \",\""
    echo " -t, --type: specify external table type, valid options: ODBC(default), JDBC"
    echo " -h, --help: show usage"
    exit 1
}
cur_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
if [[ $# -eq 0 ]]; then
    usage
fi
opts=$(getopt -o eaoidht: \
    -l create-external-table \
    -l create-olap-table \
    -l insert-data \
    -l drop-external-table \
    -l auto-external-table \
    -l database: \
    -l type: \
    -l help \
    -n "$0" \
    -- "$@")
eval set -- "$opts"
CREATE_EXTERNAL_TABLE=0
CREATE_OLAP_TABLE=0
INSERT_DATA=0
DROP_EXTERNAL_TABLE=0
AUTO_EXTERNAL_TABLE=0
DATABASE=''
TYPE='ODBC'
while true; do
    case "$1" in
    -e | --create-external-table)
        CREATE_EXTERNAL_TABLE=1
        shift
        ;;
    -o | --create-olap-table)
        CREATE_OLAP_TABLE=1
        shift
        ;;
    -i | --insert-data)
        INSERT_DATA=1
        shift
        ;;
    -d | --drop-external-table)
        DROP_EXTERNAL_TABLE=1
        shift
        ;;
    -a | --auto-external-table)
        AUTO_EXTERNAL_TABLE=1
        shift
        ;;
    --database)
        DATABASE="$2"
        shift 2
        ;;
    -t | --type)
        TYPE="$2"
        shift 2
        ;;
    -h | --help)
        usage
        shift
        ;;
    --)
        shift
        break
        ;;
    *)
        echo "Internal error"
        exit 1
        ;;
    esac
done
home_dir=$(cd "$cur_dir"/.. && pwd)
source "$home_dir"/conf/env.conf
# when fe_password is not set or is empty, do not put -p option
use_passwd=$([ -z "$doris_password" ] && echo "" || echo "-p$doris_password")
if [ -n "$DATABASE" ]; then
    sh "$home_dir"/lib/get_tables.sh "$DATABASE"
fi
# create doris jdbc catalog
if [[ "JDBC" == "$TYPE" && "$CREATE_EXTERNAL_TABLE" -eq 1 ]]; then
    echo "====================== start create doris jdbc catalog ======================"
    sh "$home_dir"/lib/jdbc/create_jdbc_catalog.sh "$home_dir"/result/mysql/jdbc_catalog.sql 2>>error.log
    echo "source $home_dir/result/mysql/jdbc_catalog.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
    res=$?
    if [ "$res" != 0 ]; then
        echo "====================== create doris jdbc catalog failed ======================"
        exit "$res"
    fi
    echo "====================== create doris jdbc catalog finished ======================"
fi
# create doris external table
if [[ "ODBC" == "$TYPE" && "$CREATE_EXTERNAL_TABLE" -eq 1 ]]; then
    echo "====================== start create doris external table ======================"
    sh "$home_dir"/lib/e_mysql_to_doris.sh "$home_dir"/result/mysql/e_mysql_to_doris.sql 2>error.log
    echo "source $home_dir/result/mysql/e_mysql_to_doris.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
    res=$?
    if [ "$res" != 0 ]; then
        echo "====================== create doris external table failed ======================"
        exit "$res"
    fi
    echo "====================== create doris external table finished ======================"
fi
# create doris olap table
if [[ "$CREATE_OLAP_TABLE" -eq 1 ]]; then
    echo "====================== start create doris olap table ======================"
    sh "$home_dir"/lib/mysql_to_doris.sh "$home_dir"/result/mysql/mysql_to_doris.sql 2>>error.log
    echo "source $home_dir/result/mysql/mysql_to_doris.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
    res=$?
    if [ "$res" != 0 ]; then
        echo "====================== create doris olap table failed ======================"
        exit "$res"
    fi
    echo "====================== create doris olap table finished ======================"
fi
# insert data into doris olap table
if [[ "$INSERT_DATA" -eq 1 ]]; then
    echo "====================== start insert data ======================"
    if [[ "JDBC" == "$TYPE" ]]; then
        sh "$home_dir"/lib/jdbc/sync_to_doris.sh "$home_dir"/result/mysql/sync_to_doris.sql 2>>error.log
    else
        sh "$home_dir"/lib/sync_to_doris.sh "$home_dir"/result/mysql/sync_to_doris.sql 2>>error.log
    fi
    echo "source $home_dir/result/mysql/sync_to_doris.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
    res=$?
    if [ "$res" != 0 ]; then
        echo "====================== insert data failed ======================"
        exit "$res"
    fi
    echo "====================== insert data finished ======================"
    echo "====================== start sync check ======================"
    sh "$home_dir"/lib/sync_check.sh "$home_dir"/result/mysql/sync_check 2>>error.log
    res=$?
    if [ "$res" != 0 ]; then
        echo "====================== sync check failed ======================"
        exit "$res"
    fi
    echo "====================== sync check finished ======================"
fi
# drop doris external table
if [[ "ODBC" == "$TYPE" && "$DROP_EXTERNAL_TABLE" -eq 1 ]]; then
    echo "====================== start drop doris external table =========================="
    for table in $(cat $home_dir/conf/doris_external_tables | grep -v '#' | awk -F '\n' '{print $1}' | sed 's/\./`.`/g'); do
        echo "DROP TABLE IF EXISTS \`$table\`;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
        res=$?
        if [ "$res" != 0 ]; then
            echo "====================== drop doris external table failed ======================"
            exit "$res"
        fi
    done
    echo "====================== create drop external table finished ======================"
fi
# create doris external table and auto check mysql schema change
if [[ "ODBC" == "$TYPE" && "$AUTO_EXTERNAL_TABLE" -eq 1 ]]; then
    echo "====================== start auto doris external table ======================"
    nohup sh $home_dir/lib/e_auto.sh &
    echo $! >e_auto.pid
    echo "====================== create doris external table started ======================"
fi
2 env.conf in the conf directory
# doris env
# required
fe_master_host=
# required: sync_check fails if this is left empty!
fe_password=
fe_master_port=9030
doris_username=root
# required
doris_password=
doris_odbc_name='MySQL ODBC 5.3 Unicode Driver'
doris_jdbc_catalog='jdbc_catalog'
doris_jdbc_default_db='information_schema'
##========== changed ========== keep the driver jar on the local disk, otherwise the connection times out!!
#doris_jdcb_driver_url='https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar'
doris_jdcb_driver_url='file:///opt/selectdb/mysql-connector-java-8.0.18.jar'
doris_jdbc_driver_class='com.mysql.jdbc.Driver'
# mysql env
mysql_host=
mysql_port=3306
mysql_username=
mysql_password=
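Since an empty value in env.conf only surfaces later as a failed step, a quick pre-flight check can save a run. The sketch below parses simple KEY=value lines and reports empty required keys. The list of required keys is my own reading of the file above, and the function names are hypothetical.

```python
# Assumed set of keys that must not be empty; adjust to your setup.
REQUIRED_KEYS = ("fe_master_host", "fe_password", "doris_password",
                 "mysql_host", "mysql_username", "mysql_password")


def parse_env_conf(text: str) -> dict:
    """Parse simple KEY=value lines, skipping comments and blank lines."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop surrounding single or double quotes from the value.
        values[key.strip()] = value.strip().strip("'\"")
    return values


def missing_keys(values: dict, required=REQUIRED_KEYS) -> list:
    """Return the required keys that are absent or empty."""
    return [key for key in required if not values.get(key)]


sample = "fe_master_host=fe104\nfe_password=\n# mysql env\nmysql_port=3306\n"
print(missing_keys(parse_env_conf(sample), ("fe_master_host", "fe_password")))
```

To check a real file, pass the result of open(path).read() to parse_env_conf.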
3 mysql_type_convert.sh under lib (type mapping, keyword conflicts and so on; note in particular that varchar lengths differ between MySQL and Doris!!)
#!/bin/bash
path=$1
sed -i 's/AUTO_INCREMENT//g' $path
sed -i 's/CHARACTER SET utf8 COLLATE utf8_bin//g' $path
sed -i 's/CHARACTER SET utf8mb3 COLLATE utf8mb3_bin//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_bin//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8_general_ci//g' $path
sed -i 's/CHARACTER SET utf8 COLLATE utf8_general_ci//g' $path
sed -i 's/DEFAULT CURRENT_TIMESTAMP\(()\)\? ON UPDATE CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_bin//g' $path
sed -i "s/DEFAULT '0000-00-00 00:00:00'/DEFAULT '2000-01-01 00:00:00'/g" $path
sed -i 's/DEFAULT CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/DEFAULT b/DEFAULT/g' $path
sed -i "s/DEFAULT \(\(\-\)\?[0-9]\+\(\.[0-9]\+\)\?\)/DEFAULT '\1'/g" $path
sed -i 's/CHARACTER SET utf8mb4//g' $path
sed -i 's/CHARACTER SET utf8//g' $path
sed -i 's/COLLATE utf8mb4_general_ci//g' $path
sed -i 's/COLLATE utf8_general_ci//g' $path
sed -i 's/COLLATE utf8mb4_unicode_ci//g' $path
sed -i 's/COLLATE utf8_unicode_ci//g' $path
sed -i 's/COLLATE utf8_bin//g' $path
sed -i 's/\<tinytext\>/varchar(65533)/g' $path
sed -i 's/text([^)]*)/varchar(65533)/g' $path
sed -i 's/\<text\>/varchar(65533)/g' $path
sed -i 's/\<mediumtext\>/varchar(65533)/g' $path
sed -i 's/\<longtext\>/varchar(65533)/g' $path
sed -i 's/\<tinyblob\>/varchar(65533)/g' $path
sed -i 's/blob([^)]*)/varchar(65533)/g' $path
sed -i 's/\<blob\>/varchar(65533)/g' $path
sed -i 's/\<mediumblob\>/varchar(65533)/g' $path
sed -i 's/\<longblob\>/varchar(65533)/g' $path
sed -i 's/\<tinystring\>/varchar(65533)/g' $path
sed -i 's/\<mediumstring\>/varchar(65533)/g' $path
sed -i 's/\<longstring\>/varchar(65533)/g' $path
sed -i 's/\<timestamp\>/datetime/g' $path
sed -i 's/\<unsigned\>//g' $path
sed -i 's/\<zerofill\>//g' $path
sed -i 's/\<json\>/varchar(65533)/g' $path
sed -i 's/enum([^)]*)/varchar(65533)/g' $path
sed -i 's/set([^)]*)/varchar(65533)/g' $path
sed -i 's/\<set\>/varchar(65533)/g' $path
sed -i 's/bit([^)]*)/varchar(65533)/g' $path
sed -i 's/bit([^)]*)/varchar(65533)/g' $path
sed -i 's/\<bit\>/varchar(65533)/g' $path
sed -i 's/varbinary([^)]*)/varchar(65533)/g' $path
sed -i 's/binary([^)]*)/varchar(65533)/g' $path
sed -i 's/string([^)]*)/varchar(65533)/g' $path
sed -i 's/\<string\>/varchar(65533)/g' $path
sed -i 's/\<binary\>/varchar(65533)/g' $path
sed -i 's/\<varbinary\>/varchar(65533)/g' $path
sed -i 's/\<mediumint/int/g' $path
sed -i 's/float([^)]*)/float/g' $path
sed -i 's/double([^)]*)/double/g' $path
sed -i 's/time([^)]*)/varchar(64)/g' $path
sed -i 's/\<time\>/varchar(64)/g' $path
#sed -i 's/year([^)]*)/varchar(64)/g' $path
#sed -i 's/\<year\>/varchar(64)/g' $path
sed -i 's/varchar([0-9]*)/varchar(65533)/g' $path
#sed -i 's/varchar([1-9][0-9][0-9])/string/g' $path
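To see what these substitutions do to a concrete column list, here is a small Python sketch that mirrors a handful of the sed rules with re.sub. It is not a replacement for the script: only six rules are reproduced, Python's \b stands in for sed's \< and \> word boundaries, and the sample DDL fragment is invented for the example.

```python
import re

# (pattern, replacement) pairs mirroring a few of the sed rules above, in the same order.
RULES = [
    (r"AUTO_INCREMENT", ""),
    (r"CHARACTER SET utf8mb4 COLLATE utf8mb4_bin", ""),
    (r"\blongtext\b", "varchar(65533)"),
    (r"\btimestamp\b", "datetime"),
    (r"\bunsigned\b", ""),
    (r"varchar\([0-9]*\)", "varchar(65533)"),
]


def convert_ddl(ddl: str) -> str:
    """Apply each rule in turn to the whole DDL text."""
    for pattern, replacement in RULES:
        ddl = re.sub(pattern, replacement, ddl)
    return ddl


# Hypothetical MySQL column definitions.
sample = ("`id` int unsigned AUTO_INCREMENT, "
          "`name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin, "
          "`body` longtext, `created` timestamp")
print(convert_ddl(sample))
```

The last rule is the one the note about varchar lengths refers to: every varchar is widened to 65533 regardless of its MySQL length.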
4 create_jdbc_catalog.sh under lib/jdbc
#!/bin/bash
cur_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
home_dir=$(cd "$cur_dir"/../.. && pwd)
source $home_dir/conf/env.conf
# mkdir files to store tables and tables.sql
mkdir -p $home_dir/result/mysql
path=${1:-$home_dir/result/mysql/jdbc_catalog.sql}
rm -f $path
##======== changed ======== add a DROP first, otherwise stale cached state keeps causing errors! Add zeroDateTimeBehavior=round to the URL to handle insert failures caused by empty datetime values!
echo 'DROP CATALOG IF EXISTS '$doris_jdbc_catalog';
CREATE CATALOG IF NOT EXISTS '$doris_jdbc_catalog'
PROPERTIES (
"type"="jdbc",
"jdbc.user"="'$mysql_username'",
"jdbc.password"="'$mysql_password'",
"jdbc.jdbc_url"="jdbc:mysql://'$mysql_host:$mysql_port/$doris_jdbc_default_db'?useSSL=false&characterEncoding=utf8&zeroDateTimeBehavior=round",
"jdbc.driver_url"="'$doris_jdcb_driver_url'",
"jdbc.driver_class"="'$doris_jdbc_driver_class'"
); ' >> $path
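For readers who find the quoting in the echo statement hard to follow, the same statement can be assembled from a template. This is an illustrative sketch only: it reproduces the SQL text the shell script writes, with keyword names (catalog, default_db, driver_url and so on) chosen for this example rather than taken from the scripts.

```python
# Template of the statement written to jdbc_catalog.sql by the script above.
CATALOG_TEMPLATE = """DROP CATALOG IF EXISTS {catalog};
CREATE CATALOG IF NOT EXISTS {catalog}
PROPERTIES (
"type"="jdbc",
"jdbc.user"="{mysql_username}",
"jdbc.password"="{mysql_password}",
"jdbc.jdbc_url"="jdbc:mysql://{mysql_host}:{mysql_port}/{default_db}?useSSL=false&characterEncoding=utf8&zeroDateTimeBehavior=round",
"jdbc.driver_url"="{driver_url}",
"jdbc.driver_class"="{driver_class}"
);
"""


def build_jdbc_catalog_sql(**cfg) -> str:
    """Fill the template; raises KeyError if a field is missing."""
    return CATALOG_TEMPLATE.format(**cfg)


# Placeholder credentials and host; the other values come from env.conf above.
sql = build_jdbc_catalog_sql(
    catalog="jdbc_catalog",
    mysql_username="user",
    mysql_password="secret",
    mysql_host="mysqlhost",
    mysql_port=3306,
    default_db="information_schema",
    driver_url="file:///opt/selectdb/mysql-connector-java-8.0.18.jar",
    driver_class="com.mysql.jdbc.Driver",
)
print(sql)
```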
5 mysql_to_doris.sh
6 sync_to_doris.sh
7 Finally, run the batch synchronization:
vi /root/mysql_to_doris/conf/doris_tables
vi /root/mysql_to_doris/conf/mysql_tables
vi /root/mysql_to_doris/conf/env.conf
# sh bin/run.sh -o
# create the tables and synchronize the data
sh mysql_to_doris/bin/run.sh -o -t JDBC -e -i
# on failure, check the log
/root/mysql_to_doris/conf/error.log
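If the synchronization is driven from a scheduler, a thin wrapper can run the command and print the end of the log on failure. The sketch below is one possible shape, not part of the official scripts: the step names, the function names and the default paths are assumptions based on the commands shown above.

```python
import subprocess
from pathlib import Path

# Short flags accepted by run.sh, keyed by a readable step name (names are mine).
FLAGS = {"external": "-e", "olap": "-o", "insert": "-i", "drop": "-d"}


def build_run_command(steps, table_type="JDBC", script="mysql_to_doris/bin/run.sh"):
    """Build the argument list for run.sh without executing it."""
    return ["sh", script] + [FLAGS[step] for step in steps] + ["-t", table_type]


def tail(path, lines=20):
    """Return the last lines of a text file, or an empty string if it is absent."""
    log = Path(path)
    if not log.exists():
        return ""
    return "\n".join(log.read_text(errors="replace").splitlines()[-lines:])


def run_sync(steps, log_path="/root/mysql_to_doris/conf/error.log"):
    """Run the sync and show the log tail when run.sh exits non-zero."""
    result = subprocess.run(build_run_command(steps))
    if result.returncode != 0:
        print(tail(log_path))
    return result.returncode


print(build_run_command(["olap", "external", "insert"]))
```

Note that run.sh appends to error.log relative to the working directory, so adjust log_path to wherever you launch it from.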
FlinkSQL auto-generation script
This script is very practical. Without it, matching all the types across MySQL, Flink SQL and Doris SQL will wear you out!
import pymysql.cursors
import pandas as pd
import os
import shutil


def get_mysql_colsInfo(ip="ip", port=9030, user="root", password="password", db='jxpt_ods', table='ods_wkans_answer_result'):
    # print('connect information:', ip, port, user, password, db, table)
    conn = pymysql.connect(host=ip, port=port, user=user, password=password, database=db, charset='utf8')
    mycursor = conn.cursor()
    # Parameterized query: the driver escapes the table and schema names.
    sql = "SELECT COLUMN_NAME,DATA_TYPE FROM information_schema.COLUMNS WHERE table_name = %s AND table_schema = %s"
    # print(sql)
    mycursor.execute(sql, (table, db))
    mysql_cols = mycursor.fetchall()
    mysql_cols = [list(i) for i in mysql_cols]
    print(mysql_cols)
    return mysql_cols


def mysql2source_t_createtable(old_cols, table, ip, db):
    # MySQL column type -> Flink SQL column type
    mysql2source_t = {
        'tinyint unsigned': 'smallint',
        'mediumint': 'int',
        'smallint unsigned': 'int',
        'int unsigned': 'bigint',
        'bigint unsigned': 'decimal(20,0)',
        'double precision': 'double',
        'numeric': 'decimal',
        'tinyint(1)': 'boolean',
        'datetime': 'timestamp',
        'char': 'string',
        'varchar': 'string',
        'text': 'string',
        'longtext': 'string',
        'bit': 'string',
    }
    cols = []
    cols_list = []
    for col in old_cols:
        # Normalize case once so the lookup and the membership test agree.
        key = col[1].lower()
        if key in mysql2source_t:
            col[1] = mysql2source_t[key]
        # The listing in the source is cut off here, in the middle of a "cols.append" call.
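Because the listing breaks off, the following is only a guess at the general idea, not the author's code: map the MySQL column types to Flink SQL types and emit a mysql-cdc source table definition. The function names, the reduced type map and the optional primary_key argument are all hypothetical; the connector options used are the standard ones of the Flink CDC MySQL connector.

```python
# Reduced MySQL -> Flink SQL type map; unmapped types pass through unchanged.
MYSQL_TO_FLINK = {
    "mediumint": "int",
    "datetime": "timestamp",
    "char": "string",
    "varchar": "string",
    "text": "string",
    "longtext": "string",
    "bit": "string",
}


def flink_columns(mysql_cols):
    """Map (name, mysql_type) pairs to Flink SQL column definitions."""
    cols = []
    for name, mysql_type in mysql_cols:
        key = mysql_type.lower()
        cols.append(f"`{name}` {MYSQL_TO_FLINK.get(key, key)}")
    return cols


def mysql_cdc_source_ddl(table, mysql_cols, host, db, user, password,
                         port=3306, primary_key=None):
    """Build a CREATE TABLE statement for a mysql-cdc source table."""
    lines = flink_columns(mysql_cols)
    if primary_key:
        lines.append(f"PRIMARY KEY (`{primary_key}`) NOT ENFORCED")
    body = ",\n  ".join(lines)
    return (
        f"CREATE TABLE `{table}` (\n  {body}\n) WITH (\n"
        "  'connector' = 'mysql-cdc',\n"
        f"  'hostname' = '{host}',\n"
        f"  'port' = '{port}',\n"
        f"  'username' = '{user}',\n"
        f"  'password' = '{password}',\n"
        f"  'database-name' = '{db}',\n"
        f"  'table-name' = '{table}'\n"
        ");"
    )


# Hypothetical columns for the table name used in the script above.
columns = [("id", "bigint"), ("name", "VARCHAR"), ("created", "datetime")]
print(mysql_cdc_source_ddl("ods_wkans_answer_result", columns,
                           host="mysqlhost", db="jxpt_ods",
                           user="user", password="secret", primary_key="id"))
```

A matching Doris sink table and an INSERT INTO statement would complete the generated job file.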