使用foreachPartition将结果写入外部存储
Posted ZL小屁孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用foreachPartition将结果写入外部存储相关的知识,希望对你有一定的参考价值。
好久没有写了!!!记录一下 :
最近有个小伙伴问我,使用spark处理的数据存入mysql中老是导致mysql链接超时或中断,我看了一下他的代码,想揍人,
其代码如下:
dstream.foreachRDD rdd =>
rdd.foreachPartition partitionRecords =>
val connection = createNewConnection
//将结果存入外部存储系统中
partitionRecords.foreach(record => connection.send(record))
这种方式不可行!!!
最不济可以这样写:
dstream.foreachRDD rdd =>
rdd.foreachPartition partitionRecords =>
val connection = createNewConnection
//将结果存入外部存储系统中
partitionRecords.foreach(record => connection.send(record))
connection.close() //记得要关闭
这样可以保证每个partition只需要链接一次外部存储系统,最起码不会造成锁死等问题,提高了性能,但是并不能使不同的partition直接可以重复利用链接,为了重复利用此链接可以使用连接池来解决,使其不同partition之间可以共享链接:
dstream.foreachRDD rdd =>
rdd.foreachPartition partitionRecords =>
val connection = ConnectionPool.getConnection //使用连接池使不同partition之间共享链接
//将结果存入外部存储系统中
partitionRecords.foreach(record => connection.send(record))
//重用连接池
ConnectionPool.returnConnection(connection)
提高了效率,又不会阻塞链接
另有连接池:
public class ConnectionPool
private static LinkedList<Connection> connectionQueue;
static
try
Class.forName("com.mysql.jdbc.Driver");
catch (ClassNotFoundException e)
e.printStackTrace();
public synchronized static Connection getConnection()
try
if (connectionQueue == null)
connectionQueue = new LinkedList<Connection>();
for (int i = 0; i < 5; i++)
Connection conn = DriverManager.getConnection(
"jdbc:mysql://ip地址:3306/所处路径",
"username",
"password");
connectionQueue.push(conn);
catch (Exception e)
e.printStackTrace();
return connectionQueue.poll();
public static void returnConnection(Connection conn)connectionQueue.push(conn);
以上是关于使用foreachPartition将结果写入外部存储的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming之妙用foreachRDD和foreachPartition
Spark Streaming之妙用foreachRDD和foreachPartition
Spark - 使用 foreachpartition 收集分区