V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
jiobanma
V2EX  ›  程序员

mysql 数据同步 elasticsearch 方案

  •  
  •   jiobanma ·
    banmajio · 2023-03-08 09:24:18 +08:00 · 6758 次点击
    这是一个创建于 631 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目前项目想把 mysql 中的数据同步到 elasticsearch 中,接触了几种方案,想咨询一下大家工作中都使用什么方案。

    1. mysql 中的全量数据使用 logstash 导入 es ,然后增量的数据通过 canal 监听 binlog 转发到消息队列,然后数据服务监听详细队列,维护 es 中的数据。 [优点] :代码侵入小,轻量级使用。 [缺点] :a. logstash 导入大量数据效率低,每次任务最多只能同步 100000 条,并且最少要 1 分钟执行一次。b. logstash 先全量,canal 再接着增量,两个时间点可能会出现数据一致性问题。c. 貌似 canal 的坑不少,并且 github 好像也不怎么维护了。

    2. flink cdc 同步 mysql 到 elasticsearch 。看了一下 demo 的演示,感觉挺强大的,完全不用写代码,基本上通过 flink sql 就可以搞定。但是尝试按照官方的教程在本地模拟一下,不知道是不是目前还不支持 es8 ,导致我的数据刚开始可以同步到 es ,但是之后的 CUD 操作都无法同步到 es ,flink 的 webui 也有错误信息。在官方的 github 上咨询了此事,目前还没有回复 https://github.com/ververica/flink-cdc-connectors/discussions/1968 。 [缺点] :网上的资料较少。

    大家工作中都使用什么方案呢

    68 条回复    2024-04-28 18:43:43 +08:00
    voidmnwzp
        1
    voidmnwzp  
       2023-03-08 09:29:41 +08:00
    不要求实时性就根据 updatetime 每天跑个定时任务刷一次,要求强一致性就用 flink cdc
    GunsRose
        2
    GunsRose  
       2023-03-08 09:31:34 +08:00
    肯定是 flink-cdc 好用,不过这玩意搞起来也有很多的坑
    jiobanma
        3
    jiobanma  
    OP
       2023-03-08 09:34:58 +08:00
    @GunsRose 能举个例子吗大佬
    jiobanma
        4
    jiobanma  
    OP
       2023-03-08 09:35:19 +08:00
    @voidmnwzp 还是要求强一致性的
    pubby
        5
    pubby  
       2023-03-08 09:36:19 +08:00 via iPhone
    现在用的方案是
    记录 binlog 位置后先全量导入
    然后从那个位置开始 canal 同步数据
    NoString
        6
    NoString  
       2023-03-08 09:39:40 +08:00
    阿里云全家桶之 DTS...
    577322753
        7
    577322753  
       2023-03-08 09:39:54 +08:00
    如果是云服务上的 db 的话,es 索引与表是一对一的话可以直接用 DTS 数据同步来做,表与 es 索引不是一对一的话,可以用 DTS 数据订阅,将 binlog 投递到 kafka 中,自己订阅 kafka 消息解析 binlog ,写到 es 中。
    我们目前是用 DTS 数据订阅作为生产端 + 基于 canal Client 改造的客户端实现的 es 同步
    577322753
        8
    577322753  
       2023-03-08 09:41:36 +08:00
    有些 DTS 工具也是支持多表写入到一个索引里的,京东云上的数据同步就是这样的,跟他们沟通过,底层也是用的 flink 那一套
    j1132888093
        9
    j1132888093  
       2023-03-08 09:42:51 +08:00
    我们公司操作数据库的时候同步操作 es........
    likeme
        10
    likeme  
       2023-03-08 09:44:25 +08:00
    @j1132888093 我也是这么做的,难道不行?(自我怀疑...)
    binge921
        11
    binge921  
       2023-03-08 09:47:18 +08:00
    别用 canal 了 我上次用 不小心把生产的 cpu 占用干满了 哭死
    jiobanma
        12
    jiobanma  
    OP
       2023-03-08 09:49:53 +08:00
    @j1132888093
    @likeme 这样是可以的,但是涉及到一个历史数据的导入,还是比较麻烦的,当然可以写一个洗数据的接口,但是效率肯定很低。然后要在业务代码里在同步操作 es 还得梳理旧的代码和接口,太费人了。
    gitxuzan
        13
    gitxuzan  
       2023-03-08 09:51:27 +08:00
    cancal 提供了第三方中间件语言,对接到自己语言里面,监听解析 binlog ,然后实时批量同步
    hhjswf
        14
    hhjswf  
       2023-03-08 09:54:37 +08:00
    @likeme 入侵性太强了
    godleon
        15
    godleon  
       2023-03-08 09:59:54 +08:00
    bboss
    matrix1010
        16
    matrix1010  
       2023-03-08 10:14:28 +08:00
    可以参考 elastic 官方的这篇文章,简单来说就是通过时间戳增量批量刷新: https://www.elastic.co/cn/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash 。 但其实可以完全不用 Logstash, 自己写个 cron 脚本按相同的逻辑更新
    vagusss
        17
    vagusss  
       2023-03-08 10:14:33 +08:00
    maxwell 好像坑比 canal 少一点
    Morriaty
        18
    Morriaty  
       2023-03-08 10:17:54 +08:00
    你是 online service 还只是内部搜索需求?如果是 online service ,同步工具倒是其次,这玩意最恶心的是,你要设计好一个机制,就是每次由于字段变更、分词词库更新等问题需要重新全量索引( full index )时,你的实时索引( realtime index )不能停,因为线上还在服务,用户要一直能看到最新的。可以看看 https://engineering.carsguide.com.au/elasticsearch-zero-downtime-reindexing-e3a53000f0ac
    SoulSleep
        19
    SoulSleep  
       2023-03-08 10:20:32 +08:00
    我们用的 canal ,你说的这俩方案我们曾经都试过....
    logstash...慢,不灵活
    flink cdc 还不错,不需要写代码...但是...有上手难度(对我们团队来说

    canal ,我们现在线上运行了几十个实例,有 1-1 ,N-1 ,多种场景,性能,没太多资源,大概做到了 5w/s 的速度,完全满足我们的需要
    我们的链路:
    1-1:mysql--canal--adapter--es

    n-1: mysql--canal--处理程序--es
    terranboy
        20
    terranboy  
       2023-03-08 10:20:38 +08:00
    现在的 ORM 的 EVENT 都很完善的 在业务里面同步了
    SoulSleep
        21
    SoulSleep  
       2023-03-08 10:22:45 +08:00   ❤️ 1
    @Morriaty #18 对于你这个补充一下
    我们在 es 里只存搜索条件+数据库主键,会有一个单独的服务用搜索条件得到各个表的数据库主键,然后在程序里聚合,查询效果对比:
    1.关系型数据库,多表 join ,几亿数据,大概要 5~10 秒
    2.现有方案,es+单表聚合,几亿数据,大概 0.x~2 秒
    weofuh
        22
    weofuh  
       2023-03-08 10:30:57 +08:00
    可不可以历史数据用 datax ,增量数据用 canal 呢?
    Aresxue
        23
    Aresxue  
       2023-03-08 10:33:40 +08:00
    flink 没玩过,但是 canal 绝对是个坑,一个是 cpu 老是莫名其妙跑满,还有对主从模式很多地方都没有支持非常坑。
    barbery
        24
    barbery  
       2023-03-08 10:41:11 +08:00
    我们也是用 canal 同步,暂时没发现有什么坑,可能是我们的数据量不大😂
    jiobanma
        25
    jiobanma  
    OP
       2023-03-08 10:45:42 +08:00
    @SoulSleep 感谢大佬
    Seulgi
        26
    Seulgi  
       2023-03-08 10:51:25 +08:00
    dts 贼贵啊。
    hotcool100
        27
    hotcool100  
       2023-03-08 10:55:42 +08:00
    flink cdc 太吃内存和资源了,用 canal 好多了
    changdy
        28
    changdy  
       2023-03-08 10:57:40 +08:00
    2333 看来只有我一个人用的是 debezium 吗?
    CRUD
        29
    CRUD  
       2023-03-08 11:03:03 +08:00
    我们是用 canal ,mysql -> canal -> rabbitmq -> 处理服务 -> es
    jiobanma
        30
    jiobanma  
    OP
       2023-03-08 11:04:10 +08:00
    @SoulSleep 看您的描述,整个链路都是用 canal 实现的,那历史数据方便怎么同步到 es 呢?我了解的是 canal 对全量数据的同步支持不是很好
    brader
        31
    brader  
       2023-03-08 11:22:48 +08:00
    可以了解下这个项目 https://github.com/brokercap/Bifrost
    用 go 写的,也很节省服务器资源
    potatowish
        32
    potatowish  
       2023-03-08 11:27:01 +08:00   ❤️ 1
    debezium 支持全量和增量同步,flink cdc 也是基于它
    FawkesV
        33
    FawkesV  
       2023-03-08 11:28:05 +08:00
    学习了
    Desdemor
        34
    Desdemor  
       2023-03-08 11:31:14 +08:00
    canal -> kafka -> 处理服务 -> es
    awalkingman
        35
    awalkingman  
       2023-03-08 12:25:39 +08:00
    @j1132888093
    @likeme
    那要是写 mysql 成功了,写 es 失败了,,,数据的一致性咋个维护
    v2e0xAdmin2
        36
    v2e0xAdmin2  
       2023-03-08 13:44:26 +08:00
    @jiobanma https://github.com/colosobo/drc

    我们公司用的这个项目,可以全量同步,也可以增量同步。写到 kafka 或者 mq 里,然后 flink 消费写到 es 里。
    v2e0xAdmin2
        37
    v2e0xAdmin2  
       2023-03-08 13:47:15 +08:00
    @SoulSleep canal 运维太麻烦了。
    QunLeLZ
        38
    QunLeLZ  
       2023-03-08 14:03:01 +08:00
    我们现在使用的是从 pg 同步数据到 es ,中间还要做一定逻辑处理,还是比较方便的,直接写 SQL 就行了。不过我们用的是阿里的 Flink 服务,自建 Flink 的话会稍微麻烦点,但我理解应该问题也不会太大。
    wmwgijol28
        39
    wmwgijol28  
       2023-03-08 14:03:41 +08:00
    CloudCanal 社区版
    wxw752
        40
    wxw752  
       2023-03-08 14:04:19 +08:00
    flink-cdc
    j1132888093
        41
    j1132888093  
       2023-03-08 14:05:50 +08:00
    @newskillsget 先写 mysql ,再写 es ,失败就回滚 mysql 。如果是写完 es 再抛出异常的话,就要手动回滚 es 了,但是我们公司这种场景目前很少,还能 cover 住
    potatowish
        42
    potatowish  
       2023-03-08 14:14:14 +08:00
    @changdy 第二个
    fengpan567
        43
    fengpan567  
       2023-03-08 14:17:38 +08:00
    canal 监听 binlog ,kafka 消费写入 es
    noparking188
        44
    noparking188  
       2023-03-08 14:34:37 +08:00
    CDC 到队列,自己写程序或者 lambda 写 ES ,可行吗,是不是更简单一点?
    我之前的公司是双写+重试,错了人肉纠正
    BQsummer
        45
    BQsummer  
       2023-03-08 14:48:36 +08:00
    前段时间调研过 doris 的数据同步:
    canal 文档一坨屎, 很多参数得从源代码里找, 数据量大的话会有坑
    flink cdc 开发最省事, 但是集群运维费事
    debezium 中文文档为 0, flink cdc 最早的同步方式是通过它, 后来自己实现了一套
    CloudCanal 是 starrocks 文档里推荐的, 国产, 还不开源, 不敢用
    其实上云是最省力的方式
    SoulSleep
        46
    SoulSleep  
       2023-03-08 14:51:09 +08:00
    @jiobanma #30 历史数据 canal 支持一次性迁移,类似 etl
    @v2e0xAdmin2 #37 麻烦在哪儿呢?我们用 canal-admin 管理配置,指标暂时没输出到 grafana...监控主要关注业务异常,主要是稳定,基本上没出过问题
    dabai0806
        47
    dabai0806  
       2023-03-08 15:00:47 +08:00
    logstash 其实是可以配置秒级同步的(这也是我碰巧试出来的 文档和网上文章都没看到有人提过)

    线上几万条数据, 我配置的是 10 秒同步一次 就是同步的时候 cpu 会上飙百分之 30 左右

    配置参数
    schedule => "*/10 * * * * *"
    dabai0806
        48
    dabai0806  
       2023-03-08 15:02:51 +08:00
    logstash 维护方便些同步数据的自由度也高 写条查询 sql 就完事了, canal 就是个半成品, 难用的要死
    lueluev
        49
    lueluev  
       2023-03-08 15:09:08 +08:00
    好熟悉的问题 flume 还有人用吗
    fridaycatye
        50
    fridaycatye  
       2023-03-08 15:36:24 +08:00
    datax 可用,基于查询做同步,灵活
    fishtocat
        51
    fishtocat  
       2023-03-08 17:56:52 +08:00
    学习了,我们现在是基于 canal 自己实现的 golang 客户端消费更新信息
    liprais
        52
    liprais  
       2023-03-08 17:57:50 +08:00
    flink-cdc 开 ckpt 完事
    ufo5260987423
        53
    ufo5260987423  
       2023-03-08 18:00:16 +08:00
    我不知道您为啥说 logstash 效率低,不论是全量更新还是增量更新,效率都还不错。我处理过的专利数据大概几百个 G ,都能在可接受时间内完成任务。

    如果需要实时更新,那么在事物里面同步操作 es 就好了。
    可能我了解的还不够多,请您指正。
    ffkjjj
        54
    ffkjjj  
       2023-03-08 18:02:55 +08:00
    debezium
    awalkingman
        55
    awalkingman  
       2023-03-08 18:08:47 +08:00
    @j1132888093 人工补偿兜底,很真实了。巧妙的人工介入能极大地降低系统复杂度,谓之四两拨千斤。
    v2e0xAdmin2
        56
    v2e0xAdmin2  
       2023-03-08 18:11:39 +08:00
    @SoulSleep 加机器麻烦
    zhangxudong
        57
    zhangxudong  
       2023-03-08 18:34:21 +08:00
    可以试试阿里云 DTS 原班人马搞的 cloudcanal
    HunterPan
        58
    HunterPan  
       2023-03-08 19:13:18 +08:00
    datax
    vgbhfive
        59
    vgbhfive  
       2023-03-08 19:33:23 +08:00
    datax 可以考虑下
    GopherDaily
        60
    GopherDaily  
       2023-03-08 19:53:58 +08:00
    kafka connect, 如果你们已经有 kafka 的话,
    debezium + es sink 吧
    tairan2006
        61
    tairan2006  
       2023-03-08 19:58:01 +08:00
    我记得 datax 是专门搞这个的,不过我也没用过
    leeton
        62
    leeton  
       2023-03-08 22:04:24 +08:00
    我们就是用 canal 监听 binlog😅 发 kafka 消息一条一条刷增量数据
    ldh756034624
        63
    ldh756034624  
       2023-03-09 10:04:22 +08:00
    cirton
        64
    cirton  
       2023-03-09 10:05:24 +08:00
    ogg 可以实时同步到 es 。之前用 ogg 同步 mysql 和 oracle ,效率非常够用,效果也很好,但是出了问题运维是个麻烦事儿。
    刚看到你的 es 版本是 8 的,那就用不了了。
    Morriaty
        65
    Morriaty  
       2023-03-09 11:06:50 +08:00
    @SoulSleep 我们说的应该不是一个问题。举个例子,

    es 里有个 title 字段,类似「高启强大规模犯罪集团终落网」,分词有问题,分成了「高启 / 强大 / 规模 / 犯罪 / 集团 / 终 / 落网」,于是我们更新了人名库,因为可能有很多历史 title 都包含这个词,需要把整个索引 reindex 一遍。

    这个过程基于历史数据量不同,耗时不同,可能需要几个小时的时间。但你线上正在提供 online search service 不能停,不能等这几个小时的 reindex 。

    所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑
    SoulSleep
        66
    SoulSleep  
       2023-03-09 14:51:07 +08:00
    @Morriaty #64 不错,我们的场景还没有这么复杂,只是加速分库的 mysql 查询效率,这么精细的需求对我们现在的场景来说,就直接刷 db 的 binlog 重新同步去做了,按照最大的效率,估计要 2 个小时左右....确实无论怎么做代价都很大....
    kakawa
        67
    kakawa  
       214 天前
    @Morriaty 所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑
    不是搜索部门的 但是现在实实在在的遇到了这个问题 请问这个有什么最佳实践吗 求解
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3063 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 14:46 · PVG 22:46 · LAX 06:46 · JFK 09:46
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.