Rocket MQ 1 - 用

Posted it-worker365

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rocket MQ 1 - 用相关的知识,希望对你有一定的参考价值。

参考 http://www.iocoder.cn/categories/RocketMQ/ ; https://www.jianshu.com/nb/16219849

首先上启动方法,分别启动namesrv/broker/procedure/consumer

    public static void main(String[] args) throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        NamesrvConfig namesrvConfig = new NamesrvConfig();
        nettyServerConfig.setListenPort(9876);
        NamesrvController nameSrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
        nameSrvController.initialize();
        nameSrvController.start();
        Thread.sleep(1000000);
    }
    public static void main(String[] args) throws Exception {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(10911);
        final BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setBrokerName("broker-a");
        brokerConfig.setAutoCreateTopicEnable(true);
        brokerConfig.setNamesrvAddr("127.0.0.1:9876");
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        messageStoreConfig.setDeleteWhen("04");
        messageStoreConfig.setFileReservedTime(48);
        messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
        messageStoreConfig.setDuplicationEnable(false);
        
        BrokerController brokerController = new BrokerController(
                brokerConfig,
                nettyServerConfig,
            new NettyClientConfig(),
            messageStoreConfig);
        assertThat(brokerController.initialize());
        brokerController.start();
        Thread.sleep(1000000);
        brokerController.shutdown();
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagB" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

 

以上是关于Rocket MQ 1 - 用的主要内容,如果未能解决你的问题,请参考以下文章

Netty协议-Rocket MQ之NettyRemotingClient/Server

Netty协议-Rocket MQ之NettyRemotingClient/Server

Netty协议-Rocket MQ之NettyRemotingClient/Server

Windown10下Rocket MQ 安装

rocket MQ

Rocket MQ报错No route info of this topic的问题探究