canal 订阅
canal,英语单词,名词、及物动词,作名词时意为“运河;[地理] 水道;[建] 管道;灌溉水渠”。作及物动词时意为“在…开凿运河”。 展开全文
canal,英语单词,名词、及物动词,作名词时意为“运河;[地理] 水道;[建] 管道;灌溉水渠”。作及物动词时意为“在…开凿运河”。
信息
发    音
英[kəˈnæl];美[kəˈnæl]
意    思
运河;在…开凿运河
词    性
名词、及物动词
外文名
canal
canal短语搭配
Erie Canal伊利运河Kiel Canal基尔运河inguinal canal[解剖]腹股沟管Spinal canal椎管 ;[脊椎]髓管 ; 脊髓管 ;[解剖]脊椎管resin canal[植]树脂道canal dues[税收]运河通行税 ;[税收]运河税 ;[税收]运河通行费Vertebral canal[解剖]椎管nasolacrimal canal[解剖]鼻泪管pit canal[植]纹孔道 ; 孔沟 [1] 
收起全文
精华内容
下载资源
问答
  • canal
    2021-04-19 22:03:48

    工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    优点: 可以完全和业务代码解耦,增量日志订阅。

    缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

    配置Mysql

    set global validate_password_policy=0; 
    set global validate_password_length=1;
    
    use mysql;
    create user 'canal'@'%' identified by 'canal'; 
    grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';
    
    flush privileges;
    
    SHOW VARIABLES LIKE 'BINLOG_format';
    SHOW VARIABLES LIKE 'log_bin';
    show master status;
    SHOW GRANTS FOR 'canal'@'192.168.199.105'
    
    创建测试数据库es_test
    
    创建表
    CREATE TABLE `user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(256) COLLATE utf8mb4_unicode_ci NOT NULL,
      `detail` text COLLATE utf8mb4_unicode_ci NOT NULL,
      `age` int(3) NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    

    下载Canal

    canal下载地址:https://github.com/alibaba/canal/releases
    下载最新的cana1.1.5,1.1.5 才支持es7
    
    mkdir -p /data/canal
    canal.deployer-1.1.5-SNAPSHOT.tar.gz
    canal.adapter-1.1.5-SNAPSHOT.tar.gz
    canal.admin-1.1.5-SNAPSHOT.tar.gz
    
    解压
    mkdir -p /data/canal/canal-adapter/ && tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C ./canal-adapter/
    mkdir -p /data/canal/canal-deployer/ && tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C ./canal-deployer/
    mkdir -p /data/canal/canal-admin/ && tar -zxvf canal.admin-1.1.5-SNAPSHOT.tar.gz -C ./canal-admin/
    
    

    canal-admin

    https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart
    
    设置密码
    select password('12345678')
    84AAC12F54AB666ECFC2A83C676908C8BBC381B1
    https://www.bookstack.cn/read/canal-v1.1.4/5fb1e3d9c31b6e21.md
    
    启动
    sh bin/startup.sh
    
    

    在这里插入图片描述

    canal-deployer

    cd canal-deployer
    vim conf/example/instance.properties
    
    配置mysql的相关信息
    canal.instance.master.address=192.168.199.101:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    #table regex
    canal.instance.filter.regex=es_test\\..*
    
    启动Canal-server,并查看日志。
    ./bin/startup.sh
    cat logs/canal/canal.log
    2021-04-17 23:39:07.773 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
    
    com.alibaba.otter.canal.common.CanalException: instance : example config is not found
    
    sh bin/startup.sh local
    
    2021-04-19 20:16:29.028 [New I/O server worker #1-1] ERROR c.a.o.c.common.zookeeper.running.ServerRunningMonitor - start failed
    com.google.common.util.concurrent.UncheckedExecutionException: com.alibaba.otter.canal.server.exception.CanalServerException: can't find destination:example
    	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) ~[guava-18.0.jar:na]
    
    
    删掉/data/canal/canal-deployer/conf/example/meta.dat
    mv conf/example/meta.dat conf/example/meta.dat.bak
    mv conf/example/h2.mv.db conf/example/h2.mv.db.bak
    或者修改canal.instance.global.lazy = true
    
    还是不行就得手工去canal-admin新建instance
    
    查看日志
    tail -100f  logs/canal/canal.log
    tail -100f  logs/example/example.log
    

    canal-adapter

    cd canal-adapter
    vim conf/application.yml
    srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.199.101:3306/es_test?useUnicode=true
      username: root
      password: 123456
    canalAdapters:
    - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        key: es71 #这个很重要,必须设置唯一,不然有坑
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: es-cluster
    
    
    在conf/es7/目录下新建user.yml,定义MySQL数据到ES数据的映射字段。
    dataSourceKey: defaultDS
    outerAdapterKey: es71
    destination: example
    groupId: g1
    esMapping:
      _index: user
      _type: _doc
      _id: _id
      upsert: true
    #  pk: id
      sql: "select id as _id,id,name,detail,age from user"
      etlCondition: "where id<='{0}'" #etl的条件参数,可以将之前没能同步的数据同步,数据量大的话可以用logstash
      commitBatch: 3000
    
    chmod 777 user.yml
    
    启动Canal-adapter服务,并查看日志。
    ./bin/startup.sh
    tail -100f logs/adapter/adapter.log
    

    测试验证

    es安装中文分词
    下载地址
    https://github.com/medcl/elasticsearch-analysis-ik/releases
    https://github.com/medcl/elasticsearch-analysis-pinyin/releases/download/v7.10.2/elasticsearch-analysis-pinyin-7.10.2.zip
    
    root安装
    ./bin/elasticsearch-plugin install file:///data/efk/elasticsearch-analysis-ik-7.10.2.zip
    ./bin/elasticsearch-plugin install file:///data/efk/elasticsearch-analysis-pinyin-7.10.2.zip
    
    [root@yfm05 elasticsearch-7.10.2]# ./bin/elasticsearch-plugin list
    analysis-icu
    analysis-ik
    analysis-pinyin
    
    重启es
    Likely root cause: java.nio.file.AccessDeniedException: /data/efk/elasticsearch-7.10.2/config/analysis-ik
    chown -R elastic:elastic /data/efk/elasticsearch-7.10.2
    
    
    创建索引,索引一旦建立,主分片数量不可改变
    创建mapping
    PUT user/_mapping
    {
      "settings": {
        "index": {
          "number_of_shards": "2",
          "number_of_replicas": "1"
        }
      },
      "mappings": {
        "properties": {
          "age": {
            "type": "integer"
          },
          "id": {
            "type": "long"
          },
          "name": {
            "type": "text",
            "analyzer": "ik_smart"
          },
          "detail": {
            "type": "text",
            "analyzer": "ik_max_word"
          }
        }
      }
    }
    
    [root@yfm05 canal-deployer]# curl http://192.168.199.105:8081/etl/es7/es71/user.yml -X POST
    {"succeeded":true,"resultMessage":"导入ES 数据:1 条"}
    
    canal-adapter自动监听日志
    2021-04-19 21:53:08.506 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":5,"name":"5","detail":"5","age":5}],"database":"es_test","destination":"example","es":1618840388000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"user","ts":1618840388457,"type":"INSERT"}
    2021-04-19 21:53:08.640 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":5,"name":"5","detail":"5","age":5}],"database":"es_test","destination":"example","es":1618840388000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"user","ts":1618840388457,"type":"INSERT"} 
    Affected indexes: user 
    2021-04-19 21:53:30.780 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":5,"name":"5","detail":"5","age":5}],"database":"es_test","destination":"example","es":1618840410000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"user","ts":1618840410780,"type":"DELETE"}
    2021-04-19 21:53:30.781 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":5,"name":"5","detail":"5","age":5}],"database":"es_test","destination":"example","es":1618840410000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"user","ts":1618840410780,"type":"DELETE"} 
    Affected indexes: user 
    
    

    在这里插入图片描述

    更多相关内容
  • canal-python 一.canal-python 简介 canal-python 是阿里巴巴开源项目 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql...
  • Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 关于 ...
  • 之前说过,我们使用statefuset类型使canal-server域名固定之后又挂载pv使server即使重启也不会丢失数据,本以为万事大吉,没想到在用adapter同步数据时发现还是出了bug… 问题 使用域名注册server之后,马上启了个...
  • 易用的canal 客户端 easy canal client 介绍 canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 使用该客户端前请先了解canal, canal 自身提供了简单的客户端,如果要转换为数据库的实体对象,处理消费数据要...
  • springboot整合canal

    2022-02-23 14:31:13
    在springboot中整合与使用canal
  • canal 1.1.5安装包

    2020-05-27 17:02:56
    引入canal-admin工程,支持面向WebUI的canal管理能力 Canal-Admin-Guide Canal-Admin-Docker canal-server新增基于账号密码的ACL支持能力 canal-server新增admin动态运维指令,配合canal-admin工程动态管理订阅关系 ...
  • canal_mysql_elasticsearch_sync支持请星 :sparkles: 交流Q群:733688083 canal于v1.1.2版本后,已支持自动同步到Elasticsearch。赞canal! 基于运河的Mysql的与Elasticsearch实时同步的的JavaWeb服务。 canal是...
  • 稳定,解决class com.alibaba.druid.pool.DruidDataSource cannot be cast to class com.alibaba.druid.pool.DruidDataSource 报错问题
  • canal1.1.5.zip

    2020-12-13 13:44:45
    canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务...
  • canal-1.1.5.zip

    2021-09-22 17:30:01
    canal.adapter-1.1.5.tar.gz canal.deployer-1.1.5.tar.gz
  • canal开发者使用文档

    2020-11-05 16:46:29
    canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB),支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x.
  • k8s集群下canal-server的伪高可用实践前言问题解决方案总结 前言 前面我们已经介绍了canal的admin、server、adapter三个部分的容器化以及在k8s集群下的搭建过程。在创建canal-server的时候,k8s环境下,容器重启会...
  • canal-1.1.6-SNAPSHOT.rar

    2021-11-25 15:12:56
    jdk17安装Canal的安装文件,解压缩之后修改配置文件即可
  • canal.deployer-1.1.5.zip

    2021-10-21 15:29:40
    阿里的开源数据库同步工具 canal,window版本 linux版本都可以用,里面的配置文件我基本上配置好了
  • 最新版阿里开源中间件canal实现mysql数据库同步,零侵入不写代码实现,也可以通过整合到项目程序实现更加灵活的控制。详细使用方法:https://blog.csdn.net/u014374009/category_9409106.html
  • 阿里巴巴的canal的1.1.4版本的安装包。里面包含了canal.admin-1.1.4.tar.gz、canal.deployer-1.1.4.tar.gz、canal.example-1.1.4.tar.gz、canal-canal-1.1.4.tar.gz、canal-canal-1.1.4.zip
  • 使用docker容器来安装canal和mysql 待解决的问题:监听mysql以后对数据据库所做的操作canal监听不到 怀疑canal没有配置好,检查canal的日志后发现异常: 2020-02-15 10:58:24.506 [destination = example , address ...
  • canal_mysql_elasticsearch_sync 支持请star:sparkles: 基于 canal 的 Mysql 与 Elasticsearch 实时同步的 javaweb 服务。 canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。 工作原理 全量 暴露Http接口...
  • 畅购商城 canal依赖包
  • canal.deployer-1.1.4.tar ; canal.admin-1.1.4.tar.gz ; canal.adapter-1.1.4.tar.gz ; canal.example-1.1.4.tar.gz 官网下载非常不易。
  • CanAl入门

    2021-01-07 05:24:00
    一、为什么要使用canal 1、作用:同步mysql 主要有两种方式: (1)做拉链表 (2)更新redis 2、某些情况无法从日志里获取信息,而又无法利用sqoop等ETL工具对数据实时的监控 二、canal工作原理 1、mysql主备复制的...
  • binlog开源同步组件canal部署包 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行...
  • 去运河 简单配置,可将数据库变更记录记录递递到系统中 准备 对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format为ROW模式,my.cnf中配置如下 ... CREATE USER canal IDENTIFIED BY ' canal ' ; GRANT S
  • Canal

    千次阅读 2022-04-01 11:10:07
    1、什么是canal Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。...

    第一章 初识 Canal

    1、什么是canal

    Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。


    Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。
    在这里插入图片描述

    2、什么是Binlog

    MySql 的二进制日志可以说是Mysql 最重要的日志了,它记录了所有 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,包含语句执行的消耗的时间,Mysql 的二进制日志是事务安全型的。一般来说:开启二进制日志大概会有 1% 的性能消耗。二进制有两个最重要的使用场景


    其一: Mysql Replication 在Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves 来达到 Master-Slave 数据一致性的目的。


    其二: 就是数据恢复了,通过使用 Mysql Binlog 工具来使数据恢复。

    二进制日志包含两类文件:二进制日志索引文件(文件名后缀为 .index)用于记录所有二进制文件,二进制日志文件(文件名后缀为 .00000*)记录数据库所有 DDL 和 DML(除了数据查询语句)语句事件。

    3、Binlog 的分类

    Mysql Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配置 binlog_format=statement|mixed|row.。三种格式区别:


    1)statement:语句级,binlog会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如:“update tt set create_date=now()”,如果用binlog进行恢复,由于执行时间不同可能产生的数据就不同。

      	优点:节省空间
      	缺点:有可能造成数据不一致
    

    2)row:行级,binlog 会记录每次操作后每行记录的变化。

      	优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,它只记录执行后的结果。
      	缺点:占用较大空间
    

    3)mixed:statement 的升级版,一定程度上解决了,因为一些情况造成的 statement 模式不一致问题,默认还是statement,在某些情况下譬如: :当函数中包含 UUID() 时;包含
    AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 row 的方式进行处理

      	优点:节省空间,同时兼顾了一定的一致性。
      	缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。
    

    综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。

    4、Canal 工作原理

    4.1 Mysql 主从复制过程

    1)Master 主库将改变记录,写到二进制日志(Binary Log)中;


    2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);


    3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

    在这里插入图片描述

    4.2 Canal 的工作原理

    很简单,就是把自己伪装成 Slave ,假装从 Master 复制数据

    4.3 Canal 的使用场景

    1)原始场景:阿里 Otter 中间件一部分
    Otter 是阿里用于进行异地数据库之间的同步框架,Canal是其中一部分。
    在这里插入图片描述


    2)常见场景一:更新缓存
    在这里插入图片描述


    3)常见场景二:抓取业务表的新增变化数据,用于制作实时统计(我们就是这种场景)

    第二章 Mysql 准备

    1、创建数据库

    在这里插入图片描述

    2、创建表

    CREATE TABLE user_info(
    `id` VARCHAR(255),
    `name` VARCHAR(255),
    `sex` VARCHAR(255)
    );
    

    3、修改配置文件开启 Binlog

    vi /etc/my.cnf
    

    在这里插入图片描述
    注意:binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog

    4、重启 MySQL 使配置生效

     systemctl restart mysqld
    

    到 /var/lib/mysql 目录下查看初始文件大小 154

    在这里插入图片描述

    5、测试 Binlog 是否开启

    插入数据

    INSERT INTO user_info VALUES('1000','zhangsan','male');
    

    再次查看 index 文件大小
    在这里插入图片描述

    6、赋权限

    在 mysql 中执行

    set global validate_password_length=4;
    
    set global validate_password_policy=0;
    
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
    

    第三章 Canal 下载和安装

    1、下载并解压 Jar 包

    下载地址 https://github.com/alibaba/canal/releases

    2、查看 canal.properties 的配置(不做修改)

    在这里插入图片描述

    说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

    3、修改 instance.properties 的配置

    我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下
    1)配置 MySQL 服务器地址
    2)配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal

    在这里插入图片描述

    第四章 实时监控测试

    1、TCP 模式测试

    1.1 创建 canal 项目

    1.2 导入依赖

     <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.4.1</version>
    </dependency>
    

    1.3 通用监视类 ——CanalClient

    Canal 封装的数据结构

    在这里插入图片描述
    在 canal 模块下创建 com.ausware.app 包,并在包下创建 CanalClient(java 代码)
    代码如下:

    /**
     * canal客户端
     * @author Yao
     * @Date: 2022/2/15 15:28
     */
    public class CanalClient {
        public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
            //连接Canal服务器
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("node1", 11111), "example", "", "");
            while (true){
                canalConnector.connect();
                //订阅数据库 .* 代表该数据库下所有表
                canalConnector.subscribe("canal_test.*");
                //单次拉取1000条变化数据
                Message message = canalConnector.get(100);
                List<CanalEntry.Entry> entries = message.getEntries();
                if (CollectionUtils.isEmpty(entries)){
                    System.out.println("本次拉取数据为空,稍后拉去....");
                    TimeUnit.SECONDS.sleep(2);
                }else {
                    for (CanalEntry.Entry entry : entries) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //确保类型为ROWDATA
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //对数据进行反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                            for (CanalEntry.RowData rowData : rowDataList) {
                                JSONObject beforeData = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeData.put(column.getName(), column.getValue());
                                }
                                JSONObject afterData = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterData.put(column.getName(), column.getValue());
                                }
                                System.out.println("表名:"+tableName+
                                        ",entryType:"+entryType+
                                        ",before"+beforeData+
                                        ",after"+afterData);
                            }
                        }
                    }
                }
            }
        }
    
    

    2、kafka 模式测试

    1.修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka
    在这里插入图片描述

    2.修改 Kafka 集群的地址
    在这里插入图片描述

    3.修改 instance.properties 输出到 Kafka 的主题以及分区数
    在这里插入图片描述

    注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序 , 如 果 要 提 高 并 行 度 , 首 先 设 置 kafka 的 分 区 数 >1, 然 后 设 置canal.mq.partitionHash 属性

    4.启动 Canal

    ./start.sh
    

    5.看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
    在这里插入图片描述

    如果没有正常启动,查看日志报一下错误
    在这里插入图片描述
    打开 start.sh 文件修改参数,重启

    str=`file -L $JAVA | grep 64-bit`
    if [ -n "$str" ]; then
        JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -Xss256k"
    else
        JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m "
    fi
    

    6.启动 Kafka 消费客户端测试,查看消费情况
    7.向 MySQL 中插入数据后查看消费者控制台
    在这里插入图片描述

    展开全文
  • 阿里canal1.1.4全.zip

    2020-07-03 15:37:17
    阿里巴巴开源Canal 1.1.4下载,包含adapter,admin,deployer,example,canal很全,还有源码
  • Canal是阿里巴巴旗下的一款开源项目,利用Java开发。主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费,目前主要支持MySQL。 GitHub地址:https://github.com/alibaba/canal 在介绍Canal内部原理...
  • canal.deployer-1.1.4.tar.gz

    2020-04-20 13:02:55
    先解压 canal.deployer-1.1.4.tar.7z, 得到canal.deployer-1.1.4.tar.gz 然后就可以按照你所学的安装教程,进行安装了。 ================================================== 早期,阿里巴巴 B2B 公司因为存在杭州...
  • 本项目基于canal 1.0.24最新版基础上修改,扩展了服务端和客户端的一些功能,主要包含: 客户端:数据库镜像备份(解决扩机房的数据同步问题); 客户端:数据库表的映射同步(如自动生成拼音码,简拼,字段计算等)...
  • 网上下载了几个小时都下不下来,终于都下载完了。canal.adapter-1.1.4.tar.gz; canal.admin-1.1.4.tar.gz; canal.deployer-1.1.4.tar.gz; canal.example-1.1.4.tar.gz; canal-canal-1.1.4.zip

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 27,306
精华内容 10,922
关键字:

canal