Scala數(shù)據(jù)庫連接池的簡單實現(xiàn)
在使用JDBC的時候,數(shù)據(jù)庫據(jù)連接是非常寶貴的資源。為了復用這些資源,可以將連接保存在一個隊列中。當需要的時候可以從隊列中取出未使用的連接。如果沒有可用連接,則可以在一定時間內等待,直到隊列中有可用的連接,否則將拋出異常。
以下是DataSoucre的代碼,DataSoucre負責對連接的管理以及分發(fā),同時設置隊列的大小,等待時間,連接的賬號、密碼等。
核心方法為getConenction()方法。且實現(xiàn)AutoCloseable接口,以便后面可以使用using方法自動關閉資源。隊列中的連接為封裝了conenction的DbConnection類。
package pool import scala.util.control.Breaks._ import scala.collection.mutable.ArrayBuffer import java.{util => ju} import scala.collection.mutable.Buffer import scala.util.control.Breaks class DataSource( val driverName: String, val url: String, val user: String, val password: String, val minSize: Integer = 1, val maxSize: Integer = 10, val keepAliveTimeout: Long = 1000 ) extends AutoCloseable { if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) { throw new IllegalArgumentException("These arguments are Illegal") } Class.forName(driverName) private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]() private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true) for (i <- 0 until minSize) { pool += new DbConnection(url, user, password) } def getConenction(): DbConnection = { val starEntry = System.currentTimeMillis() Breaks.breakable { while (true) { lock.lock() try { for (con <- pool) { if (!con.used) { con.used = true return con; } } if (pool.size < maxSize) { var con = new DbConnection(url, user, password) { used = true } pool.append(con) return con } } finally { lock.unlock() } if (System.currentTimeMillis() - starEntry > keepAliveTimeout) { break() } } } throw new IllegalArgumentException("Connection Pool is empty") } def close(): Unit = { lock.lock() try { if (pool != null) { pool.foreach(t => t.innerConnection.close()) pool.clear() } } finally { lock.unlock() } } }
以下是Dbconnection類,該類提供了三個方法且實現(xiàn)了AutoCloseable接口
BeginTransaction:開啟事務,并返回封裝了的DbTransaction類
close:將連接釋放
CreateCommand:創(chuàng)建DbCommand類,該類是負責操作連接的類,比如提交sql,讀取數(shù)據(jù)等
package pool import java.sql.Connection import java.sql.DriverAction import java.sql.DriverManager class DbConnection( val url: String, val user: String, val password: String ) extends AutoCloseable { private[pool] var used: Boolean = false private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password) def close(): Unit = { if (used) { used = false } } def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = { if (innerConnection.getAutoCommit()) { innerConnection.setAutoCommit(false) } innerConnection.setTransactionIsolation(isolationLevel) new DbTransaction(this) } def CreateCommand(): DbCommand = { new DbCommand(this) } }
以下是DbCommand類的代碼,該類負責操作數(shù)據(jù)庫。如ExecuteResultSet,ExecuteScalar等。
ExecuteScalar:查詢數(shù)據(jù)庫并返回第一行第一個值的方法。
ExecuteResultSet:該方法有兩個重載方法。
參數(shù)為callBack: ResultSet => Unit的方法,提供了一個回調函數(shù),解析數(shù)據(jù)的操作可以在回調中實現(xiàn)。
無參的版本則通過反射直接將ResultSet通過字段位置映射,轉換成你需要的類型。
package pool import java.sql.CallableStatement import java.sql.ResultSet import java.sql.SQLType import java.sql.Statement import java.sql.Types import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.language.implicitConversions import scala.reflect.ClassTag import scala.reflect.runtime.{universe => ru} import Dispose.using import java.{util => ju} class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable { if (queryTimeout < 0) { throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.") } val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]() private val mirror = ru.runtimeMirror(getClass().getClassLoader()) private var statement: CallableStatement = null /** @author:qingchuan * * @return */ def ExecuteScalar(): Any = { var obj: Any = None ExecuteResultSet(t => { if (t.next()) { if (t.getMetaData().getColumnCount() > 0) obj = t.getObject(1) } }) obj } /** @author * qingchuan * @version 1.0 * * @param callBack */ def ExecuteResultSet(callBack: ResultSet => Unit): Unit = { if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.") statement = connection.innerConnection.prepareCall(commandText) statement.setQueryTimeout(queryTimeout) addParatemetrs() using(statement.executeQuery()) { t => callBack(t) if (!t.isClosed()) getOutParameterValue() } } def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = { val classSymbol = mirror.symbolOf[T].asClass val classMirror = mirror.reflectClass(classSymbol) val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod) val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm) val result = new ArrayBuffer[T]() ExecuteResultSet(t => { while (t.next()) { var i = 1 val values: Buffer[Any] = ArrayBuffer() for (f <- fields) { values += t.getObject(i) i += 1 } result += consMethodMirror.apply(values: _*).asInstanceOf[T] } }) result } def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = { statement = connection.innerConnection.prepareCall(commandText) var trans: DbTransaction = null val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm) for (t <- values) { var i = 1 val filedMirror = mirror.reflect(t) for (f <- fields) { val instance = filedMirror.reflectField(f) statement.setObject(i, instance.get) i += 1 } statement.addBatch() } try { trans = connection.BeginTransaction() val obj = statement.executeBatch() trans.Commit() statement.clearBatch() obj.sum } catch { case e: Exception => { if (trans != null) trans.RollBack() throw e } } } def ExecuteNoneQuery(): Integer = { statement = connection.innerConnection.prepareCall(commandText) statement.setQueryTimeout(queryTimeout) addParatemetrs() val obj = statement.executeUpdate() getOutParameterValue() obj } def CreateParameter(): DbParameter = { new DbParameter(); } private def getOutParameterValue(): Unit = { for (i <- 1 to Parameters.size) { val parameter: DbParameter = Parameters(i - 1); if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) { parameter.value = statement.getObject(i); } } } private def addParatemetrs(): Unit = { statement.clearParameters() for (i <- 1 to Parameters.size) { val p = Parameters(i - 1); if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) { statement.setObject(i, p.value) } if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) { statement.registerOutParameter(p.parameterName, p.sqlType, p.scale) } } } def close() { if (statement != null) { statement.close() } } } case class DbParameter( var parameterName: String = null, var value: Any = null, var parameterDirection: Integer = ParameterDirection.Input, var scale: Integer = 0, var sqlType: Integer = null ) {} object ParameterDirection { val Input = 1 val InputOutput = 2 val Output = 3 }
以下代碼是DbTransaction,該類提供了事務的操作如提交、回滾。
package pool class DbTransaction(private val connection: DbConnection) { def Commit(): Unit = { connection.innerConnection.commit() if (!connection.innerConnection.getAutoCommit()) { connection.innerConnection.setAutoCommit(true); } } def RollBack(): Unit = { connection.innerConnection.rollback() if (!connection.innerConnection.getAutoCommit()) { connection.innerConnection.setAutoCommit(true) } } def getConnection(): DbConnection = { connection } def getTransactionIsolation(): Int = { connection.innerConnection.getTransactionIsolation() } } object IsolationLevel { val TRANSACTION_NONE = 0 val TRANSACTION_READ_UNCOMMITTED = 1; val TRANSACTION_READ_COMMITTED = 2; val TRANSACTION_REPEATABLE_READ = 4; val TRANSACTION_SERIALIZABLE = 8; }
最后是using的方法。通過柯里化以及Try-catch-finally的方式 自動關閉實現(xiàn)了AutoCloseable接口的資源。
package pool object Dispose { def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = { try { op(cls) } catch { case e: Exception => throw e } finally { cls.close() } } }
以下是客戶端調用,代碼模擬了15個線程并發(fā)訪問數(shù)據(jù)庫,連接池最多3個資源,從而說明連接池是可以復用這些連接的。
import pool.DataSource import pool.DbCommand import pool.DbParameter import pool.DbTransaction import pool.Dispose.using import pool.IsolationLevel import pool.ParameterDirection import java.sql.Date import java.sql.ResultSet import java.sql.Types import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime import javax.xml.crypto.Data import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.language.experimental.macros import scala.reflect.ClassTag import scala.reflect.runtime.{universe => ru} import com.nimbusds.oauth2.sdk.util.date.SimpleDate import java.text.SimpleDateFormat object App { def main(args: Array[String]): Unit = { val pool = new DataSource( "com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true", "賬號", "密碼", minSize = 1, maxSize = 3, keepAliveTimeout = 3000 ) val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); for (i <- 1 to 15) { val thread: Thread = new Thread(() => { val date = new Date(System.currentTimeMillis()) using(pool.getConenction()) { con => { using(new DbCommand(con)) { cmd => { cmd.commandText = "{call p_get_out(?,?)}" val p1 = new DbParameter("@id", i) val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20) cmd.Parameters.append(p1) cmd.Parameters.append(p2) val result = cmd.ExecuteScalar() println(s"result=${result},output=${p2.value},parameter=${i}") } } } } }) thread.start() } } }
開發(fā)環(huán)境VsCode,SQL Server數(shù)據(jù)庫。以下是引用的第三方庫。
version := "1.0" libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8" libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"
以下是執(zhí)行結果。
到此這篇關于Scala數(shù)據(jù)庫連接池的簡單實現(xiàn)的文章就介紹到這了,更多相關Scala數(shù)據(jù)庫連接池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java數(shù)據(jù)結構實現(xiàn)折半查找的算法過程解析
這篇文章主要介紹了Java數(shù)據(jù)結構實現(xiàn)折半查找的算法過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03Maven3種打包方式中maven-assembly-plugin的使用詳解
這篇文章主要介紹了Maven3種打包方式中maven-assembly-plugin的使用,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07