如何在 DAG 中使用 Apache Apex Malhar RabbitMQ 运算符

Posted

技术标签:

【中文标题】如何在 DAG 中使用 Apache Apex Malhar RabbitMQ 运算符【英文标题】:How to use Apache Apex Malhar RabbitMQ operator in DAG 【发布时间】:2017-02-13 15:20:02 【问题描述】:

我有一个 Apache Apex 应用程序 DAG,它从队列中读取 RabbitMQ 消息。我应该使用哪个 Apache Apex Malhar 运算符?有几个运算符,但不清楚使用哪一个以及如何使用它。

【问题讨论】:

问题是? 【参考方案1】:

你看过https://github.com/apache/apex-malhar/tree/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq 吗? https://github.com/apache/apex-malhar/tree/master/contrib/src/test/java/com/datatorrent/contrib/rabbitmq 中也有测试显示如何使用操作符

【讨论】:

请解释你的答案。或者您可以发表评论。【参考方案2】:

https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

这是主要的操作符代码,其中元组类型是泛型参数,emitTuple() 是子类需要实现的抽象方法。

AbstractSinglePortRabbitMQInputOperator 是一个简单的子类,它提供单个输出端口并使用另一个抽象方法 getTuple() 来实现 emitTuple(),该方法需要在其子类中实现。

Sanjay 指出的测试显示了如何使用这些类。

【讨论】:

【参考方案3】:

我在查找如何将消息从 RabbitMQ 读取到 Apache Apex 时也遇到了问题。在 Sanjay 的回答 (https://***.com/a/42210636/2350644) 提供的链接的帮助下,我终于设法让它运行起来。以下是它如何协同工作:

1。设置 RabbitMQ 服务器

这里描述了很多安装 RabbitMQ 的方法:https://www.rabbitmq.com/download.html 对我来说最简单的方法是使用docker(参见:https://store.docker.com/images/rabbitmq)

docker pull rabbitmq

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management  

要检查 RabbitMQ 是否正常工作,请打开浏览器并导航到:http://localhost:15672/。您应该看到 RabbitMQ 的管理页面。

2。编写生产者程序

要向队列发送消息,您可以编写一个简单的 JAVA 程序,如下所示:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.ArrayList;

public class Send 
    private final static String EXCHANGE = "myExchange";

    public static void main(String[] args) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);

        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE, "");

        List<String> messages = Arrays.asList("Hello", "World", "!");
        for (String msg : messages) 
            channel.basicPublish(EXCHANGE, "", null, msg.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + msg + "'");
        

        channel.close();
        connection.close();
    

如果您执行 JAVA 程序,您应该会在 RabbitMQ 的管理 UI 中看到一些输出。

3。实施示例 Apex 应用程序

3.1 引导示例 apex 应用程序

关注apex官方文档http://docs.datatorrent.com/beginner/

3.2 向 pom.xml 添加额外的依赖项

要使用malhar 提供的类,请添加以下依赖项:

<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-contrib</artifactId>
  <version>3.7.0</version>
</dependency>
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.2.0</version>
</dependency>

3.3 创建消费者

我们首先需要创建一个InputOperator,它使用来自apex-malhar 的可用代码来使用来自RabbitMQ 的消息。

import com.datatorrent.contrib.rabbitmq.AbstractSinglePortRabbitMQInputOperator;

public class MyRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String> 
    @Override
    public String getTuple(byte[] message) 
        return new String(message);
    

您只需覆盖getTuple() 方法。在这种情况下,我们只是返回从 RabbitMQ 接收到的消息。

3.4 设置 Apex DAG

为了测试应用程序,我们只需添加一个使用来自 RabbitMQ 的数据的InputOperator(我们之前实现的MyRabbitMQInputOperator)和一个打印接收到的消息的ConsoleOutputOperator

import com.rabbitmq.client.BuiltinExchangeType;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name="MyFirstApplication")
public class Application implements StreamingApplication


  private final static String EXCHANGE = "myExchange";


  @Override
  public void populateDAG(DAG dag, Configuration conf)
  

    MyRabbitMQInputOperator consumer = dag.addOperator("Consumer", new MyRabbitMQInputOperator());
    consumer.setHost("localhost");
    consumer.setExchange(EXCHANGE);
    consumer.setExchangeType(BuiltinExchangeType.FANOUT.getType());

    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());

    dag.addStream("myStream", consumer.outputPort, cons.input).setLocality(Locality.CONTAINER_LOCAL);
  

3.5 测试应用程序

为了简单地测试创建的应用程序,我们可以编写一个 UnitTest,因此无需设置 Hadoop/YARN 集群。 在引导应用程序中已经有一个 UnitTest,即我们可以使用的 ApplicationTest.java

import java.io.IOException;
import javax.validation.ConstraintViolationException;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import com.datatorrent.api.LocalMode;

/**
 * Test the DAG declaration in local mode.
 */
public class ApplicationTest 

  @Test
  public void testApplication() throws IOException, Exception 
    try 
      LocalMode lma = LocalMode.newInstance();
      Configuration conf = new Configuration(true);
      //conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
      lma.prepareDAG(new Application(), conf);
      LocalMode.Controller lc = lma.getController();
      lc.run(10000); // runs for 10 seconds and quits
     catch (ConstraintViolationException e) 
      Assert.fail("constraint violations: " + e.getConstraintViolations());
    
  


由于我们不需要此应用程序的任何属性,因此该文件中唯一更改的是取消注释该行:

conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));

如果您执行ApplicationTest.java 并使用2. 中描述的Producer 程序向RabbitMQ 发送消息,则测试应输出所有消息。 您可能需要增加测试时间才能看到所有消息(目前设置为 10 秒)。

【讨论】:

以上是关于如何在 DAG 中使用 Apache Apex Malhar RabbitMQ 运算符的主要内容,如果未能解决你的问题,请参考以下文章

可以在 Apache Apex 中的 DAG 中间使用输入运算符吗

如何从 Apache Apex 应用程序内部获取 ApplicationID?

IBM Websphere MQ 到 Apache Apex Operator Stream?

将输入运算符动态添加到正在运行的 Apache Apex 应用程序

如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?

如何计算 Apache Apex 中运营商之间的网络延迟