Source Code Analysis of Jedis Return Behavior
Preface
This article examines how jedis returns a connection to the pool.
spring-data-redis
RedisTemplate
org/springframework/data/redis/core/RedisTemplate.java
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);
    try {
        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);
        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }
        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        T result = action.doInRedis(connToExpose);
        // close pipeline
        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}
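RedisTemplate's execute method first obtains a connection through RedisConnectionUtils.getConnection and, in the finally block, returns it through RedisConnectionUtils.releaseConnection, so the connection is released whether or not the callback throws.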
RedisConnectionUtils
org/springframework/data/redis/core/RedisConnectionUtils.java
public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory,
        boolean transactionSupport) {
    releaseConnection(conn, factory);
}
public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory) {
    if (conn == null) {
        return;
    }
    RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
    if (conHolder != null) {
        if (conHolder.isTransactionActive()) {
            if (connectionEquals(conHolder, conn)) {
                if (log.isDebugEnabled()) {
                    log.debug("RedisConnection will be closed when transaction finished.");
                }
                // It's the transactional Connection: Don't close it.
                conHolder.released();
            }
            return;
        }
        // release transactional/read-only and non-transactional/non-bound connections.
        // transactional connections for read-only transactions get no synchronizer registered
        unbindConnection(factory);
        return;
    }
    doCloseConnection(conn);
}
private static void doCloseConnection(@Nullable RedisConnection connection) {
    if (connection == null) {
        return;
    }
    if (log.isDebugEnabled()) {
        log.debug("Closing Redis Connection.");
    }
    try {
        connection.close();
    } catch (DataAccessException ex) {
        log.debug("Could not close Redis Connection", ex);
    } catch (Throwable ex) {
        log.debug("Unexpected exception on closing Redis Connection", ex);
    }
}
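releaseConnection mainly deals with transaction-bound connections. When no connection holder is bound to the current thread, it falls through to doCloseConnection, which ultimately calls connection.close().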
connection.close()
org/springframework/data/redis/connection/jedis/JedisConnection.java
@Override
public void close() throws DataAccessException {
    super.close();
    JedisSubscription subscription = this.subscription;
    try {
        if (subscription != null) {
            subscription.close();
        }
    } catch (Exception ex) {
        LOGGER.debug("Cannot terminate subscription", ex);
    } finally {
        this.subscription = null;
    }
    // return the connection to the pool
    if (pool != null) {
        jedis.close();
        return;
    }
    // else close the connection normally (doing the try/catch dance)
    try {
        jedis.quit();
    } catch (Exception ex) {
        LOGGER.debug("Failed to QUIT during close", ex);
    }
    try {
        jedis.disconnect();
    } catch (Exception ex) {
        LOGGER.debug("Failed to disconnect during close", ex);
    }
}
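When a connection pool is in use, JedisConnection's close method calls jedis.close(), which returns the connection to the pool; otherwise it calls jedis.quit() followed by jedis.disconnect().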
jedis.close()
redis/clients/jedis/Jedis.java
@Override
public void close() {
    if (dataSource != null) {
        JedisPoolAbstract pool = this.dataSource;
        this.dataSource = null;
        if (isBroken()) {
            pool.returnBrokenResource(this);
        } else {
            pool.returnResource(this);
        }
    } else {
        super.close();
    }
}
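For a pooled instance (dataSource is not null), jedis's close method first checks isBroken (which reads the broken field of redis.clients.jedis.Connection). If the connection is broken it calls returnBrokenResource, otherwise it calls returnResource. An instance without a pool simply calls super.close().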
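The routing in close() can be illustrated with a small standalone model. This is only a sketch: ToyPool, ToyConnection and failCommand are hypothetical names invented here, and nothing in it calls jedis itself. It assumes that a failed command sets the broken flag before the exception propagates, which is the behaviour the summary attributes to redis.clients.jedis.Connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the pool; it only records which return path was taken.
class ToyPool {
    final List<ToyConnection> idle = new ArrayList<>();
    int destroyed = 0;

    void returnResource(ToyConnection c) { idle.add(c); }      // healthy: back to the idle list
    void returnBrokenResource(ToyConnection c) { destroyed++; } // broken: thrown away
}

// Hypothetical stand-in for Jedis; mirrors only the close() routing shown above.
class ToyConnection implements AutoCloseable {
    private ToyPool dataSource;
    private boolean broken;

    ToyConnection(ToyPool pool) { this.dataSource = pool; }

    // Simulates a command that fails: mark the connection broken, then throw.
    void failCommand() {
        broken = true;
        throw new IllegalStateException("simulated connection failure");
    }

    @Override
    public void close() {
        if (dataSource != null) {
            ToyPool pool = dataSource;
            dataSource = null; // detach first, so a second close() is a no-op
            if (broken) {
                pool.returnBrokenResource(this);
            } else {
                pool.returnResource(this);
            }
        }
    }
}

class CloseRoutingDemo {
    public static void main(String[] args) {
        ToyPool pool = new ToyPool();
        try (ToyConnection ok = new ToyConnection(pool)) {
            // a command that succeeds would run here
        }
        try (ToyConnection bad = new ToyConnection(pool)) {
            bad.failCommand();
        } catch (IllegalStateException expected) {
            // close() has already run by the time this handler executes
        }
        System.out.println("idle=" + pool.idle.size() + " destroyed=" + pool.destroyed);
    }
}
```

The try-with-resources blocks play the role of the finally block in RedisTemplate.execute: the connection is always handed back, and the broken flag alone decides whether it is reused or discarded.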
pool
redis/clients/jedis/util/Pool.java
public void returnBrokenResource(final T resource) {
    if (resource != null) {
        returnBrokenResourceObject(resource);
    }
}
public void returnResource(final T resource) {
    if (resource != null) {
        returnResourceObject(resource);
    }
}
protected void returnBrokenResourceObject(final T resource) {
    try {
        internalPool.invalidateObject(resource);
    } catch (Exception e) {
        throw new JedisException("Could not return the broken resource to the pool", e);
    }
}
protected void returnResourceObject(final T resource) {
    try {
        internalPool.returnObject(resource);
    } catch (RuntimeException e) {
        throw new JedisException("Could not return the resource to the pool", e);
    }
}
returnBrokenResource (through returnBrokenResourceObject) calls internalPool.invalidateObject(resource), while returnResource (through returnResourceObject) calls internalPool.returnObject(resource).
invalidateObject
org/apache/commons/pool2/impl/GenericObjectPool.java
public void invalidateObject(final T obj, final DestroyMode destroyMode) throws Exception {
    final PooledObject<T> p = getPooledObject(obj);
    if (p == null) {
        if (isAbandonedConfig()) {
            return;
        }
        throw new IllegalStateException(
                "Invalidated object not currently part of this pool");
    }
    synchronized (p) {
        if (p.getState() != PooledObjectState.INVALID) {
            destroy(p, destroyMode);
        }
    }
    ensureIdle(1, false);
}
private void destroy(final PooledObject<T> toDestroy, final DestroyMode destroyMode) throws Exception {
    toDestroy.invalidate();
    idleObjects.remove(toDestroy);
    allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
    try {
        factory.destroyObject(toDestroy, destroyMode);
    } finally {
        destroyedCount.incrementAndGet();
        createCount.decrementAndGet();
    }
}
invalidateObject calls destroy, which removes the object from idleObjects and allObjects and then calls back into factory.destroyObject.
destroyObject
redis/clients/jedis/JedisFactory.java
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.isConnected()) {
        try {
            // need a proper test, probably with mock
            if (!jedis.isBroken()) {
                jedis.quit();
            }
        } catch (RuntimeException e) {
            logger.warn("Error while QUIT", e);
        }
        try {
            jedis.close();
        } catch (RuntimeException e) {
            logger.warn("Error while close", e);
        }
    }
}
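If the client is still connected, destroyObject sends QUIT (only when the connection is not broken) and then calls jedis.close() to close the client connection.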
returnObject
org/apache/commons/pool2/impl/GenericObjectPool.java
public void returnObject(final T obj) {
    final PooledObject<T> p = getPooledObject(obj);
    if (p == null) {
        if (!isAbandonedConfig()) {
            throw new IllegalStateException(
                    "Returned object not currently part of this pool");
        }
        return; // Object was abandoned and removed
    }
    markReturningState(p);
    final Duration activeTime = p.getActiveDuration();
    if (getTestOnReturn() && !factory.validateObject(p)) {
        try {
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }
    try {
        factory.passivateObject(p);
    } catch (final Exception e1) {
        swallowException(e1);
        try {
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }
    if (!p.deallocate()) {
        throw new IllegalStateException(
                "Object has already been returned to this pool or is invalid");
    }
    final int maxIdleSave = getMaxIdle();
    if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
        try {
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
    } else {
        if (getLifo()) {
            idleObjects.addFirst(p);
        } else {
            idleObjects.addLast(p);
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            clear();
        }
    }
    updateStatsReturn(activeTime);
}
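When testOnReturn is enabled, returnObject calls validateObject and destroys the object if validation fails. Otherwise it calls factory.passivateObject(p) and finally checks maxIdle: if the idle queue is already at the limit (or the pool is closed) the object is destroyed, otherwise it is put back into the pool (idleObjects), at the head if lifo is set and at the tail if not.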
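The decision order in returnObject can be sketched as a simplified standalone model. ToyReturnPolicy is a hypothetical class written for this illustration, not part of commons-pool2; it deliberately leaves out passivation, the closed-pool checks, ensureIdle and statistics, and keeps only the validate, maxIdle and lifo decisions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical, simplified model of the decision flow in returnObject.
class ToyReturnPolicy<T> {
    final Deque<T> idleObjects = new ArrayDeque<>();
    int destroyedCount = 0;

    private final boolean testOnReturn;
    private final int maxIdle;            // a negative value means "no limit"
    private final boolean lifo;
    private final Predicate<T> validator; // plays the role of factory.validateObject

    ToyReturnPolicy(boolean testOnReturn, int maxIdle, boolean lifo, Predicate<T> validator) {
        this.testOnReturn = testOnReturn;
        this.maxIdle = maxIdle;
        this.lifo = lifo;
        this.validator = validator;
    }

    void returnObject(T obj) {
        // Step 1: optional validation on return; a failing object is destroyed.
        if (testOnReturn && !validator.test(obj)) {
            destroyedCount++;
            return;
        }
        // Step 2: the idle queue is already at maxIdle, so the object is destroyed.
        if (maxIdle > -1 && maxIdle <= idleObjects.size()) {
            destroyedCount++;
            return;
        }
        // Step 3: keep it; LIFO puts it at the head, FIFO at the tail.
        if (lifo) {
            idleObjects.addFirst(obj);
        } else {
            idleObjects.addLast(obj);
        }
    }
}

class ReturnPolicyDemo {
    public static void main(String[] args) {
        ToyReturnPolicy<String> pool =
                new ToyReturnPolicy<>(true, 2, true, s -> !s.startsWith("bad"));
        pool.returnObject("a");     // kept
        pool.returnObject("bad-1"); // fails validation, destroyed
        pool.returnObject("b");     // kept, placed ahead of "a" because of LIFO
        pool.returnObject("c");     // idle queue is full (maxIdle = 2), destroyed
        System.out.println(pool.idleObjects + " destroyed=" + pool.destroyedCount);
        // prints: [b, a] destroyed=2
    }
}
```

With lifo enabled the most recently returned object sits at the head of the queue, so it is the next one to be borrowed.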
Summary
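In spring-data-redis, returning a connection comes down to calling the connection's close method, which for jedis means jedis.close(). It first checks isBroken (which reads the broken field of redis.clients.jedis.Connection); if the connection is broken it calls returnBrokenResource, otherwise returnResource.
- returnBrokenResource calls internalPool.invalidateObject(resource). invalidateObject calls destroy, which calls back into factory.destroyObject, and that in turn calls jedis.close() to close the client connection.
- returnObject calls validateObject when testOnReturn is enabled, then factory.passivateObject(p), and finally checks maxIdle: objects beyond the limit are destroyed, the rest are put back into the pool (idleObjects) according to the lifo setting.
- In other words, if redis goes down after a connection has been obtained and while a command is executing, redis.clients.jedis.Connection marks broken as true and throws JedisConnectionException. Because RedisTemplate calls releaseConnection in a finally block, returning the connection triggers returnBrokenResource and closes the broken connection, which indirectly achieves the effect of testOnReturn.
- If redis is already down when a connection is requested but the pool still holds idle connections, then without testOnBorrow a connection is handed out and fails on use: redis.clients.jedis.Connection marks broken as true and throws JedisConnectionException, and the connection is destroyed on return. With testOnBorrow, validation detects the bad connection, destroys it, and the loop keeps taking connections from the pool until none are left. If there are no idle connections when one is requested, the pool goes down the create path, and create throws redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket.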
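The borrow-side behaviour described in the last point can be sketched as a toy model as well. ToyBorrowPool and its serverUp flag are hypothetical names used only for this illustration; the code is not taken from commons-pool2 and simply assumes that validation and creation both fail while the simulated server is down.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the borrow loop described above; not the commons-pool2 code.
class ToyBorrowPool {
    final Deque<Integer> idle = new ArrayDeque<>(); // ids of idle connections
    boolean serverUp = true;                        // simulated Redis availability
    int destroyed = 0;

    private final boolean testOnBorrow;
    private int nextId = 0;

    ToyBorrowPool(boolean testOnBorrow, int preCreated) {
        this.testOnBorrow = testOnBorrow;
        for (int i = 0; i < preCreated; i++) {
            idle.addLast(nextId++);
        }
    }

    int borrowObject() {
        while (true) {
            Integer conn = idle.pollFirst();
            if (conn == null) {
                // No idle connection left: take the "create" path, which fails if the server is down.
                if (!serverUp) {
                    throw new IllegalStateException("simulated: Failed to create socket");
                }
                return nextId++;
            }
            if (testOnBorrow && !serverUp) {
                destroyed++; // validation failed: destroy it and try the next idle connection
                continue;
            }
            return conn; // handed out unvalidated, or validation passed
        }
    }
}

class BorrowLoopDemo {
    public static void main(String[] args) {
        ToyBorrowPool unchecked = new ToyBorrowPool(false, 3);
        unchecked.serverUp = false;
        System.out.println("without testOnBorrow: got stale connection " + unchecked.borrowObject());

        ToyBorrowPool checked = new ToyBorrowPool(true, 3);
        checked.serverUp = false;
        try {
            checked.borrowObject();
        } catch (IllegalStateException e) {
            System.out.println("with testOnBorrow: destroyed=" + checked.destroyed + ", then " + e.getMessage());
        }
    }
}
```

Without testOnBorrow the caller receives a stale connection and only finds out on first use; with it, the pool drains its idle connections one by one before the create step reports the failure.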