Java利用Redis實現(xiàn)消息隊列的示例代碼
本文介紹了Java利用Redis實現(xiàn)消息隊列的示例代碼,分享給大家,具體如下:
應(yīng)用場景
為什么要用redis?
二進制存儲、java序列化傳輸、IO連接數(shù)高、連接頻繁
一、序列化
這里編寫了一個java序列化的工具,主要是將對象轉(zhuǎn)化為byte數(shù)組,和根據(jù)byte數(shù)組反序列化成java對象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每個需要序列化的對象都要實現(xiàn)Serializable接口;
其代碼如下:
package Utils;
import java.io.*;
/**
* Created by Kinglf on 2016/10/17.
*/
public class ObjectUtil {
/**
* 對象轉(zhuǎn)byte[]
* @param obj
* @return
* @throws IOException
*/
public static byte[] object2Bytes(Object obj) throws IOException{
ByteArrayOutputStream bo=new ByteArrayOutputStream();
ObjectOutputStream oo=new ObjectOutputStream(bo);
oo.writeObject(obj);
byte[] bytes=bo.toByteArray();
bo.close();
oo.close();
return bytes;
}
/**
* byte[]轉(zhuǎn)對象
* @param bytes
* @return
* @throws Exception
*/
public static Object bytes2Object(byte[] bytes) throws Exception{
ByteArrayInputStream in=new ByteArrayInputStream(bytes);
ObjectInputStream sIn=new ObjectInputStream(in);
return sIn.readObject();
}
}
二、消息類(實現(xiàn)Serializable接口)
package Model;
import java.io.Serializable;
/**
* Created by Kinglf on 2016/10/17.
*/
public class Message implements Serializable {
private static final long serialVersionUID = -389326121047047723L;
private int id;
private String content;
public Message(int id, String content) {
this.id = id;
this.content = content;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
三、Redis的操作
利用redis做隊列,我們采用的是redis中l(wèi)ist的push和pop操作;
結(jié)合隊列的特點:
只允許在一端插入新元素只能在隊列的尾部FIFO:先進先出原則 Redis中l(wèi)push頭入(rpop尾出)或rpush尾入(lpop頭出)可以滿足要求,而Redis中l(wèi)ist藥push或 pop的對象僅需要轉(zhuǎn)換成byte[]即可
java采用Jedis進行Redis的存儲和Redis的連接池設(shè)置
上代碼:
package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Created by Kinglf on 2016/10/17.
*/
public class JedisUtil {
private static String JEDIS_IP;
private static int JEDIS_PORT;
private static String JEDIS_PASSWORD;
private static JedisPool jedisPool;
static {
//Configuration自行寫的配置文件解析類,繼承自Properties
Configuration conf=Configuration.getInstance();
JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
JEDIS_PORT=conf.getInt("jedis.port",6379);
JEDIS_PASSWORD=conf.getString("jedis.password",null);
JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(5000);
config.setMaxIdle(256);
config.setMaxWait(5000L);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setMinEvictableIdleTimeMillis(60000L);
config.setTimeBetweenEvictionRunsMillis(3000L);
config.setNumTestsPerEvictionRun(-1);
jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
}
/**
* 獲取數(shù)據(jù)
* @param key
* @return
*/
public static String get(String key){
String value=null;
Jedis jedis=null;
try{
jedis=jedisPool.getResource();
value=jedis.get(key);
}catch (Exception e){
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally {
close(jedis);
}
return value;
}
private static void close(Jedis jedis) {
try{
jedisPool.returnResource(jedis);
}catch (Exception e){
if(jedis.isConnected()){
jedis.quit();
jedis.disconnect();
}
}
}
public static byte[] get(byte[] key){
byte[] value = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
value = jedis.get(key);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return value;
}
public static void set(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(key, value);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
public static void set(byte[] key, byte[] value, int time) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(key, value);
jedis.expire(key, time);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
public static void hset(byte[] key, byte[] field, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hset(key, field, value);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
public static void hset(String key, String field, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hset(key, field, value);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
/**
* 獲取數(shù)據(jù)
*
* @param key
* @return
*/
public static String hget(String key, String field) {
String value = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return value;
}
/**
* 獲取數(shù)據(jù)
*
* @param key
* @return
*/
public static byte[] hget(byte[] key, byte[] field) {
byte[] value = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return value;
}
public static void hdel(byte[] key, byte[] field) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hdel(key, field);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
/**
* 存儲REDIS隊列 順序存儲
* @param key reids鍵名
* @param value 鍵值
*/
public static void lpush(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.lpush(key, value);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
/**
* 存儲REDIS隊列 反向存儲
* @param key reids鍵名
* @param value 鍵值
*/
public static void rpush(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.rpush(key, value);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
/**
* 將列表 source 中的最后一個元素(尾元素)彈出,并返回給客戶端
* @param key reids鍵名
* @param destination 鍵值
*/
public static void rpoplpush(byte[] key, byte[] destination) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.rpoplpush(key, destination);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
/**
* 獲取隊列數(shù)據(jù)
* @param key 鍵名
* @return
*/
public static List lpopList(byte[] key) {
List list = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
list = jedis.lrange(key, 0, -1);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return list;
}
/**
* 獲取隊列數(shù)據(jù)
* @param key 鍵名
* @return
*/
public static byte[] rpop(byte[] key) {
byte[] bytes = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
bytes = jedis.rpop(key);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return bytes;
}
public static void hmset(Object key, Map hash) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hmset(key.toString(), hash);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
public static void hmset(Object key, Map hash, int time) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hmset(key.toString(), hash);
jedis.expire(key.toString(), time);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
public static List hmget(Object key, String... fields) {
List result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.hmget(key.toString(), fields);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return result;
}
public static Set hkeys(String key) {
Set result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.hkeys(key);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return result;
}
public static List lrange(byte[] key, int from, int to) {
List result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.lrange(key, from, to);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return result;
}
public static Map hgetAll(byte[] key) {
Map result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.hgetAll(key);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return result;
}
public static void del(byte[] key) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.del(key);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
}
public static long llen(byte[] key) {
long len = 0;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.llen(key);
} catch (Exception e) {
//釋放redis對象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返還到連接池
close(jedis);
}
return len;
}
}
四、Configuration主要用于讀取Redis的配置信息
package Utils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Created by Kinglf on 2016/10/17.
*/
public class Configuration extends Properties {
private static final long serialVersionUID = -2296275030489943706L;
private static Configuration instance = null;
public static synchronized Configuration getInstance() {
if (instance == null) {
instance = new Configuration();
}
return instance;
}
public String getProperty(String key, String defaultValue) {
String val = getProperty(key);
return (val == null || val.isEmpty()) ? defaultValue : val;
}
public String getString(String name, String defaultValue) {
return this.getProperty(name, defaultValue);
}
public int getInt(String name, int defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}
public long getLong(String name, long defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}
public float getFloat(String name, float defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
}
public double getDouble(String name, double defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
}
public byte getByte(String name, byte defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
}
public Configuration() {
InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
try {
this.loadFromXML(in);
in.close();
} catch (IOException ioe) {
}
}
}
五、測試
import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;
import java.io.IOException;
/**
* Created by Kinglf on 2016/10/17.
*/
public class TestRedisQueue {
public static byte[] redisKey = "key".getBytes();
static {
try {
init();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void init() throws IOException {
for (int i = 0; i < 1000000; i++) {
Message message = new Message(i, "這是第" + i + "個內(nèi)容");
JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
}
}
public static void main(String[] args) {
try {
pop();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void pop() throws Exception {
byte[] bytes = JedisUtil.rpop(redisKey);
Message msg = (Message) ObjectUtil.bytes2Object(bytes);
if (msg != null) {
System.out.println(msg.getId() + "----" + msg.getContent());
}
}
}
每執(zhí)行一次pop()方法,結(jié)果如下: <br>1----這是第1個內(nèi)容 <br>2----這是第2個內(nèi)容 <br>3----這是第3個內(nèi)容 <br>4----這是第4個內(nèi)容
總結(jié)
至此,整個Redis消息隊列的生產(chǎn)者和消費者代碼已經(jīng)完成
1.Message 需要傳送的實體類(需實現(xiàn)Serializable接口)
2.Configuration Redis的配置讀取類,繼承自Properties
3.ObjectUtil 將對象和byte數(shù)組雙向轉(zhuǎn)換的工具類
4.Jedis 通過消息隊列的先進先出(FIFO)的特點結(jié)合Redis的list中的push和pop操作進行封裝的工具類
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- 一口氣說出Java 6種延時隊列的實現(xiàn)方法(面試官也得服)
- 詳解Java阻塞隊列(BlockingQueue)的實現(xiàn)原理
- java中棧和隊列的實現(xiàn)和API的用法(詳解)
- Java 隊列實現(xiàn)原理及簡單實現(xiàn)代碼
- 解析Java中PriorityQueue優(yōu)先級隊列結(jié)構(gòu)的源碼及用法
- 剖析Java中阻塞隊列的實現(xiàn)原理及應(yīng)用場景
- java實現(xiàn)消息隊列的兩種方式(小結(jié))
- 詳解Java消息隊列-Spring整合ActiveMq
- java優(yōu)先隊列PriorityQueue中Comparator的用法詳解
- Java中和隊列相關(guān)的基本操作
相關(guān)文章
SSH框架網(wǎng)上商城項目第27戰(zhàn)之申請域名空間和項目部署及發(fā)布
這篇文章主要為大家詳細介紹了SSH框架網(wǎng)上商城項目第26戰(zhàn)之申請域名空間和項目部署及發(fā)布,感興趣的小伙伴們可以參考一下2016-06-06
MyBatisPlus的autoResultMap生成策略實現(xiàn)
本文主要介紹了MyBatisPlus的autoResultMap生成策略實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-02-02

