梁华东
Published on 2025-11-19 / 44 Visits
0
0

行业包数据一致性使用说明

行业包数据一致性使用说明

1. 概述

本方案采用分层解耦和多种同步策略结合的方式,通过CDC(变更数据捕获)技术实现数据从生产库到业务库的流转,并根据业务字段的特性(如稳定性、实时性要求)采用不同的最终一致性策略,在保证系统性能和解耦的同时,满足数据一致性需求。

2. 架构流程

整个数据流从左至右,可分为三个主要阶段:

阶段一:生产库 -> 只读库 (读写分离)

●  源端 :  MBM生产库 

             ○ 这是业务的源头,所有基础数据的增删改查操作都在这里进行。

             ●  目标端 :  MBM只读库

阶段二:只读库 -> 冗余数据库 (数据预处理)

●  源端 :  MBM只读库 

             ●  同步技术 :  Debezium CDC同步 

             ○ Debezium是一个CDC工具,通常用于捕获数据库(如MySQL, PostgreSQL)的变更。这里它从只读库捕获变更。

             ●  目标端 :  MBM冗余数据库(部分表、字段)  

             ○ 作用:

             ■  数据过滤 : 并非同步全部数据,而是只选择行业包业务所需的部分表和字段,减少数据传输量。

             ■  格式准备 : 为下一步同步到行业包业务表做好数据准备。此库可以看作是数据同步的“中转站”或“数据湖”。

阶段三:冗余数据库 -> 行业包业务表 (最终一致性策略)

这是方案最核心的部分,针对不同类型的业务数据,采用了三种不同的同步策略,体现了方案的灵活性。

●  源端 :  MBM冗余数据库 

             ●  目标端 :  行业包业务表 

             ○ 包含QMS、APS、LES等不同业务系统的表。

3. 使用说明:

3.1. 配置说明

3.1.1. 概述

在 imom-mdm 项目的 resources目录下,您可以创建 redundant-xxx.json配置文件(例如:redundant-mdm.json)来定义数据同步规则。该配置文件用于指定源数据和目标数据表之间的同步关系。

项目根目录/
└── resources/
    └── redundant-mdm.json  # 或其他 redundant-xxx.json 文件

3.1.2. 基本结构

{
  "sync": {
    "items": [
      {
        "name": "同步任务名称",
        "syncModel": "同步模式",
        "enabled":"是否启用,默认启用"
        "source": {
          "name": "源数据表名",
          "datasource": "数据源名称(可选),默认为:mbm-mdm-reserve",
          "changeTrigger": "变更触发标志(可选),是否发送MQ消息,默认:false"
        },
        "target": {
          "name": "目标数据表名",
          "datasource": "数据源名称(可选),默认:imom-redundant"
        }
      }
    ]
  },
  "consistency": {
    "items": [
      {
        "name": "一致性同步任务名称",
        "changeTrigger": "实时变更触发标志,黓认:false",
        "reader": {
          "name": "读取表名",
          "businessKey": "主键字段(可选),默认:id",
          "columns": ["字段1", "字段2"]
        },
        "writer": {
          "name": "写入表名",
          "foreignKey": "外键字段",
          "columns": ["字段1", "字段2"]
        }
      }
    ]
  }
}

3.1.3. 配置参数说明

3.1.3.1. 数据同步配置 (sync)

sync 对象

参数 类型 必填 说明
items Array 数据同步任务配置数组

items 数组中的同步任务配置

参数 类型 必填 说明 示例
name String 同步任务的唯一标识符 "t_mdm_part"
source Object 源数据配置
target Object 目标数据配置,省略时使用默认配置与source相同
syncModel String 同步模式增量模式:incremental全量模式:full incremental
enabled Boolean 是否启用 true

source 对象参数

参数 类型 必填 说明 默认值
name String 源数据表名称 -
datasource String 数据源名称 默认数据源
changeTrigger Boolean 是否启用变更触发 false

target 对象参数

参数 类型 必填 说明 默认值
name String 目标数据表名称 与source.name相同
datasource String 目标数据源名称 默认数据源

3.1.3.2. 2. 数据一致性配置 (consistency)

consistency 对象

参数 类型 必填 说明
items Array 数据一致性检查任务配置数组

items 数组中的一致性检查任务配置

