java Spark 读取hbase数据

Posted 曾经是最好

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java Spark 读取hbase数据相关的知识,希望对你有一定的参考价值。

引用的jar包(maven)

     
    <properties>
        <java.version>1.8</java.version>
        <ch.qos.logback.version>1.2.3</ch.qos.logback.version>
        <sharding-sphere.version>4.1.1</sharding-sphere.version>
        <spark.version>2.4.6</spark.version>
        <hbase.version>2.3.0</hbase.version>
        <hadoop.version>2.7.4</hadoop.version>
    </properties>

   <!-- Spring HBase 依赖 -->
        <!--==================hadoop ===================-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--==================HBase ===================-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-annotations -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-annotations</artifactId>
            <version>${hbase.version}</version>
        </dependency>


        <!--spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-to-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

 

 

方式一: 注意类中不能有其他方法自动注解方式不然报  

org.apache.spark.SparkException: Task not serializable
/**
 * 必须序化使用
 * */
@Component
public class SparkOnHbaseTest implements Serializable {

    public void getHbase() {


        SparkSession spark = SparkSession.builder().master("local[*]").appName("HBASEDATA")
                .getOrCreate();

        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        String tableName = "users";
        String FAMILY = "personal";
        String COLUM_ID = "id";
        String COLUM_NAME = "name";
        String COLUM_PHONE = "phone";

        // Hbase配置
        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "192.168.0.124");
        hconf.set("hbase.zookeeper.property.clientPort", "9095");
        hconf.set(TableInputFormat.INPUT_TABLE, tableName);
//
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(FAMILY));
        scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_ID));
        scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_NAME));
        scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_PHONE));
        try {

            //添加scan
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            String ScanToString = Base64.encodeBytes(proto.toByteArray());
            hconf.set(TableInputFormat.SCAN, ScanToString);

            //读HBase数据转化成RDD
            JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = sc.newAPIHadoopRDD(hconf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
            hbaseRDD.cache();// 对myRDD进行缓存
            long count = hbaseRDD.count();
            System.out.println("数据总条数:" + count);

            JavaRDD<Row> rrd = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {

                @Override
                public Row call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
                    Result result = tuple2._2();
                    String rowKey = Bytes.toString(result.getRow());
                    String id = Bytes.toString(result.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_ID)));
                    String name = Bytes.toString(result.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_NAME)));
                    return RowFactory.create(rowKey, id, name);
                }

            });
            String ds = ":";
////写入数据到hdfs系统
////            rrd.saveAsTextFile("hdfs://********:8020/tmp/test");
////
////            hbaseRDD.unpersist();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
}

 

 

方式二:

