欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

在RabbitMQ中實(shí)現(xiàn)Work queues工作隊(duì)列模式

 更新時(shí)間:2021年04月16日 15:00:37   作者:Java_Caiyo  
這篇文章主要介紹了如何在RabbitMQ中實(shí)現(xiàn)Work queues模式,代碼詳細(xì),解釋清晰,可以幫助大家更好理解java,對這方面感興趣的朋友可以參考下

一、模式說明

Work Queues 與入門程序的簡單模式相比,多了一個或一些消費(fèi)端,多個消費(fèi)端共同消費(fèi)同一個隊(duì)列中的消息。

應(yīng)用場景 :對于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。

二、代碼

Work Queues 與入門程序的 簡單模式 的代碼是幾乎一樣的:可以完全復(fù)制,并復(fù)制多一個消費(fèi)者進(jìn)行多個消費(fèi)者同時(shí)消費(fèi)消息的測試。

①生產(chǎn)者

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
public class Producer { 
	static final String QUEUE_NAME = "work_queue"; 
	public static void main(String[] args) throws Exception { 
		//創(chuàng)建連接 
		Connection connection = ConnectionUtil.getConnection(); 
		// 創(chuàng)建頻道 
		Channel channel = connection.createChannel(); 
		// 聲明(創(chuàng)建)隊(duì)列 
		/**
		 * 參數(shù)1:隊(duì)列名稱 
		 * 參數(shù)2:是否定義持久化隊(duì)列 
		 * 參數(shù)3:是否獨(dú)占本次連接 
		 * 參數(shù)4:是否在不使用的時(shí)候自動刪除隊(duì)列 
		 * 參數(shù)5:隊(duì)列其它參數(shù) 
		*/ 
		channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
		for (int i = 1; i <= 30; i++) { 
			// 發(fā)送信息 
			String message = "你好;小兔子!work模式--" + i; 
			/**
			 * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage 
			 * 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱 
			 * 參數(shù)3:消息其它屬性 
			 * 參數(shù)4:消息內(nèi)容 
			*/ 
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
			System.out.println("已發(fā)送消息:" + message); 
		}
		// 關(guān)閉資源 
		channel.close(); connection.close(); 
	} 
}

