Java利用Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
本文介紹了Java利用Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼,分享給大家,具體如下:
應(yīng)用場(chǎng)景
為什么要用redis?
二進(jìn)制存儲(chǔ)、java序列化傳輸、IO連接數(shù)高、連接頻繁
一、序列化
這里編寫(xiě)了一個(gè)java序列化的工具,主要是將對(duì)象轉(zhuǎn)化為byte數(shù)組,和根據(jù)byte數(shù)組反序列化成java對(duì)象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每個(gè)需要序列化的對(duì)象都要實(shí)現(xiàn)Serializable接口;
其代碼如下:
package Utils; import java.io.*; /** * Created by Kinglf on 2016/10/17. */ public class ObjectUtil { /** * 對(duì)象轉(zhuǎn)byte[] * @param obj * @return * @throws IOException */ public static byte[] object2Bytes(Object obj) throws IOException{ ByteArrayOutputStream bo=new ByteArrayOutputStream(); ObjectOutputStream oo=new ObjectOutputStream(bo); oo.writeObject(obj); byte[] bytes=bo.toByteArray(); bo.close(); oo.close(); return bytes; } /** * byte[]轉(zhuǎn)對(duì)象 * @param bytes * @return * @throws Exception */ public static Object bytes2Object(byte[] bytes) throws Exception{ ByteArrayInputStream in=new ByteArrayInputStream(bytes); ObjectInputStream sIn=new ObjectInputStream(in); return sIn.readObject(); } }
二、消息類(lèi)(實(shí)現(xiàn)Serializable接口)
package Model; import java.io.Serializable; /** * Created by Kinglf on 2016/10/17. */ public class Message implements Serializable { private static final long serialVersionUID = -389326121047047723L; private int id; private String content; public Message(int id, String content) { this.id = id; this.content = content; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
三、Redis的操作
利用redis做隊(duì)列,我們采用的是redis中l(wèi)ist的push和pop操作;
結(jié)合隊(duì)列的特點(diǎn):
只允許在一端插入新元素只能在隊(duì)列的尾部FIFO:先進(jìn)先出原則
Redis中l(wèi)push頭入(rpop尾出)或rpush尾入(lpop頭出)可以滿(mǎn)足要求,而Redis中l(wèi)ist藥push或 pop的對(duì)象僅需要轉(zhuǎn)換成byte[]即可
java采用Jedis進(jìn)行Redis的存儲(chǔ)和Redis的連接池設(shè)置
上代碼:
package Utils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.util.List; import java.util.Map; import java.util.Set; /** * Created by Kinglf on 2016/10/17. */ public class JedisUtil { private static String JEDIS_IP; private static int JEDIS_PORT; private static String JEDIS_PASSWORD; private static JedisPool jedisPool; static { //Configuration自行寫(xiě)的配置文件解析類(lèi),繼承自Properties Configuration conf=Configuration.getInstance(); JEDIS_IP=conf.getString("jedis.ip","127.0.0.1"); JEDIS_PORT=conf.getInt("jedis.port",6379); JEDIS_PASSWORD=conf.getString("jedis.password",null); JedisPoolConfig config=new JedisPoolConfig(); config.setMaxActive(5000); config.setMaxIdle(256); config.setMaxWait(5000L); config.setTestOnBorrow(true); config.setTestOnReturn(true); config.setTestWhileIdle(true); config.setMinEvictableIdleTimeMillis(60000L); config.setTimeBetweenEvictionRunsMillis(3000L); config.setNumTestsPerEvictionRun(-1); jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000); } /** * 獲取數(shù)據(jù) * @param key * @return */ public static String get(String key){ String value=null; Jedis jedis=null; try{ jedis=jedisPool.getResource(); value=jedis.get(key); }catch (Exception e){ jedisPool.returnBrokenResource(jedis); e.printStackTrace(); }finally { close(jedis); } return value; } private static void close(Jedis jedis) { try{ jedisPool.returnResource(jedis); }catch (Exception e){ if(jedis.isConnected()){ jedis.quit(); jedis.disconnect(); } } } public static byte[] get(byte[] key){ byte[] value = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); value = jedis.get(key); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return value; } public static void set(byte[] key, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } public static void set(byte[] key, byte[] value, int time) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); jedis.expire(key, time); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } public static void hset(byte[] key, byte[] field, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hset(key, field, value); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } public static void hset(String key, String field, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hset(key, field, value); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } /** * 獲取數(shù)據(jù) * * @param key * @return */ public static String hget(String key, String field) { String value = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); value = jedis.hget(key, field); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return value; } /** * 獲取數(shù)據(jù) * * @param key * @return */ public static byte[] hget(byte[] key, byte[] field) { byte[] value = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); value = jedis.hget(key, field); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return value; } public static void hdel(byte[] key, byte[] field) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hdel(key, field); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } /** * 存儲(chǔ)REDIS隊(duì)列 順序存儲(chǔ) * @param key reids鍵名 * @param value 鍵值 */ public static void lpush(byte[] key, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.lpush(key, value); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } /** * 存儲(chǔ)REDIS隊(duì)列 反向存儲(chǔ) * @param key reids鍵名 * @param value 鍵值 */ public static void rpush(byte[] key, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.rpush(key, value); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } /** * 將列表 source 中的最后一個(gè)元素(尾元素)彈出,并返回給客戶(hù)端 * @param key reids鍵名 * @param destination 鍵值 */ public static void rpoplpush(byte[] key, byte[] destination) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.rpoplpush(key, destination); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } /** * 獲取隊(duì)列數(shù)據(jù) * @param key 鍵名 * @return */ public static List lpopList(byte[] key) { List list = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); list = jedis.lrange(key, 0, -1); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return list; } /** * 獲取隊(duì)列數(shù)據(jù) * @param key 鍵名 * @return */ public static byte[] rpop(byte[] key) { byte[] bytes = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); bytes = jedis.rpop(key); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return bytes; } public static void hmset(Object key, Map hash) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hmset(key.toString(), hash); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } public static void hmset(Object key, Map hash, int time) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hmset(key.toString(), hash); jedis.expire(key.toString(), time); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } public static List hmget(Object key, String... fields) { List result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.hmget(key.toString(), fields); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return result; } public static Set hkeys(String key) { Set result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.hkeys(key); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return result; } public static List lrange(byte[] key, int from, int to) { List result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.lrange(key, from, to); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return result; } public static Map hgetAll(byte[] key) { Map result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.hgetAll(key); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return result; } public static void del(byte[] key) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.del(key); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } } public static long llen(byte[] key) { long len = 0; Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.llen(key); } catch (Exception e) { //釋放redis對(duì)象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返還到連接池 close(jedis); } return len; } }
四、Configuration主要用于讀取Redis的配置信息
package Utils; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * Created by Kinglf on 2016/10/17. */ public class Configuration extends Properties { private static final long serialVersionUID = -2296275030489943706L; private static Configuration instance = null; public static synchronized Configuration getInstance() { if (instance == null) { instance = new Configuration(); } return instance; } public String getProperty(String key, String defaultValue) { String val = getProperty(key); return (val == null || val.isEmpty()) ? defaultValue : val; } public String getString(String name, String defaultValue) { return this.getProperty(name, defaultValue); } public int getInt(String name, int defaultValue) { String val = this.getProperty(name); return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val); } public long getLong(String name, long defaultValue) { String val = this.getProperty(name); return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val); } public float getFloat(String name, float defaultValue) { String val = this.getProperty(name); return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val); } public double getDouble(String name, double defaultValue) { String val = this.getProperty(name); return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val); } public byte getByte(String name, byte defaultValue) { String val = this.getProperty(name); return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val); } public Configuration() { InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml"); try { this.loadFromXML(in); in.close(); } catch (IOException ioe) { } } }
五、測(cè)試
import Model.Message; import Utils.JedisUtil; import Utils.ObjectUtil; import redis.clients.jedis.Jedis; import java.io.IOException; /** * Created by Kinglf on 2016/10/17. */ public class TestRedisQueue { public static byte[] redisKey = "key".getBytes(); static { try { init(); } catch (IOException e) { e.printStackTrace(); } } private static void init() throws IOException { for (int i = 0; i < 1000000; i++) { Message message = new Message(i, "這是第" + i + "個(gè)內(nèi)容"); JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message)); } } public static void main(String[] args) { try { pop(); } catch (Exception e) { e.printStackTrace(); } } private static void pop() throws Exception { byte[] bytes = JedisUtil.rpop(redisKey); Message msg = (Message) ObjectUtil.bytes2Object(bytes); if (msg != null) { System.out.println(msg.getId() + "----" + msg.getContent()); } } }
每執(zhí)行一次pop()方法,結(jié)果如下: <br>1----這是第1個(gè)內(nèi)容 <br>2----這是第2個(gè)內(nèi)容 <br>3----這是第3個(gè)內(nèi)容 <br>4----這是第4個(gè)內(nèi)容
總結(jié)
至此,整個(gè)Redis消息隊(duì)列的生產(chǎn)者和消費(fèi)者代碼已經(jīng)完成
1.Message 需要傳送的實(shí)體類(lèi)(需實(shí)現(xiàn)Serializable接口)
2.Configuration Redis的配置讀取類(lèi),繼承自Properties
3.ObjectUtil 將對(duì)象和byte數(shù)組雙向轉(zhuǎn)換的工具類(lèi)
4.Jedis 通過(guò)消息隊(duì)列的先進(jìn)先出(FIFO)的特點(diǎn)結(jié)合Redis的list中的push和pop操作進(jìn)行封裝的工具類(lèi)
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- 一口氣說(shuō)出Java 6種延時(shí)隊(duì)列的實(shí)現(xiàn)方法(面試官也得服)
- 詳解Java阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn)原理
- java中棧和隊(duì)列的實(shí)現(xiàn)和API的用法(詳解)
- Java 隊(duì)列實(shí)現(xiàn)原理及簡(jiǎn)單實(shí)現(xiàn)代碼
- 解析Java中PriorityQueue優(yōu)先級(jí)隊(duì)列結(jié)構(gòu)的源碼及用法
- 剖析Java中阻塞隊(duì)列的實(shí)現(xiàn)原理及應(yīng)用場(chǎng)景
- java實(shí)現(xiàn)消息隊(duì)列的兩種方式(小結(jié))
- 詳解Java消息隊(duì)列-Spring整合ActiveMq
- java優(yōu)先隊(duì)列PriorityQueue中Comparator的用法詳解
- Java中和隊(duì)列相關(guān)的基本操作
相關(guān)文章
Springboot項(xiàng)目中kaptcha驗(yàn)證碼的使用方式
這篇文章主要介紹了Springboot項(xiàng)目中kaptcha驗(yàn)證碼的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05使用SpringBoot設(shè)置虛擬路徑映射絕對(duì)路徑
這篇文章主要介紹了使用SpringBoot設(shè)置虛擬路徑映射絕對(duì)路徑的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08SSH框架網(wǎng)上商城項(xiàng)目第27戰(zhàn)之申請(qǐng)域名空間和項(xiàng)目部署及發(fā)布
這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項(xiàng)目第26戰(zhàn)之申請(qǐng)域名空間和項(xiàng)目部署及發(fā)布,感興趣的小伙伴們可以參考一下2016-06-06如何將Java與C#時(shí)間進(jìn)行互相轉(zhuǎn)換
這篇文章主要介紹了Java與C#時(shí)間互轉(zhuǎn)的方法以及JAVA日期、C#日期計(jì)算說(shuō)明,需要的朋友可以參考下2022-11-11MyBatisPlus的autoResultMap生成策略實(shí)現(xiàn)
本文主要介紹了MyBatisPlus的autoResultMap生成策略實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-02-02Java程序圖形用戶(hù)界面設(shè)計(jì)之容器JFrame
圖形界面(簡(jiǎn)稱(chēng)GUI)是指采用圖形方式顯示的計(jì)算機(jī)操作用戶(hù)界面。與早期計(jì)算機(jī)使用的命令行界面相比,圖形界面對(duì)于用戶(hù)來(lái)說(shuō)在視覺(jué)上更易于接受,本篇精講Java語(yǔ)言中關(guān)于圖形用戶(hù)界面的基本容器JFrame2022-02-02