/**
* 可以不序化使用
* */
@Component
public class HbaseGetData {

@Resource
private HbaseTemplate hbaseTemplate;

@Resource
private JavaSparkContext sc;

/**
* 可以成功获取数据
**/
public void getData_1() {

String tableName = "users";
String FAMILY = "personal";
String COLUM_ID = "id";
String COLUM_NAME = "name";
String COLUM_PHONE = "phone";

// Hbase配置
Configuration hconf = HBaseConfiguration.create(hbaseTemplate.getConfiguration());
hconf.set(TableInputFormat.INPUT_TABLE, tableName);

hconf.set(TableInputFormat.SCAN_COLUMNS, "personal:name personal:phone personal:id");
try {

//读HBase数据转化成RDD
JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = sc.newAPIHadoopRDD(hconf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
hbaseRDD.cache();// 对myRDD进行缓存
long count = hbaseRDD.count();
System.out.println("数据总条数:" + count);

List<Result> list=hbaseRDD.map(t->t._2()).collect();
System.out.println("list size---"+list.size());
for(Result result:list){
List<Cell> cells=result.listCells();
System.out.println(Bytes.toString(CellUtil.cloneRow(cells.get(0))));
for(Cell cell:cells){
System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}


/**
* 对hbase 的 DDL、DML操作或者使用HBaseUtils
*
* @Author wulincheng
* @Date 2020-7-13 15:14:08
* @Version 1.0
*/
public class HbaseTemplate{

private Logger log = LoggerFactory.getLogger(this.getClass());
/**
* hbase连接对象
*/
private Connection connection;

private org.apache.hadoop.conf.Configuration configuration;

public HbaseTemplate() {
}

// public HbaseTemplate(Connection connection) {
// setConnection(connection);
// }

@PostConstruct
private void init() throws IOException {
setConnection(ConnectionFactory.createConnection(configuration));
}

public HbaseTemplate(org.apache.hadoop.conf.Configuration configuration) throws IOException {
setConfiguration(configuration);
}

public Connection getConnection() {
return connection;
}

private Admin getAdmin() throws IOException {
return connection.getAdmin();
}

public void setConnection(Connection connection) {
this.connection = connection;
}

public Configuration getConfiguration() {
return configuration;
}

public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}

}


/**
* 读取hbase配置文件
* @author wulincheng
* @date 2020-7-14 12:11:18
* */
@ConfigurationProperties(prefix = "hbase")
public class HBaseProperties {

private Map<String, String> config;

public Map<String, String> getConfig() {
return config;
}

public void setConfig(Map<String, String> config) {
this.config = config;
}


}

/**
* HBase配置类
* @author wulincheng
* @date 2020-7-14 12:11:18
* https://hbase.apache.org/book.html#faq 官网的
* http://c.biancheng.net/view/6523.html hbase用法参考
* */
@Configuration
@EnableConfigurationProperties(HBaseProperties.class)
public class HBaseConfig {

private final HBaseProperties properties;

public HBaseConfig(HBaseProperties properties) {
this.properties = properties;
}

@Bean
public HbaseTemplate hbaseTemplate() {
// Connection connection = null;
// try {
// connection = ConnectionFactory.createConnection(configuration());
// } catch (IOException e) {
// e.printStackTrace();
// }
// return new HbaseTemplate(configuration());
// new HbaseTemplate(connection);

HbaseTemplate hbaseTemplate = new HbaseTemplate();
hbaseTemplate.setConfiguration(configuration());
// hbaseTemplate.setAutoFlush(true);
return hbaseTemplate;
}

@Bean
public Admin admin() {
Admin admin = null;
try {
Connection connection = ConnectionFactory.createConnection(configuration());
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
return admin;
}


public org.apache.hadoop.conf.Configuration configuration() {

org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();

Map<String, String> config = properties.getConfig();
Set<String> keySet = config.keySet();
for (String key : keySet) {
configuration.set(key, config.get(key));
}

return configuration;
}

}


@Configuration
@ConfigurationProperties(prefix="spark")
public class SparkContextBean implements Serializable {

//spark的安装地址
private String sparkHome = "";
//应用的名称
private String appName = "";
//master的地址
private String master = "";

@Bean
@ConditionalOnMissingBean(SparkConf.class)
public SparkConf sparkConf() throws Exception {
SparkConf conf = new SparkConf()
.setSparkHome(sparkHome)
.setAppName(appName)
.setMaster(master);
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
return conf;
}

@Bean
@ConditionalOnMissingBean(JavaSparkContext.class)
public JavaSparkContext javaSparkContext() throws Exception {
return new JavaSparkContext(sparkConf());
}

public String getSparkHome() {
return sparkHome;
}

public void setSparkHome(String sparkHome) {
this.sparkHome = sparkHome;
}

public String getAppName() {
return appName;
}

public void setAppName(String appName) {
this.appName = appName;
}

public String getMaster() {
return master;
}

public void setMaster(String master) {
this.master = master;
}
}


以上是关于java Spark 读取hbase数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark读取Hbase中的数据

Spark读取Hbase中的数据

如何使用 spark 从 hbase 读取

无法在纱线簇模式下读取带有火花的Hbase数据

spark将数据写入hbase以及从hbase读取数据

Spark 读取 HBase 数据