精华内容
下载资源
问答
  • 1.canal简介+ n! R& W- x3 x!... k2 Acanal可以用来监控数据库数据的变化,从而获得新增、修改的数据。6 U% T! D7 m; u7 U4 J4 N) Q原理相对比较简单:8 x1 @. u- U( Y6 Pv, G(1)canal模拟mysql ...

    1.canal简介

    + n! R& W- x3 x! E$ W" r$ ^6 Q0 F" S7 o% R( c* n: X

    ; `/ C5 d' s8 u6 \, {. k2 Acanal可以用来监控数据库数据的变化,从而获得新增、修改的数据。6 U% T! D7 m; u7 U4 J4 N) Q

    435300e4227d534afa920ff32386c57d.png原理相对比较简单:

    8 x1 @. u- U( Y6 P  v, G(1)canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议# g+ K9 j0 r4 C& U% E/ k

    (2) mysql master收到dump请求,开始推送binary log给slave(也就是canal)

    ( ~' p5 o: Q& Y( _(3) canal解析binary log对象(原始为byte流)

    1 u" o% s. ^1 p7 r2.环境部署

    * B# Y! O4 U0 }6 Q% }9 B* ?: q% A! P! F8 }

    0 Q4 [. Y/ j2 I" z(1)mysql开启binlog模式$ q) U3 A9 R$ m# W" d7 E+ M

    查看当前mysql是否开启binlog模式,如果log_bin的值为OFF是未开启,为ON是已开启。

    / f. v7 E  O* `$ l  h# K; U1 wSHOW VARIABLES LIKE '%log_bin%'

    9 w$ ?; R- s8 D& K. N修改/etc/my.cnf 需要开启binlog模式。(修改完成之后,重启mysqld的服务。)% o! `9 R  @, g6 q3 N% \4 z* m

    log-bin=mysql-bin 1 T+ K+ L, `& f& x7 M3 ^

    binlog-format=ROW& l1 M; t% r8 v: D$ A$ b: C

    server_id=1

    # v0 [7 V! n8 c1 [0 a1 d进入mysql

    : X( X: S' l, J! W5 Q# G) ~* Zmysql -h localhost -u root -p

    ) S& d/ B* A9 X  R+ o创建账号 用于测试使用(使用root账号创建用户并授予权限)

    " A4 i& z9 R$ @9 Wcreate user canal@'%' IDENTIFIED by 'canal';

    . F3 n* x( d$ R+ o- b, \3 LGRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';

    " P$ b/ q% ?4 F2 G- YFLUSH PRIVILEGES;% _/ x* J4 s8 {& P0 k1 A0 Q+ G* q3.canal服务端安装配置

    2 q$ U: K7 U2 E/ Q

    / V1 a3 V5 x  M7 Q(1)下载地址canal, i* _$ d7 X2 a; n7 L: r

    https://github.com/alibaba/canal/releases/tag/canal-1.0.24 * F3 D0 n1 X- }/ e# M  @  V(2)下载之后 上传到linux系统中,解压缩到指定的目录/usr/local/canal

    " G5 |1 B! z, R* M  Y解压缩之后的目录结构如下:

    % h; W9 ]+ _0 j- ]( n. M(3)修改 exmaple下的实例配置(修改如图所示的几个参数)

    . u% E' [: B$ @  nvi conf/example/instance.properties

    B! F4 Q" ]& Q; L5 r+ g! G(4)指定读取位置Y2 ^5 P: B/ u  G; y  p1 T9 k

    进入mysql中执行下面语句查看binlog所在位置

    J; S& O2 n% d

    831d4ebef5c91ee43db887c56c2494e8.png如果fifile中binlog文件不为 mysql-bin.000001 可以重置mysql

    + x  l# T/ ~2 ~* q( f* P1 bmysql> reset master;$ u" x+ N' ?" ]5 K! R, ]- ?+ ^& h查看canal配置文件* `" d+ P3 P" I3 }$ E

    vim /usr/local/canal/conf/example/meta.dat" }& l6 ]0 O2 w, |3 }+ Q3 U找到对应的binlog信息更改一致即可' t% Z, Z% b+ M

    "journalName":"mysql-bin.000001","position":120,"6 a4 {5 N- t4 w# ~2 t) v( e" \8 t注意:如果不一致,可能导致以下错误$ y( q$ U- C2 l. H8 @6 a

    c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111],exception=java.io.IOException: Connection reset by peer1 }. ^4 E6 c6 h9 V* Z(5)启动服务:

    + }, S' G- F, d: e9 @[root@localhost canal]# ./bin/startup.sh 8 z4 H* I* K7 w* d5 ](6)查看日志:! u. c* c( d" [6 B

    cat /usr/local/canal/logs/canal/canal.log

    2 x4 K1 w. J& o* P1 Q) Q/ X9 w

    859b4ef2711de7be6dbf50a35820dc22.png4.数据监控微服务

    9 d7 W# v6 X) q; {, g4 B8 y- W% s2 G  z4 L. P" {- O3 E

    当用户执行数据库的操作的时候,binlog日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。0 p8 q! Q! e5 b3 h

    https://github.com/chenqian56131/spring-boot-starter-canal. z. i+ C2 n" }2 |/ W- b

    以上开源项目,实现了springboot与canal的集成。比原生的canal更加优雅。

    ) o& a4 }# L6 x. P8 B1 E使用前需要将starter-canal安装到本地仓库。我们可以参照它提供的canal-test,进行代码实现。" j( Z# J9 j& F

    (1)创建工程模块changgou_canal,pom引入依赖; m7 b: I; u: r

    & O, w# k# V& P' z/ C$ T

    com.xpand9 F, ?% W: Z; y# c' W6 m4 g7 [  H

    starter-canal9 J3 w4 j( f2 k: ]

    0.0.1-SNAPSHOT! |/ F) k3 g+ g' ~

    9 }; r/ G9 n% I/ D4 C+ o7 h$ O(2)创建包com.changgou.canal ,包下创建启动类1 p" q" z5 L2 b, B8 v  T

    @SpringBootApplication

    ' ]6 x8 t  C9 `& @% f. N@EnableCanalClient //声明当前的服务是canal的客户端6 e. Z: G3 j( ~7 t8 q8 `' h  b

    public class CanalApplication {

    0 ?  Y0 c, U: X/ u7 V! |/ f0 ?public static void main(String[] args) {

    ( F  l; Z' i- {% H2 a+ }SpringApplication.run(CanalApplication.class,args);! l  {: |5 N! T5 o

    }" T2 ]8 S4 ^: V* p! f1 b( G

    }

    {+ i5 [7 ?5 z3 }(3)添加配置文件application.properties

    - }9 h2 m/ t0 a+ T5 K" z, `$ J/ a# X: `canal.client.instances.example.port=11111

    ' I# M4 Z5 e, N# r6 S1 Zcanal.client.instances.example.batchSize=1000

    & Q' R3 g6 B8 t6 j5 H3 h7 Cspring.rabbitmq.host=192.168.200.128

    $ G4 G9 r0 G0 F(4)创建com.changgou.canal.listener包,包下创建类

    / z, y- G, W0 e1 K! v% G@CanalEventListener //声明当前的类是canal的监听类 " s/ j+ P6 H5 O9 y- @& |

    public class BusinessListener {

    & h0 _4 h4 O5 w" M( R' c1 E@Autowired" j+ \6 s2 r+ i& R/ {

    private RabbitTemplate rabbitTemplate;Z1 V8 O  y0 P( p

    /**

    6 X8 U6 q) {1 C+ ]  A( {*

    0 s3 _% j* C6 ~. ]* @param eventType 当前操作数据库的类型

    8 g& \) G* e0 \# P# _" k' I6 r* @param rowData 当前操作数据库的数据& P4 Y+ E; K. a0 d: M

    */+ _' c2 l3 e- [3 J' o

    @ListenPoint(schema = "business",table = "tb_ad"). T) k. e& E3 r# Q* K

    public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){

    , x. \* ^" i; k6 w) u! k; F" nSystem.out.println("广告表数据发生改变");

    : v7 S, {! }7 B+ d6 y' d, A4 Y* T//获取改变之前的数据

    9 L5 O, v) [+ A/ j1 H/ ZrowData.getBeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));

    " z6 w3 G/ H9 b: }2 T6 |# G& C//获取改变之后的数据0 k$ N* N6 T5 F6 i9 R1 u! Y/ Q

    rowData.getAfterColumnsList().forEach((c)-> System.out.println("改变之后的C, ~6 ~7 |) d5 E) e

    数据:"+c.getName()+"::"+c.getValue()));

    ' F9 x5 K: y# [% Q}

    ) o' h" U0 E6 M( Q9 h7 {}3 a" N+ m/ d. |) B2 Q* X; T/ H测试:启动数据监控微服务,修改business的tb_ad表,观察控制台输出。4 Z; \# P8 e/ ]3 `. L. r# q7 }

    2 b8 Y, a6 O; Q( YJava吧 收集整理 java论坛 www.java8.com

    展开全文
  • KingSunSha(弱水三千):是单向同步吗?即对于特定的表只有一个数据库的数据发生变化?我们是用文本文件进行数据交换,定时一个数据库发送所有在上次发送以后更改过的纪录,然后在另一端读入文本文件,当然要写一些...

    KingSunSha(弱水三千):

    是单向同步吗?即对于特定的表只有一个数据库的数据发生变化?我们是用文本文件进行数据交换,定时一个数据库发送所有在上次发送以后更改过的纪录,然后在另一端读入文本文件,当然要写一些发送和读入的脚本

    如果是双向的那问题就复杂好多

    lisz()   :

    我实现了从Oracle到SQL的复制。

    建立链接,把Oracle的数据库链接到SQL上,再在SQL上使用SP定时更新记录,具体的做法在SQL的帮助上都有的。我想SQL到Oracle也可以做到。

    KingSunSha(弱水三千):

    还有一个做法我们也用过:对于需要同步的表,建立一个LOG表,每次对原表的操作,通过过程或者触发器(很少用触发器也是因为跟踪太困难)写入LOG表,定时把LOG表中的纪录用文本文件的方式发FTP到目标端服务器,然后清空LOG表;在服务器端把文本文件倒入临时表中,在从临时表中对数据进行校验后写入目的表。

    这么做确实工作量比较大,但我们认为是值得的。对于同步来说,很重要的一点是必须非常方便查错和恢复,因为在传输过程中常常有不可预知的错误。所以我们不大喜欢用SNAPSHOT之类的工具,而采用自己写代码的方式控制。因为这些工具跟踪过程太麻烦,很多东西是黑箱操作。

    farspeed(farspeed)回复于 2002-05-29 15:17:00 得分 33

    我做过类似的操作,具体的方法是,同事连着   oracle   和   sqlserver

    (通讯程序)

    然后从数据发起端读出数据,然后写入到数据接收端。

    需要说明的是,两边的接口尽量简单,数据库部分的操作尽量放在数据库中,

    通讯程序只负责通讯工作。

    展开全文
  • 1、 早期关系型数据库之间的数据同步1)、全量同步比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页查询时,...

    1、 早期关系型数据库之间的数据同步

    1)、全量同步

    比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页查询时,一定要按照主键id来排序分页,避免重复插入。

    2)、基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步,如果是不同的数据库,这种方式可能会存在问题。

    3)、基于触发器的增量同步

    增量同步一般是做实时的同步,早期很多数据同步都是基于关系型数据库的触发器trigger来做的。

    使用触发器实时同步数据的步骤:

    A、 基于原表创触发器,触发器包含insert,modify,delete 三种类型的操作,数据库的触发器分Before和After两种情况,一种是在insert,modify,delete 三种类型的操作发生之前触发(比如记录日志操作,一般是Before),一种是在insert,modify,delete 三种类型的操作之后触发。

    B、 创建增量表,增量表中的字段和原表中的字段完全一样,但是需要多一个操作类型字段(分表代表insert,modify,delete 三种类型的操作),并且需要一个唯一自增ID,代表数据原表中数据操作的顺序,这个自增id非常重要,不然数据同步就会错乱。

    C、 原表中出现insert,modify,delete 三种类型的操作时,通过触发器自动产生增量数据,插入增量表中。

    D、处理增量表中的数据,处理时,一定是按照自增id的顺序来处理,这种效率会非常低,没办法做批量操作,不然数据会错乱。  有人可能会说,是不是可以把insert操作合并在一起,modify合并在一起,delete操作合并在一起,然后批量处理,我给的答案是不行,因为数据的增删改是有顺序的,合并后,就没有顺序了,同一条数据的增删改顺序一旦错了,那数据同步就肯定错了。

    市面上很多数据etl数据交换产品都是基于这种思想来做的。

    E、 这种思想使用kettle 很容易就可以实现,笔者曾经在自己的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/7360673.html

    4)、基于时间戳的增量同步

    A、首先我们需要一张临时temp表,用来存取每次读取的待同步的数据,也就是把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据

    B、我们还需要创建一个时间戳配置表,用于存放每次读取的处理完的数据的最后的时间戳。

    C、每次从原表中读取数据时,先查询时间戳配置表,然后就知道了查询原表时的开始时间戳。

    D、根据时间戳读取到原表的数据,插入到临时表中,然后再将临时表中的数据插入到目标表中。

    E、从缓存表中读取出数据的最大时间戳,并且更新到时间戳配置表中。缓存表的作用就是使用sql获取每次读取到的数据的最大的时间戳,当然这些都是完全基于sql语句在kettle中来配置,才需要这样的一张临时表。

    2、    大数据时代下的数据同步

    1)、基于数据库日志(比如mysql的binlog)的同步

    我们都知道很多数据库都支持了主从自动同步,尤其是mysql,可以支持多主多从的模式。那么我们是不是可以利用这种思想呢,答案当然是肯定的,mysql的主从同步的过程是这样的。

      A、master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);

      B、slave将master的binary log events拷贝到它的中继日志(relay log);

      C、slave重做中继日志中的事件,将改变反映它自己的数据。

    阿里巴巴开源的canal就完美的使用这种方式,canal 伪装了一个Slave 去喝Master进行同步。

    A、 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

    B、 mysql master收到dump请求,开始推送binary log给slave(也就是canal)

    C、 canal解析binary log对象(原始为byte流)

    另外canal 在设计时,特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

    canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

    canal c# 客户端: https://github.com/dotnetcore/CanalSharp

    canal go客户端: https://github.com/CanalClient/canal-go

    D、在使用canal时,mysql需要开启binlog,并且binlog-format必须为row,可以在mysql的my.cnf文件中增加如下配置

    log-bin=E:/mysql5.5/bin_log/mysql-bin.log

    binlog-format=ROW

    server-id=123、

    E、 部署canal的服务端,配置canal.properties文件,然后 启动 bin/startup.sh 或bin/startup.bat

    #设置要监听的mysql服务器的地址和端口

    canal.instance.master.address = 127.0.0.1:3306

    #设置一个可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

    canal.instance.dbUsername = canal

    canal.instance.dbPassword = canal

    #连接的数据库

    canal.instance.defaultDatabaseName =test

    #订阅实例中所有的数据库和表

    canal.instance.filter.regex = .*\\..*

    #连接canal的端口

    canal.port= 11111

    #监听到的数据变更发送的队列

    canal.destinations= example

    F、 客户端开发,在maven中引入canal的依赖

    com.alibaba.otter

    canal.client

    1.0.21

    代码示例:

    package com.example;

    import com.alibaba.otter.canal.client.CanalConnector;

    import com.alibaba.otter.canal.client.CanalConnectors;

    import com.alibaba.otter.canal.common.utils.AddressUtils;

    import com.alibaba.otter.canal.protocol.CanalEntry;

    import com.alibaba.otter.canal.protocol.Message;

    import com.google.protobuf.InvalidProtocolBufferException;

    import java.net.InetSocketAddress;

    import java.util.HashMap;

    import java.util.List;

    import java.util.Map;

    public class CanalClientExample {

    public static void main(String[] args) {

    while (true) {

    //连接canal

    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");

    connector.connect();

    //订阅 监控的 数据库.表

    connector.subscribe("demo_db.user_tab");

    //一次取10条

    Message msg = connector.getWithoutAck(10);

    long batchId = msg.getId();

    int size = msg.getEntries().size();

    if (batchId < 0 || size == 0) {

    System.out.println("没有消息,休眠5秒");

    try {

    Thread.sleep(5000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    } else {

    //

    CanalEntry.RowChange row = null;

    for (CanalEntry.Entry entry : msg.getEntries()) {

    try {

    row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

    List rowDatasList = row.getRowDatasList();

    for (CanalEntry.RowData rowdata : rowDatasList) {

    List afterColumnsList = rowdata.getAfterColumnsList();

    Map dataMap = transforListToMap(afterColumnsList);

    if (row.getEventType() == CanalEntry.EventType.INSERT) {

    //具体业务操作

    System.out.println(dataMap);

    } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {

    //具体业务操作

    System.out.println(dataMap);

    } else if (row.getEventType() == CanalEntry.EventType.DELETE) {

    List beforeColumnsList = rowdata.getBeforeColumnsList();

    for (CanalEntry.Column column : beforeColumnsList) {

    if ("id".equals(column.getName())) {

    //具体业务操作

    System.out.println("删除的id:" + column.getValue());

    }

    }

    } else {

    System.out.println("其他操作类型不做处理");

    }

    }

    } catch (InvalidProtocolBufferException e) {

    e.printStackTrace();

    }

    }

    //确认消息

    connector.ack(batchId);

    }

    }

    }

    public static Map transforListToMap(List afterColumnsList) {

    Map map = new HashMap();

    if (afterColumnsList != null && afterColumnsList.size() > 0) {

    for (CanalEntry.Column column : afterColumnsList) {

    map.put(column.getName(), column.getValue());

    }

    }

    return map;

    }

    }

    2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase

    我们有两种方式可以实现,

    A、 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中。

    但是这种做法,效率很低,而且大批量的数据同时插入Hbase,对Hbase的性能影响很大。

    在大数据量的情况下,使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。HFile的形式存在。Hfile的路径格式一般是这样的:

    /hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)

    B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,生成Hfile可以使用hadoop的MapReduce来实现。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

    当然我们也可以不事先生成hfile,可以使用spark任务直接从hive中读取数据转换成RDD,然后使用HbaseContext的自动生成Hfile文件,部分关键代码如下:

    //将DataFrame转换bulkload需要的RDD格式

    val rddnew = datahiveDF.rdd.map(row => {

    val rowKey = row.getAs[String](rowKeyField)

    fields.map(field => {

    val fieldValue = row.getAs[String](field)

    (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))

    })

    }).flatMap(array => {

    (array)

    })

    //使用HBaseContext的bulkload生成HFile文件

    hbaseContext.bulkLoad[Put](rddnew.map(record => {

    val put = new Put(record._1)

    record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))

    put

    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")

    val conn = ConnectionFactory.createConnection(hBaseConf)

    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())

    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))

    val realTable = conn.getTable(hbTableName)

    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)

    // bulk load start

    val loader = new LoadIncrementalHFiles(hBaseConf)

    val admin = conn.getAdmin()

    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)

    sc.stop()

    }

    def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {

    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()

    import scala.collection.JavaConversions._

    for (cells

    val family = cells.getKey

    for (value

    val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))

    ret.+=((kfq, CellUtil.cloneValue(value)))

    }

    }

    ret.iterator

    }

    }

    C、pg_bulkload的使用

    这是一个支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过外部文件加载的方式,这个工具笔者没有亲自去用过,详细的介绍可以参考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/

    3)、基于sqoop的全量导入

    Sqoop 是hadoop生态中的一个工具,专门用于外部数据导入进入到hdfs中,外部数据导出时,支持很多常见的关系型数据库,也是在大数据中常用的一个数据导出导入的交换工具。

    Sqoop从外部导入数据的流程图如下:

    Sqoop将hdfs中的数据导出的流程如下:

    本质都是用了大数据的数据分布式处理来快速的导入和导出数据。

    4)、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新,但是需要注意

    A、hbase中的空cell在hive中会补null

    B、hive和hbase中不匹配的字段会补null

    C、hive的外部表是通过hbase handle 来加载数据,在hbase的数据量非常大时,性能并不好。hive的外部表 在数据量大时,不管是通过HQL计算查询还是通过spark sql,外部表的性能都非常差,因为在加载数据时,会使用hbase的scan等,产生全表扫描。

    我们可以在hbase的shell 交互模式下,创建一张hbse表

    create 'bokeyuan','zhangyongqing'

    使用这个命令,我们可以创建一张叫bokeyuan的表,并且里面有一个列族zhangyongqing,hbase创建表时,可以不用指定字段,但是需要指定表名以及列族

    我们可以使用的hbase的put命令插入一些数据

    put 'bokeyuan','001','zhangyongqing:name','robot'

    put 'bokeyuan','001','zhangyongqing:age','20'

    put 'bokeyuan','002','zhangyongqing:name','spring'

    put 'bokeyuan','002','zhangyongqing:age','18'

    可以通过hbase的scan 全表扫描的方式查看我们插入的数据

    scan ' bokeyuan'

    我们继续创建一张hive外部表

    create external table bokeyuan (id int, name string, age int)

    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,zhangyongqing:name,zhangyongqing:age")

    TBLPROPERTIES("hbase.table.name" = " bokeyuan");

    外部表创建好了后,我们可以使用HQL语句来查询hive中的数据了

    select * from bokeyuan ;

    OK

    1 robot 20

    2 spring 18

    5)、Debezium+bireme:Debezium for PostgreSQL to Kafka  Debezium也是一个通过监控数据库的日志变化,通过对行级日志的处理来达到数据同步,而且Debezium 可以通过把数据放入到kafka,这样就可以通过消费kafka的数据来达到数据同步的目的。而且还可以给多个地方进行消费使用。

    Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。

    本来监控数据库,并且在数据变动的时候获得通知其实一直是一件很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。

    Debezium正好提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理系统,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利用数据库系统本身的特性来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。

    Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了自己的持久性、可靠性和容错性。每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,可以把对数据库的压力降到1)。另外,客户端可以随时停止消费,然后重启,从上次停止消费的地方接着消费。每个客户端可以自行决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发生的顺序被交付的。

    对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的Debezium connector引擎来直接在应用内部运行connector。这种应用仍需要消费数据库更改事件,但更希望connector直接传递给它,而不是持久化到Kafka里。

    另外Maxwell也是可以实现MySQL到Kafka的消息中间件,消息格式采用Json:

    6)、datax

    datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。

    github地址:https://github.com/alibaba/DataX

    A、设计架构:

    数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步

    B、框架

    核心模块介绍:

    DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

    DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

    切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

    每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

    DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

    DataX调度流程:

    举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

    DataXJob根据分库分表切分成了100个Task。

    根据20个并发,DataX计算共需要分配4个TaskGroup。

    4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

    优势:

    每种插件都有自己的数据转换策略,放置数据失真;

    提供作业全链路的流量以及数据量运行时监控,包括作业本身状态、数据流量、数据速度、执行进度等。

    由于各种原因导致传输报错的脏数据,DataX可以实现精确的过滤、识别、采集、展示,为用户提过多种脏数据处理模式;

    精确的速度控制

    健壮的容错机制,包括线程内部重试、线程级别重试;

    从插件视角看框架

    Job:是DataX用来描述从一个源头到目的的同步作业,是DataX数据同步的最小业务单元;

    Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;

    TaskGroup:一组Task集合,在同一个TaskGroupContainer执行下的Task集合称为TaskGroup;

    JobContainer:Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker;

    TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TAskTacker。

    总之,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。

    物理执行有三种运行模式:

    Standalone:单进程运行,没有外部依赖;

    Local:单进程运行,统计信息,错误信息汇报到集中存储;

    Distrubuted:分布式多线程运行,依赖DataX Service服务;

    总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。

    如果需要开发插件,可以看zhege这个插件开发指南:   https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

    数据源支持情况:

    类型数据源Reader(读)Writer(写)文档

    RDBMS 关系型数据库

    MySQL

    Oracle

    SQLServer

    PostgreSQL

    DRDS

    通用RDBMS(支持所有关系型数据库)

    阿里云数仓数据存储

    ODPS

    ADS

    OSS

    OCS

    NoSQL数据存储

    OTS

    Hbase0.94

    Hbase1.1

    Phoenix4.x

    Phoenix5.x

    MongoDB

    Hive

    无结构化数据存储

    TxtFile

    FTP

    HDFS

    Elasticsearch

    时间序列数据库

    OpenTSDB

    TSDB

    7)、OGG

    OGG 一般主要用于Oracle数据库。即Oracle GoldenGate是Oracle的同步工具 ,可以实现两个Oracle数据库之间的数据的同步,也可以实现Oracle数据同步到Kafka,相关的配置操作可以参考如下:

    8)、databus

    Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。 2011年在LinkedIn正式进入生产系统,2013年开源。

    Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更。

    Databus的传输层端到端延迟是微秒级的,每台服务器每秒可以处理数千次数据吞吐变更事件,同时还支持无限回溯能力和丰富的变更订阅功能。

    databus架构设计:

    来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。

    可扩展、高度可用:Databus能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。

    事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。

    低延迟、支持多种订阅机制:数据源变更完成后,Databus能在微秒级内将事务提交给消费者。同时,消费者使用Databus中的服务器端过滤功能,可以只获取自己需要的特定数据。

    无限回溯:这是Databus最具创新性的组件之一,对消费者支持无限回溯能力。当消费者需要产生数据的完整拷贝时(比如新的搜索索引),它不会对数据库产生任何额外负担,就可以达成目的。当消费者的数据大大落后于来源数据库时,也可以使用该功能。

    Databus Relay中继的功能主要包括:

    从Databus来源读取变更行,并在内存缓存内将其序列化为Databus变更事件

    监听来自Databus客户端(包括Bootstrap Producer)的请求,并传输新的Databus数据变更事件

    Databus客户端的功能主要包括:

    检查Relay上新的数据变更事件,并执行特定业务逻辑的回调

    如果落后Relay太多,向Bootstrap Server发起查询

    新Databus客户端会向Bootstrap Server发起bootstrap启动查询,然后切换到向中继发起查询,以完成最新的数据变更事件

    单一客户端可以处理整个Databus数据流,或者可以成为消费者集群的一部分,其中每个消费者只处理一部分流数据

    Databus Bootstrap Producer的功能有:

    检查中继上的新数据变更事件

    将变更存储在MySQL数据库中

    MySQL数据库供Bootstrap和客户端使用

    Databus Bootstrap Server的主要功能,监听来自Databus客户端的请求,并返回长期回溯数据变更事件。

    Databus和canal的功能对比:

    对比项Databuscanal结论

    支持的数据库

    mysql, oracle

    mysql(据说内部版本支持oracle)

    Databus目前支持的数据源更多

    业务开发

    业务只需要实现事件处理接口

    事件处理外,需要处理ack/rollback,

    反序列化异常等

    Databus开发接口用户友好度更高

    服务模型

    relay

    relay可以同时服务多个client

    一个server instance只能服务一个client

    (受限于server端保存拉取位点)

    Databus服务模式更灵活

    client

    client可以拉取多个relay的变更,

    访问的relay可以指定拉取某些表某些分片的变更

    client只能从一个server拉取变更,

    而且只能是拉取全量的变更

    可扩展性

    client可以线性扩展,处理能力也能线性扩展

    (Databus可识别pk,自动做数据分片)

    client无法扩展

    Databus扩展性更好

    可用性

    client ha

    client支持cluster模式,每个client处理一部分数据,

    某个client挂掉,其他client自动接管对应分片数据

    主备client模式,主client消费,

    如果主client挂掉,备client可自动接管

    Databus实时热备方案更成熟

    relay/server ha

    多个relay可连接到同一个数据库,

    client可以配置多个relay,relay故障启动切换

    主备relay模式,relay通过zk进行failover

    canal主备模式对数据库影响更小

    故障对上游

    数据库的影响

    client故障,bootstrap会继续拉取变更,

    client恢复后直接从bootstrap拉取历史变更

    client故障会阻塞server拉取变更,

    client恢复会导致server瞬时从数据库拉取大量变更

    Databus本身的故障对数据库影响几乎为0

    系统状态监控

    程序通过http接口将运行状态暴露给外部

    暂无

    Databus程序可监控性更好

    开发语言

    java,核心代码16w,测试代码6w

    java,4.2w核心代码,6k测试代码

    Databus项目更成熟,当然学习成本也更大

    9)、gobblin

    Gobblin是用来整合各种数据源的通用型ETL框架,在某种意义上,各种数据都可以在这里“一站式”的解决ETL整个过程,专为大数据采集而生,易于操作和监控,提供流式抽取支持。主要用于Kafka的数据同步到HDFS。

    该框架来源于kafka的东家LinkedIn。大体的架构如下:

    Gobblin的功能真的是非常的全。底层支持三种部署方式,分别是standalone,mapreduce,mapreduce on yarn。可以方便快捷的与Hadoop进行集成,上层有运行时任务调度和状态管理层,可以与Oozie,Azkaban进行整合,同时也支持使用Quartz来调度(standalone模式默认使用Quartz进行调度)。对于失败的任务还拥有多种级别的重试机制,可以充分满足我们的需求。再上层呢就是由6大组件组成的执行单元了。这6大组件的设计也正是Gobblin高度可扩展的原因。

    Gobblin组件

    Gobblin提供了6个不同的组件接口,因此易于扩展并进行定制化开发。分别是:

    source

    extractor

    convertor

    quality checker

    writer

    publisher

    Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。

    Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。

    Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。

    Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。

    Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。

    Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。

    Gobblin执行流程

    Job被创建后,Runtime就根据Job的部署方式进行执行。Runtime负责job/task的定时执行,状态管理,错误处理以及失败重试,监控和报告等工作。Gobblin存在分支的概念,从数据源获取的数据由不同的分支进行处理。每个分支都可以有自己的Converter,Quality Checker,Writer和Publisher。因此各个分支可以按不同的结构发布到不同的目标地址。单个分支任务失败不会影响其他分支。 同时每一次Job的执行都会将结果持久化到文件( SequenceFiles)中,以便下一次执行时可以读到上次执行的位置信息(例如offset),本次执行可以从上次offset开始执行本次Job。状态的存储会被定期清理,以免出现存储无限增长的情况。

    10)、MongoShake

    MongoShake是阿里巴巴Nosql团队开源出来的一个项目,主要用于mongdb的数据同步到kafka或者其他的mongdb数据库中,MongoShake是一个以golang语言进行编写的通用的平台型服务,通过读取MongoDB集群的Oplog操作日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。日志可以提供很多场景化的应用,为此,我们在设计时就考虑了把MongoShake做成通用的平台型服务。通过操作日志,我们提供日志数据订阅消费PUB/SUB功能,可通过SDK、Kafka、MetaQ等方式灵活对接以适应不同场景(如日志订阅、数据中心同步、Cache异步淘汰等)。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。

    整体的架构图如下:

    应用场景举例

    1.  MongoDB集群间数据的异步复制,免去业务双写开销。

    2.  MongoDB集群间数据的镜像备份(当前1.0开源版本支持受限)

    3.  日志离线分析

    4.  日志订阅

    5.  数据路由。根据业务需求,结合日志订阅和过滤机制,可以获取关注的数据,达到数据路由的功能。

    6.  Cache同步。日志分析的结果,知道哪些Cache可以被淘汰,哪些Cache可以进行预加载,反向推动Cache的更新。

    7.  基于日志的集群监控

    功能介绍

    MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有通道类型有:

    1.  Direct:直接写入目的MongoDB

    2.  RPC:通过net/rpc方式连接

    3.  TCP:通过tcp方式连接

    4.  File:通过文件方式对接

    5.  Kafka:通过Kafka方式对接

    6.  Mock:用于测试,不写入tunnel,抛弃所有数据

    数据同步的架构如下图所示

    11)、Flinkx

    FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾)。并于今年6年份,完成批流统一,离线计算与流

    计算的数据同步任务都可基于FlinkX实现。

    FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等。FlinkX目前包含下面这些特性:

    大部分插件支持并发读写数据,可以大幅度提高读写速度;

    部分插件支持失败恢复的功能,可以从失败的位置恢复任务,节约运行时间;失败恢复

    关系数据库的Reader插件支持间隔轮询功能,可以持续不断的采集变化的数据;间隔轮询

    部分数据库支持开启Kerberos安全认证;Kerberos

    可以限制reader的读取速度,降低对业务数据库的影响;

    可以记录writer插件写数据时产生的脏数据;

    可以限制脏数据的最大数量;

    支持多种运行模式;

    基于Flink开发,支持分布式运行;

    双向读写,某数据库既可以作为源库,也可以作为目标库;

    支持多种异构数据源,可实现MySQL、Oracle、SQLServer、Hive、Hbase等近20种数据源的双向采集。

    高扩展性,强灵活性,新扩展的数据源可与现有数据源可即时互通。

    FlinkX目前支持下面这些数据库:

    Database TypeReaderWriter

    Batch Synchronization

    MySQL

    Oracle

    SqlServer

    PostgreSQL

    ClickHouse

    PolarDB

    SAP Hana

    Teradata

    Phoenix

    Cassandra

    MongoDB

    ElasticSearch

    Carbondata

    Stream

    Redis

    Hive

    Stream Synchronization

    Kafka

    MySQL Binlog

    MongoDB Oplog

    PostgreSQL WAL

    Oracle Logminer

    Coming Soon

    SqlServer CDC

    Coming Soon

    FlinkX开发者只需要关注InputFormat和OutputFormat接口实现即可。工作原理如下:

    更多详情参考:

    12)、Apache NIFI

    A、背景:

    Apache NiFi项目,它是一种实时数据流处理 系统,在去年由美国安全局(NSA)开源并进入Apache社区,NiFi初始的项目名称是Niagarafiles。当NiFi项目开源之后,一些早先在NSA的开发者们创立了初创公司Onyara,Onyara随之继续NiFi项目的开发并提供相关的支

    持。Hortonworks公司收购了Onyara并将其开发者整合到自己的团队中,形成HDF(Hortonworks Data Flow)平台,如下图所示。

    B、介绍:

    一个易于使用,功能强大且可靠处理和分发数据的系统。

    接下来我们分析一下关键字。

    NIFI定义:

    处理和分发数据,这是NIFI的要旨。它可以在系统中移动数据,并为你提供处理该数据的工具。

    NIFI可以处理各种各样的数据源和不同格式的数据。你可以从一个源中获取数据,对其进行转换,然后将其推送到另一个目标存储地。

    易于使用

    Processors-boxes-通过连接器链接-箭头创建流程。NIFI提供了一个基于流的编程体验。

    NIFI让我们一眼就能理解一组数据流操作,而这或许将需要数百行源代码来实现。

    考虑下面的pipeline:

    如果要在NIFI中实现转换上述的数据流,只需在NIFI图形用户界面,将三个组件拖放到画布中,然后连接做配置。也就需要个两分钟。

    而如果你编写代码来执行相同的操作,则可能需要数百行才能达到相似的结果。

    NIFI在构建数据pipeline方面更具表现力,我们不需要写代码,而NIFI就是为此而设计的。

    强大

    NIFI提供了许多开箱即用的处理器。使用者其实是站在巨人的肩膀上。这些标准处理器可以处理你可能遇到的绝大多数需求。

    NIFI是高度并发的,但其内部封装了相关的复杂性。我们看到的处理器是一个高级抽象,它掩盖了并行编程固有的复杂性。我们可以多个处理器一起运行,一个处理器也可以有多个线程运行。

    并发是你不希望打开的计算型Pandora盒。NIFI使得pipeline构建器免受并发复杂性的影响。

    可靠

    NIFI的设计实现具有扎实的理论基础。与SEDA之类的模型相似(SEDA全称是:stage event driver architecture,中文直译为“分阶段的事件驱动架构”,它旨在结合事件驱动和多线程模式两者的优点,从而做到易扩展,解耦合,高并发。各个stage之间的通信由event来传递,event的处理由stage的线程池异步处理。)。

    对于数据流系统,要解决的主要问题之一就是可靠性。你想确保发送到某处的数据得到了有效接收。

    NIFI通过多种机制在任何时间点跟踪系统状态,从而实现了高度的可靠性。这些机制是可配置的,因此你可以在延迟和应用程序所需的吞吐量之间进行适当的权衡。

    NIFI利用lineage和provenance特征来跟踪每条数据的历史记录。它使得知道每条信息发生了什么转变。

    Apache NIFI提出的数据血缘解决方案被证明是审核数据pipeline的出色工具。在诸如欧盟这样的跨国参与者提出支持准确数据处理的准则的背景下,数据血缘功能对于增强人们对大数据和AI系统的信心至关重要。

    为什么要使用NIFI?

    在确定解决方案时,请记住大数据的四个特点。

    Volume — 你有多少数据?在数量级上,你接近几GB还是几百个PB?

    Variety — 你有多少个数据源?你的数据是否结构化?如果是,结构是否经常变化?

    Velocity — 你需要处理的频率是多少?是信用卡付款吗?它是物联网设备发送的每日性能报告吗?

    Veracity — 你可以信任数据吗?另外,在操作之前是否需要进行多次清洁操作?

    NIFI无缝地从多个数据源提取数据,并提供了处理数据中不同模式的机制。因此,当数据种类繁多时,它就非常适用了。

    如果数据准确性不高,则NIFI尤其有价值。NIFI提供了多个处理器来清理和格式化数据。

    通过其配置选项,NIFI可以解决各种 volume/velocity 场景问题。

    数据路由解决方案的应用程序列表越来越多

    物联网的兴起及其生成的数据流都强调了诸如Apache NIFI之类的工具的重要性。

    微服务是新潮。在那些松耦合的服务中,数据是服务之间的契约。NIFI是在这些服务之间路由数据的可靠方法。

    物联网将大量数据带到云中。对从边缘到云的数据的采集和验证带来了许多新挑战,NIFI可以有效应对这些挑战(主要是通过MiNIFI,针对边缘设备的NIFI项目)

    制定了新的准则和法规以重新调整大数据经济。在日益增加的监视范围内,对于企业来说,至关重要的是清楚地了解其数据pipeline。例如,NIFI数据血缘可能会有助于你遵守法规。

    弥合大数据专家与其他专家之间的鸿沟

    从用户界面可以看到,用NIFI表示的数据流非常适合与你的数据pipeline进行通信。它可以帮助你的组织成员更加了解数据pipeline中发生的事情。

    分析师正在寻求有关为什么这些数据以这种方式到达此处的见解?坐在一起,并在流程中漫步。在五分钟内,你将对提取转换和加载-ETL-pipeline有深入的了解。

    你是否需要同行的反馈,以帮助你创建新的错误处理流程?NIFI决定将错误路径视为有效结果,这是一项设计决策。期望流程审查比传统的代码审查要短。

    你应该使用它吗?或许吧

    NIFI本身就易于使用。尽管如此,它还是一个企业数据流平台。它提供了一套完整的功能,你可能只需要其中的一部分即可。

    如果你是从头开始并管理来自受信任数据源的一些数据,那么最好设置ETL pipeline。你可能只需要从数据库中捕获更改数据和一些数据准备脚本即可。

    另一方面,如果你在使用现有大数据解决方案(用于存储,处理或消息传递)的环境中工作,则NIFI可以很好地与它们集成,并且很可能会很快获胜。你可以利用现成的连接器连接其他大数据解决方案。

    既然我们已经看到了Apache NIFI的优点,现在我们来看看它的关键概念并剖析其内部结构。

    我们已经理解了“NiFi is boxes and arrow programming”。但是,如果你必须使用NIFI,则可能需要更多地了解其工作原理。

    在第二部分中,我将说明Apache NIFI的关键概念。

    剖析Apache NIFI

    启动NIFI时,你会进入其Web界面。 Web UI是设计和控制数据pipeline的蓝图。

    在NIFI中,处理器通过connections连接在一起。在前面介绍的示例数据流中,有三个处理器。

    理解NIFI术语

    要使用NIFI表示数据流,你必须首先掌握其语言。不用担心,只需几个术语就足以掌握其背后的概念。

    那些一个个黑匣子称为处理器,它们通过称为connections的队列交换名为FlowFiles的信息块。最后,FlowFile Controller负责管理这些组件之间的资源。

    让我们看看它是如何工作的。

    FlowFile

    在NIFI中,FlowFile是在pipeline处理器中移动的信息包。

    FlowFile分为两个部分:

    Attributes,即键/值对。例如,文件名,文件路径和唯一标识符是标准属性。

    Content,对字节流的引用构成了FlowFile内容。

    FlowFile不包含数据本身,否则会严重限制pipeline的吞吐量。相反,FlowFile保留的是一个指针,该指针引用存储在本地存储中某个位置的数据。这个地方称为内容存储库(Content Repository)。

    为了访问内容,FlowFile从内容存储库中声明资源(claims),然后将跟踪内容所在位置的确切磁盘偏移,并将其返回FlowFile。

    并非所有处理器都需要访问FlowFile的内容来执行其操作-例如,聚合两个FlowFiles的内容不需要将其内容加载到内存中。

    当处理器修改FlowFile的内容时,将保留先前的数据。NIFI的copies-on-write机制会在将内容复制到新位置时对其进行修改。原始信息保留在内容存储库中。

    Example

    比如一个压缩FlowFile内容的处理器。原始内容会保留在内容存储库中,NIFI并为压缩内容创建一个新条目。

    内容存储库最终将返回对压缩内容的引用。 FlowFile里指向内容的指针被更新为指向压缩数据。

    下图总结了带有压缩FlowFiles内容的处理器的示例。

    Reliability

    NIFI声称是可靠的,实际上如何?当前使用的所有FlowFiles的属性以及对其内容的引用都存储在FlowFile Repository中。

    在pipeline的每个步骤中,在对流文件进行修改之前,首先将其以预写日志的方式(write-ahead log)记录在FlowFile Repository中。

    对于系统中当前存在的每个FlowFile,FlowFile Repository存储:

    FlowFile属性

    指向FlowFile内容的指针

    FlowFile的状态。例如:Flowfile在此瞬间属于哪个队列。

    FlowFile Repository为我们提供了流程的最新状态;因此,它是从中断中恢复的强大工具。

    NIFI提供了另一个工具来跟踪流程中所有FlowFiles的完整历史记录:Provenance Repository。

    Provenance Repository

    每次修改FlowFile时,NIFI都会获取FlowFile及其上下文的快照。NIFI中此快照的名称是Provenance Event。Provenance Repository记录Provenance Events。

    Provenance使我们能够追溯数据血缘关系并为在NIFI中处理的每条信息建立完整的监管链。

    除了提供完整的数据血缘之外,Provenance Repository还提供从任何时间点重播数据的功能。

    等等,FlowFile Repository和Provenance Repository有什么区别?

    FlowFile Repository和Provenance Repository背后的想法非常相似,但是它们解决的是不同的问题。

    FlowFile Repository是一个日志,仅包含系统中正在使用的FlowFiles的最新状态。这是flow的最新情况,可以快速从中断中恢复。

    Provenance Repository更为详尽,因为它可以跟踪流中每个FlowFile的完整生命周期。

    可以这么理解,FlowFile Repository里面保存的是你此时某个动作的照片,Provenance Repository保存的是你这个动作的视频。你可以倒退到过去的任何时刻,研究数据,并从给定的时间重放操作。它提供了数据的完整血缘关系。

    #Processor

    处理器是执行操作的黑匣子。处理器可以访问FlowFile的属性和内容来执行所有类型的操作。它们使你能够在数据输入,标准数据转换/验证任务中执行许多操作,并将这些数据保存到各种数据接收器。

    NIFI在安装时会附带许多处理器。如果你找不到适合自己的用例的处理器,可以构建自己的处理器。

    处理器是完成一项任务的高级抽象。这种抽象非常方便,因为它使pipeline的构建免受并发编程和错误处理机制的困扰。

    处理器提供了多个配置设置的界面以微调其行为。

    这些处理器的属性是NIFI与你的应用程序需求之间的最后联系。细节很重要,所以pipeline建设者会花费大部分时间来微调这些属性以匹配预期的行为。

    Scaling

    对于每个处理器,你可以指定要同时运行的并发任务数。这样,流控制器将更多资源分配给该处理器,从而提高其吞吐量。处理器共享线程。如果一个处理器请求更多的线程,则其他处理器的可用线程就会少了。

    横向扩展:扩展的另一种方法是增加NIFI群集中的节点数。

    Process Group

    现在,我们已经了解了什么是处理器,这很简单。

    一堆处理器及其连接可以组成一个Process Group。你添加了一个Input Port和一个Output Port,以便Process Group可以接收和发送数据。

    Connections

    Connections是处理器之间的队列。这些队列允许处理器以不同的速率进行交互。就像存在不同尺寸的水管Connections可以具有不同的容量。

    由于处理器根据它们执行的操作以不同的速率消耗和产生数据,因此Connections充当FlowFiles的缓冲区。

    Connections中可以有多少数据是有限制的。同样,当水管已满时,你将无法再加水,否则水会溢出。

    在NIFI中,你可以限制FlowFile的数量及其通过Connections的聚合内容的大小。

    当你发送的数据超出Connections的处理能力会发生什么?

    如果FlowFiles的数量或数据量超过定义的阈值,则将触发背压机制(backpressure)。在队列中没有空间之前,Flow Controller不会安排Connections上游的处理器再次运行。

    假设你在两个处理器之间最多只能有10000个FlowFile。在某个时候,连接中有7000个元素。因为限制为10000。P1仍然可以通过Connections发送数据到P2。

    现在,假设处理器一下子向该Connections发送了4000个新的FlowFiles。 7000 + 4000 = 11000→我们超过了10000个FlowFiles的连接阈值。

    这个限制是软限制,表示可以超出限制,但是Flow Controller不会调度处理器P1,直到Connections恢复到其阈值(10000个FlowFiles)以下。

    你想要设置适合于要处理的数据量和速度的Connections阈值,要始终考虑四个V(大数据的四个特点)。

    超出限制的想法听起来很奇怪,当FlowFiles或关联数据的数量超过阈值时,将触发交换机制(swap mechanism)。

    优先处理FlowFiles

    NIFI中的Connections是高度可配置的。你可以选择如何在队列中确定FlowFiles的优先级,以确定接下来要处理的文件。

    在可用的配置中,例如,先进先出-FIFO。但是,你甚至可以通过FlowFile中的属性来优先处理传入数据包。

    #Flow Controller

    Flow Controller是将一切融合在一起的粘合剂。它为处理器分配和管理线程。这就是执行数据流的方式。

    此外,Flow Controller还可以添加Controller Services。

    这些服务有助于管理共享资源,例如数据库连接或云服务提供商凭据。Controller Services是守护进程(daemons)。它们在后台运行,并提供配置,资源和参数供处理器执行。

    例如,你可以使用AWS凭证提供程序服务使你的服务与S3存储桶进行交互,而不必担心处理器级别的凭证。

    与处理器一样,开箱即用的控制器服务也很多。

    13)、streamsets

    StreamSets 数据收集器是一个轻量级,强大的引擎,实时流数据。使用Data Collector在数据流中路由和处理数据。

    要为Data Collector定义数据流,请配置管道。一个流水线由代表流水线起点和终点的阶段以及您想要执行的任何附加处理组成。配置管道后,单击“开始”,“ 数据收集器”开始工作。

    Data Collector在数据到达原点时处理数据,在不需要时静静地等待。您可以查看有关数据的实时统计信息,在数据通过管道时检查数据,或仔细查看数据快照。

    对于Streamsets来说,最重要的概念就是数据源(Origins)、操作(Processors)、目的地(Destinations)。创建一个Pipelines管道配置也基本是这三个方面。

    常见的Origins有Kafka、HTTP、UDP、JDBC、HDFS等;Processors可以实现对每个字段的过滤、更改、编码、聚合等操作;Destinations跟Origins差不多,可以写入Kafka、Flume、JDBC、HDFS、Redis等。

    你可以自己部署一个单机版,免费的,那么这个单机版就会收到硬件资源的限制导致总任务数量的是有瓶颈的,目前看下来还是有点吃资源的,4核16G,差不多跑50多个任务吧,可能我的任务计算量还是比较大,部署有增量和递减,管理用户只能有一个。有Rest Api

    你可以部署一个Control Hub集群版,收费的,多钱官网没有明确标明

    学习方式:

    youtube 的Demo视频

    Ask 社区 https://ask.streamsets.com/ 提问用的,回答率不保证,在Jira之后的一个社区

    Jira https://issues.streamsets.com/

    slack 聊天室 streamsetters.slack.com

    14)、FLink SQL CDC

    flink sql cdc 是flink 1.11 版本开始推出的功能,指在加强flink sql的能力以及flink 数据同步的能力。支持canal和Debezium 的cdc 。

    Flink’s Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Avro, Parquet, or ORC.

    总结:

    1、databus活跃度不高,datax和canal 相对比较活跃。

    2、datax 一般比较适合于全量数据同步,对全量数据同步效率很高(任务可以拆分,并发同步,所以效率高),对于增量数据同步支持的不太好(可以依靠时间戳+定时调度来实现,但是不能做到实时,延迟较大)。

    3、canal 、databus 等由于是通过日志抓取的方式进行同步,所以对增量同步支持的比较好。

    4、以上这些工具都缺少一个监控和任务配置调度管理的平台来进行支撑。

    5、flinkx活跃度也不是非常高,关注的人还不是很多。

    6、Apache NIFI 适合于重量级的数据同步处理,datax 相对来说比较轻量级。

    7、Flink sql cdc 应该再未来会很有前景

    个人新书,点击:

    展开全文
  • DataWorks数据集成提供了 “MySQL一键实时同步至Elasticsearch” 的解决方案,可以将MySQL中的数据库,通过一次性的简单配置,全增量一体化同步到Elasticsearch,达到数据实时落入ElasticSearch,实时可以用于分析的...
    简介:企业的实时数据除了存储在大数据引擎中,还有很多非结构化的日志数据,通过阿里云的Elasticsearch,用全托管的方式提供低成本的冷热存储方案,轻松助力企业搭建统一的云上全观测运维监控平台,实现海量数据的实时监控分析,提高自动化运维管理效率。DataWorks数据集成提供了 “MySQL一键实时同步至Elasticsearch” 的解决方案,可以将MySQL中的数据库,通过一次性的简单配置,全增量一体化同步到Elasticsearch,达到数据实时落入ElasticSearch,实时可以用于分析的效果。也可以将MySQL数据库离线全量或者增量搬迁到Elasticsearch中

    企业的实时数据除了存储在大数据引擎中,还有很多非结构化的日志数据,通过阿里云的Elasticsearch,用全托管的方式提供低成本的冷热存储方案,轻松助力企业搭建统一的云上全观测运维监控平台,实现海量数据的实时监控分析,提高自动化运维管理效率。DataWorks数据集成提供了 “MySQL一键实时同步至Elasticsearch” 的解决方案,可以将MySQL中的数据库,通过一次性的简单配置,全增量一体化同步到Elasticsearch,达到数据实时落入ElasticSearch,实时可以用于分析的效果。如果您只需要将业务库数据离线全量或者增量搬迁到Elasticsearch中,也可以将MySQL数据库,通过一次性的简单配置,全增量一体化离线同步到Elasticsearch中。DataWorks数据集成采用自研高性能引擎,在相同的机器规格情况下,同步性能更高,价格更优惠!


    方案简介

    本方案是整库全增量实时/离线同步 至Elasticsearch(目前支持的源数据库类型为MySQL,后续更多类型持续增加中)。在DataWorks数据集成界面下,单击 “一键实时同步至Elasticsearch” 新建同步任务,再通过完成“设置同步来源和规则”、“设置目标表”、“DDL消息处理规则”、“运行资源设置”这样4步简单的产品化配置,就可以将指定类型的数据库中全部表或者部分表的数据实时同步到Elasticsearch里。或者单击“整库离线同步至Elasticsearch”新建离线同步任务,再通过完成“设置同步来源和规则”、“设置目标索引”、“同步规则设置”、“运行资源设置”实现数据离线同步到Elasticsearch里。

    适用场景

    “一键实时同步至Elasticsearch”适用于业务库需要保持业务数据库数据实时更新至ElasticSearch的场景,供上层应用做实时数据检索分析或者后续数据开发。“整库离线同步至Elasticsearch”适用于将业务库数据全量或者增量搬迁到Elasticsearch中。

    优势特点

    整库级别同步:

    • 不需要一个个建立表到索引的同步,支持以库为单位,选择其中所有表或者部分表进行同步

    高效实时同步:

    • 支持数据实时同步至ElasticSearch,灵活配置DDL规则

    多种同步方式:

    • 离线同步支持全量、增量以及全量和增量结合的方式,同时支持周期性调度设置

    配置简单:

    • 避开纷繁复杂的同步任务、建索引配字段、相互依赖、参数对齐等操作,只需简单的产品化的功能配置。

    操作步骤

    步骤一:创建同步解决方案任务(实时/离线)

    1.登录并进入"数据集成"页面,单击“一键实时同步至Elasticsearch”新建实时同步任务或者单击“整库离线同步至Elasticsearch”新建离线同步任务。

    数据集成1.png

    2.完成方案名称等基本信息配置。在基本配置区域,配置各项参数。

    参数描述
    方案名称同步解决方案的名称,最多支持50个字符。
    描述对当前方案进行简单描述,最多支持50个字符。
    目标任务存放位置默认创建一个新的业务流程,所有任务均以clone_database_源端数据源名称+to+目标数据源名称的命名方式存放至数据集成目录下。您也可以取消自动建立工作流程,在选择位置下拉列表中指定存放目标任务的路径。

    步骤二:选择来源数据源并配置同步规则

    1.在数据来源区域,选择类型数据源(仅支持选择MySQ类型的数据源)

    2.在选择同步的源表区域,选中需要同步的源表 图标,将其移动至已选源表

    该区域会为您展示所选数据源下所有的表,您可以选择整库全表或部分表进行同步。

    注意 如果选中的表没有主键,将无法进行实时同步。

    3.在设置同步规则区域,单击添加规则,选择相应的规则进行添加。同步规则包括表名转换规则目标表名规则

    • 表名转换规则:转换表名为目标表名,进行字符串替换。
    • 目标表名规则:支持对转换后的表名添加前缀和后缀。

    4.单击下一步

    步骤三:选择目标数据源并配置目标表格式

    1.在设置目标表/设置目标索引页面,选择目标**Elasticsearch数据源**。

    2.单击刷新源表和**Elasticsearch索引映射**,创建需要同步的源表和目标Elasticsearch索引的映射关系。
    3.查看任务的执行进度和表来源。

    数据集成3-1.png

    序号描述
    显示映射关系的创建进度。说明 如果同步的表数量较多,会导致执行进度较慢,请耐心等待。
    ②③如果来源库有主键则会直接使用此主键。如果没有,则会显示编辑标志,允许自定义主键(支持联合主键)
    ④56选择的索引建立方式:- 当索引建立方式选择自动建索引时,显示自动创建的Elasticsearch索引名称。您可以单击表名称,修改建索引的配置。- 当索引建立方式选择使用已有索引时,请在下拉列表中选择需要的索引。

    4.单击下一步

    步骤四:DDL消息处理规则/同步规则设置

    1.如果是“一键实时同步至Elasticsearch”任务,那么这一步是配置DDL消息处理规则,如下图配置要同步的方式和参数。

    数据集成4-1.png

    2.处理规则说明:

    处理方式解释
    正常处理此DDL消息将会继续下发给目标数据源,由目标数据源来处理,不同目标数据源处理策略可能会不同。比如“增加列”对于MaxCompute来说就是个错误,但是对于Hologres来说就可以正常增加一列。
    忽略丢弃掉此DDL消息,不再向目标数据源发送此消息。
    告警在日志中发送告警信息,同时丢弃掉此DDL消息。
    出错直接让实时同步任务以出错状态终止运行。

    3.如果是“整库离线同步至Elasticsearch”任务,那么这一步应该是配置同步规则设置,如下图配置要同步的方式和参数。

    数据集成4-2.png

    4.方案选择:

    方案解释
    全量一次性同步后周期增量先将源端所有数据全量拉取到Elasticsearch后,再按照指定的过滤条件和重复周期,每次循环将增量数据拉取到Elasticsearch中。
    只全量一次性同步只进行一次同步,将源端所有数据全量拉取到Elasticsearch。
    只增量一次性同步只进行一次同步,按照指定的过滤条件将源端的增量数据拉取到Elasticsearch中。
    周期性全量同步按照指定的重复周期,每次循环都将源端所有数据拉取到Elasticsearch中。
    周期性增量同步按照指定的过滤条件和重复周期,每次循环将增量数据拉取到Elasticsearch中。

    步骤五:运行资源设置

    运行资源设置页面,配置各项参数。目前解决方案仅支持使用独享数据集成资源组,该资源组可以在DataWorks官网下“单独产品”购买处点击购买(注意是“独享数据集成资源”,不是调度资源),资源组详情也可参见资源规划与配置文档。

    1.如果是“一键实时同步至Elasticsearch”任务,这一步配置界面如下:

    数据集成5-1.png

    2.如果是“整库离线同步至Elasticsearch”任务,这一步配置界面如下:

    数据集成5-2.png

    参数描述
    离线任务名称规则全量同步时的离线任务名称。创建解决方案后,会先生成一个离线任务用于读取全量数据,再生成实时任务持续读取实时增量数据。
    选择实时任务独享资源组分别选择实时任务和全量离线任务需要使用的独享资源组。目前解决方案仅支持使用独享数据集成资源组,此处可配置为准备操作中已购买并配置的独享数据集成资源组,详情可参见资源规划与配置
    选择全量离线任务独享资源组
    选择调度资源组选择运行任务时使用的调度资源组。
    来源端读取支持最大连接数读取端的最大连接数,即来源端数据库的JDBC连接数。请根据数据库资源的实际情况合理配置。

    3.单击完成配置,完成数据同步解决方案任务创建。

    查看运行状态及结果

    解决方案任务列表页面,单击已运行任务后的执行详情,查看当前解决方案数据同步过程中各子任务节点的运行详情。
    单击子任务节点后的执行详情,可在弹窗中单击任务链接进入子节点的数据开发页面。

    管理数据同步解决方案任务

    查看或编辑任务。在解决方案任务列表页面,单击相应任务后的任务配置,可以查看或编辑任务。
    仅单击未运行状态后的任务配置,您可以编辑任务。其它状态下的任务配置页面,仅支持查看。

    删除任务:单击相应任务后的删除。在删除对话框中,单击确定(仅删除当前任务的配置记录,已经生成的表和任务不受影响)。

    数据集成6.png

    总结

    以上就是Elasticsearch实时同步解决方案的全部内容,数据同步到Elasticsearch之后,您可以很方便地做实时分布式的搜索与分析,Elasticsearch构建在Elastic Stack开源生态矩阵中,包括Beats(轻量级数据采集工具)、Logstash(收集、过滤、传输数据的工具)、Elasticsearch、Kibana(灵活的可视化工具)。您可以很方便地利用丰富的工具快速搭建您的数据检索或者实时监控运维应用。

    如果您对本次方案感兴趣的话,可以到Elasticsearch和DataWorks的官网查看具体产品信息:
    数据集成产品介绍:https://help.aliyun.com/document_detail/199008.html
    Elasticsearch产品官网:https://www.aliyun.com/product/bigdata/product/elasticsearch
    DataWorks产品官网:https://www.aliyun.com/product/bigdata/ide

    原文链接:https://developer.aliyun.com/article/781134?

    版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
    展开全文
  • 本发明专利技术是一种通用的数据库复制框架技术。...数据同步模块,将数据过滤模块产生的数据发送的目标数据库。Theinventionisageneraldatabasereplicationframeworktechnology.Thistechnologyisdividedintoth...
  • MySQL 实时同步 解决方案

    千次阅读 2021-02-16 00:11:45
    目录 1 需求概述 2 技术原理 3 MySQL环境配置 ...将MySQL5.6生产库张表的数据实时同步到Oracle11g数据仓库,MySQL历史数据700G,平均每天产生50G左右日志文件,MySQL日志空间50G,超过后滚动删除日志文件。
  • 多源异构数据库实时同步解决方案

    千次阅读 2021-03-07 15:57:37
    1 需求概述 将企业个业务系统(Oracle、SQL Server、MySQL...采用灵蜂数据集成软件BeeDI在异构库间进行实时数据同步,通过ETL全量同步历史数据,通过日志解析方式实时同步增量数据,BeeDI提供Oracle、SQL Server、M.
  • 一种终端设备上的数据同步方法【技术领域】[0001] 属于移动通信技术领域,特别是涉及基于离网环境下多种移动终端设备之间的数 据同步的方法。 技术背景[0002] 90年代未,数据同步始于有线连接,如MAC机作为数据...
  • 本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的数据同步方案,基于 Flink CDC 同步的解决方案以及更的应用场景和 CDC 未来开发规划等方面进行介绍和演示。 传统数据同步方案 基于 ...
  • 另外,备库只是单纯的备份,资源利用率50%,这点方案二可解决。 一致性分析:读写都操作主库,不存在数据一致性问题。 扩展性分析:无法通过加从库来扩展读性能,进而提高整体性能。 可落地分析:两点
  • 会发现从状态与主状态不一样(主要观察Master_Log_File和Read_Master_Log_Pos),就是binlog日志不同步 现在需要使主从的数据同步,然后再使binlog日志同步方能解决问题 (1).把主数据库停止更新,查看binlog(file和...
  • 作者:牛牛码特juejin.cn/post/6844903924915240974背景微服务场景下需要同步信息的场景。还是前文的栗子: 如下微服务支付服务:负责完成支付操作,其中有支付流...
  • 运维基本功(十五):Linux系统优化基础 运维基本功(十六):远程管理SSH服务 运维基本功(十七):远程管理SSH服务免密登录解决方案 运维基本功(十八): Linux系统下数据同步服务RSYNC解决方案 需求背景 某公司...
  • 本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的数据同步方案,基于 Flink CDC 同步的解决方案以及更的应用场景和 CDC 未来开发规划等方面进行介绍和演示。 整理:陈政羽(Flink ...
  • 服务端: 【增量日志记录表】:添加了,删除了,... 【客户端同步记录表】:每个客户端最新的变更序号和获取时间 客户端唯一标志变更序号更新时间 创建时间 客户端: 首次进入(本地没有存储【变更序号】): ...
  • 为适应移动业务从以电路语音为主的单一业务向业务转变,移动网络架构向IP化、宽带化进行发展。为了适应业务IP化发展的必然趋势,作为基础网络的承载网已经由传统电路交换向分组交换方式演进,分组传送网(PTN)技术...
  • 屏显示解决方案

    2021-07-27 04:34:08
    屏显示解决方案AWIND奇机成立于2003年,一直专注于“商教领域”的无线屏显示,经过不懈努力,奇机在电脑一机屏有2种解决方案屏同显方案和独立分屏显示方案。方案选择在于用户需要将电脑屏幕同步显示在不同...
  • 下面的文章主要介绍的是SQL Server数据库和Oracle数据库的数据同步方案以及具体的解决过程,SQL Server数据库与Oracle的数据同步方案与其实际的解决流程,其更内容请参考下文:说到同步,其实是靠"作业"定时调度...
  • 想了解详解MySQL实时同步到Oracle解决方案的相关内容吗,wahahaman在本文为您仔细讲解MySQL实时同步到Oracle的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:MySQL实时同步到Oracle,MySQL同步到Oracle,...
  • 需求redis中数据同步到mysql中数据,如果在更新途中redis又更新了,按照Redis中最新的数据进行更新。实现思路:存储redis数据用RedisTemplate.opsForValue进行数据存储,在数据发生改变的时候,优先向redis中更新...
  • 为了解决这些问题,我们在 MySQL 到 MySQL 双向同步方案上又走了一步。相比之前的方案,优势明显。 不依赖 GTID 不依赖事务的顺序,可并行 对操作减少 对云数据库(MySQL)的普遍支持 支持库表列裁剪、映射以及...
  • 一种针对图数据超级节点的数据建模优化解决方案一种针对图数据超级节点的数据建模优化解决方案一、超级节点1.1 超级节点概念1.2 从图数据网络中寻找超级节点二、与超级节点相关的关键问题案例三、模拟超级节点3.1 ...
  • 读写分离其实就是将数据库分为了主从库,一个主库用于写数据个从库完成读数据的操作,主从库之间通过某种机制进行数据同步,是一种常见的数据库架构。 一个组从同步集群,通常被称为是一个“分组”。 二、...
  • 本文主要向大家介绍了MySQL数据库异构数据同步,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助。在实现levelDB挂载成MySQL引擎时,发现在实际存储是key-value格式时候,MySQL的异构数据同步,可以更...
  • 一、MySQL数据库主从同步延迟产生的原因 MySQL的主从复制都是单线程的操作,主库对所有DDL和DML产生的日志写进binlog,由于binlog是顺序写,所以效率很高。Slave的SQL Thread线程将主库的DDL和DML操作事件在slave中...
  • MySQL数据同步之otter

    2021-01-19 04:11:54
    一、otter介绍基于日志数据,用于MySQL或者ORACLE之间准实时同步数据。...可跨IDC机房)node (同步过程setl)canal / eromanga (同步数据来源)大集群化部署1个manager集群+个IDC机房...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 142,346
精华内容 56,938
关键字:

多端数据同步解决方案