使用基于时间的 PollingConsumer 到直接端点

Posted

技术标签:

【中文标题】使用基于时间的 PollingConsumer 到直接端点【英文标题】:Using a timing based PollingConsumer to a direct endpoint 【发布时间】:2016-01-11 14:57:58 【问题描述】:

从功能上讲,我希望在使用 JMS (WMQ) 端点之前检查 URL 是否处于活动状态。 如果无法访问 URL 或服务器错误,那么我不想从队列中接听。所以我想通过轮询消费者继续尝试(无限重试)URL。因此,只要它可用,我就可以从 JMS 获取。

我有一个设置了直接端点的 RouteBuilder,该端点被配置为运行一个处理器来 ping 服务。

所以:

public class PingRoute extends RouteBuilder 
        @Override
        public void configureCamel() 
            from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
                .process(new PingProcessor(url))
                .to("log://PingRoute?showAll=true");
        
    

在另一条路线上,我正在设置我的计时器:

    @Override
public void configureCamel() 
       from(timerEndpoint).beanRef(PollingConsumerBean.class.getSimpleName(), "checkPingRoute");
   ...
    

通过PollingConsumerBean,我试图通过消费者接收身体:

public void checkPingRoute()
    // loop to check the consumer.  Check we can carry on with the pick up from the JMS queue.
    while(true)
        Boolean pingAvailable = consumer.receiveBody("direct:pingRoute", Boolean.class);
   ...
    

我将路由添加到上下文并使用生产者发送:

context.addRoutes(new PingRoute());
context.start();
producer.sendBody(TimerPollingRoute.TIMER_POLLING_ROUTE_ENDPOINT, "a body");

我得到以下IllegalArgumentException

