Spring Boot3中基于MyBatis+定时任务实现两个数据库增量同步数据

yumo6663周前 (08-28)技术文章27

引言

在当今复杂的互联网软件开发场景下,多数据库架构愈发常见。开发人员常常面临一个棘手问题:如何在 Spring Boot3 项目中,不借助第三方框架及引用,通过编码方式实现两个数据库的数据实时同步。想象一下,你正在开发一个大型电商系统,订单数据存于 MySQL 数据库,而用户积分数据存于 PostgreSQL 数据库,为了给用户提供精准的服务,如根据订单金额实时更新积分,就迫切需要实现这两个数据库间数据的实时同步。今天,就为大家详细拆解这一过程。

实现数据增量同步的关键步骤

配置数据源

在 Spring Boot 项目中,首先需要在application.properties或application.yml文件中配置源数据库和目标数据库的数据源。以application.properties为例,配置如下:

# 源数据库数据源配置
spring.datasource.source.url=jdbc:mysql://source - db - host:3306/source_db
spring.datasource.source.username=source_user
spring.datasource.source.password=source_password
spring.datasource.source.driver - class - name=com.mysql.cj.jdbc.Driver

# 目标数据库数据源配置
spring.datasource.target.url=jdbc:mysql://target - db - host:3306/target_db
spring.datasource.target.username=target_user
spring.datasource.target.password=target_password
spring.datasource.target.driver - class - name=com.mysql.cj.jdbc.Driver

这样,Spring Boot 就能识别并管理这两个数据源,为后续的数据操作做准备。

(二)创建 Mapper 映射文件

利用 MyBatis 的 Mapper 映射文件来定义查询源数据库数据和插入目标数据库数据的 SQL 语句。假设我们要同步一张名为user的表,源数据库的 Mapper 映射文件SourceUserMapper.xml可以这样编写:

<mapper namespace="com.example.mapper.SourceUserMapper">
    <select id="getIncrementalUsers" parameterType="java.util.Date" resultType="com.example.entity.User">
        SELECT * FROM user WHERE update_time > #{lastSyncTime}
    </select>
</mapper>

上述 SQL 语句通过比较update_time字段来获取自上次同步时间lastSyncTime之后更新的数据。

目标数据库的 Mapper 映射文件TargetUserMapper.xml则用于定义插入数据的操作:

