新四季網

rabbitmq的三種部署方式(快速嘗鮮RabbitMQ搭建完就得用起來)

2023-09-20 16:08:24 2

在項目真正開始之前我們先來簡單介紹下 rabbitMQ 的工作流程:

生產者往交換機中發送消息;交換機通過規則綁定隊列,通過路由鍵將消息存儲到隊列中;消費者獲取隊列中的消息進行消費;

環境:SpringBoot 2.6.3、JDK 1.8

項目搭建

首先創建 SpringBoot 項目 rabbit-mq

引入依賴

org.springframework.boot spring-boot-starter-aMQp

複製代碼

yml 文件配置

spring: rabbitmq: host: 127.0.0.1 //rabbitMQ服務地址 port: 15672 //這個地方暫時先用我們之前配置的15672 username: cheetah //自己的帳戶名 password: 123456 //自己的密碼

複製代碼

直連交換機

本項目以直連交換機為例,至於其他的交換機類型將在後文中給出詳細介紹。

@Configurationpublic class DirectRabbitConfig { /** * 定義交換機 **/ @Bean public Directexchange directExchange{ /** * 交換機名稱 * 持久性標誌:是否持久化,默認是 true 即聲明一個持久的 exchange,該exchange將在伺服器重啟後繼續運行 * 自動刪除標誌:是否自動刪除,默認為 false, 如果伺服器想在 exchange不再使用時刪除它,則設置為 true **/ return new DirectExchange("directExchange", true, false); } /** * 定義隊列 **/ @Bean public Queue directQueue{ /** * name:隊列名稱 * durable:是否持久化,默認是 true,持久化隊列,會被存儲在磁碟上,當消息代理重啟時仍然存在 * exclusive:是否排他,默認為 false,true則表示聲明了一個排他隊列(該隊列將僅由聲明者連接使用),如果連接關閉,則隊列被刪除。此參考優先級高於durable * autoDelete:是否自動刪除, 默認是 false,true則表示當隊列不再使用時,伺服器刪除該隊列 **/ return new Queue("directQueue",true); } /** * 隊列和交換機綁定 * 設置路由鍵:directRouting **/ @Bean Binding bindingDirect{ return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting"); }}

複製代碼

消息發送

@RestControllerpublic class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @Getmapping("/sendMessage") public String sendMessage{ //將消息攜帶路由鍵值 rabbitTemplate.convertAndSend("directExchange", "directRouting", "發送消息!"); return "ok"; }}

複製代碼

我們先啟動程序,在瀏覽器訪問下

http://127.0.0.1:9001/sendMessage

報錯如下:

我們之前已經給該用戶分配過權限了,如果之前未分配,直接在客戶端中配置:

之所以訪問不到,是因為我們使用的埠號不正確

所以我們需要將埠改為 5672 (如果是阿里雲伺服器實例,需要將該埠 開放權限

我們再來訪問下

http://127.0.0.1:9001/sendMessage

請求返回"OK",控制臺輸出

客戶端相關頁面截圖如下:

消息消費

@Component@RabbitListener(queues = "directQueue")//監聽隊列名稱public class MQReciever { @RabbitHandler public void process(String message){ System.out.println("接收到的消息是:" message); }}

複製代碼

啟動項目,發現消息已經被消費。

為了防止消息丟失, RabbitMQ 增加了消息確認機制:生產者消息確認機制和消費者消息確認機制。

確認機制一、生產者消息確認機制在 yml 中增加配置信息

spring: rabbitmq: #確認消息已發送到交換機(Exchange) publisher-confirm-type: correlated #確認消息已發送到隊列(Queue) publisher-returns: true

複製代碼

spring.rabbitmq.publisher-confirm 新版本已被棄用,現在使用 spring.rabbitmq.publisher-confirm-type = correlated 實現相同效果

增加回調

@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate; rabbitTemplate.setConnectionFactory(connectionFactory); //設置開啟 Mandatory,才能觸發回調函數,無論消息推送結果怎麼樣都強制調用回調函數 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: " "相關數據:" correlationData); System.out.println("ConfirmCallback: " "確認情況:" ack); System.out.println("ConfirmCallback: " "原因:" cause); } }); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback{ @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("ReturnCallback: " "消息:" returned.getMessage); System.out.println("ReturnCallback: " "回應碼:" returned.getReplyCode); System.out.println("ReturnCallback: " "回應信息:" returned.getReplyText); System.out.println("ReturnCallback: " "交換機:" returned.getExchange); System.out.println("ReturnCallback: " "路由鍵:" returned.getRoutingKey); } }); return rabbitTemplate;}

複製代碼

confirm 機制是只保證消息到達 exchange ,並不保證消息可以路由到正確的地方 queue當前的 exchange 不存在或者指定的路由 key 路由不到才會觸發 return 機制

大家可以自行演示以下情況的執行結果:

不存在交換機和隊列存在交換機,不存在隊列消息推送成功二、消費者消息的確認機制

默認情況下如果一個消息被消費者正確接收則會從隊列中移除。如果一個隊列沒被任何消費者訂閱,那麼這個隊列中的消息會被緩存,當有消費者訂閱時則會立即發送,進而從隊列中移除。

消費者消息的確認機制可以分為以下 3 種:

自動確認

AcknowledgeMode.NONE 默認為自動確認,不管消費者是否成功處理了消息,消息都會從隊列中被移除。

根據情況確認

AcknowledgeMode.AUTO 根據方法的執行情況來決定是否確認還是拒絕(是否重新入隊列)

如果消息成功被消費(成功的意思是在消費的過程中沒有拋出異常),則自動確認當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且消息不會重回隊列當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認其他的異常,則消息會被拒絕,並且該消息會重回隊列,如果此時只有一個消費者監聽該隊列,則有發生死循環的風險,多消費端也會造成資源的極大浪費,這個在開發過程中一定要避免的。可以通過 setDefaultRequeueRejected (默認是 true )去設置

可能造成消息丟失,一般是需要我們在 try-catch 捕捉異常後, 列印日誌 用於追蹤數據,這樣找出對應數據再做後續處理。

手動確認

AcknowledgeMode.MANUAL 對於手動確認,也是我們工作中最常用到的,它的用法如下:

/* * 肯定確認 * deliveryTag:消息隊列數據的唯一id * multiple:是否批量 * true :一次性確認所有小於等於deliveryTag的消息 * false:對當前消息進行確認; */channel.basicAck(long deliveryTag, boolean multiple);

複製代碼

/* * 否定確認 * multiple:是否批量 * true:一次性拒絕所有小於deliveryTag的消息 * false:對當前消息進行確認; * requeue:被拒絕的是否重新入列, * true:就是將數據重新丟回隊列裡,那麼下次還會消費這消息; * false:就是拒絕處理該消息,伺服器把該消息丟掉即可。 */channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);

複製代碼

/* * 用於否定確認,但與basicNack相比有一個限制,一次只能拒絕單條消息 */channel.basicReject(long deliveryTag, boolean requeue);

複製代碼

手動確認

在 yml 配置中開啟手動確認模式

spring: rabbitmq: listener: simple: acknowledge-mode: manual

複製代碼

或者在代碼中開啟

@Configurationpublic class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MQReciever mqReciever;//消息接收處理類 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer{ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //並發使用者的數量 container.setConcurrentConsumers(1); //消費者人數上限 container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這裡改為手動確認消息 //設置一個隊列,此處支持設置多個 container.setQueueNames("directQueue"); container.setMessageListener(mqReciever); return container; }}

複製代碼

消息消費類

@Component@RabbitListener(queues = "directQueue")//監聽隊列名稱public class MQReciever implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties.getDeliveryTag; try { String msg = message.toString; String[] msgArray = msg.split("'");//可以點進Message裡面看源碼,單引號直接的數據就是我們的map消息數據 System.out.println("消費的消息內容:" msgArray[1]); System.out.println("消費的主題消息來自:" message.getMessageProperties.getConsumerQueue); //業務處理 ...... channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒絕重新入隊列 channel.basicReject(deliveryTag, false); e.printStackTrace; } }}

原文 https://xie.infoq.cn/article/96df79f01c32a63129dfe7a29

,
同类文章
葬禮的夢想

葬禮的夢想

夢見葬禮,我得到了這個夢想,五個要素的五個要素,水火只好,主要名字在外面,職業生涯良好,一切都應該對待他人治療誠意,由於小,吉利的冬天夢想,秋天的夢是不吉利的
找到手機是什麼意思?

找到手機是什麼意思?

找到手機是什麼意思?五次選舉的五個要素是兩名士兵的跡象。與他溝通很好。這是非常財富,它擅長運作,職業是仙人的標誌。單身男人有這個夢想,主要生活可以有人幫忙
我不怎麼想?

我不怎麼想?

我做了什麼意味著看到米飯烹飪?我得到了這個夢想,五線的主要土壤,但是Tu Ke水是錢的跡象,職業生涯更加真誠。他真誠地誠實。這是豐富的,這是夏瑞的巨星
夢想你的意思是什麼?

夢想你的意思是什麼?

你是什​​麼意思夢想的夢想?夢想,主要木材的五個要素,水的跡象,主營業務,主營業務,案子應該抓住魅力,不能疏忽,春天夢想的吉利夢想夏天的夢想不幸。詢問學者夢想
拯救夢想

拯救夢想

拯救夢想什麼意思?你夢想著拯救人嗎?拯救人們的夢想有一個現實,也有夢想的主觀想像力,請參閱週宮官方網站拯救人民夢想的詳細解釋。夢想著敵人被拯救出來
2022愛方向和生日是在[質量個性]中

2022愛方向和生日是在[質量個性]中

[救生員]有人說,在出生88天之前,胎兒已經知道哪天的出生,如何有優質的個性,將走在什麼樣的愛情之旅,將與生活生活有什么生活。今天
夢想切割剪裁

夢想切割剪裁

夢想切割剪裁什麼意思?你夢想切你的手是好的嗎?夢想切割手工切割手有一個真正的影響和反應,也有夢想的主觀想像力。請參閱官方網站夢想的細節,以削減手
夢想著親人死了

夢想著親人死了

夢想著親人死了什麼意思?你夢想夢想你的親人死嗎?夢想有一個現實的影響和反應,還有夢想的主觀想像力,請參閱夢想世界夢想死亡的親屬的詳細解釋
夢想搶劫

夢想搶劫

夢想搶劫什麼意思?你夢想搶劫嗎?夢想著搶劫有一個現實的影響和反應,也有夢想的主觀想像力,請參閱週恭吉夢官方網站的詳細解釋。夢想搶劫
夢想缺乏缺乏紊亂

夢想缺乏缺乏紊亂

夢想缺乏缺乏紊亂什麼意思?你夢想缺乏異常藥物嗎?夢想缺乏現實世界的影響和現實,還有夢想的主觀想像,請看官方網站的夢想組織缺乏異常藥物。我覺得有些東西缺失了