Datastax Java 驱动程序自定义重试策略

Posted

技术标签:

【中文标题】Datastax Java 驱动程序自定义重试策略【英文标题】:Datastax Java Driver Custom Retry Policy 【发布时间】:2020-12-26 14:29:45 【问题描述】:

我编写了一个自定义重试策略类,我可以在其中传递没有重试驱动程序将执行 onWriteTimeout/onUnavilable/onReadTimeout。

public class CustomRetryPolicy implements RetryPolicy 


  private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[] Retrying on read timeout on same host (consistency: , required responses: , "
          + "received responses: , data retrieved: , retries: )";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[] Retrying on write timeout on same host (consistency: , write type: , "
          + "required acknowledgments: , received acknowledgments: , retries: )";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[] Retrying on unavailable exception on next host (consistency: , "
          + "required replica: , alive replica: , retries: )";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[] Retrying on aborted request on next host (retries: )";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[] Retrying on node error on next host (retries: )";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) 
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) 


    RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) 
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    

    return decision;
  



  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) 
    RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) 
      LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
          retryCount);
    
    return decision;
  

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
      int retryCount) 
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) 
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    

    return decision;
  

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) 
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) 
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    

    return decision;
  

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) 
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) 
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    

    return decision;
  

  @Override
  public void close() 

    // Nothing to do

  




我正在使用 datastax java 驱动程序 4.6.0。 但问题是我不能用 CQLSessionBuilder 传递这个类的对象,这可以通过 like

RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

在旧版本的驱动程序中。我尝试过使用 DriverConfigLoader,但只有传递自定义类名的选项。

你能推荐一下吗?

【问题讨论】:

【参考方案1】:

如果您查看DefaultRetryPolicy 的实现和CustomRetryPolicy 的示例,您会看到两者都接收2 个参数:context 类型为DriverContext,以及带有配置文件名称的字符串。然后您应该能够使用context 通过getConfig 调用获得DriverConfig,然后在配置上使用getProfile 来提取自定义策略所需的配置值 - 您可以将自己的配置值放入配置文件并在重试策略中使用它,如下所示:

datastax-java-driver 
  advanced.retry-policy 
    class = DefaultRetryPolicy
  
  profiles 
    custom-retries 
      advanced.retry-policy 
        class = CustomRetryPolicy
        custom-policy 
           read-attempts = 3
           write-attempts = 2
           ...
        
      
    
  

【讨论】:

谢谢亚历克斯,是否可以传递值而不是从配置文件中提取它,我有一个接口可以提供重试尝试,我正在尝试根据这些值创建自定义重试策略,并在创建 CqlSession 本身时使用它。 您也可以通过编程方式指定配置属性,然后将 application.conf 中的配置与通过编程方式创建的配置合并 谢谢,如果我正确理解了您的回复,我不想像您在回答中提到的那样从 appliation.conf 重试尝试,而是我想务实地设置它[因为它们将来自界面] DriverConfigLoader 然后在创建会话时加载 configloader - withConfigLoader(loader)。 驱动程序的配置可能来自多个来源,并组合在一起,因此您可以通过编程方式设置您的特定内容,其余来自 application.conf 和 reference.conf。您可能需要升级到最新版本的驱动程序 - 我记得 4.6.0 左右的程序加载程序存在一些问题

以上是关于Datastax Java 驱动程序自定义重试策略的主要内容,如果未能解决你的问题,请参考以下文章

python自定义重试装饰器

feginclient和ribbon的重试策略

来自 AuthorizationHandler (ASP.NET Core) 的自定义重定向

Grails 安全插件自定义重定向

自定义重定向在本地主机上不起作用

在 Laravel Auth 中自定义重定向