当前位置: 首页 > news >正文

定制网站建设加盟代理免费发布推广信息网站

定制网站建设加盟代理,免费发布推广信息网站,wordpress 插件冲突,惠州做公司网站1.引言 业务程序经常会通过各式各样的缓存来提升用户的访问速度。 由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。 如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。…

1.引言

业务程序经常会通过各式各样的缓存来提升用户的访问速度。

由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。

如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。

此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。

TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。

因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。

本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。

之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!

2. 代码思路

(1)通过 kafka 消息获取 CDC 消息

(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑

3. 代码实现

3.1 代码结构

cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│    └─ CdcConstants.java├─ dto│    ├─ CdcMessage.java│    └─ User.java├─ job│    ├─ CdcJob.java│    └─ UserCdcJob.java└─ service├─ impl│   └─ UserServiceImpl.java└─ CdcService.java

3.2 CDC 常量类

public class CdcConstants {
​public enum MessageType {/*** 插入操作*/INSERT,
​/*** 更新操作*/UPDATE,
​/*** 删除操作*/DELETE;}
}

3.3 实体类

3.3.1 用户实体类

@Getter
@Setter
public class User {
​/*** 用户id*/private Long id;
​/*** 用户名*/private String name;
​/*** 年龄*/private Integer age;
}

3.3.2 CDC 消息实体类

@Getter
@Setter
public class CdcMessage<T> {
​/*** 数据集合*/private List<T> data;
​/*** 数据库名称*/private String database;
​/*** 是否为DDL语句isDdl*/private boolean isDdl;
​/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
​/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
​/*** sql语句*/private String sql;
​/*** 值为int类型*/private T sqlType;
​/*** 数据表名*/private String table;
​/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}

3.4 任务类

3.4.1 CDC 任务基类

@Slf4j
public class CdcJob<T> {
​protected CdcService<T> cdcService;
​/*** 处理消息** @param record 消息记录* @param ack    消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
​/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
​// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
​// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}

3.4.2 用户表 CDC 消费任务类

@Component
public class UserCdcJob extends CdcJob<User> {
​public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
​/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack    消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}

3.5 服务类

3.5.1 CDC 消息处理服务接口

public interface CdcService<T> {
​/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
​/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
​/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}

3.5.2 用户服务实现类

@Service
public class UserServiceImpl implements CdcService<User> {
​@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
​@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
​@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}

4.参考文档

TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview

TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json

http://www.mnyf.cn/news/46471.html

相关文章:

  • 网站更换空间教程sem竞价托管公司
  • 东莞建站方案手机百度账号申请注册
  • 网站用哪些系统做的好处关键词优化怎么做
  • 网站绩效营销百度上广告怎么搞上去的
  • 网站建设制作要学什么软件线上推广员是做什么的
  • 彩票黑网站是怎么做的网络推广公司如何做
  • 福建厦门网站建设公司宁波优化推广找哪家
  • 外贸销售管理制度南昌seo实用技巧
  • 外贸英文网站制作怎么联系地推公司
  • 写字楼租赁seo关键词选取工具
  • 吕梁网站设计引流推广犯法吗
  • 做网站要钱吗站长工具在线免费
  • 奉新网站制作广告传媒公司
  • 做渠道的网站有哪些方面安装百度到桌面
  • dw做的网站如何使用如何发布一个网站
  • 钓鱼网站在线生成免费精准客源
  • 上海建站费用百度搜索关键词排名查询
  • 做俄语网站建设百度的营销方式有哪些
  • 做网站公司无锡搜索引擎优化文献
  • 跨境电商自建站是什么16888精品货源入口
  • 平面设计素材网站排名91永久免费海外地域网名
  • 企业准备做网站的准备工作百度推广投诉热线
  • 网站建设发票税率是多少今日要闻10条
  • 公司网站备案后在百度上多长时间可以搜索到中国企业网络营销现状
  • 免费做网站缅甸新闻最新消息
  • 福州做网站建设百度手机下载安装
  • 网页制作语言百度搜索关键词排名优化推广
  • 专门做试题的网站网站关键词怎么添加
  • 包头网站建设多少钱成都全网推广哪家专业
  • 培训机构做网站宣传淘宝关键词搜索