参数 类型 必填 说明 示例
name String 一致性检查任务的唯一标识符 mdm-line
changeTrigger Boolean 是否启用变更触发 true
reader Object 数据读取端配置
writer Object 数据写入端配置

reader 对象参数

参数 类型 必填 说明 示例
name String 读取表名称 "t_mdm_line_dme"
businessKey String 业务主键字段名 "lineNo"
columns Array 需要检查的字段列表 ["lineName", "lineNo"]
datasource String 数据源名称 默认数据源

writer 对象参数

参数 类型 必填 说明 示例
name String 写入表名称 "mpm_line_test"
foreignKey String 外键字段名 "line_code"
columns Array 需要检查的字段列表 ["line_name", "line_code"]
datasource String 数据源名称 默认数据源

3.2. 数据同步到冗余库

3.2.1. 概述

本文档说明如何通过 REST API 实现从华为只读库到 iMOM 冗余库的数据同步功能。该功能提供数据同步启动、配置初始化和验证等核心操作。

3.2.2. 接口

  1. 启动数据同步

接口说明:手动触发从华为只读库到 iMOM 冗余库的数据同步过程

curl -X POST "http://localhost:8080/v1/redundant-sync/start" \
  -H "Content-Type: application/json" \
  -d '{
    "targetTableNames": ["t_mdm_line", "t_mdm_part"]
  }'

参数说明:

参数名 类型 必填 说明 示例
targetTableNames List 目标表名列表,为空时同步所有配置的表 ["t_mdm_part", "t_mdm_line"]
  1. 初始化配置

接口说明:初始化同步配置,加载 redundant-xxx.json配置文件

GET /v1/redundant-sync/initialize
  1. 验证配置

接口说明:验证当前同步配置的正确性和完整性

GET /v1/redundant-sync/verify-config

3.2.3. 启动服务时自动同步一次

redundant:
  sync:
    enable: true

3.3. 方式一:外键关联

a.  应用场景 : 同步QMS-XX表中的客户ID/code。

         b.  实现方式 : 在行业包业务表中不存储客户的具体信息(如名称、地址),只存储一个指向MBM冗余库中客户表的外键 (客户ID或code)。

         c.  优点 : 数据绝对一致,没有冗余存储。当客户信息更新时,QMS系统通过关联查询总能拿到最新数据。

         d.  缺点 : 查询性能有损耗(需要跨库关联),且强依赖于冗余数据库的可用性。

3.3.1. 数据存储设计 (示例)

在核心的业务表 (例如qms_xx_business_table)中, 不直接存储客户的详细具体信息。

业务表仅存储一个外键字段,该外键指向MBM(主数据管理)冗余库中的客户表(customer_table)的主键。这个外键可以是customer_id(客户唯一ID)或customer_code(客户编码)。

CREATE TABLE `work_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '工单ID(主键)',
  `order_no` varchar(50) NOT NULL COMMENT '工单编号',
  `customer_id` bigint(20) NOT NULL COMMENT '客户ID(外键,关联MBM冗余库客户表)',
  `customer_code` varchar(30) COMMENT '客户编码(冗余存储,便于查询)',
  `product_name` varchar(100) NOT NULL COMMENT '产品名称',
  `quantity` int(11) NOT NULL COMMENT '订单数量',
  `priority` int(2) NOT NULL DEFAULT 1 COMMENT '优先级(1-低,2-中,3-高)',
  `status` int(2) NOT NULL DEFAULT 1 COMMENT '工单状态(1-新建,2-进行中,3-完成,4-关闭)',
  `plan_start_time` datetime COMMENT '计划开始时间',
  `plan_end_time` datetime COMMENT '计划完成时间',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_no` (`order_no`),
  KEY `idx_customer_id` (`customer_id`),
  KEY `idx_customer_code` (`customer_code`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工单业务表 - 只存储客户ID,不存储客户详细信息';

3.3.2. MDM-SDK 服务开发

MDM-SDK基于数据冗余同步架构,提供统一的主数据查询服务。通过CDC实时同步和定时任务冗余更新机制,确保业务系统能够高效、一致地访问主数据。

代码仓库:http://192.168.175.208/imom/mdm/imom-mdm

步骤1:定义响应数据模型

/**
 * 客户查询响应数据类
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class CustomerQueryResp {
    /**
     * 客户ID
     */
    private Long id;
  
    /**
     * 客户编码
     */
    private String customerCode;
  
    /**
     * 客户名称
     */
    private String customerName;
  
    /**
     * 客户状态
     */
    private Integer status;
  
    /**
     * 创建时间
     */
    private LocalDateTime createTime;
}

