java遠(yuǎn)程連接調(diào)用Rabbitmq的實(shí)例代碼
本文介紹了java遠(yuǎn)程連接調(diào)用Rabbitmq,分享給大家,希望此文章對各位有所幫助。
打開IDEA創(chuàng)建一個(gè)maven工程(Java就可以了)。
pom.xml文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhenqi</groupId> <artifactId>rabbitmq-study</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq-study</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> </dependencies> </project>
為了能遠(yuǎn)程訪問rabbitmq,則需要編輯 /etc/rabbitmq/rabbitmq.conf,添加以下內(nèi)容。
[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ]
添加administrator角色
rabbitmqctl set_user_tags openstack administrator
創(chuàng)建抽象隊(duì)列 EndPoint.java
package com.zhenqi; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Created by wuming on 2017/7/16. */ public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws Exception { this.endPointName = endpointName; //創(chuàng)建一個(gè)連接工廠 connection factory ConnectionFactory factory = new ConnectionFactory(); //設(shè)置rabbitmq-server服務(wù)IP地址 factory.setHost("192.168.146.128"); factory.setUsername("openstack"); factory.setPassword("rabbitmq"); factory.setPort(5672); factory.setVirtualHost("/"); //得到 連接 connection = factory.newConnection(); //創(chuàng)建 channel實(shí)例 channel = connection.createChannel(); channel.queueDeclare(endpointName, false, false, false, null); } /** * 關(guān)閉channel和connection。并非必須,因?yàn)殡[含是自動調(diào)用的。 * @throws IOException */ public void close() throws Exception{ this.channel.close(); this.connection.close(); } }
生產(chǎn)者Producer.java
生產(chǎn)者類的任務(wù)是向隊(duì)列里寫一條消息
package com.zhenqi; import org.apache.commons.lang.SerializationUtils; import java.io.Serializable; /** * Created by wuming on 2017/7/16. */ public class Producer extends EndPoint { public Producer(String endpointName) throws Exception { super(endpointName); } public void sendMessage(Serializable object) throws Exception { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
消費(fèi)者QueueConsumer.java
消費(fèi)者可以以線程方式運(yùn)行,對于不同的事件有不同的回調(diào)函數(shù),其中最主要的是處理新消息到來的事件。
package com.zhenqi; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import org.apache.commons.lang.SerializationUtils; import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Created by wuming on 2017/7/16. */ public class QueueConsumer extends EndPoint implements Runnable, Consumer { private Logger LOG=Logger.getLogger(QueueConsumer.class); public QueueConsumer(String endpointName) throws Exception { super(endpointName); } public void handleConsumeOk(String s) { } public void handleCancelOk(String s) { } public void handleCancel(String s) throws IOException { } public void handleShutdownSignal(String s, ShutdownSignalException e) { } public void handleRecoverOk(String s) { LOG.info("Consumer "+s +" registered"); } public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { Map map = (HashMap) SerializationUtils.deserialize(bytes); LOG.info("Message Number "+ map.get("message number") + " received."); } public void run() { try{ channel.basicConsume(endPointName, true,this); }catch(IOException e){ e.printStackTrace(); } } }
測試
運(yùn)行一個(gè)消費(fèi)者線程,然后開始產(chǎn)生大量的消息,這些消息會被消費(fèi)者取走
package com.zhenqi; import java.util.HashMap; /** * Created by wuming on 2017/7/16. */ public class TestRabbitmq { public static void main(String[] args){ try{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 100000; i++){ HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } }catch(Exception e){ e.printStackTrace(); } } }
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java多文件以ZIP壓縮包導(dǎo)出的實(shí)現(xiàn)方法
這篇文章主要為大家詳細(xì)介紹了Java多文件以ZIP壓縮包導(dǎo)出的實(shí)現(xiàn)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07Spark操作之a(chǎn)ggregate、aggregateByKey詳解
這篇文章主要介紹了Spark操作之a(chǎn)ggregate、aggregateByKey詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06學(xué)習(xí)Java中的日期和時(shí)間處理及Java日歷小程序的編寫
這篇文章主要介紹了學(xué)習(xí)Java中的日期和時(shí)間處理及Java日歷小程序的編寫,這個(gè)日歷小程序僅用簡單的算法實(shí)現(xiàn)沒有用到date類等,但是帶有圖形界面,需要的朋友可以參考下2016-02-02MyBatis?Plus?導(dǎo)入IdType失敗的解決
這篇文章主要介紹了MyBatis?Plus?導(dǎo)入IdType失敗的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12