Spring Boot3中基于MyBatis+定时任务实现两个数据库增量同步数据
引言
在当今复杂的互联网软件开发场景下,多数据库架构愈发常见。开发人员常常面临一个棘手问题:如何在 Spring Boot3 项目中,不借助第三方框架及引用,通过编码方式实现两个数据库的数据实时同步。想象一下,你正在开发一个大型电商系统,订单数据存于 MySQL 数据库,而用户积分数据存于 PostgreSQL 数据库,为了给用户提供精准的服务,如根据订单金额实时更新积分,就迫切需要实现这两个数据库间数据的实时同步。今天,就为大家详细拆解这一过程。
实现数据增量同步的关键步骤
配置数据源
在 Spring Boot 项目中,首先需要在application.properties或application.yml文件中配置源数据库和目标数据库的数据源。以application.properties为例,配置如下:
# 源数据库数据源配置
spring.datasource.source.url=jdbc:mysql://source - db - host:3306/source_db
spring.datasource.source.username=source_user
spring.datasource.source.password=source_password
spring.datasource.source.driver - class - name=com.mysql.cj.jdbc.Driver
# 目标数据库数据源配置
spring.datasource.target.url=jdbc:mysql://target - db - host:3306/target_db
spring.datasource.target.username=target_user
spring.datasource.target.password=target_password
spring.datasource.target.driver - class - name=com.mysql.cj.jdbc.Driver
这样,Spring Boot 就能识别并管理这两个数据源,为后续的数据操作做准备。
(二)创建 Mapper 映射文件
利用 MyBatis 的 Mapper 映射文件来定义查询源数据库数据和插入目标数据库数据的 SQL 语句。假设我们要同步一张名为user的表,源数据库的 Mapper 映射文件SourceUserMapper.xml可以这样编写:
<mapper namespace="com.example.mapper.SourceUserMapper">
<select id="getIncrementalUsers" parameterType="java.util.Date" resultType="com.example.entity.User">
SELECT * FROM user WHERE update_time > #{lastSyncTime}
</select>
</mapper>
上述 SQL 语句通过比较update_time字段来获取自上次同步时间lastSyncTime之后更新的数据。
目标数据库的 Mapper 映射文件TargetUserMapper.xml则用于定义插入数据的操作:
<mapper namespace="com.example.mapper.TargetUserMapper">
<insert id="insertUsers" parameterType="java.util.List">
INSERT INTO user (id, name, age, update_time) VALUES
<foreach collection="list" item="item" separator=",">
(#{item.id}, #{item.name}, #{item.age}, #{item.update_time})
</foreach>
</insert>
</mapper>
这里使用<foreach>标签来实现批量插入,提高数据插入效率。
(三)使用定时任务
Spring Boot 提供了方便的定时任务支持,通过@Scheduled注解即可实现。在一个服务类中定义定时任务方法,如下:
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class DataSyncService {
@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
public void syncData() {
// 数据同步逻辑将在此处编写
}
}
@Scheduled注解中的cron表达式用于设定任务执行的时间规则,这里表示每天凌晨 1 点执行一次数据同步任务。开发者可以根据实际业务需求灵活调整cron表达式,如@Scheduled(cron = "0 0/30 * * * ?")表示每 30 分钟执行一次。
(四)获取增量数据
在定时任务执行的方法中,首先要获取增量数据。通过调用源数据库的 Mapper 接口方法,传入上次同步时间来获取更新的数据。假设我们有一个SourceUserMapper接口,其定义如下:
import com.example.entity.User;
import org.apache.ibatis.annotations.Mapper;
import java.util.Date;
import java.util.List;
@Mapper
public interface SourceUserMapper {
List<User> getIncrementalUsers(Date lastSyncTime);
}
在DataSyncService类的syncData方法中获取增量数据:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service
public class DataSyncService {
@Autowired
private SourceUserMapper sourceUserMapper;
@Scheduled(cron = "0 0 1 * * ?")
public void syncData() {
Date lastSyncTime = getLastSyncTime();// 获取上次同步时间的方法需自行实现
List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(lastSyncTime);
// 后续处理增量数据
}
private Date getLastSyncTime() {
// 这里可以从数据库、配置文件或其他存储方式中获取上次同步时间
// 示例:从数据库中查询
// return lastSyncTimeRepository.findLastSyncTime();
}
}
(五)数据转换与保存
获取到增量数据后,可能需要根据目标数据库的表结构进行数据转换。假设源数据库和目标数据库的user表结构完全一致,直接将获取到的incrementalUsers列表保存到目标数据库即可。通过调用目标数据库的 Mapper 接口方法实现保存:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service
public class DataSyncService {
@Autowired
private SourceUserMapper sourceUserMapper;
@Autowired
private TargetUserMapper targetUserMapper;
@Scheduled(cron = "0 0 1 * * ?")
public void syncData() {
Date lastSyncTime = getLastSyncTime();
List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(lastSyncTime);
targetUserMapper.insertUsers(incrementalUsers);
updateLastSyncTime();// 更新上次同步时间的方法需自行实现
}
private Date getLastSyncTime() {
// 实现获取上次同步时间的逻辑
}
private void updateLastSyncTime() {
// 实现更新上次同步时间的逻辑,如更新到数据库
}
}
上述代码中,TargetUserMapper接口定义了insertUsers方法用于批量插入数据:
import com.example.entity.User;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface TargetUserMapper {
void insertUsers(List<User> users);
}
同时,别忘了在每次同步完成后更新上次同步时间,以便下次获取增量数据时使用。更新上次同步时间的方法可以将当前时间保存到数据库或其他存储介质中。
优化方案探讨
(一)分布式锁机制
在分布式环境下,可能存在多个实例同时执行数据同步任务的情况,这会导致数据重复同步等问题。为了解决这个问题,可以引入分布式锁机制。例如使用 Redis 实现分布式锁,在定时任务执行前尝试获取锁,只有获取到锁的实例才能执行数据同步任务,执行完成后释放锁。示例代码如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
public class DataSyncService {
@Autowired
private SourceUserMapper sourceUserMapper;
@Autowired
private TargetUserMapper targetUserMapper;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final String LOCK_KEY = "data - sync - lock";
@Scheduled(cron = "0 0 1 * * ?")
public void syncData() {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "locked", 10, TimeUnit.MINUTES);
if (lock != null && lock) {
try {
Date lastSyncTime = getLastSyncTime();
List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(lastSyncTime);
targetUserMapper.insertUsers(incrementalUsers);
updateLastSyncTime();
} finally {
stringRedisTemplate.delete(LOCK_KEY);
}
}
}
private Date getLastSyncTime() {
// 实现获取上次同步时间的逻辑
}
private void updateLastSyncTime() {
// 实现更新上次同步时间的逻辑
}
}
上述代码中,
stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "locked", 10, TimeUnit.MINUTES)尝试设置锁,如果设置成功(即当前没有其他实例持有锁),则执行数据同步任务,任务完成后删除锁。如果获取锁失败,则不执行同步任务,避免了重复同步。
(二)分页批量处理
当数据量较大时,一次性获取和处理所有增量数据可能会导致内存溢出等问题。可以采用分页批量处理的方式,每次从源数据库获取一定数量的数据进行同步。在SourceUserMapper.xml中修改查询 SQL,添加分页参数:
<mapper namespace="com.example.mapper.SourceUserMapper">
<select id="getIncrementalUsers" parameterType="map" resultType="com.example.entity.User">
SELECT * FROM user WHERE update_time > #{lastSyncTime}
LIMIT #{offset}, #{pageSize}
</select>
</mapper>
在DataSyncService类中,通过循环多次调用该方法,每次传递不同的offset和pageSize参数来实现分页批量处理:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class DataSyncService {
@Autowired
private SourceUserMapper sourceUserMapper;
@Autowired
private TargetUserMapper targetUserMapper;
private static final int PAGE_SIZE = 1000;
@Scheduled(cron = "0 0 1 * * ?")
public void syncData() {
Date lastSyncTime = getLastSyncTime();
int offset = 0;
while (true) {
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("lastSyncTime", lastSyncTime);
paramMap.put("offset", offset);
paramMap.put("pageSize", PAGE_SIZE);
List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(paramMap);
if (incrementalUsers.isEmpty()) {
break;
}
targetUserMapper.insertUsers(incrementalUsers);
offset += PAGE_SIZE;
}
updateLastSyncTime();
}
private Date getLastSyncTime() {
// 实现获取上次同步时间的逻辑
}
private void updateLastSyncTime() {
// 实现更新上次同步时间的逻辑
}
}
这样可以有效控制每次处理的数据量,避免内存压力过大,同时提高数据同步的效率和稳定性。
总结
通过以上步骤,我们在 Spring Boot3 框架下,利用 MyBatis 和定时任务成功实现了两个数据库之间的数据增量同步。从数据源配置、Mapper 映射文件编写,到定时任务触发和数据处理,每个环节都紧密配合,确保了数据的及时更新和准确性。同时,通过分布式锁机制和分页批量处理等优化方案,进一步提升了系统在不同场景下的性能和稳定性。希望本文能为广大互联网软件开发人员在处理数据同步相关问题时提供有价值的参考,帮助大家在项目开发中更加高效地解决实际问题。在实际应用中,开发者可以根据具体的业务需求和数据特点,灵活调整和优化这些实现方案,以达到最佳的效果。