Extracting entries from multiple primary keys in one query

Posted: 2017-08-10 16:48:49

Problem description:

I am pulling data from Cassandra (3.10) into Spark 2 with the spark-cassandra-connector (latest version, 2.0.3 I believe). I would like to know whether there is a way to fetch the rows for several primary keys in a single query. The PK is atomic (a single field defines the key), but the values are hashes, so I cannot run a range query. Given

val set = Set(hash1,hash2,hash3)

is there a single call like sc.cassandraTable("keyspace","table").where("id = ?", set), or something along those lines?

I am using Scala with Spark.

Edit:

One suggestion was to use the following approach:

sc.cassandraTable("keyspace","table").where("id in ?",set)

The problem with this approach is that when I try to use the data fetched this way, I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8463.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8463.0 (TID 25187, worker2, executor 28): java.io.IOException: Exception during preparation of SELECT "sha256", "id", "label", "label_version", "data" FROM "keyspace"."data" WHERE sha256 in ?   ALLOW FILTERING: Cannot convert object [Ljava.lang.String;@2e5a7dfc of type class [Ljava.lang.String; to java.util.ArrayList[AnyRef].
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [Ljava.lang.String;@2e5a7dfc of type class [Ljava.lang.String; to java.util.ArrayList[AnyRef].
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$34.applyOrElse(TypeConverter.scala:671)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:659)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$35.applyOrElse(TypeConverter.scala:875)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:858)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:54)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:858)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:316)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:315)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:315)
    ... 29 more
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 60 elided
Caused by: java.io.IOException: Exception during preparation of SELECT "sha256", "id", "label", "label_version", "data" FROM "keyspace"."data" WHERE sha256 in ?   ALLOW FILTERING: Cannot convert object [Ljava.lang.String;@2e5a7dfc of type class [Ljava.lang.String; to java.util.ArrayList[AnyRef].
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
  at scala.collection.AbstractIterator.to(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
  at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  ... 3 more
Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [Ljava.lang.String;@2e5a7dfc of type class [Ljava.lang.String; to java.util.ArrayList[AnyRef].
  at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:43)
  at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$34.applyOrElse(TypeConverter.scala:671)
  at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
  at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:659)
  at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$35.applyOrElse(TypeConverter.scala:875)
  at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
  at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:858)
  at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:54)
  at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:858)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:316)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:315)
  at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:315)
  ... 29 more

So is this command problematic for partition/primary keys because it requires ALLOW FILTERING, a clause the connector does not normally append to its CQL statements? Even so, I have seen examples of this kind of command used on partition keys (in plain CQL, not through the connector). Is this something the connector does not support (yet), or is it by design? Any ideas how to fix this, or how to run the query I need in a different way?
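For reference, the same multi-key lookup can also be expressed without an IN clause through the connector's joinWithCassandraTable, which fetches each partition key directly. A minimal sketch, assuming a single partition-key column named id as in the snippets above (the trace suggests the real column may be sha256):

import com.datastax.spark.connector._

// Placeholder key values; in the real table these are hash strings.
val keys = sc.parallelize(Seq("hash1", "hash2", "hash3"))

// Each element must be a tuple matching the partition-key columns;
// here the key has a single column, assumed to be named "id".
val joined = keys.map(Tuple1(_))
  .joinWithCassandraTable("keyspace", "table")

// joined is an RDD of (key-tuple, CassandraRow) pairs, one per match.
joined.values.collect().foreach(println)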

Comments:

Maybe in will work - where("id in ?", set)

Thanks for the suggestion. I tested it, but it throws the filtering error; see the edit in the main post.

Answer 1:

sc.cassandraTable("keyspace","table").where("id in ?",set)

is the correct approach, but the collection needs to be an iterable such as a List or Seq, not an Array (which is what I had, and what triggered the error).
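A minimal sketch of the fix under that assumption (the column name id and the hash values are placeholders taken from the question):

import com.datastax.spark.connector._

// Placeholder hashes standing in for the real key values.
val hashes = Array("hash1", "hash2", "hash3")

// Binding the Array itself reproduces the TypeConversionException above;
// converting it to an immutable Seq (or List) first lets the IN bind work.
val rdd = sc.cassandraTable("keyspace", "table")
  .where("id in ?", hashes.toSeq)

rdd.collect().foreach(println)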

