发布时间:2023-08-12 15:00
就是数据库的一批数据,量不少,需要执行同步插入到别的地方。
简单点肯定是一次性查出来,然后循环一个个插入,完事。
考虑点:
① 数据量大,一次性查出来操作,很爆炸。
② 循环里面一次一次地去插入,如果非业务场景必要,基本是不会在循环里面使用sql操作的。
所以该篇作为抛砖引玉(还有很多需要考虑的点),给出一种解决上面场景的代码编写方案, 手动分页,查询后批量插入。
开始实战:
代码大体就这样:
代码:
//获取数据总计数
UserQueryCondition userQueryCondition=new UserQueryCondition();
Integer totalCount = userService.getAllUserCount(userQueryCondition);
//每批同步的数据条数
Integer batchSizeLimit = 500;
//分批切割处理
List pageLimitGroupList = getPageLimitGroupList(totalCount, batchSizeLimit);
int count=1;
//物理批次查询
for (PageLimitDTO pageBatchLimit:pageLimitGroupList){
List pageBatchList = userService.getPageList(
userQueryCondition, pageBatchLimit.getCurrIndex(), pageBatchLimit.getPageSize()
);
if (!CollectionUtils.isEmpty(pageBatchList)){
//批量插入
Boolean syncAddResult = userSyncService.batchSyncAdd(pageBatchList);
//做其余业务
if (syncAddResult){
log.info("第{}次,user数据批量插入成功",count);
}
}
log.info("第{}批次,user数据同步批量插入业务结束执行",count);
count=count+1;
}
切割函数getPageLimitGroupList:
public List getPageLimitGroupList(Integer totalCount, Integer batchSizeLimit ) {
log.info("这一次处理的总数据条数为 ={} 条, 每一批次处理条数为 ={} 条,现在开始做分批切割处理。",totalCount,batchSizeLimit);
int pageNum = totalCount / batchSizeLimit;
int surplus = totalCount % batchSizeLimit;
if (surplus > 0) {
pageNum = pageNum + 1;
}
List pageLimitGroupList =new LinkedList<>();
for(int i = 0; i < pageNum; i++){
Integer currIndex = i * batchSizeLimit;
PageLimitDTO pageLimitDTO=new PageLimitDTO();
pageLimitDTO.setPageSize(batchSizeLimit);
pageLimitDTO.setCurrIndex(currIndex);
pageLimitDTO.setDealDataCount(currIndex+batchSizeLimit);
pageLimitGroupList.add(pageLimitDTO);
log.info("分批切割,第={}次,每次={}条,最终会处理到={}条。",pageLimitGroupList.size(),batchSizeLimit,currIndex+batchSizeLimit);
}
log.info("这一次处理的总数据条数为 ={} 条, 每一批次处理条数为 ={} 条,总共切割分成了 ={} 次,一切准备就绪,可以开始批量插入。",totalCount,batchSizeLimit,pageLimitGroupList.size());
return pageLimitGroupList;
}
物理分页查询的mybatis sql写法示例(核心手动切割分页查询红色部分):
代码:
批量插入示例:
insert into user(
id,
name,
age
)
values
(
#{item.id,jdbcType=BIGINT},
#{item.name,jdbcType=VARCHAR},
#{item.age,jdbcType=INTEGER}
)
然后就是我们美如画的,手动批次切割查询插入:
当前方案作为抛砖引玉,还有比较多可优化的点,但是我不做扩展了,简单列举一下:
1. 每次切割分页查询,其实可以优化。 例如取上一次的id作为下一次的起始条件。
2. 同步异步的封装,可以更动态化。
3. 是否完全需要分批? 动态设置数据超过多少才开始分批切割,不超过,不需要走切割这些流程代码。
等等
好吧,该篇就到这吧(如果对你有帮助,给我点赞收藏一下。)。
使用Scala/Java对Iceberg数据湖的Hive Catalog/Hadoop Catalog/HDFS Path进行表操作
[论文阅读笔记]Towards Deep Learning Models Resistant to Adversarial Attacks
沁恒CH32V103C8T6(二): Linux RISC-V编译和烧录环境配置
基于Ubuntu 18.04.3操作系统的TensorFlow 2.1.0、PyTorch 1.4.0、OpenCV 4.2.0、Darknet深度学习环境搭建
【动手学基于Python+Tensorflow+CNN深度学习的轴承故障诊断(西储大学数据集)(含完整代码)】