Datastax Java 驱动程序自定义重试策略
Posted
技术标签:
【中文标题】Datastax Java 驱动程序自定义重试策略【英文标题】:Datastax Java Driver Custom Retry Policy 【发布时间】:2020-12-26 14:29:45 【问题描述】:我编写了一个自定义重试策略类,我可以在其中传递没有重试驱动程序将执行 onWriteTimeout/onUnavilable/onReadTimeout。
public class CustomRetryPolicy implements RetryPolicy
private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);
@VisibleForTesting
public static final String RETRYING_ON_READ_TIMEOUT =
"[] Retrying on read timeout on same host (consistency: , required responses: , "
+ "received responses: , data retrieved: , retries: )";
@VisibleForTesting
public static final String RETRYING_ON_WRITE_TIMEOUT =
"[] Retrying on write timeout on same host (consistency: , write type: , "
+ "required acknowledgments: , received acknowledgments: , retries: )";
@VisibleForTesting
public static final String RETRYING_ON_UNAVAILABLE =
"[] Retrying on unavailable exception on next host (consistency: , "
+ "required replica: , alive replica: , retries: )";
@VisibleForTesting
public static final String RETRYING_ON_ABORTED =
"[] Retrying on aborted request on next host (retries: )";
@VisibleForTesting
public static final String RETRYING_ON_ERROR =
"[] Retrying on node error on next host (retries: )";
private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";
private final int readAttempts;
private final int writeAttempts;
private final int unavailableAttempts;
public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts)
this.readAttempts = readAttempts;
this.writeAttempts = writeAttempts;
this.unavailableAttempts = unavailableAttempts;
@Override
public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
int received, boolean dataPresent, int retryCount)
RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
? RetryDecision.RETRY_SAME
: RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled())
LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
return decision;
@Override
public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
int blockFor, int received, int retryCount)
RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
? RetryDecision.RETRY_SAME
: RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled())
LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
retryCount);
return decision;
@Override
public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
int retryCount)
RetryDecision decision =
(retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled())
LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
return decision;
@Override
public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount)
RetryDecision decision =
(error instanceof ClosedConnectionException || error instanceof HeartbeatException)
? RetryDecision.RETRY_NEXT
: RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled())
LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
return decision;
@Override
public RetryDecision onErrorResponse(Request request, CoordinatorException error,
int retryCount)
RetryDecision decision =
(error instanceof ReadFailureException || error instanceof WriteFailureException)
? RetryDecision.RETHROW
: RetryDecision.RETRY_NEXT;
if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled())
LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
return decision;
@Override
public void close()
// Nothing to do
我正在使用 datastax java 驱动程序 4.6.0。 但问题是我不能用 CQLSessionBuilder 传递这个类的对象,这可以通过 like
RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();
在旧版本的驱动程序中。我尝试过使用 DriverConfigLoader,但只有传递自定义类名的选项。
你能推荐一下吗?
【问题讨论】:
【参考方案1】:如果您查看DefaultRetryPolicy 的实现和CustomRetryPolicy 的示例,您会看到两者都接收2 个参数:context
类型为DriverContext
,以及带有配置文件名称的字符串。然后您应该能够使用context
通过getConfig
调用获得DriverConfig
,然后在配置上使用getProfile
来提取自定义策略所需的配置值 - 您可以将自己的配置值放入配置文件并在重试策略中使用它,如下所示:
datastax-java-driver
advanced.retry-policy
class = DefaultRetryPolicy
profiles
custom-retries
advanced.retry-policy
class = CustomRetryPolicy
custom-policy
read-attempts = 3
write-attempts = 2
...
【讨论】:
谢谢亚历克斯,是否可以传递值而不是从配置文件中提取它,我有一个接口可以提供重试尝试,我正在尝试根据这些值创建自定义重试策略,并在创建 CqlSession 本身时使用它。 您也可以通过编程方式指定配置属性,然后将 application.conf 中的配置与通过编程方式创建的配置合并 谢谢,如果我正确理解了您的回复,我不想像您在回答中提到的那样从 appliation.conf 重试尝试,而是我想务实地设置它[因为它们将来自界面] DriverConfigLoader 然后在创建会话时加载 configloader - withConfigLoader(loader)。 驱动程序的配置可能来自多个来源,并组合在一起,因此您可以通过编程方式设置您的特定内容,其余来自 application.conf 和 reference.conf。您可能需要升级到最新版本的驱动程序 - 我记得 4.6.0 左右的程序加载程序存在一些问题以上是关于Datastax Java 驱动程序自定义重试策略的主要内容,如果未能解决你的问题,请参考以下文章