docker启动rabbitmq以及使用方式详解

搜索rabbitmq镜像

docker search rabbitmq:management

%docker启动rabbitmq以及使用方式详解-猿站网-插图

下载镜像

docker pull rabbitmq:management

%docker启动rabbitmq以及使用方式详解-1猿站网-插图

启动容器

docker run -d –hostname localhost –name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

%docker启动rabbitmq以及使用方式详解-2猿站网-插图

%docker启动rabbitmq以及使用方式详解-3猿站网-插图

打印容器

docker logs rabbitmq

%docker启动rabbitmq以及使用方式详解-4猿站网-插图

%docker启动rabbitmq以及使用方式详解-5猿站网-插图

访问RabbitMQ Management

http://localhost:15672

账户密码默认:guest

%docker启动rabbitmq以及使用方式详解-6猿站网-插图

编写生产者类

package com.xun.rabbitmqdemo.example; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private final static String QUEUE_NAME = “hello”; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(“guest”); factory.setPassword(“guest”); factory.setHost(“localhost”); factory.setPort(5672); factory.setVirtualHost(“/”); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 生成一个queue队列 * 1、队列名称 QUEUE_NAME * 2、队列里面的消息是否持久化(默认消息存储在内存中) * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费 * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除 * 5、其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message = “Hello world!”; /** * 发送一个消息 * 1、发送到哪个exchange交换机 * 2、路由的key * 3、其他的参数信息 * 4、消息体 */ channel.basicPublish(“”,QUEUE_NAME,null,message.getBytes()); System.out.println(” [x] Sent “”+message+”””); channel.close(); connection.close(); } }

运行该方法,可以看到控制台的打印

%docker启动rabbitmq以及使用方式详解-7猿站网-插图

name=hello的队列收到Message

%docker启动rabbitmq以及使用方式详解-8猿站网-插图

消费者

package com.xun.rabbitmqdemo.example; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receiver { private final static String QUEUE_NAME = “hello”; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(“guest”); factory.setPassword(“guest”); factory.setHost(“localhost”); factory.setPort(5672); factory.setVirtualHost(“/”); factory.setConnectionTimeout(600000);//milliseconds factory.setRequestedHeartbeat(60);//seconds factory.setHandshakeTimeout(6000);//milliseconds factory.setRequestedChannelMax(5); factory.setNetworkRecoveryInterval(500); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); System.out.println(“Waiting for messages. “); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String message = new String(body, “UTF-8″); System.out.println(” [x] Received “” + message + “””); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }

%docker启动rabbitmq以及使用方式详解-9猿站网-插图

%docker启动rabbitmq以及使用方式详解-10猿站网-插图

工作队列

RabbitMqUtils工具类

package com.xun.rabbitmqdemo.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(“localhost”); factory.setUsername(“guest”); factory.setPassword(“guest”); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }

启动2个工作线程