<mapper namespace="com.example.mapper.TargetUserMapper">
    <insert id="insertUsers" parameterType="java.util.List">
        INSERT INTO user (id, name, age, update_time) VALUES
        <foreach collection="list" item="item" separator=",">
            (#{item.id}, #{item.name}, #{item.age}, #{item.update_time})
        </foreach>
    </insert>
</mapper>

这里使用<foreach>标签来实现批量插入,提高数据插入效率。

(三)使用定时任务

Spring Boot 提供了方便的定时任务支持,通过@Scheduled注解即可实现。在一个服务类中定义定时任务方法,如下:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class DataSyncService {

    @Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
    public void syncData() {
        // 数据同步逻辑将在此处编写
    }
}

@Scheduled注解中的cron表达式用于设定任务执行的时间规则,这里表示每天凌晨 1 点执行一次数据同步任务。开发者可以根据实际业务需求灵活调整cron表达式,如@Scheduled(cron = "0 0/30 * * * ?")表示每 30 分钟执行一次。

(四)获取增量数据

在定时任务执行的方法中,首先要获取增量数据。通过调用源数据库的 Mapper 接口方法,传入上次同步时间来获取更新的数据。假设我们有一个SourceUserMapper接口,其定义如下:

import com.example.entity.User;
import org.apache.ibatis.annotations.Mapper;
import java.util.Date;
import java.util.List;

@Mapper
public interface SourceUserMapper {
    List<User> getIncrementalUsers(Date lastSyncTime);
}

在DataSyncService类的syncData方法中获取增量数据:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service
public class DataSyncService {

    @Autowired
    private SourceUserMapper sourceUserMapper;

    @Scheduled(cron = "0 0 1 * * ?")
    public void syncData() {
        Date lastSyncTime = getLastSyncTime();// 获取上次同步时间的方法需自行实现
        List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(lastSyncTime);
        // 后续处理增量数据
    }

    private Date getLastSyncTime() {
        // 这里可以从数据库、配置文件或其他存储方式中获取上次同步时间
        // 示例:从数据库中查询
        // return lastSyncTimeRepository.findLastSyncTime();
    }
}

(五)数据转换与保存

获取到增量数据后,可能需要根据目标数据库的表结构进行数据转换。假设源数据库和目标数据库的user表结构完全一致,直接将获取到的incrementalUsers列表保存到目标数据库即可。通过调用目标数据库的 Mapper 接口方法实现保存:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service
public class DataSyncService {

    @Autowired
    private SourceUserMapper sourceUserMapper;
    @Autowired
    private TargetUserMapper targetUserMapper;

    @Scheduled(cron = "0 0 1 * * ?")
    public void syncData() {
        Date lastSyncTime = getLastSyncTime();
        List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(lastSyncTime);
        targetUserMapper.insertUsers(incrementalUsers);
        updateLastSyncTime();// 更新上次同步时间的方法需自行实现
    }

    private Date getLastSyncTime() {
        // 实现获取上次同步时间的逻辑
    }

    private void updateLastSyncTime() {
        // 实现更新上次同步时间的逻辑,如更新到数据库
    }
}

上述代码中,TargetUserMapper接口定义了insertUsers方法用于批量插入数据:

import com.example.entity.User;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

@Mapper
public interface TargetUserMapper {
    void insertUsers(List<User> users);
}

同时,别忘了在每次同步完成后更新上次同步时间,以便下次获取增量数据时使用。更新上次同步时间的方法可以将当前时间保存到数据库或其他存储介质中。

优化方案探讨

(一)分布式锁机制

在分布式环境下,可能存在多个实例同时执行数据同步任务的情况,这会导致数据重复同步等问题。为了解决这个问题,可以引入分布式锁机制。例如使用 Redis 实现分布式锁,在定时任务执行前尝试获取锁,只有获取到锁的实例才能执行数据同步任务,执行完成后释放锁。示例代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Service
public class DataSyncService {

    @Autowired
    private SourceUserMapper sourceUserMapper;
    @Autowired
    private TargetUserMapper targetUserMapper;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private static final String LOCK_KEY = "data - sync - lock";

    @Scheduled(cron = "0 0 1 * * ?")
    public void syncData() {
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "locked", 10, TimeUnit.MINUTES);
        if (lock != null && lock) {
            try {
                Date lastSyncTime = getLastSyncTime();
                List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(lastSyncTime);
                targetUserMapper.insertUsers(incrementalUsers);
                updateLastSyncTime();
            } finally {
                stringRedisTemplate.delete(LOCK_KEY);
            }
        }
    }

    private Date getLastSyncTime() {
        // 实现获取上次同步时间的逻辑
    }

    private void updateLastSyncTime() {
        // 实现更新上次同步时间的逻辑
    }
}

上述代码中,
stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "locked", 10, TimeUnit.MINUTES)尝试设置锁,如果设置成功(即当前没有其他实例持有锁),则执行数据同步任务,任务完成后删除锁。如果获取锁失败,则不执行同步任务,避免了重复同步。

(二)分页批量处理

当数据量较大时,一次性获取和处理所有增量数据可能会导致内存溢出等问题。可以采用分页批量处理的方式,每次从源数据库获取一定数量的数据进行同步。在SourceUserMapper.xml中修改查询 SQL,添加分页参数:

<mapper namespace="com.example.mapper.SourceUserMapper">
    <select id="getIncrementalUsers" parameterType="map" resultType="com.example.entity.User">
        SELECT * FROM user WHERE update_time > #{lastSyncTime}
        LIMIT #{offset}, #{pageSize}
    </select>
