Does saveToCassandra work with the Cassandra Lucene plugin?

Posted: 2016-03-28 17:32:22

I am implementing the example from the Cassandra Lucene index page (https://github.com/Stratio/cassandra-lucene-index), and when I try to save data with saveToCassandra I get a NoSuchElementException. If I use CassandraConnector.withSessionDo instead, I can insert rows into Cassandra and no exception is raised.

Table:

CREATE KEYSPACE demo
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1};
USE demo;
CREATE TABLE tweets (
    id INT PRIMARY KEY,
    user TEXT,
    body TEXT,
    time TIMESTAMP,
    latitude FLOAT,
    longitude FLOAT
);

CREATE CUSTOM INDEX tweets_index ON tweets ()
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
    'refresh_seconds' : '1',
    'schema' : '{
        fields : {
            id    : {type : "integer"},
            user  : {type : "string"},
            body  : {type : "text", analyzer : "english"},
            time  : {type : "date", pattern : "yyyy/MM/dd", sorted : true},
            place : {type : "geo_point", latitude : "latitude", longitude : "longitude"}
        }
    }'
};

Code:

import org.apache.spark.{SparkConf, SparkContext, Logging}
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._

object App extends Logging {
    def main(args: Array[String]) {

        // Get the Cassandra IP and create the Spark context
        val cassandraIP = System.getenv("CASSANDRA_IP")
        val sparkConf = new SparkConf(true)
                        .set("spark.cassandra.connection.host", cassandraIP)
                        .set("spark.cleaner.ttl", "3600")
                        .setAppName("Simple Spark Cassandra Example")

        val sc = new SparkContext(sparkConf)

        // Works
        CassandraConnector(sparkConf).withSessionDo { session =>
            session.execute("INSERT INTO demo.tweets(id, user, body, time, latitude, longitude) VALUES (19, 'Name', 'Body', '2016-03-19 09:00:00-0300', 39, 39)")
        }

        // Does not work
        val demo = sc.parallelize(Seq((9, "Name", "Body", "2016-03-29 19:00:00-0300", 29, 29)))
        // Raises the exception
        demo.saveToCassandra("demo", "tweets", SomeColumns("id", "user", "body", "time", "latitude", "longitude"))
    }
}

Exception:

16/03/28 14:15:41 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.util.NoSuchElementException: Column  not found in demo.tweets
    at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60)
    at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60)
    at scala.collection.Map$WithDefault.default(Map.scala:52)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:153)
    at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:152)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.TableDef.<init>(Schema.scala:152)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:283)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:271)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.Set$Set4.foreach(Set.scala:137)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:271)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:295)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:294)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
    at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:294)
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:307)
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:304)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:304)
    at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
    at com.webradar.spci.spark.cassandra.App$.main(App.scala:27)
    at com.webradar.spci.spark.cassandra.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Edit: versions

Spark 1.6.0
Cassandra 3.0.3
Lucene plugin 3.0.3.1

To build the JAR I used maven-assembly-plugin to produce a fat JAR.

If I remove the custom index, saveToCassandra works.
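
For reference, removing the index for that test is just the standard drop statement (index name taken from the DDL above):

DROP INDEX demo.tweets_index;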

Comments:

A few questions... Which versions of Spark and of the Cassandra connector are you using? How do you submit your jar? Is it a plain jar or a fat jar?

I added the versions.

Answer 1:

The problem seems to be caused by an issue in the Cassandra Spark driver, not in the plugin.

Since CASSANDRA-10217, per-row indexes in Cassandra 3.x no longer need to be created on a fake column. As a result, the column-based syntax "CREATE CUSTOM INDEX %s ON %s(%s)" was replaced in Cassandra 3.x by the new row-based syntax "CREATE CUSTOM INDEX %s ON %s()". However, the DataStax Spark driver does not seem to support this new feature yet.
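
To make the difference concrete, the two syntaxes look like this (a schematic contrast with placeholder keyspace/table/index names, not taken from the original post):

-- Cassandra 3.x row-based syntax: no target column
CREATE CUSTOM INDEX my_index ON my_keyspace.my_table ()
USING 'com.stratio.cassandra.lucene.Index';

-- Pre-3.x column-based syntax: a fake column as the index target
CREATE CUSTOM INDEX my_index ON my_keyspace.my_table (lucene)
USING 'com.stratio.cassandra.lucene.Index';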

When com.datastax.spark.connector.RDDFunctions.saveToCassandra is called, it tries to load the table schema together with the index schema related to the table's columns. Because the new index syntax no longer involves a fake column, the index's target column name is empty, which produces the NoSuchElementException (note the double space in "Column  not found" in the trace above, where the empty column name would appear).
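
You can see the empty target for yourself in the Cassandra 3.x system tables; this diagnostic query is not part of the original post, but the options map of system_schema.indexes records the index's target column:

SELECT index_name, kind, options
FROM system_schema.indexes
WHERE keyspace_name = 'demo' AND table_name = 'tweets';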

However, if you run the same example with the previous fake-column syntax, saveToCassandra works fine:

CREATE KEYSPACE demo
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1};
USE demo;
CREATE TABLE tweets (
    id INT PRIMARY KEY,
    user TEXT,
    body TEXT,
    time TIMESTAMP,
    latitude FLOAT,
    longitude FLOAT,
    lucene TEXT
);

CREATE CUSTOM INDEX tweets_index ON tweets (lucene)
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
    'refresh_seconds' : '1',
    'schema' : '{
        fields : {
            id    : {type : "integer"},
            user  : {type : "string"},
            body  : {type : "text", analyzer : "english"},
            time  : {type : "date", pattern : "yyyy/MM/dd", sorted : true},
            place : {type : "geo_point", latitude : "latitude", longitude : "longitude"}
        }
    }'
};
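
With the fake-column syntax, Lucene searches are issued through that column as well. A hypothetical search against the schema above might look like this (the JSON filter follows the plugin's fake-column-era query style; this query is not in the original answer):

SELECT * FROM tweets
WHERE lucene = '{filter : {type : "match", field : "user", value : "Name"}}'
LIMIT 10;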

