Logo
Overview

使用Canal同步MySQL和Elasticsearch

November 26, 2021
2 min read

Why use Elasticsearch?

对于中文站点来说,全文搜索一定会是一个痛点。流行的关系型数据库对于中文检索的支持基本来自于MySQL自带的ngram和第三方插件Sphinx(英文部分coreseek)。ngram作为分词器是没什么问题的,但是MySQL对于多字段查询的支持实在是不好(这并不是说MySQL不好),你可以使用like语句,不过like并不是万能的;当然你也可以通过联合索引的方式优化,不过这会让你的表结构越来越复杂(而且效果也不见得很好)。Sphinx是一个非常成熟的解决方案,底层的语言是C++,这使得它在性能上有着得天独厚的优势。但是由于Sphinx本身的实现问题,对增量更新的支持上并不优雅。并且由于辅助表的存在,会使得MySQL的压力进一步增大。Elasticsearch的优点在于其天生的集群能力,不过由于其底层是Java,所以在CPU和内存开销上都不如Sphinx。但是更强大的自定义功能和复杂查询语句支持最终让我选择了Elasticsearch作为搜索系统的基础。这一点我稍后还会聊到。

Principle

我个人认为搜索是一个相对独立的功能,但是数据来源却和其他各个系统密不可分,所以如何将其他业务系统中的MySQL数据实时导入到Elasticsearch就成为了一个亟待解决的问题。经过和杯具一起搜索后,发现go-mysql-elasticsearchCanal这两个项目都是通过对MySQL的binlog进行解析实现同步的。前者使用go语言,部署非常方便,同时能够支持对表和列进行选择。虽然README上面写着对MySQL8和Elasticsearch6的支持还在TODO中,但是经过测试,它已经能正常工作。后者来自阿里巴巴的开源项目,底层使用JAVA,除了能对表和列进行选择同步,还对SQL做了进一步的支持。canal将同步的过程抽象成“生产-消费”模型,这样一来就可以引入队列和不同的消费者(适配器)。团队讨论后我们最终选择使用canal进行同步,因为他比go-mysql-elasticsearch的支持更好,可预见的瓶颈更小。当然另一个原因是go-mysql-elasticsearch的维护状态并不是很好,有一段时间甚至被标为deprecated。不过如果你的需求非常简单,我仍然推荐这个go语言写的同步工具。

Get familiar with canal

在使用docker部署canal前我个人建议先在容器外跑一遍,因为canal的Dockerfile写的实在是不怎么样。正如上文提到的,canal使用了“生产-消费”模型,在本文的场景中,生产者是MySQL,而消费者是Elasticsearch。而canal和canal-adapter则在生产者和消费者之间作为消息传递的通道。

MySQL->canal->canal-adapter->Elasticsearch

canal通过对消息队列的支持实现生产消费的进一步解耦,支持Kafka和RocketMQ。同时也支持Promethus的指标监控,方便实现异常报警。

MySQL->canal->Kafka/RocketMQ->canal-adapter->Elasticsearch

canal也原生支持keepalived高可用,在局域网环境下通过VRRP做到无缝切换主备节点。

How to do

Environment:
- Canal: v1.1.5
- Canal-adapter: v1.1.5
- Elasticsearch: v7.14.2
- MySQL: 8.0.x

之所以要表明环境是因为canal在1.1.5的release中仍然存在一些小问题,不过我们可以通过重新打包的方式解决,如果canal发布了新的版本,文本的问题解决部分还需要结合实际情况食用。

MySQL

MySQL部分需要对canal使用的账号授予远程访问和响应数据库的权限,同时开启binlog,这部分由于Docker已经默认配置好了,不再赘述,如果你是物理机直接部署,可以搜索相关教程。

Canal-server

canal-server负责解析binlog并将消息推送给消费者。canal-server的配置相对简单,只需要指定你想创建的canal实例的名称,并且简单配置数据库的访问信息。首先从release下载canal.deployer,并将其解压到/usr/local/canal目录下(canal内部的启动脚本是这样定义的),解压完成后可以可以看到conf下面有canal.propertiesexample文件夹

Terminal window
conf/
├── canal_local.properties
├── canal.properties
├── example
├── h2.mv.db
├── instance.properties
└── meta.dat
├── logback.xml
├── metrics
└── Canal_instances_tmpl.json
└── spring
├── base-instance.xml
├── default-instance.xml
├── file-instance.xml
├── group-instance.xml
├── memory-instance.xml
└── tsdb
├── h2-tsdb.xml
├── mysql-tsdb.xml
├── sql
└── create_table.sql
└── sql-map
├── sqlmap-config.xml
├── sqlmap_history.xml
└── sqlmap_snapshot.xml

前者记录了canal监听的端口(TCP模式)以及消息队列相关的配置,这里只需要更改

canal.destination={设置的canal名称}

并且在conf目录下新建同名文件夹。将example下的instance.properties复制到其中,接下来需要在其中设置MySQL的链接信息,并使用正则表达式过滤需要的数据库和表:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=password
#白名单
canal.instance.filter.regex={你的数据库}.{某个表}
#黑名单使用正则匹配
canal.instance.filter.black.regex=mysql\\.slave_.*