步骤2:创建查询服务类

/**
 * 客户查询服务
 * @author 您的姓名
 * @date 2025/08/27
 */
@Service
public class CustomerQueryService extends BaseQueryService<CustomerQueryResp> {

    @Override
    public String getTableName() {
        return "t_mdm_customer_dme"; // 对应MBM冗余库中的客户表
    }
  
    /**
     * 根据客户编码查询客户信息
     */
    public CustomerQueryResp getByCustomerCode(String customerCode) {
        QueryListReq req = new QueryListReq();
  
        req.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "status"));
  
        QueryConditionReq condition = new QueryConditionReq();
        condition.setField("customer_code");
        condition.setOperator(Operator.EQ);
        condition.setValue(customerCode);
  
        req.setConditions(Arrays.asList(condition));
  
        List<CustomerQueryResp> result = getList(req);
        return CollectionUtil.isNotEmpty(result) ? result.get(0) : null;
    }
  
    /**
     * 根据状态查询有效客户列表
     */
    public List<CustomerQueryResp> getActiveCustomers() {
        QueryListReq req = new QueryListReq();
  
        QueryConditionReq condition = new QueryConditionReq();
        condition.setField("status");
        condition.setOperator(Operator.EQ);
        condition.setValue(1); // 假设1表示有效状态
  
        req.setConditions(Arrays.asList(condition));
        req.setCacheable(true); // 启用缓存提高性能
  
        return getList(req);
    }
}

步骤三:引用MDM-SDK

<!-- QMS系统的pom.xml中引用主数据SDK -->
<dependency>
    <groupId>com.sie.imom</groupId>
    <artifactId>mdm-sdk</artifactId>
    <version>3.1.0-SNAPSHOT</version>
</dependency>

查询API调用

// 注入客户查询服务(对应图片中MBM冗余库的客户表)
@Autowired
private CustomerQueryService customerQueryService;

// 示例1:查询所有客户(从MBM冗余库查询)
QueryListReq req = new QueryListReq();
List<CustomerQueryResp> allCustomers = customerQueryService.getList(req);

// 示例2:带条件查询客户信息
QueryListReq conditionReq = new QueryListReq();
conditionReq.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "customer_gender", "phone", "status"));

// 状态条件:只查询有效客户
QueryConditionReq statusCondition = new QueryConditionReq();
statusCondition.setField("status");
statusCondition.setOperator(Operator.EQ);
statusCondition.setValue(1); // 1表示有效客户

// 客户类型条件:查询指定类型的客户
QueryConditionReq typeCondition = new QueryConditionReq();
typeCondition.setField("customer_type");
typeCondition.setOperator(Operator.IN);
typeCondition.setValue(Arrays.asList(1, 2, 3)); // 1-普通, 2-VIP, 3-战略客户

conditionReq.setConditions(Arrays.asList(statusCondition, typeCondition));

// 执行查询(通过SDK查询MBM冗余库中的客户表)
List<CustomerQueryResp> activeCustomers = customerQueryService.getList(conditionReq);



//根据客户编码查询单个客户详情
QueryListReq codeReq = new QueryListReq();
codeReq.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "phone", "email", "address"));

QueryConditionReq codeCondition = new QueryConditionReq();
codeCondition.setField("customer_code");
codeCondition.setOperator(Operator.EQ);
codeCondition.setValue("CUST2025001");

codeReq.setConditions(Arrays.asList(codeCondition));

List<CustomerQueryResp> customerByCode = customerQueryService.getList(codeReq);
CustomerQueryResp customer = customerByCode.isEmpty() ? null : customerByCode.get(0);



// 根据客户名称模糊查询
QueryListReq nameReq = new QueryListReq();
nameReq.setSelectColumns(Arrays.asList("id", "customer_code", "customer_name", "contact_person", "phone"));

QueryConditionReq nameCondition = new QueryConditionReq();
nameCondition.setField("customer_name");
nameCondition.setOperator(Operator.LIKE);
nameCondition.setValue("%科技%"); // 查询包含"科技"的客户

nameReq.setConditions(Arrays.asList(nameCondition));
nameReq.setCacheable(true); // 启用缓存提高性能

