使用foreachPartition将结果写入外部存储

Posted ZL小屁孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用foreachPartition将结果写入外部存储相关的知识,希望对你有一定的参考价值。

好久没有写了!!!记录一下 :

最近有个小伙伴问我,使用spark处理的数据存入mysql中老是导致mysql链接超时或中断,我看了一下他的代码,想揍人,

其代码如下:

dstream.foreachRDD rdd =>
  rdd.foreachPartition partitionRecords =>
  val connection = createNewConnection
  //将结果存入外部存储系统中
    partitionRecords.foreach(record => connection.send(record))
  

这种方式不可行!!!

最不济可以这样写:

dstream.foreachRDD rdd =>
  rdd.foreachPartition partitionRecords =>
  val connection = createNewConnection
  //将结果存入外部存储系统中
    partitionRecords.foreach(record => connection.send(record))
	connection.close()  //记得要关闭
  

这样可以保证每个partition只需要链接一次外部存储系统,最起码不会造成锁死等问题,提高了性能,但是并不能使不同的partition直接可以重复利用链接,为了重复利用此链接可以使用连接池来解决,使其不同partition之间可以共享链接:

dstream.foreachRDD rdd =>
  rdd.foreachPartition partitionRecords =>
  val connection = ConnectionPool.getConnection  //使用连接池使不同partition之间共享链接
  //将结果存入外部存储系统中
    partitionRecords.foreach(record => connection.send(record))
	//重用连接池
	ConnectionPool.returnConnection(connection)
  

提高了效率,又不会阻塞链接

另有连接池:

public class ConnectionPool 
    private static LinkedList<Connection> connectionQueue;

    static 
        try 
            Class.forName("com.mysql.jdbc.Driver");
         catch (ClassNotFoundException e) 
            e.printStackTrace();
        
    

    public synchronized static Connection getConnection() 
        try 
            if (connectionQueue == null) 
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) 
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://ip地址:3306/所处路径",
                            "username",
                            "password");
                    connectionQueue.push(conn);
                
            
         catch (Exception e) 
            e.printStackTrace();
        
        return connectionQueue.poll();

    
	
    public  static void returnConnection(Connection conn)connectionQueue.push(conn);

 

以上是关于使用foreachPartition将结果写入外部存储的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming之妙用foreachRDD和foreachPartition

Spark Streaming之妙用foreachRDD和foreachPartition

如何将火花流 DF 写入 Kafka 主题

Spark - 使用 foreachpartition 收集分区

如何将 Spark SQL 批处理作业结果写入 Apache Druid?

通过管道(C)写入外部程序