</mapper>

在DataSyncService类中,通过循环多次调用该方法,每次传递不同的offset和pageSize参数来实现分页批量处理:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class DataSyncService {

    @Autowired
    private SourceUserMapper sourceUserMapper;
    @Autowired
    private TargetUserMapper targetUserMapper;

    private static final int PAGE_SIZE = 1000;

    @Scheduled(cron = "0 0 1 * * ?")
    public void syncData() {
        Date lastSyncTime = getLastSyncTime();
        int offset = 0;
        while (true) {
            Map<String, Object> paramMap = new HashMap<>();
            paramMap.put("lastSyncTime", lastSyncTime);
            paramMap.put("offset", offset);
            paramMap.put("pageSize", PAGE_SIZE);
            List<User> incrementalUsers = sourceUserMapper.getIncrementalUsers(paramMap);
            if (incrementalUsers.isEmpty()) {
                break;
            }
            targetUserMapper.insertUsers(incrementalUsers);
            offset += PAGE_SIZE;
        }
        updateLastSyncTime();
    }

    private Date getLastSyncTime() {
        // 实现获取上次同步时间的逻辑
    }

    private void updateLastSyncTime() {
        // 实现更新上次同步时间的逻辑
    }
}

这样可以有效控制每次处理的数据量,避免内存压力过大,同时提高数据同步的效率和稳定性。

总结

通过以上步骤,我们在 Spring Boot3 框架下,利用 MyBatis 和定时任务成功实现了两个数据库之间的数据增量同步。从数据源配置、Mapper 映射文件编写,到定时任务触发和数据处理,每个环节都紧密配合,确保了数据的及时更新和准确性。同时,通过分布式锁机制和分页批量处理等优化方案,进一步提升了系统在不同场景下的性能和稳定性。希望本文能为广大互联网软件开发人员在处理数据同步相关问题时提供有价值的参考,帮助大家在项目开发中更加高效地解决实际问题。在实际应用中,开发者可以根据具体的业务需求和数据特点,灵活调整和优化这些实现方案,以达到最佳的效果。

相关文章

java定时器Timer 你还记得吗?_c#timer定时器的基本用法

java已经帮我们写了定时器的任务,我们只需要按照API的文档来实现就行。首先我们看下java帮我们实现的定时器类:java.lang.Timer我们先来看下Timer的构造方法:我们可以看到Time...

Java 底层大揭秘系列:如何实现定时任务

定时器已经是现代软件中不可缺少的一部分,例如每隔5秒去查询一下状态,是否有新邮件,实现一个闹钟等, Java 中已经有现成的 api 供使用,但是如果你想设计更高效,更精准的定时器任务,就需要了解底层...

java总结:8.正则表达式,匹配一天的指定时间段跑定时器

定时时间(每天早上3点到晚上23点,每5分钟执行):0 1/5 3-22 ? * * * 举例操作:定时器每20分钟执行一次,每天从3点执行,到5点结束 0 1/20 3-4 ? * * 执行的结...

面试突击34:如何使用线程池执行定时任务?

在 Java 语言中,有两个线程池可以执行定时任务:ScheduledThreadPool 和 SingleThreadScheduledExecutor,其中 SingleThreadSchedul...

JAVA架构师之路-教你如何去实现一个分布式定时任务

什么是分布式定时任务:首先,我们要了解计划任务这个概念,计划任务是指由计划的定时运行或者周期性运行的程序。我们最常见的就是Linux的‘crontab’和Windows的‘计划任务’。那么什么是分布式...

Java---定时任务的实现方式_java定时任务的实现方式

一 什么是定时任务见名知意,定时任务就是每隔一段时间执行一次这个任务,比如我们日常生活中的下课铃,或者是闹钟等等,就是在设置好的固定时间段去不断执行这个任务。二 如何实现定时任务功能这次我介绍两种执行...