List<CustomerQueryResp> customersByName = customerQueryService.getList(nameReq);


// 根据ID列表批量查询客户(适用于业务数据关联查询)
List<Long> customerIds = Arrays.asList(1001L, 1002L, 1003L, 1004L);
List<CustomerQueryResp> customerList = customerQueryService.getListByIds(customerIds);

// 转换为Map便于业务处理
Map<Long, CustomerQueryResp> customerMap = customerList.stream()
    .collect(Collectors.toMap(CustomerQueryResp::getId, Function.identity()));

步骤四:调用代码

/** 
 * QMS质量管理系统中的客户信息查询服务
 * 直接使用BaseQueryService提供的getById方法
 */
@Service
@RequiredArgsConstructor
public class QmsCustomerService {
  
    // 直接注入客户查询服务
    @Autowired
    private CustomerQueryService customerQueryService;
  
    /**
     * 查询质量投诉单详情(包含客户信息)
     * 直接使用getById方法
     */
    public QmsComplaintDetailVO getComplaintDetailWithCustomer(Long complaintId) {
        // 1. 从QMS业务表查询投诉基础信息
        QmsComplaint complaint = qmsComplaintMapper.selectById(complaintId);
        if (complaint == null) {
            throw new BusinessException("投诉单不存在: " + complaintId);
        }
  
        // 2. 直接使用getById方法查询客户主数据(简化代码)
        CustomerQueryResp customer = customerQueryService.getById(complaint.getCustomerId());
  
        // 3. 组装数据返回给前端
        return assembleComplaintDetail(complaint, customer);
    }
  
    /**
     * 批量查询投诉列表(包含客户信息)
     */
    public List<QmsComplaintDetailVO> getComplaintListWithCustomer(QueryListReq req) {
        // 1. 查询业务数据
        List<QmsComplaint> complaints = qmsComplaintMapper.selectList(req);
  
        // 2. 提取客户ID,使用getListByIds批量查询
        List<Long> customerIds = complaints.stream()
                .map(QmsComplaint::getCustomerId)
                .distinct()
                .collect(Collectors.toList());
  
        // 使用现有的getListByIds方法批量查询
        List<CustomerQueryResp> customers = customerQueryService.getListByIds(customerIds);
        Map<Long, CustomerQueryResp> customerMap = customers.stream()
                .collect(Collectors.toMap(CustomerQueryResp::getId, Function.identity()));
  
        // 3. 组装数据
        return complaints.stream()
                .map(complaint -> {
                    CustomerQueryResp customer = customerMap.get(complaint.getCustomerId());
                    return assembleComplaintDetail(complaint, customer);
                })
                .collect(Collectors.toList());
    }
  
    private QmsComplaintDetailVO assembleComplaintDetail(QmsComplaint complaint, CustomerQueryResp customer) {
        return QmsComplaintDetailVO.builder()
                .id(complaint.getId())
                .complaintNo(complaint.getComplaintNo())
                .complaintContent(complaint.getComplaintContent())
                .complaintTime(complaint.getComplaintTime())
                .status(complaint.getStatus())
                // 客户信息从MBM冗余库获取
                .customerId(complaint.getCustomerId())
                .customerName(customer != null ? customer.getCustomerName() : "未知客户")
                .customerPhone(customer != null ? customer.getPhone() : "")
                .customerEmail(customer != null ? customer.getEmail() : "")
                .customerAddress(customer != null ? customer.getAddress() : "")
                .build();
    }
}

3.4. 方式二:定时任务更新冗余字段

e.  应用场景 : 同步APS-XX表中的物料名称。

         f.  实现方式 : 将物料名称冗余存储在行业包业务表中。通过一个定时任务(如每天凌晨)   批量检查并更新发生变化的物料名称。

         g.  优点 : 查询性能极高(无需跨库关联),对源库压力小。

         h.  缺点 : 数据一致性是弱一致性,在两次定时任务之间,业务表的数据可能不是最新的,存在延迟。

3.4.1. 配置文件

3.4.1.1. 配置文件位置与结构

文件路径: mdm-biz/src/main/resources/redundant-mdm.json

