A-A+
Flowable6.4 – 分布式事务
本文是基于SpringBoot、Atomikos、MyBatis、Druid实现的,全文总共分为两大部分:
- 介绍一下SpringBoot如何基于Atomikos实现分布式事务。
- 介绍一下Flowable如何实现分布式事务。
首先,介绍一下SpringBoot如何实现分布式事务。
Jar包的主要引用如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.21</version>
</dependency>
整个工程的目录如下:

MyBatisConfig1和MyBatisConfig2分别声明了两个数据源,连接两个数据库,代码大同小异,主要如下:
//com.xnck.demo.dao.dao1,在另外一个配置中改为dao2
//sqlSessionTemplate1,在另外一个配置中改为sqlSessionTemplate2
//以下遇到1就改为2,配置基本一样
//需要注意AtomikosDataSourceBean,没有这个分布式事务不生效
@Configuration
@DependsOn("transactionManager")
@MapperScan(basePackages = "com.xnck.demo.dao.dao1", sqlSessionTemplateRef = "sqlSessionTemplate1")
public class MyBatisConfig1 {
// 配置数据源
@Bean(name = "datasource1")
@ConditionalOnBean(PropertiesConfigurer.class)
public DataSource testDataSource() throws SQLException {
DruidXADataSource druidXADataSource = new DruidXADataSource();
druidXADataSource.setDriverClassName(PropertiesConfigurer.getProperty("db1.driver"));
druidXADataSource.setUrl(PropertiesConfigurer.getProperty("db1.url"));
druidXADataSource.setPassword(PropertiesConfigurer.getProperty("db1.password"));
druidXADataSource.setUsername(PropertiesConfigurer.getProperty("db1.user"));
druidXADataSource.setInitialSize(1);
druidXADataSource.setMinIdle(1);
druidXADataSource.setMaxActive(20);
druidXADataSource.setFilters("stat");
// 数据源改为Atomikos,将事务交给Atomikos统一管理
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(druidXADataSource);
xaDataSource.setUniqueResourceName("datasource1");
return xaDataSource;
}
@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory testSqlSessionFactory(@Qualifier("datasource1") DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper1/*.xml"));
return bean.getObject();
}
@Bean(name = "sqlSessionTemplate1")
public SqlSessionTemplate testSqlSessionTemplate(
@Qualifier("sqlSessionFactory1") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
TransactionConfig必不可少,这是分布式事务生效的关键,代码如下:
@Configuration
public class TransactionConfig {
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager")
public TransactionManager atomikosTransactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({"userTransaction", "atomikosTransactionManager"})
public JtaTransactionManager transactionManager(@Qualifier("userTransaction")UserTransaction userTransaction,
@Qualifier("atomikosTransactionManager")TransactionManager atomikosTransactionManager) throws Throwable {
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
}
至于,dao、entity、mapper这三处的代码就不贴了,没有什么特殊的,只是为了操作数据库而已。
在TestService中的一段测试代码,如下:
//务必注意添加注解@Transactional,否则事务不生效
@Transactional
public void insert() {
UserInfo userInfo = new UserInfo();
userInfo.setId(2);
userInfo.setUserName("22222");
LogInfo logInfo = new LogInfo();
logInfo.setId(2);
logInfo.setCreateDate(new Date());
userMapper.insert(userInfo);
logMapper.insert(logInfo);
}
然后,介绍一下Flowable如何实现分布式事务。
基于上文介绍的,关键在于FlowableConfig,代码如下:
//仍然需要注意AtomikosDataSourceBean,其它的是正常配置
@Configuration
@DependsOn("transactionManager")
public class FlowableConfig {
@Primary
@Bean(name = "dataSource")
@ConditionalOnBean(PropertiesConfigurer.class)
public DataSource getDataSource() throws Exception {
DruidXADataSource druidXADataSource = new DruidXADataSource();
druidXADataSource.setDriverClassName(PropertiesConfigurer.getProperty("db.driver"));
druidXADataSource.setUrl(PropertiesConfigurer.getProperty("db.url"));
druidXADataSource.setUsername(PropertiesConfigurer.getProperty("db.user"));
druidXADataSource.setPassword(PropertiesConfigurer.getProperty("db.password"));
druidXADataSource.setInitialSize(1);
druidXADataSource.setMinIdle(1);
druidXADataSource.setMaxActive(20);
druidXADataSource.setFilters("stat");
// 数据源改为Atomikos,将事务交给Atomikos统一管理
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(druidXADataSource);
xaDataSource.setUniqueResourceName("datasource");
return xaDataSource;
}
@Primary
@Bean(name = "processEngineConfiguration")
public ProcessEngineConfiguration getSpringProcessEngineConfiguration(@Qualifier("dataSource") DataSource dataSource,
@Qualifier("transactionManager") PlatformTransactionManager transactionManager) {
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
configuration.setDataSource(dataSource);
configuration.setTransactionManager(transactionManager);
configuration.setDatabaseSchemaUpdate("true");
return configuration;
}
}
TestService中的测试代码改为:
//务必注意添加注解@Transactional,否则事务不生效
@Transactional
public void insert() {
UserInfo userInfo = new UserInfo();
userInfo.setId(2);
userInfo.setUserName("22222");
LogInfo logInfo = new LogInfo();
logInfo.setId(2);
logInfo.setCreateDate(new Date());
userMapper.insert(userInfo);
identityService.setAuthenticatedUserId("admin");
Model modelData = repositoryService.getModel("5001");
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(modelData.getKey(), "myTestFlow1", new HashMap<>());
String processInstanceId = processInstance.getId();
identityService.setAuthenticatedUserId(null);
logMapper.insert(logInfo);
}
以上,就是整个实现的过程,欢迎指正和讨论。