Cannot add a 2nd consumer to the same endpoint. Endpoint Endpoint[direct://pingRoute] only allows one consumer.

有没有办法将直接路由设置为轮询消费者?

【问题讨论】:

您能否描述一下您想要以一种实用的方式实现的目标?从您的代码中很难掌握。我觉得下面的答案只是在帮助你重新思考你的解决方案,以实现更简单、更优化的设计。 是的,很公平。现在已经添加了我想要的功能。通过使用处理器而不是直接路由解决了第二消费者问题。 【参考方案1】:

不幸的是,业务逻辑不是很清楚。据我了解 - 您需要等待服务的响应。恕我直言,您必须使用 Content Enricher EIP http://camel.apache.org/content-enricher.htmlpollEnrich 是您在计时器路线中所需要的。

.pollEnrich("direct:waitForResponce", -1).pollEnrich("seda:waitForResponce", -1)

public class PingRoute extends RouteBuilder 
        @Override
        public void configureCamel() 
             from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
                .process(new PingProcessor(url))
             .choice().when(body())                       
                  .to("log://PingRoute?showAll=true")
                  .to("direct:waitForResponce") 
                .otherwise()
                  .to("direct:pingRoute")
                .end(); 
        
;

计时器:

    @Override
    public void configureCamel()                            
      from(timerEndpoint)
      .inOnly("direct:pingRoute")
      .pollEnrich("direct:waitForResponce", -1)
       ...
    

【讨论】:

【参考方案2】:

基于他们用例的OP's clarification,他们有几个问题需要解决:

当且仅当对 URL 的 ping 为 肯定时,才使用 JMS 队列中的消息。 如果 URL 无响应,则 JMS 消息不应从队列中消失,并且必须进行 重试,直到 URL 再次响应,在这种情况下,消息最终将被使用。李> OP 没有指定重试次数是 limited 还是 unlimited

基于这个问题场景,我建议重新设计他们的解决方案,利用ActiveMQ retriesbroker-side redeliveryJMS transactions in Camel 来:

    如果 URL ping 失败(通过事务回滚),则将消息返回到队列。 确保消息不会丢失(通过使用 JMS 持久性和代理端重新传递,AMQ 将持久地安排重试周期)。 能够为每条消息指定复杂的重试周期,例如具有指数退避、最大重试次数等。 如果重试周期用尽而没有得到肯定的结果,可以选择将消息发送到Dead Letter Queue,以便可以计划其他一些(可能是手动的)操作。

现在,实现方面

from("activemq:queue:abc?transacted=true")          // (1)
    .to("http4://host.endpoint.com/foo?method=GET") // (2) (3)
    .process(new HandleSuccess());                  // (4)

评论:

    注意transacted 标志。 如果 HTTP 调用失败,HTTP4 端点将引发异常。 由于没有配置异常处理程序,Camel 会将异常传播到消费者端点 (activemq),该端点将回滚事务。 如果调用成功,流程将继续,并且交换主体现在将包含 HTTP 服务器返回的有效负载,您可以按照您希望的任何方式处理它。这里我使用的是处理器。

接下来,重要的是您在 ActiveMQ 中配置重新交付策略,并启用代理端重新交付。您可以在 activemq.xml 配置文件中执行此操作:

<plugins>
  <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
    <redeliveryPolicyMap>
      <redeliveryPolicyMap>
        <redeliveryPolicyEntries>
          <redeliveryPolicy queue="my.queue" 
                            initialRedeliveryDelay="30000" 
                            maximumRedeliveries="17" 
                            maximumRedeliveryDelay="259200000" 
                            redeliveryDelay="30000" 
                            useExponentialBackOff="true"
                            backOffMultiplier="2" />
        </redeliveryPolicyEntries>
      </redeliveryPolicyMap>
    </redeliveryPolicyMap>
  </redeliveryPlugin>
</plugins>

并确保在*** &lt;broker /&gt; 元素中启用了调度程序支持:

<broker xmlns="http://activemq.apache.org/schema/core" 
        brokerName="mybroker" 
        schedulerSupport="true">
    ...
</broker>

希望对你有帮助。

编辑 1: OP 使用 IBM WebSphere MQ 作为代理,我错过了。您可以使用 JMS QueueBrowser 查看消息并在实际使用消息之前尝试其对应的 URL,但不可能有选择地使用单个消息 - 这不是 MOM(面向消息的中间件)的意义所在。

所以我坚持认为您应该探索 JMS 事务,但不要让代理重新传递消息,您可以在 TX 主体本身内启动对 URL 的 ping 循环。对于 Camel,您可以按如下方式实现:

from("jms:queue:myqueue?transacted=true")
    .bean(new UrlPinger());

UrlPinger.java:

public class UrlPinger 

    @EndpointInject
    private ProducerTemplate template;

    private Pattern pattern = Pattern.compile("^(http(?:s)?)\\:");

    @Handler
    public void pingUrl(@Body String url, CamelContext context) throws InterruptedException 
        // Replace http(s): with http(s)4: to use the Camel HTTP4 endpoint.
        Matcher m = pattern.matcher(url);
        if (m.matches()) 
            url = m.replaceFirst(m.group(1) + "4:");
        

        // Try forever until the status code is 200.
        while (getStatusCode(url, context) != 200) 
            Thread.sleep(5000);
        
    

    private int getStatusCode(String url, CamelContext context) 
        Exchange response = template.request(url + "?method=GET&throwExceptionOnFailure=false", new Processor() 
            @Override public void process(Exchange exchange) throws Exception 
                // No body since this is a GET request.
                exchange.getIn().getBody(null);
            
        );

        return response.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
    


注意事项:

    注意throwExceptionOnFailure=false 选项。不会引发异常,因此循环将一直执行,直到条件为真。 在 bean 内部,我一直在循环,直到 HTTP 状态为 200。当然,您的逻辑会有所不同。 在尝试和尝试之间,我睡了 5000 毫秒。 我假设要 ping 的 URL 位于传入 JMS 消息的正文中。我正在用http(s)4: 替换领先的http(s):,以便使用Camel HTTP4 endpoint。 在 TX 内执行 ping 可确保仅在 ping 条件为真(在本例中为 HTTP 状态 == 200)时才使用消息。 您可能想要引入终止条件(您不想一直尝试)。可能会引入一些退避,以免压倒对方。 如果 Camel 或代理在重试周期内出现故障,消息将自动回滚。 考虑到JMS事务是Session绑定的,所以如果你想启动许多并发消费者(concurrentConsumersJMS端点选项),你需要为每个线程设置cacheLevelName=CACHE_NONE以使用不同的JMSSession.

【讨论】:

不错的方法,从不同的角度来总是很好。所以我在 ping URL 时寻找无限重试,尽管说这些事情可能会改变,所以需要记住这一点。但是 Webshere 是有问题的队列,因此需要调查对我开放的重新交付选项。在tmielke.blogspot.co.uk/2011/07/… 找到了一些关于此的阅读内容。 尽管这么说并且经过一番调查,已经制定了重新交付政策。它是处理我们无法传递或处理的消息(显然)并设置重试 3 次。因此,重新传递方法不是我们所需要的,所以轮询过程确实需要这样,一个单独的过程来轮询 URL,甚至在我们考虑从队列中提取之前。 抱歉,我错过了您原始帖子中的 WMQ 部分。 JMS 队列使用者无法查看或选择性地使用单个消息。您可以使用 JMS QueueBrowser,但您还需要一个 Consumer 来实际从队列中删除消息。这就是为什么我认为您需要探索 JMS 事务 (TX)。这个想法是您在 TX 中使用消息并尝试 URL。如果成功,Camel 将提交 TX,消息将被删除。否则,您将在 TX 中启动重试周期,因此 TX 将保持未完成状态,直到它回滚(停止重试)或提交。 @enkor 我已经编辑了我的答案,以便为您提供替代版本;-) 设法获得了一个工作示例。虽然不是基于回滚情况,但它给了我思考的余地,我可以看到它在哪里有用,也许我可能需要使用它。但是根据我最终提出的解决方案发布了答案。所以我只想说谢谢你让我的头脑清醒并进入正确调查的心情。【参考方案3】:

