datastax 会话因大量并行查询而挂起

Posted

技术标签:

【中文标题】datastax 会话因大量并行查询而挂起【英文标题】:datastax session hangs with high volume parallel queries 【发布时间】:2019-04-20 01:10:53 【问题描述】:

大图。

并行处理 2000 个查询时,datastax 会话挂起。

并行查询

我正在使用包装 Datastax Cassandra 驱动程序的 Alpakka。我正在使用 Scala Play 框架。

要对大数据进行行计数,必须通过分区来完成。我使用以下代码计算每个分区的行数:

val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
  val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
  targets.isDefined match 
    case true =>
      targets.get.foreach 
        e =>
          val cq: CassandraQueries = new CassandraQueries()
          Logger.info("targets collected so far: "+acc.size)
          Logger.info("Calling count for "+e._1)


          futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
      

      val results = Future.sequence(futureList.toList)

在我的一个键空间中,我有 2000 个分区,因此有 2000 个并行查询。

查询结果

查询由 Alpakka/Datastax 处理并返回Future[Seq[Row]].

Logger.info("furtureQuery: session closed -> "+ session.isClosed)
    val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
    val sb: StringBuilder = new StringBuilder()
    val source = CassandraSource(stmt)
    source.runWith(Sink.seq).onComplete 
      case Success(f) => out(Some(f), None)

      case Failure(e) =>
        Logger.error("simpleQuery failed with " + e.getMessage)
        out(None, Some(e.getMessage))

    

异常和挂起 大约 1000 次查询后,我收到以下错误。在此之后,会话不会返回任何内容。 SuccessFailure 都不会出现。

akka.ConfigurationException: 配置中指定的记录器不能 加载 [akka.event.Logging$DefaultLogger] 由于 [akka.event.Logging$LoggerInitializationException:记录器 log1-Logging$DefaultLogger 没有响应 LoggerInitialized, 改为发送 [TIMEOUT]]

问题

我确信我可以延长日志记录的超时时间。但这是一个症状,而不是真正的问题。

我该怎么做:

配置会话连接以允许 2000 个并行请求?

将 Future.sequence 限制为已知的可能请求数?

还有

如何以编程方式从此类会话挂起中恢复?

【问题讨论】:

【参考方案1】:

您可以通过在创建集群实例时指定池选项来增加每个连接的进行中请求的数量,如下所示:

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, 10240);

Cluster cluster = Cluster.builder()
    .withContactPoints("127.0.0.1")
    .withPoolingOptions(poolingOptions)
    .build();

但是您仍然需要在代码中处理BusyPoolException,因为在使用异步请求时,仍然很容易使某个特定连接过载。

更多信息在driver's documentation。

【讨论】:

【参考方案2】:

触发 2000 个查询会执行范围查询。利用集群对象元数据,获取令牌范围并计算键的令牌。然后,在一个范围查询中批处理属于同一范围的查询。

【讨论】:

以上是关于datastax 会话因大量并行查询而挂起的主要内容,如果未能解决你的问题,请参考以下文章

RestKit 请求因 HTTP 错误而挂起

Oozie-sqoop 工作流在 cloudera 中因心跳问题而挂起

了解 PostgreSQL 并行查询的会话中工作池中剩余多少并行工作人员?

在并行执行时,第二个会话将覆盖Selenium测试

oracle 怎样使用并行查询

Python套接字recv()由于硬件而挂起?