spark连接关系型数据库的几种方法
Posted lyy-blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark连接关系型数据库的几种方法相关的知识,希望对你有一定的参考价值。
1. 使用 JdbcRDD 的接口:
1 SparkConf conf = new SparkConf(); 2 conf.setAppName("Simple Application").setMaster("local"); 3 JavaSparkContext jsc = new JavaSparkContext(conf); 4 5 6 //1.直接使用jdbcRDD的构造函数 7 class DbConnection extends AbstractFunction0<Connection> implements 8 Serializable { 9 private static final long serialVersionUID = 1L; 10 private String driverClassName; 11 private String connectionUrl; 12 private String userName; 13 private String password; 14 15 public DbConnection(String driverClassName, String connectionUrl, 16 String userName, String password) { 17 this.driverClassName = driverClassName; 18 this.connectionUrl = connectionUrl; 19 this.userName = userName; 20 this.password = password; 21 } 22 23 @Override 24 public Connection apply() { 25 try { 26 Class.forName(driverClassName); 27 } catch (ClassNotFoundException e) { 28 } 29 Properties properties = new Properties(); 30 properties.setProperty("user", userName); 31 properties.setProperty("password", password); 32 Connection connection = null; 33 try { 34 connection = DriverManager.getConnection(connectionUrl, 35 properties); 36 } catch (SQLException e) { 37 } 38 return connection; 39 } 40 } 41 42 class MapResult extends AbstractFunction1<ResultSet, Object[]> 43 implements Serializable { 44 private static final long serialVersionUID = 1L; 45 46 public Object[] apply(ResultSet row) { 47 return JdbcRDD.resultSetToObjectArray(row); 48 } 49 } 50 51 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8"; 52 String Driver="com.mysql.jdbc.Driver"; 53 String UserName = "root"; 54 String password = "pd"; 55 DbConnection dbConnection = new DbConnection(Driver, 56 Connection_url, UserName, password); 57 sql = "select * from (" + sql + ") as tmp where 0=? 
and 0=?"; 58 //lowerBound,upperBound均设置0,where条件就为恒真,这个是个处理技巧 59 JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection, 60 sql, 0, 0, 1, new MapResult(), 61 ClassManifestFactory$.MODULE$.fromClass(Object[].class)); 62 JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, 63 ClassManifestFactory$.MODULE$.fromClass(Object[].class)); 64 65 66 //另外一种实现: 67 class DbConnectionFactory implements JdbcRDD.ConnectionFactory { 68 private static final long serialVersionUID = 1L; 69 private String driverClassName; 70 private String connectionUrl; 71 private String userName; 72 private String password; 73 74 public Connection getConnection() throws Exception { 75 Class.forName(driverClassName); 76 String url = connectionUrl; 77 Properties properties = new Properties(); 78 properties.setProperty("user", userName); 79 properties.setProperty("password", password); 80 return DriverManager.getConnection(url, properties); 81 } 82 83 public DbConnectionFactory(String driverClassName, String connectionUrl, 84 String userName, String password) { 85 this.driverClassName = driverClassName; 86 this.connectionUrl = connectionUrl; 87 this.userName = userName; 88 this.password = password; 89 } 90 91 } 92 93 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8"; 94 sql = "select * from (" + sql + ") as tmp where 0=? 
and 0=?"; 95 DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver, 96 Connection_url, UserName, password) 97 javaRDD = JdbcRDD.create(jsc, new DbConnectionFactory(Driver, 98 Connection_url, UserName, password), sql, 0, 0, 1,new Function<ResultSet,Object[]>() 99 { 100 private static final long serialVersionUID = 1L; 101 public Object[] call(ResultSet resultSet) 102 { 103 return JdbcRDD.resultSetToObjectArray(resultSet); 104 } 105 });//直接返回JavaRDD<Object[]>,这个底层调用的是JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql, long lowerBound, long upperBound, int numPartitions, Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1) 106 //javaRDD =JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);//该方法更加简洁,底层调用上面的create(JavaSparkContext paramJavaSparkContext, ConnectionFactory paramConnectionFactory, String paramString, long paramLong1, long paramLong2, int paramInt, Function<ResultSet, T> paramFunction)
以上是关于spark连接关系型数据库的几种方法的主要内容,如果未能解决你的问题,请参考以下文章