tcc分布式事務(wù)框架體系解析

前言碎語
樓主之前推薦過2pc的分布式事務(wù)框架LCN。今天來詳細(xì)聊聊TCC事務(wù)協(xié)議。
首先我們了解下什么是tcc,如下圖

tcc分布式事務(wù)協(xié)議控制整體業(yè)務(wù)事務(wù)分為三個階段。
try:執(zhí)行業(yè)務(wù)邏輯
confirm:確定業(yè)務(wù)邏輯執(zhí)行無誤后,確定業(yè)務(wù)邏輯執(zhí)行完成
cancel:假如try階段有問題,執(zhí)行cancel階段邏輯,取消try階段的數(shù)據(jù)
這就需要我們在設(shè)計(jì)業(yè)務(wù)時,在try階段多想想業(yè)務(wù)處理的折中狀態(tài),比如,處理中,支付中,進(jìn)行中等,在confirm階段變更為處理完成,或者在cancel階段變更為處理失敗。
以電商下單為例
假設(shè)我們有一個電商下單的業(yè)務(wù),有三個服務(wù)組成,訂單服務(wù)處理下單邏輯,庫存服務(wù)處理減庫存邏輯,支付服務(wù)處理減賬戶余額邏輯。在下單服務(wù)里先后調(diào)用減庫存和減余額的方法。如果使用tcc分布式事務(wù)來協(xié)調(diào)事務(wù),我們服務(wù)就要做如下設(shè)計(jì):
訂單服務(wù):
- try:支付狀態(tài)設(shè)置為支付中
- confirm:設(shè)置為支付完成
- cancel:設(shè)置為支付失敗
庫存服務(wù):
多加一個鎖定庫存的字段記錄,用于記錄業(yè)務(wù)處理中狀態(tài)
- try:總庫存-1,鎖定庫存+1
- confirm:鎖定庫存-1
- cancel:總庫存+1,鎖定庫存-1
支付服務(wù):
多加一個凍結(jié)金額的字段記錄,用于記錄業(yè)務(wù)處理中狀態(tài)
- try:余額-1,凍結(jié)金額+1
- confirm:凍結(jié)金額-1
- cancel:余額+1,凍結(jié)金額-1
tcc分布式事務(wù)在這里起到了一個事務(wù)協(xié)調(diào)者的角色。真實(shí)業(yè)務(wù)只需要調(diào)用try階段的方法。confirm和cancel階段的額方法由tcc框架來幫我們調(diào)用完成最終業(yè)務(wù)邏輯。下面我們假設(shè)如下三個場景的業(yè)務(wù)情況,看tcc如何協(xié)調(diào)業(yè)務(wù)最終一致的。
- 服務(wù)一切正常:所有服務(wù)的try方法執(zhí)行后都沒有問題,庫存足夠,余額足夠。tcc事務(wù)協(xié)調(diào)器會觸發(fā)訂單服務(wù)的confirm方法,將訂單更新為支付完成,觸發(fā)庫存服務(wù)的confirm方法鎖定庫存-1,觸發(fā)支付服務(wù)的confirm方法凍結(jié)金額-1
- 庫存服務(wù)故障,無法調(diào)通:這個時候訂單已經(jīng)生成,狀態(tài)為待支付。當(dāng)調(diào)用庫存超時拋異常后,tcc事務(wù)協(xié)調(diào)器會觸發(fā)訂單服務(wù)的cancel方法將訂單狀態(tài)更新為支付失敗。
- 支付服務(wù)故障,無法調(diào)通:這個時候訂單已經(jīng)生成,狀態(tài)為待支付,總庫存-1,鎖定庫存+1了。當(dāng)調(diào)用支付服務(wù)超時拋異常時,tcc事務(wù)協(xié)調(diào)器會觸發(fā)訂單服務(wù)的cancel方法將訂單狀態(tài)更新為支付失敗,觸發(fā)庫存服務(wù)的cancel方法將庫存+1,鎖定庫存-1。
hmily事務(wù)框架怎么做的?
通過上面對tcc事務(wù)協(xié)議說明大家應(yīng)該都了解了tcc的處理協(xié)調(diào)機(jī)制,下面我們來看看hmily是怎么做到的,我們以接入支持dubbo服務(wù)為例。
概要:首先最基礎(chǔ)兩個應(yīng)用點(diǎn)是aop和dubbo的filter機(jī)制,其次針對一組事務(wù),定義了啟動事務(wù)處理器,參與事務(wù)處理器去協(xié)調(diào)處理不同的事務(wù)單元。外加一個disruptor+ScheduledService處理事務(wù)日志,補(bǔ)償處理失敗的事務(wù)。
hmily框架以@Hmily注解為切入點(diǎn),定義了一個環(huán)繞織入的切面,注解必填兩個參數(shù)confirmMethod和cancelMethod,也就是tcc協(xié)調(diào)的兩個階段方法。在需要tcc事務(wù)的方法上面加上這個注解,也就托管了tcc三個階段的處理流程。下面是aspect切面的抽象類,不同的RPC框架支持會有不同的實(shí)現(xiàn) 。其中真正處理業(yè)務(wù)邏輯需要實(shí)現(xiàn)HmilyTransactionInterceptor接口。
實(shí)現(xiàn)HmilyTransactionInterceptor接口
@Aspect
public abstract class AbstractHmilyTransactionAspect {
private HmilyTransactionInterceptor hmilyTransactionInterceptor;
protected void setHmilyTransactionInterceptor(final HmilyTransactionInterceptor hmilyTransactionInterceptor) {
this.hmilyTransactionInterceptor = hmilyTransactionInterceptor;
}
/**
* this is point cut with {@linkplain Hmily }.
*/
@Pointcut("@annotation(org.dromara.hmily.annotation.Hmily)")
public void hmilyInterceptor() {
}
/**
* this is around in {@linkplain Hmily }.
* @param proceedingJoinPoint proceedingJoinPoint
* @return Object
* @throws Throwable Throwable
*/
@Around("hmilyInterceptor()")
public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return hmilyTransactionInterceptor.interceptor(proceedingJoinPoint);
}
/**
* spring Order.
*
* @return int
*/
public abstract int getOrder();
}dubbo的aspect抽象實(shí)現(xiàn)
@Aspect
@Component
public class DubboHmilyTransactionAspect extends AbstractHmilyTransactionAspect implements Ordered {
@Autowired
public DubboHmilyTransactionAspect(final DubboHmilyTransactionInterceptor dubboHmilyTransactionInterceptor) {
super.setHmilyTransactionInterceptor(dubboHmilyTransactionInterceptor);
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}dubbo的HmilyTransactionInterceptor實(shí)現(xiàn)
@Component
public class DubboHmilyTransactionInterceptor implements HmilyTransactionInterceptor {
private final HmilyTransactionAspectService hmilyTransactionAspectService;
@Autowired
public DubboHmilyTransactionInterceptor(final HmilyTransactionAspectService hmilyTransactionAspectService) {
this.hmilyTransactionAspectService = hmilyTransactionAspectService;
}
@Override
public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
final String context = RpcContext.getContext().getAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT);
HmilyTransactionContext hmilyTransactionContext;
//判斷dubbo上下文中是否攜帶了tcc事務(wù),如果有就取出反序列化為事務(wù)上下文對象
if (StringUtils.isNoneBlank(context)) {
hmilyTransactionContext = GsonUtils.getInstance().fromJson(context, HmilyTransactionContext.class);
RpcContext.getContext().getAttachments().remove(CommonConstant.HMILY_TRANSACTION_CONTEXT);
} else {
//如果dubbo上下文中沒有,就從當(dāng)前上下文中獲取。如果是事務(wù)發(fā)起者,這里其實(shí)也獲取不到事務(wù)
hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
}
return hmilyTransactionAspectService.invoke(hmilyTransactionContext, pjp);
}
}這里主要判斷了dubbo上下文中是否攜帶了tcc事務(wù)。如果沒有就從當(dāng)前線程上下文中獲取,如果是事務(wù)的發(fā)起者,這里其實(shí)獲取不到事務(wù)上下文對象的。在invoke里有個獲取事務(wù)處理器的邏輯,如果事務(wù)上下文入?yún)?為null,那么獲取到的就是啟動事務(wù)處理器。
啟動事務(wù)處理器處理邏輯如下
public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context)
throws Throwable {
System.err.println("StarterHmilyTransactionHandler");
Object returnValue;
try {
HmilyTransaction hmilyTransaction = hmilyTransactionExecutor.begin(point);
try {
//execute try
returnValue = point.proceed();
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
hmilyTransactionExecutor.updateStatus(hmilyTransaction);
} catch (Throwable throwable) {
//if exception ,execute cancel
final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
executor.execute(() -> hmilyTransactionExecutor
.cancel(currentTransaction));
throw throwable;
}
//execute confirm
final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
} finally {
HmilyTransactionContextLocal.getInstance().remove();
hmilyTransactionExecutor.remove();
}
return returnValue;
}真正業(yè)務(wù)處理方法,point.proceed();被try,catch包起來了,如果try里面的方法出現(xiàn)異常,就會走h(yuǎn)milyTransactionExecutor.cancel(currentTransaction)的邏輯,如果成功,就走h(yuǎn)milyTransactionExecutor.confirm(currentTransaction)邏輯。其中cancel和confirm里都有協(xié)調(diào)參與者事務(wù)的處理邏輯,以confirm邏輯為例。
public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
LogUtil.debug(LOGGER, () -> "tcc confirm .......!start");
if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
return;
}
currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
updateStatus(currentTransaction);
final ListhmilyParticipants = currentTransaction.getHmilyParticipants();
ListfailList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
boolean success = true;
if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
HmilyTransactionContext context = new HmilyTransactionContext();
context.setAction(HmilyActionEnum.CONFIRMING.getCode());
context.setRole(HmilyRoleEnum.START.getCode());
context.setTransId(hmilyParticipant.getTransId());
HmilyTransactionContextLocal.getInstance().set(context);
executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
} catch (Exception e) {
LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
success = false;
failList.add(hmilyParticipant);
} finally {
HmilyTransactionContextLocal.getInstance().remove();
}
}
executeHandler(success, currentTransaction, failList);
}
}可以看到executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation()),這里執(zhí)行了事務(wù)參與者的confirm方法。同理cancel里面也有類似代碼,執(zhí)行事務(wù)參與者的cancel方法。那么事務(wù)參與者的信息是怎么獲取到的呢?我們需要回到一開始提到的dubbo的filter機(jī)制。
@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER})
public class DubboHmilyTransactionFilter implements Filter {
private HmilyTransactionExecutor hmilyTransactionExecutor;
/**
* this is init by dubbo spi
* set hmilyTransactionExecutor.
*
* @param hmilyTransactionExecutor {@linkplain HmilyTransactionExecutor }
*/
public void setHmilyTransactionExecutor(final HmilyTransactionExecutor hmilyTransactionExecutor) {
this.hmilyTransactionExecutor = hmilyTransactionExecutor;
}
@Override
@SuppressWarnings("unchecked")
public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException {
String methodName = invocation.getMethodName();
Class clazz = invoker.getInterface();
Class[] args = invocation.getParameterTypes();
final Object[] arguments = invocation.getArguments();
converterParamsClass(args, arguments);
Method method = null;
Hmily hmily = null;
try {
method = clazz.getMethod(methodName, args);
hmily = method.getAnnotation(Hmily.class);
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
if (Objects.nonNull(hmily)) {
try {
final HmilyTransactionContext hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
if (Objects.nonNull(hmilyTransactionContext)) {
if (hmilyTransactionContext.getRole() == HmilyRoleEnum.LOCAL.getCode()) {
hmilyTransactionContext.setRole(HmilyRoleEnum.INLINE.getCode());
}
RpcContext.getContext().setAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT, GsonUtils.getInstance().toJson(hmilyTransactionContext));
}
final Result result = invoker.invoke(invocation);
//if result has not exception
if (!result.hasException()) {
final HmilyParticipant hmilyParticipant = buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args);
if (hmilyTransactionContext.getRole() == HmilyRoleEnum.INLINE.getCode()) {
hmilyTransactionExecutor.registerByNested(hmilyTransactionContext.getTransId(),
hmilyParticipant);
} else {
hmilyTransactionExecutor.enlistParticipant(hmilyParticipant);
}
} else {
throw new HmilyRuntimeException("rpc invoke exception{}", result.getException());
}
return result;
} catch (RpcException e) {
e.printStackTrace();
throw e;
}
} else {
return invoker.invoke(invocation);
}
}
@SuppressWarnings("unchecked")
private HmilyParticipant buildParticipant(final HmilyTransactionContext hmilyTransactionContext,
final Hmily hmily,
final Method method, final Class clazz,
final Object[] arguments, final Class... args) throws HmilyRuntimeException {
if (Objects.isNull(hmilyTransactionContext)
|| (HmilyActionEnum.TRYING.getCode() != hmilyTransactionContext.getAction())) {
return null;
}
//獲取協(xié)調(diào)方法
String confirmMethodName = hmily.confirmMethod();
if (StringUtils.isBlank(confirmMethodName)) {
confirmMethodName = method.getName();
}
String cancelMethodName = hmily.cancelMethod();
if (StringUtils.isBlank(cancelMethodName)) {
cancelMethodName = method.getName();
}
HmilyInvocation confirmInvocation = new HmilyInvocation(clazz, confirmMethodName, args, arguments);
HmilyInvocation cancelInvocation = new HmilyInvocation(clazz, cancelMethodName, args, arguments);
//封裝調(diào)用點(diǎn)
return new HmilyParticipant(hmilyTransactionContext.getTransId(), confirmInvocation, cancelInvocation);
}
private void converterParamsClass(final Class[] args, final Object[] arguments) {
if (arguments == null || arguments.length < 1) {
return;
}
for (int i = 0; i < arguments.length; i++) {
args[i] = arguments[i].getClass();
}
}
}需要注意三個地方
- 一個是filter的group定義@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER}),這里這樣定義后,就只有服務(wù)的消費(fèi)者會生效,也就是事務(wù)的發(fā)起者,服務(wù)的調(diào)用方會進(jìn)filter的invoke邏輯。
- 只有加@Hmily注解的方法或進(jìn)事務(wù)處理邏輯,其他的方法直接跳過處理
- 最關(guān)鍵的是buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args)方法。dubbo的filter唯一的作用就是收集事務(wù)參與者信息并更新當(dāng)前事務(wù)上線文信息。那么在事務(wù)協(xié)調(diào)時就能夠從當(dāng)前事務(wù)上線文里面獲取到需要協(xié)調(diào)的事務(wù)參與者信息了。
參數(shù)者事務(wù)處理器
public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
HmilyTransaction hmilyTransaction = null;
HmilyTransaction currentTransaction;
switch (HmilyActionEnum.acquireByCode(context.getAction())) {
case TRYING:
try {
hmilyTransaction = hmilyTransactionExecutor.beginParticipant(context, point);
final Object proceed = point.proceed();
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
//update log status to try
hmilyTransactionExecutor.updateStatus(hmilyTransaction);
return proceed;
} catch (Throwable throwable) {
//if exception ,delete log.
hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
throw throwable;
} finally {
HmilyTransactionContextLocal.getInstance().remove();
}
case CONFIRMING:
currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
hmilyTransactionExecutor.confirm(currentTransaction);
break;
case CANCELING:
currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
hmilyTransactionExecutor.cancel(currentTransaction);
break;
default:
break;
}
Method method = ((MethodSignature) (point.getSignature())).getMethod();
logger.error(HmilyActionEnum.acquireByCode(context.getAction()).getDesc());
return DefaultValueUtils.getDefaultValue(method.getReturnType());
}參與者事務(wù)處理器的邏輯比啟動事務(wù)處理器要簡單很多,try階段記錄事務(wù)日志用于事務(wù)補(bǔ)償?shù)臅r候使用。其他的confirm和cancel都是由啟動事務(wù)管理器來觸發(fā)調(diào)用執(zhí)行的。這個地方之前糾結(jié)了樓主幾個小時,怎么一個環(huán)繞織入的切面會被觸發(fā)執(zhí)行兩次,其實(shí)是啟動事務(wù)處理器里的confirm或cancel觸發(fā)的。
disruptor+ScheduledService處理事務(wù)日志,補(bǔ)償處理失敗的事務(wù)
這個不細(xì)聊了,簡述下。disruptor是一個高性能的隊(duì)列。對事務(wù)日志落地的所有操作都是通過disruptor來異步完成的。ScheduledService默認(rèn)128秒執(zhí)行一次,來檢查是否有處理失敗的事務(wù)日志,用于補(bǔ)償事務(wù)協(xié)調(diào)失敗的事務(wù)
文末結(jié)語
相比較2pc的LCN而言,tcc分布式事務(wù)對業(yè)務(wù)侵入性更高。也因2pc的長時間占用事務(wù)資源,tcc的性能肯定比2pc要好。兩者之間本身不存在誰優(yōu)誰劣的問題。所以在做分布式事務(wù)選型時,選一個對的適合自身業(yè)務(wù)的分布式事務(wù)框架就比較重要了。
以上就是tcc分布式事務(wù)框架體系解析的詳細(xì)內(nèi)容,更多關(guān)于tcc分布式事務(wù)框架的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA連接postgressql數(shù)據(jù)庫操作
這篇文章主要介紹了IDEA連接postgressql數(shù)據(jù)庫操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
IntelliJ IDEA2019實(shí)現(xiàn)Web項(xiàng)目創(chuàng)建示例
這篇文章主要介紹了IntelliJ IDEA2019實(shí)現(xiàn)Web項(xiàng)目創(chuàng)建示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04
java書店系統(tǒng)畢業(yè)設(shè)計(jì) 用戶模塊(3)
這篇文章主要介紹了java書店系統(tǒng)畢業(yè)設(shè)計(jì),第三步系統(tǒng)總體設(shè)計(jì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10
解決java.util.HashMap$Values?cannot?be?cast?to?java.ut的問題
這篇文章主要介紹了解決java.util.HashMap$Values?cannot?be?cast?to?java.ut的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03
Java基礎(chǔ)MAC系統(tǒng)下IDEA連接MYSQL數(shù)據(jù)庫JDBC過程
最近一直在學(xué)習(xí)web項(xiàng)目,當(dāng)然也會涉及與數(shù)據(jù)庫的連接這塊,這里就總結(jié)一下在IDEA中如何進(jìn)行MySQL數(shù)據(jù)庫的連接,這里提一下我的電腦是MAC系統(tǒng),使用的編碼軟件是IDEA,數(shù)據(jù)庫是MySQL2021-09-09

