用于 Cassandra 的 Spark2 会话,sql 查询

Posted

技术标签:

【中文标题】用于 Cassandra 的 Spark2 会话,sql 查询【英文标题】:Spark2 session for Cassandra , sql queries 【发布时间】:2016-12-07 17:40:48 【问题描述】:

在 Spark-2.0 中,创建 Spark 会话的最佳方式是什么。因为在 Spark-2.0 和 Cassandra 中,API 都经过了重新设计,基本上弃用了 SqlContext(以及 CassandraSqlContext)。因此,为了执行 SQL,我创建了一个 Cassandra 会话(com.datastax.driver.core.Session) and use execute( " ")。或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText) 方法。

我不知道两者的 SQL 限制 - 谁能解释一下。

另外,如果我必须创建 SparkSession - 我该怎么做 - 找不到任何合适的示例。随着 API 的重新设计,旧的示例不起作用。 我正在通过这个代码示例-DataFrames-不清楚这里使用的是什么 sql 上下文(这是正确的方法。) (由于某种原因,已弃用的 API 甚至没有编译 - 需要检查我的 Eclipse 设置)

谢谢

【问题讨论】:

【参考方案1】:

您需要 Cassandra Session 来从 Cassandra DB 创建/删除键空间和表。在 Spark 应用程序中,为了创建 Cassandra 会话,您需要将 SparkConf 传递给 CassandraConnector。在 Spark 2.0 中,您可以像下面这样操作。

 SparkSession spark = SparkSession
              .builder()
              .appName("SparkCassandraApp")
              .config("spark.cassandra.connection.host", "localhost")
              .config("spark.cassandra.connection.port", "9042")
              .master("local[2]")
              .getOrCreate();

CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");

如果您有现有的 Dataframe,那么您也可以使用 DataFrameFunctions.createCassandraTable(Df) 在 Cassandra 中创建表。请参阅 api 详细信息here。

您可以使用 spark-cassandra-connector 提供的 api 从 Cassandra DB 读取数据,如下所示。

Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() 
                
                    put("keyspace", "mykeyspace");
                    put("table", "mytable");
                
            ).load();

dataset.show(); 

您可以使用 SparkSession.sql() 方法在由 spark cassandra 连接器返回的 Dataframe 上创建的临时表上运行查询,如下所示。

dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();

【讨论】:

谢谢。这正是我一直在寻找的。使用 Spark 或 Cassandra 运行查询是否有任何限制优点/缺点 - 任何 sql 限制。了解创建键空间/表必须使用 cassandra。 还有 SparkSession.sql() - 我可以只针对临时表而不针对 Cassandra 表运行查询吗? Spark 文档没有说明这一点。 The docs state" 用于 SQL 解析的方言可以配置为 'spark.sql.dialect' 。对于 SQLContext,唯一可用的方言是“sql”,它使用 Spark SQL 提供的简单 SQL 解析器。在 HiveContext ,默认为“hiveql”,尽管“sql”也可用。由于 HiveQL 解析器更加完整,因此建议在大多数用例中使用。” No SparkSession.sql 不限于临时表。您可以使用 Spark JDBC 连接到不同的数据库。使用 SparkCassandraConnector 可以轻松使用 Cassandra DB。 我做了所有这些,创建了 sparks 会话,Cassandra 会话 - 使用 Cassandra 会话创建了键空间和表 - 将数据插入到表中。我可以在 cqlsh shell 中看到所有内容。但是当我运行一个简单的 spark 查询时:Dataset&lt;Row&gt; sqlDF = sparks.sql("SELECT * FROM java_api.products"); errors: Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: java_api.products`` 但如果这样做sparks.read().format *( ).options (HashMap () - 它可以工作。

以上是关于用于 Cassandra 的 Spark2 会话,sql 查询的主要内容,如果未能解决你的问题,请参考以下文章

使用 java 模拟多个用户连接到 Cassandra

如何在 Spark 中从 cassandra datastax 云中读取数据

pyspark读取csv文件multiLine选项不适用于具有换行符spark2.3和spark2.2的记录

使用 c# 将数据记录到 cassandra

如果键空间不存在,Cassandra 连接到集群

用于启用 SSL 的 Spring Boot Cassandra 配置