行业包数据一致性使用说明
1. 概述
本方案采用分层解耦和多种同步策略结合的方式,通过CDC(变更数据捕获)技术实现数据从生产库到业务库的流转,并根据业务字段的特性(如稳定性、实时性要求)采用不同的最终一致性策略,在保证系统性能和解耦的同时,满足数据一致性需求。
2. 架构流程
整个数据流从左至右,可分为三个主要阶段:
阶段一:生产库 -> 只读库 (读写分离)
● 源端 : MBM生产库
○ 这是业务的源头,所有基础数据的增删改查操作都在这里进行。
● 目标端 : MBM只读库
阶段二:只读库 -> 冗余数据库 (数据预处理)
● 源端 : MBM只读库
● 同步技术 : Debezium CDC同步
○ Debezium是一个CDC工具,通常用于捕获数据库(如MySQL, PostgreSQL)的变更。这里它从只读库捕获变更。
● 目标端 : MBM冗余数据库(部分表、字段)
○ 作用:
■ 数据过滤 : 并非同步全部数据,而是只选择行业包业务所需的部分表和字段,减少数据传输量。
■ 格式准备 : 为下一步同步到行业包业务表做好数据准备。此库可以看作是数据同步的“中转站”或“数据湖”。
阶段三:冗余数据库 -> 行业包业务表 (最终一致性策略)
这是方案最核心的部分,针对不同类型的业务数据,采用了三种不同的同步策略,体现了方案的灵活性。
● 源端 : MBM冗余数据库
● 目标端 : 行业包业务表
○ 包含QMS、APS、LES等不同业务系统的表。
3. 使用说明:
3.1. 配置说明
3.1.1. 概述
在 imom-mdm 项目的 resources目录下,您可以创建 redundant-xxx.json配置文件(例如:redundant-mdm.json)来定义数据同步规则。该配置文件用于指定源数据和目标数据表之间的同步关系。
项目根目录/
└── resources/
└── redundant-mdm.json # 或其他 redundant-xxx.json 文件
3.1.2. 基本结构
{
"sync": {
"items": [
{
"name": "同步任务名称",
"syncModel": "同步模式",
"enabled":"是否启用,默认启用"
"source": {
"name": "源数据表名",
"datasource": "数据源名称(可选),默认为:mbm-mdm-reserve",
"changeTrigger": "变更触发标志(可选),是否发送MQ消息,默认:false"
},
"target": {
"name": "目标数据表名",
"datasource": "数据源名称(可选),默认:imom-redundant"
}
}
]
},
"consistency": {
"items": [
{
"name": "一致性同步任务名称",
"changeTrigger": "实时变更触发标志,黓认:false",
"reader": {
"name": "读取表名",
"businessKey": "主键字段(可选),默认:id",
"columns": ["字段1", "字段2"]
},
"writer": {
"name": "写入表名",
"foreignKey": "外键字段",
"columns": ["字段1", "字段2"]
}
}
]
}
}
3.1.3. 配置参数说明
3.1.3.1. 数据同步配置 (sync)
sync 对象
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| items | Array | 是 | 数据同步任务配置数组 |
items 数组中的同步任务配置
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | 是 | 同步任务的唯一标识符 | "t_mdm_part" |
| source | Object | 是 | 源数据配置 | |
| target | Object | 否 | 目标数据配置,省略时使用默认配置与source相同 | |
| syncModel | String | 否 | 同步模式增量模式:incremental全量模式:full | incremental |
| enabled | Boolean | 否 | 是否启用 | true |
source 对象参数
| 参数 | 类型 | 必填 | 说明 | 默认值 |
|---|---|---|---|---|
| name | String | 是 | 源数据表名称 | - |
| datasource | String | 否 | 数据源名称 | 默认数据源 |
| changeTrigger | Boolean | 否 | 是否启用变更触发 | false |
target 对象参数
| 参数 | 类型 | 必填 | 说明 | 默认值 |
|---|---|---|---|---|
| name | String | 否 | 目标数据表名称 | 与source.name相同 |
| datasource | String | 否 | 目标数据源名称 | 默认数据源 |
3.1.3.2. 2. 数据一致性配置 (consistency)
consistency 对象
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| items | Array | 是 | 数据一致性检查任务配置数组 |
items 数组中的一致性检查任务配置
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | 是 | 一致性检查任务的唯一标识符 | mdm-line |
| changeTrigger | Boolean | 是 | 是否启用变更触发 | true |
| reader | Object | 是 | 数据读取端配置 | |
| writer | Object | 是 | 数据写入端配置 |
reader 对象参数
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | 是 | 读取表名称 | "t_mdm_line_dme" |
| businessKey | String | 是 | 业务主键字段名 | "lineNo" |
| columns | Array | 是 | 需要检查的字段列表 | ["lineName", "lineNo"] |
| datasource | String | 否 | 数据源名称 | 默认数据源 |
writer 对象参数
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | 是 | 写入表名称 | "mpm_line_test" |
| foreignKey | String | 是 | 外键字段名 | "line_code" |
| columns | Array | 是 | 需要检查的字段列表 | ["line_name", "line_code"] |
| datasource | String | 否 | 数据源名称 | 默认数据源 |
3.2. 数据同步到冗余库
3.2.1. 概述
本文档说明如何通过 REST API 实现从华为只读库到 iMOM 冗余库的数据同步功能。该功能提供数据同步启动、配置初始化和验证等核心操作。
3.2.2. 接口
- 启动数据同步
接口说明:手动触发从华为只读库到 iMOM 冗余库的数据同步过程
curl -X POST "http://localhost:8080/v1/redundant-sync/start" \
-H "Content-Type: application/json" \
-d '{
"targetTableNames": ["t_mdm_line", "t_mdm_part"]
}'
参数说明:
| 参数名 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| targetTableNames | List | ❌ | 目标表名列表,为空时同步所有配置的表 | ["t_mdm_part", "t_mdm_line"] |
- 初始化配置
接口说明:初始化同步配置,加载 redundant-xxx.json配置文件
GET /v1/redundant-sync/initialize
- 验证配置
接口说明:验证当前同步配置的正确性和完整性
GET /v1/redundant-sync/verify-config
3.2.3. 启动服务时自动同步一次
redundant:
sync:
enable: true
3.3. 方式一:外键关联
a. 应用场景 : 同步QMS-XX表中的客户ID/code。
b. 实现方式 : 在行业包业务表中不存储客户的具体信息(如名称、地址),只存储一个指向MBM冗余库中客户表的外键 (客户ID或code)。
c. 优点 : 数据绝对一致,没有冗余存储。当客户信息更新时,QMS系统通过关联查询总能拿到最新数据。
d. 缺点 : 查询性能有损耗(需要跨库关联),且强依赖于冗余数据库的可用性。
3.3.1. 数据存储设计 (示例)
在核心的业务表 (例如qms_xx_business_table)中, 不直接存储客户的详细具体信息。
业务表仅存储一个外键字段,该外键指向MBM(主数据管理)冗余库中的客户表(customer_table)的主键。这个外键可以是customer_id(客户唯一ID)或customer_code(客户编码)。
CREATE TABLE `work_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '工单ID(主键)',
`order_no` varchar(50) NOT NULL COMMENT '工单编号',
`customer_id` bigint(20) NOT NULL COMMENT '客户ID(外键,关联MBM冗余库客户表)',
`customer_code` varchar(30) COMMENT '客户编码(冗余存储,便于查询)',
`product_name` varchar(100) NOT NULL COMMENT '产品名称',
`quantity` int(11) NOT NULL COMMENT '订单数量',
`priority` int(2) NOT NULL DEFAULT 1 COMMENT '优先级(1-低,2-中,3-高)',
`status` int(2) NOT NULL DEFAULT 1 COMMENT '工单状态(1-新建,2-进行中,3-完成,4-关闭)',
`plan_start_time` datetime COMMENT '计划开始时间',
`plan_end_time` datetime COMMENT '计划完成时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_customer_id` (`customer_id`),
KEY `idx_customer_code` (`customer_code`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工单业务表 - 只存储客户ID,不存储客户详细信息';
3.3.2. MDM-SDK 服务开发
MDM-SDK基于数据冗余同步架构,提供统一的主数据查询服务。通过CDC实时同步和定时任务冗余更新机制,确保业务系统能够高效、一致地访问主数据。
代码仓库:http://192.168.175.208/imom/mdm/imom-mdm
步骤1:定义响应数据模型
/**
* 客户查询响应数据类
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class CustomerQueryResp {
/**
* 客户ID
*/
private Long id;
/**
* 客户编码
*/
private String customerCode;
/**
* 客户名称
*/
private String customerName;
/**
* 客户状态
*/
private Integer status;
/**
* 创建时间
*/
private LocalDateTime createTime;
}
步骤2:创建查询服务类
/**
* 客户查询服务
* @author 您的姓名
* @date 2025/08/27
*/
@Service
public class CustomerQueryService extends BaseQueryService<CustomerQueryResp> {
@Override
public String getTableName() {
return "t_mdm_customer_dme"; // 对应MBM冗余库中的客户表
}
/**
* 根据客户编码查询客户信息
*/
public CustomerQueryResp getByCustomerCode(String customerCode) {
QueryListReq req = new QueryListReq();
req.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "status"));
QueryConditionReq condition = new QueryConditionReq();
condition.setField("customer_code");
condition.setOperator(Operator.EQ);
condition.setValue(customerCode);
req.setConditions(Arrays.asList(condition));
List<CustomerQueryResp> result = getList(req);
return CollectionUtil.isNotEmpty(result) ? result.get(0) : null;
}
/**
* 根据状态查询有效客户列表
*/
public List<CustomerQueryResp> getActiveCustomers() {
QueryListReq req = new QueryListReq();
QueryConditionReq condition = new QueryConditionReq();
condition.setField("status");
condition.setOperator(Operator.EQ);
condition.setValue(1); // 假设1表示有效状态
req.setConditions(Arrays.asList(condition));
req.setCacheable(true); // 启用缓存提高性能
return getList(req);
}
}
步骤三:引用MDM-SDK
<!-- QMS系统的pom.xml中引用主数据SDK -->
<dependency>
<groupId>com.sie.imom</groupId>
<artifactId>mdm-sdk</artifactId>
<version>3.1.0-SNAPSHOT</version>
</dependency>
查询API调用
// 注入客户查询服务(对应图片中MBM冗余库的客户表)
@Autowired
private CustomerQueryService customerQueryService;
// 示例1:查询所有客户(从MBM冗余库查询)
QueryListReq req = new QueryListReq();
List<CustomerQueryResp> allCustomers = customerQueryService.getList(req);
// 示例2:带条件查询客户信息
QueryListReq conditionReq = new QueryListReq();
conditionReq.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "customer_gender", "phone", "status"));
// 状态条件:只查询有效客户
QueryConditionReq statusCondition = new QueryConditionReq();
statusCondition.setField("status");
statusCondition.setOperator(Operator.EQ);
statusCondition.setValue(1); // 1表示有效客户
// 客户类型条件:查询指定类型的客户
QueryConditionReq typeCondition = new QueryConditionReq();
typeCondition.setField("customer_type");
typeCondition.setOperator(Operator.IN);
typeCondition.setValue(Arrays.asList(1, 2, 3)); // 1-普通, 2-VIP, 3-战略客户
conditionReq.setConditions(Arrays.asList(statusCondition, typeCondition));
// 执行查询(通过SDK查询MBM冗余库中的客户表)
List<CustomerQueryResp> activeCustomers = customerQueryService.getList(conditionReq);
//根据客户编码查询单个客户详情
QueryListReq codeReq = new QueryListReq();
codeReq.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "phone", "email", "address"));
QueryConditionReq codeCondition = new QueryConditionReq();
codeCondition.setField("customer_code");
codeCondition.setOperator(Operator.EQ);
codeCondition.setValue("CUST2025001");
codeReq.setConditions(Arrays.asList(codeCondition));
List<CustomerQueryResp> customerByCode = customerQueryService.getList(codeReq);
CustomerQueryResp customer = customerByCode.isEmpty() ? null : customerByCode.get(0);
// 根据客户名称模糊查询
QueryListReq nameReq = new QueryListReq();
nameReq.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "contact_person", "phone"));
QueryConditionReq nameCondition = new QueryConditionReq();
nameCondition.setField("customer_name");
nameCondition.setOperator(Operator.LIKE);
nameCondition.setValue("%科技%"); // 查询包含"科技"的客户
nameReq.setConditions(Arrays.asList(nameCondition));
nameReq.setCacheable(true); // 启用缓存提高性能
List<CustomerQueryResp> customersByName = customerQueryService.getList(nameReq);
// 根据ID列表批量查询客户(适用于业务数据关联查询)
List<Long> customerIds = Arrays.asList(1001L, 1002L, 1003L, 1004L);
List<CustomerQueryResp> customerList = customerQueryService.getListByIds(customerIds);
// 转换为Map便于业务处理
Map<Long, CustomerQueryResp> customerMap = customerList.stream()
.collect(Collectors.toMap(CustomerQueryResp::getId, Function.identity()));
步骤四:调用代码
/**
* QMS质量管理系统中的客户信息查询服务
* 直接使用BaseQueryService提供的getById方法
*/
@Service
@RequiredArgsConstructor
public class QmsCustomerService {
// 直接注入客户查询服务
@Autowired
private CustomerQueryService customerQueryService;
/**
* 查询质量投诉单详情(包含客户信息)
* 直接使用getById方法
*/
public QmsComplaintDetailVO getComplaintDetailWithCustomer(Long complaintId) {
// 1. 从QMS业务表查询投诉基础信息
QmsComplaint complaint = qmsComplaintMapper.selectById(complaintId);
if (complaint == null) {
throw new BusinessException("投诉单不存在: " + complaintId);
}
// 2. 直接使用getById方法查询客户主数据(简化代码)
CustomerQueryResp customer = customerQueryService.getById(complaint.getCustomerId());
// 3. 组装数据返回给前端
return assembleComplaintDetail(complaint, customer);
}
/**
* 批量查询投诉列表(包含客户信息)
*/
public List<QmsComplaintDetailVO> getComplaintListWithCustomer(QueryListReq req) {
// 1. 查询业务数据
List<QmsComplaint> complaints = qmsComplaintMapper.selectList(req);
// 2. 提取客户ID,使用getListByIds批量查询
List<Long> customerIds = complaints.stream()
.map(QmsComplaint::getCustomerId)
.distinct()
.collect(Collectors.toList());
// 使用现有的getListByIds方法批量查询
List<CustomerQueryResp> customers = customerQueryService.getListByIds(customerIds);
Map<Long, CustomerQueryResp> customerMap = customers.stream()
.collect(Collectors.toMap(CustomerQueryResp::getId, Function.identity()));
// 3. 组装数据
return complaints.stream()
.map(complaint -> {
CustomerQueryResp customer = customerMap.get(complaint.getCustomerId());
return assembleComplaintDetail(complaint, customer);
})
.collect(Collectors.toList());
}
private QmsComplaintDetailVO assembleComplaintDetail(QmsComplaint complaint, CustomerQueryResp customer) {
return QmsComplaintDetailVO.builder()
.id(complaint.getId())
.complaintNo(complaint.getComplaintNo())
.complaintContent(complaint.getComplaintContent())
.complaintTime(complaint.getComplaintTime())
.status(complaint.getStatus())
// 客户信息从MBM冗余库获取
.customerId(complaint.getCustomerId())
.customerName(customer != null ? customer.getCustomerName() : "未知客户")
.customerPhone(customer != null ? customer.getPhone() : "")
.customerEmail(customer != null ? customer.getEmail() : "")
.customerAddress(customer != null ? customer.getAddress() : "")
.build();
}
}
3.4. 方式二:定时任务更新冗余字段
e. 应用场景 : 同步APS-XX表中的物料名称。
f. 实现方式 : 将物料名称冗余存储在行业包业务表中。通过一个定时任务(如每天凌晨) 批量检查并更新发生变化的物料名称。
g. 优点 : 查询性能极高(无需跨库关联),对源库压力小。
h. 缺点 : 数据一致性是弱一致性,在两次定时任务之间,业务表的数据可能不是最新的,存在延迟。
3.4.1. 配置文件
3.4.1.1. 配置文件位置与结构
文件路径: mdm-biz/src/main/resources/redundant-mdm.json
{
"consistency": {
"items": [
{
"name": "mdm-line", // 任务名称:线体数据同步
"changeTrigger": true, // 是否监听变更(为true时,可实时或定时基于变更触发)
"reader": { // 数据读取端(源:MDM冗余库)
"name": "t_mdm_line_dme", // 源表名(MDM冗余库中的线体表)
"businessKey": "lineNo", // 与业务关联键-不填配置为主键关联(选填),默认为id
"columns": [ // 需要同步的字段
"lineName",
"lineNo"
]
},
"writer": { // 数据写入端(目标:业务库)
"name": "mpm_line_test", // 目标表名(业务系统中的表)
"foreignKey": "line_code", // 目标表中外键字段
"columns": [ // 目标表中对应的冗余字段
"line_name",
"line_code"
]
}
}
]
}
}
3.4.1.2. 处理数据一致性服务接口
3.4.1.3. 验证配置服务接口
GET /v1/redundant-consistency/verify-config
3.5. 方式三:实时更新冗余字段
i. 应用场景 : 同步LES-XX表中的工艺名称。
j. 实现方式 : 同样采用冗余存储。但通过监听Debezium发出的数据变更消息 (通过Redis),实时地、在事务完成后立刻更新行业包业务表中的对应字段。
k. 优点 : 在保证高性能查询的同时,提供了准实时的数据一致性,延迟极低。
l. 缺点 : 架构更复杂,需要引入消息队列,并保证消息处理的可靠性(不丢失、不重复)。
3.5.1. 配置文件结构
在 redundant-xxx.json文件中,通过 consistency配置节实现实时冗余字段更新:
{
"consistency": {
"items": [
{
"name": "配置项唯一标识",
"changeTrigger": true,//这里是关键
"reader": {
"name": "源表名",
"businessKey": "业务主键字段",
"columns": ["字段1", "字段2"],
"datasource": "数据源名称(可选)"
},
"writer": {
"name": "目标表名",
"foreignKey": "外键字段",
"columns": ["字段1", "字段2"],
"datasource": "数据源名称(可选)"
}
}
]
}
}
3.5.2. 配置参数详解
consistency 配置项
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | ✅ | 配置项唯一标识 | "mdm-line" |
| changeTrigger | Boolean | ✅ | 是否启用实时变更触发 | true |
| reader | Object | ✅ | 数据读取端配置 | |
| writer | Object | ✅ | 数据写入端配置 |
reader 读取端配置
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | ✅ | CDC监听的源表名 | "t_mdm_line_dme" |
| businessKey | String | ✅ | 业务主键字段名 | "lineNo" |
| columns | Array | ✅ | 需要同步的字段列表 | ["lineName", "lineNo"] |
| datasource | String | ❌ | 数据源名称(默认使用主数据源) | "mbm-mdm-reserve" |
writer 写入端配置
| 参数 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
| name | String | ✅ | 更新的目标表名 | "mpm_line_test" |
| foreignKey | String | ✅ | 关联外键字段名 | "line_code" |
| columns | Array | ✅ | 目标表字段列表 | ["line_name", "line_code"] |
| datasource | String | ❌ | 数据源名称(默认使用主数据源) | "imom-redundant" |
3.5.2.1. 字段一一对应规则
核心规则:reader.columns和 writer.columns必须严格按数组索引顺序一一对应。
reader.columns[0] ←→ writer.columns[0]
reader.columns[1] ←→ writer.columns[1]
reader.columns[2] ←→ writer.columns[2]
...
示例
{
"consistency": {
"items": [
{
"name": "mdm-line",
"changeTrigger": true,
"reader": {
"name": "t_mdm_line_dme",
"businessKey": "lineNo",
"columns": ["lineName", "lineNo", "workshopName", "lineType"]
},
"writer": {
"name": "mpm_line_test",
"foreignKey": "line_code",
"columns": ["line_name", "line_code", "workshop_name", "line_type"]
}
}
]
}
}
字段映射关系:
● lineName→ line\_name
● lineNo→ line\_code
● workshopName→ workshop\_name
● lineType→ line\_type
3.6. 同步数据实时消息
3.6.1. 流程:
流程步骤说明
1. 数据库变更:业务数据库(如MySQL)中发生了数据增、删、改操作。
2. Debezium CDC:Debezium 实时捕获数据库的变更事件,并将其转换为标准的消息格式。
3. Redis 消息队列:变更事件被发布到 Redis 消息队列中,保证消息可靠性的作用。
4. 消息处理服务:订阅 Redis 消息队列的服务接收到变更事件。
5. 更新缓存:消息处理服务根据变更事件的内容(例如,某条商品信息已更新),对应用缓存(如 Redis 缓存)进行相应的更新或失效操作。
6. 扩展业务使用:保证缓存数据与数据库一致后,业务系统可以享受到缓存带来的高性能读写好处,从而扩展了业务能力。
这个流程的核心价值在于,它通过异步的方式将数据库的变更“扩散”到系统的其他部分(如缓存),非常适合您提到的更新缓存这一典型场景,能够有效提升应用的性能和可扩展性。
3.6.2. 消息处理器实现
package com.sie.imom.mdm.redundantcdc.mq;
import com.sie.imom.mdm.redundantcdc.constant.RedundantCdcConstant;
import com.sie.imom.mdm.redundantcdc.mq.dto.ChangeEventDTO;
import com.sie.imom.mdm.redundantcdc.service.RedundantCdcService;
import com.sie.mbm.mom.framework.mq.MessageHandlerInterface;
import com.sie.mbm.mom.framework.mq.annotation.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Redis CDC 变更消息处理器
* @author 梁华东
* @date 2025/10/24
* @description 实时处理Debezium发出的数据变更消息,更新冗余字段
*/
@Slf4j
@Component
@MessageListener(value = RedundantCdcConstant.CDC_CHANGE_QUEUE + "-" + "t_mdm_part_dme")
public class PartDmeChangeHandler implements MessageHandlerInterface<ChangeEventDTO> {
@Autowired
private RedundantCdcService redundantCdcService;
/**
* 处理变更消息
* @param payload 变更事件数据
*/
@Override
public void handle(ChangeEventDTO payload) {
try {
log.info("接收到部件数据变更事件: {}", payload);
// 根据变更类型执行不同的处理逻辑
switch (payload.getOperationType()) {
case INSERT:
case UPDATE:
redundantCdcService.syncPartRedundantFields(payload);
break;
case DELETE:
redundantCdcService.handlePartDelete(payload);
break;
default:
log.warn("未知的操作类型: {}", payload.getOperationType());
}
log.info("部件冗余字段同步完成: {}", payload.getPrimaryKey());
} catch (Exception e) {
log.error("处理部件变更消息失败: {}", payload, e);
throw new RuntimeException("消息处理失败", e);
}
}
/**
* 重试失败处理
* @param payload 变更事件数据
*/
@Override
public void retryFail(ChangeEventDTO payload) {
log.error("部件变更消息重试失败,需要人工干预: {}", payload);
// 可以发送告警、记录错误日志等
redundantCdcService.recordSyncFailure(payload);
}
}
变更事件DTO
/**
* 快照数据同步上下文
* 用于表示数据变更事件的上下文信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Jacksonized
public class ChangeEventDTO implements java.io.Serializable {
private static final long serialVersionUID = 1L;
// 类字段定义...
}
字段说明
| 字段名 | 类型 | 必填 | 说明 | 示例值 |
|---|---|---|---|---|
| databaseName | String | ✅ | 发生变更的数据库名称 | "mdm_production" |
| table | String | ✅ | 发生变更的表名 | "t_mdm_line_dme" |
| operation | Operation | ✅ | 数据操作类型 | Operation.UPDATE |
| before | Map | ❌ | 变更前的数据状态(UPDATE/DELETE时有效) | {"id": 1, "name": "旧名称"} |
| after | Map | ❌ | 变更后的数据状态(CREATE/UPDATE时有效) | {"id": 1, "name": "新名称"} |
| timestamp | Instant | ✅ | 变更发生的时间戳 | 2024-01-15T10:30:00Z |
总结与方案特点
| 特点 | 说明 |
|---|---|
| ✅ 分层解耦 | 通过只读库、冗余数据库进行隔离,保护了核心生产库的稳定。 |
| ✅ 技术选型 | 结合Canal和Debezium等成熟CDC工具,实现可靠的数据捕获与同步。 |
| ✅ 策略灵活 | 针对不同业务字段的实时性要求,混合使用外键、定时任务、实时推送三种策略,平衡了性能与一致性。 |
| ✅ 最终一致性 | 方案的核心是保证最终一致性,而非强一致性,这在分布式系统中是更实用和高效的选择。 |
| ⚠️ 复杂度 | 架构相对复杂,需要维护多个组件(CDC工具、消息队列、定时任务等),对运维有较高要求。 |
总而言之,这是一个非常经典和实用的数据一致性解决方案,它有效地解决了核心基础数据向多个下游业务系统分发的挑战。