rdd.saveAsCassandraTable() creates the table but does not write the RDD items to Cassandra


【Title】: rdd.saveAsCassandraTable() creates the table but does not write the RDD items to Cassandra 【Posted】: 2016-05-19 15:26:22 【Question】:

I am trying to build a log-processing application with Scala (2.11), Spark Streaming (1.5.0) and Cassandra (3.5). Currently, when the first batch of RDD items is received and the foreachRDD(...) shown below runs:

- the first element of the collection is printed without any problem;

- the saveAsCassandraTable() method correctly creates the required table schema in Cassandra, but does not insert any of the RDD entries into the table.

    logitems.foreachRDD(items => {
      if (items.count() == 0) {
        println("No log item received")
      } else {
        val first = items.first()
        println(first.timestamp)  // WORKS: shows the timestamp of the first RDD element

        items.saveAsCassandraTable("analytics", "test_logs",
          SomeColumns("timestamp", "c_ip", "c_referrer", "c_user_agent"))
        // the table schema is created but the RDD items are not written
      }
    })
    
    
    
    16/05/19 16:15:06 INFO Cluster: New Cassandra host /192.168.1.95:9042 added
    16/05/19 16:15:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
    16/05/19 16:15:07 INFO SparkContext: Starting job: foreachRDD at StreamingApp.scala:27
    16/05/19 16:15:07 INFO DAGScheduler: Got job 8 (foreachRDD at StreamingApp.scala:27) with 8 output partitions
    16/05/19 16:15:07 INFO DAGScheduler: Final stage: ResultStage 6(foreachRDD at StreamingApp.scala:27)
    16/05/19 16:15:07 INFO DAGScheduler: Parents of final stage: List()
    16/05/19 16:15:07 INFO DAGScheduler: Missing parents: List()
    16/05/19 16:15:07 INFO DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[9] at map at StreamingApp.scala:24), which has no missing parents
    16/05/19 16:15:07 INFO MemoryStore: ensureFreeSpace(13272) called with curMem=122031, maxMem=1538166620
    16/05/19 16:15:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 13.0 KB, free 1466.8 MB)
    16/05/19 16:15:07 INFO MemoryStore: ensureFreeSpace(5909) called with curMem=135303, maxMem=1538166620
    16/05/19 16:15:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 5.8 KB, free 1466.8 MB)
    16/05/19 16:15:07 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:63323 (size: 5.8 KB, free: 1466.8 MB)
    16/05/19 16:15:07 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:861
    16/05/19 16:15:07 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 6 (MapPartitionsRDD[9] at map at StreamingApp.scala:24)
    16/05/19 16:15:07 INFO TaskSchedulerImpl: Adding task set 6.0 with 8 tasks
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 34, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 35, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 36, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 3.0 in stage 6.0 (TID 37, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 4.0 in stage 6.0 (TID 38, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 5.0 in stage 6.0 (TID 39, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 6.0 in stage 6.0 (TID 40, localhost, PROCESS_LOCAL, 1943 bytes)
    16/05/19 16:15:07 INFO Executor: Running task 0.0 in stage 6.0 (TID 34)
    16/05/19 16:15:07 INFO Executor: Running task 1.0 in stage 6.0 (TID 35)
    16/05/19 16:15:07 INFO Executor: Running task 4.0 in stage 6.0 (TID 38)
    16/05/19 16:15:07 INFO Executor: Running task 5.0 in stage 6.0 (TID 39)
    16/05/19 16:15:07 INFO Executor: Running task 3.0 in stage 6.0 (TID 37)
    16/05/19 16:15:07 INFO Executor: Running task 2.0 in stage 6.0 (TID 36)
    16/05/19 16:15:07 INFO Executor: Running task 6.0 in stage 6.0 (TID 40)
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_6 locally
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_3 locally
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_4 locally
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_2 locally
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_1 locally
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_5 locally
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_0 locally
    16/05/19 16:15:10 INFO JobScheduler: Added jobs for time 1463667310000 ms
    16/05/19 16:15:15 INFO JobScheduler: Added jobs for time 1463667315000 ms
    16/05/19 16:15:20 INFO JobScheduler: Added jobs for time 1463667320000 ms
    16/05/19 16:15:25 INFO JobScheduler: Added jobs for time 1463667325000 ms
    16/05/19 16:15:30 INFO JobScheduler: Added jobs for time 1463667330000 ms
    16/05/19 16:15:35 INFO JobScheduler: Added jobs for time 1463667335000 ms
    16/05/19 16:15:40 INFO JobScheduler: Added jobs for time 1463667340000 ms
    16/05/19 16:15:45 INFO JobScheduler: Added jobs for time 1463667345000 ms
    
    .... continues until program is manually terminated
    

I would appreciate any suggestions on where to look to resolve this.

I have attached a screenshot of the Spark UI.

【Comments】:

- Did you notice any other errors?
- None... only the repeated messages saying (16/05/19 16:15:45 INFO JobScheduler: Added jobs for time 1463667345000 ms) until I had to kill the job.
- After waiting about 20 minutes for the tasks to finish, I received the following error at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128): 16/05/19 23:36:04 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@2c166223 com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)

【Answer 1】:

I suspect the subsequent calls to saveAsCassandraTable are failing because the table already exists. You should probably create the table outside of the streaming loop.

I would check whether switching to saveToCassandra fixes the problem. If it does not, the executor logs or a screenshot of the Streaming UI would be helpful.
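
For what it's worth, a minimal sketch of that pattern might look like the code below. It assumes ssc is the StreamingContext and guesses the column types (timestamp as the key, text for the rest), since the original LogItem class is not shown; adjust both to the real schema.

    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector

    // Create the table once, before the streaming job starts
    // (the column types here are assumptions -- adapt them to the real LogItem fields).
    CassandraConnector(ssc.sparkContext.getConf).withSessionDo { session =>
      session.execute(
        """CREATE TABLE IF NOT EXISTS analytics.test_logs (
          |  timestamp timestamp PRIMARY KEY,
          |  c_ip text,
          |  c_referrer text,
          |  c_user_agent text
          |)""".stripMargin)
    }

    // Inside the stream, only write to the already existing table.
    logitems.foreachRDD { items =>
      if (items.count() == 0)
        println("No log item received")
      else
        items.saveToCassandra("analytics", "test_logs",
          SomeColumns("timestamp", "c_ip", "c_referrer", "c_user_agent"))
    }

Note that using timestamp alone as the primary key would overwrite rows that share the same timestamp, so a composite key is probably a better choice in practice.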

【Discussion】:

- Hi RussS, I actually started with saveToCassandra() but got the same behavior. I then switched to saveAsCassandraTable() just to make sure Spark was actually reaching Cassandra. But you are absolutely right about saveAsCassandraTable being called inside the foreach loop; I will revert to the former.
- Could you paste what you see in the Streaming UI?
- I added a screenshot of the Spark UI. Processing my small dataset normally takes only a few milliseconds without the saveToCassandra() step. With that step, however, it keeps running for several minutes until I kill the task.
- I run it as a local job with "local[*]", so I assume it uses all available cores.
- The error you added points to an availability problem. Are you sure all the required nodes are up?
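
Side note on the error in the last comments: UnavailableException at LOCAL_QUORUM (2 required but only 1 alive) means the keyspace asks for more live replicas than the cluster currently has, and the Spark Cassandra Connector writes at LOCAL_QUORUM by default. The sketch below shows two possible workarounds for a single-node test setup; it is an assumption that the keyspace was created with a replication factor of 2, and the host and app name are simply taken from the logs above.

    import org.apache.spark.SparkConf

    // Option 1 (CQL, run in cqlsh): make the keyspace replication match a single-node test cluster.
    //   ALTER KEYSPACE analytics WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    // Option 2: relax the connector's write consistency so that LOCAL_QUORUM is no longer required.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamingApp")
      .set("spark.cassandra.connection.host", "192.168.1.95")
      .set("spark.cassandra.output.consistency.level", "LOCAL_ONE")

Either way, it is worth running nodetool status first to confirm how many nodes are actually up, as the last comment suggests.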
