jmockit 中文网 mq消息生产者

Posted funkboy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jmockit 中文网 mq消息生产者相关的知识,希望对你有一定的参考价值。

 

RocketMQ是我们常用的消息中间件,在运行单元测试时,我们可能不需要真正发送消息(除非是为了测试发送消息),也不想因为连结不上RocketMQ的Broker,NameServer而影响单元测试运行。
   那我们该如何Mock RocketMQ消息生产者呢?
  请看例子:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//RocketMQ消息生产者 Mock 
 
public class RocetMQProducerMockingTest {
    // 把RocketMQ的生产者mock
    @BeforeClass
    public static void mockRocketMQ() {
        new RocketMQProducerMockUp();
    }
 
    @Test
    public void testSendRocketMQMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_producer");
        producer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");
        producer.start();
        for (int i = 0; i < 20; i++) {
            Message msg = new Message("testtopic""TagA", ("Hello " + i).getBytes());
            // 因为mq生产者已经mock,所以消息并不会真正的发送,即使nameServer连不上,也不影响单元测试的运行
            SendResult result = producer.send(msg);
            Assert.isTrue(result.getSendStatus() == SendStatus.SEND_OK);
            Assert.isTrue(result.getMsgId() != null);
        }
        producer.shutdown();
    }
}

 

 

最关键的类是RocketMQProducerMockUp,这个类改变了生产者默认实现。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//MQ消息发送者 的MockUp(伪类) 
public class RocketMQProducerMockUp extends MockUp<DefaultMQProducer> {
 
    @Mock
    void init() throws MQClientException {
        // 构造函数也什么都不做
    }
 
    @Mock
    void start() throws MQClientException {
        // 启动,什么都不做 
    }
 
    @Mock
    void shutdown() {
        // 关闭,也什么都不做 
    }
 
    @Mock
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException {
        // 欺骗调用方,返回不存在的消息队列,因为消息并不会真正发送嘛
        List<MessageQueue> queues = new ArrayList<MessageQueue>();
        MessageQueue q = new MessageQueue();
        q.setBrokerName("testbrokername");
        q.setQueueId(1);
        q.setTopic("testtopic");
        queues.add(q);
        return queues;
    }
 
    // 下面是对各个send方法的mock,都返回消息成功结果
    @Mock
    SendResult send(final Message msg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return newSuccessSendResult();
    }
 
    @Mock
    SendResult send(final Message msg, final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return newSuccessSendResult();
    }
 
    @Mock
    void send(final Message msg, final SendCallback sendCallback)
            throws MQClientException, RemotingException, InterruptedException {
        sendCallback.onSuccess(this.newSuccessSendResult());
    }
 
    @Mock
    void send(final Message msg, final SendCallback sendCallback, final long timeout)
            throws MQClientException, RemotingException, InterruptedException {
        sendCallback.onSuccess(this.newSuccessSendResult());
    }
 
    @Mock
    void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException {
 
    }
 
    @Mock
    SendResult send(final Message msg, final MessageQueue mq)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return newSuccessSendResult();
    }
 
    @Mock
    SendResult send(final Message msg, final MessageQueue mq, final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return newSuccessSendResult();
    }
 
    @Mock
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
            throws MQClientException, RemotingException, InterruptedException {
        sendCallback.onSuccess(this.newSuccessSendResult());
    }
 
    @Mock
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
        sendCallback.onSuccess(this.newSuccessSendResult());
    }
 
    @Mock
    void sendOneway(final Message msg, final MessageQueue mq)
            throws MQClientException, RemotingException, InterruptedException {
 
    }
 
    @Mock
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return newSuccessSendResult();
    }
 
    @Mock
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return newSuccessSendResult();
    }
 
    @Mock
    void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback)
            throws MQClientException, RemotingException, InterruptedException {
        sendCallback.onSuccess(this.newSuccessSendResult());
    }
 
    @Mock
    void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback,
            final long timeout) throws MQClientException, RemotingException, InterruptedException {
        sendCallback.onSuccess(this.newSuccessSendResult());
    }
 
    @Mock
    void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
            throws MQClientException, RemotingException, InterruptedException {
 
    }
 
    @Mock
    TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter,
            final Object arg) throws MQClientException {
        return newTransactionSendResult();
    }
 
    private TransactionSendResult newTransactionSendResult() {
        TransactionSendResult success = new TransactionSendResult();
        success.setSendStatus(SendStatus.SEND_OK);
        success.setMsgId(UUID.randomUUID().toString());
        MessageQueue q = new MessageQueue();
        q.setBrokerName("testbrokername");
        q.setQueueId(1);
        q.setTopic("testtopic");
        success.setMessageQueue(q);
        success.setLocalTransactionState(LocalTransactionState.COMMIT_MESSAGE);
        return success;
    }
 
    private SendResult newSuccessSendResult() {
        SendResult success = new SendResult();
        success.setSendStatus(SendStatus.SEND_OK);
        success.setMsgId(UUID.randomUUID().toString());
        MessageQueue q = new MessageQueue();
        q.setBrokerName("testbrokername");
        q.setQueueId(1);
        q.setTopic("testtopic");
        success.setMessageQueue(q);
        return success;
    }
}

 

以上是关于jmockit 中文网 mq消息生产者的主要内容,如果未能解决你的问题,请参考以下文章

PHP+RabbitMQ消息队列的配置和使用方法MQ

PHP+RabbitMQ消息队列的配置和使用方法MQ

MQ任意延时消息基于客户端实现

消息队列MQ

MQ任意延时消息基于服务端实现

RabbitMQ---消息队列---上半部分