{
  "consistency": {
    "items": [
      {
        "name": "mdm-line",           // 任务名称:线体数据同步
        "changeTrigger": true,        // 是否监听变更(为true时,可实时或定时基于变更触发)
        "reader": {                   // 数据读取端(源:MDM冗余库)
          "name": "t_mdm_line_dme",   // 源表名(MDM冗余库中的线体表)
          "businessKey": "lineNo",   // 与业务关联键-不填配置为主键关联(选填),默认为id
          "columns": [                // 需要同步的字段
            "lineName", 
            "lineNo"
          ]
        },
        "writer": {                   // 数据写入端(目标:业务库)
          "name": "mpm_line_test",    // 目标表名(业务系统中的表)
          "foreignKey": "line_code",  // 目标表中外键字段
          "columns": [                // 目标表中对应的冗余字段
            "line_name", 
            "line_code"
          ]
        }
      }
    ]
  }
}

3.4.1.2. 处理数据一致性服务接口

3.4.1.3. 验证配置服务接口

GET /v1/redundant-consistency/verify-config

3.5. 方式三:实时更新冗余字段

i.  应用场景 : 同步LES-XX表中的工艺名称。

         j.  实现方式 : 同样采用冗余存储。但通过监听Debezium发出的数据变更消息 (通过Redis),实时地、在事务完成后立刻更新行业包业务表中的对应字段。

         k.  优点 : 在保证高性能查询的同时,提供了准实时的数据一致性,延迟极低。

         l.  缺点 : 架构更复杂,需要引入消息队列,并保证消息处理的可靠性(不丢失、不重复)。

3.5.1. 配置文件结构

在 redundant-xxx.json文件中,通过 consistency配置节实现实时冗余字段更新:

{
  "consistency": {
    "items": [
      {
        "name": "配置项唯一标识",
        "changeTrigger": true,//这里是关键
        "reader": {
          "name": "源表名",
          "businessKey": "业务主键字段",
          "columns": ["字段1", "字段2"],
          "datasource": "数据源名称(可选)"
        },
        "writer": {
          "name": "目标表名", 
          "foreignKey": "外键字段",
          "columns": ["字段1", "字段2"],
          "datasource": "数据源名称(可选)"
        }
      }
    ]
  }
}

3.5.2. 配置参数详解

consistency 配置项

参数 类型 必填 说明 示例
name String 配置项唯一标识 "mdm-line"
changeTrigger Boolean 是否启用实时变更触发 true
reader Object 数据读取端配置
writer Object 数据写入端配置

reader 读取端配置

参数 类型 必填 说明 示例
name String CDC监听的源表名 "t_mdm_line_dme"
businessKey String 业务主键字段名 "lineNo"
columns Array 需要同步的字段列表 ["lineName", "lineNo"]
datasource String 数据源名称(默认使用主数据源) "mbm-mdm-reserve"

writer 写入端配置

参数 类型 必填 说明 示例
name String 更新的目标表名 "mpm_line_test"
foreignKey String 关联外键字段名 "line_code"
columns Array 目标表字段列表 ["line_name", "line_code"]
datasource String 数据源名称(默认使用主数据源) "imom-redundant"

3.5.2.1. 字段一一对应规则

核心规则:reader.columns和 writer.columns必须严格按数组索引顺序一一对应。

reader.columns[0]  ←→  writer.columns[0]
reader.columns[1]  ←→  writer.columns[1]  
reader.columns[2]  ←→  writer.columns[2]
...

示例

{
  "consistency": {
    "items": [
      {
        "name": "mdm-line",
        "changeTrigger": true,
        "reader": {
          "name": "t_mdm_line_dme",
          "businessKey": "lineNo",
          "columns": ["lineName", "lineNo", "workshopName", "lineType"]
        },
        "writer": {
          "name": "mpm_line_test", 
          "foreignKey": "line_code",
          "columns": ["line_name", "line_code", "workshop_name", "line_type"]
        }
      }
    ]
  }
}

字段映射关系:

● lineName→ line\_name

             ● lineNo→ line\_code

             ● workshopName→ workshop\_name

             ● lineType→ line\_type

3.6. 同步数据实时消息

3.6.1. 流程:

流程步骤说明

1. 数据库变更:业务数据库(如MySQL)中发生了数据增、删、改操作。

         2. Debezium CDC:Debezium 实时捕获数据库的变更事件,并将其转换为标准的消息格式。

         3. Redis 消息队列:变更事件被发布到 Redis 消息队列中,保证消息可靠性的作用。

         4. 消息处理服务:订阅 Redis 消息队列的服务接收到变更事件。

         5. 更新缓存:消息处理服务根据变更事件的内容(例如,某条商品信息已更新),对应用缓存(如 Redis 缓存)进行相应的更新或失效操作。

         6. 扩展业务使用:保证缓存数据与数据库一致后,业务系统可以享受到缓存带来的高性能读写好处,从而扩展了业务能力。

