Building a Data Platform with Doris + Flink

Posted 独狐游清湖

Doris Deployment

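Doris is an open-source OLAP database with an MPP architecture that runs on the vast majority of mainstream commercial servers.
Installation: see the official Doris installation guide
Points that need attention:
1 Set the system's maximum number of open file handles
2 Linux operating system version requirements
3 Software requirements (Java, GCC)
4 Machine role allocation (the underlined part of the role-allocation figure in the official guide is the key point: it prevents split-brain!)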
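To check point 1 before installing, a small Python sketch can read the current process's open-file limit with the standard `resource` module. The 65536 threshold is an assumption based on the value commonly recommended in the Doris deployment docs; use whatever your version's guide specifies.

```python
import resource

# Assumed threshold (commonly recommended for Doris); adjust to your guide.
RECOMMENDED_NOFILE = 65536


def nofile_sufficient(soft_limit: int, minimum: int = RECOMMENDED_NOFILE) -> bool:
    """Return True if a soft RLIMIT_NOFILE value meets the minimum."""
    if soft_limit == resource.RLIM_INFINITY:
        return True  # "unlimited" always satisfies the requirement
    return soft_limit >= minimum


def current_nofile() -> tuple:
    """Read the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)
```

Run it as the user that will start the FE/BE processes, e.g. `print(nofile_sufficient(current_nofile()[0]))`; if it prints False, raise the `nofile` limit (typically in /etc/security/limits.conf) and log in again.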

Once the prerequisites are in place, start deploying!

My Doris installation process

My version: selectdb_doris-1.2.1.1-x86_64-avx2.tar.gz
Installation tool: the toolkit described below
Reference: the Doris cluster installation guide
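The package above is the `avx2` build, which only runs on CPUs that support the AVX2 instruction set. A minimal sketch for checking a Linux host, assuming the usual `flags` line format of /proc/cpuinfo (`read_cpuinfo` and `has_avx2` are hypothetical helper names):

```python
from pathlib import Path


def read_cpuinfo(path: str = "/proc/cpuinfo") -> str:
    """Return the raw text of /proc/cpuinfo (Linux only)."""
    return Path(path).read_text()


def has_avx2(cpuinfo_text: str) -> bool:
    """Return True if any 'flags' line in the cpuinfo text lists avx2."""
    for line in cpuinfo_text.splitlines():
        key, _, value = line.partition(":")
        if key.strip() == "flags" and "avx2" in value.split():
            return True
    return False
```

On each FE/BE host, `has_avx2(read_cpuinfo())` returning False means this tarball is the wrong choice for that machine and a build without the avx2 suffix is needed.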
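Key points:
1 First, configure the cluster topology file, which supplies the parameters for deploying a new cluster. Run the following command to generate a simple sample topology file, then adjust it as needed: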

dorisctrl cluster template > doris-topo.yaml

Its contents:

[root@node104 ~]# cat doris-topo.yaml 
global:
    user: root
    ssh_port: 22
    deploy_dir: /opt/selectdb

frontends:
  - host: fe104
  - host: fe174
  - host: fe117

backends:
  - host: be118
  - host: be119
  - host: be173
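The three frontends above are what prevents split-brain: Doris FE Followers elect a Master by majority vote, so an odd number of Followers gives the best fault tolerance per node. A quick arithmetic sketch, assuming all three FE hosts are deployed as Followers:

```python
def fe_quorum(followers: int) -> tuple:
    """Return (majority size, tolerated failures) for n Follower FEs."""
    if followers < 1:
        raise ValueError("need at least one Follower FE")
    quorum = followers // 2 + 1
    return quorum, followers - quorum


for n in (1, 2, 3, 4, 5):
    q, f = fe_quorum(n)
    print(f"{n} followers: majority={q}, survives {f} failure(s)")
```

Three Followers survive one failure, and adding a fourth does not raise that number, which is why odd counts are the usual choice.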

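2 Run the deploy command, where edudoris is the cluster name (spaces and every other detail of this command matter; a mistyped command makes the deployment fail)

dorisctrl cluster deploy edudoris -v 1.2.1.1 -f doris-topo.yaml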
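Since a stray space breaks this command, one way to avoid the problem when scripting the deployment is to pass the arguments as a list instead of a shell string. A sketch in Python that only uses the `dorisctrl cluster deploy <name> -v <version> -f <topology>` form shown above; `deploy_argv` and `run_deploy` are hypothetical helpers:

```python
import shlex
import subprocess


def deploy_argv(cluster: str, version: str, topology: str) -> list:
    """Build the deploy command as an argument list (no shell word-splitting)."""
    for name, value in (("cluster", cluster), ("version", version), ("topology", topology)):
        if not value or any(ch.isspace() for ch in value):
            raise ValueError(f"{name} must be non-empty and contain no whitespace: {value!r}")
    return ["dorisctrl", "cluster", "deploy", cluster, "-v", version, "-f", topology]


def run_deploy(argv: list) -> None:
    """Echo and run the command; check=True raises CalledProcessError on failure."""
    print("running:", shlex.join(argv))
    subprocess.run(argv, check=True)
```

Usage: `run_deploy(deploy_argv("edudoris", "1.2.1.1", "doris-topo.yaml"))`.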


3 Show the details of a specific cluster

dorisctrl cluster display edudoris

4 List the clusters

dorisctrl cluster list

5 Start the cluster

dorisctrl cluster start edudoris

6 Stop the cluster

dorisctrl cluster stop edudoris

7 View the cluster's detailed metadata file

cat edudoris/meta.yaml

8 Log in to the Doris web UI: http://fe104:8030/

9 Log in with a MySQL client: see the official docs
Mainly to change passwords and create user accounts, databases, tables and so on

CREATE USER 'username'@'%' IDENTIFIED BY 'password';
GRANT ALL ON db_name.* TO 'username'@'%';
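When these statements are generated for many users or databases, quoting is the usual source of errors. A minimal sketch that builds them in Python; the helper names are hypothetical, the escaping follows MySQL string-literal rules (assumed to carry over to Doris, which speaks the MySQL protocol), and `'%'` as the host is an assumption meaning "any host":

```python
def sql_string(value: str) -> str:
    """Quote a string literal MySQL-style (assumed to match Doris's rules)."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def sql_ident(name: str) -> str:
    """Quote an identifier with backticks, doubling embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def user_setup_sql(user: str, password: str, db: str, host: str = "%") -> list:
    """Statements to create a user and grant it all privileges on one database."""
    identity = f"{sql_string(user)}@{sql_string(host)}"
    return [
        f"CREATE USER {identity} IDENTIFIED BY {sql_string(password)};",
        f"GRANT ALL ON {sql_ident(db)}.* TO {identity};",
    ]


for stmt in user_setup_sql("etl_user", "s3cret", "jxpt_ods"):
    print(stmt)
# CREATE USER 'etl_user'@'%' IDENTIFIED BY 's3cret';
# GRANT ALL ON `jxpt_ods`.* TO 'etl_user'@'%';
```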

toolkit

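SelectDB Distribution Toolkit is SelectDB's cluster management and data development tool. It makes managing Doris clusters quick and simple, supporting common operations such as deployment, inspection, start/stop and scale-out.
My version: selectdb_distribution_toolkit-1.2.1-x86_64.tar.gz
Installation: see the official toolkit installation guide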

Flink Deployment

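Here I am only testing the Flink CDC functionality, so I deployed Flink in standalone mode; for high availability later, this can be switched to Flink on YARN.
Reference: Flink 1.14.4 installation guide
Notes:
1 Raise the slot count to 100 (taskmanager.numberOfTaskSlots in flink-conf.yaml). Each CDC table occupies one slot, so with many synced tables you can easily run out!
2 Web UI: flink1:8081
3 Submit the jobs (both SQL files are generated by the Python script below!!!)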
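The slot rule of thumb in note 1 turns into simple arithmetic. A sketch assuming, as stated above, one slot per CDC table (actual usage depends on each job's parallelism); `slots_needed` is a hypothetical helper:

```python
import math


def slots_needed(cdc_tables: int, slots_per_tm: int = 100) -> tuple:
    """Return (total slots, TaskManagers needed), assuming one slot per CDC table."""
    if slots_per_tm < 1:
        raise ValueError("slots_per_tm must be positive")
    total = cdc_tables  # one-slot-per-table rule of thumb from the notes above
    return total, math.ceil(total / slots_per_tm)


print(slots_needed(230))  # (230, 3)
```

So 230 synced tables would need three TaskManagers at 100 slots each.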

# Create the Doris tables
ssh "$FEIP" "mysql -h$FEIP -uroot -pYOUR_PASSWORD -P9030 -e 'source /root/sink2doris.sql;'"
# Submit the Flink job to start data synchronization
cd /flink/bin/
./sql-client.sh -f /flinksync/output/mysql2sink.sql

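Offline data sync scripts (provided officially by Doris, BUT they need changes!!!)

mysql to doris is mainly used to automate the creation of Doris ODBC tables and is implemented as shell scripts.
Reference: the official mysql_to_doris guide for batch multi-table operations

Download mysql_to_doris.tar.gz from the official site and extract it to see the full scripts; here are the parts I changed:
1 bin/run.sh: keep it as is, do not touch it!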

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

function usage() {
  echo "Usage: run.sh [option]"
  echo "    -e, --create-external-table: create doris external table"
  echo "    -o, --create-olap-table: create doris olap table"
  echo "    -i, --insert-data: insert data into doris olap table from doris external table"
  echo "    -d, --drop-external-table: drop doris external table"
  echo "    -a, --auto-external-table: create doris external table and auto check mysql schema change"
  echo "    --database: specify the database name to process all tables under the entire database, and separate multiple databases with \",\""
  echo "    -t, --type: specify external table type, valid options: ODBC(default), JDBC"
  echo "    -h, --help: show usage"
  exit 1
}

cur_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)

if [[ $# -eq 0 ]]; then
  usage
fi

opts=$(getopt -o eaoidht: \
  -l create-external-table \
  -l create-olap-table \
  -l insert-data \
  -l drop-external-table \
  -l auto-external-table \
  -l database: \
  -l type: \
  -l help \
  -n "$0" \
  -- "$@")

eval set -- "$opts"

CREATE_EXTERNAL_TABLE=0
CREATE_OLAP_TABLE=0
INSERT_DATA=0
DROP_EXTERNAL_TABLE=0
AUTO_EXTERNAL_TABLE=0
DATABASE=''
TYPE='ODBC'

while true; do
  case "$1" in
  -e | --create-external-table)
    CREATE_EXTERNAL_TABLE=1
    shift
    ;;
  -o | --create-olap-table)
    CREATE_OLAP_TABLE=1
    shift
    ;;
  -i | --insert-data)
    INSERT_DATA=1
    shift
    ;;
  -d | --drop-external-table)
    DROP_EXTERNAL_TABLE=1
    shift
    ;;
  -a | --auto-external-table)
    AUTO_EXTERNAL_TABLE=1
    shift
    ;;
  --database)
    DATABASE="$2"
    shift 2
    ;;
  -t | --type)
    TYPE="$2"
    shift 2
    ;;
  -h | --help)
    usage
    shift
    ;;
  --)
    shift
    break
    ;;
  *)
    echo "Internal error"
    exit 1
    ;;
  esac
done

home_dir=$(cd "$cur_dir"/.. && pwd)

source "$home_dir"/conf/env.conf

# when fe_password is not set or is empty, do not put -p option
use_passwd=$([ -z "$doris_password" ] && echo "" || echo "-p$doris_password")

if [ -n "$DATABASE" ]; then
  sh "$home_dir"/lib/get_tables.sh "$DATABASE"
fi

# create doris jdbc catalog
if [[ "JDBC" == "$TYPE" && "$CREATE_EXTERNAL_TABLE" -eq 1 ]]; then
  echo "====================== start create doris jdbc catalog ======================"
  sh "$home_dir"/lib/jdbc/create_jdbc_catalog.sh "$home_dir"/result/mysql/jdbc_catalog.sql 2>>error.log
  echo "source $home_dir/result/mysql/jdbc_catalog.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
  res=$?
  if [ "$res" != 0 ]; then
      echo "====================== create doris jdbc catalog failed ======================"
      exit "$res"
    fi
  echo "====================== create doris jdbc catalog finished ======================"
fi

# create doris external table
if [[ "ODBC" == "$TYPE" && "$CREATE_EXTERNAL_TABLE" -eq 1 ]]; then
  echo "====================== start create doris external table ======================"
  sh "$home_dir"/lib/e_mysql_to_doris.sh "$home_dir"/result/mysql/e_mysql_to_doris.sql 2>error.log
  echo "source $home_dir/result/mysql/e_mysql_to_doris.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
  res=$?
  if [ "$res" != 0 ]; then
    echo "====================== create doris external table failed ======================"
    exit "$res"
  fi
  echo "====================== create doris external table finished ======================"
fi

# create doris olap table
if [[ "$CREATE_OLAP_TABLE" -eq 1 ]]; then
  echo "====================== start create doris olap table ======================"
  sh "$home_dir"/lib/mysql_to_doris.sh "$home_dir"/result/mysql/mysql_to_doris.sql 2>>error.log
  echo "source $home_dir/result/mysql/mysql_to_doris.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
  res=$?
  if [ "$res" != 0 ]; then
    echo "====================== create doris olap table failed ======================"
    exit "$res"
  fi
  echo "====================== create doris olap table finished ======================"
fi

# insert data into doris olap table
if [[ "$INSERT_DATA" -eq 1 ]]; then
  echo "====================== start insert data ======================"
  if [[ "JDBC" == "$TYPE" ]]; then
    sh "$home_dir"/lib/jdbc/sync_to_doris.sh "$home_dir"/result/mysql/sync_to_doris.sql 2>>error.log
  else
    sh "$home_dir"/lib/sync_to_doris.sh "$home_dir"/result/mysql/sync_to_doris.sql 2>>error.log
  fi
  echo "source $home_dir/result/mysql/sync_to_doris.sql;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
  res=$?
  if [ "$res" != 0 ]; then
    echo "====================== insert data failed ======================"
    exit "$res"
  fi
  echo "====================== insert data finished ======================"
  echo "====================== start sync check ======================"
  sh "$home_dir"/lib/sync_check.sh "$home_dir"/result/mysql/sync_check 2>>error.log
  res=$?
  if [ "$res" != 0 ]; then
    echo "====================== sync check failed ======================"
    exit "$res"
  fi
  echo "====================== sync check finished ======================"
fi

# drop doris external table
if [[ "ODBC" == "$TYPE" && "$DROP_EXTERNAL_TABLE" -eq 1 ]]; then
  echo "====================== start drop doris external table =========================="
  for table in $(cat $home_dir/conf/doris_external_tables | grep -v '#' | awk -F '\n' '{print $1}' | sed 's/\./`.`/g'); do
    echo "DROP TABLE IF EXISTS \`$table\`;" | mysql -h"$fe_master_host" -P"$fe_master_port" -u"$doris_username" "$use_passwd" 2>>error.log
    res=$?
    if [ "$res" != 0 ]; then
      echo "====================== drop doris external table failed ======================"
      exit "$res"
    fi
  done
  echo "====================== create drop external table finished ======================"
fi

# create doris external table and auto check mysql schema change
if [[ "ODBC" == "$TYPE" && "$AUTO_EXTERNAL_TABLE" -eq 1 ]]; then
  echo "====================== start auto doris external table ======================"
  nohup sh $home_dir/lib/e_auto.sh &
  echo $! >e_auto.pid
  echo "====================== create doris external table started ======================"
fi
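The drop step near the end of run.sh packs its logic into a grep/awk/sed pipeline. The same transformation in Python, as an illustrative sketch (not part of the official tool; `external_tables_to_drop` is a hypothetical name), makes it easier to see what is generated from conf/doris_external_tables:

```python
def external_tables_to_drop(conf_text: str) -> list:
    """Mirror of run.sh's drop pipeline: one DROP statement per db.table entry."""
    statements = []
    for line in conf_text.splitlines():
        if "#" in line:
            continue  # grep -v '#' drops any line containing '#'
        for table in line.split():  # $(...) word-splits on whitespace
            quoted = table.replace(".", "`.`")  # sed 's/\./`.`/g'
            statements.append(f"DROP TABLE IF EXISTS `{quoted}`;")
    return statements
```

Note that, like `grep -v '#'`, it skips any line containing `#`, not only lines that start with one.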

2 env.conf in the conf folder

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# doris env
fe_master_host=          # required!
fe_password=             # required! sync_check fails if it is left empty
fe_master_port=9030
doris_username=root
doris_password=          # required!
doris_odbc_name='MySQL ODBC 5.3 Unicode Driver'
doris_jdbc_catalog='jdbc_catalog'
doris_jdbc_default_db='information_schema'
##========== CHANGED ========== use a locally stored driver jar, otherwise the connection times out!!
#doris_jdcb_driver_url='https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar'
doris_jdcb_driver_url='file:///opt/selectdb/mysql-connector-java-8.0.18.jar'
# Connector/J 8.x still accepts this legacy class name; com.mysql.cj.jdbc.Driver is the current one
doris_jdbc_driver_class='com.mysql.jdbc.Driver'

# mysql env
mysql_host=
mysql_port=3306
mysql_username=
mysql_password=
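Because several fields above must be filled in (an empty `fe_password` makes sync_check fail), it can help to validate env.conf before running the tool. A simplified sketch; the list of required keys is an assumption drawn from the comments above plus the MySQL connection fields, and the parser only understands plain `key=value` lines, not full shell syntax:

```python
import shlex

# Assumed set of fields that must not be empty.
REQUIRED = ("fe_master_host", "fe_password", "doris_password",
            "mysql_host", "mysql_username", "mysql_password")


def parse_env_conf(text: str) -> dict:
    """Parse simple key=value lines; comment lines and blank lines are skipped."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, rest = line.partition("=")
        # shlex removes quotes and drops a trailing "# comment"
        tokens = shlex.split(rest, comments=True)
        values[key.strip()] = tokens[0] if tokens else ""
    return values


def missing_required(values: dict, required=REQUIRED) -> list:
    """Return the required keys that are absent or empty."""
    return [key for key in required if not values.get(key)]
```

For example, `missing_required(parse_env_conf(open("conf/env.conf").read()))` lists every field that still needs a value.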

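3 mysql_type_convert.sh under lib (type mapping, keyword conflicts and similar issues; note in particular that MySQL's varchar(n) counts characters while Doris's varchar(n) counts bytes!!)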

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
path=$1
sed -i 's/AUTO_INCREMENT//g' $path
sed -i 's/CHARACTER SET utf8 COLLATE utf8_bin//g' $path
sed -i 's/CHARACTER SET utf8mb3 COLLATE utf8mb3_bin//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_bin//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci//g'  $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8_general_ci//g' $path
sed -i 's/CHARACTER SET utf8 COLLATE utf8_general_ci//g' $path
sed -i 's/DEFAULT CURRENT_TIMESTAMP\(()\)\? ON UPDATE CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i "s/DEFAULT '0000-00-00 00:00:00'/DEFAULT '2000-01-01 00:00:00'/g" $path
sed -i 's/DEFAULT CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/DEFAULT b/DEFAULT/g' $path
sed -i "s/DEFAULT \(\(\-\)\?[0-9]\+\(\.[0-9]\+\)\?\)/DEFAULT '\1'/g" $path
sed -i 's/CHARACTER SET utf8mb4//g' $path
sed -i 's/CHARACTER SET utf8//g' $path
sed -i 's/COLLATE utf8mb4_general_ci//g' $path
sed -i 's/COLLATE utf8_general_ci//g'  $path
sed -i 's/COLLATE utf8mb4_unicode_ci//g' $path
sed -i 's/COLLATE utf8_unicode_ci//g'  $path
sed -i 's/COLLATE utf8_bin//g'  $path
sed -i 's/\<tinytext\>/varchar(65533)/g' $path
sed -i 's/text([^)]*)/varchar(65533)/g' $path
sed -i 's/\<text\>/varchar(65533)/g' $path
sed -i 's/\<mediumtext\>/varchar(65533)/g' $path
sed -i 's/\<longtext\>/varchar(65533)/g' $path
sed -i 's/\<tinyblob\>/varchar(65533)/g' $path
sed -i 's/blob([^)]*)/varchar(65533)/g' $path
sed -i 's/\<blob\>/varchar(65533)/g' $path
sed -i 's/\<mediumblob\>/varchar(65533)/g' $path
sed -i 's/\<longblob\>/varchar(65533)/g' $path
sed -i 's/\<tinystring\>/varchar(65533)/g' $path
sed -i 's/\<mediumstring\>/varchar(65533)/g' $path
sed -i 's/\<longstring\>/varchar(65533)/g' $path
sed -i 's/\<timestamp\>/datetime/g' $path
sed -i 's/\<unsigned\>//g' $path
sed -i 's/\<zerofill\>//g' $path
sed -i 's/\<json\>/varchar(65533)/g' $path
sed -i 's/enum([^)]*)/varchar(65533)/g' $path
sed -i 's/set([^)]*)/varchar(65533)/g' $path
sed -i 's/\<set\>/varchar(65533)/g' $path
sed -i 's/bit([^)]*)/varchar(65533)/g' $path
sed -i 's/\<bit\>/varchar(65533)/g' $path
sed -i 's/varbinary([^)]*)/varchar(65533)/g' $path
sed -i 's/binary([^)]*)/varchar(65533)/g' $path
sed -i 's/string([^)]*)/varchar(65533)/g' $path
sed -i 's/\<string\>/varchar(65533)/g' $path
sed -i 's/\<binary\>/varchar(65533)/g' $path
sed -i 's/\<varbinary\>/varchar(65533)/g' $path
sed -i 's/\<mediumint/int/g' $path
sed -i 's/float([^)]*)/float/g' $path
sed -i 's/double([^)]*)/double/g' $path
# \< anchors "time" at a word start, so "datetime(3)" is not turned into "datevarchar(64)"
sed -i 's/\<time([^)]*)/varchar(64)/g' $path
sed -i 's/\<time\>/varchar(64)/g' $path
#sed -i 's/year([^)]*)/varchar(64)/g' $path
#sed -i 's/\<year\>/varchar(64)/g' $path
sed -i 's/varchar([0-9]*)/varchar(65533)/g' $path
#sed -i 's/varchar([1-9][0-9][0-9])/string/g' $path
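One reason for widening every varchar: MySQL's `varchar(n)` counts characters while Doris's counts bytes, so a utf8mb4 column can need up to 4n bytes in Doris; widening to 65533 sidesteps the calculation at the cost of schema precision. A Python sketch mirroring a handful of the sed rules above (a subset, for illustration only) also shows the word-boundary issue: without `\<` (here `\b`), a `time(...)` pattern would rewrite `datetime(3)` as well.

```python
import re

# A small subset of the sed rules above, applied in the same relative order.
RULES = [
    (r"AUTO_INCREMENT", ""),
    (r"CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci", ""),
    (r"\b(?:tiny|medium|long)?text\b", "varchar(65533)"),
    (r"\btimestamp\b", "datetime"),
    (r"\bunsigned\b", ""),
    # \b plays the role of sed's \< : "datetime(3)" is left alone
    (r"\btime\([^)]*\)", "varchar(64)"),
    # runs last, so every varchar (including the one above) becomes 65533
    (r"\bvarchar\([0-9]*\)", "varchar(65533)"),
]


def convert_ddl(ddl: str) -> str:
    """Apply the rules in order to a MySQL column-definition string."""
    for pattern, replacement in RULES:
        ddl = re.sub(pattern, replacement, ddl)
    return ddl
```

Because the varchar rule runs last, as in the script, `time` columns end up as `varchar(65533)` rather than `varchar(64)`.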

4 create_jdbc_catalog.sh under lib/jdbc

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

cur_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
home_dir=$(cd "$cur_dir"/../.. && pwd)

source $home_dir/conf/env.conf

# mkdir files to store tables and tables.sql
mkdir -p $home_dir/result/mysql

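path=${1:-$home_dir/result/mysql/jdbc_catalog.sql}

rm -f $path
##======== CHANGED ===== added DROP, otherwise stale cached catalog state keeps causing errors! Added zeroDateTimeBehavior=round to the URL so that empty (zero) datetime values no longer make inserts fail!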
echo 'DROP CATALOG IF EXISTS '$doris_jdbc_catalog';
CREATE CATALOG IF NOT EXISTS '$doris_jdbc_catalog'
PROPERTIES (
  "type"="jdbc",
  "jdbc.user"="'$mysql_username'",
  "jdbc.password"="'$mysql_password'",
  "jdbc.jdbc_url"="jdbc:mysql://'$mysql_host:$mysql_port/$doris_jdbc_default_db'?useSSL=false&characterEncoding=utf8&zeroDateTimeBehavior=round",
  "jdbc.driver_url"="'$doris_jdcb_driver_url'",
  "jdbc.driver_class"="'$doris_jdbc_driver_class'"
); ' >> $path
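To preview exactly what this script sends to Doris, the same statements can be built in Python. An illustrative sketch that reads the values from a dict of env.conf fields (keys spelled as in env.conf, including `doris_jdcb_driver_url`); `jdbc_catalog_sql` is a hypothetical helper and, like the shell version, it does not escape quotes inside the values:

```python
def jdbc_catalog_sql(env: dict) -> str:
    """Build the DROP + CREATE CATALOG statements from env.conf values."""
    catalog = env["doris_jdbc_catalog"]
    url = (
        f"jdbc:mysql://{env['mysql_host']}:{env['mysql_port']}/{env['doris_jdbc_default_db']}"
        "?useSSL=false&characterEncoding=utf8&zeroDateTimeBehavior=round"
    )
    props = {
        "type": "jdbc",
        "jdbc.user": env["mysql_username"],
        "jdbc.password": env["mysql_password"],
        "jdbc.jdbc_url": url,
        "jdbc.driver_url": env["doris_jdcb_driver_url"],  # key spelled as in env.conf
        "jdbc.driver_class": env["doris_jdbc_driver_class"],
    }
    body = ",\n".join(f'  "{key}"="{value}"' for key, value in props.items())
    return (
        f"DROP CATALOG IF EXISTS {catalog};\n"
        f"CREATE CATALOG IF NOT EXISTS {catalog}\n"
        f"PROPERTIES (\n{body}\n);\n"
    )
```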

5 mysql_to_doris.sh

6 sync_to_doris.sh

7 Finally, run the batch sync:

vi /root/mysql_to_doris/conf/doris_tables
vi /root/mysql_to_doris/conf/mysql_tables
vi /root/mysql_to_doris/conf/env.conf
# sh bin/run.sh -o 
# Create the tables and sync the data
sh mysql_to_doris/bin/run.sh -o -t JDBC -e -i
# On failure, check the log
cat /root/mysql_to_doris/conf/error.log
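run.sh writes its errors to `error.log` in the directory it is started from, so where that log ends up depends on the working directory. A small sketch that runs the batch sync from the conf directory (so the log lands at conf/error.log, matching the path above) and prints the last lines on failure. It invokes the script with `bash`, an assumption on my part, because run.sh uses bash-only syntax such as `[[ ]]` and `BASH_SOURCE`; the helper names are hypothetical:

```python
import subprocess
from collections import deque
from pathlib import Path


def run_sh_argv(home: str) -> list:
    """Command line for the 'create tables and sync data' run shown above."""
    return ["bash", str(Path(home) / "bin" / "run.sh"), "-o", "-t", "JDBC", "-e", "-i"]


def tail(path, lines: int = 20) -> str:
    """Return the last `lines` lines of a text file."""
    with open(path, encoding="utf-8", errors="replace") as fh:
        return "".join(deque(fh, maxlen=lines))


def run_sync(home: str) -> int:
    """Run run.sh from <home>/conf so its relative error.log lands in conf/."""
    conf_dir = Path(home) / "conf"
    result = subprocess.run(run_sh_argv(home), cwd=conf_dir)
    if result.returncode != 0:
        log = conf_dir / "error.log"
        print(tail(log) if log.exists() else "no error.log found")
    return result.returncode
```

Usage: `run_sync("/root/mysql_to_doris")`.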

FlinkSQL auto-generation script

This script is very practical; without it, matching types across MySQL, Flink SQL and Doris SQL will wear you out!

import pymysql.cursors
import pandas as pd
import os
import shutil

def get_mysql_colsInfo(ip="ip", port=9030, user="root", password="PASSWORD", db='jxpt_ods', table='ods_wkans_answer_result'):
    """Return [[column_name, data_type], ...] for one table, in column order."""
    # print('connect information:', ip, port, user, password, db, table)
    conn = pymysql.connect(host=ip, port=port, user=user, password=password, database=db, charset='utf8')
    try:
        with conn.cursor() as mycursor:
            # Parameterized query instead of string concatenation; ORDER BY keeps the DDL column order
            sql = ("SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS "
                   "WHERE table_name = %s AND table_schema = %s ORDER BY ORDINAL_POSITION")
            mycursor.execute(sql, (table, db))
            mysql_cols = [list(i) for i in mycursor.fetchall()]
    finally:
        conn.close()
    print(mysql_cols)
    return mysql_cols


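def mysql2source_t_createtable(old_cols, table, ip, db):

    # MySQL type -> Flink SQL type for the CDC source table.
    # Note: DATA_TYPE carries no length or "unsigned", so keys such as 'tinyint(1)'
    # and 'int unsigned' only match if COLUMN_TYPE is queried instead.
    mysql2source_t = {'tinyint unsigned': 'smallint',
                      'mediumint': 'int',
                      'smallint unsigned': 'int',
                      'int unsigned': 'bigint',
                      'bigint unsigned': 'decimal(20,0)',
                      'double precision': 'double',
                      'numeric': 'decimal',
                      'tinyint(1)': 'boolean',
                      'datetime': 'timestamp',
                      'char': 'string',
                      'varchar': 'string',
                      'text': 'string',
                      'longtext': 'string',
                      'bit': 'string',
                      }

    cols = []
    cols_list = []
    for col in old_cols:
        # Normalize case before the lookup so 'VARCHAR' and 'varchar' both match
        col_type = col[1].lower()
        if col_type in mysql2source_t:
            col[1] = mysql2source_t[col_type]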
    
        cols.append
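As an illustrative sketch of the kind of output this generator produces (the mysql2sink.sql submitted to sql-client), the following builds a mysql-cdc source table, a Doris sink table and the INSERT that connects them from a list of `[column, flink_type]` pairs such as the mapped `old_cols` above. The connector option names follow the Flink CDC MySQL connector and the Doris Flink connector documentation; check them against the connector versions you deploy. The `src_`/`sink_` prefixes and the label-prefix scheme are hypothetical, and the label prefix should be unique per job.

```python
def with_clause(options: dict) -> str:
    """Render a Flink SQL WITH (...) clause."""
    body = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return f"WITH (\n{body}\n)"


def column_block(cols: list, pk: str) -> str:
    """Render the column list plus a NOT ENFORCED primary key."""
    lines = [f"  `{name}` {ftype}" for name, ftype in cols]
    lines.append(f"  PRIMARY KEY (`{pk}`) NOT ENFORCED")
    return "(\n" + ",\n".join(lines) + "\n)"


def mysql2sink_sql(table: str, cols: list, pk: str, mysql: dict, doris: dict) -> str:
    """Source DDL, sink DDL and INSERT for one table."""
    source, sink = f"src_{table}", f"sink_{table}"
    source_ddl = f"CREATE TABLE {source} {column_block(cols, pk)} " + with_clause({
        "connector": "mysql-cdc",
        "hostname": mysql["host"],
        "port": str(mysql["port"]),
        "username": mysql["user"],
        "password": mysql["password"],
        "database-name": mysql["db"],
        "table-name": table,
    }) + ";"
    sink_ddl = f"CREATE TABLE {sink} {column_block(cols, pk)} " + with_clause({
        "connector": "doris",
        "fenodes": doris["fenodes"],
        "table.identifier": f"{doris['db']}.{table}",
        "username": doris["user"],
        "password": doris["password"],
        "sink.label-prefix": f"{table}_sync",
    }) + ";"
    insert = f"INSERT INTO {sink} SELECT * FROM {source};"
    return "\n\n".join([source_ddl, sink_ddl, insert])
```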
