找回密码
 立即注册
首页 业界区 业界 MySQL同步ES的6种方案!

MySQL同步ES的6种方案!

二艰糖 2025-6-2 23:09:46
引言

在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。
然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。
这篇文章跟大家一起聊聊MySQL同步ES的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。
方案一:同步双写

场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。
在业务代码中同时写入MySQL与ES。
代码如下:
  1. @Transactional  
  2. public void createOrder(Order order) {  
  3.     // 写入MySQL  
  4.     orderMapper.insert(order);  
  5.     // 同步写入ES  
  6.     IndexRequest request = new IndexRequest("orders")  
  7.         .id(order.getId())  
  8.         .source(JSON.toJSONString(order), XContentType.JSON);  
  9.     client.index(request, RequestOptions.DEFAULT);  
  10. }
复制代码
痛点

  • 硬编码侵入:所有涉及写操作的地方均需添加ES写入逻辑。
  • 性能瓶颈:双写操作导致事务时间延长,TPS下降30%以上。
  • 数据一致性风险:若ES写入失败,需引入补偿机制(如本地事务表+定时重试)。
方案二:异步双写

场景:电商订单状态更新后需同步至ES供客服系统检索。
我们可以使用MQ进行解耦。
架构图如下
1.webp

代码示例如下
  1. // 生产者端  
  2. public void updateProduct(Product product) {  
  3.     productMapper.update(product);  
  4.     kafkaTemplate.send("product-update", product.getId());  
  5. }  
  6. // 消费者端  
  7. @KafkaListener(topics = "product-update")  
  8. public void syncToEs(String productId) {  
  9.     Product product = productMapper.selectById(productId);  
  10.     esClient.index(product);  
  11. }
复制代码
优势

  • 吞吐量提升:通过MQ削峰填谷,可承载万级QPS。
  • 故障隔离:ES宕机不影响主业务链路。
缺陷

  • 消息堆积:突发流量可能导致消费延迟(需监控Lag值)。
  • 顺序性问题:需通过分区键保证同一数据的顺序消费。
方案三:Logstash定时拉取

场景:用户行为日志的T+1分析场景。
该方案低侵入但高延迟。
配置示例如下
  1. input {  
  2.   jdbc {  
  3.     jdbc_driver => "com.mysql.jdbc.Driver"  
  4.     jdbc_url => "jdbc:mysql://localhost:3306/log_db"  
  5.     schedule => "*/5 * * * *"  # 每5分钟执行  
  6.     statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value"  
  7.   }  
  8. }  
  9. output {  
  10.   elasticsearch {  
  11.     hosts => ["es-host:9200"]  
  12.     index => "user_logs"  
  13.   }  
  14. }
复制代码
适用性分析

  • 优点:零代码改造,适合历史数据迁移。
  • 致命伤

    • 分钟级延迟(无法满足实时搜索)
    • 全表扫描压力大(需优化增量字段索引)

方案四:Canal监听Binlog

场景:社交平台动态实时搜索(如微博热搜更新)。
技术栈:Canal + RocketMQ + ES
该方案高实时,并且低侵入。
架构流程如下
2.webp

关键配置
  1. # canal.properties  
  2. canal.instance.master.address=127.0.0.1:3306  
  3. canal.mq.topic=canal.es.sync
复制代码
避坑指南

  • 数据漂移:需处理DDL变更(通过Schema Registry管理映射)。
  • 幂等消费:通过_id唯一键避免重复写入。
方案五:DataX批量同步

场景:将历史订单数据从分库分表MySQL迁移至ES。
该方案是大数据迁移的首选。
配置文件如下
  1. {  
  2.   "job": {  
  3.     "content": [{  
  4.       "reader": {  
  5.         "name": "mysqlreader",  
  6.         "parameter": { "splitPk": "id", "querySql": "SELECT * FROM orders" }  
  7.       },  
  8.       "writer": {  
  9.         "name": "elasticsearchwriter",  
  10.         "parameter": { "endpoint": "http://es-host:9200", "index": "orders" }  
  11.       }  
  12.     }]  
  13.   }  
  14. }
复制代码
性能调优

  • 调整channel数提升并发(建议与分片数对齐)
  • 启用limit分批查询避免OOM
方案六:Flink流处理

场景:商品价格变更时,需关联用户画像计算实时推荐评分。
该方案适合于复杂的ETL场景。
代码片段如下
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  2. env.addSource(new CanalSource())  
  3.    .map(record -> parseToPriceEvent(record))  
  4.    .keyBy(event -> event.getProductId())  
  5.    .connect(userProfileBroadcastStream)  
  6.    .process(new PriceRecommendationProcess())  
  7.    .addSink(new ElasticsearchSink());
复制代码
优势

  • 状态管理:精准处理乱序事件(Watermark机制)
  • 维表关联:通过Broadcast State实现实时画像关联
总结:

对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?
下面用一张表格做对比:
方案实时性侵入性复杂度适用阶段同步双写秒级高低小型单体项目MQ异步秒级中中中型分布式系统Logstash分钟级无低离线分析Canal毫秒级无高高并发生产环境DataX小时级无中历史数据迁移Flink毫秒级低极高实时数仓苏三的建议

  • 若团队无运维中间件能力 → 选择Logstash或同步双写
  • 需秒级延迟且允许改造 → MQ异步 + 本地事务表
  • 追求极致实时且资源充足 → Canal + Flink双保险
最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的50万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。
本文收录于我的技术网站:http://www.susan.net.cn

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册