精华内容
下载资源
问答
  • 前面介绍了以守护进程的方式传输或同步数据rsync软件,linux系统数据同步软件很,今天来介绍下sersync数据同步软件 一:sersync介绍 sersync其实是利用inotify和rsync两种软件技术来实现数据实时同步功能...

    Linux系统sersync数据实时同步


    前面介绍了以守护进程的方式传输或同步数据rsync软件,linux系统数据同步软件很多,今天来介绍下sersync数据同步软件

    一:sersync介绍

    sersync其实是利用inotify和rsync两种软件技术来实现数据实时同步功能的,inotify是用于监听sersync所在服务器上的文件变化,结合rsync软件来进行数据同步,将数据实时同步给客户端服务器

    二:sersync工作过程

    在同步主服务器上开启sersync,负责监听文件系统的变化,然后调用rsync命令把更新的文件同步到目标服务器上,主服务器上安装sersync软件,目标服务器上安装rsync服务

    三:整体环境拓扑图


    四:客户端安装配置rsync服务

    [root@Client ~]# cat /etc/rsyncd.conf

    cat: /etc/rsyncd.conf: No such file or directory

    如果有此文件,配置前要进行备份,再进行相关配置

    [root@Client etc]# vi /etc/rsyncd.conf 

    ##rsync config  start

    ##created by root 2016-08-08 15:00

    ##rsync.conf config start

    uid = rsync

    gid = rsync

    use chroot = no

    max connetctions = 200

    timeout = 100

    pid file = /var/run/rsyncd.pid

    lock file = /var/run/rsync.lock

    log file = /var/log/rsyncd.log

    [backup]

    path = /backup/

    ignore errors

    read only = false

    list = false

    hosts allow = 192.168.1.0/24

    hosts deny = 0.0.0.0/32

    auth users = rsync_backup

    secrets file = /etc/rsync.password

    ##rsync config  end   

    "rsyncd.conf" [New] 21L, 458C written

    添加用户

    [root@Client ~]# useradd rsync -s /sbin/nologin -M

    改变目录权限

    [root@Client ~]# chown -R rsync.rsync /backup   

    配置密码文件

    [root@Client ~]# echo "rsync_backup:rsync.conf">>/etc/rsync.password

    [root@Client ~]# cat /etc/rsync.password                            

    rsync_backup:rsync.conf

    改变密码文件权限

    [root@Client ~]# chmod 600 /etc/rsync.password 

    [root@Client ~]# ls -ld /etc/rsync.password 

    -rw-------. 1 root root 24 Sep  9 13:06 /etc/rsync.password

    格式化文件

    [root@Client ~]# dos2unix /etc/rsyncd.conf 

    dos2unix: converting file /etc/rsyncd.conf to UNIX format ...

    开启服务后台运行

    [root@Client ~]# rsync --daemon

    [root@Client ~]# netstat -lntup|grep rsync

    tcp 0 0 0.0.0.0:873 0.0.0.0:*  LISTEN   2002/rsync

    tcp 0 0 :::873 :::*       LISTEN   2002/rsync

    五:主服务器上配置密码文件

    [root@Master ~]# echo "rsync.conf">>/etc/rsync.password

    [root@Master ~]# cat /etc/rsync.password

    rsync.conf

    [root@Master ~]# chmod 600 /etc/rsync.password

    [root@Master ~]# ls -ld /etc/rsync.password

    -rw-------. 1 root root 11 Sep  8 06:25 /etc/rsync.password

    六:测试手工同步

    [root@Master /]# rsync -avzP /etc/hosts rsync_backup@192.168.1.3::rsync --password-file=/etc/rsync.password

    sending incremental file list

    hosts

    158 100%    0.00kB/s    0:00:00 (xfer#1, to-check=0/1)

    sent 120 bytes  received 27 bytes  26.73 bytes/sec

    total size is 158  speedup is 1.07

    [root@Master /]# cat /etc/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

    [root@Client ~]# cat /backup/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

    经过对比两边数据一致,表明同步成功,手工同步成功之后,然后再进行后面的配置

    七:安装sersync服务

    首先下载好安装软件

    sersync_64bit_binary_stable_final.tar.gz

    [root@Master tools]# tar -zxvf sersync_64bit_binary_stable_final.tar.gz -C

    /usr/local/

    GNU-Linux-x86/

    GNU-Linux-x86/sersync2

    GNU-Linux-x86/confxml.xml

    [root@Master tools]# cd /usr/local/

    [root@Master local]# ls

    bin  games   include  lib64    sbin   src

    etc  GNU-Linux-x86  lib   libexec  share

    GNU-Linux-x86就是sersync安装软件,为了方便将它改名

    [root@Master local]# mv GNU-Linux-x86 sersync

    为了后续方便管理,创建几个目录用于存放各类文件

    [root@Master sersync]# mkdir -p conf bin logs

    [root@Master sersync]# mv confxml.xml conf

    [root@Master sersync]# ls

    bin  conf  logs  sersync2

    [root@Master sersync]# cd conf

    [root@Master conf]# ls

    confxml.xml

    在配置配置文件之前备份

    [root@Master conf]# cp confxml.xml confxml.xml.$(date +%F)

    [root@Master conf]# ls

    confxml.xml  confxml.xml.2016-09-08

    修改配置文件内容(confxml.xml)

    1、修改24-28行

    <localpath watch="/opt/tongbu">

                <remote ip="127.0.0.1" name="tongbu1"/>

                <!--<remote ip="192.168.8.39" name="tongbu"/>-->注释内容

                <!--<remote ip="192.168.8.40" name="tongbu"/>-->注释内容

            </localpath>

    修改后的内容为

     <localpath watch="/opt/backup"> 本地数据的路径

                <remote ip="192.168.1.3" name="rsync"/>远端IP与模块名称

                    </localpath>

        <!#################################### -->注释内容

    2、修改31-34行内容——认证

    <commonParams params="-artuz"/>

    <auth start="false" users="root" passwordfile="/etc/rsync.pas"/>

    <userDefinedPort start="false" port="874"/><!-- port=874 -->

    <timeout start="false" time="100"/><!-- timeout=100 -->

    <ssh start="false"/>

    修改后的内容为

    <commonParams params="-aruz"/>

    <auth start="true" users="rsync_backup"  passwordfile="/etc/rsync.password"/>

    <userDefinedPort start="false" port="874"/><!-- port=874 -->

    <timeout start="true" time="100"/><!-- timeout=100 -->

    <ssh start="false"/>

    3、修改36-37行

    <failLog path="/tmp/rsync_fail_log.sh" timeToExecute="60"/><!--default

     every 60mins execute once-->

    修改成我们刚刚创建好的logs目录

    <failLog path="/usr/local/sersync/logs/rsync_fail_log.sh" timeToExecut

    e="60"/><!--default every 60mins execute once-->

    修改完成后最终的配置文件如下

    [root@Master conf]# cat /usr/local/sersync/conf/confxml.xml

    <?xml version="1.0" encoding="ISO-8859-1"?>

    <head version="2.5">

        <host hostip="localhost" port="8008"></host>

        <debug start="false"/>

        <fileSystem xfs="false"/>

        <filter start="false">

            <exclude expression="(.*)\.svn"></exclude>

            <exclude expression="(.*)\.gz"></exclude>

            <exclude expression="^info/*"></exclude>

            <exclude expression="^static/*"></exclude>

        </filter>

        <inotify>

            <delete start="true"/>

            <createFolder start="true"/>

            <createFile start="false"/>

            <closeWrite start="true"/>

            <moveFrom start="true"/>

            <moveTo start="true"/>

            <attrib start="false"/>

            <modify start="false"/>

        </inotify>

        <sersync>

            <localpath watch="/opt/backup">

                <remote ip="192.168.1.3" name="rsync"/>

            </localpath>

                <!#################################### -->

            <rsync>

                <commonParams params="-aruz"/>

                <auth start="true" users="rsync_backup" passwordfile="/etc/rsync.password"/>

                <userDefinedPort start="false" port="874"/><!-- port=874 -->

                <timeout start="true" time="100"/><!-- timeout=100 -->

                <ssh start="false"/>

            </rsync>

            <failLog path="/usr/local/sersync/logs/rsync_fail_log.sh" timeToExecute="60"/><!--default every 60mins execute once-->

            <crontab start="false" schedule="600"><!--600mins-->

                <crontabfilter start="false">

                    <exclude expression="*.php"></exclude>

                    <exclude expression="info/*"></exclude>

                </crontabfilter>

            </crontab>

            <plugin start="false" name="command"/>

        </sersync>

        <plugin name="command">

            <param prefix="/bin/sh" suffix="" ignoreError="true"/>  <!--prefix /opt/tongbu/mmm.sh suffix-->

            <filter start="false">

                <include expression="(.*)\.php"/>

                <include expression="(.*)\.sh"/>

            </filter>

        </plugin>

        <plugin name="socket">

            <localpath watch="/opt/tongbu">

                <deshost ip="192.168.138.20" port="8009"/>

            </localpath>

        </plugin>

        <plugin name="refreshCDN">

            <localpath watch="/data0/htdocs/cms.xoyo.com/site/">

                <cdninfo domainname="ccms.chinacache.com" port="80" username="xxxx" passwd="xxxx"/>

                <sendurl base="http://pic.xoyo.com/cms"/>

                <regexurl regex="false" match="cms.xoyo.com/site([/a-zA-Z0-9]*).xoyo.com/images"/>

            </localpath>

        </plugin>

    </head>

    八:开启sersync守护进程

    首先配置全局环境变量,使得后同可以直接调用sersync命令

    [root@Master conf]# echo 'export PATH=$PATH:/usr/local/sersync/bin'>>/etc/profile

    [root@Master conf]# tail -1 /etc/profile

    export PATH=$PATH:/usr/local/sersync/bin

    [root@Master conf]# source /etc/profile

    [root@Master conf]# which sersync

    /usr/local/sersync/bin/sersync

    启动sersync服务

    serync -r -d -o /usr/local/sersync/conf/confxml.xml

    -r初始化数据  

    -d后台启动 

    -o指定路径

    如果需要将命令开启动,只需将命令去掉参数-r定入/etc/rc.local

    启动后结果如下

    [root@Master backup]# sersync -r -d -o /usr/local/sersync/conf/confxml.xml 

    set the system param

    execute:echo 50000000 > /proc/sys/fs/inotify/max_user_watches

    execute:echo 327679 > /proc/sys/fs/inotify/max_queued_events

    parse the command param

    option: -r rsync all the local files to the remote servers before the sersync work

    option: -d    run as a daemon

    option: -o    config xml name:  /usr/local/sersync/conf/confxml.xml

    daemon thread num: 10

    parse xml config file

    host ip : localhost     host port: 8008

    daemon start,sersync run behind the console 

    use rsync password-file :

    user is rsync_backup

    passwordfile is         /etc/rsync.password

    config xml parse success

    please set /etc/rsyncd.conf max connections=0 Manually

    sersync working thread 12  = 1(primary thread) + 1(fail retry thread) + 10(daemon sub threads) 

    Max threads numbers is: 22 = 12(Thread pool nums) + 10(Sub threads)

    please according your cpu ,use -n param to adjust the cpu rate

    ------------------------------------------

    rsync the directory recursivly to the remote servers once

    working please wait...

    execute command: cd /opt/backup && rsync -aruz -R --delete ./  --timeout=100 rsync_backup@192.168.1.3::rsync --password-file=/etc/rsync.password >/dev/null 2>&1 

    run the sersync: 

    watch path is: /opt/backup

    九:测试数据同步

    [root@Master sersync]# cd /opt/backup/

    [root@Master backup]# ls

    [root@Master backup]# ls -ll

    total 0

    [root@Master backup]# touch 123

    [root@Master backup]# touch 1234

    [root@Master backup]# touch 1235

    [root@Master backup]# touch 12333

    [root@Master backup]# ls -ll

    total 0

    -rw-r--r--. 1 root root 0 Sep  8 09:26 123

    -rw-r--r--. 1 root root 0 Sep  8 09:26 12333

    -rw-r--r--. 1 root root 0 Sep  8 09:26 1234

    -rw-r--r--. 1 root root 0 Sep  8 09:26 1235

    目标服务器查看同步情况

    [root@Client backup]# ls -ll

    total 0

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 123

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 12333

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 1234

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 1235

    测试结果表明数据同步正常,能够实时同步



    长按关注公众号——友侃有笑




    展开全文
  • 引言:《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已...

    引言:

    《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)

    《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)

    《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已完成)

    《异构数据源的CDC实时同步系统——最终选型实战》 系列第四篇(已完成)

    7.debezium

    debezium是由redhat支持的开源分布式CDC系统,支持多端数据源,如mysql、mongodb、postgresql、oracle、sql server和Cassandra,社区非常活跃,很多的新功能和新数据源都在快速发展中,源码地址:https://github.com/debezium/debezium

    e09d3f8be993a55e3748806aa5c50a31.png

    我们使用debezium主要是看中它支持多数据源,同时与kafka的整合,在CDC领域不能忽略的一个商用产品是kafka conflent,在它的产品中,连接源端的组件就是debezium,我们一度就想使用这个商用

    d5356678059a85c6ab4eac1fadc2a375.png

    组件,但是试用版本仅支持一个broker,无法在真正的的生产环境使用,它的优势在于配置的可视化,后来我们使用kafka eagle来进行kafka的管理后,才彻底下定决心自己使用开源版本搞一套。我们最终采用的整体方案是debezium+kafka+kafka-connect-jdbc,管理端使用的kafka eagle.

    关于confluent的资料,网上很多,我们在实际配置的过程中也参考了很多它的建议。

    注意事项:

    1)debezium需要设置的mysql权限:GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    2)采用阿里云的rds的mysql数据源非常坑,默认是不开启SHOW DATABASES权限的,需要在debezium中单独配置属性database.history.store.only.monitored.tables.ddl:true
    3)debezium配合kafaka启动使用properties方式,也就是说第一个源需要配置为文本模式,后续可采用动态增加源的方式动态增加,但是文件模式需要为json

    8.kafka-connect-jdbc

    开源地址:https://github.com/confluentinc/kafka-connect-jdbc,它是confluent开源的兼容jdbc的数据库同步数据的kafka connect连接器。

    b0273fb993d1b7c2b892f7a454b064fa.png

    这个组件支持的目的端的源非常多,理论上有java客户端的基本都支持,所以基本上可以涵盖你能用到的绝大多数数据源,它的延迟非常好,比之前的bireme好太多了,毕竟是国外大厂支持的组件,是国内小公司开源组件所不能比拟的。

    9.最终选型方案

    a847eca9bd933a15046e70614de8a8c5.png

    上图为我们最终确定的方案,在实际生产中,除了直接DB层级的数据实时同步外,我们还有一套pulsar的比较灵活的数据接口方案,不在此次讨论范围之内,也就是说我们最终实现了基于DB和业务层级的实时数据同步方案。

    00698278032cd3465569e42622429e26.png

    业界其他公司的CDC方案:

    465bc91233ffc55abb0880e55972b1c8.png
    16ca9789034863da0208e7ff7ece99b9.png
    bd6e3923d2a176948cc917c33a7425a9.png

    =======.实际生产配置过程:==========

    1.kafka安装配置,以standalone为例

    需要单独说明的是:因为gpdb6目前还不支持upsert模式,debezium的新增和更新均会导致一条新增加的完整数据到kafka,默认kafka按批提交的模式会造成gpdb6的主键冲突,需要修改模式为逐条应用,同时配合自己单独写的check程序进行offset错误的自动修正

    #1)安装kafka,注意2.30有个bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties    #单机版只需要消息存放路径即可log.dirs=/opt/kafka_2.12-2.4.0/kafka-logs#增加可以删除topicdelete.topic.enable=true#保留日志大小:1GB,不设置的话会日志撑爆log.retention.bytes=1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties设置为逐条应用consumer.max.poll.records=1#2)修改内置zk的配置vim config/zookeeper.properties#制定zk元数据存放路径dataDir=/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)启动服务,先启动zk再启动kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &nohup bin/kafka-server-start.sh config/server.properties —加守护进程启动bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties

    2.kafka基本命令

    #4)查看服务是够启动    jps#5)创建一个测试用的topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查询topic列表:bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test                          #删除topic(只会删除元数据):配置上面的delete.topic.enable=true后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手动删除文件:bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh  --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和删除群组:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#从开始的消费信息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)创建控制台生产者生产数据bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新开一个进程创建消费者数据bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

    3.debezium安装配置

    #下载debezium-connector-mysql,将文件中的jar包copy到kafka的libs目录cd /opt/kafka_2.12-2.4.0/tableconfig  #tableconfig是新建目录,存放配置文件#######第一个启动的properties文件格式############name=authorization-mysql-connector-new-01connector.class=io.debezium.connector.mysql.MySqlConnectordatabase.hostname=mysql源IPdatabase.port=3306database.user=账号database.password=密码database.server.id=1database.server.name=debeziumdatabase.whitelist=platform_authorizationdatabase.serverTimezone=UTCtable.whitelist=platform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=auth.platform_authorizationinclude.schema.changes=false#使用table名作为topic名字,因为machine.db.table默认topictransforms=routetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=([^.]+).([^.]+).([^.]+)transforms.route.replacement=$3#不进行初始化,只获取当前的schema,初始化采用rds_dbsync比较方便,实际测试比init方式快几十倍,因为此处是逐条应用的snapshot.mode=schema_only##########json格式的文件#########{"name":"hanukkah-mysql-connector","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"mysql主机名","database.port":"3306","database.user":"用户名","database.password":"密码","database.server.id":"1","database.server.name":"debezium","database.whitelist":"hanukkah","database.serverTimezone":"UTC","table.whitelist":"hanukkah.cooperation_lawyer","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"mysql1.hanukkah","include.schema.changes":"false","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+).([^.]+).([^.]+)","transforms.route.replacement":"$3","snapshot.mode":"schema_only"}}                                            

    4.sink配置

    #首先下载kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目录即可{    "name": "sink-cooperation_lawyer-ins",    "config": {        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max": "1",        "topics": "cooperation_lawyer",        "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名",        "transforms": "unwrap",        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        "transforms.unwrap.drop.tombstones": "false",        "auto.create": "true",        "insert.mode": "insert",        "delete.enabled": "true","table.name.format": "platform.cooperation_lawyer",        "pk.fields": "id",        "pk.mode": "record_key"    }}

    需要额外说明的是:在目的是greenplum数仓环境下:
    1)如果mysql源端字段类型是timestamp,则需要在gpdb端配置字段类型为timestamptz后无需额外配置sink项
    2)如果mysql源端字段类型是datetime,则目的端字段类型需要配置为timestamp,同时需要sink文件中增补TimestampConverter配置项,有几个datetime字段配置几个配置项
    3)如果mysql源端datetime配置了精度,需要debezium配置增加time.precision.mode=connect
    4) "auto.evolve": "true" 则源端表结构变更后会自动在目的端创建对应数据结构 "auto.create": "true" 则源端新增表后会自动同步到目的端

    {    "name": "sink-pa_course-ins",    "config": {        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max": "1",        "topics": "pa_course",        "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名",        "transforms": "unwrap,TimestampConverter",        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        "transforms.unwrap.drop.tombstones": "false",        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",        "transforms.TimestampConverter.field": "create_time",        "transforms.TimestampConverter.target.type": "string",        "auto.create": "true",        "auto.evolve": "true",        "insert.mode": "insert",        "delete.enabled": "true",        "pk.fields": "id",        "pk.mode": "record_key"    }}

    5.启动服务

    #启动kafka,进程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#启动第一个sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>connector-logs/connector.log 2>&1 增补其他sourcecurl -X POST -H  "Content-Type:application/json" -d @tableconfig/authorization-source.json http://localhost:8083/connectors/#启动sinkcurl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ...#查看所有的connectors:     curl -X GET  http://127.0.0.1:8083/connectors 

    6.使用eagle进行topic状态查看和管理

    关于kafka eagle的下载安装,特别简单,就不单独说明了,这里仅贴出效果图

    c80ea4eb175bc95aaa42c7acb8915f11.png
    129f3fad2e25ab31e26341c064aed2b7.png

    从上图非常清楚的能看到哪些topic是有问题的(红色),绝大多数问题在于offset的错误导致的,在实际使用中我们通过一个简单python守护进程的代码进行了管理

    import requestsimport loggingimport psycopg2import jsonimport reimport time# 获取数仓连接def get_gp_conn():    conn = psycopg2.connect(host="192.168.2.175", port=5432, user="name", password="password",dbname='datawarehouse')    return conn'''删除目的端的主键ID'''def del_dup_id(tablefullname,dup_id):    db = get_gp_conn()    cursor = db.cursor()    sql = "delete from "+ tablefullname +" where id='" + dup_id+"'"    cursor.execute(sql)    db.commit()    cursor.close()    db.close()'''重启sink'''def restart_sink(sinkname,configname):    '''delurl = 'http://127.0.0.1:8083/connectors/'+ sinkname    del_res = requests.delete(delurl)    print("del resp:",del_res)    url = 'http://127.0.0.1:8083/connectors/'    headers = 'Content-Type:application/json,Accept:application/json'    datas = 'tableconfig/' + configname    start_res = requests.post(url,data=datas,headers=headers)    print("start resp:",start_res)    #checkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status'    '''    url = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/restart'    requests.post(url)'''检测任务状态'''def check_sink_status(sinkname,tablefullname,configname):    sinkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status'    print(sinkurl)    resp = requests.get(sinkurl)    status = json.loads(resp.text)    state = status['state']    if state == 'FAILED':        trace = status['trace']        pattern = re.compile(r'Key (id)=((.+)) already exists')        search = re.search(pattern, trace)        #print(search)        if search:            del_id = search.group(1)            print('duplicate key is {}, now to del this record of target database'.format(del_id))            del_dup_id(tablefullname,del_id)            restart_sink(sinkname,configname)'''获取任务列表'''def get_sink_list():    conn = get_gp_conn()    cur = conn.cursor()    cur.execute("select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null")    print("current time is:",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))    rows = cur.fetchall()    for row in rows:        taskname = row[0]        schema = row[1]        tablename =row[2]        configname = row[3]        tablefullname = schema +'.'+tablename        check_sink_status(taskname,tablefullname,configname)    cur.close()    conn.close()if __name__ == '__main__':    get_sink_list()

    同时为了避免standalone进程的异常终止,我们用shell的守护进行进行了监控

    #! /bin/bashfunction check(){      count=`ps -ef |grep $1 |grep -v "grep" |wc -l`      #echo $count      if [ 0 == $count ];then        time=$(date "+%Y-%m-%d %H:%M:%S")        echo "standalone restart at:${time}"        cd /opt/kafka_2.12-2.4.0/        bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>>connector-logs/connector.log 2>&1 &        sleep 60scurl -X POST -H  "Content-Type:application/json" -d @tableconfig/platform-source.json http://localhost:8083/connectors/    ......        sleep 10s        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties     .....      fi    }    check ConnectStandalone

    另外,在实际运行过程中会出现offset错误的情况,极其特殊情况下使用上面的方法无法快速解决问题,建议使用kafkacat查看详细信息,人为跳过offset,具体细节不再赘述。

    如喜欢此专题,请关注并提问,技术人驱动自身的是永不停歇的渴望。

    展开全文
  • Linux系统sersync数据实时同步 原文:http://blog.csdn.net/mingongge/article/details/52985259 前面介绍了以守护进程的方式传输或同步数据rsync软件,linux系统数据同步软件很,今天来介绍下sersync数据同步...

    Linux系统sersync数据实时同步

     原文:http://blog.csdn.net/mingongge/article/details/52985259

    前面介绍了以守护进程的方式传输或同步数据rsync软件,linux系统数据同步软件很多,今天来介绍下sersync数据同步软件

    一:sersync介绍

    sersync其实是利用inotify和rsync两种软件技术来实现数据实时同步功能的,inotify是用于监听sersync所在服务器上的文件变化,结合rsync软件来进行数据同步,将数据实时同步给客户端服务器

    二:sersync工作过程

    在同步主服务器上开启sersync,负责监听文件系统的变化,然后调用rsync命令把更新的文件同步到目标服务器上,主服务器上安装sersync软件,目标服务器上安装rsync服务

    三:整体环境拓扑图

    四:客户端安装配置rsync服务

    [root@Client ~]# cat /etc/rsyncd.conf

    cat: /etc/rsyncd.conf: No such file or directory

    如果有此文件,配置前要进行备份,再进行相关配置

     

    [root@Client etc]# vi /etc/rsyncd.conf 

    ##rsync config  start

    ##created by root 2016-08-08 15:00

    ##rsync.conf config start

    uid = rsync

    gid = rsync

    use chroot = no

    max connetctions = 200

    timeout = 100

    pid file = /var/run/rsyncd.pid

    lock file = /var/run/rsync.lock

    log file = /var/log/rsyncd.log

    [backup]

    path = /backup/

    ignore errors

    read only = false

    list = false

    hosts allow = 192.168.1.0/24

    hosts deny = 0.0.0.0/32

    auth users = rsync_backup

    secrets file = /etc/rsync.password

    ##rsync config  end   

    "rsyncd.conf" [New] 21L, 458C written

    添加用户

     

    [root@Client ~]# useradd rsync -s /sbin/nologin -M

    改变目录权限

    [root@Client ~]# chown -R rsync.rsync /backup   

    配置密码文件

    [root@Client ~]# echo "rsync_backup:rsync.conf">>/etc/rsync.password

    [root@Client ~]# cat /etc/rsync.password                            

    rsync_backup:rsync.conf

    改变密码文件权限

    [root@Client ~]# chmod 600 /etc/rsync.password 

    [root@Client ~]# ls -ld /etc/rsync.password 

    -rw-------. 1 root root 24 Sep  9 13:06 /etc/rsync.password

    格式化文件

    [root@Client ~]# dos2unix /etc/rsyncd.conf 

    dos2unix: converting file /etc/rsyncd.conf to UNIX format ...

    开启服务后台运行

    [root@Client ~]# rsync --daemon

    [root@Client ~]# netstat -lntup|grep rsync

    tcp 0 0 0.0.0.0:873 0.0.0.0:*  LISTEN   2002/rsync

    tcp 0 0 :::873 :::*       LISTEN   2002/rsync

    五:主服务器上配置密码文件

    [root@Master ~]# echo "rsync.conf">>/etc/rsync.password

    [root@Master ~]# cat /etc/rsync.password

    rsync.conf

    [root@Master ~]# chmod 600 /etc/rsync.password

    [root@Master ~]# ls -ld /etc/rsync.password

    -rw-------. 1 root root 11 Sep  8 06:25 /etc/rsync.password

    六:测试手工同步

     

    [root@Master /]# rsync -avzP /etc/hosts rsync_backup@192.168.1.3::rsync --password-file=/etc/rsync.password

    sending incremental file list

    hosts

    158 100%    0.00kB/s    0:00:00 (xfer#1, to-check=0/1)

    sent 120 bytes  received 27 bytes  26.73 bytes/sec

    total size is 158  speedup is 1.07

     

    [root@Master /]# cat /etc/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

    [root@Client ~]# cat /backup/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

    经过对比两边数据一致,表明同步成功,手工同步成功之后,然后再进行后面的配置

    七:安装sersync服务

    首先下载好安装软件

    sersync_64bit_binary_stable_final.tar.gz

    [root@Master tools]# tar -zxvf sersync_64bit_binary_stable_final.tar.gz -C

    /usr/local/

    GNU-Linux-x86/

    GNU-Linux-x86/sersync2

    GNU-Linux-x86/confxml.xml

    [root@Master tools]# cd /usr/local/

    [root@Master local]# ls

    bin  games   include  lib64    sbin   src

    etc  GNU-Linux-x86  lib   libexec  share

    GNU-Linux-x86就是sersync安装软件,为了方便将它改名

    [root@Master local]# mv GNU-Linux-x86 sersync

    为了后续方便管理,创建几个目录用于存放各类文件

    [root@Master sersync]# mkdir -p conf bin logs

    [root@Master sersync]# mv confxml.xml conf

    [root@Master sersync]# ls

    bin  conf  logs  sersync2

    [root@Master sersync]# cd conf

    [root@Master conf]# ls

    confxml.xml

    在配置配置文件之前备份

    [root@Master conf]# cp confxml.xml confxml.xml.$(date +%F)

    [root@Master conf]# ls

    confxml.xml  confxml.xml.2016-09-08

    修改配置文件内容(confxml.xml)

    1、修改24-28行

     

    <localpath watch="/opt/tongbu">

                <remote ip="127.0.0.1" name="tongbu1"/>

                <!--<remote ip="192.168.8.39" name="tongbu"/>-->注释内容

                <!--<remote ip="192.168.8.40" name="tongbu"/>-->注释内容

            </localpath>

    修改后的内容为

     

     <localpath watch="/opt/backup"> 本地数据的路径

                <remote ip="192.168.1.3" name="rsync"/>远端IP与模块名称

                    </localpath>

        <!#################################### -->注释内容

    2、修改31-34行内容——认证

    <commonParams params="-artuz"/>

    <auth start="false" users="root" passwordfile="/etc/rsync.pas"/>

    <userDefinedPort start="false" port="874"/><!-- port=874 -->

    <timeout start="false" time="100"/><!-- timeout=100 -->

    <ssh start="false"/>

    修改后的内容为

    <commonParams params="-aruz"/>

    <auth start="true" users="rsync_backup"  passwordfile="/etc/rsync.password"/>

    <userDefinedPort start="false" port="874"/><!-- port=874 -->

    <timeout start="true" time="100"/><!-- timeout=100 -->

    <ssh start="false"/>

    3、修改36-37行

    <failLog path="/tmp/rsync_fail_log.sh" timeToExecute="60"/><!--default

     every 60mins execute once-->

    修改成我们刚刚创建好的logs目录

    <failLog path="/usr/local/sersync/logs/rsync_fail_log.sh" timeToExecut

    e="60"/><!--default every 60mins execute once-->

    修改完成后最终的配置文件如下

    [root@Master conf]# cat /usr/local/sersync/conf/confxml.xml

    <?xml version="1.0" encoding="ISO-8859-1"?>

    <head version="2.5">

        <host hostip="localhost" port="8008"></host>

        <debug start="false"/>

        <fileSystem xfs="false"/>

        <filter start="false">

            <exclude expression="(.*)\.svn"></exclude>

            <exclude expression="(.*)\.gz"></exclude>

            <exclude expression="^info/*"></exclude>

            <exclude expression="^static/*"></exclude>

        </filter>

        <inotify>

            <delete start="true"/>

            <createFolder start="true"/>

            <createFile start="false"/>

            <closeWrite start="true"/>

            <moveFrom start="true"/>

            <moveTo start="true"/>

            <attrib start="false"/>

            <modify start="false"/>

        </inotify>

        <sersync>

            <localpath watch="/opt/backup">

                <remote ip="192.168.1.3" name="rsync"/>

            </localpath>

                <!#################################### -->

            <rsync>

                <commonParams params="-aruz"/>

                <auth start="true" users="rsync_backup" passwordfile="/etc/rsync.password"/>

                <userDefinedPort start="false" port="874"/><!-- port=874 -->

                <timeout start="true" time="100"/><!-- timeout=100 -->

                <ssh start="false"/>

            </rsync>

            <failLog path="/usr/local/sersync/logs/rsync_fail_log.sh" timeToExecute="60"/><!--default every 60mins execute once-->

            <crontab start="false" schedule="600"><!--600mins-->

                <crontabfilter start="false">

                    <exclude expression="*.php"></exclude>

                    <exclude expression="info/*"></exclude>

                </crontabfilter>

            </crontab>

            <plugin start="false" name="command"/>

        </sersync>

        <plugin name="command">

            <param prefix="/bin/sh" suffix="" ignoreError="true"/>  <!--prefix /opt/tongbu/mmm.sh suffix-->

            <filter start="false">

                <include expression="(.*)\.php"/>

                <include expression="(.*)\.sh"/>

            </filter>

        </plugin>

        <plugin name="socket">

            <localpath watch="/opt/tongbu">

                <deshost ip="192.168.138.20" port="8009"/>

            </localpath>

        </plugin>

        <plugin name="refreshCDN">

            <localpath watch="/data0/htdocs/cms.xoyo.com/site/">

                <cdninfo domainname="ccms.chinacache.com" port="80" username="xxxx" passwd="xxxx"/>

                <sendurl base="http://pic.xoyo.com/cms"/>

                <regexurl regex="false" match="cms.xoyo.com/site([/a-zA-Z0-9]*).xoyo.com/images"/>

            </localpath>

        </plugin>

    </head>

    八:开启sersync守护进程

    首先配置全局环境变量,使得后同可以直接调用sersync命令

    [root@Master conf]# echo 'export PATH=$PATH:/usr/local/sersync/bin'>>/etc/profile

    [root@Master conf]# tail -1 /etc/profile

    export PATH=$PATH:/usr/local/sersync/bin

    [root@Master conf]# source /etc/profile

    [root@Master conf]# which sersync

    /usr/local/sersync/bin/sersync

    启动sersync服务

    serync -r -d -o /usr/local/sersync/conf/confxml.xml

    -r初始化数据  

    -d后台启动 

    -o指定路径

    如果需要将命令开启动,只需将命令去掉参数-r定入/etc/rc.local

    启动后结果如下

    [root@Master backup]# sersync -r -d -o /usr/local/sersync/conf/confxml.xml 

    set the system param

    execute:echo 50000000 > /proc/sys/fs/inotify/max_user_watches

    execute:echo 327679 > /proc/sys/fs/inotify/max_queued_events

    parse the command param

    option: -r rsync all the local files to the remote servers before the sersync work

    option: -d    run as a daemon

    option: -o    config xml name:  /usr/local/sersync/conf/confxml.xml

    daemon thread num: 10

    parse xml config file

    host ip : localhost     host port: 8008

    daemon start,sersync run behind the console 

    use rsync password-file :

    user is rsync_backup

    passwordfile is         /etc/rsync.password

    config xml parse success

    please set /etc/rsyncd.conf max connections=0 Manually

    sersync working thread 12  = 1(primary thread) + 1(fail retry thread) + 10(daemon sub threads) 

    Max threads numbers is: 22 = 12(Thread pool nums) + 10(Sub threads)

    please according your cpu ,use -n param to adjust the cpu rate

    ------------------------------------------

    rsync the directory recursivly to the remote servers once

    working please wait...

    execute command: cd /opt/backup && rsync -aruz -R --delete ./  --timeout=100 rsync_backup@192.168.1.3::rsync --password-file=/etc/rsync.password >/dev/null 2>&1 

    run the sersync: 

    watch path is: /opt/backup

    九:测试数据同步

    [root@Master sersync]# cd /opt/backup/

    [root@Master backup]# ls

    [root@Master backup]# ls -ll

    total 0

    [root@Master backup]# touch 123

    [root@Master backup]# touch 1234

    [root@Master backup]# touch 1235

    [root@Master backup]# touch 12333

    [root@Master backup]# ls -ll

    total 0

    -rw-r--r--. 1 root root 0 Sep  8 09:26 123

    -rw-r--r--. 1 root root 0 Sep  8 09:26 12333

    -rw-r--r--. 1 root root 0 Sep  8 09:26 1234

    -rw-r--r--. 1 root root 0 Sep  8 09:26 1235

    目标服务器查看同步情况

    [root@Client backup]# ls -ll

    total 0

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 123

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 12333

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 1234

    -rw-r--r--. 1 rsync rsync 0 Sep  8 04:26 1235

    测试结果表明数据同步正常,能够实时同步

    转载于:https://www.cnblogs.com/liujiacai/p/8503735.html

    展开全文
  • 结合数据采集系统在电力系统中的应用,设计了一种基于FPGA的同步实时数据采集系统,该系统个功能模块集成到一片FPGA中,构成片上可编程系统,使用一片FPGA完成对A/D转换和双口RAM等模块的控制;给出了系统的...
  • 为了满足精密设备监测过程中对数据采集的精确性、实时性和同步性的严格要求,设计了一种基于FPGA的通道实时同步高速数据采集系统。本系统采用Xilinx公司的Spartan6系列的FPGA作为核心控制器件,实现了数据采集控制...
  • 这里的数据实时同步是指近乎实时的将数据从源端数据库同步到其它目的端数据库的一种方式,比如 MySQL 中的数据在发生变化时,系统能够尽可能实时的将这部分变化的数据同步到 HBase 中或其他目的端。与离线数据同步不...

    这里的数据实时同步是指近乎实时的将数据从源端数据库同步到其它目的端数据库的一种方式,比如 MySQL 中的数据在发生变化时,系统能够尽可能实时的将这部分变化的数据同步到 HBase 中或其他目的端。与离线数据同步不同的是,数据实时同步面对的是随时都可能发生改变的数据,它除了需要保证数据能够正确到达目的端之外,还需要保证数据的正确性、时序性、可接受的延迟范围等情况,所以这篇博客主要用户记录数据实时同步过程中需要关注的一些关键点。

    在这里插入图片描述

    一、高效的数据同步模型

    所谓高效,即既要保证高吞吐量,也要保证低延迟,这样才能保证即使源端数据变化量过大,也能尽可能短时间、实时的将数据传输到目的端。

    而数据实时同步的最基本原理,就是将增量/变化数据从源端读出,然后再将数据写入目的端。在这中间则可以增加数据统计、转换、过滤、限流等功能,以此丰富数据同步的过程或适应更多的业务场景。那么我们应该如何保证这个过程的高效呢?

    就我目前的了解一些同步系统而言,数据同步模型几乎都参考或类似于阿里 DataX 的数据同步模型,我喜欢叫它流式 + 批次的数据同步模型以及多任务多通道的线程模型。

    1. 流式 + 批次的数据同步模型

    与传统的在单个线程中读取数据然后写入数据的流程不同,这里使用了流式计算的一些原理,数据的读取与写入分别位于两个独立的线程,它们通过管道(channel)传输数据。数据的读取方每读取到一个结果数据,都将其发送至管道,而数据的写入方每接收到一个新的结果数据,都会立即作出处理,这样做的一个好处在于,数据的写入方不必等待读取方的一次查询完成后才能开始处理数据,它可以在读取方读取数据的过程中,同时进行数据处理,尤其是数据读取可能需要经过多次网络传输时,这种方式能够更及时的处理数据。

    批次数据处理的目的在于减少数据写入时的网络传输,不管是将数据写入 HBase 还是 Kafka 或其它目的端,合并多条数据并统一发送,都是减少网络传输以提高吞吐量的一种有效方式,所以数据写入方在处理完成一条数据后,通常都需要根据情况等待后面的数据,再统一发送。

    2. 多任务多通道/单通道的线程模型

    多通道多任务的线程模型主要用于并发读取数据与写入数据,从而进一步提高系统的吞吐量,它的同步模型大致如下图所示:

    在这里插入图片描述
    上图是阿里 DataX 的系统架构设计,虽然它是离线数据同步,但是数据同步模型却是类似的。一个 Job 被拆分成多个 Task,每个 Task 则负责与之对应的数据的读取与写入,Reader 对应数据的读取,Writer 对应数据的写入,它们运行在相互独立的线程之中,通过 Channel 传输数据,每个 Reader、Writer、Channel 是一一对应的,所以我管它叫多通道多任务的数据同步模型。

    与多任务多通道类似的另外一种模型,也是我在系统中应用的一种模型,即多任务单通道数据模型,它的同步模型大致如下图所示:

    在这里插入图片描述

    Reader 与 Writer 之间没有一一对应,而是公用一个 Channel,这样做的目的在于,因为源端读取数据与目的端写入数据的速度不一致,通过合理的配置 Reader 与 Writer 的任务数量,能尽可能的减少数据库连接数。

    二、数据的一致性与时序性

    数据的一致性是系统最基本的保障,不容有失。保证数据的一致性主要有两种依赖方式,一种是依赖于数据同步系统本身,在保证数据不丢失的前提下,通过将数据有序的传递给目的端,从而保证数据最终会达成与源端一致;二是依赖于目的端数据库的特性来实现,比如在 HBase 中,数据可以依赖于时间戳来确定数据是否为最新的,所以只需要在同步的过程中,针对每次操作都有固定的时间戳与之对应,那么不管操作顺序如何,只要最后的操作有应用在 HBase 之上,那么获取的数据总会是最新的。

    上面提到的依赖于数据本身来保证数据的最终一致,应该是数据同步系统的一个最基本实现,因为我们不可能要求所有的目的端都能够像 HBase 那样,也不可能要求所有的数据都能传递一个固定的时间戳。那么这里就会涉及到一个操作顺序的问题,我们简单的将数据时序性分为两种类型:

    1. 数据全局有序,即不同行的数据的操作顺序必须与源端一致
    2. 数据以主键为单位的顺序一致,即仅保证单行数据的操作顺序必须与源端一致

    1. 数据全局有序

    数据全局有序是指数据的所有操作都按照先后顺序依次同步至目的端,即使不同数据行之间的操作顺序也必须一致。事实上,这样的业务场景相对比较罕见,如果真的需要满足这样的场景,那么通常的做法是将数据的读取与写入均设置为单任务即单线程模式,带来的影响是系统的吞吐量会相对变得低下。

    2. 数据以主键为单位的顺序一致

    数据以主键为单位的顺序一致是大多数业务场景需要的实现,同时它能使用多任务并发模式,有利于系统的吞吐量。而对于主键顺序一致,主要有两种情况:1) 数据可以回退,只需要保证数据的最终一致性;2) 数据不允许回退,只要更新为最新值后,便不能再更新回过去的值;

    针对于上述两种情况,主要发生在系统异常时。在系统正常的情况下,数据的操作顺序应该总是有序的被应用于目的端,而异常的情况,主要是因为 Job 作为一个长期运行的作业,难免会发生重试的情况,比如对于 OP_1 -> OP_2 操作,可能会由于重试而产生 OP_1 -> OP_2 -> OP_1 -> OP_2 这样反复的操作,这就给时序性带来了一种新的可能。下面以 MySQL 和 SQLServer 为源端的数据同步为例。

    • 在 MySQL 中,通常通过伪装从库并接收和解析 binlog 日志,来实现数据的监听和同步,此时拿到的是所有变更数据,它将直接被发送到目的端,所以当发生操作重发时,它的数据也会被重发,比如 OP_1(id=1, version=1) -> OP_2(id=1, version=2) 的两次操作,可能到目的端会变成 OP_1(id=1, version=1) -> OP_2(id=1, version=2) -> OP_1(id=1, version=1) -> OP_2(id=1, version=2) 的四次操作,其中 version 属性列会发生数据回退的现象;
    • 在 SQLServer 中,因为它闭源的原因,通常使用 trigger 或者 CT/CDC 的方式来监听数据变动,为了减少系统负载, trigger 或者 CT 表中通常只会记录表的主键信息,在监听到数据变动时,会根据主键信息再去源表中查询全部所需的信息,此时对于 OP_1(id=1, version=1) -> OP_2(id=1, version=2) 的两次操作,可能到目的端会变成 OP_1(id=1, version=1) -> OP_2(id=1, version=2) -> OP_1(id=1, version=2) -> OP_2(id=1, version=2) 的四次操作,尽管操作发生了改变,但是数据却未发生回退现象。这种现象在《SQLServer 数据异构实时同步之数据时序的问题》中也有讨论过“数据操作的重复发送与影响”,在此不再熬述。

    三、游标与断点续传

    引起数据重发的原因,一方面是由于系统运行中,由于网络等原因引起的,另一方面也可能是由于手动停止然后再启动而引起的。对于手动停止或者异常停止,都会牵扯到断点续传的功能;

    数据实时同步的过程是一个源源不断的将变化数据从源端流入目的端的过程,它可能会由于某种原因而中止同步,比如应用异常停止或者手动停止,不管如何,只要重启 Job,都得保证它能继续上一次停止的地方继续前进,而不是重头开始。

    保证系统能够断点续传,有两个必要条件:一是变更数据必须有一个类似于主键的用来标识其唯一性的值,以便于系统在重启后能够知道从哪里继续,这里管这个唯一的值为游标,二是系统必须能够持久化这个值,并且只能在数据被确认写入目的端后,才能保存这个值,否则可能发生数据丢失的风险。

    那么如何实现呢?这里简单介绍两种方式,一种是在数据写入时,同时保存游标值,但是需要注意的写入端为多任务时,游标的顺序问题,如果仅保存一份游标值,那么必须保证它的值是有序的更新;二是参考 Storm 的 ACK 机制 + Kafka 的定时提交 offset 机制,系统在监听到数据变化时,注册 ACK 元祖,在写入完成后,响应对应的元祖,并由固定的监听程序来更新游标值,游标值更新在内存中,通过定时刷新游标值来将其刷新到磁盘,这样做的好处是,刷新游标几乎不会影响到数据写入的时间,但是不好的是,在系统停止时,游标值可能会落后更多,而导致数据发生更多的重发。具体如何实现,需要根据业务特点来做判断。

    四、总结

    上述主要围绕如何构建高效的数据同步模型,以及数据同步的过程中,关于数据一致性、时序性以及断点续传的问题来讨论,真正实现的过程中,需要注意的细节远比这里讨论的更多,后面在慢慢总结总结吧。

    下面是了解到的几个开源的数据同步系统,如果能满足你的需求,就可以免得自己开发了:

    • DataX: 阿里开源的离线数据同步工具/平台;
    • DataLink: 神州租车的一个满足各种异构数据源之间的实时增量同步,分布式、可扩展的数据交换平台;
    • FlinkX: 袋鼠云内部广泛使用的基于flink的分布式离线数据同步框架;
    • Porter: 随行付内部广泛使用的数据同步工具/平台;
    • Bifrost: MySQL 同步到 Redis, MongoDB等服务的异构中间件;
    展开全文
  • 对目前主流的前端开发框架有一个基础性的学习 学会React和Vue中ArcGIS API for JavaScript的使用 学会主流前端框架下WebGIS系统的整体开发
  • 实现异构系统数据实时同步 接口 写了个接口,核心思想是两个异构系统之间实现数据的非实时同步。 也就是说平台1生成的数据在每天规定时间放到FTP服务器地址中去,平台2在规定的时间去取数据。这样就可以实现...
  • 源服务器:192.168.21.129 ...目的:把源服务器上/home/www.osyunwei.com目录实时同步到目标服务器的/home/www.osyunwei.com下 系统运维 www.osyunwei.com 温馨提醒:qihang01原创内容 版权所有,转载请注...
  • 由于内容较分为几篇列出:《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)《零编码打造异构数据实时同步系统——...
  • 同步的前提下,环境一定要搭好,测试的时候应为mysql安装的一些bug导致失败了很次,又重装过---安装安装mysqlconnector-----配置mysqlconnectorODBC数据管理器->系统DSN->添加->mysql ODBC 5.3 ANSI ...
  • 因为项目选项需要,最近了解了一下几款实时数据同步工具,在此做一个简单介绍和对比,给大家一个参考。1.Debezium Debezium是RedHat开源的一个将多种数据实时变更数据捕获,形成数据流输出的工具。通过安装配置...
  • 1、sersync是基于Inotify开发的,类似于Inotify-tools的工具2、sersync可以记录下被监听目录中发生变化的(包括增加、删除、修改)具体某一个文件或某一个目录的名字,然后使用rsync同步的时候,只同步发生变化的这...
  • Linux系统实时数据同步数据转发

    千次阅读 2013-12-23 22:24:52
    linux以其高性能、任务、高并发等特点而著称,在服务器市场上,linux系统占据着越来越的市场份额,特别是在云计算风起云涌的年代,linux系统更是在众多系统中脱颖而出。 今天要讲的是云存储,云存储简单点说就是...
  • 如果你man一下sync的话,就会发现:sync-flush file system buffers,它是一个把缓冲区中的数据同步到文件系统中的一个命令;而rsync其实就是remote rsync,它是一个远程同步工具,兼具cp和scp的功能,rsync...
  • 一、软件说明(内容来自网络) 1.rsync 与传统的 cp、tar 备份方式相比,rsync ...随着应用系统规模的不断扩大,对数据的安全性和可靠性也提出的更好的要求,rsync 在高端业务系统中也逐渐暴露出了很不足,首先...
  • Rsync+sersync实现数据实时同步Rsync  Rsync(remote synchronize)是一个远程数据同步工具,可通过LAN/WAN快速同步台主机间的文件。Rsync使用所谓的“Rsync算法”来使本地和远 程两个主机之间的文件达到同步,...
  • 前阵子老板安排了一个新任务,要建设一个商家商品搜索系统,能够为用户提供快速、准确的搜索能力。图片来自 Pexels设计要求在...我们面临以下几个难题:商家数据库和商品数据库是台不同的服务器,并且数据量达百万...
  • 简介: rsync数据同步优缺点  与传统的cp、tar备份方式相比,... 随着应用系统规模的不断扩大,对数据的安全性和可靠性也提出的更好的要求,rsync在高端业务系统中也逐渐暴露出了很不足。首先,rsync同步数据时...
  • Sersync利用inotify与rsync对服务器进行实时同步,其中inotify用于监控文件系统事件,rsync是目前广泛使用的同步算法,其优点是只对文件不同的部分进行操作,所以其优势大大超过使用挂接文件系统的方式进行镜像同步...
  • 生产需求:企业一般有台web服务器,为了保证数据的统一,方便统一管理以及节省空间,将web文件放在samba服务器或者nfs上,为了数据的安全可以找一台服务器实时同步数据,可以通过inotify+rsync+脚本实现一、监控...
  • 本发明公开一种基于MySQL的流式实时数据同步系统,涉及数据通信技术领域。背景技术:数据分析特别是实时数据分析,已经越来越的成为各行各业的分析要求与标准。例如零售行业可能希望通过线下POS数据实时门店客流...
  • 实时同步最原始的方法是inotify+rsync,但是inotify有些缺陷...当同步的目录数据量很大时(几百G甚至1T以上)文件很时,建议使用rsync+sersyncsersync优点:1.sersync是使用c++编写,而且对linux系统文件系统产生...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,038
精华内容 415
关键字:

多系统数据实时同步