package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; public class Work01 { private static final String QUEUE_NAME = “hello”; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println(“接收消息:”+receivedMessage); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+”消费者取消消费接口回调逻辑”); }; System.out.println(“C1 消费者启动等待消费….”); /** * 消费者消费消息 * 1、消费哪个队列 * 2、消费成功后是否自动应答 * 3、消费的接口回调 * 4、消费未成功的接口回调 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; public class Work02 { private static final String QUEUE_NAME = “hello”; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println(“接收消息:”+receivedMessage); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+”消费者取消消费接口回调逻辑”); }; System.out.println(“C2 消费者启动等待消费….”); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }

启动工作线程

%docker启动rabbitmq以及使用方式详解-11猿站网-插图

启动发送线程

package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.Channel; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import java.util.Scanner; public class Task01 { private static final String QUEUE_NAME = “hello”; public static void main(String[] args) throws Exception{ try(Channel channel= RabbitMqUtils.getChannel();){ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台接收消息 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(“”,QUEUE_NAME,null,message.getBytes()); System.out.println(“发送消息完成:”+message); } } } }

启动发送线程,此时发送线程等待键盘输入

%docker启动rabbitmq以及使用方式详解-12猿站网-插图

发送4个消息

%docker启动rabbitmq以及使用方式详解-13猿站网-插图

%docker启动rabbitmq以及使用方式详解-14猿站网-插图

%docker启动rabbitmq以及使用方式详解-15猿站网-插图

可以看到2个工作线程按照顺序分别接收message。

消息应答机制

rabbitmq将message发送给消费者后,就会将该消息标记为删除。

但消费者在处理message过程中宕机,会导致消息的丢失。

因此需要设置手动应答。

生产者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import java.util.Scanner; public class Task02 { private static final String TASK_QUEUE_NAME = “ack_queue”; public static void main(String[] args) throws Exception{ try(Channel channel = RabbitMqUtils.getChannel()){ channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); Scanner scanner = new Scanner(System.in); System.out.println(“请输入信息”); while(scanner.hasNext()){ String message = scanner.nextLine(); channel.basicPublish(“”,TASK_QUEUE_NAME,null,message.getBytes()); System.out.println(“生产者task02发出消息”+ message); } } } }

消费者

package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import com.xun.rabbitmqdemo.utils.SleepUtils; public class Work03 { private static final String ACK_QUEUE_NAME = “ack_queue”; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println(“Work03 等待接收消息处理时间较短”); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println(“接收到消息:”+message); /** * 1、消息的标记tag * 2、是否批量应答 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+”消费者取消消费接口回调逻辑”); }; //采用手动应答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import com.xun.rabbitmqdemo.utils.SleepUtils; public class Work04 { private static final String ACK_QUEUE_NAME = “ack_queue”; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println(“Work04 等待接收消息处理时间较长”); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println(“接收到消息:”+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+”消费者取消消费接口回调逻辑”); }; //采用手动应答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }

工具类SleepUtils

package com.xun.rabbitmqdemo.utils; public class SleepUtils { public static void sleep(int second){ try{ Thread.sleep(1000*second); }catch (InterruptedException _ignored){ Thread.currentThread().interrupt(); } } }

模拟

%docker启动rabbitmq以及使用方式详解-16猿站网-插图

%docker启动rabbitmq以及使用方式详解-17猿站网-插图

%docker启动rabbitmq以及使用方式详解-18猿站网-插图

work04等待30s后发出ack

%docker启动rabbitmq以及使用方式详解-19猿站网-插图

在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03

%docker启动rabbitmq以及使用方式详解-20猿站网-插图

%docker启动rabbitmq以及使用方式详解-21猿站网-插图

%docker启动rabbitmq以及使用方式详解-22猿站网-插图

不公平分发

上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。

不公平分发:
int prefetchCount = 1; channel.basicQos(prefetchCount);

通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。

%docker启动rabbitmq以及使用方式详解-23猿站网-插图

总结 

到此这篇关于docker启动rabbitmq以及使用的文章就介绍到这了,更多相关docker启动rabbitmq及使用内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文地址:https://blog.csdn.net/qq_44402069/article/details/124303944

声明: 猿站网有关资源均来自网络搜集与网友提供,任何涉及商业盈利目的的均不得使用,否则产生的一切后果将由您自己承担! 本平台资源仅供个人学习交流、测试使用 所有内容请在下载后24小时内删除,制止非法恶意传播,不对任何下载或转载者造成的危害负任何法律责任!也请大家支持、购置正版! 。本站一律禁止以任何方式发布或转载任何违法的相关信息访客发现请向站长举报,会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。本网站的资源部分来源于网络,如有侵权烦请发送邮件至:2697268773@qq.com进行处理。
建站知识

Docker容器Consul部署概述

2023-3-3 18:02:18

建站知识

小知识:科普:什么是 ROADM?

2023-3-3 18:07:10

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索