如何使用 spark 从 hbase 读取
Posted
技术标签:
【中文标题】如何使用 spark 从 hbase 读取【英文标题】:How to read from hbase using spark 【发布时间】:2014-07-30 15:22:27 【问题描述】:下面的代码将从hbase读取,然后将其转换为json结构并转换为schemaRDD,但问题是我是using List
来存储json字符串然后传递给javaRDD,大约100 GB的数据主机将在内存中加载数据。从 hbase 加载数据然后执行操作,然后转换为 JavaRDD 的正确方法是什么。
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader
public static void main(String[] args) throws IOException, ParseException
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try
ResultScanner scanner = table.getScanner(new Scan());
List<String> jsonList = new ArrayList<String>();
String json = null;
for(Result rowResult:scanner)
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet())
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet())
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet())
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += "\""+s2_str+"\":"+s3_str+",";
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += "\""+s1_str+"\""+":"+jsonSame+""+",";
json = json.substring(0,json.length()-1);
json = "\"RowKey\":\""+rowKey+"\","+json+"";
jsonList.add(json);
JavaRDD<String> jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
finally
table.close();
【问题讨论】:
【参考方案1】:使用 Spark (Scala) 读取 HBase 数据的基本示例,您也可以使用 Java 编写:
import org.apache.hadoop.hbase.client.HBaseAdmin, Result
import org.apache.hadoop.hbase. HBaseConfiguration, HTableDescriptor
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
object HBaseRead
def main(args: Array[String])
val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
val tableName = "table1"
System.setProperty("user.name", "hdfs")
System.setProperty("HADOOP_USER_NAME", "hdfs")
conf.set("hbase.master", "localhost:60000")
conf.setInt("timeout", 120000)
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName))
val tableDesc = new HTableDescriptor(tableName)
admin.createTable(tableDesc)
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("Number of Records found : " + hBaseRDD.count())
sc.stop()
更新 -2016
从 Spark 1.0.x+ 开始,现在您也可以使用 Spark-HBase 连接器了:
要包含的 Maven 依赖项:
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>
并在下面找到相同的示例代码:
import org.apache.spark._
import it.nerdammer.spark.hbase._
object HBaseRead extends App
val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
val sc = new SparkContext(sparkConf)
// For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:
val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
.select("DocID", "Title").inColumnFamily("SMPL")
println("Number of Records found : " + docRdd .count())
更新 - 2017 年
从 Spark 1.6.x+ 开始,现在您也可以使用 SHC 连接器(Hortonworks 或 HDP 用户):
要包含的 Maven 依赖项:
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc</artifactId>
<version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
</dependency>
使用此连接器的主要优点是它在 Schema 定义中具有灵活性,并且不需要像 nerdammer/spark-hbase-connector 中那样的硬编码参数。另请记住,它支持 Spark 2.x,因此此连接器非常灵活,并在问题和 PR 中提供端到端支持。
找到以下存储库路径以获取最新的自述文件和示例:
Hortonworks Spark HBase Connector
您还可以将此 RDD 转换为 DataFrames 并在其上运行 SQL,或者您可以将这些 Dataset 或 DataFrames 映射到用户定义的 Java Pojo 或 Case 类。效果很好。
如果您还有其他需要,请在下方评论。
【讨论】:
这是一个很好的信息,但是如何访问 spark-shell REPL 中的库......对于 Spark 1.5.2 和 HBase 1.1.2.2.3.4.0-3485,提供了哪些包--packages 标志?例如:我使用spark-shell --packages $SPARK_PKGS with export SPARK_PKGS=$(cat << END | xargs echo | sed 's/ /,/g' org.apache.hadoop:hadoop-aws:2.7.1 com.amazonaws:aws-java-sdk-s3:1.10.30 com.databricks:spark-csv_2.10:1.3.0 com.databricks:spark-avro_2.10:2.0.1 ??? END )
我认为使用 spark-shell 您还可以提及 --jars 必须包含您的 Hbase 库。
我们如何在 Python 中使用 PySpark 做到这一点?
我尝试使用连接器,但由于需要过滤器而卡住了。现在考虑我是否应该扩展连接器并构建它们,或者使用其他东西。建议?
@Raghav 我也建议查看以下内容:github.com/hortonworks-spark/shc【参考方案2】:
我更喜欢从 hbase 读取数据并在 spark 中进行 json 操作。 Spark 提供JavaSparkContext.newAPIHadoopRDD 函数从hadoop 存储读取数据,包括HBase。您必须提供 HBase 配置、表名和扫描配置参数和表输入格式以及它的 key-value
您可以使用table input format 类及其作业参数来提供表名和扫描配置
示例:
conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data =
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
然后你可以在 spark 中进行 json 操作。由于spark可以在内存满的时候重新计算,所以它只会加载重新计算部分(cmiiw)需要的数据,所以你不必担心数据大小
【讨论】:
您能否提供一个完整但简单的示例?使用 Java 从 Spark 中的 hbase 获取数据的方法并不多 一个正确添加扫描的例子会很好! @markgiaconia This 涵盖了完整的代码。【参考方案3】:由于这个问题并不新鲜,所以目前还有其他一些选择:
hbase-spark,一个直接在 HBase 仓库中可用的模块 Spark-on-HBase 来自 Hortonworks我对第一个项目了解不多,但看起来它不支持 Spark 2.x。但是,它在 RDD 级别对 Spark 1.6.x 有丰富的支持。
另一方面,Spark-on-HBase 拥有 Spark 2.0 和即将推出的 Spark 2.1 的分支。这个项目非常有前途,因为它专注于 Dataset/DataFrame API。在底层,它实现了标准的 Spark 数据源 API,并利用 Spark Catalyst 引擎进行查询优化。开发人员声称here 能够进行分区修剪、列修剪、谓词下推和实现数据局部性。
下面是一个简单的例子,它使用来自这个repo 和Spark 2.0.2 的com.hortonworks:shc:1.0.0-2.0-s_2.11
工件:
case class Record(col0: Int, col1: Int, col2: Boolean)
val spark = SparkSession
.builder()
.appName("Spark HBase Example")
.master("local[4]")
.getOrCreate()
def catalog =
s"""
|"table":"namespace":"default", "name":"table1",
|"rowkey":"key",
|"columns":
|"col0":"cf":"rowkey", "col":"key", "type":"int",
|"col1":"cf":"cf1", "col":"col1", "type":"int",
|"col2":"cf":"cf2", "col":"col2", "type":"boolean"
|
|""".stripMargin
val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))
// write
spark
.createDataFrame(artificialData)
.write
.option(HBaseTableCatalog.tableCatalog, catalog)
.option(HBaseTableCatalog.newTable, "5")
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
// read
val df = spark
.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
df.count()
【讨论】:
我在 Spark-on-HBase 方面关注了小白兔,看来最初的开发人员早就离开了 Hortonworks,并且显然扼杀了将这些功能集成到 HBase-trunk 中。 特别是:issues.apache.org/jira/browse/HBASE-14789 有一个开放的子任务,issues.apache.org/jira/browse/HBASE-15335,基本上已经完成,但是从 2016 年年中开始就没有更新过。目前,提供的补丁不再适用,所以即使尽管剩余的工作量可能不大,但仍有一些维护工作要做。另见reviews.apache.org/r/47547【参考方案4】:只是添加关于如何添加扫描的评论:
TableInputFormat 具有以下属性:
SCAN_ROW_START SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
【讨论】:
其他扫描参数可以在hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/…的Fields
部分找到。以上是关于如何使用 spark 从 hbase 读取的主要内容,如果未能解决你的问题,请参考以下文章
如何使用Spark Streaming读取HBase的数据并写入到HDFS
spark 从 hbase 读取数据,worker 是不是需要从远程驱动程序中获取分区数据?