配置完成后即可使用bin目录下的startup脚本启动canal-server,默认情况下是监听在11111端口。

Canal-adapter

已知1.1.5版本的adapter存在连接池问题,需要更改pom.xml重新打包,这一部分可以参考这个issue,更为详细的指导可以参考掘金的这篇文章替换完plugin之后,就可以开始配置adapter

Terminal window
conf/
├── application.yml
├── bootstrap.yml
├── es6/
├── biz_order.yml
├── customer.yml
├── mytest_user.yml
└── suggest.yml
├── es7/
├── biz_order.yml
├── customer.yml
├── mytest_user.yml
└── suggest.yml
├── hbase/
└── mytest_person2.yml
├── kudu/
└── kudutest_user.yml
├── logback.xml
├── META-INF/
└── spring.factories
└── rdb/
└── mytest_user.yml

可以看到conf目录下有不同的消费者配置,不过我们首先来配置application.properties,这其中设置了数据源和消费者:

srcDataSources:
{自定义数据名}: # 这是一个数据源的别名
url: jdbc:mysql://127.0.0.1:3306/{你的数据库}?useUnicode=true
username: root
password: password
canalAdapters: # Canal 适配器配置列表
- instance: {设置的canal名称} # 对应的 Canal 实例名
# canalInstanceName: # 这个键在原文本中似乎是注释或者多余的,通常直接在 instance 字段中指定
ormqtopicname: groups # ormqtopicname 的值可能是 topic 名称或 group 名称
groups:
- groupId: g1 # 设置组名
outerAdapters: # 外部适配器配置列表
- name: logger # 开启日志,注释之后不会再输出日志到终端
- name: es6 # 必须是 es6/es7/hbase/kudu/rbd 其中之一
key: mykey # Option 设置一个键
hosts: http://127.0.0.1:9200 # ES 主机地址
# 127.0.0.1:9200 for rest mode properties
properties:
mode: rest # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: docker-cluster

接下来需要根据你设置的adapter类型在对应文件夹下新建一个yaml文件

# 假设这是某个适配器配置(如 es6 适配器)的映射规则部分
# 注意:这个片段需要嵌入到上一个 YAML 文件的相应位置。
- dataSourceKey: {自定义数据名} # 引用上文 srcDataSources 中的数据源名称
destination: {设置的canal名称} # 引用上文 canalAdapters 中的 instance 名称
groupId: g1 # 组id要和上文一致
outerAdapterKey: mykey # 引用上文 outerAdapters 中 es6 的 key
esMapping: # ES 映射配置
_index: suggest # 目标索引名称
# _type: suggest # es6 需要配置 type,但是 es7 不需要
pk: id # 通过 pk 设置 es 的文档 ID 字段
sql: "select id, adddate, keywords, userid from suggest" # 初始全量同步或 ETL 用的 SQL
commitBatch: 3000 # 批次提交大小
etlCondition: "where id={} and id<{}" # 后续选择性同步的时候会用到,用于增量或条件同步

我们要根据application.properties中的设置修改数据源和目的地名称。如果你不是使用id作为自己的主键,可以这么写

esMapping: # ES 映射配置的开始
_index: suggest # 目标索引名称
_id: _id # 设置 ES 文档 ID 的来源,这里指定从 SQL 结果中的名为 _id 的字段获取
sql: "select userid as _id from suggest" # 定义 SQL 查询,并将 userid 字段别名为 _id

canal支持非常复杂的sql语句,不过也仍有一些限制,这些限制可以在wiki中看到最后的etlCondition是用来做全量同步的使用分批处理的时候要用的。配置完成后,同样使用bin目录下的startup脚本启动adapter,如果没有什么意外,你可以通过动态的修改数据库查看终端输出确认canal工作正常。###Fullsynccanal支持增量更新,不过在第一次使用的时候需要手动触发全量同步。同步之前请确认你要同步的索引已经在elasticsearch中创建了,不然会报错(canal并不会帮你创建索引)。canal-adapter在本地8081暴露了一个管理端口,我们可以通过RESTAPI触发全量同步

Terminal window
curl -X POST http://localhost:8081/etl/{es6|es7|hbase|rdb}/{mykey}/{配置文件名}

其中mykey的部分是你配置了outerAdapterKey才需要使用的。你也可以通过定义的etlCondition对同步的范围做规定

Terminal window
curl -X POST http://localhost:8081/etl/{es6|es7|hbase|rdb}/{mykey}/{配置文件名}-d"params=0;1000"

这就对应了etlCondition中的两个参数,实现只同步id在0到1000的数据。

Summary

将mysql的数据同步到Elasticsearch本身并不是稀奇的事情,但是这种同步能力和生产消费模型却能让多个数据库连动。除了canal本身提供的一些第三方客户端之外,我们也可以使用消息队列的客户端,自定义消费行为,实现更灵活的处理能力当然代价也是显著的,毕竟搜索我已经鸽了四个月了(逃

Reference:

  1. 横向对比ElasticSearch与Sphinx
  2. MySQL全文检索性能测试及问题总结
  3. 简单聊聊MySQL全文索引