这个流程的核心价值在于,它通过异步的方式将数据库的变更“扩散”到系统的其他部分(如缓存),非常适合您提到的更新缓存这一典型场景,能够有效提升应用的性能和可扩展性。

3.6.2. 消息处理器实现

package com.sie.imom.mdm.redundantcdc.mq;

import com.sie.imom.mdm.redundantcdc.constant.RedundantCdcConstant;
import com.sie.imom.mdm.redundantcdc.mq.dto.ChangeEventDTO;
import com.sie.imom.mdm.redundantcdc.service.RedundantCdcService;
import com.sie.mbm.mom.framework.mq.MessageHandlerInterface;
import com.sie.mbm.mom.framework.mq.annotation.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Redis CDC 变更消息处理器
 * @author 梁华东
 * @date 2025/10/24
 * @description 实时处理Debezium发出的数据变更消息,更新冗余字段
 */
@Slf4j
@Component
@MessageListener(value = RedundantCdcConstant.CDC_CHANGE_QUEUE + "-" + "t_mdm_part_dme")
public class PartDmeChangeHandler implements MessageHandlerInterface<ChangeEventDTO> {

    @Autowired
    private RedundantCdcService redundantCdcService;

    /**
     * 处理变更消息
     * @param payload 变更事件数据
     */
    @Override
    public void handle(ChangeEventDTO payload) {
        try {
            log.info("接收到部件数据变更事件: {}", payload);
  
            // 根据变更类型执行不同的处理逻辑
            switch (payload.getOperationType()) {
                case INSERT:
                case UPDATE:
                    redundantCdcService.syncPartRedundantFields(payload);
                    break;
                case DELETE:
                    redundantCdcService.handlePartDelete(payload);
                    break;
                default:
                    log.warn("未知的操作类型: {}", payload.getOperationType());
            }
  
            log.info("部件冗余字段同步完成: {}", payload.getPrimaryKey());
        } catch (Exception e) {
            log.error("处理部件变更消息失败: {}", payload, e);
            throw new RuntimeException("消息处理失败", e);
        }
    }

    /**
     * 重试失败处理
     * @param payload 变更事件数据
     */
    @Override
    public void retryFail(ChangeEventDTO payload) {
        log.error("部件变更消息重试失败,需要人工干预: {}", payload);
        // 可以发送告警、记录错误日志等
        redundantCdcService.recordSyncFailure(payload);
    }
}

变更事件DTO

/**
 * 快照数据同步上下文
 * 用于表示数据变更事件的上下文信息
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Jacksonized
public class ChangeEventDTO implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
  
    // 类字段定义...
}

字段说明

字段名 类型 必填 说明 示例值
databaseName String 发生变更的数据库名称 "mdm_production"
table String 发生变更的表名 "t_mdm_line_dme"
operation Operation 数据操作类型 Operation.UPDATE
before Map 变更前的数据状态(UPDATE/DELETE时有效) {"id": 1, "name": "旧名称"}
after Map 变更后的数据状态(CREATE/UPDATE时有效) {"id": 1, "name": "新名称"}
timestamp Instant 变更发生的时间戳 2024-01-15T10:30:00Z

总结与方案特点

特点 说明
✅ 分层解耦 通过只读库、冗余数据库进行隔离,保护了核心生产库的稳定。
✅ 技术选型 结合Canal和Debezium等成熟CDC工具,实现可靠的数据捕获与同步。
✅ 策略灵活 针对不同业务字段的实时性要求,混合使用外键、定时任务、实时推送三种策略,平衡了性能与一致性。
✅ 最终一致性 方案的核心是保证最终一致性,而非强一致性,这在分布式系统中是更实用和高效的选择。
⚠️ 复杂度 架构相对复杂,需要维护多个组件(CDC工具、消息队列、定时任务等),对运维有较高要求。

总而言之,这是一个非常经典和实用的数据一致性解决方案,它有效地解决了核心基础数据向多个下游业务系统分发的挑战。

附:培训视频


Comment