我很难确切地确定您想要做什么,但在我看来,您希望在某个时间间隔内使用来自端点的数据。为此,最好的模式是轮询消费者:http://camel.apache.org/polling-consumer.html

您当前收到的错误是因为您有两个消费者都试图从“direct://pingRoute”中读取如果这是有意的,您可以将直接更改为 seda://pingRoute,使其成为内存队列您的数据将在其中。

【讨论】:

【参考方案4】:

这里的所有答案都为我指明了正确的方向,但我最终想出了一个适合我们的代码库和框架的解决方案。

首先,我发现不需要 bean 来充当轮询消费者,而是可以使用处理器。

@Override
public void configureCamel() 
    from("timer://fnzPoller?period=2000&delay=2000").processRef(UrlPingProcessor.class.getSimpleName())
           .processRef(StopStartProcessor.class.getSimpleName()).to("log://TimerPollingRoute?showAll=true");


然后在UrlPingProcessor中有CXF服务来ping url并且可以检查响应:

@Override
public void process(Exchange exchange) 
    try 
        // CXF service
        FnzPingServiceImpl fnzPingService = new FnzPingServiceImpl(url);
        fnzPingService.getPing();
     catch (WebApplicationException e) 
        int responseCode = e.getResponse().getStatus();
        boolean isValidResponseCode = ResponseCodeUtil.isResponseCodeValid(responseCode);
        if (!isValidResponseCode) 
            // Sets a flag to stop for the StopStartProcessor
            stopRoute(exchange);
        
     

然后在StopStartProcessor 中,它使用ExecutorService 通过新线程停止或启动路由。:

    @Override
public void process(final Exchange exchange) 
    // routeBuilder is set on the constructor.
    final String routeId = routeBuilder.getClass().getSimpleName();
    Boolean stopRoute = ExchangeHeaderUtil.getHeader(exchange, Exchange.ROUTE_STOP, Boolean.class);
    boolean stopRoutePrim = BooleanUtils.isTrue(stopRoute);
    if (stopRoutePrim) 
        StopRouteThread stopRouteThread = new StopRouteThread(exchange, routeId);
        executorService.execute(stopRouteThread);
     else 
        CamelContext context = exchange.getContext();
        Route route = context.getRoute(routeId);
        if (route == null) 
            try 
                context.addRoutes(routeBuilder);
             catch (Exception e) 
                String msg = "Unable to add a route: " + routeBuilder;
                LOGGER.warn(msg, e);
            
        
    

【讨论】:

以上是关于使用基于时间的 PollingConsumer 到直接端点的主要内容,如果未能解决你的问题,请参考以下文章

使用 PyQt 将基于 qml 的图表集成到现有的 ui

如何使用基于策略的授权将所需的声明附加到令牌?

我应该对弹性搜索中的非日志数据使用基于时间的索引吗?

使用基于功能分支的 rebase 提交消息修改,然后合并到 master

如何在 Django 中使用基于类的视图将用户重定向到登录页面? [复制]

使用基于浏览器语言的 htaccess 从和到子域重定向