精华内容
下载资源
问答
  • Mysql binlog日志解析

    2019-07-12 23:22:20
    Mysql日志抽取与解析正如名字所将的那样,分抽取和解析两个部分。这里Mysql日志主要是指binlog日志。二进制日志由配置文件的log-bin选项负责启用,Mysql服务器将在数据根目录创建两个新文件XXX-bin.001和XXX-bin....

    1. 摘要:

    Mysql日志抽取与解析正如名字所将的那样,分抽取和解析两个部分。这里Mysql日志主要是指binlog日志。二进制日志由配置文件的log-bin选项负责启用,Mysql服务器将在数据根目录创建两个新文件XXX-bin.001和XXX-bin.index,若配置选项没有给出文件名,Mysql将使用主机名称命名这两个文件,其中.index文件包含一份全体日志文件的清单。Mysql会把用户对所有数据库的内容和结构的修改情况记入XXX-bin.n文件,而不会记录SELECT和没有实际操作意义的语句。

    2. 设计概要:

    本项目主要包括两个独立的模块:1、日志抽取(mysql-tracker);2、日志解析(mysql-parser)。日志抽取主要负责与mysql进行交互,通过socket连接以及基于mysql的开源协议数据报文,来进行从mysql主库上dump相应的日志数据下来。而日志解析主要负责与日志抽取模块交互,通过将dump下来的bytes类型的数据,根据mysql协议,对bytes数据进行解析,并封装成易读的event对象。

    2.1  流程概要:

    mysql-tracker 总体流程设计:
    tracker与mysql交互:
    1. 建立socket连接
    2. 加载上次退出时的位点信息(从checkpoint表中加载)
    3. 利用socket连接发送基于mysql协议的数据包+checkpoint表中的位点信息,创建mysql主库的binlog dump线程
    4. 利用socket接受(监听)mysql主库传过来的数据包
    5. 解析数据包(有多种形式,OK包,EOF包,ERROR包,EVENT包等等)
    6. 如果有EVENT包,将基于byte的数据包解析成event对象
    7. 将对象存入List或Queue里面
    tracker与hbase交互:
    1. 从queue中接收固定量数据(上限:防止内存溢出,下限:防止频繁I/O),或固定时间数据(防止内存溢出)。
    2. 这里的数据就是event对象
    3. 将event对象序列化,存入hbase(protobuf 和 entry)
    4. 存入过程中,保证位点确认机制,如果有关于mysql binlog 的标志性位点,则将该event存入hbase后(注意这里有对特殊xid位点的确认机制,而parser是没有的,直接确认即可),然后再将该event的位点信息存入checkpoint表(维护各种位点信息:包括mysql binlog位点,event表(存如序列化后的event)位点,entry表(存入反序列化后的event)位点)
    5. 也就是说只要是存入hbase实体数据,都要伴随位点确认机制。这里tracker确认两个方面的位点:mysql binlog 位点(xid:binlog file name + next position) + event表位点(tracker写位点:row key)

    tracker 每分钟记录位点:

    1. 每分中固定时间记录确认的checkpoint位点(可能有重复,长时间没有数据fetch重复最多)

    mysql-parser 总体流程设计:(设计思路非常类似,只不过是mysql binlog变成了event表,parser fetch数据从这里fetch)
    parser与event表交互:
    1. 建立hbase连接
    2. 加载上次退出的位点信息(从checkpoint表中加载)
    3. 通过hbase连接+checkpoint表中的位点信息,不断监听event表一旦event表有更新,就从event表中把序列化的event fetch下来
    4. 得到的序列化event(bytes) 存入List或Queue里面。
    parser与hbase交互:
    1. 从queue中接收固定量数据(上限:防止内存溢出,下限:防止频繁I/O),或固定时间数据(防止内存溢出)。
    2. 将序列化的event(bytes)反序列化成entry(其实就是event对象)
    3. 将entry存入hbase。
    4. 存入过程中,伴随位点确认机制(直接确认位点,不需要特殊位点确认机制)(存入位点信息到checkpoint表中去:parser 读 event表的位点(row key) + parser 写entry表的位点(row key))
    parser每分钟确认位点:
    1. 每分钟固定时间记录确认的checkpoint位点(可能有重复,长时间没有数据fetch重复最多)

    2.2 架构

    tracker 与 parser都有同样的3个线程结构 取数据线程、消费数据线程、每分钟记录线程。并且每个消费数据线程必定伴随着位点确认的机制,就如前面2.1流程概要所说:
    1、tracker的位点确认机制需要有xid的特殊event作为mysql主库的位点确认点。
    2、而hbase里面的位点确认除了tracker写位点也需要是xid的event作为位点确认点,其他确认点没有特殊位点的要求。

    3. mysql 相关

    主要涉及到mysql的通讯协议和mysql 日志协议。mysql通讯协议,这里主要是利用到与mysql交互中的收发数据包的解析;mysql日志协议,这里主要是利用受到数据包后得到event事件的数据包,然后解析event数据包会用到相关的日志协议。即前者主要用在mysql交互、数据收发上面;后者主要用于日志解析、数据封装上面。

    3.1 mysql 通讯协议

    mysql通讯协议主要用于mysql客户端与mysql服务端的交互,通讯协议通过SSL加密通讯、数据包压缩通讯、连接阶段的强交互性。

    3.1.1 mysql数据包

    如果客户端要和服务端交互,他们会把数据打包成数据包的形式然后通过发送数据包的形式,实现信息的传递。数据包的具体格式如下:

     

    例如一个COM_BINLOG_DUMP类型的数据包的payload(数据包体)是这样的:

     


    3.2 mysql 日志协议

    binlog日志是一个对于mysql记录各种变化的日志集合,开启日志功能可以通--log-bin 选项来开启。MySQL的二进制日志可以说或是MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是失误安全型的.
    MySQL的二进制日志的作用是显而易见的,可以方便的备份这些日志以便做数据恢复,也可以作为主从复制的同步文件。

    3.2.1 event事件

    mysql通过C++的类来描述事件的基本类型 log event,在这里我们可以通过mysql源码的log_event.cc来详细了解 各种各样的event事件类型。log event是一个描述事件的基本类型,更加细致的log event 组成了基本的log event,即log event是可派生的,并派生处了一些描述事件信息更详细的子事件类型。比如row event就是一个母事件类型。在mysql源码中是通过一系列枚举整数值来描述各个事件的,如下所示:
    enum Log_event_type { 
      UNKNOWN_EVENT= 0, 
      START_EVENT_V3= 1, 
      QUERY_EVENT= 2, 
      STOP_EVENT= 3, 
      ROTATE_EVENT= 4, 
      INTVAR_EVENT= 5, 
      LOAD_EVENT= 6, 
      SLAVE_EVENT= 7, 
      CREATE_FILE_EVENT= 8, 
      APPEND_BLOCK_EVENT= 9, 
      EXEC_LOAD_EVENT= 10, 
      DELETE_FILE_EVENT= 11, 
      NEW_LOAD_EVENT= 12, 
      RAND_EVENT= 13, 
      USER_VAR_EVENT= 14, 
      FORMAT_DESCRIPTION_EVENT= 15, 
      XID_EVENT= 16, 
      BEGIN_LOAD_QUERY_EVENT= 17, 
      EXECUTE_LOAD_QUERY_EVENT= 18, 
      TABLE_MAP_EVENT = 19, 
      PRE_GA_WRITE_ROWS_EVENT = 20, 
      PRE_GA_UPDATE_ROWS_EVENT = 21, 
      PRE_GA_DELETE_ROWS_EVENT = 22, 
      WRITE_ROWS_EVENT = 23, 
      UPDATE_ROWS_EVENT = 24, 
      DELETE_ROWS_EVENT = 25, 
      INCIDENT_EVENT= 26, 
      HEARTBEAT_LOG_EVENT= 27, 
      ENUM_END_EVENT 
      /* end marker */ 
    };
    具体各种事件含义的详细说明可以参照mysql官方说明文档: http://dev.mysql.com/doc/internals/en/event-meanings.html

    3.2.2 event 事件结构

    接下来我们来看看一个通用事件的具体结构(参照mysql packet 数据包)
    所有的event 都含有如下通用的事件结构:
    +===================+
    | event header      |
    +===================+
    | event data        |
    +===================+
    分别由时间头和时间体组成。
    而事件的内部结构随mysql的版本不同而变化着,这里取出3个代表性的版本结构:
    v1 :用于mysql 3.23
    v3 :用于mysql 4.01
    v4 :用于mysql 5.0 及 以上
    v1 的event 结构:
    +=====================================+
    | event     | timestamp         0 : 4    |
    | header  +----------------------------+
    |               | type_code         4 : 1    |
    |              +----------------------------+
    |               | server_id         5 : 4    |
    |              +----------------------------+
    |               | event_length      9 : 4    |
    +=====================================+
    | event      | fixed part       13 : y    |
    | data       +----------------------------+
    |                | variable part              |
    +=====================================+
    v3 的 event 结构 :
    +=====================================+
    | event     | timestamp         0 : 4    |
    | header  +----------------------------+
    |              | type_code         4 : 1    |
    |             +----------------------------+
    |              | server_id         5 : 4    |
    |             +----------------------------+
    |              | event_length      9 : 4    |
    |             +----------------------------+
    |              | next_position    13 : 4    |
    |             +----------------------------+
    |              | flags            17 : 2    |
    +=====================================+
    | event    | fixed part       19 : y    |
    | data     +----------------------------+
    |             | variable part              |
    +=====================================+
    v4 的event结构:
    +=====================================+
    | event     | timestamp         0 : 4    |
    | header  +----------------------------+
    |              | type_code         4 : 1    |
    |             +----------------------------+
    |              | server_id         5 : 4    |
    |             +----------------------------+
    |              | event_length      9 : 4    |
    |             +----------------------------+
    |              | next_position    13 : 4    |
    |             +----------------------------+
    |              | flags            17 : 2    |
    |             +----------------------------+
    |              | extra_headers    19 : x-19 |
    +=====================================+
    | event     | fixed part        x : y    |
    | data      +----------------------------+
    |              | variable part              |
    +=====================================+
    更详细的事件包数据可见: http://dev.mysql.com/doc/internals/en/event-header-fields.html相关页面



    4. 位点确认机制

    在于mysql的交互过程中发现,xid event通常是作为一个事务的结尾(DML,DDL的话是Query作为结尾),现将DML和DDL的事件组成展示出来(过滤掉一些对解析日志无意义的事件):
    DML:
    1. QUERY EVENT
    2. TABLE MAP EVENT
    3. ROWS EVENT
    4. XID EVENT
    DDL:
    1. QUERY EVENT
    这里我们可以通过一定的辨识机制将DDL的QUERY EVENT 和 DML的XID EVENT归为一类,所以我们把这种结束事务的时间统称为特殊xid 事件。从调试中可以得到这样一个推论:
    在与mysql交互中,binlog dump线程的起始位点一定要是特殊xid事件的next position的。即特殊xid一定要作为mysql的结束标识,读时候一定要确认这里的位点机制。
    所以在tracker重启,重新抓取数据时一定要从xid开始fetch数据,这样就是位点确认机制的由来。
    目前的位点确认机制有:
    1. mysql的位点确认,必须是以xid位点来确认的,所以checpoint表存储mysql位点信息的数据必须要是特殊xid事件
    2. 写event表的checkpoint位点确认,受mysql位点特殊xid的影响,这里checkpoint表中tracker写event表的位点信息也必须是特殊xid的位点信息。(考虑这样一种场景,大事务里面有很多个event,如果tracker在写event表是crash掉了,这样我们可以把大事务的第一个时间a[0] 到发生crash的时间a[i]成为脏数据,为什么呢??,因为如果重启tracker,他与mysql的交互特性是必须要以xid作为起始位点才开始fetch event数据,所以我们tracker会又从这个大事务的a[0]开始fetch,如果hbase event不以xid作为位点确认,那么这次event表就变成a[0]......a[i] a[0] …..a [j] ,这样a[0]......a[i]成了明确的脏数据,如果是以xid作为tracker写event的位点确认,实际上就是重写了一段a[0]......a[i]的数据,当然你可已在tracker fetch a[0]到a[i]这一段,先不写hbase,到crash的位点再开始写hbase也是可以的。注意这里有无限循环的bug漏洞)
    3. 除以上两个的位点,其他位点的确认均采取直接确认,不需要考虑特殊xid事件。

    4.1 确认位点分类:

    大致有以下几类位点需要确认:




    5. tracker设计

    依照2.1的流程概要设计,其流程图如下所示:



     

     这部分可以结合源代码理解(Handler1.java)
    1. prepare方法:
    1. 建立mysql的两个连接,其中一个连接用于fetch event数据,另外一个连接用于fetch表结构元数据。这里如果创建连接不成功,将一直处于创建连接流程。保证程序的存活不依赖与mysql的存活
    2. 建立hbase连接,这里如果hbase没有启动会处于阻塞和重连的状态。
    3. 加载起始位点,既有mysql的起始位点也有event表的其实位点,注意这两个位点都是受xid影响的位点,如果hbase没有相关信息,这里我们用show master status的mysql命令,让mysql位点处于本库的最末端,而让event表位点置0,即相当于清除所有数据,从0开始。
    4. 启动fetch 线程,开始从mysql主库上fetch event数据。
    5. 启动per minute 线程,开始每分中记录相应位点
    6. 启动persistence 线程,开始接受fetch到的event数据,并且序列化,然后存入hbase event表中,并且伴随位点确认机制。(注意:这个线程其实就是Handler.run()方法,实际上run()方法也是一个线程的机制,只不过对Handler是不可见的而已)
    2. fetch 线程:
    1. preRun方法做一些初始化工作,包括设置binlog dump线程参数、send binlog dump让fetch指针置为到起始位点(start position)、初始化数据抓取器fetcher
    2. fetch方法,抓取一条event数据。
    3. 加入queue多线程队列中
    4. kafka监控相关
    5. 这里fetch是一个循环重连的机制,入股fetch方法失败跳出第一层循环,通过外层循环和checkMysqlConn()方法时间fetch线程重连mysql。即如果fetch中途mysql crash掉,fetch线程会等待mysql有效后重连mysql。
    3. Per minute 线程:
    1. 每分钟执行一次run方法
    2. 具体是将得到的存储位点的全局变量存入checkpoint中去。注意row key的设计
    4. Persistence 线程 Handler1.run()方法:
    1. 接受多线成queue的数据到list中,以此位一批数据,
    2. 以数量的上限,下限和时间的阀值来判断时候执行一批数据的持久化
    3. 进行持久化。
    4. 将一批event数据 序列化 然后 tracker写入到event表中去。
    5. 伴随位点确认机制:当真正写入event表数据成功后,看这一批数据是否有特殊xid事件,如果有则作为位点确认,写入checkpoint表中去(tracker整体重启,启动的时候加载这个位点信息)







    6. parser 设计

    与tracker设计思路基本相同,不过是fetch的目的mysql换成hbase event表,以及位点信息的直接确认,不需要考虑特殊xid。这里不再详述。
    注意:所有的位点确认一定要是在持久化成功之后才开始位点确认。



    7. 重连机制

    tracker中的mysql connector建立过程加入重连与等待机制。
    fetch线程中加入了正在fetch数据,mysql突然断掉的重连机制。
    hbase的断掉的自身重连机制

    8. 性能评测

    目前尚未进行系统性的,正式的性能测试。
    仅以单机作测试有 每1-2秒 tracker能fetch 1万条数据,parser 每1万条数据 需要耗时4~5秒左右。
    本单机测试尚不能作为评测标准,其性能以机器的硬件性能的不同而不同,不能以此作为性能标准。

    9. bug与优化

    1. 对于巨大事务的海量事件的场景,可能存在潜在的无限循环bug,即到事件a[i] crash掉,然后重启,重新fetch时 到 时间 a[i]再一次crash,然后再重启,这样一直不停地循环,永远扫描不完着一个巨量的大事务。
    2. tracker与parser的数据交接目前仍是单线程的模式,可以考虑大规模分布式并行的模式,使tracker与parser在数据交接上能够提升效率(与mysql的交接,与hbase的交接)。

    10. 结论

    基于单机的,传统的,MySQL解析就到这里,主要是利用了mysql的协议进行数据传递与解析,后面组件考虑基于分布式的,基于大规模并行化的,基于高HA的模式。

    展开全文
  • EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog...

    来自:https://blog.csdn.net/u012985132/article/details/74964366/

    关系型数据库和Hadoop生态的沟通越来越密集,时效要求也越来越高。本篇就来调研下实时抓取MySQL更新数据到HDFS。

    本篇仅作为调研报告。

    初步调研了canal(Ali)+kafka connect+kafka、maxwell(Zendesk)+kafka和mysql_streamer(Yelp)+kafka。这几个工具抓取MySQL的方式都是通过扫描binlog,模拟MySQL master和slave(Mysql Replication架构–解决了:数据多点备份,提高数据可用性;读写分流,提高集群的并发能力。(并非是负载均衡);让一些非实时的数据操作,转移到slaves上进行。)之间的协议来实现实时更新的


    先科普下Canal

    Canal简介

    原理

    Canal原理图

    Canal原理图


    原理相对比较简单:

     

    1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    2. mysql master收到dump请求,开始推送(slave拉取,不是master主动push给slaves)binary log给slave(也就是canal)
    3. canal解析binary log对象(原始为byte流)

    架构

    Canal架构图

    Canal架构图

     

    组件说明:

    1. server代表一个canal运行实例,对应于一个jvm
    2. instance对应于一个数据队列(1个server对应1..n个instance)

    而instance模块又由eventParser(数据源接入,模拟slave协议和master进行交互,协议解析)、eventSink(Parser和Store连接器,进行数据过滤,加工,分发的工作)、eventStore(数据存储)和metaManager(增量订阅&消费信息管理器)组成。

    • EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功),传送成功之后更新Log Position。流程图如下:
      EventParser流程图

      EventParser流程图

       

    • EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。

    • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。

    • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:

    Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
    void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
    void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

    增量订阅和消费之间的协议交互如下:
    增量订阅和消费协议

    增量订阅和消费协议

     

    canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

    流式api设计的好处:

    • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
    • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

    流式api设计示意图如下:
    流式api

    流式api

     

    • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
    • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
    • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
    • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

    这个流式api是不是类似hdfs write在pipeline中传输packet的形式,先将packet放入dataQueue,然后向下游传输,此时将packet放入ackQueue等到下游返回的ack,这也是异步的。

    HA机制

    canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。

    canal的ha分为两部分,canal server和canal client分别有对应的ha实现

    • canal server: 为了减少对mysql dump的请求,不同server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。
    • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

    server ha的架构图如下:
    ha

    ha


    大致步骤:

     

    1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
    2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
    3. 一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。
    4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

    Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

    Canal部署及使用

    MySQL配置

    canal同步数据需要扫描MySQL的binlog日志,而binlog默认是关闭的,需要开启,并且为了保证同步数据的一致性,使用的日志格式为row-based replication(RBR),在my.conf中开启binlog,

     
         
    1
    2
    3
    4
     
         
    [mysqld]
    log-bin=mysql-bin #添加这一行就ok
    binlog-format=ROW  #选择row模式
    server_id=1  #配置mysql replaction需要定义,不能和canal的slaveId重复

    更改my.conf之后,需要重启MySQL,重启的方式有很多找到合适自己的就行。

    Canal配置

    由上面的介绍得知Canal由ServerInstance组成,而Server中又可以包含很多个Instance,一个Instance对应一个数据库实例,则Canal将配置分为两类,一类是server的配置,名字为canal.properties,另一类是instance的配置,名字为instance.properties,一般会在conf目录下新建一个instance同名的目录,将其放入此目录中。

    先介绍canal.properties中的几个关键属性

    参数名字参数说明默认值
    canal.destinations当前server上部署的instance列表
    canal.conf.dirconf/目录所在的路径../conf
    canal.instance.global.spring.xml全局的spring配置方式的组件文件classpath:spring/file-instance.xml 
    (spring目录相对于canal.conf.dir)
    canal.zkServerscanal server链接zookeeper集群的链接信息
    canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000
    canal.file.data.dircanal持久化数据到file上的目录../conf (默认和instance.properties为同一目录,方便运维和备份)
    canal.file.flush.periodcanal持久化数据到file上的更新频率,单位毫秒1000
    canal.instance.memory.batch.modecanal内存store中数据缓存模式 
    1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 
    2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小
    MEMSIZE
    canal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384
    canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024

    下面看下instance.properties,这里的属性较少:

    参数名字参数说明默认值
    canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一1234
    canal.instance.master.addressmysql主库链接地址127.0.0.1:3306
    canal.instance.master.journal.namemysql主库链接时起始的binlog文件
    canal.instance.master.positionmysql主库链接时起始的binlog偏移量
    canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳
    canal.instance.dbUsernamemysql数据库帐号canal
    canal.instance.dbPasswordmysql数据库密码canal
    canal.instance.defaultDatabaseNamemysql链接时默认schema
    canal.instance.connectionCharsetmysql 数据解析编码UTF-8
    canal.instance.filter.regexmysql 数据解析关注的表,Perl正则表达式. 
    多个正则之间以逗号(,)分隔,转义符需要双斜杠
    .*\\..*

    除了上面两个配置文件,conf目录下还有一个目录需要强调下,那就是spring目录,里面存放的是instance.xml配置文件,目前默认支持的instance.xml有memory-instance.xml、file-instance.xml、default-instance.xml和group-instance.xml。这里主要维护的增量订阅和消费的关系信息(解析位点和消费位点)。

    对应的两个位点组件,目前都有几种实现:

    • memory (memory-instance.xml中使用)
    • zookeeper
    • mixed
    • file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上)
    • period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)

    分别介绍下这几种配置的功能

    • memory-instance.xml:

    所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析

    特点:速度最快,依赖最少(不需要zookeeper)

    场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境

    • file-instance.xml:

    所有的组件(parser , sink , store)都选择了基于file持久化模式(组件内容持久化的file存在哪里???),注意,不支持HA机制.

    特点:支持单机持久化

    场景:生产环境,无HA需求,简单可用.

    • default-instance.xml:

    所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.(所有组件持久化的内容只有位置信息吧???)

    特点:支持HA

    场景:生产环境,集群化部署.

    • group-instance.xml:

    主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。

    场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.

    canal example 部署

    • 在需要同步的MySQL数据库中创建一个用户,用来replica数据,这里新建的用户名和密码都为canal,命令如下:
     
         
    1
    2
    3
    4
     
         
    CREATE USER canal IDENTIFIED BY  'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO  'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO  'canal'@'%' ;
    FLUSH PRIVILEGES;
    • Mysql创建canal用户并为其赋所需权限之后,需要对Canal的配置文件(canal.properties和instance.properties)进行设置。

    canal.properties和instance.properties里采用默认配置即可(这里只是运行个样例,生产中可以参考具体的参数属性进行设置),

    • Canal配置好之后,启动Canal client(client的作用是将Canal里的解析的binlog日志固化到存储介质中)。

    client组件Canal本身是不提供的,需要根据api进行开发,这里将官方提供的client代码打包成jar进行消费Canal信息。

    canal HA配置

    canal的HA机制是依赖zk来实现的,需要更改canal.properties文件,修改内容如下:

     
         
    1
    2
    3
    4
     
         
    # zk集群地址
    canal.zkServers=10.20.144.51:2181
    # 选择记录方式
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    更改两台canal机器上instance实例的配置instance.properties,修改内容如下:

     
         
    1
    2
     
         
    canal.instance.mysql.slaveId = 1234  ##另外一台机器改成1235,保证slaveId不重复即可
    canal.instance.master.address = 10.20.144.15:3306

    配置好之后启动canal进程,在两台服务器上执行sh bin/startup.sh

    client进行消费时,可以直接指定zookeeper地址和instance name,也可以让canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接。

    maxwell简介

    maxwell实时抓取mysql数据的原理也是基于binlog,和canal相比,maxwell更像是canal server + 实时client。(数据抽取 + 数据转换)

    maxwell集成了kafka producer,直接从binlog获取数据更新并写入kafka,而canal则需要自己开发实时client将canal读取的binlog内容写入kafka中。

    maxwell特色:

    • 支持bootstrap启动,同步历史数据
    • 集成kafka,直接将数据落地到kafka
    • 已将binlog中的DML和DDL进行了模式匹配,将其解码为有schema的json(有利于后期将其重组为nosql支持的语言)
      {“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}

    缺点:

    • 一个MySQL实例需要对应一个maxwell进程
    • bootstrap的方案使用的是select *

    maxwell的配置文件只有一个config.properties,在home目录。其中除了需要配置mysql master的地址、kafka地址还需要配置一个用于存放maxwell相关信息的mysql地址,maxwell会把读取binlog关系的信息,如binlog name、position。

    工具对比

    以上是Canal的原理及部署,其余类似maxwell和mysql_streamer对mysql进行实时数据抓取的原理一样就不再进行一一介绍,这里只对他们进行下对比:

    特色CanalMaxwellmysql_streamer
    语言JavaJavaPython
    活跃度活跃活跃不活跃
    HA支持定制支持
    数据落地定制落地到kafka落地到kafka
    分区支持不支持不支持
    bootstrap不支持支持支持
    数据格式格式自由json(格式固定)json(格式固定)
    文档较详细较详细略粗
    随机读支持支持支持

    以上只是将mysql里的实时变化数据的binlog以同种形式同步到kafka,但要实时更新到hadoop还需要使用一个实时数据库来存储数据,并自定制开发将kafka中数据解析为nosql数据库可以识别的DML进行实时更新Nosql数据库,使其与MySQL里的数据实时同步。

    基础架构

    架构图如下:
    基础架构图

    基础架构图

     

    虚线框是可选的方案

    方案对比

    1. 方案1使用阿里开源的Canal进行Mysql binlog数据的抽取,另需开发一个数据转换工具将从binlog中解析出的数据转换成自带schema的json数据并写入kafka中。而方案2使用maxwell可直接完成对mysql binlog数据的抽取和转换成自带schema的json数据写入到kafka中。
    2. 方案1中不支持表中已存在的历史数据进行同步,此功能需要开发(如果使用sqoop进行历史数据同步,不够灵活,会使结果表与原始表结构相同,有区别于数据交换平台所需的schema)。方案2提供同步历史数据的解决方案。
    3. 方案1支持HA部署,而方案2不支持HA

    方案1和方案2的区别只在于kafka之前,当数据缓存到kafka之后,需要一个定制的数据路由组件来将自带schema的数据解析到目标存储中。
    数据路由组件主要负责将kafka中的数据实时读出,写入到目标存储中。(如将所有日志数据保存到HDFS中,也可以将数据落地到所有支持jdbc的数据库,落地到HBase,Elasticsearch等。)

    综上,
    方案1需要开发的功能有:

    • bootstrap功能
    • 实时数据转换工具
    • 数据路由工具

    方案2需要开发的功能有:

    • 数据路由工具
    • HA模块(初期可暂不支持HA,所以开发紧急度不高)

    数据路由工具是两个方案都需要开发的,则我比较偏向于第二种方案,因为在初期试水阶段可以短期出成果,可以较快的验证想法,并在尝试中能够较快的发现问题,好及时的调整方案。即使方案2中maxwell最终不能满足需求,而使用canal的话,我们也可能将实时数据转换工具的数据输出模式与maxwell一致,这样初始投入人力开发的数据路由工具依然可以继续使用,而不需要重新开发。

    把增量的Log作为一切系统的基础。后续的数据使用方,通过订阅kafka来消费log。

    比如:
    大数据的使用方可以将数据保存到Hive表或者Parquet文件给Hive或Spark查询;
    提供搜索服务的使用方可以保存到Elasticsearch或HBase 中;
    提供缓存服务的使用方可以将日志缓存到Redis或alluxio中;
    数据同步的使用方可以将数据保存到自己的数据库中;
    由于kafka的日志是可以重复消费的,并且缓存一段时间,各个使用方可以通过消费kafka的日志来达到既能保持与数据库的一致性,也能保证实时性;

    {“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}

    {“database”:”test”,”table”:”e”,”type”:”insert”,”ts”:1488857922,”xid”:8932,”commit”:true,”data”:{“id”:2,”m”:4.2,”torvalds”:null}}

    转载于:https://www.cnblogs.com/qcfeng/p/9270828.html

    展开全文
  • 背景 ...不过早期的数据库同步业务,主要是基于trigger的方式获取增 量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&am...

    from: http://www.cnblogs.com/duanxz/p/5062833.html

    背景

       早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增 量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此 开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析

     

    基于日志增量订阅&消费支持的业务:

    1. 数据库镜像
    2. 数据库实时备份
    3. 多级索引 (卖家和买家各自分库索引)
    4. search build
    5. 业务cache刷新
    6. 价格变化等重要业务消息

    项目介绍

       名称:canal [kə'næl]

       译意: 水道/管道/沟渠 

       语言: 纯java开发

       定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql

     

    工作原理

    mysql主备复制实现


     从上层来看,复制分成三步:

    1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
    2. slave将master的binary log events拷贝到它的中继日志(relay log);
    3. slave重做中继日志中的事件,将改变反映它自己的数据。

    canal的工作原理:

    原理相对比较简单:

    1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    3. canal解析binary log对象(原始为byte流)

    架构

    说明:

    • server代表一个canal运行实例,对应于一个jvm
    • instance对应于一个数据队列  (1个server对应1..n个instance)

    instance模块:

    • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
    • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
    • eventStore (数据存储)
    • metaManager (增量订阅&消费信息管理器)

    知识科普

    mysql的Binlay Log介绍

    简单点说:

    • mysql的binlog是多文件存储,定位一个LogEvent需要通过binlog filename +  binlog position,进行定位
    • mysql的binlog数据格式,按照生成的方式,主要分为:statement-based、row-based、mixed。
      Java代码  
      1. mysql> show variables like 'binlog_format';  
      2. +---------------+-------+  
      3. | Variable_name | Value |  
      4. +---------------+-------+  
      5. | binlog_format | ROW   |  
      6. +---------------+-------+  
      7. 1 row in set (0.00 sec)  

    目前canal只能支持row模式的增量订阅(statement只有sql,没有数据,所以无法获取原始的变更日志)

     

     

    EventParser设计

    大致过程:

    整个parser过程大致可分为几步:

    1. Connection获取上一次解析成功的位置  (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
    2. Connection建立链接,发送BINLOG_DUMP指令
       // 0. write command number
       // 1. write 4 bytes bin-log position to start at
       // 2. write 2 bytes bin-log flags
       // 3. write 4 bytes server id of the slave
       // 4. write bin-log file name
    3. Mysql开始推送Binaly Log
    4. 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
      // 补充字段名字,字段类型,主键信息,unsigned类型处理
    5. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
    6. 存储成功后,定时记录Binaly Log位置

    mysql的Binlay Log网络协议:

     

    说明:

    EventSink设计

    说明:

    • 数据过滤:支持通配符的过滤模式,表名,字段内容等
    • 数据路由/分发:解决1:n (1个parser对应多个store的模式)
    • 数据归并:解决n:1 (多个parser对应1个store)
    • 数据加工:在进入store之前进行额外的处理,比如join

    数据1:n业务

      为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。

      所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注

     

    数据n:1业务

      同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。

      所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.

     

    EventStore设计

    • 1.  目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式
    • 2.  借鉴了Disruptor的RingBuffer的实现思路

    RingBuffer设计:

    定义了3个cursor

    • Put :  Sink模块进行数据存储的最后一次写入位置
    • Get :  数据订阅获取的最后一次提取位置
    • Ack :  数据消费成功的最后一次消费位置

    借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:

    实现说明:

    • Put/Get/Ack cursor用于递增,采用long型存储
    • buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size需要为2的指数,效率比较高)

    Instance设计


     

    instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。

    抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:

    • manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
    • spring方式:基于spring xml + properties进行定义,构建spring配置. 

    Server设计


    server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现

    • Embeded :  对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
    • Netty :  基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。(阿里系的notify和metaq,典型的 push/pull模型,目前也逐步的在向pull模型靠拢,push在数据量大的时候会有一些问题) 

    增量订阅/消费设计

    具体的协议格式,可参见:CanalProtocol.proto

    get/ack/rollback协议介绍:

    • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
      a. batch id 唯一标识
      b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
    • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
    • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

    canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api. 

    流式api设计的好处:

    • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
    • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化.  (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

    流式api设计:

    • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
    • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
    • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
    • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

    数据对象格式:EntryProtocol.proto

    Java代码  
    1. Entry  
    2.     Header  
    3.         logfileName [binlog文件名]  
    4.         logfileOffset [binlog position]  
    5.         executeTime [发生的变更]  
    6.         schemaName   
    7.         tableName  
    8.         eventType [insert/update/delete类型]  
    9.     entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
    10.     storeValue  [byte数据,可展开,对应的类型为RowChange]  
    11.       
    12. RowChange  
    13.     isDdl       [是否是ddl变更操作,比如create table/drop table]  
    14.     sql     [具体的ddl sql]  
    15.     rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]  
    16.         beforeColumns [Column类型的数组]  
    17.         afterColumns [Column类型的数组]  
    18.           
    19. Column   
    20.     index         
    21.     sqlType     [jdbc type]  
    22.     name        [column name]  
    23.     isKey       [是否为主键]  
    24.     updated     [是否发生过变更]  
    25.     isNull      [值是否为null]  
    26.     value       [具体的内容,注意为文本]  

    说明:

    • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
    • 可以提供ddl的变更语句

     

    HA机制设计

    canal的ha分为两部分,canal server和canal client分别有对应的ha实现

    • canal server:  为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态. 
    • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

    整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),可以看下我之前zookeeper的相关文章。

     

    Canal Server: 


    大致步骤:

    1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断  (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
    2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
    3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
    4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

    Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制. 

     

    最后

    项目的代码: https://github.com/alibabatech/canal

    这里给出了如何快速启动Canal Server和Canal Client的例子,如有问题可随时联系

    Quick Start

    Client Example

    转载于:https://www.cnblogs.com/buxizhizhoum/p/7595031.html

    展开全文
  • MySQL binlog 增量数据解析服务

    千次阅读 2018-10-22 15:49:11
    除了基础的进程监控,数据同步服务的关键是 binlog 解析服务与 MySQL master 之间的延迟监控,避免在 MySQL 写入高峰期导致数据延迟,影响后面的数据消费服务。 获取延迟的方法也很简单: 在 MySQL master 上...

    1. 起因

    做过后端开发的同学都知道, 经常会遇到如下场景:

    1. 后端程序根据业务逻辑, 更新数据库记录
    2. 过了几天, 业务需求需要更新搜索索引
    3. 又过了几天, 随着数据需求方的增多, 结构改成发送数据到消息中间件(例如 Kafka), 其他系统自行从消息中间件订阅数据

    传统程序结构

    所有涉及到类似需求的代码中都写了各种发送消息中间件的代码, 冗余, 易错, 而且难以保证一致性. 那么问题来了:

    数据都在 MySQL 中, 是否可以实现仅仅更新 MySQL 就实现数据更新和发布逻辑?

    2. Linkedin Databus

    最早我听说的解决方案是 Linkedin 实现的, 参见

    核心思路就是通过数据库的 binary log(简称: binlog) 来实现数据库更新的自动获取. Linkedin 自己实现了 MySQL 版本和 Oracle 版本。

    3. 原理

    以 MySQL 为例, 数据库为了主从复制结构和容灾,都会有一份提交日志 (commit log),通过解析这份日志,理论上说可以获取到每次数据库的数据更新操作。获取到这份日志有两种方式:

    1. 在 MySQL server 上通过外部程序监听磁盘上的 binlog 日志文件
    2. 借助于 MySQL 的 Master-Slave 结构,使用程序伪装成一个单独的 Slave,通过网络获取到 MySQL 的binlog 日志流

    这里有一个注意的点: MySQL 的 binlog 支持三种格式:StatementRowMixed 格式:

    • Statement 格式就是说日志中记录 Master 执行的 SQL
    • Row 格式就是说每次讲更改的数据记录到日志中
    • Mixed 格式就是让 Master 自主决定是使用 Row 还是 Statement 格式

    由于伪装成 Slave 的解析程序很难像 MySQL slave 一样通过 Master 执行的 SQL 来获取数据更新,因此要将 MySQL Master 的 binlog 格式调整成 Row 格式才方便实现数据更新获取服务

    至于 Oracle 的实现,我厂没用 Oracle。。。。

    4. 数据增量同步服务拆解

    好了, 如果想自己写一个 Databus 服务, 就需要如下几个核心模块:

    • 4.1、MySQL binlog 解析类库
    • 4.2、部署方式
    • 4.3、binlog 状态维护模块
    • 4.4、消息中间件(大多数人会选择 Kafka 吧)
    • 4.5、数据发布策略
    • 4.6、数据序列化方式
      • 将获取到的 binlog 序列化成其他可识别格式
      • AVRO、protocol buffer、JSON,哪个喜欢选哪个,但注意跨平台,别用 Java 原生的序列化 =.=|||
    • 4.7、集群管理服务
    • 4.8、服务监控

    4.1、协议解析可选方案

    时至今日, 已经有很多大厂开源了自己的 MySQL binlog 解析方案,Java 语言可选的有:

    想自己造轮子实现协议的,也可以参考 MySQL 官方文档

    4.2、部署方式

    由于 binlog 可以通过网络协议获取,也可以直接通过读取磁盘上的 binlog 文件获取, 因此同步服务就有两种部署方式:

    • 通过读取 binlog 文件的话, 就要跟 MySQL Master 部署到同一台服务器
      • 系统隔离性不好,高峰期会不会跟 MySQL master 争抢系统资源
      • 类似 AWS RDS 这种云数据库服务,不允许部署程序到 RDS 节点
    • 通过 relay-log 协议通过网络读取,同步服务就方便部署到任意地方

    部署方式

    4.3、binlog 状态维护模块

    在 MySQL 中, Master-slave 之间只用标识:

    1. serverId:master一般设置为1, 各个 server 之间必须不同
    2. binlog 文件名称:当前读取到了哪一个 binlog 文件
    3. binlog position:当前读取的 binlog 文件的位置

    由于同步服务会重启,因此必须自行维护 binlog 的状态。一般存储到 MySQL 或者 Zookeeper 中。当服务重启后,自动根据存储的 binlog 位置,继续同步数据。

    4.4、消息中间件可选方案

    虽然现在 Kafka 如日中天,大多数情况下大家都会选择 Kafka 作为消息中间件缓冲数据。选择其他的消息中间件也未尝不可。 但有一点注意:

    • MySQL 中的数据更新是有顺序的
    • 数据更新发布到消息中间件中,也建议能够保序,例如事务中经典的转账的例子,试想一下如果消息队列不保序, 其他数据服务消费到不保序的数据是否还能满足业务需求

    由于上诉原因,类似 AWS SQS 这样的消息队列就不满足此处对消息队列的需求(参见:AWS SQS 官方文档关于保序方面的解释

    4.5、数据发布策略

    解析到了数据,现在要做的就是将数据发布到消息中间件中。有一下几个方面需要注意:

    4.5.1、topic 策略

    一个 MySQL 节点中可以有多个数据库, 每个数据库有多张表,是采用一个节点一个 Kafka Topic,还是一个数据库一个 Topic, 还是一张表一个 Topic?

    4.5.2、数据分区策略

    Kafka 中数据是根据 key 进行分区, 同一个分区下保证消息的顺序。

    如何选择数据的key的限制因素就是看数据消费端是否希望同一个表的同一条数据的更新记录都落到同一个 Kafka 分区上,进而不需要消费端做多进程间的状态维护, 简化消费端逻辑。例如: 一个Kafka Topic 有20个分区,同一个表 table_1 中 ID 为1的数据前后两次更新被发送到了不同的 partition,这就要求消费端必须每个 partition 保持lag一致, 并且及时同步数据状态到其他消费进程可见才可以保证保序; 但如果同一个表 table_1 中 ID 为1的数据前后两次更新被发送到了同一个 partition, 由于 Kafka 保证同一个 partition 保序,消费端就简化了很多。

    如下图展示数据乱序问题:

    • 假设 kafka 中 A2 为新的数据, A1 为同一个 ID 的老数据
    • 由于 慢消费进程数据堆积,导致 A2 这个新数据先被消费, 当老数据A1被消费时有可能覆盖之前的结果

      数据乱序问题

    要实现上述的逻辑, 就要求在 Kafka 数据的 Key 的选择上做文章:

    1. 一种方式是使用 table 的名称作为 Kafka 的 key,这样同一张表的数据一定在一个 partition 上保序。 但这样的坏处是,如果数据集中在某一张表频繁更新,会造成某一个 partition 上数据量远大于其他 partition,消费端无法通过并行方式提高扩展性。
    2. 另一种方式就是,在 db 层面保证每张表的第一个 column 是主键,这样采用 binlog 中第一个 column 的数据作为 Kafka 的 key, 数据的平衡性会好很多,易于消费端扩容。

    如下图,消息无乱序情况:

    • 数据 AC 的每个版本由于 Hash 值 % 分区数量相同,同属于同一个分区, 并且按数据版本保序
    • 数据 BDAC, 数据按修改时间顺序保序但属于不同分区

    4.6、数据序列化方式选择

    读取到 binlog 数据后, 需要将数据序列化成更简单易用的格式,发送到 Kafka。如果选择 Avro 作为序列化方式的话,可以考虑集成 Kafka 背后的公司 Confluent 提出的一个新的方法:Schema Registry,具体信息参见 Confluent 公司官网。

    4.7、集群管理服务

    随着业务的扩展,越来越多的 MySQL 接入了数据同步服务。运维管理的压力也就随之而来。因此可能最后系统演变成如下结构:

    • 独立一个集群管理程序,负责管理解析程序节点,分配任务
    • 各个解析程序启动后,首先在 Zookeeper 注册,然后领取同步节点任务,启动解析过程
    • 类似的任务管理结构很常见,比如 Storm 中 Nimbus 节点管理 worker 节点等。

    集群管理

    4.8 服务监控

    服务的监控必不可少。除了基础的进程监控,数据同步服务的关键是 binlog 解析服务与 MySQL master 之间的延迟监控,避免在 MySQL 写入高峰期导致数据延迟,影响后面的数据消费服务。

    获取延迟的方法也很简单:

    1. 在 MySQL master 上实行 SHOW MASTER STATUS 获取到 Master 节点当前的文件 ID 和 binlog 位置
    2. 获取同步服务当前处理的 binlog 文件 ID 和位置:
    3. 将相减的结果发送到监控服务(例如 open-falcon),后续根据需求报警
      • 一般文件 ID 相减结果 N 大于1, 表示同步服务已经落后 MySQL Master N 个文件,情况比较严重(除非是 MySQL Master 刚刚 rotate 新文件)
      • 文件 ID 相同,binlog 位置相减结果 M 就是相差的 binlog 文件大小, 单位: bytes
      • 此计算公式仅仅为近似估算,建议在差距持续一段时间(比如持续2分钟)的情况下再报警。

    5、踩过的坑

    Canal Blob 类型字段编码

    由于 Canal 将 binlog 中的值序列化成了 String 格式给下游程序,因此在 Blob 格式的数据序列化成 String 时为了节省空间,强制使用了 IOS_8859_0 作为编码。因此,在如下情况下会造成中文乱码:

    1. 同步服务 JVM 使用了 UTF-8 编码
    2. BLOB 字段中存储有中文字符

    参见:

    // com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert 第541行起:
    case Types.BINARY:
    case Types.VARBINARY:
    case Types.LONGVARBINARY:
        // fixed text encoding
        // https://github.com/AlibabaTech/canal/issues/18
        // mysql binlog中blob/text都处理为blob类型,需要反查table
        // meta,按编码解析text
        if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
            columnBuilder.setValue(new String((byte[]) value, charset));
            javaType = Types.CLOB;
        } else {
            // byte数组,直接使用iso-8859-1保留对应编码,浪费内存
            columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
            javaType = Types.BLOB;
        }
        break;
    

    总结

    通过实现数据同步服务,可以在一定程度上实现数据消费端与后端程序解耦。但凡事皆有成本,是否值得引入到现有系统架构中,还需要架构师自己斟酌。

    -- EOF --



    作者:haitaoyao
    链接:https://www.jianshu.com/p/be3f62d4dce0
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    展开全文
  • 前两天,无意间看到了林晓斌(丁奇)的一篇文章(详见文末参考链接的第一个),突发好奇心,想从MySQL暴露出来的一些信息上着手,看看能不能直观地看到MySQL 5.7的并行复制到底可以同时执行多少个binlog group(last_...
  • MySQL引入binlog来实现主从实例之间的数据同步,提高数据库系统的可用性,但同时也增加了事务整体的资源消耗,需要额外的磁盘空间和IO处理能力。尤其是为了保证本地事务的持久性,必须将binlog刷盘控制参数sync_...
  • drc-mysql是一种支持多master 多slave的快速并行复制的解决方案,基于mysql的binlog,目前支持binlog的STATEMENT模式。为了实现drc-mysql对 ROW模式的支持,本文对此展开研究,分析了binlog的事件格式,并针对不同的...
  • binlog

    2019-11-29 14:51:20
    什么是二进制日志(binlog) binlog是记录所有数据库表结构变更(例如CREATE、ALTER TABLE…)以及表数据修改(INSERT、UPDATE、DELETE…)的二进制日志。 binlog不会记录SELECT和SHOW这类操作,因为这类操作对数据...
  • 负责解析Mysql的二级制文件binlog成为event 核心类:AbstractEventParser start方法 1. 初始化缓冲队列,然后start()启动 2. 初始化BinLogParser,然后start()启动 3.启动工作线程parseThread 4.开始执行 4.1...
  • Canal binlog 日志 Dump 流程分析

    千次阅读 2020-08-12 22:36:31
    从前面的文章我们得知 Canal binlog 日志解析的基本流程如下图所示: 解析来重点梳理一下 dump 命令的发送逻辑,特别是日志的处理流程,一些基本的日志格式。 1、 dump 流程分析 在 Canal 中 dump 方法声明如下: ...
  • MySQL的Binlog原理

    万次阅读 多人点赞 2019-03-26 12:54:43
    什么是二进制日志(binlog) binlog是记录所有数据库表结构变更(例如CREATE、ALTER TABLE…)以及表数据修改(INSERT、UPDATE、DELETE…)的二进制日志。 binlog不会记录SELECT和SHOW这类操作,因为这类操作对数据...
  • (semaphore是一种保护便量或抽象数据类型用于构成限制共享资源(比如并行计算环境下的共享内存)访问的方法。基本上以锁的方式实现。) ---------- OS WAIT ARRAY INFO: reservation count 6 (当前等待列表...
  • 阿里RDS开发专家解析MySQL各版本并行复制 原创 2016-01-21 林晓斌 DBAplus社群 MySQL并行复制已经是老生常谈,我从2010年开始就着手处理线上这个问题,刚开始两三年也乐此不疲地分享。现在再...
  • binlog解析方式 结合案例分享下开源项目otter的一个小坑 1.案例背景 某个周末突然收到报警,发现线上多云数据库的数据同步任务挂起,显示日志写入数据失败。 错误原因非常明显: 唯一索引冲突。 查看了一下源库...
  • Mysql Binlog原理

    2020-08-29 21:59:27
    什么是二进制日志(binlog) binlog是记录所有数据库表结构变更(例如CREATE、ALTER TABLE…)以及表数据修改(INSERT、UPDATE、DELETE…)的二进制日志。 binlog不会记录SELECT和SHOW这类操作,因为这类操作对数据...
  • 作者:莫善 某互联网公司资深 DBA。...4、解析 binlog 五、总结 一、概述 作为一个 MySQL DBA,查看分析 binlog 是日常工作的一部分。 不知道你是否遇到过这样的需求:一个时间段,各个表的 dml 统计.
  • =================================================...binlog_format参数介绍 binlog_format 在mysql 5.1 版本前,所有二进制文件的格式都是基于SQL语句级别的,在mysql 5.1 版本后引入binlog_format参数,可以...
  • MySQL binlog相关参数

    2019-10-11 16:01:51
    binlog_cache_size 每个线程的binlog cache大小,如果超过了会将binlog暂存到磁盘上,影响性能。 binlog_checksum binlog校验码,默认是采用CRC32,会为每一个binlog event写一个校验码;可指定为none关闭,关闭后...
  • 自建Binlog订阅服务 —— Maxwell

    千次阅读 2018-10-19 10:17:58
    Maxwell 是java语言编写的能够读取、解析MySQL binlog,将行更新以json格式发送到 Kafka、RabbitMQ、AWS Kinesis、Google Cloud Pub/Sub、文件,有了增量的数据流,可以想象的应用场景实在太多了,如ETL、维护缓存、...
  • Mysql的Binlog原理

    2020-08-17 15:40:02
    什么是二进制日志(binlog) binlog是记录所有数据库表结构变更(例如CREATE、ALTER TABLE…)以及表数据修改(INSERT、UPDATE、DELETE…)的二进制日志。 binlog不会记录SELECT和SHOW这类操作,因为这类操作对数据...
  • binlog解读3

    2020-12-14 22:29:18
    二、解析binlog后的每个事务的第一个TIMESTAMP: 时间戳的有序性可能是被误用最多的。在mysqlbinlog这个工具的输出结果中,每个事务起始有会输出一个SET TIMESTAMP=n。这个值取自第一个更新事件(update)的时间。...
  • 中间件---Binlog传输同步---Maxwell

    千次阅读 2019-02-17 15:04:41
    Maxwell 是java语言编写的能够读取、解析MySQL binlog,将行更新以json格式发送到 Kafka、RabbitMQ、AWS Kinesis、Google Cloud Pub/Sub、文件,有了增量的数据流,可以想象的应用场景实在太多了,如ETL、维护缓存、...
  • mysql 原理 ~ binlog

    2018-08-10 19:27:00
    一 简介:我们会持续对binlog进行分析,但是不深入代码二 版本 5.6 格式 GTID和传统格式 传统格式 一 binlog针对具体事务注意点-1 1 update会记录更改前和更改后所有列的值 2 delete会记录删除前所有列的值 3 insert会...
  • Binlog日志到底是什么? 1.Binlog日志简介 在MySQL中一般有以下几种日志: 日志类型 写入日志的信息 错误日志 记录在启动,运行或停止mysqld时遇到的问题 通用查询日志 记录建立的客户端连接和执行的语句 ...
  • binlog的详细介绍

    千次阅读 2018-12-27 14:35:42
    1、推荐用mixed,默认使用statement,基于上下文 set session/global binlog_format=mixed; 2、二进制日记录了数据库执行更改的操作,如Insert,Update,Delete等。不包括Select等不影响数据库记录的操作 3、MySQL...
  • 转载自:...这个冠军的方案确实赞,10G的mysql binlog重放并传输只用了2秒! 总决赛冠军队伍 作死小分队 比赛攻略 决赛答...
  • mysql的日志分为几大类:错误日志、查询日志、慢查询日志、事务日志(redo log和undolog)、二进制日志(binlog)。 binlog 用于记录数据库执行的写入性操作(不包括查询)信息,以二进制的形式保存在磁盘中。binlog...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,673
精华内容 2,269
关键字:

binlog并行解析