②消費(fèi)者1

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*;
import java.io.IOException; 
public class Consumer1 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		// 創(chuàng)建頻道 
		Channel channel = connection.createChannel(); 
		// 聲明(創(chuàng)建)隊(duì)列 
		/**
		 * 參數(shù)1:隊(duì)列名稱 
		 * 參數(shù)2:是否定義持久化隊(duì)列 
		 * 參數(shù)3:是否獨(dú)占本次連接 
		 * 參數(shù)4:是否在不使用的時(shí)候自動刪除隊(duì)列 
		 * 參數(shù)5:隊(duì)列其它參數(shù) 
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		//一次只能接收并處理一個消息 
		channel.basicQos(1); 
		//創(chuàng)建消費(fèi)者;并設(shè)置消息處理 
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
			@Override 
			/**
			 * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定 
			 * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送) 
			 * properties 屬性信息 
			 * body 消息 
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//路由key 
					System.out.println("路由key為:" + envelope.getRoutingKey()); 
					//交換機(jī) 
					System.out.println("交換機(jī)為:" + envelope.getExchange()); 
					//消息id 
					System.out.println("消息id為:" + envelope.getDeliveryTag()); 
					//收到的消息 
					System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					//確認(rèn)消息 
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} 
				catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//監(jiān)聽消息 
		/**
		 * 參數(shù)1:隊(duì)列名稱
		 * 參數(shù)2:是否自動確認(rèn),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置為false則需要手動確認(rèn) 
		 * 參數(shù)3:消息接收到后回調(diào) 
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}

③消費(fèi)者2

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*; 
import java.io.IOException; 
public class Consumer2 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		// 創(chuàng)建頻道 
		Channel channel = connection.createChannel(); 
		// 聲明(創(chuàng)建)隊(duì)列 
		/**
		 * 參數(shù)1:隊(duì)列名稱 
		 * 參數(shù)2:是否定義持久化隊(duì)列 
		 * 參數(shù)3:是否獨(dú)占本次連接 
		 * 參數(shù)4:是否在不使用的時(shí)候自動刪除隊(duì)列 
		 * 參數(shù)5:隊(duì)列其它參數(shù) 
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		//一次只能接收并處理一個消息 
		channel.basicQos(1); 
		//創(chuàng)建消費(fèi)者;并設(shè)置消息處理 
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
			@Override 
			/**
			 * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定 
			 * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送) 
			 * properties 屬性信息 
			 * body 消息 
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//路由key 
					System.out.println("路由key為:" + envelope.getRoutingKey()); 
					//交換機(jī) 
					System.out.println("交換機(jī)為:" + envelope.getExchange()); 
					//消息id 
					System.out.println("消息id為:" + envelope.getDeliveryTag());
					//收到的消息 
					System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					//確認(rèn)消息 
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//監(jiān)聽消息 
		/**
		 * 參數(shù)1:隊(duì)列名稱 
		 * 參數(shù)2:是否自動確認(rèn),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置為false則需要手動確認(rèn) 
		 * 參數(shù)3:消息接收到后回調(diào) 
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}

三、測試

啟動兩個消費(fèi)者,然后再啟動生產(chǎn)者發(fā)送消息;到IDEA的兩個消費(fèi)者對應(yīng)的控制臺查看是否競爭性的接收到消息。

總結(jié)

在一個隊(duì)列中如果有多個消費(fèi)者,那么消費(fèi)者之間對于同一個消息的關(guān)系是競爭的關(guān)系。

到此這篇關(guān)于如何在RabbitMQ中實(shí)現(xiàn)Work queues模式的文章就介紹到這了,希望對你有所幫助,更多相關(guān)RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持腳本之家!

相關(guān)文章

  • 基于swing實(shí)現(xiàn)窗體拖拽和拉伸

    基于swing實(shí)現(xiàn)窗體拖拽和拉伸

    這篇文章主要為大家詳細(xì)介紹了基于swing實(shí)現(xiàn)窗體拖拽和拉伸,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • Springmvc Controller接口代碼示例

    Springmvc Controller接口代碼示例

    這篇文章主要介紹了Springmvc Controller接口代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-11-11
  • 使用ObjectMapper解析json不用一直new了

    使用ObjectMapper解析json不用一直new了

    這篇文章主要為大家介紹了使用ObjectMapper解析json不用一直new了的方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • 輕松掌握J(rèn)ava觀察者模式

    輕松掌握J(rèn)ava觀察者模式

    這篇文章主要幫助大家輕松掌握J(rèn)ava觀察者模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-09-09
  • Spring Security使用Lambda DSL配置流程詳解

    Spring Security使用Lambda DSL配置流程詳解

    Spring Security 5.2 對 Lambda DSL 語法的增強(qiáng),允許使用lambda配置HttpSecurity、ServerHttpSecurity,重要提醒,之前的配置方法仍然有效。lambda的添加旨在提供更大的靈活性,但是用法是可選的。讓我們看一下HttpSecurity的lambda配置與以前的配置樣式相比
    2023-02-02
  • Elasticsearch8.1中的Script使用實(shí)例深入解讀

    Elasticsearch8.1中的Script使用實(shí)例深入解讀

    這篇文章主要為大家介紹了Elasticsearch8.1中的Script使用實(shí)例深入解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • 基于java構(gòu)造方法Vector修改元素源碼分析

    基于java構(gòu)造方法Vector修改元素源碼分析

    本篇文章是關(guān)于ava構(gòu)造方法Vector源碼分析系列文章,本文主要介紹了Vector修改元素的源碼分析,有需要的朋友可以借鑒參考下,希望可以有所幫助
    2021-09-09
  • Spring Boot統(tǒng)一異常處理詳解

    Spring Boot統(tǒng)一異常處理詳解

    我們在做Web應(yīng)用的時(shí)候,請求處理過程中發(fā)生錯誤是非常常見的情況。這個時(shí)候就需要統(tǒng)一異常處理了,這篇文章主要給大家介紹了Spring Boot如何進(jìn)行統(tǒng)一異常處理的相關(guān)資料,需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-02-02
  • Java并發(fā)編程之ConcurrentLinkedQueue解讀

    Java并發(fā)編程之ConcurrentLinkedQueue解讀

    這篇文章主要介紹了Java并發(fā)編程之ConcurrentLinkedQueue解讀,非阻塞的實(shí)現(xiàn)方式則可以使用循環(huán)CAS的方式來實(shí)現(xiàn),而ConcurrentLinkedQueue就是juc包中自帶的經(jīng)典非堵塞方式實(shí)現(xiàn)的工具類,需要的朋友可以參考下
    2023-12-12
  • SpringBoot項(xiàng)目中訪問HTML頁面的三種方法

    SpringBoot項(xiàng)目中訪問HTML頁面的三種方法

    這篇文章主要介紹了SpringBoot項(xiàng)目中訪問HTML頁面的三種方法,文中通過代碼示例和圖文結(jié)合的方式講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2024-07-07

最新評論