许多时候,我们期望在事务提交之后异步执行某些逻辑,调用外部系统,发送MQ,推送ES等等;当事务回滚时,异步操作也不执行,这些异步操作需要等待事务完成后才执行;比如出入库的事务执行完毕后,异步发送MQ给报表系统、ES等等。
我们在项目中大多都是使用声明式事务(@Transactional注解) ,spring会基于动态代理机制对我们的业务方法进行增强,控制connection,从而达到事务的目的。那么我们能否在此找寻一些蛛丝马迹。我们来看下spring事务的相关核心类(装配流程不详细叙述)。
TransactionInterceptor:
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { @Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }}
TransactionAspectSupport(重点关注事务提交之后做了哪些事情,有哪些扩展点)。
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { throw new TransactionUsageException( "Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead."); } ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter == null) { throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType()); } return new ReactiveTransactionSupport(adapter); }); return txSupport.invokeWithinTransaction( method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // 创建事务,此处也会创建connection TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 执行目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 目标方法异常时处理 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 重置TransactionInfo ThreadLocal cleanupTransactionInfo(txInfo); } if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 业务方法成功执行,提交事务(重点关注此处),最终会调用AbstractPlatformTransactionManager#commit方法 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } }}}
AbstractPlatformTransactionManager:
public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 事务提交处理 processCommit(defStatus);}private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { // 在事务提交后触发(追踪到这里就离真相不远了) triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); }}
TransactionSynchronizationUtils:
public abstract class TransactionSynchronizationUtils { public static void triggerAfterCommit() { // TransactionSynchronizationManager: 事务同步器管理 invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations()); } public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) { if (synchronizations != null) { for (TransactionSynchronization synchronization : synchronizations) { // 调用TransactionSynchronization#afterCommit方法,默认实现为空,留给子类扩展 // 那么我们想在事务提交之后做一些异步操作,实现此方法即可 synchronization.afterCommit(); } } }}
TransactionSynchronization:
public interface TransactionSynchronization extends Flushable { default void afterCommit() {}}
过程中我们发现TransactionSynchronizationManager、TransactionSynchronization、TransactionSynchronizationAdapter 等相关类涉及aop的整个流程,篇幅有限,在此不详细展开,当然我们的一些扩展也是离不开这些基础类的。
事务提交之后异步执行,我们需自定义synchronization.afterCommit,结合线程池一起使用,定义线程池TaskExecutor。
@Beanpublic TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(******); taskExecutor.setMaxPoolSize(******); taskExecutor.setKeepAliveSeconds(******); taskExecutor.setQueueCapacity(******); taskExecutor.setThreadNamePrefix(******); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); taskExecutor.initialize(); return taskExecutor;}
定义AfterCommitExecutor接口。
public interface AfterCommitExecutor extends Executor { }
定义AfterCommitExecutorImpl实现类,注意需继承TransactionSynchronizationAdapter类。
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.core.NamedThreadLocal;import org.springframework.core.task.TaskExecutor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.transaction.support.TransactionSynchronizationAdapter;import org.springframework.transaction.support.TransactionSynchronizationManager;import java.util.List;import java.util.ArrayList;@Componentpublic class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class); // 保存要运行的任务线程 private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable"); // 设置线程池 @Autowired private TaskExecutor taskExecutor; /** * 异步执行 * * @param runnable 异步线程 */ @Override public void execute(Runnable runnable) { LOGGER.info("Submitting new runnable {} to run after commit", runnable); // 如果事务已经提交,马上进行异步处理 if (!TransactionSynchronizationManager.isSynchronizationActive()) { LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable); runnable.run(); return; } // 同一个事务的合并到一起处理(注意:没有初始化则初始化,并注册) List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get(); if (null == threadRunnableList) { threadRunnableList = new ArrayList<>(); RUNNABLE_THREAD_LOCAL.set(threadRunnableList); TransactionSynchronizationManager.registerSynchronization(this); } threadRunnableList.add(runnable); } /** * 监听到事务提交之后执行方法 */ @Override public void afterCommit() { List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get(); LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size()); for (Runnable runnable : threadRunnableList) { try { taskExecutor.execute(runnable); } catch (RuntimeException e) { LOGGER.error("Failed to execute runnable " + runnable, e); } } } /** * 事务提交/回滚执行 * * @param status (STATUS_COMMITTED-0、STATUS_ROLLED_BACK-1、STATUS_UNKNOWN-2) */ @Override public void afterCompletion(int status) { LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"); RUNNABLE_THREAD_LOCAL.remove(); }}
使用。
工具类封装好了,使用上那就很简便了:注入AfterCommitExecutor,调用AfterCommitExecutor.execute(runnable)方法即可
spring如此庞大,找准切入点,许多问题都是可以找到解决思路、或者方案;
你对spring了解多少......
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-10420-0.html事务提交之后异步执行工具类封装
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com