import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class consumer {
public static void main(string[] args) throws interruptedexception, mqclientexception {
// 实例化消费者
defaultmqpushconsumer consumer = new defaultmqpushconsumer(“zhuzeqing-1”);
// 设置nameserver的地址
consumer.setnamesrvaddr(“192.168.23.131:9876”);
// 订阅一个或者多个topic,以及tag来过滤需要消费的消息
consumer.subscribe(“zhuzeqing-1-topic”, “*”);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registermessagelistener(new messagelistenerconcurrently() {
public consumeconcurrentlystatus consumemessage(list<messageext> msgs, consumeconcurrentlycontext context) {
system.out.printf(“%s receive new messages: %s %n”, thread.currentthread().getname(), msgs);
// 标记该消息已经被成功消费
return consumeconcurrentlystatus.consume_success;
}
});
// 启动消费者实例
consumer.start();
system.out.printf(“consumer started.%n”);
}
}