LightDB(pg) 通过DBeaver执行SQL发现SQL不能并行

Posted 紫无之紫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LightDB(pg) 通过DBeaver执行SQL发现SQL不能并行相关的知识,希望对你有一定的参考价值。

LightDB(pg) 通过DBeaver执行SQL发现SQL不能并行

最近遇到一个问题,在dbeaver上执行explain analyze 发现SQL使用到了并行,且执行时间为30多秒;而去掉explain analyze 实际执行时,发现用了50多秒,然后通过观察后端进程,发现没有并行进程在执行此SQL。通过网上查找资料及自己跟踪代码发现是由于如下的原因导致的。简单来说就是如果设置了fetchsize 或者maxrows 会导致不能并行。

官方文档中也有此说明:

The client sends an Execute message with a non-zero fetch count. See the discussion of the extended query protocol. Since libpq currently provides no way to send such a message, this can only occur when using a client that does not rely on libpq. If this is a frequent occurrence, it may be a good idea to set max_parallel_workers_per_gather to zero in sessions where it is likely, so as to avoid generating query plans that may be suboptimal when run serially.

jdbc 中执行逻辑如下

private void sendOneQuery(SimpleQuery query, SimpleParameterList params, int maxRows,
      int fetchSize, int flags) throws IOException 
    boolean asSimple = (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0;
    if (asSimple) 
      assert (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) == 0
          : "Simple mode does not support describe requests. sql = " + query.getNativeSql()
          + ", flags = " + flags;
      sendSimpleQuery(query, params);
      return;
    

    assert !query.getNativeQuery().multiStatement
        : "Queries that might contain ; must be executed with QueryExecutor.QUERY_EXECUTE_AS_SIMPLE mode. "
        + "Given query is " + query.getNativeSql();

    // nb: if we decide to use a portal (usePortal == true) we must also use a named statement
    // (oneShot == false) as otherwise the portal will be closed under us unexpectedly when
    // the unnamed statement is next reused.

    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean noMeta = (flags & QueryExecutor.QUERY_NO_METADATA) != 0;
    boolean describeOnly = (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) != 0;
    boolean usePortal = (flags & QueryExecutor.QUERY_FORWARD_CURSOR) != 0 && !noResults && !noMeta
        && fetchSize > 0 && !describeOnly;
    boolean oneShot = (flags & QueryExecutor.QUERY_ONESHOT) != 0 && !usePortal;
    boolean noBinaryTransfer = (flags & QUERY_NO_BINARY_TRANSFER) != 0;
    boolean forceDescribePortal = (flags & QUERY_FORCE_DESCRIBE_PORTAL) != 0;

    // Work out how many rows to fetch in this pass.

    int rows;
    if (noResults) 
      rows = 1; // We're discarding any results anyway, so limit data transfer to a minimum
     else if (!usePortal) 
      rows = maxRows; // Not using a portal -- fetchSize is irrelevant
     else if (maxRows != 0 && fetchSize > maxRows) 
      // fetchSize > maxRows, use maxRows (nb: fetchSize cannot be 0 if usePortal == true)
      rows = maxRows;
     else 
      rows = fetchSize; // maxRows > fetchSize
    

    sendParse(query, params, oneShot);

    // Must do this after sendParse to pick up any changes to the
    // query's state.
    //
    boolean queryHasUnknown = query.hasUnresolvedTypes();
    boolean paramsHasUnknown = params.hasUnresolvedTypes();

    boolean describeStatement = describeOnly
        || (!oneShot && paramsHasUnknown && queryHasUnknown && !query.isStatementDescribed());

    if (!describeStatement && paramsHasUnknown && !queryHasUnknown) 
      int queryOIDs[] = query.getStatementTypes();
      int paramOIDs[] = params.getTypeOIDs();
      for (int i = 0; i < paramOIDs.length; i++) 
        // Only supply type information when there isn't any
        // already, don't arbitrarily overwrite user supplied
        // type information.
        if (paramOIDs[i] == Oid.UNSPECIFIED) 
          params.setResolvedType(i + 1, queryOIDs[i]);
        
      
    

    if (describeStatement) 
      sendDescribeStatement(query, params, describeOnly);
      if (describeOnly) 
        return;
      
    

    // Construct a new portal if needed.
    Portal portal = null;
    if (usePortal) 
      String portalName = "C_" + (nextUniqueID++);
      portal = new Portal(query, portalName);
    

    sendBind(query, params, portal, noBinaryTransfer);

    // A statement describe will also output a RowDescription,
    // so don't reissue it here if we've already done so.
    //
    if (!noMeta && !describeStatement) 
      /*
       * don't send describe if we already have cached the row description from previous executions
       *
       * XXX Clearing the fields / unpreparing the query (in sendParse) is incorrect, see bug #267.
       * We might clear the cached fields in a later execution of this query if the bind parameter
       * types change, but we're assuming here that they'll still be valid when we come to process
       * the results of this query, so we don't send a new describe here. We re-describe after the
       * fields are cleared, but the result of that gets processed after processing the results from
       * earlier executions that we didn't describe because we didn't think we had to.
       *
       * To work around this, force a Describe at each execution in batches where this can be a
       * problem. It won't cause more round trips so the performance impact is low, and it'll ensure
       * that the field information available when we decoded the results. This is undeniably a
       * hack, but there aren't many good alternatives.
       */
      if (!query.isPortalDescribed() || forceDescribePortal) 
        sendDescribePortal(query, portal);
      
    

    sendExecute(query, portal, rows);
  

对并发有影响的是上面的rows, 也即是下面发送的limit值


sendExecute:
    pgStream.sendChar('E'); // Execute
    pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
    if (encodedPortalName != null) 
      pgStream.send(encodedPortalName); // portal name
    
    pgStream.sendChar(0); // portal name terminator
    pgStream.sendInteger4(limit); // row limit

LightDB 中对上述请求的处理逻辑如下

由于是’E’,然后会执行 exec_execute_message

    portal_name = pq_getmsgstring(&input_message);
    max_rows = pq_getmsgint(&input_message, 4);
    pq_getmsgend(&input_message);

    exec_execute_message(portal_name, max_rows);

exec_execute_message /* execute */

exec_execute_message:
	/*
	 * If we re-issue an Execute protocol request against an existing portal,
	 * then we are only fetching more rows rather than completely re-executing
	 * the query from the start. atStart is never reset for a v3 portal, so we
	 * are safe to use this check.
	 */
	execute_is_fetch = !portal->atStart;
    PortalRun(xxx,!execute_is_fetch && max_rows == FETCH_ALL,xxx) 
    //bool run_once = !execute_is_fetch && max_rows == FETCH_ALL
    

最终在执行器部分,会使用到run_once即如下的execute_once,如果execute_once为false,及会多次执行则会导致不能并行。

ExecutePlan:
	if (!execute_once)
		use_parallel_mode = false;

参考

聊聊pg jdbc statement的maxRows参数
When Can Parallel Query Be Used?
How the SQL client can influence performance

以上是关于LightDB(pg) 通过DBeaver执行SQL发现SQL不能并行的主要内容,如果未能解决你的问题,请参考以下文章

lightdb数据库链接之postgresql_fdw

PG优化篇--执行计划

认识LightDB

postgresql/lightdb中WHERE CURRENT OF的使用

lightdb 创建自定义类型

lightdb22.3 - 集群启停脚本新特性