大数据Spark Streaming实时处理Canal同步binlog数据
Posted 赵广陆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据Spark Streaming实时处理Canal同步binlog数据相关的知识,希望对你有一定的参考价值。
1. Canal 环境搭建
环境参考:
Spark中的Spark Streaming可以用于实时流项目的开发
,实时流项目的数据源除了可以来源于日志、文件、网络端口等,常常也有这种需求,那就是实时分析处理mysql中的增量数据
。面对这种需求当然我们可以通过JDBC的方式定时查询Mysql,然后再对查询到的数据进行处理也能得到预期的结果,但是Mysql往往还有其他业务也在使用,这些业务往往比较重要,通过JDBC方式频繁查询会对Mysql造成大量无形的压力,甚至可能会影响正常业务的使用,在基本不影响其他Mysql正常使用的情况下完成对增量数据的处理,那就需要 Canal 了。
2 配置Canal
2.1 下载Canal
访问Canal的Release页 canal v1.1.2
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
2.2 解压
注意 这里一定要先创建出一个目录,直接解压会覆盖文件
mkdir -p /usr/local/canal
mv canal.deployer-1.1.2.tar.gz /usr/local/canal/
tar -zxvf canal.deployer-1.1.2.tar.gz
2.3 修改instance 配置文件
vim $CANAL_HOME/conf/example/instance.properties,修改如下项,其他默认即可
## mysql serverId , v1.0.26+ will autoGen , 不要和server_id重复
canal.instance.mysql.slaveId=3
# position info。Mysql的url
canal.instance.master.address=node1:3306
# table meta tsdb info
canal.instance.tsdb.enable=false
# 这里配置前面在Mysql分配的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# 配置需要检测的库名,可以不配置,这里只检测canal_test库
canal.instance.defaultDatabaseName=canal_test
# enable druid Decrypt database password
canal.instance.enableDruid=false
# 配置过滤的正则表达式,监测canal_test库下的所有表
canal.instance.filter.regex=canal_test\\\\..*
# 配置MQ
## 配置上在Kafka创建的那个Topic名字
canal.mq.topic=example
## 配置分区编号为1
canal.mq.partition=1
2.4 修改canal.properties配置文件
配置推送至kafka
vim $CANAL_HOME/conf/canal.properties,修改如下项,其他默认即可
# 这个是如果开启的是tcp模式,会占用这个11111端口,canal客户端通过这个端口获取数据
canal.port = 11111
# 可以配置为:tcp, kafka, RocketMQ,这里配置为kafka
canal.serverMode = kafka
# 这里将这个注释掉,否则启动会有一个警告
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
##################################################
######### MQ #############
##################################################
canal.mq.servers = node1:9092,node2:9092,node3:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
#canal.mq.transaction = false
2.5 启动Canal
$CANAL_HOME/bin/startup.sh
2.6. 验证
查看日志
启动后会在logs下生成两个日志文件:logs/canal/canal.log
、logs/example/example.log
,查看这两个日志,保证没有报错日志。
如果是在虚拟机安装,最好给2个核数以上。
确保登陆的系统的hostname可以ping通。
在Mysql数据库中进行增删改查的操作,然后查看Kafka的topic为 example 的数据
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic example
2.7. 关闭Canal
不用的时候一定要通过这个命令关闭,如果是用kill或者关机,当再次启动依然会提示要先执行stop.sh脚本后才能再启动。
$CANAL_HOME/bin/stop.sh
3 Spark实现实时数据分析
通过上一步我们已经能够获取到 canal_test 库的变化数据
,并且已经可将将变化的数据实时推送到Kafka中
,Kafka中接收到的数据是一条Json格式的数据,我们需要对 INSERT 和 UPDATE 类型的数据处理,并且只处理状态为1的数据,然后需要计算 mor_rate 的变化,并判断 mor_rate 的风险等级,0-75%为G1等级,75%-80%为R1等级,80%-100%为R2等级。最后将处理的结果保存到DB,可以保存到Redis、Mysql、MongoDB,或者推送到Kafka都可以。这里是将结果数据保存到了Mysql。
3.1 在Mysql中创建如下两张表
-- 在canal_test库下创建表
CREATE TABLE `policy_cred` (
p_num varchar(22) NOT NULL,
policy_status varchar(2) DEFAULT NULL COMMENT '状态:0、1',
mor_rate decimal(20,4) DEFAULT NULL,
load_time datetime DEFAULT NULL,
PRIMARY KEY (`p_num`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 在real_result库下创建表
CREATE TABLE `real_risk` (
p_num varchar(22) NOT NULL,
risk_rank varchar(8) DEFAULT NULL COMMENT '等级:G1、R1、R2',
mor_rate decimal(20,4) ,
ch_mor_rate decimal(20,4),
load_time datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.2 Spark代码开发
3.2.1 在resources下new一个项目的配置文件my.properties
## spark
# spark://cdh3:7077
spark.master=local[2]
spark.app.name=m_policy_credit_app
spark.streaming.durations.sec=10
spark.checkout.dir=src/main/resources/checkpoint
## Kafka
bootstrap.servers=node1:9092,node2:9092,node3:9092
group.id=m_policy_credit_gid
# latest, earliest, none
auto.offset.reset=latest
enable.auto.commit=false
kafka.topic.name=example
## Mysql
mysql.jdbc.driver=com.mysql.jdbc.Driver
mysql.db.url=jdbc:mysql://node1:3306/real_result
mysql.user=root
mysql.password=123456
mysql.connection.pool.size=10
3.2.2 在pom.xml文件中引入如下依
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.0</spark.version>
<canal.client.version>1.1.2</canal.client.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.client.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark -->
<!-- spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming-kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
3.2.3 在scala源码目录下的包下编写配置文件的工具类
package oldlu.spark
import java.util.Properties
/**
* Properties的工具类
* <p>
* Created by oldlu on 2021-06-29 14:05
*/
object PropertiesUtil{
private val properties:Properties=new Properties
/**
*
* 获取配置文件Properties对象
*
* @author oldlu
* @return java.util.Properties
* date 2021/6/29 14:24
*/
def getProperties():Properties={
if(properties.isEmpty){
//读取源码中resource文件夹下的my.properties配置文件
val reader=getClass.getResourceAsStream("/my.properties")
properties.load(reader)
}
properties
}
/**
*
* 获取配置文件中key对应的字符串值
*
* @author oldlu
* @return java.util.Properties
* @date 2021/6/29 14:24
*/
def getPropString(key:String):String={
getProperties().getProperty(key)
}
/**
*
* 获取配置文件中key对应的整数值
*
* @author oldlu
* @return java.util.Properties
* @date 2021/6/29 14:24
*/
def getPropInt(key:String):Int={
getProperties().getProperty(key).toInt
}
/**
*
* 获取配置文件中key对应的布尔值
*
* @author oldlu
* @return java.util.Properties
* @date 2021/6/29 14:24
*/
def getPropBoolean(key:String):Boolean={
getProperties().getProperty(key).toBoolean
}
}
3.2.4 在scala源码目录下的包下编写数据库操作的工具类
package oldlu.spark
import java.sql.{Connection,DriverManager,PreparedStatement,ResultSet,SQLException}
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable.ListBuffer
/**
* Created by oldlu on 2021/11/14 20:34
*/
object JDBCWrapper{
private var jdbcInstance:JDBCWrapper=_
def getInstance():JDBCWrapper={
synchronized{
if(jdbcInstance==null){
jdbcInstance=new JDBCWrapper()
}
}
jdbcInstance
}
}
class JDBCWrapper {
// 连接池的大小
val POOL_SIZE :Int =PropertiesUtil.getPropInt("mysql.connection.pool.size")
val dbConnectionPool = new LinkedBlockingDeque[Connection](POOL_SIZE)
try
Class.forName(PropertiesUtil.getPropString("mysql.jdbc.driver"))
catch
{
case e:
ClassNotFoundException =>e.printStackTrace()
}
for(i<-0
until POOL_SIZE)
{
try {
val conn = DriverManager.getConnection(
PropertiesUtil.getPropString("mysql.db.url"),
PropertiesUtil.getPropString("mysql.user"),
PropertiesUtil.getPropString("mysql.password"));
dbConnectionPool.put(conn)
} catch {
case e:
Exception =>e.printStackTrace()
}
}
def getConnection():Connection =
synchronized {
while (0 == dbConnectionPool.size()) {
try {
Thread.sleep(20)
} catch {
case e:
InterruptedException =>e.printStackTrace()
}
}
dbConnectionPool.poll()
}
/**
* 批量插入
*
* @param sqlText sql语句字符
* @param paramsList 参数列表
* @return Array[Int]
*/
def doBatch(sqlText:String, paramsList:ListBuffer[ParamsList]):Array[Int]=
{
val conn:Connection = getConnection()
var ps:PreparedStatement = null
var result:Array[Int] = null
try {
conn.setAutoCommit(false)
ps = conn.prepareStatement(sqlText)
for (paramters< -paramsList) {
paramters.params_Type match {
case "real_risk" =>{
println("$$$\\treal_risk\\t" + paramsList)
// // p_num, risk_rank, mor_rate, ch_mor_rate, load_time
ps.setObject(1, paramters.p_num)
ps.setObject(2, paramters.risk_rank)
ps.setObject(3, paramters.mor_rate)
ps.setObject(4, paramters.ch_mor_rate)
ps.setObject(5, paramters.load_time)
}
}
ps.addBatch()
}
result = ps.executeBatch
conn.commit()
} catch {
case e:
Exception =>e.printStackTrace()
} finally{
if (ps != null) try {
ps.close()
} catch {
case e:
SQLException =>e.printStackTrace()
}
if (conn != null) try {
dbConnectionPool.put(conn)
} catch {
case e:
InterruptedException =>e.printStackTrace()
}
}
result
}
}
3.2.5 在scala源码目录下的包下编写Spark程序代码
package oldlu.spark
import com.alibaba.fastjson.{JSON,JSONArray,JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level,Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds,StreamingContext}
import scala.collection.mutable.ListBuffer
/**
* Created by oldlu on 2019/3/16 15:11
*/
object M_PolicyCreditApp{
def main(args:Array[String]):Unit={
// 设置日志的输出级别
Logger.getLogger("org").setLevel(Level.ERROR)
val conf=new SparkConf()
.setMaster(PropertiesUtil.getPropString("spark.master"))
.setAppName(PropertiesUtil.getPropString("spark.app.name"))
// !!必须设置,否则Kafka数据会报无法序列化的错误
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//如果环境中已经配置HADOOP_HOME则可以不用设置hadoop.home.dir
System.setProperty("hadoop.home.dir","/Users/oldluyuan/soft/hadoop-2.9.2")
val ssc=new StreamingContext(conf,Seconds(PropertiesUtil.getPropInt("spark.streaming.durations.sec").toLong))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint(PropertiesUtil.getPropString("spark.checkout.dir"))
val kafkaParams=Map[String,Object](
大数据分析处理框架——离线分析(hive,pig,spark)近似实时分析(Impala)和实时分析(stormspark streaming)
大数据Spark Structured Streaming集成 Kafka