Flowable 6.4 – Distributed Transactions

December 8, 2019 | Technology, Default

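This article builds on Spring Boot, Atomikos, MyBatis, and Druid. It has two parts:

  1. How Spring Boot implements distributed transactions with Atomikos.
  2. How Flowable implements distributed transactions.

First, how Spring Boot implements distributed transactions.

The main dependencies are as follows: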

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.1.1</version>
</dependency>
<dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
</dependency>
<dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.1.21</version>
</dependency>

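The project directory layout is as follows:

[Figure: project directory layout]

MyBatisConfig1 and MyBatisConfig2 each declare one data source, so the application connects to two separate databases. The two classes are nearly identical; the main code is: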

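// Package com.xnck.demo.dao.dao1 becomes dao2 in the other configuration class.
// sqlSessionTemplate1 becomes sqlSessionTemplate2 in the other configuration class.
// In general, replace every 1 with 2; the two configurations are otherwise the same.
// Note the AtomikosDataSourceBean: without it, distributed transactions do not take effect.
// PropertiesConfigurer is the project's own helper class (not shown here).
import java.sql.SQLException;
import javax.sql.DataSource;
import com.alibaba.druid.pool.xa.DruidXADataSource;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Configuration
@DependsOn("transactionManager")
@MapperScan(basePackages = "com.xnck.demo.dao.dao1", sqlSessionTemplateRef = "sqlSessionTemplate1")
public class MyBatisConfig1 {

    // Configure the data source
    @Bean(name = "datasource1")
    @ConditionalOnBean(PropertiesConfigurer.class)
    public DataSource testDataSource() throws SQLException {
        DruidXADataSource druidXADataSource = new DruidXADataSource();
        druidXADataSource.setDriverClassName(PropertiesConfigurer.getProperty("db1.driver"));
        druidXADataSource.setUrl(PropertiesConfigurer.getProperty("db1.url"));
        druidXADataSource.setPassword(PropertiesConfigurer.getProperty("db1.password"));
        druidXADataSource.setUsername(PropertiesConfigurer.getProperty("db1.user"));
        druidXADataSource.setInitialSize(1);
        druidXADataSource.setMinIdle(1);
        druidXADataSource.setMaxActive(20);
        druidXADataSource.setFilters("stat");

        // Wrap the data source in Atomikos so that Atomikos manages the transaction
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(druidXADataSource);
        xaDataSource.setUniqueResourceName("datasource1");

        return xaDataSource;
    }

    @Bean(name = "sqlSessionFactory1")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("datasource1") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper1/*.xml"));
        return bean.getObject();
    }

    @Bean(name = "sqlSessionTemplate1")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("sqlSessionFactory1") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}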

TransactionConfig is indispensable: it is what makes the distributed transaction take effect. The code is as follows:

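import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.jta.JtaTransactionManager;

@Configuration
public class TransactionConfig {

    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // The JTA timeout is expressed in seconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public JtaTransactionManager transactionManager(
            @Qualifier("userTransaction") UserTransaction userTransaction,
            @Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager)
            throws Throwable {
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
}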
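
To see why a coordinating transaction manager is needed once two databases are involved, it may help to look at a toy version of two-phase commit, the protocol that XA transaction managers such as Atomikos are built around. The sketch below is plain Java with in-memory stand-ins; ToyResource, ToyCoordinator, and TwoPhaseCommitSketch are hypothetical names for illustration only and are not part of the Atomikos or JTA APIs. It also leaves out everything a real transaction manager adds on top, such as recovery logs and timeouts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory stand-in for an XA-capable resource such as a database. */
class ToyResource {
    private final boolean failOnPrepare;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    ToyResource(boolean failOnPrepare) {
        this.failOnPrepare = failOnPrepare;
    }

    /** Work done inside the transaction; not visible until commit. */
    void write(String row) {
        pending.add(row);
    }

    /** Phase 1: the resource votes on whether it is able to commit. */
    boolean prepare() {
        return !failOnPrepare;
    }

    /** Phase 2, success path: make the pending rows permanent. */
    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    /** Phase 2, failure path: discard the pending rows. */
    void rollback() {
        pending.clear();
    }

    List<String> committedRows() {
        return committed;
    }
}

/** Toy coordinator, playing the role a JTA transaction manager plays. */
class ToyCoordinator {
    /** Returns true if every resource committed, false if every resource rolled back. */
    static boolean complete(List<ToyResource> resources) {
        boolean allPrepared = true;
        for (ToyResource r : resources) {
            if (!r.prepare()) {
                allPrepared = false;
                break;
            }
        }
        for (ToyResource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return allPrepared;
    }
}

class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        ToyResource db1 = new ToyResource(false);
        ToyResource db2 = new ToyResource(true); // this one votes "no" in phase 1
        db1.write("user 2");
        db2.write("log 2");
        boolean ok = ToyCoordinator.complete(Arrays.asList(db1, db2));
        // Neither resource keeps its row, because one participant refused to prepare.
        System.out.println(ok + " " + db1.committedRows() + " " + db2.committedRows());
    }
}
```

The point of the sketch is the all-or-nothing outcome: a single "no" vote in the prepare phase causes every participant to roll back, which is the guarantee the JtaTransactionManager bean above provides across the two real data sources.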

The dao, entity, and mapper code is omitted here. There is nothing special about it; it only performs database operations.

A piece of test code in TestService:

// Be sure to add @Transactional; otherwise the transaction does not take effect
@Transactional
public void insert() {
    UserInfo userInfo = new UserInfo();
    userInfo.setId(2);
    userInfo.setUserName("22222");

    LogInfo logInfo = new LogInfo();
    logInfo.setId(2);
    logInfo.setCreateDate(new Date());

    userMapper.insert(userInfo);
    logMapper.insert(logInfo);
}
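
The reason the annotation matters is that Spring applies @Transactional through a proxy around the bean: the proxy begins a transaction before the method runs and commits or rolls back afterwards. The following is a minimal plain-Java sketch of that idea using a JDK dynamic proxy. DemoService, ToyTransactionLog, and TransactionalProxySketch are hypothetical names, and the "transaction" is only a list of recorded events, not a real JTA transaction. As a simplification, the sketch rolls back on any exception, whereas Spring by default rolls back only on unchecked exceptions and errors.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical service interface; the TestService in the article is a Spring bean. */
interface DemoService {
    void insert(boolean fail);
}

/** Records transaction events so the behaviour can be inspected. */
class ToyTransactionLog {
    final List<String> events = new ArrayList<>();
}

class TransactionalProxySketch {

    /** Wraps the target so that every call runs between "begin" and "commit" or "rollback". */
    static DemoService wrap(DemoService target, ToyTransactionLog log) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[] {DemoService.class},
                (proxy, method, methodArgs) -> {
                    log.events.add("begin");
                    try {
                        Object result = method.invoke(target, methodArgs);
                        log.events.add("commit");
                        return result;
                    } catch (InvocationTargetException e) {
                        // The target method threw: undo the work and rethrow the original exception
                        log.events.add("rollback");
                        throw e.getCause();
                    }
                });
    }

    public static void main(String[] args) {
        ToyTransactionLog log = new ToyTransactionLog();
        DemoService service = wrap(fail -> {
            log.events.add("insert user");
            if (fail) {
                throw new IllegalStateException("second insert failed");
            }
            log.events.add("insert log");
        }, log);

        service.insert(false);
        try {
            service.insert(true);
        } catch (IllegalStateException expected) {
            // The failure still reaches the caller after the rollback
        }
        // prints [begin, insert user, insert log, commit, begin, insert user, rollback]
        System.out.println(log.events);
    }
}
```

Without the wrapping step, which corresponds to a method that lacks the annotation, nothing brackets the two inserts, so a failure in the second one would leave the first one in place.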

Next, how Flowable implements distributed transactions.

Building on the setup above, the key is FlowableConfig. The code is as follows:

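// Again, note the AtomikosDataSourceBean; everything else is ordinary configuration.
// PropertiesConfigurer is the project's own helper class (not shown here).
import javax.sql.DataSource;
import com.alibaba.druid.pool.xa.DruidXADataSource;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@DependsOn("transactionManager")
public class FlowableConfig {

    @Primary
    @Bean(name = "dataSource")
    @ConditionalOnBean(PropertiesConfigurer.class)
    public DataSource getDataSource() throws Exception {
        DruidXADataSource druidXADataSource = new DruidXADataSource();
        druidXADataSource.setDriverClassName(PropertiesConfigurer.getProperty("db.driver"));
        druidXADataSource.setUrl(PropertiesConfigurer.getProperty("db.url"));
        druidXADataSource.setUsername(PropertiesConfigurer.getProperty("db.user"));
        druidXADataSource.setPassword(PropertiesConfigurer.getProperty("db.password"));
        druidXADataSource.setInitialSize(1);
        druidXADataSource.setMinIdle(1);
        druidXADataSource.setMaxActive(20);
        druidXADataSource.setFilters("stat");

        // Wrap the data source in Atomikos so that Atomikos manages the transaction
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(druidXADataSource);
        xaDataSource.setUniqueResourceName("datasource");

        return xaDataSource;
    }

    @Primary
    @Bean(name = "processEngineConfiguration")
    public ProcessEngineConfiguration getSpringProcessEngineConfiguration(
            @Qualifier("dataSource") DataSource dataSource,
            @Qualifier("transactionManager") PlatformTransactionManager transactionManager) {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        configuration.setDataSource(dataSource);
        // Hand the engine the same JTA transaction manager used by the MyBatis data sources
        configuration.setTransactionManager(transactionManager);
        configuration.setDatabaseSchemaUpdate("true");
        return configuration;
    }
}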

The test code in TestService becomes:

// Be sure to add @Transactional; otherwise the transaction does not take effect
@Transactional
public void insert() {
    UserInfo userInfo = new UserInfo();
    userInfo.setId(2);
    userInfo.setUserName("22222");

    LogInfo logInfo = new LogInfo();
    logInfo.setId(2);
    logInfo.setCreateDate(new Date());

    userMapper.insert(userInfo);

    identityService.setAuthenticatedUserId("admin");
    Model modelData = repositoryService.getModel("5001");
    ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(modelData.getKey(), "myTestFlow1", new HashMap<>());
    String processInstanceId = processInstance.getId();
    identityService.setAuthenticatedUserId(null);

    logMapper.insert(logInfo);
}

That is the whole implementation. Corrections and discussion are welcome.

Copyright © 字痕随行. All rights reserved.
