精华内容
下载资源
问答
  • canal同步mysql数据到ES
    千次阅读
    2019-11-17 20:49:07

    canal同步mysql数据到ES

    1. 前提是:mysql开启了bingo日志
    2. canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限.
    CREATE USER canal IDENTIFIED BY 'Mmc..5211314';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
    1. 安装并启动Canal-server
      2.1 连接阿里云ECS实例,下载并解压Canal-deployer。本文使用Canal-deployer 1.1.4版本。
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    
    2.2 解压canal.deployer-1.1.4.tar.gz。
    
    tar -zxvf canal.deployer-1.1.4.tar.gz
    
    2.3 修改conf/example/instance.properties文件。
    
    vi conf/example/instance.properties
    

    在这里插入图片描述
    2.4 启动Canal-server,并查看日志。

    ./bin/startup.sh
    cat logs/canal/canal.log
    

    在这里插入图片描述
    3. 安装并启动Canal-adapter
    3.1 连接阿里云ECS实例,下载并解压Canal-adapter。本文使用Canal-adapter1.1.4版本。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    
    3.2 解压canal.adapter-1.1.4.tar.gz
    
    tar -zxvf canal.adapter-1.1.4.tar.gz
    
    3.3 修改conf/application.yml文件。
    
    vi conf/application.yml
    

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    3.4 同样的方式,修改conf/es/*.yml文件,定义MySQL数据到ES数据的映射字段(此处文件以表名命名最好区分)
    在这里插入图片描述
    在这里插入图片描述
    3.5 启动Canal-adapter服务,并查看日志。

    ./bin/startup.sh
    cat logs/adapter/adapter.log
    
    1. 最后在数据库插入、修改、删除数据进行验证
    更多相关内容
  • canal是阿里巴巴开源的mysql数据传输组件,基于mysql binlog,提供了准确、实时的数据传输服务。有关binlog介绍,参见binlog介绍。 以下来自官方GitHub介绍。GitHub地址 canal [kə’næl],译意为水道/管道/沟渠,...

    1、功能及使用场景

    1.1、功能介绍

    canal是阿里巴巴开源的mysql数据传输组件,基于mysql binlog,提供了准确、实时的数据传输服务。有关binlog介绍,参见binlog介绍
    以下来自官方GitHub介绍。GitHub地址

    在这里插入图片描述
    canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

    基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    1.2、使用场景

    根据官方文档,目标系统支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。

    本文主要介绍MySQL同步到Elasticsearch的使用。

    2、需求引入

    整个过程用一个示例来介绍canal的安装使用。

    存在如下两张表(一对多关系),用户信息表和用户权限表,需要将用户信息以及权限同步到es

    user_info(用户信息表):

    idnamerole_id
    1张三1
    2李四2
    3王大锤2

    role(权限表):

    role_idrole_name
    1管理员
    2测试员

    查询sql:

    SELECT   a.id AS _id,  a.name,  a.role_id,  b.role_name
    FROM
    user_info a
    LEFT JOIN role b
    ON
    b.role_id = a.role_id
    

    查询结果:

    _idnamerole_idrole_name
    3王大锤2测试员
    1张三1管理员
    2李四1管理员

    对应的es索引结构:

    {
        "user_index": {
            "mappings": {
                "_doc": {
                    "properties": {
                        "name": {
                            "type": "text"
                        },
                        "role_id": {
                            "type": "long"
                        },
                        "role_name": {
                            "type": "text"
                        }
                    }
                }
            }
        }
    }
    

    3、canal文件下载及准备

    3.1 下载文件

    官方GitHub下载地址:https://github.com/alibaba/canal/releases
    下载canal服务端(canal.deployer-1.1.4.tar.gz)和客户端(canal.adapter-1.1.4.tar.gz),如下图。
    在这里插入图片描述
    下载完成如下
    在这里插入图片描述

    3.2 准备文件

    在这里插入图片描述
    命令运行完成后,进入adapter和deployer可以看到如下结构(忽略我本机的.DS_Store)
    adapter:
    在这里插入图片描述
    deployer:

    在这里插入图片描述

    4、deployer安装及效果测试

    4.1、deployer 配置修改

    4.1.1 准备

    (针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步)

    • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
      [mysqld]
      log-bin=mysql-bin # 开启 binlog
      binlog-format=ROW # 选择 ROW 模式
      server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

    • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,下面新建了canal账号

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

    4.1.2 修改连接数据库信息

    vi conf/example/instance.properties
    修改如下标红信息
    在这里插入图片描述

    4.2 启动deployer

    在deployer目录运行启动脚本

    sh bin/startup.sh
    

    在这里插入图片描述

    • 查看 server 日志
      tail -f logs/canal/canal.log
      在这里插入图片描述
    • 查看 instance 的日志
      tail -f logs/example/example.log
      在这里插入图片描述

    看到如上日志,标志启动成功

    4.3 测试deployer效果

    (4.3 步骤可跳过,主要为验证deployer端效果)

    4.3.1 在本地电脑新建普通maven工程

    在这里插入图片描述
    pom文件,依赖如下pom

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>
    

    在这里插入图片描述

    4.3.2 新建ClientSimple类

    粘贴测试代码(下面链接页面上的ClientSimple代码)
    https://github.com/alibaba/canal/wiki/ClientExample

    将圈红ip改为部署adapter的ip,然后直接启动此main方法
    在这里插入图片描述
    启动完成看到如下日志:
    在这里插入图片描述
    日志会循环打印count
    此时触发数据库变更

    user_info表变更前
    在这里插入图片描述
    user_info表变更后,新加了一条名叫逻辑的记录
    在这里插入图片描述
    可在日志处观察到变更信息,标志着deployer监听mysql binlog变更成功

    在这里插入图片描述

    5、adapter安装及效果测试

    canal adapter 的 Elastic Search 版本支持6.x.x以上

    官方文档地址
    https://github.com/alibaba/canal/wiki/Sync-ES

    5.1 修改配置

    5.1.1 修改启动器配置: application.yml

    进adapter/conf目录
    vi application.yml
    修改如下配置,注意缩进格式,yml文件严格缩进格式,格式错误会引起启动失败问题。
    在这里插入图片描述
    mode使用rest模式,测试使用transport会出问题,目前没找到原因,下图mode还没更改

    在这里插入图片描述

    5.1.2 修改 conf/es/mytest_user.yml文件

    adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件
    不需要的文件可删除,只配置需要的yml文件
    修改如下配置,esMapping信息,包括 index,type,sql,其中sql尽量保证不要换行,在文本编辑器中编辑成一行,在粘贴进去,否则可能会出问题(还是由于yml格式问题)
    此处的sql的字段别名即是es字段名,不写别名,默认原名即是es字段名,有关详细说明,可参见官方文档https://github.com/alibaba/canal/wiki/Sync-ES
    etlCondition 可以注释掉,我们默认任何条件都同步
    在这里插入图片描述

    5.2 启动adapter

    启动命令 sh bin/startup.sh
    查看日志:
    tail -f logs/adapter/adapter.log
    观察如下日志,即表示启动成功
    在这里插入图片描述

    5.3 效果测试

    canal是一个MySQL增量订阅组件,所以不支持数据的初始化
    我们需要在数据库触发变更,才能将数据同步到es
    变更前:
    es数据->在kibana查询对应数据,可以看到右侧数据为空
    在这里插入图片描述
    mysql数据→使用查询sql,查询到数据如下

    在这里插入图片描述
    下面,我们进行数据变更:

    比如将张三名字变成张六,MySQL数据如下:
    在这里插入图片描述
    此时,es数据在MySQL数据变更同时,es数据相应变更,如下,可以观察到,数据变更已经成功从MySQL同步到elasticsearch

    需要注意的是,由于我们只变更了user_info表,所以此处只同步了user_info表的name和id字段,role_name字段,并没有同步,只有role_name字段变更时,才会被同步,所以实际使用时,要先做好数据初始化工作。

    在这里插入图片描述
    同时,可在logs/adapter/adapter.log观察到数据变更日志

    在这里插入图片描述
    至此,全部操作完成

    6、全量数据导入

    由于canal只支持增量导入,所以官方adapter提供了全量导入手动触发的功能,

    canal全表同步(etl功能,手动触发)
    参见源码:

    /**
     * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
     *
     * @param type 类型 hbase, es
     * @param task 任务名对应配置文件名 mytest_person2.yml
     * @param params etl where条件参数, 为空全部导入
     */
    @PostMapping("/etl/{type}/{task}")
    public EtlResult etl(@PathVariable String type, @PathVariable String task,
                         @RequestParam(name = "params", required = false) String params) {
        return etl(type, null, task, params);
    }
    

    只需要发送http命令即可

    例:如下,ip和端口是部署canal adapter的IP和端口,然后类型选es,后面在跟上对应的yml配置文件即可

    curl http://127.0.0.1:8081/etl/es/mytest_person2.yml -X POST
    

    导入成功提示
    在这里插入图片描述

    展开全文
  • 首发公众号:MarkerHub 原创作者:吕一明 视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/ hello,大家好呀,好久没写过原创了,今天带大家做个实验吧,基于canal同步mysql数据到es中! 原理啥的,都给我...

    首发公众号:MarkerHub

    原创作者:吕一明

    视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

    hello,大家好呀,好久没写过原创了,今天带大家做个实验吧,基于canal同步mysql的数据到es中!

    原理啥的,都给我百度去吧,这里直接搞实验!

    本文使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中。

    实验中间件版本说明:

    • centos 8

    • mysql 5.7.36

    • es 7.16.2

    • cannal.server: 1.1.5

    • canal.adapter: 1.1.5

    • postman

    0、安装docker

    基本命令:

    #centos 7 安装 docker
    yum install docker
    
    #centos 8 安装docker
    yum erase podman buildah 
    yum install -y yum-utils
    yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
    yum install docker-ce docker-ce-cli containerd.io
    
    #检验安装是否成功
    [root@localhost opt]# docker --version
    Docker version 20.10.12, build e91ed57
    
    #启动
    systemctl start docker
    
    #换镜像源
    sudo vim /etc/docker/daemon.json
    内容如下:
    {
     "registry-mirrors": ["https://m9r2r2uj.mirror.aliyuncs.com"]
    }
    保存退出,重启docker
    
    #重启
    sudo service docker restart
    
    #列出镜像
    docker images
    
    #查看运行进程
    docker ps

    1、安装mysql

    docker pull mysql:5.7.36
    docker run --name mysql5736 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36
    
    docker exec -it mysql5736 /bin/bash
    apt-get update
    apt-get install vim
    cd /etc/mysql/mysql.conf.d
    vim mysqld.cnf  // 修改mysql配置

    配置:

    [mysqld]
    #binlog setting
    log-bin=mysql-bin  // 开启logbin
    binlog-format=ROW  // binlog日志格式
    server-id=1  // mysql主从备份serverId,canal中不能与此相同

    818332bebbf6892c54af9eea129d9ce4.png

    保存退出,重启mysql:service mysql restart

    可能会退出docker镜像,注意重启启动docker的mysql。

    mysql -uroot -p
    show master status  // binlog日志文件
    reset master; // 重启日志

    d91ab00c64e436a95febc4790d3ef789.png

    查看是否配置成功:

    6af3df63a30860f068bde066ed62a3de.png

    查看日志文件:

    cd /var/lib/mysql  // 进入日志文件目录
    mysqlbinlog -vv mysql-bin.000001  // row格式查看日志

    c99e5abe7639cf4c5697d5ecf34168ec.png

    使用数据库工具连接上docker中的mysql,然后创建dailyhub数据库,然后再查看日志(mysqlbinlog -vv mysql-bin.000001)可以看到截图如下:

    a59c37eec531aa0bdb191d5d496df849.png

    到这里,mysql已经安装成功了。

    d7e4af96f5f771b11e875ff93cfc2bdd.png

    2、安装es

    docker pull elasticsearch:7.16.2
    docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name='es7162' -d elasticsearch:7.16.2

    注意如果拉取不出对应的版本,可以上https://registry.hub.docker.com/_/elasticsearch?tab=tags&page=1&ordering=last_updated,查看对应的版本再拉取。我之前是拉取7.15.2的实验的,后来过来几天发现这版本已经拉取不了了,就改成了7.16.2。或者换低一点的版本也可以。 5042971929fd14ac76ada4696515defa.png

    查看https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch/2.6.2,得到版本依赖关系,在springboot2.6.2版本下,7.15.2和7.16.2都可以用。

    48c87da91ad1cb6fbcd6cae5f5099d92.png

    docker启动es:

    2988733f2055405bdea59064fe9a7dc8.png

    87c155f314333c0b509bc321ab131f94.png

    然后我们需要配置一下es的信息:

    docker exec -ites es7162 /bin/bash
    cd config
    vi elasticsearch.yml

    配置文件:

    cluster.name: dailyhub-es
    network.host: 0.0.0.0
    
    node.name: node-1
    http.port: 9200
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    node.master: true
    node.data: true

    docker restart es7162 重启es,注意千万别写错配置的信息,否则启动会失败,启动失败是后可以通过docker logs -f es7162查看原因,但也只能重新来了。然后服务器访问:

    // 查询es所有mapping
    http://119.45.25.164:9200/_mapping?pretty=true

    注意如果是云服务器的话,要在安全组中配置对应的端口开放、还有防火墙啥的,然后安全些的话,还需要给es配合账号密码啥的。我这里为了实验就简单来了。

    安装中文分词器

    可以有两种方式安装中文分词器,如果在线安装的时候分词器插件下载不下来那就只能离线安装了。

    1、在线安装中文分词器:

    docker exec -ites es7162 /bin/bash
    
    ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip

    2e062a8ba71f7e7ddbc984b618f56120.png

    2、离线安装中文分词器:

    首先打开这个链接:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.16.2,把分词器插件下载下来,

    # 把插件复制到容器内
    docker cp elasticsearch-analysis-ik-7.16.2.zip es7162:/usr/share/elasticsearch/plugins
    
    docker exec -it es7162 /bin/bash
    cd /usr/share/elasticsearch/plugins/
    mkdir ik
    unzip elasticsearch-analysis-ik-7.16.2.zip -d ik
    rm -rf elasticsearch-analysis-ik-7.16.2.zip

    c6da7e038a0df7e1470f1a9499199924.png

    重启es,查看日志是否加载ik分词器成功!

    docker restart es7162
    docker logs es7162

    9048f4a72182bbfb67718c54be992d13.png

    当你看到日志中有输出analysis-ik,说明已经安装成功。

    3、安装canal-server

    拉取镜像并启动:

    docker pull canal/canal-server:v1.1.5
    
    docker run --name canal115 -p 11111:11111  --link mysql5736:mysql5736 -id canal/canal-server:v1.1.5

    修改对应的配置:

    docker exec -it canal115 /bin/bash
    cd canal-server/conf/example/
    vi instance.properties  // 修改配置
    
    # 把0改成10,只要不和mysql的id相同就行
    canal.instance.mysql.slaveId=10
    # 修改成mysql对应的账号密码,mysql5736就是mysql镜像的链接别名
    canal.instance.master.address=mysql5736:3306
    canal.instance.dbUsername=root
    canal.instance.dbPassword=admin

    c83b0403de9358b147243161f0f05ac1.png

    验证配置是否成功:

    #首先重启一下canal
    docker restart  canal115
    
    docker exec -it canal115 /bin/bash
    cd canal-server/logs/example/
    tail -100f example.log  // 查看日志

    截图如下,说明已经链接上了mysql主机,此时mysql中的数据变化,都会在canal中有同步。 bbaf47414555d3c38e84a2fbf500ac48.png

    可以通过Java程序测试有没连接上mysql:

    导入canal-client包

    <!-- 为了测试canal-server是否连接mysql成功,1.1.5版本少包,所以用1.1.4版本 -->
    <dependency>
       <groupId>com.alibaba.otter</groupId>
       <artifactId>canal.client</artifactId>
       <version>1.1.4</version>
    </dependency>
    • com.markerhub.SimpleCanalClientExample

    /**
     * 公众号:MarkerHub
     *
     * 说明:用于测试canal是否已经连接上了mysql
     */
    public class SimpleCanalClientExample {
        public static void main(String args[]) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("119.45.25.164",
                    11111), "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------&gt; before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------&gt; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }

    当mysql的数据更新时候效果如下: d6b2cd75b9af8e9be7044c28125aa9b2.png

    注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

    4、安装canal-adapter

    由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的

    docker pull slpcat/canal-adapter:v1.1.5
    
    docker run --name adapter115 -p 8081:8081 --link mysql5736:mysql5736 --link canal115:canal115 --link es7162:es7162 -d slpcat/canal-adapter:v1.1.5

    3c7677d9e5484b4b688965aa1a42c245.png

    修改配置:

    docker exec -it adapter115 /bin/bash
    cd conf/
    vi application.yml

    配置修改如下,一些不需要的配置或者注释掉的配置可以删除掉:

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp #tcp kafka rocketMQ rabbitMQ
      flatMessage: true
      zookeeperHosts:
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        # canal tcp consumer
        canal.tcp.server.host: canal115:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://mysql5736:3306/dailyhub?useUnicode=true
          username: root
          password: admin
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: es7
            hosts: es7162:9200 # 127.0.0.1:9200 for rest mode
            properties:
              mode: rest
              # security.auth: test:123456 #  only used for rest mode
              cluster.name: dailyhub-es

    f0350df582a818a8d6b62b40a9f07ad0.png

    接下来是修改表映射索引文件:

    docker exec -it adapter115 /bin/bash
    cd conf/es7
    
    cp -v mytest_user.yml dailyhub_collect.yml
    # 删除其他多余的
    rm -rf biz_order.yml customer.yml mytest_user.yml
    vi dailyhub_collect.yml

    配置文件:

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    esMapping:
      _index: dailyhub_collect
      _id: _id
      _type: _doc
      upsert: true
    #  pk: id
      sql: "
    SELECT
            c.id AS _id,
            c.user_id AS userId,
            c.title AS title,
            c.url AS url,
            c.note AS note,
            c.collected AS collected,
            c.created AS created,
            c.personal AS personal,
            u.username AS username,
            u.avatar AS userAvatar
    FROM
            m_collect c
    LEFT JOIN m_user u ON c.user_id = u.id
    
    "
    #  objFields:
    #    _labels: array:;
    #   etlCondition: "where c.c_time>={}"
      commitBatch: 3000

    注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动设置格式。

    5、联合测试

    然后就可以直接测试了,准备测试条件:

    • 在数据库中生成表和字段,

    • 然后elasticsearch中生成索引。先新建数据库dailyhub。然后数据表结构:

    SET FOREIGN_KEY_CHECKS=0;
    
    -- ----------------------------
    -- Table structure for m_collect
    -- ----------------------------
    DROP TABLE IF EXISTS `m_collect`;
    CREATE TABLE `m_collect` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `collected` date DEFAULT NULL,
      `created` datetime(6) DEFAULT NULL,
      `note` varchar(255) DEFAULT NULL,
      `personal` int(11) DEFAULT NULL,
      `title` varchar(255) DEFAULT NULL,
      `url` varchar(255) DEFAULT NULL,
      `user_id` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`),
      KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
      CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;
    
    -- ----------------------------
    -- Records of m_collect
    -- ----------------------------
    
    -- ----------------------------
    -- Table structure for m_user
    -- ----------------------------
    DROP TABLE IF EXISTS `m_user`;
    CREATE TABLE `m_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `avatar` varchar(255) DEFAULT NULL,
      `created` datetime(6) DEFAULT NULL,
      `lasted` datetime(6) DEFAULT NULL,
      `open_id` varchar(255) DEFAULT NULL,
      `statu` int(11) DEFAULT NULL,
      `username` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
    
    -- ----------------------------
    -- Records of m_user
    -- ----------------------------
    INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05 16:08:40.042000', '2022-01-06 13:07:45.153000', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', '公众号:MarkerHub');

    接下来借postman来新建elasticsearch的索引:

    # 创建索引并添加映射字段
    PUT http://119.45.25.164:9200/dailyhub_collect
    
    {
        "mappings": {
            "properties": {
                "collected": {
                    "type": "date",
                    "format": "date_optional_time||epoch_millis"
                },
                "created": {
                    "type": "date",
                    "format": "date_optional_time||epoch_millis"
                },
                "note": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_smart"
                },
                "personal": {
                    "type": "integer"
                },
                "title": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_smart"
                },
                "url": {
                    "type": "text"
                },
                "userAvatar": {
                    "type": "text"
                },
                "userId": {
                    "type": "long"
                },
                "username": {
                    "type": "keyword"
                }
            }
        }
    }

    e461ed9b54ad5527f44dd41d783388ec.png

    其他常用操作:

    # 删除索引
    PUT  http://119.45.25.164:9200/dailyhub_collect
    
    # 查看素有索引映射
    GET  http://119.45.25.164:9200/_mapping?pretty=true
    
    # 搜索文档
    GET http://119.45.25.164:9200/dailyhub_collect/_search
    
    # 删除ID为1的文档
    DELETE  http://119.45.25.164:9200/dailyhub_collect/_doc/1

    然后我们打开canal-adapter的输入日志:

    docker logs --tail 100  -f adapter115

    然后我们在mysql的m_collect中新添加一条记录,可以看到日志输出如下: 9201b2552059cbc5a9dccf4c55100479.png

    然后搜索全部文档,发现es中有数据啦。

    f4ce0c4736dedea7f40a94ec6610be9d.png

    如果看到adaptar115一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysql、es、canal、adapar

    2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

    到这里,实验成功,over,关注公众号:MarkerHub,带你做更多Java实验!视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

    展开全文
  • canal同步mysql数据到es

    2020-06-10 18:35:59
    canal同步mysql数据到es 更多请看:https://github.com/alibaba/canal/wiki 1.环境 centos7 jdk1.8 ElasticSearch:6.x(我的是6.7.1) Kibana:6.x canal.deployer:1.1.4(不支持es 7版) canal.adapter:1.1.4 下载...

    canal同步mysql数据到es

    在这里插入图片描述
    更多请看:https://github.com/alibaba/canal/wiki

    1.环境

    centos7
    jdk1.8
    ElasticSearch:6.x(我的是6.7.1)
    Kibana:6.x
    canal.deployer:1.1.4(不支持es 7版)
    canal.adapter:1.1.4

    下载地址: https://github.com/alibaba/canal/releases
    说明:这里需要下载的是:
    1、canal.deployer1.1.4版本- - - 可以理解为相当于canal的服务端
    2、canal.adapter1.1.4版本- - - 可以理解为相当于canal的插件
    3、最新的是1.1.5版本,但是是快照版,我们这里还是选择稳定版本
    es和kibana的安装:https://blog.csdn.net/qq_40198004/article/details/89194129

    mysql的配置

    开启MySQL的 binlog 写入功能,配置 binlog-format 为 ROW 模式
    找到my.cnf文件,我的是/etc/my.cnf,添加以下配置:

    log-bin=mysql-bin   # 开启 binlog
    binlog-format=ROW   # 选择 ROW 模式
    server_id=1        # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    在这里插入图片描述
    重启mysql:service mysql restart
    查看是否开启 binlog: show variables like '%log_bin%';
    在这里插入图片描述
    准备:es_db数据库

    CREATE DATABASE es_db;
    

    创建tb_item表

    CREATE TABLE `tb_item` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '商品id,同时也是商品编号',
      `title` varchar(100) NOT NULL COMMENT '商品标题',
      `sell_point` varchar(500) DEFAULT NULL COMMENT '商品卖点',
      `price` decimal(20,2) NOT NULL COMMENT '商品价格,单位为:分',
      `num` int(10) NOT NULL COMMENT '库存数量',
      `brand` varchar(255) DEFAULT NULL COMMENT '品牌',
      `barcode` varchar(30) DEFAULT NULL COMMENT '商品条形码',
      `image` varchar(500) DEFAULT NULL COMMENT '商品图片',
      `cid` bigint(10) NOT NULL COMMENT '所属类目,叶子类目',
      `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '商品状态,1-正常,2-下架,3-删除',
      `created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
      `updated` timestamp NULL DEFAULT NULL COMMENT '更新时间',
      PRIMARY KEY (`id`),
      KEY `cid` (`cid`),
      KEY `status` (`status`),
      KEY `updated` (`updated`),
      KEY `title` (`title`)
    ) ENGINE=InnoDB  DEFAULT CHARSET=utf8 COMMENT='商品表';
    

    canal.deployer修改配置

    打开canal.deployer-1.1.4/conf/example/instance.properties修改

    ## mysql serverId , v1.0.26+ will autoGen
    #  这个东西可以不设置,如果要设置别和上面mysql配置文件中的值重复就行
    # canal.instance.mysql.slaveId=0
    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # position info
    #mysql启动的ip:端口  #例如:192.168.34.66:3306
    canal.instance.master.address=192.168.221.132:3306
    
    ## 这个可以根据需要修改过滤,默认是直接监听所有
    canal.instance.filter.regex=.*\\..*
    
    # username/password 自己的用户名和密码(也可以是root)
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    

    启动./bin/startup.sh
    查看日志:cat logs/canal/canal.log

    canal.adapter修改配置

    1.修改adapter-1.1.4/conf/application.yml

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp # kafka rocketMQ
    ## 修改成自己canal-deployer启动的ip和端口 默认端口是:11111
      canalServerHost: 192.168.221.132:11111
    #  zookeeperHosts: slave1:2181
    #  mqServers: 127.0.0.1:9092 #or rocketmq
    #  flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://192.168.221.132:3306/es_db?useUnicode=true # mysql的地址和端口
          username: canal
          password: canal
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
          - name: es
            hosts: 192.168.221.132:9300 # 127.0.0.1:9200 for rest mode
            properties:
    #          mode: transport # or rest
    #          # security.auth: test:123456 #  only used for rest mode
              cluster.name: my-application
    
    
    

    注意和es中配置对应
    在这里插入图片描述
    2.进入到adapter-1.1.4/conf/es文件中
    cp customer.yml canal.yml
    在这里插入图片描述

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    esMapping:
      _index: item
      _type: _doc
      _id: id
      relations:
        customer_order:
          name: customer
      sql: "select t.id,t.title,t.sell_point,t.price,t.num,t.brand,t.barcode,t.image,t.cid,t.status,t.created,t.updated from tb_item t"
      etlCondition: "where t.c_time>={}"
      commitBatch: 3000
    

    mappings不要配
    在这里插入图片描述
    3.然后在kibana创建索引和映射
    注意
    6.x是有type的且必须都对应

    PUT /item
    {
        "mappings": {
            "_doc": {
                "properties": {
                    "id": {
                        "type": "long"
                    },
                    "title": {
                        "type": "text"
                    },
                    "sell_point": {
                        "type": "text"
                    },
                    "price": {
                        "type": "text"
                    },
                    "num": {
                        "type": "integer"
                    },
                    "brand": {
                        "type": "text"
                    },
                    "barcode": {
                        "type": "text"
                    },
    				"image": {
                        "type": "text"
                    },
    				"cid": {
                        "type": "long"
                    },
    				"status": {
                        "type": "short"
                    },
    				"created": {
                        "type": "date"
                    },
    				"updated": {
                        "type": "date"
                    }
                }
            }
        }
    }
    

    4.启动:./bin/startup.sh
    查看日志:cat logs/adapter/adapter.log
    在这里插入图片描述
    adapter-1.1.4/conf/es的yml
    在这里插入图片描述
    在这里插入图片描述
    4.插入es_db数据库的tb_item表

    INSERT INTO `tb_item` VALUES ('3', '三星S10', '下单即送10400毫安移动电源!再赠手机魔法盒!', '7000.00', '10', '三星', null, 'http://image.jt.com/jd/c1775819c7e44b1c903f27514e70b998.jpg', '3', '1', '2015-03-08 21:29:27', '2015-03-08 21:29:27');
    

    在这里插入图片描述
    5.更新数据

    UPDATE tb_item set title='华为P40',brand='华为' where id=3;
    

    在这里插入图片描述
    增删改是可以的,删除自己试试吧
    参考:
    https://blog.csdn.net/tuesdayma/article/details/103294448
    https://segmentfault.com/a/1190000019066098?utm_source=tag-newest
    https://www.cnblogs.com/dalaoyang/p/11069850.html
    https://www.cnblogs.com/caoweixiong/p/11825303.html

    展开全文
  • 使用canal实时同步MySQL数据到es软件版本信息安装 elasticsearch安装 kibana下载和安装canal1、下载canal2、配置MySQL3、配置 canal-server (canal-deploy)4、配置 canal-adapter5、配置canal-admin同步MySQL到es1、...
  • 使用canal同步mysql数据到es

    千次阅读 2020-08-27 17:53:08
    canal client demo: package com.canalclient.process; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import ...
  • canal于v1.1.2版本后,已支持自动同步到Elasticsearch。赞canal! 基于运河的Mysql的与Elasticsearch实时同步的的JavaWeb服务。 canal是阿里巴巴mysql数据库binlog的增量订阅和消费组件。 工作原理 全量 暴露的...
  • 该楼层疑似违规已被系统折叠隐藏此楼查看此楼环境:jdk1.8 、centos7、 elasticsearch7.6 、canal1.4,mysql5.7(以上所有全部安装在centos7上运行)按照GitHub配置完启动canal-adapter出现以下错误:java.lang....
  • 基于 canalMysqlElasticsearch 实时同步的 javaweb 服务。 canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。 工作原理 全量 暴露Http接口(接口定义见),待调用后开启后台线程,通过主键分批同步...
  • canal同步mysql数据到elasticsearch7.15.0

    千次阅读 2021-11-03 16:45:38
    1.准备 官方网址:https://github.com/alibaba/canal/wiki/QuickStart 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为...server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId
  • canal同步mysql

    2018-12-12 16:48:02
    canal同步mysql,基于springboot2.0.6,使用undertow容器,可直接运行。
  • 最近在做的项目中有用到canal实时同步MySQL数据,并且写入es的场景,总结了一些心得,以备后查。总体同步的流程图如下:MySQL-es process.png链路中的环节稍微解释下:binlog MySQL的自身的操作日志,用来记录数据...
  • canal可实现对es数据进行增删改的增量同步,删除是真正的物理删除,这点是logstash不具备的
  • 使用canal同步mysql数据到es,时间格式转换问题。 常规安装部署以及使用暂不提供,百度搜一堆。。。 环境: canal1.1.5,mysql8,Elasticsearch6.7 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , ...
  • MysqlToElasticSearchCanal利用Canal同步mysql数据到ElasticSearch,通过修改配置文件支持动态同步Mysql数据。主要介绍一下Canal的主要配置信息。CanalCanal介绍canal是阿里巴巴旗下的一款开源项目,纯Java开发。...
  • Docker(同步mysql数据到elasticsearch)一、版本环境二、docker安装mysql,并配置binlog2.1 安装mysql2.2 修改mysql配置2.3 验证mysql binlog配置2.4 查看日志文件三、docker安装elasticsearch,并建立索引3.1 安装...
  • canal安装 数据同步一:java代码实现 数据同步二:mysql同步到myql中 数据同步三:mysql同步到es
  • 通过canalMySQL数据同步到Elasticsearch

    千次阅读 2020-09-01 11:10:35
    当您需要将MySQL中的增量数据同步Elasticsearch时,可通过Canal来实现。 本文以阿里云Elasticsearch和RDS MySQL为例,为您介绍数据同步的方法。阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、...
  • canal实现同步mysqles

    千次阅读 2022-03-08 15:33:51
    一、canal 简介 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务...当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x ,

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,163
精华内容 1,265
关键字:

canal同步mysql数据到es

mysql 订阅