-
2019-11-17 20:49:07
canal同步mysql数据到ES
- 前提是:mysql开启了bingo日志
- canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限.
CREATE USER canal IDENTIFIED BY 'Mmc..5211314'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
- 安装并启动Canal-server
2.1 连接阿里云ECS实例,下载并解压Canal-deployer。本文使用Canal-deployer 1.1.4版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2.2 解压canal.deployer-1.1.4.tar.gz。
tar -zxvf canal.deployer-1.1.4.tar.gz
2.3 修改conf/example/instance.properties文件。
vi conf/example/instance.properties
2.4 启动Canal-server,并查看日志。./bin/startup.sh cat logs/canal/canal.log
3. 安装并启动Canal-adapter
3.1 连接阿里云ECS实例,下载并解压Canal-adapter。本文使用Canal-adapter1.1.4版本。wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
3.2 解压canal.adapter-1.1.4.tar.gz
tar -zxvf canal.adapter-1.1.4.tar.gz
3.3 修改conf/application.yml文件。
vi conf/application.yml
3.4 同样的方式,修改conf/es/*.yml文件,定义MySQL数据到ES数据的映射字段(此处文件以表名命名最好区分)
3.5 启动Canal-adapter服务,并查看日志。./bin/startup.sh cat logs/adapter/adapter.log
- 最后在数据库插入、修改、删除数据进行验证
更多相关内容 -
使用canal同步MySQL数据到Elasticsearch(ES)
2021-06-18 15:22:31canal是阿里巴巴开源的mysql数据传输组件,基于mysql binlog,提供了准确、实时的数据传输服务。有关binlog介绍,参见binlog介绍。 以下来自官方GitHub介绍。GitHub地址 canal [kə’næl],译意为水道/管道/沟渠,...1、功能及使用场景
1.1、功能介绍
canal是阿里巴巴开源的mysql数据传输组件,基于mysql binlog,提供了准确、实时的数据传输服务。有关binlog介绍,参见binlog介绍。
以下来自官方GitHub介绍。GitHub地址
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
1.2、使用场景
根据官方文档,目标系统支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。
本文主要介绍MySQL同步到Elasticsearch的使用。
2、需求引入
整个过程用一个示例来介绍canal的安装使用。
存在如下两张表(一对多关系),用户信息表和用户权限表,需要将用户信息以及权限同步到es
user_info(用户信息表):
id name role_id 1 张三 1 2 李四 2 3 王大锤 2 role(权限表):
role_id role_name 1 管理员 2 测试员 查询sql:
SELECT a.id AS _id, a.name, a.role_id, b.role_name FROM user_info a LEFT JOIN role b ON b.role_id = a.role_id
查询结果:
_id name role_id role_name 3 王大锤 2 测试员 1 张三 1 管理员 2 李四 1 管理员 对应的es索引结构:
{ "user_index": { "mappings": { "_doc": { "properties": { "name": { "type": "text" }, "role_id": { "type": "long" }, "role_name": { "type": "text" } } } } } }
3、canal文件下载及准备
3.1 下载文件
官方GitHub下载地址:https://github.com/alibaba/canal/releases
下载canal服务端(canal.deployer-1.1.4.tar.gz)和客户端(canal.adapter-1.1.4.tar.gz),如下图。
下载完成如下
3.2 准备文件
命令运行完成后,进入adapter和deployer可以看到如下结构(忽略我本机的.DS_Store)
adapter:
deployer:4、deployer安装及效果测试
4.1、deployer 配置修改
4.1.1 准备
(针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步)
-
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 -
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,下面新建了canal账号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
4.1.2 修改连接数据库信息
vi conf/example/instance.properties
修改如下标红信息
4.2 启动deployer
在deployer目录运行启动脚本
sh bin/startup.sh
- 查看 server 日志
tail -f logs/canal/canal.log
- 查看 instance 的日志
tail -f logs/example/example.log
看到如上日志,标志启动成功
4.3 测试deployer效果
(4.3 步骤可跳过,主要为验证deployer端效果)
4.3.1 在本地电脑新建普通maven工程
pom文件,依赖如下pom<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
4.3.2 新建ClientSimple类
粘贴测试代码(下面链接页面上的ClientSimple代码)
https://github.com/alibaba/canal/wiki/ClientExample将圈红ip改为部署adapter的ip,然后直接启动此main方法
启动完成看到如下日志:
日志会循环打印count
此时触发数据库变更user_info表变更前
user_info表变更后,新加了一条名叫逻辑的记录
可在日志处观察到变更信息,标志着deployer监听mysql binlog变更成功5、adapter安装及效果测试
canal adapter 的 Elastic Search 版本支持6.x.x以上
官方文档地址
https://github.com/alibaba/canal/wiki/Sync-ES5.1 修改配置
5.1.1 修改启动器配置: application.yml
进adapter/conf目录
vi application.yml
修改如下配置,注意缩进格式,yml文件严格缩进格式,格式错误会引起启动失败问题。
mode使用rest模式,测试使用transport会出问题,目前没找到原因,下图mode还没更改5.1.2 修改 conf/es/mytest_user.yml文件
adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件
不需要的文件可删除,只配置需要的yml文件
修改如下配置,esMapping信息,包括 index,type,sql,其中sql尽量保证不要换行,在文本编辑器中编辑成一行,在粘贴进去,否则可能会出问题(还是由于yml格式问题)
此处的sql的字段别名即是es字段名,不写别名,默认原名即是es字段名,有关详细说明,可参见官方文档https://github.com/alibaba/canal/wiki/Sync-ES
etlCondition 可以注释掉,我们默认任何条件都同步
5.2 启动adapter
启动命令 sh bin/startup.sh
查看日志:
tail -f logs/adapter/adapter.log
观察如下日志,即表示启动成功
5.3 效果测试
canal是一个MySQL增量订阅组件,所以不支持数据的初始化
我们需要在数据库触发变更,才能将数据同步到es
变更前:
es数据->在kibana查询对应数据,可以看到右侧数据为空
mysql数据→使用查询sql,查询到数据如下
下面,我们进行数据变更:比如将张三名字变成张六,MySQL数据如下:
此时,es数据在MySQL数据变更同时,es数据相应变更,如下,可以观察到,数据变更已经成功从MySQL同步到elasticsearch需要注意的是,由于我们只变更了user_info表,所以此处只同步了user_info表的name和id字段,role_name字段,并没有同步,只有role_name字段变更时,才会被同步,所以实际使用时,要先做好数据初始化工作。
同时,可在logs/adapter/adapter.log观察到数据变更日志
至此,全部操作完成6、全量数据导入
由于canal只支持增量导入,所以官方adapter提供了全量导入手动触发的功能,
canal全表同步(etl功能,手动触发)
参见源码:/** * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST * * @param type 类型 hbase, es * @param task 任务名对应配置文件名 mytest_person2.yml * @param params etl where条件参数, 为空全部导入 */ @PostMapping("/etl/{type}/{task}") public EtlResult etl(@PathVariable String type, @PathVariable String task, @RequestParam(name = "params", required = false) String params) { return etl(type, null, task, params); }
只需要发送http命令即可
例:如下,ip和端口是部署canal adapter的IP和端口,然后类型选es,后面在跟上对应的yml配置文件即可
curl http://127.0.0.1:8081/etl/es/mytest_person2.yml -X POST
导入成功提示
-
实战!基于canal同步mysql数据到elasticsearch
2022-03-08 00:46:16首发公众号:MarkerHub 原创作者:吕一明 视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/ hello,大家好呀,好久没写过原创了,今天带大家做个实验吧,基于canal同步mysql的数据到es中! 原理啥的,都给我...首发公众号:MarkerHub
原创作者:吕一明
视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/
hello,大家好呀,好久没写过原创了,今天带大家做个实验吧,基于canal同步mysql的数据到es中!
原理啥的,都给我百度去吧,这里直接搞实验!
本文使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中。
实验中间件版本说明:
centos 8
mysql 5.7.36
es 7.16.2
cannal.server: 1.1.5
canal.adapter: 1.1.5
postman
0、安装docker
基本命令:
#centos 7 安装 docker yum install docker #centos 8 安装docker yum erase podman buildah yum install -y yum-utils yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo yum install docker-ce docker-ce-cli containerd.io #检验安装是否成功 [root@localhost opt]# docker --version Docker version 20.10.12, build e91ed57 #启动 systemctl start docker #换镜像源 sudo vim /etc/docker/daemon.json 内容如下: { "registry-mirrors": ["https://m9r2r2uj.mirror.aliyuncs.com"] } 保存退出,重启docker #重启 sudo service docker restart #列出镜像 docker images #查看运行进程 docker ps
1、安装mysql
docker pull mysql:5.7.36 docker run --name mysql5736 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36 docker exec -it mysql5736 /bin/bash apt-get update apt-get install vim cd /etc/mysql/mysql.conf.d vim mysqld.cnf // 修改mysql配置
配置:
[mysqld] #binlog setting log-bin=mysql-bin // 开启logbin binlog-format=ROW // binlog日志格式 server-id=1 // mysql主从备份serverId,canal中不能与此相同
保存退出,重启mysql:service mysql restart
可能会退出docker镜像,注意重启启动docker的mysql。
mysql -uroot -p show master status // binlog日志文件 reset master; // 重启日志
查看是否配置成功:
查看日志文件:
cd /var/lib/mysql // 进入日志文件目录 mysqlbinlog -vv mysql-bin.000001 // row格式查看日志
使用数据库工具连接上docker中的mysql,然后创建dailyhub数据库,然后再查看日志(mysqlbinlog -vv mysql-bin.000001)可以看到截图如下:
到这里,mysql已经安装成功了。
2、安装es
docker pull elasticsearch:7.16.2 docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name='es7162' -d elasticsearch:7.16.2
注意如果拉取不出对应的版本,可以上https://registry.hub.docker.com/_/elasticsearch?tab=tags&page=1&ordering=last_updated,查看对应的版本再拉取。我之前是拉取7.15.2的实验的,后来过来几天发现这版本已经拉取不了了,就改成了7.16.2。或者换低一点的版本也可以。
查看https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch/2.6.2,得到版本依赖关系,在springboot2.6.2版本下,7.15.2和7.16.2都可以用。
docker启动es:
然后我们需要配置一下es的信息:
docker exec -ites es7162 /bin/bash cd config vi elasticsearch.yml
配置文件:
cluster.name: dailyhub-es network.host: 0.0.0.0 node.name: node-1 http.port: 9200 http.cors.enabled: true http.cors.allow-origin: "*" node.master: true node.data: true
docker restart es7162 重启es,注意千万别写错配置的信息,否则启动会失败,启动失败是后可以通过docker logs -f es7162查看原因,但也只能重新来了。然后服务器访问:
// 查询es所有mapping http://119.45.25.164:9200/_mapping?pretty=true
注意如果是云服务器的话,要在安全组中配置对应的端口开放、还有防火墙啥的,然后安全些的话,还需要给es配合账号密码啥的。我这里为了实验就简单来了。
安装中文分词器
可以有两种方式安装中文分词器,如果在线安装的时候分词器插件下载不下来那就只能离线安装了。
1、在线安装中文分词器:
docker exec -ites es7162 /bin/bash ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip
2、离线安装中文分词器:
首先打开这个链接:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.16.2,把分词器插件下载下来,
# 把插件复制到容器内 docker cp elasticsearch-analysis-ik-7.16.2.zip es7162:/usr/share/elasticsearch/plugins docker exec -it es7162 /bin/bash cd /usr/share/elasticsearch/plugins/ mkdir ik unzip elasticsearch-analysis-ik-7.16.2.zip -d ik rm -rf elasticsearch-analysis-ik-7.16.2.zip
重启es,查看日志是否加载ik分词器成功!
docker restart es7162 docker logs es7162
当你看到日志中有输出analysis-ik,说明已经安装成功。
3、安装canal-server
拉取镜像并启动:
docker pull canal/canal-server:v1.1.5 docker run --name canal115 -p 11111:11111 --link mysql5736:mysql5736 -id canal/canal-server:v1.1.5
修改对应的配置:
docker exec -it canal115 /bin/bash cd canal-server/conf/example/ vi instance.properties // 修改配置 # 把0改成10,只要不和mysql的id相同就行 canal.instance.mysql.slaveId=10 # 修改成mysql对应的账号密码,mysql5736就是mysql镜像的链接别名 canal.instance.master.address=mysql5736:3306 canal.instance.dbUsername=root canal.instance.dbPassword=admin
验证配置是否成功:
#首先重启一下canal docker restart canal115 docker exec -it canal115 /bin/bash cd canal-server/logs/example/ tail -100f example.log // 查看日志
截图如下,说明已经链接上了mysql主机,此时mysql中的数据变化,都会在canal中有同步。
可以通过Java程序测试有没连接上mysql:
导入canal-client包
<!-- 为了测试canal-server是否连接mysql成功,1.1.5版本少包,所以用1.1.4版本 --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
com.markerhub.SimpleCanalClientExample
/** * 公众号:MarkerHub * * 说明:用于测试canal是否已经连接上了mysql */ public class SimpleCanalClientExample { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("119.45.25.164", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
当mysql的数据更新时候效果如下:
注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。
4、安装canal-adapter
由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的
docker pull slpcat/canal-adapter:v1.1.5 docker run --name adapter115 -p 8081:8081 --link mysql5736:mysql5736 --link canal115:canal115 --link es7162:es7162 -d slpcat/canal-adapter:v1.1.5
修改配置:
docker exec -it adapter115 /bin/bash cd conf/ vi application.yml
配置修改如下,一些不需要的配置或者注释掉的配置可以删除掉:
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: canal115:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: defaultDS: url: jdbc:mysql://mysql5736:3306/dailyhub?useUnicode=true username: root password: admin canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es7 hosts: es7162:9200 # 127.0.0.1:9200 for rest mode properties: mode: rest # security.auth: test:123456 # only used for rest mode cluster.name: dailyhub-es
接下来是修改表映射索引文件:
docker exec -it adapter115 /bin/bash cd conf/es7 cp -v mytest_user.yml dailyhub_collect.yml # 删除其他多余的 rm -rf biz_order.yml customer.yml mytest_user.yml vi dailyhub_collect.yml
配置文件:
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: dailyhub_collect _id: _id _type: _doc upsert: true # pk: id sql: " SELECT c.id AS _id, c.user_id AS userId, c.title AS title, c.url AS url, c.note AS note, c.collected AS collected, c.created AS created, c.personal AS personal, u.username AS username, u.avatar AS userAvatar FROM m_collect c LEFT JOIN m_user u ON c.user_id = u.id " # objFields: # _labels: array:; # etlCondition: "where c.c_time>={}" commitBatch: 3000
注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动设置格式。
5、联合测试
然后就可以直接测试了,准备测试条件:
在数据库中生成表和字段,
然后elasticsearch中生成索引。先新建数据库dailyhub。然后数据表结构:
SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for m_collect -- ---------------------------- DROP TABLE IF EXISTS `m_collect`; CREATE TABLE `m_collect` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `collected` date DEFAULT NULL, `created` datetime(6) DEFAULT NULL, `note` varchar(255) DEFAULT NULL, `personal` int(11) DEFAULT NULL, `title` varchar(255) DEFAULT NULL, `url` varchar(255) DEFAULT NULL, `user_id` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`), KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`), CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4; -- ---------------------------- -- Records of m_collect -- ---------------------------- -- ---------------------------- -- Table structure for m_user -- ---------------------------- DROP TABLE IF EXISTS `m_user`; CREATE TABLE `m_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `avatar` varchar(255) DEFAULT NULL, `created` datetime(6) DEFAULT NULL, `lasted` datetime(6) DEFAULT NULL, `open_id` varchar(255) DEFAULT NULL, `statu` int(11) DEFAULT NULL, `username` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4; -- ---------------------------- -- Records of m_user -- ---------------------------- INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05 16:08:40.042000', '2022-01-06 13:07:45.153000', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', '公众号:MarkerHub');
接下来借postman来新建elasticsearch的索引:
# 创建索引并添加映射字段 PUT http://119.45.25.164:9200/dailyhub_collect { "mappings": { "properties": { "collected": { "type": "date", "format": "date_optional_time||epoch_millis" }, "created": { "type": "date", "format": "date_optional_time||epoch_millis" }, "note": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" }, "personal": { "type": "integer" }, "title": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" }, "url": { "type": "text" }, "userAvatar": { "type": "text" }, "userId": { "type": "long" }, "username": { "type": "keyword" } } } }
其他常用操作:
# 删除索引 PUT http://119.45.25.164:9200/dailyhub_collect # 查看素有索引映射 GET http://119.45.25.164:9200/_mapping?pretty=true # 搜索文档 GET http://119.45.25.164:9200/dailyhub_collect/_search # 删除ID为1的文档 DELETE http://119.45.25.164:9200/dailyhub_collect/_doc/1
然后我们打开canal-adapter的输入日志:
docker logs --tail 100 -f adapter115
然后我们在mysql的m_collect中新添加一条记录,可以看到日志输出如下:
然后搜索全部文档,发现es中有数据啦。
如果看到adaptar115一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysql、es、canal、adapar
2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!
到这里,实验成功,over,关注公众号:MarkerHub,带你做更多Java实验!视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/
-
canal同步mysql数据到es
2020-06-10 18:35:59canal同步mysql数据到es 更多请看:https://github.com/alibaba/canal/wiki 1.环境 centos7 jdk1.8 ElasticSearch:6.x(我的是6.7.1) Kibana:6.x canal.deployer:1.1.4(不支持es 7版) canal.adapter:1.1.4 下载...canal同步mysql数据到es
更多请看:https://github.com/alibaba/canal/wiki1.环境
centos7
jdk1.8
ElasticSearch:6.x(我的是6.7.1)
Kibana:6.x
canal.deployer:1.1.4(不支持es 7版)
canal.adapter:1.1.4下载地址: https://github.com/alibaba/canal/releases
说明:这里需要下载的是:
1、canal.deployer1.1.4版本- - - 可以理解为相当于canal的服务端
2、canal.adapter1.1.4版本- - - 可以理解为相当于canal的插件
3、最新的是1.1.5版本,但是是快照版,我们这里还是选择稳定版本
es和kibana的安装:https://blog.csdn.net/qq_40198004/article/details/89194129mysql的配置
开启MySQL的 binlog 写入功能,配置 binlog-format 为 ROW 模式
找到my.cnf文件,我的是/etc/my.cnf,添加以下配置:log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
重启mysql:service mysql restart
查看是否开启 binlog:show variables like '%log_bin%';
准备:es_db数据库CREATE DATABASE es_db;
创建tb_item表
CREATE TABLE `tb_item` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '商品id,同时也是商品编号', `title` varchar(100) NOT NULL COMMENT '商品标题', `sell_point` varchar(500) DEFAULT NULL COMMENT '商品卖点', `price` decimal(20,2) NOT NULL COMMENT '商品价格,单位为:分', `num` int(10) NOT NULL COMMENT '库存数量', `brand` varchar(255) DEFAULT NULL COMMENT '品牌', `barcode` varchar(30) DEFAULT NULL COMMENT '商品条形码', `image` varchar(500) DEFAULT NULL COMMENT '商品图片', `cid` bigint(10) NOT NULL COMMENT '所属类目,叶子类目', `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '商品状态,1-正常,2-下架,3-删除', `created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `updated` timestamp NULL DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`), KEY `cid` (`cid`), KEY `status` (`status`), KEY `updated` (`updated`), KEY `title` (`title`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='商品表';
canal.deployer修改配置
打开canal.deployer-1.1.4/conf/example/instance.properties修改
## mysql serverId , v1.0.26+ will autoGen # 这个东西可以不设置,如果要设置别和上面mysql配置文件中的值重复就行 # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info #mysql启动的ip:端口 #例如:192.168.34.66:3306 canal.instance.master.address=192.168.221.132:3306 ## 这个可以根据需要修改过滤,默认是直接监听所有 canal.instance.filter.regex=.*\\..* # username/password 自己的用户名和密码(也可以是root) canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8
启动
./bin/startup.sh
查看日志:cat logs/canal/canal.logcanal.adapter修改配置
1.修改adapter-1.1.4/conf/application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # kafka rocketMQ ## 修改成自己canal-deployer启动的ip和端口 默认端口是:11111 canalServerHost: 192.168.221.132:11111 # zookeeperHosts: slave1:2181 # mqServers: 127.0.0.1:9092 #or rocketmq # flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://192.168.221.132:3306/es_db?useUnicode=true # mysql的地址和端口 username: canal password: canal canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase - name: es hosts: 192.168.221.132:9300 # 127.0.0.1:9200 for rest mode properties: # mode: transport # or rest # # security.auth: test:123456 # only used for rest mode cluster.name: my-application
注意和es中配置对应
2.进入到adapter-1.1.4/conf/es文件中
cp customer.yml canal.yml
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: item _type: _doc _id: id relations: customer_order: name: customer sql: "select t.id,t.title,t.sell_point,t.price,t.num,t.brand,t.barcode,t.image,t.cid,t.status,t.created,t.updated from tb_item t" etlCondition: "where t.c_time>={}" commitBatch: 3000
mappings不要配
3.然后在kibana创建索引和映射
注意
6.x是有type的且必须都对应PUT /item { "mappings": { "_doc": { "properties": { "id": { "type": "long" }, "title": { "type": "text" }, "sell_point": { "type": "text" }, "price": { "type": "text" }, "num": { "type": "integer" }, "brand": { "type": "text" }, "barcode": { "type": "text" }, "image": { "type": "text" }, "cid": { "type": "long" }, "status": { "type": "short" }, "created": { "type": "date" }, "updated": { "type": "date" } } } } }
4.启动:./bin/startup.sh
查看日志:cat logs/adapter/adapter.log
adapter-1.1.4/conf/es的yml
4.插入es_db数据库的tb_item表INSERT INTO `tb_item` VALUES ('3', '三星S10', '下单即送10400毫安移动电源!再赠手机魔法盒!', '7000.00', '10', '三星', null, 'http://image.jt.com/jd/c1775819c7e44b1c903f27514e70b998.jpg', '3', '1', '2015-03-08 21:29:27', '2015-03-08 21:29:27');
5.更新数据UPDATE tb_item set title='华为P40',brand='华为' where id=3;
增删改是可以的,删除自己试试吧
参考:
https://blog.csdn.net/tuesdayma/article/details/103294448
https://segmentfault.com/a/1190000019066098?utm_source=tag-newest
https://www.cnblogs.com/dalaoyang/p/11069850.html
https://www.cnblogs.com/caoweixiong/p/11825303.html -
使用canal实时同步MySQL数据到Elasticsearch
2022-02-23 22:56:32使用canal实时同步MySQL数据到es软件版本信息安装 elasticsearch安装 kibana下载和安装canal1、下载canal2、配置MySQL3、配置 canal-server (canal-deploy)4、配置 canal-adapter5、配置canal-admin同步MySQL到es1、... -
使用canal同步mysql数据到es
2020-08-27 17:53:08canal client demo: package com.canalclient.process; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import ... -
canal_mysql_elasticsearch_sync:基于canal的mysql和elasticsearch实时同步方案,支持增量同步和全量同步
2021-02-04 02:55:38canal于v1.1.2版本后,已支持自动同步到Elasticsearch。赞canal! 基于运河的Mysql的与Elasticsearch实时同步的的JavaWeb服务。 canal是阿里巴巴mysql数据库binlog的增量订阅和消费组件。 工作原理 全量 暴露的... -
canal同步mysql数据到es问题
2021-01-26 23:01:19该楼层疑似违规已被系统折叠隐藏此楼查看此楼环境:jdk1.8 、centos7、 elasticsearch7.6 、canal1.4,mysql5.7(以上所有全部安装在centos7上运行)按照GitHub配置完启动canal-adapter出现以下错误:java.lang.... -
canal-mysql-elasticsearch:通用的数据导入功能,实现canal导入mysql数据至es,提供查询的分布式性能
2021-05-09 19:25:54基于 canal 的 Mysql 与 Elasticsearch 实时同步的 javaweb 服务。 canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。 工作原理 全量 暴露Http接口(接口定义见),待调用后开启后台线程,通过主键分批同步... -
canal同步mysql数据到elasticsearch7.15.0
2021-11-03 16:45:381.准备 官方网址:https://github.com/alibaba/canal/wiki/QuickStart 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为...server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId -
canal同步mysql
2018-12-12 16:48:02canal同步mysql,基于springboot2.0.6,使用undertow容器,可直接运行。 -
使用canal同步MySQL数据到ES的有序性保证
2021-04-15 19:16:04最近在做的项目中有用到canal实时同步MySQL的数据,并且写入es的场景,总结了一些心得,以备后查。总体同步的流程图如下:MySQL-es process.png链路中的环节稍微解释下:binlog MySQL的自身的操作日志,用来记录数据... -
canal同步MySQL数据到Elasticsearch
2022-05-17 17:22:11canal可实现对es数据进行增删改的增量同步,删除是真正的物理删除,这点是logstash不具备的 -
使用canal同步mysql数据到ES:日期格式问题.
2020-09-03 14:40:05使用canal同步mysql数据到es,时间格式转换问题。 常规安装部署以及使用暂不提供,百度搜一堆。。。 环境: canal1.1.5,mysql8,Elasticsearch6.7 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , ... -
利用Canal同步mysql数据到ElasticSearch,通过修改配置文件支持动态同步Mysql数据
2021-03-22 09:02:02MysqlToElasticSearchCanal利用Canal同步mysql数据到ElasticSearch,通过修改配置文件支持动态同步Mysql数据。主要介绍一下Canal的主要配置信息。CanalCanal介绍canal是阿里巴巴旗下的一款开源项目,纯Java开发。... -
Canal(基于Docker同步mysql数据到elasticsearch)
2021-05-09 10:37:26Docker(同步mysql数据到elasticsearch)一、版本环境二、docker安装mysql,并配置binlog2.1 安装mysql2.2 修改mysql配置2.3 验证mysql binlog配置2.4 查看日志文件三、docker安装elasticsearch,并建立索引3.1 安装... -
canal同步mysql数据到es、oracle、mq、redis和mysql中
2019-11-28 15:34:16canal安装 数据同步一:java代码实现 数据同步二:mysql同步到myql中 数据同步三:mysql同步到es中 -
通过canal将MySQL数据同步到Elasticsearch
2020-09-01 11:10:35当您需要将MySQL中的增量数据同步至Elasticsearch时,可通过Canal来实现。 本文以阿里云Elasticsearch和RDS MySQL为例,为您介绍数据同步的方法。阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、... -
canal实现同步mysql至es
2022-03-08 15:33:51一、canal 简介 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务...当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x ,