发布时间:2023-09-23 15:30
1.ShardingSphere 中的分布式事务
在 ShardingSphere 中,除本地事务之外,还提供针对分布式事务的两种实现方案,分别是 XA 事务和柔性事务 具体可见官网:分布式事务 :: ShardingSphere
XA 事务:XA 事务提供基于两阶段提交协议的实现机制。所谓两阶段提交,顾名思义分成两个阶段,一个是准备阶段,一个是执行阶段。在准备阶段中,协调者发起一个提议,分别询问各参与者是否接受。在执行阶段,协调者根据参与者的反馈,提交或终止事务。如果参与者全部同意则提交,只要有一个参与者不同意就终止,业界在实现 XA 事务时也存在一些主流工具库,包括 Atomikos、Narayana 和 Bitronix。ShardingSphere 对这三种工具库都进行了集成,并默认使用 Atomikos 来完成两阶段提交
BASE 事务:XA 事务是典型的强一致性事务,也就是完全遵循事务的 ACID 设计原则。与 XA 事务这种“刚性”不同,柔性事务则遵循 BASE 设计理论,追求的是最终一致性。这里的 BASE 来自基本可用(Basically Available)、软状态(Soft State)和最终一致性(Eventual Consistency)这三个概念 实现柔性事务如阿里巴巴提供一些优秀的框架 Seata,ShardingSphere 内部也集成了对 Seata 的支持,可以根据需要集成其他分布式事务类开源框架,并基于微内核架构嵌入到 ShardingSphere 运行时环境中
2.项目搭建
2.1 pom文件中的依赖:
4.0.0 org.springframework.boot spring-boot-starter-parent 2.7.1 com.shardingsphere demo 0.0.1-SNAPSHOT shardingsphere Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web mysql mysql-connector-java com.alibaba druid 1.2.6 com.baomidou mybatis-plus-boot-starter 3.4.0 org.apache.shardingsphere sharding-jdbc-spring-boot-starter 4.1.1 org.apache.shardingsphere sharding-transaction-xa-core 4.1.1 org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test cn.hutool hutool-all 5.5.8 org.springframework.boot spring-boot-maven-plugin
2.2 配置spring-boot的事务管理器
package com.shardingsphere.config; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; @Configuration @EnableTransactionManagement public class TransactionConfig { /** * 关联 datasource 到 spring 的 PlatformTransactionManager(没有直接使用 jdbc 原生事务) */ @Bean public PlatformTransactionManager txManager(@Qualifier("shardingDataSource") final DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } /** * 通过 jdbcTemplate 简化原生 sharding-jdbc SQL 的使用 */ @Bean public JdbcTemplate jdbcTemplate(@Qualifier("shardingDataSource")final DataSource dataSource) { return new JdbcTemplate(dataSource); } }
2.3 创建MessageMapper层
package com.shardingsphere.Mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.shardingsphere.entity.Message; import org.apache.ibatis.annotations.Mapper; import org.springframework.stereotype.Repository; /** * @Author 何志鹏 * @Date 2022/7/18 10:59 * @Version 1.0 */ @Mapper @Repository public interface MessageMapper extends BaseMapper{ }
2.4 创建 MessageService层
package com.shardingsphere.service; import com.baomidou.mybatisplus.extension.service.IService; import com.shardingsphere.entity.Message; /** * @Author 何志鹏 * @Date 2022/7/18 18:10 * @Version 1.0 */ public interface MessageService extends IService{ int add(); }
2.5 创建MessageServiceImpl层
package com.shardingsphere.service.impl; import cn.hutool.core.lang.Snowflake; import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.core.mapper.Mapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.shardingsphere.Mapper.MessageMapper; import com.shardingsphere.Mapper.UserMapper; import com.shardingsphere.entity.Message; import com.shardingsphere.entity.User; import com.shardingsphere.service.MessageService; import org.apache.shardingsphere.transaction.annotation.ShardingTransactionType; import org.apache.shardingsphere.transaction.core.TransactionType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Date; /** * @Author 何志鹏 * @Date 2022/7/18 18:12 * @Version 1.0 */ @Service public class MessageServiceImpl extends ServiceImplimplements MessageService { @Autowired private UserMapper userMapper; @Autowired private MessageMapper messageMapper; @Transactional(rollbackFor = Exception.class) @ShardingTransactionType(TransactionType.XA) @Override public int add() { Snowflake snowflake = IdUtil.createSnowflake(1, 1); Message message = new Message(); int randomNum = (int) (Math.random() * 9000 + 1000); message.setMsgId(snowflake.nextId()); message.setContactId(snowflake.nextId()); message.setUserId(new Long(randomNum)); message.setUserTag(Boolean.TRUE); message.setRecallTime(new Date()); message.setContent("测试测试"); message.setPushMsg("55555555"); message.setCreationTime(new Date()); messageMapper.insert(message); User user = new User(); user.setName("何志鹏555"); int randomNum2 = (int) (Math.random() * 9000 + 1000); user.setAge(new Long(randomNum2)); user.setSex("2"); user.setEducation("1"); int insert = userMapper.insert(user); if(insert>0){ //抛出异常 事务回滚 int a = 10 / 0; return 1; } return 0; } }
2.6 创建MessageController层
package com.shardingsphere.controller; import com.shardingsphere.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Author 何志鹏 * @Date 2022/7/19 15:23 * @Version 1.0 */ @RestController @RequestMapping("/message") public class MessageController { @Autowired private MessageService messageService; @GetMapping("/add") public int add(){ return messageService.add(); } }
2.7 用postman调用message/add测试
以上如此便会回滚事务 注意分布式事务管理器的特有配置 XA事务管理器参数配置(可选)
ShardingSphere默认的XA事务管理器为Atomikos,在项目的logs目录中会生成xa_tx.log
, 这是XA崩溃恢复时所需的日志,请勿删除 也可以通过在项目的classpath中添加jta.properties
来定制化Atomikos配置项。具体的配置规则请参考Atomikos的官方文档 如下:
最后附上代码: https://gitee.com/hezhipeng_ek/shardingsphere.git