精华内容
下载资源
问答
  • java多个数据库实现数据同步

    热门讨论 2015-03-16 21:54:59
    内部java实现多个数据库,保持数据同步案例。
  • 大数据开发平台-数据同步服务

    万次阅读 2017-09-21 13:38:35
    同步一切

    什么是数据同步服务?顾名思义,就是在不同的系统之间同步数据。根据具体业务目的和应用场景的不同,各种数据同步服务框架的功能侧重点往往不尽相同,因而大家也会用各种大同小异的名称来称呼这类服务,比如数据传输服务,数据采集服务,数据交换服务等等

    至于大数据开发平台的数据同步服务,加上了限定词,那当然是进一步把业务的范围限定在了和数据平台业务相关的一些组件和应用场景之下了。

    大数据平台数据同步服务业务场景

    讨论场景之前,先来看一下数据同步的目的,为什么我们需要在不同的系统之间进行数据的同步?

    从大数据开发平台的角度来说,很显然,是因为我们通常不能直接对线上业务系统所存储或生成的数据进行各种运算或检索处理,组件技术架构是一方面原因,业务安全性隔离是另一方面原因。

    所以,我们就需要把这些数据采集到开发平台的各种存储计算组件中来进行加工处理,这个过程也就是所谓的ETL过程。

    然后,在开发平台中处理完毕的数据,有时候也并不能或着不适合在大数据开发平台的相关服务中直接使用,需要反馈回线上的业务系统中,这个过程我们称为数据的回写或导出。

    最后,即使在大数据开发平台自身的各种存储/计算/查询服务组件之间,因为架构方案,读写方式,业务需求的不同,也可能存在数据的传输同步需求。

    从上述三类应用场景来看,我们可以看到,通常来说我们所说的大数据开发平台环境下的数据同步服务,主要处理的是不同系统组件之间的数据导入导出工作。 比如将DB的数据采集到Hive中来,将Hive中的数据导出给HBase之类。也就是输入和输出的数据源是异构的,数据同步的目的是让数据可以适合业务需求的形式,在不同的系统中用各自擅长的方式运转起来。

    除此之外,还有另外一种出于数据备份,或者负载均衡的目的而存在的数据同步场景。比如DB的主从同步,HBase集群的Replicator备份等等,他们的输入输出数据源往往是同构的。这类场景下,具体的同步方案和流程通常和系统自身的健康,功能逻辑,服务诉求等有着较强的关联性,所以往往对应的系统会自带同步方案实现,属于系统自身功能实现的一部分,比如MySQL的binlog主从同步复制机制。这类特定系统自带的数据同步架构方案实现,不在本文讨论的范围之类。

    数据源

    业务范围明确了,那么让我们来看看在这种业务场景下,需要处理的数据源可能都有哪些,简单分以一下类,常见的数据源大致可以分为;

    • 关系型数据库类 : 比如 MySql, Oracle, SqlServer, PostgreSQL 等等

    • 文件类:比如日志log,csv,excel等各种传统单机文件

    • 消息队列类:比如kafka和各种MQ

    • 各种大数据相关组件:比如HDFS/Hive/HBase/ES /Cansandra

    • 其它网络接口或服务类:比如FTP/HTTP/Socket 等


    现有的解决方案介绍

    如上所述,数据同步服务可能涉及到的外部系统多种多样,实际上,但凡能存储或产生数据的系统,都可能成为需要接入数据同步服务的数据源。因此,也不难想象,市面上一定存在众多的解决方案。

    为什么呢?很显然,这些各式各样的数据同步服务方案,在不同的业务场景中,无论整体功能定位还是业务覆盖范围都可能千差万别。即使某些方案的业务定位类似,在具体的功能实现方面,大家关注的重点也可能有所区别。此外部分系统在设计的时候,为了保证易用性或者提供一站式的解决方案,其架构和具体的功能逻辑与上下游系统可能还有一定的业务关联性,再加上程序员又喜欢用各种开发语言来折腾一遍, Python/Java/Ruby/Go 。所以,这类服务系统的解决方案想不多也很难啊。

    那么常见的解决方案都有哪些呢?

    以关系型数据库为主要处理对象的系统:

    Tungsten-replicator

    是Continuent公司开发的数据库运维管理的两个相关联的工具产品套件中的一个,负责Oracle 和 MySQL两个数据库之间异构的数据复制同步工作,以及对外导出到比如Redshift,Vertica等数据库,也包括导出到Hadoop环境。

    Continuent的另一个产品是tungsten-clustering,为MySQL等数据库提供拓扑逻辑管理,灾备,数据恢复,高可用等功能,很显然这些功能很大程度上是和Replicator相结合的。

    我们没有真正使用过Tungsten的产品,只是在架构和代码方面有过一些调研了解,总体感觉,作为商业解决方案,架构完善,但是相对复杂,一些业务流程方案是定制化的,对关系型数据库自身的数据同步和管理以及稳定性应该是它的强项,不过作为一个开放的系统来用的话,接入成本可能有点高。

    Canal 和 Otter

    Canal是阿里的MySql增量数据同步工具,Otter则是构建在Canal之上的数据库远程同步管理工具。两者结合起来,产品的目标范畴大致和Tungsten-replicator差不多。

    和Tungsten类似,Canal也是基于MySql的Binlog机制来获取增量数据的,通过伪装MySql的Slave,来获取Binlog并解析增量数据。

    这两种方案通常是大家用来接入Mysql binlog的最常见的选择,毕竟MySql Binlog的格式解析模块也是一个相对专业化的格式逆向工程工作,即使不直接使用这两个方案,大家也会借用这两个方案中的binlog Parser模块来做二次开发。

    Canal的主要优点是结构流程比较简单,部署起来并不太难,额外做一些配置管理方面的改造就可以更加自动化的使用起来。不过,主要问题是Server和Client之间是一对一的消费。不太适用于需要多消费和数据分发的场景。

    我司之前既有对Canal简单的封装应用,也有在借用Canal的Binlog Parser的基础上,开发的DB增量数据分发系统

    阿里的DRC/精卫等

    DRC按阿里官方的说法定位于支持异构数据库实时同步,数据记录变更订阅服务。为跨域实时同步、实时增量分发、异地双活、分布式数据库等场景提供产品级的解决方案。

    其实精卫也是类似的产品,不过是由不同的团队开发的,除了这两者,阿里内部可能也还有过其它大大小小类似的产品,最后大概都整合合并了,DRC胜出 ;)

    从定位说明就可以看到,很明显,除了点对点同步,DRC还需要支持一对多的消费和灵活的消费链路串联,对性能,顺序一致性等方面的要求也可能会因此而变得更加复杂(未必更难实现或要求更苛刻,但是可能有更多不同角度的功能需求),比如,可能需要支持有限时间段内的回溯,和精确定位消费的能力等。

    DRC相关系统,阿里并没有开源,不过我司之前和阿里的同学有过一些简单交流,我们也从中了解和学习了一些产品设计和架构方面的思想。

    我司的DB增量数据分发系统Pigeon的第一版,就是在Canal的Binlog解析代码模块基础上,参照DRC的部分思想进行开发的。大致的方案是将前端的Parser对接到消息队列上,让消息队列来承担消息持久化和分发的工作,此外在Server层面再辅助以服务节点和消费链路的动态管理,负载均衡,加上数据的过滤,转换和分发模块及策略的管理,消费端SDK的封装等工作。

    以日志或消息队列为主要业务对象的系统

    这类系统一开始可能是以日志查询为主要业务场景,其中的数据同步服务相关组件,有些是独立的组件,有些则是一整套采集,计算,展示等完整的业务方案中的一部分。不过随着架构的不断发展和成熟,有些系统渐渐的也不仅仅是只定位于处理日志类业务场景了,开始向通用数据采集传输服务的角色靠近。

    Flume

    Flume现在大家用的多的,是Flume-NG这个redesign过的版本,它的定位是离线的日志采集,聚合和传输。Flume的特点是在聚合传输这块花了比较多的力气,特别是早期的版本,需要配置各种节点角色,而在NG版本的设计中,其拓扑逻辑已经简化成只有Agent一个单一角色了



    通过Agent的串联可以构建出复杂的数据传输链路,此外还有事务的机制设计来确保链路传输的可靠性。 不过,个人觉得,由于Kafka等通用消息队列的广泛使用,Flume在聚合,传输这方面的作用,在一些场景下其实是可以通过其它方式来实现和弱化的(比如没有网络带宽或远程二次传输问题的场景中)。

    我司的日志链路中,也没有使用Flume,而是采用了自主研发的Agent采集器直接对接Kafka(当然,一些场景下,也未必绝对合理)

    LogStash

    LogStash是著名的ELK套件中的一个组件,负责日志采集和转换,另外ES负责存储和检索,Kibana负责查询结果的展示

    LogStash从设计上来看,在数据传输和链路串联方面的考量就简单了很多,它的重点放在了数据的转换处理上。所以它在过滤器,编解码器等环节下了很多的功夫,比如支持grok脚本做过滤器逻辑开发,在内部链路上还有各种的buffer设计,用来支持数据的合并,转换,条件触发输出等功能

    应该说从数据转换处理的角度来看,个人觉得LogStash的设计已经足够灵活和完备了。不过,它的主体实现语言是Ruby...

    Camus

    Camus严格的说算不上是一个框架,它是Linkedin开发的基于Kafka消费日志,批量写入Hdfs的一个工具,不过用的人也不少,所以提一下,我司之前也有大量的日志是通过Camus来采集的。(话说Linkedin是把自家的kafka用到极致了,各种链路但凡能依托kafka实现的,大概都不会考虑其它的实现方式)

    Camus的架构方案,基本上就是写了一个MR任务,实现批量从Kafka读取日志并写入Hdfs,此外自身维护了kafka中各个topic的消费进度。用它来做kafka topic的简单映射采集,稍微做一点管理方面的适配开发,基本还是可行的。不过,它的缺点主要是对Topic进行定制化的处理比较困难,需然也提供了一些Hook接口,但是毕竟架构过于简单,对数据进行一些Per topic的过滤转换工作,就有点力不从心了。

    另外一些想不出怎么强行分类的数据同步解决方案 

    Sqoop :

    Sqoop大家应该不陌生了,即使没用过总应该也听过,也有不少公司使用Sqoop来构建自己的大数据平台数据采集同步方案。Sqoop从一开始,就几乎是完全定位于大数据平台的数据采集业务的,整体框架以Hadoop为核心,包括任务的分布执行这些,多半都是依托MR任务来实现的。数据同步的工作,也是以任务的方式提交给Server来执行,以服务的形式对外提供业务支持。

    Sqoop的处理流程,定制化程度比较高,主要通过参数配置的方式来调整组件行为,在用户自定义逻辑和业务链路流程方面能力比较弱,另外,依托于MR的任务处理方式,在功能拓展方面也有一些约束和局限性。此外各种数据源的输入输出实现部分,稳定性和工程实现细节方面,也只是可用,但算不上完善和成熟。

    我司也没有使用Sqoop来构建大数据平台的数据采集,导入导出服务。上述原因虽然是一方面原因,但绝对不是主要的原因。最主要原因还是因为数据的采集和导入导出服务体系,具体的输入输出模块的构建只是一部分内容。更重要的是要构建任务的配置,管理,监控,调度等服务,以及对整个数据同步业务流程和生命周期的封装,和对用户交互体验及产品形态的完善。理想中,需要和开发平台整体开发环境深度集成。

    DataX

    DataX是阿里开源的一款插件式的,以通用的异构数据交换为目标的产品。其核心思想,简单的说,就是之前阿里的同学写各种数据源之间的同步工具,都是点对点的实现,写多了以后,发现这种两两之间网状链路的开发代价比较高。而DataX呢,是通过标准化的输入输出模块,将点对点的实现变成了星形的拓扑结构,增加一个数据源只要单独写这个数据源的输入输出实现模块就好了。

    其实,这个思路也没什么大不了的,和前面的Flume/LogStash等的思想并没有本质的差别,人家一开始就没有走网状结构的路 :)

    不过,DataX的特点是内部的结构更加简单一些,没有channel啊之类的概念,不具备持久化能力,也没打算构建复杂的数据流动链路。你可以认为它本质上就是将两个数据源之间点对点的传输工作模块化标准化了,最终构建出来的,还是一个简单的进程内直连的数据读写链路。

    此外从一开始,DataX的目标就是在简化新链路开发代价的基础上,追求数据的传输效率。比如,使用了Ringbuffer类的技术来做input和output模块之间的数据转发工作。

    因为DataX简单和标准化的特点,所以也有不少公司基于DataX来构建自己的异构数据交换服务系统。

    当然,DataX也在持续改进中,目前的3.0版本在作业的分片处理,业务容错,数据转换,流量控制等方面也做了不少的功能拓展。

    Heka

    Heka是Mozilla开源的一套流式数据采集和分析工具,最主要的架构实现,其实也就是数据采集同步这部分框架。整体的结构设计和LogStash等系统看起来大同小异。这个系统,我并没有做过实际的实践应用,只是简单了解了一下产品设计,提它呢,是因为架构看起来也相对比较完善,另外,它是用Go写的,偏底层后端服务开发的同学可能会喜欢。

    数据交换服务产品设计和需求分析

    从前面的业务场景讨论和市面上常见的系统的介绍中,你应该不难看出,数据同步是一个业务覆盖范围很广的术语,具体的产品形态设计和功能需求,其实在很大程度上取决于你所定位的业务的职能范围。

    我司的大数据开发平台中,数据交换服务系统的定位,和DataX比较类似,系统的功能和产品形态定位,是异构数据源之间的点对点数据读写链路的构建。至于比如业务端的数据采集,数据分级传输链路的构建,增量数据的分发,数据库同步拓扑逻辑管理等环节,并不在我们的数据交换服务系统的功能定义范围之内,这些环节并不是不重要,只是在我司的实践中,是由其它的系统来独立提供服务的。

    而点对点的数据读写链路服务产品的组成,又可以分为两部分,一是底层具体承载了单个数据交换任务的插件式的数据交换组件,二是上层的数据交换任务管控平台,其职能范围不仅包括系统和任务自身的配置运行管理,有时候还需要考虑针对上下游系统和具体业务的一些特性进行流程上的适配和定制。

    下面的讨论基于上述产品定位展开:

    数据交换底层组件

    底层组件设计需要关注的地方,在前面的各种开源系统的介绍中,其实大多都已经涉及到了。

    首先,从框架结构的角度来说

    整个数据的读写转换流程,理想中当然是每个环节都能以Plugin的方式进行灵活拓展。链路环节拆得越细,定制能力当然就越好,但是要同时保持系统整体的易用性也就相对更加困难一点。

    那么,数据交换读写链路的分解,大致可以分为几个模块呢,往大了拆分,差不多就是:输入,过滤转换,输出这三个模块。

    再细化一些,为了提高模块的复用能力,那么还可以从输入模块中拆解出Decoder模块,从输出模块中拆解出Encoder模块

    最后,为了达到数据链路复用的目的,还可以在输出模块之前增加一个路由模块,将一份数据拆分或复制输出到多个目标源中。不过,如果在框架中引入了这样的设计,实际上是将业务流程方面的复杂度下沉到底层组件中来,是否值得,如何取舍,就要看整体系统的设计思路了。

    其次,从性能的角度考虑

    为了提升性能,除了要求执行节点具备水平拓展能力,还需要考虑支持单个作业的分布式执行能力。

    前者如何实现取决于数据同步服务系统的架构设计,如果是采用server模式的服务,客户端提交任务请求到服务端执行的,那么需要Server端能够水平拓展任务的worker执行节点,这个通常不会太难,就是需要自己管理工作节点,或者依托其它集群服务,比如提交MR任务到Hadoop集群上执行。而如果采用的是本地进程模式,客户端在哪里发起调用就在哪里执行,那么资源调度和负载均衡的工作,通常就会上移到工作流调度系统上来管理,数据同步服务自身不负责工作节点的管控。

    后者,单个作业的分布式执行能力,实现起来就复杂一些了。因为这涉及到单个作业内部数据的分片处理。当数据源是Hadoop类的系统时,由于这类系统从架构设计的角度,天生就支持数据分片的能力,所以实现起来通常都不会太困难,但是对于DB,消息队列类的数据源,如何实现分片,往往就要复杂一些了。

    以DB扫表任务为例,你要分片执行,那就需要数据表具备分段检索的能力,最好是可以基于主键索引进行分段检索,否则只是单纯的条件过滤,会大大加大对DB的压力。但是,现实应用中,很可能并不是所有的表都具备确定范围的主键,有些主键也可能是非连续离散的,这些都会导致很难均衡的对数据进行分片,进而影响分片执行的效率。

    另外,基于DataX这种输入输出端独立插件思想构建的数据交换链路,如何和Hadoop体系的数据源的数据分片处理流程更好的结合,充分利用好原生的分布式处理能力,也是需要仔细构思的。

    最后,从业务稳定性的角度考虑

    要保证业务的稳定性,从底层组件的角度来说,整体系统的流控和失败重试这两个环节往往也是需要重点考虑的。因为数据交换服务所对接的外部存储系统,通常还承载了其它的业务。所以其负载能力往往都有一定的约束要求,其业务环境也不是完全可控的。因此数据交换服务组件,需要能够约束自己的行为,同时应对可能发生的错误。其目的,都是为了提升整体链路的稳定性,降低维护代价。

    数据交换服务管控平台

    作为服务,不提供可视化的管控平台,只提供命令行交互方式,那就是耍流氓。

    管控平台管什么?首先,当然是管理数据交换作业的任务配置信息了

    标准的做法,基本都是让用户通过UI界面,以参数的形式配置任务信息,比如输入输出数据源,表格,字段信息,分区信息,过滤条件,异常数据处理方式,调度时间,并发度控制,流量控制,增量或全量配置,生命周期等等。总之,就是尽量让用户能够通过配置信息来表达自己的业务诉求。

    当然,任务可供配置的参数越多,使用起来可能就越繁琐,此外,一些复杂的过滤,聚合或转换逻辑,很可能也没办法简单的用配置的方式进行表达,这时候就需要考虑提供自定义组件的管理方式了。

    除去数据交换任务自身配置信息的管理,数据交换服务管控平台需要提供的其它服务,其实和大数据开发平台上其它类型的作业任务的管理十分类似,比如:

    • 提供数据交换任务的执行流水信息,便于用户查询任务执行情况和进行业务健康分析

    • 提供权限管控和业务分组管理,更好的支持多租户环境应用场景

    • 提供系统流量负载监控,任务错误跟踪报警等,更好的支持日常的系统及业务运行维护工作。

    这些服务可以由数据交换服务平台独立提供,但最理想的,还是和开发平台的其它作业任务融合到同一个平台上进行管理,即使底层支撑对应服务的后台可能是独立的,在用户交互后台上,也要尽可能集成到一起。一方面减少重复开发的代价,另一方面,降低用户的学习使用成本。

    上下游系统和业务流程适配

    你无法左右别人,但是你可以改变自己。很多时候,数据同步服务,需要配合上下游系统,进行必要的流程定制,来满足业务的需求。

    数据结构变更

    数据同步业务,最经常遇到的问题,就是业务DB的数据结构发生变更,导致任务运行失败。

    数据结构的变更,通常很难自动解决。比如用户自定义了数据扫描的语法,当数据结构变更以后,已经非法了;比如源表的字段信息发生了增删改,目标表如何映射适配?历史数据能否转换处理,是否需要转换处理?另外,不同的数据源,增删改的处理方式也可能不同,业务方希望采取的应对方式可能也和具体业务逻辑相关。所以,很多情况下,数据结构的变更,都是需要人工干预的。

    那么系统能做些什么呢?自然是通过工具尽可能的降低这个变更过程的代价,比如

    • 监控源表元数据的变更,提早发现问题,提早解决,避免在半夜真正执行任务时才出错报警

    • 规范业务流程,比如约定字段的变更方式,变更的通知机制等,通过最佳实践降低问题风险概率

    • 对一些已知场景提供标准化的自动处理方式,减少人工干预的需要,加快数据转换,重建处理流程等等

    数据时间问题

    在离线业务中,大量的数据导入任务都是在凌晨附近导入前一天的数据进行批处理分析。这种场景下经常可能会遇到以下问题:

    • 数据可能由于各种原因晚到,在数据导入任务开始执行的时候,前一天的数据还没有完全到位。


    数据晚到的可能原因很多,比如DB主从延迟太大,客户端上报不及时,业务端采集链路因为流量或负载或故障等原因未能及时采集数据等等。

    这时候,通常的做法,一是将日常数据采集时间适当往后推迟一小段时间(比如15分钟到半个小时)降低问题出现的概率。二是往往需要对各种链路已知可能延迟的环节进行监控,比如采集DB主从延迟时间,队列消费进度等等,及时报警或阻断下游任务的执行。三是对晚到的数据,需要根据业务需求制定适当处理策略,是丢弃还是补充回写到前一天的数据中,还是直接划入第二天的数据里等等。

    • 数据本身没有手段区分业务更新时间,具体执行结果依赖于任务执行时间


    比如DB扫表的任务,如果表格中没有用于区分业务时间的字段,但是统计业务中却需要按日期划分统计,就只能靠凌晨精确的时间点采集来实现了,这就很尴尬了,因为你很可能无法保证任务开始执行的时间。你可能会说这种情况是DB表结构设计得有问题,的确如此,这时候就需要推动业务方进行改造了。

    还有一种情况更常见一些,就是DB表格中的确存在业务更新字段,但是,同一主键的数据可能有多个状态变迁,会被更新多次。而时间戳只有一个。举个例子,比如你有一个订单信息表格,里面记录了下单,付款,发货,收货,确认等等不同的状态,但是,只有一个update字段。那么根据某一个时间点扫描的数据,你可能无法判断出这些状态发生变化的准确时间,那么就有可能发生统计归属错误或者遗漏的情况。

    这两种情况,通常都是因为业务方的业务流程本身并不依赖于这些时间信息的记录,但是做数据统计的时候需要这些信息,而业务开发方和数据统计方负责的同学是两拨人,开发方没有充分考虑统计的需求。

    有时候这种情况问题也不大,比如半夜业务变更不频繁,数据采集过程迟一些早一些,数据偏差都不大,或者这类数据统计到前一天还是后一天都没有太大的关系。但是,当出现大范围时间偏移,或者你需要重跑历史数据的时候,比如今天重跑上周的数据,那么从当前DB快照无法复原业务字段变更的具体时间点,就会成为一个无法忽视的问题了。

    总体来说,这类问题的解决,首先数据同步服务自身得提供根据业务时间过滤数据的手段,其次要推动业务方改造数据结构,避免出现无法还原的场景,最后,有些业务还可以通过采集binlog等实时增量的形式,通过分析每次数据的具体变化时序来解决(当然,由于log保存时间有限,对于长时间跨度重跑的场景,是无法通过这种方式来解决的)

    分库分表处理

    分库分表,大概是业务上了规模以后,大家都喜欢做的事。但是DB中分表可以,导入到比如Hive中以后,你得想办法合并啊,便于后续各种运算逻辑的开发和统计查询脚本的撰写,那么问题来了:

    比如你是通过扫表的方式获取数据,如果没有类似阿里的TDDL这样的分库分表中间件来屏蔽DB分库分表细节,你会需要自己处理相关逻辑,管理和连接所有数据实例。如果走binlog获取数据,在分库的场景下也需要自己想办法合并数据采集流程和结果。

    更麻烦的是,如果你的业务方分表设计的时候,不够规范,不同的分表之间没有唯一的主键可以加以区分(可以区分的字段,也可能不是主键),那么在合并数据的时候,你可能就需要允许用户自定义合并用字段,或者自动捏造出一个主键出来,避免数据的冲突

    这个问题同样,最理想的解决方案也是通过推动DB分库分表中间件的建设和业务规范的建立来解决,但这对很多公司来说往往不是一件简单的事,所以,在此之前,就需要自己想办法定制解决了。

    数据合并去重等

    通过binlog增量方式来获取DB变更数据,优势是时效性好,有时也是某些场景下唯一的解决方案。但是因为走Binlog来给离线批处理任务同步数据,实际上,数据是经过了表-流-表这种模式的切换,而这种切换也会带来附加的问题

    从表的变化解析成数据流,这个过程问题不大,但从数据流重新构建回表格,就会有几个问题需要关注了:

    • 取决于数据流传输的方式,数据流可能发生乱序,重复的问题,对重构表格带来困难。


    比如用消息队列传输数据,各个分区的数据可能无法保持全局有序性,消息队列本身可能也无法保证Exactly once的投递。如果业务流程不能允许这类问题的发生,那就需要针对性的加以防范了,比如结合业务知识,使用合适的分区字段,使得局部有序的数据对业务结果不会造成影响。

    • 目标端数据源,比如像HDFS或Hive文件,可能只允许添加记录,或全局重写,而无法单条删除或者更新记录。


    这种情况下如果源端数据源类似DB中,一条记录发生多次变更,就会生成多条变更记录,而下游任务比如一天的批处理任务,只需要最后凌晨时间点上的状态信息,这时候就需要对变更记录进行合并了。

    合并数据的可能方法很多,取决于具体的业务场景和代价,未必有统一的最佳方案。首先你需要解决数据乱序问题,然后:

    你可以在数据流式采集方案的后端,将数据先写入一个支持单条记录删改操作的中间数据源,然后到点再从这个中间数据源导出最终数据到目标数据源。

    如果数据量不大,你也可以在采集程序中汇总所有数据,去重完再写出到目标数据源。

    你也可以不去重直接将所有变更流水写入目标数据源,事后再运行一个清理程序进行去重,前提是除去采集时间,原始数据中还具备可以用作去重判断的依据。

    我司相关服务的现状和未来改进计划

    目前我司的数据交换服务,日志相关链路,采用Camus和自定义的Hive Kafka Handler两种方式采集,后者在采集的基础上添加了Per topic的过滤转换逻辑,可以通过自定义Hiveql一步完成数据的采集和转换工作。

    其它大数据组件之间以及与DB间的数据交换服务,由自研的与DataX类似架构的系统承担,插件式开发,能够处理增量/全量,并发流控,分库分表等前面所描述的常见需求。另外,管控平台基本实现了用户可视化的配置,管理,执行流水查询,变更记录查询,系统负载和业务进度监控报警等功能。此外,在数据交换任务的数据质量监控方面,也做了部分采集和统计分析工作。

    整体来说,主要的服务框架流程没有很大的问题,但是在与开发平台的整体集成和用户自助服务的易用性方面与理想的状态还有很大的差距,其次在性能,稳定性,拓展性等方面也有很多工作等待开展,在数据质量监控方面做的工作也相对粗糙,所以未来的改进方向,包括:

    • 底层数据交换组件的进一步模块化,标准化,重点加强用户自定义数据过滤和转换模块的建设

    • 单个作业分布式分片处理方案的改进,提升大表同步作业的处理效率

    • 数据合并/去重方案的改进,提升性能规避容量瓶颈(目前的变更合并工作还是通过二次写入专属DB来实现)

    • 任务流量,负载,进度,异常等Metrics信息的全面采集和汇总分析,便于及时发现问题,持续改进业务

    • 全链路的分级容错和自动重试恢复机制的完善改进(目前的容错重试机制是作业级别的,粒度太粗)

    • 更加自动,更加平滑的流控和负载隔离机制

    • 数据交换服务管控后台与大数据平台整体开发环境的进一步融合,提升用户自主服务能力,降低业务开发维护成本

    • 完善异常,错误反馈机制: 比如对常见问题,汇总,解析后再明确的反馈给用户,可能的话,提供解决意见和方案,而不是直接抛出异常代码,降低用户支持的代价。

    • 前述业务数据时间问题的全面推动改进,降低数据同步任务结果的不确定性


    小结

    总体来说,大数据开发平台的数据同步服务的构建,可以参考的方案很多,具体的读写组件的开发也并不困难,能够找到很多现成的解决方案。对于多数公司的大多数业务来说,底层不论采取什么方案,通常都是可行的。所以数据同步服务建设的成熟度水平,往往体现在管控平台的服务能力水平和业务接入及运维代价的高低。


    常按扫描下面的二维码,关注“大数据务虚杂谈”,务虚,我是认真的 ;)



    展开全文
  • 介绍 ZooKeeper 集群数据同步之前,先要清楚为什么要进行数据同步。在 ZooKeeper 集群服务运行过程中,主要负责处理发送到 ZooKeeper 集群服务端的客户端会话请求。这些客户端的会话请求基本可以分为事务性的会话...

    在这里插入图片描述


    流程图

    在这里插入图片描述

    在 Leader 节点选举后,还需要把 Leader 服务器和 Follow 服务器进行数据同步。在保证整个 ZooKeeper 集群中服务器数据一致的前提下,ZooKeeper 集群才能对外提供服务。


    why ?

    介绍 ZooKeeper 集群数据同步之前,先要清楚为什么要进行数据同步。在 ZooKeeper 集群服务运行过程中,主要负责处理发送到 ZooKeeper 集群服务端的客户端会话请求。这些客户端的会话请求基本可以分为事务性的会话请求和非事务性的会话请求,而这两种会话的本质区别在于,执行会话请求后,ZooKeeper 集群服务器状态是否发生改变。

    事物性会话请求最常用的操作类型有节点的创建、删除、更新等操作。而查询数据节点等会话请求操作就是非事务性的,因为查询不会造成 ZooKeeper 集群中服务器上数据状态的变更 。

    我们之前介绍过,分布式环境下经常会出现 CAP 定义中的一致性问题。

    比如当一个 ZooKeeper 集群服务器中,Leader 节点处理了一个节点的创建会话操作后,该 Leader 服务器上就新增了一个数据节点,如果不在 ZooKeeper 集群中进行数据同步,那么其他服务器上的数据则保持旧有的状态,新增加的节点在服务器上不存在。

    当 ZooKeeper 集群收到来自客户端的查询请求时,会出现该数据节点查询不到的情况,这就是典型的集群中服务器数据不一致的情况

    为了避免这种情况的发生,在进行事务性请求的操作后,ZooKeeper 集群中的服务器要进行数据同步,而主要的数据同步是从 Learnning 服务器同步 Leader 服务器上的数据


    How ?

    主要通过三个方面来讲解 ZooKeeper 集群中的同步方法,分别是同步条件、同步过程、同步后的处理。


    何时触发数据同步的机制?

    我们知道 Leader 选举首先要判断集群中 Leader 服务器是否存在不同,要想进行集群中的数据同步,首先需要 ZooKeeper 集群中存在用来进行数据同步的 Learning 服务器。

    也就是说,当 ZooKeeper 集群中选举出 Leader 节点后,除了被选举为 Leader 的服务器,其他服务器都作为 Learnning 服务器,并向 Leader 服务器注册。之后系统就进入到数据同步的过程中。


    同步哪些数据

    在数据同步的过程中,ZooKeeper 集群的主要工作就是将那些没有在 Learnning 服务器上执行过的事务性请求同步到 Learning 服务器上。

    这里请你注意,事务性的会话请求会被同步,而像数据节点的查询等非事务性请求则不在数据同步的操作范围内


    同步方式

    在具体实现数据同步的时候,ZooKeeper 集群又提供四种同步方式,

    在这里插入图片描述

    DIFF 同步

    DIFF 同步即差异化同步的方式.

    在 ZooKeeper 集群中,Leader 服务器探测到 Learnning 服务器的存在后,首先会向该 Learnning 服务器发送一个 DIFF 不同指令。

    在收到该条指令后,Learnning 服务器会进行差异化方式的数据同步操作。

    在这个过程中,Leader 服务器会将一些 Proposal 发送给 Learnning 服务器。之后 Learnning 服务器在接收到来自 Leader 服务器的 commit 命令后执行数据持久化的操作。


    TRUNC+DIFF 同步

    TRUNC+DIFF 同步代表先回滚再执行差异化的同步,这种方式一般发生在 Learnning 服务器上存在一条事务性的操作日志,但在集群中的 Leader 服务器上并不存在的情况 。

    发生这种情况的原因可能是 Leader 服务器已经将事务记录到本地事务日志中,但没有成功发起 Proposal 流程。

    当这种问题产生的时候,ZooKeeper 集群会首先进行回滚操作,在 Learning 服务器上的数据回滚到与 Leader 服务器上的数据一致的状态后,再进行 DIFF 方式的数据同步操作。


    TRUNC 同步

    TRUNC 同步是指仅回滚操作,就是将 Learnning 服务器上的操作日志数据回滚到与 Leader 服务器上的操作日志数据一致的状态下。之后并不进行 DIFF 方式的数据同步操作。


    SNAP 同步

    SNAP 同步的意思是全量同步,是将 Leader 服务器内存中的数据全部同步给 Learnning 服务器。

    在进行全量同步的过程中,Leader 服务器首先会向 ZooKeeper 集群中的 Learning 服务器发送一个 SNAP 命令,在接收到 SNAP 命令后, ZooKeeper 集群中的 Learning 服务器开始进行全量同步的操作。

    随后,Leader 服务器会从内存数据库中获取到全量数据节点和会话超时时间记录器,将他们序列化后传输给 Learnning 服务器。Learnning 服务器接收到该全量数据后,会对其反序列化后载入到内存数据库中。


    同步后的处理

    数据同步的本质就是比对 Leader 服务器与 Learning 服务器,将 Leader 服务器上的数据增加到 Learnning 服务器,再将 Learnning 服务器上多余的事物日志回滚。

    前面的介绍已经完成了数据的对比与传递操作,接下来就在 Learning 服务器上执行接收到的事物日志,进行本地化的操作。


    源码分析

    首先我们来看看 Learnning 服务器是如何接收和判断同步方式的。

    如下面的代码所示,ZooKeeper 底层实现了一个 Learner 类,该类可以看作是集群中 Learnning 服务器的实例对象,与集群中的 Learning 服务器是一一对应的。

    public class Learner {}
    
    

    而在 Learner 类的内部,主要通过 syncWithLeader 函数来处理来自 Leader 服务器的命令。在接收到来自 Leader 服务器的命令后,通过 qp.getType() 方法判断数据同步的方式。

    protected void syncWithLeader(long newLeaderZxid) throws Exception{
    
     if (qp.getType() == Leader.DIFF) {
    
        snapshotNeeded = false;
    
      }else if (qp.getType() == Leader.TRUNC) {
    
      }
    
    }
    
    

    在确定了数据同步的方式后,再调用 packetsCommitted.add(qp.getZxid()) 方法将事物操作同步到处理队列中,之后调用事物操作线程进行处理。

    if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
    
        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
    
        boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
    
                qp.getZxid(), true);
    
        if (majorChange) {
    
            throw new Exception("changes proposed in reconfig");
    
        }
    
    }
    
    if (!writeToTxnLog) {
    
        if (pif.hdr.getZxid() != qp.getZxid()) {
    
            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
    
        } else {
    
            zk.processTxn(pif.hdr, pif.rec);
    
            packetsNotCommitted.remove();
    
        }
    
    } else {
    
        packetsCommitted.add(qp.getZxid());
    
    

    在这里插入图片描述

    展开全文
  • HTML5 localStorage 页面数据同步demo

    热门讨论 2014-05-11 21:58:05
    HTML5 localStorage 页面数据同步demo。 演示方法,同时打开localstorage1.html, localstorage2.html, localstorage3.html,需用用http方式打开,修改任意一个后,点save或clear。 其他两个页面会同步更新。
  • 关于数据同步的几种实现

    千次阅读 2018-08-08 09:11:08
    关于数据同步的几种实现 转载:https://blog.csdn.net/xuemoyao/article/details/14002209 概述 关于数据同步主要有两个层面的同步,一是通过后台程序编码实现数据同步,二是直接作用于数据库,在数据库层面实现...

    关于数据同步的几种实现

    转载:https://blog.csdn.net/xuemoyao/article/details/14002209
    概述

    关于数据同步主要有两个层面的同步,一是通过后台程序编码实现数据同步,二是直接作用于数据库,在数据库层面实现数据的同步。通过程序编码实现数据同步,其主要的实现思路很容易理解,即有就更新,无则新增,其他情况日志记录,就不做过多的介绍,这里主要讲述的是第二个层面的数据同步,即在数据库层面实现数据同步。

    数据库层面的数据库同步主要有三种方式:通过发布/订阅的方式实现同步,通过SQL JOB方式实现数据同步,通过Service Broker 消息队列的方式实现数据同步。

    下面分别就这三种数据同步方式,一一详解。

    1. 通过发布/订阅的方式实现同步

    发布/订阅是Sql Server自带的一种数据库备份的机制,通过该机制可以快速的实现数据的备份同步,不用编写任何的代码。

    此种数据同步的方式存在的以下的一些问题:

    表结构不能更改,同步双方的表结构必须一致,一旦表结构发生更改需要重新生成数据库快照。
    对于大数据量的同步没有可靠的保证。
    网络不稳定的情况下同步也不能保证。
    总的来说,这种数据备份同步的方式,在表结构一致、数据量不是特别大的情况下还是非常高效的一种同步方式。

    网上有很多的关于如何使用发布/订阅的方式实现数据同步的操作示例,这里就不再重复的演示了,有兴趣想要了解的朋友可以参考下面这篇文章:

    http://kb.cnblogs.com/page/103975/

    1. 通过SQL JOB方式实现数据同步

    通过Sql Job定时作业的方式实现同步其基本原理就是通过目标服务器和源服务器的连接,然后通过编写Sql语句,从源服务器中读取数据,再更新到目标服务器。

    这种数据同步的方式比较灵活。创建过sql定时作业之后,主要需要执行以下关键的两步。

    2.1 创建数据库连接(一般作为定时作业执行的第一步)

    不同数据库之间的连接可以通过系统的存储过程实现。下面就直接用一个示例来讲一下如何创建数据库连接。

    –添加一个连接

    –系统存储过程sp_addlinkedserver 参数:

    ———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’;———————-2:” (srvproduct,默认);

    ———————-3:’SQLOLEDB’(provider,默认值);

    ———————-4:目标服务器的IP或别名(datasrc),本例中为:’WIN-S1PO3UA6J7I’

    exec sp_addlinkedserver ‘WIN-S1PO3UA6J7I’,”,’SQLOLEDB’,’WIN-S1PO3UA6J7I’

    –添加登录用户连接

    –系统存储过程sp_addlinkedsrvlogin 参数:

    ———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’;

    ———————-2:’false’,默认值;

    ———————-3:null,默认值;

    ———————-4:’sa’,登录用户名;

    ———————-5:’pass@word1’,登录密码;

    exec sp_addlinkedsrvlogin ‘WIN-S1PO3UA6J7I’,’false’,null,’sa’,’pass@word1’

    创建数据库连接主要用到了以上的两个存储过程,但是在实际操作的过程中可能会遇到“仍有对服务器XXX的远程登录或连接登录问题”这样的问题,如果遇到此类问题,在执行上边的添加连接和登录用户连接之前还需要先删除某个已存在的链接,具体如下:

    –系统存储过程sp_droplinkedsrvlogin 参数:

    ———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’;———————-2:null

    exec sp_droplinkedsrvlogin ‘WIN-S1PO3UA6J7I’,null

    –系统存储过程sp_dropserver 参数:

    ———————-1:目标服务器的IP或别名,本例中为:’WIN-S1PO3UA6J7I’

    exec sp_dropserver ‘WIN-S1PO3UA6J7I’

    2.2 使用SQL 语句 实现数据同步

    主要的同步思路:

    1:在目标数据库中先清空要同步的表的数据

    2:使用insert into Table (Cloumn….) select Column….. from 服务器别名或IP.目标数据库名.dbo.TableName 的语法将数据从源数据库读取并插入到目标数据库

    Truncate table Org_DepartmentsExt –删除现有系统中已存在的部门表

    insert into Org_DepartmentsExt –从名为WIN-S1PO3UA6J7I的服务器上的DBFrom数据库上获取源数据,并同步到目标数据库中

     (
    
      [DeptID]
    
      ,[DeptStatus]
    
      ,[DeptTel]
    
      ,[DeptBrief]
    
      ,[DeptFunctions] 
    
     )
    

    SELECT [DeptID]

      ,[DeptStatus]
    
      ,[DeptTel]
    
      ,[DeptBrief]
    
      ,[DeptFunctions]
    

    FROM [WIN-S1PO3UA6J7I].[DBFrom].[dbo].[Org_DepartmentsExt]

    以上这两步便是通过SQL Job实现数据同步的关键步骤,在完成以上两步之后,如果没有其他的表要进行同步,则可创建同步计划以完善定时作业。带作业创建完后,便可以执行。

    这里主要只是演示了通过Sql Job方式实现数据同步的关键步骤。网上有很多具体的实例演示。有兴趣的朋友可以参考以下文章进行练习检验:

    http://www.cnblogs.com/tyb1222/archive/2011/05/27/2060075.html

    1. 通过SQL Server Service Broker 消息队列的方式实现数据同步

    3.1 SQL Server Service Broker概述

    SQL Server Service Broker 是数据库引擎的组成部分,为 SQL Server 提供队列和可靠的消息传递。既可用于使用单个 SQL Server 实例的应用程序,也可用于在多个实例间分发工作的应用程序。

    在单个 SQL Server 实例内,Service Broker 提供了一个功能强大的异步编程模型。数据库应用程序通常使用异步编程来缩短交互式响应时间,并增加应用程序总吞吐量。

    在多个SQL Server实例之间Service Broker 还可以提供可靠的消息传递服务。Service Broker 可帮助开发人员通过称为服务的独立、自包含的组件来编写应用程序。需要使用这些服务中所包含功能的应用程序可以使用消息来与这些服务进行交互。Service Broker 使用 TCP/IP 在实例间交换消息。Service Broker 中所包含的功能有助于防止未经授权的网络访问,并可以对通过网络发送的消息进行加密。

    3.2 具体的实现演示

    在这一小节里,主要是通过一个完整的数据同步的流程向大家演示,如何实现同一个数据库实例不同数据库的数据同步。关于不同的数据库实例间的数据库的数据同步整体上跟同一个实例的数据库同步是一样的,只不过在不同的数据库实例间同步时还需启用传输安全、对话安全,创建路由、远程服务绑定等额外的操作。

    这里边用到了大量的SQL Server XML的东西,如果有不理解的地方可以参考以下链接:http://www.cnblogs.com/Olive116/p/3355840.html

    这是我在做技术准备时,自己的一点学习记录。

    下面就是具体的实现步骤:

    3.2.1为数据库启动Service Broker活动

    这一步主要是用来对要进行数据同步的数据启用Service Broker 活动,并且授信。
    

    USE master
    GO
    –如果数据库DBFrom、DBTo不存在,则创建相应的数据库
    IF NOT EXISTS (SELECT name FROM sys.databases WHERE name =’DBFrom’)
    CREATE DATABASE DBFrom
    GO
    IF NOT EXISTS (SELECT name FROM sys.databases WHERE name =’DBTo’)
    CREATE DATABASE DBTo
    GO
    –分别为该数据库启用Service Broker活动并且授权信任
    ALTER DATABASE DBFrom SET ENABLE_BROKER
    GO
    ALTER DATABASE DBFrom SET TRUSTWORTHY ON
    GO
    ALTER AUTHORIZATION ON DATABASE::DBFrom To sa
    GO
    ALTER DATABASE DBTo SET ENABLE_BROKER
    GO
    ALTER DATABASE DBTo SET TRUSTWORTHY ON
    GO
    ALTER AUTHORIZATION ON DATABASE::DBTo TO sa
    GO

    3.2.2 创建数据库主密匙

    这一步主要用来创建数据库主密匙,上边有提到Service Broker可以对要发送的消息进行加密。

    Use DBFrom
    go
    create master key
    encryption by password=’pass@word1’
    go

    Use DBTo
    go
    create master key
    encryption by password=’pass@word1’
    go

    3.2.3 创建消息类型、协定

    这里主要用来创建消息类型和消息协定,源数据库和目标数据库的消息类型和协定都要一致。

    Use DBFrom
    go
    –数据同步—消息类型
    create message type [http://oa.founder.com/Data/Sync]
    validation=well_formed_xml
    go
    –数据同步–错误反馈消息类型
    create message type [http://oa.founder.com/Data/Sync/Error]
    validation=well_formed_xml
    go
    –数据同步协议
    create contract[http://oa.founder.com/Data/SyncContract]
    (
    [http://oa.founder.com/Data/Sync]
    sent by initiator,
    [http://oa.founder.com/Data/Sync/Error]
    sent by target
    )
    go

    Use DBTo
    go
    –数据同步—消息类型
    create message type [http://oa.founder.com/Data/Sync]
    validation=well_formed_xml
    go
    –数据同步–错误反馈消息类型
    create message type [http://oa.founder.com/Data/Sync/Error]
    validation=well_formed_xml
    go
    –数据同步协议

    create contract[http://oa.founder.com/Data/SyncContract]
    (
    [http://oa.founder.com/Data/Sync]
    sent by initiator,
    [http://oa.founder.com/Data/Sync/Error]
    sent by target
    )
    Go

    创建过之后效果如下图:

    3.2.4 创建消息队列

    这里主要用来创建消息队列,源数据库和目标数据库都要创建,队列名字可以自主命名。
    

    use DBFrom
    go
    create queue [DBFrom_DataSyncQueue]
    with status=on
    go

    use DBTo
    go
    create queue [DBFrom_DataSyncQueue]
    with status=on
    go

    创建之后效果如下图:

    3.2.5 创建数据同步服务

    这里我们通过利用上边创建的消息协定和消息队列来创建数据同步的服务。

    use DBFrom
    go
    create service [http://oa.founder.com/DBFrom/Data/SyncService]
    on queue dbo.DBFrom_DataSyncQueue
    go

    –数据同步服务
    use DBTo
    go
    create service [http://oa.founder.com/DBTo/Data/SyncService]
    on queue dbo.DBFrom_DataSyncQueue
    go

    创建后效果如下图:
    

    3.2.6 在源数据库上创建服务配置列表

    这里需要在源数据库上创建一个服务配置列表,主要用来保存之前创建过的服务名称,本例只是用来演示,所以只创建了一个服务,只能是同步一个数据表,如果有多个数据表需要同步,则需创建多个服务,所以这里创建一个服务配置列表,用来存储多个服务的服务名称。

    需要注意的是,下面的脚本在执行完创建表的操作之后又插入了一条数据,也就是上边我们创建的服务名,如果有多个服务的话,依次插入该表即可。

    use DBFrom
    go
    –同步数据–目标服务配置
    create table SyncDataFarServices
    (
    ServiceID uniqueidentifier,
    ServiceName nvarchar(256)
    )
    go
    –将上边创建的服务名,插入此表中
    insert into SyncDataFarServices (ServiceID,ServiceName)
    values
    (NEWID(),’http://oa.founder.com/DBTo/Data/SyncService‘)
    go

    效果如下图:

    3.2.7 发送数据同步消息

    这里创建了一个存储过程主要用来发送同步消息,该消息内容主要包括操作类型、主键、表名、正文内容,分别对应@DMLType,@PrimaryKeyField,@TableName,@XMLData。然后通过创建一个游标来条的读取上边创建的服务列表中的列表信息,向不同的服务发送消息。
    

    Use DBFrom
    go
    –发送同步数据消息
    Create procedure UP_SyncDataSendMsg
    (
    @PrimaryKeyField nvarchar(128),
    @TableName nvarchar(128),
    @DMLType char(1),
    @XMLData xml
    )
    as
    begin
    SET @XMLData.modify(‘insert {sql:variable(“@DMLType”)} as first into /’);
    SET @XMLData.modify(‘insert {sql:variable(“@PrimaryKeyField”)} as first into /’);
    SET @XMLData.modify(‘insert

    {sql:variable(“@TableName”)}
    as first into /’);
    DECLARE FarServices CURSOR FOR SELECT ServiceName FROM SyncDataFarServices;
    open FarServices
    declare @FarServiceName nvarchar(256);
    fetch FarServices into @FarServiceName;
    while @@FETCH_STATUS=0
    begin
    begin Transaction
    declare @Conv_Handler uniqueidentifier
    begin DIALOG conversation @Conv_Handler –开始一个会话
    from service [ http://oa.founder.com/DBFrom/Data/SyncService]
    to service @FarServiceName
    on contract [ http://oa.founder.com/Data/SyncContract];
    send on conversation @Conv_Handler
    Message type http://oa.founder.com/Data/Sync;
    fetch FarServices into @FarServiceName;
    commit;
    end
    close FarServices;
    deallocate FarServices;
    end
    go

    3.2.8 创建数据同步异常信息记录表

    这里创建该表主要用来记录在数据同步过程中出现的异常信息。

    use DBFrom
    go
    create Table dbo.SyncException
    (
    ErrorID uniqueidentifier,
    ConversationHandleID uniqueidentifier,
    ErrorNumber int,
    ErrorSeverity int,
    ErrorState int,
    ErrorProcedure nvarchar(126),
    ErrorLine int,
    ErrorMessage nvarchar(2048),
    MessageContent nvarchar(max),
    CreateDate DateTime
    )
    go
    –修改异常信息记录表
    alter table dbo.SyncException
    add
    PrimaryKeyField nvarchar(128),
    TableName nvarchar(128),
    DMLType char(1),
    DBName nvarchar(128)
    Go

    效果如下图:

    3.2.9 数据同步反馈

    这里主要用来在源数据库中接收队列中的消息,将同时出错的信息,解析一下,然后插入到异常信息记录表里边。

    –数据同步回馈

    use DBFrom
    go
    create procedure UP_SyncDataFeedback
    as
    begin
    set nocount on
    –会话变量声明
    declare @ConversationHandle uniqueidentifier;–会话句柄
    declare @Msg_Body nvarchar(max);
    declare @Msg_Type_Name sysname;
    –变量赋值
    while(1=1)
    begin
    begin transaction
    –从队列中接收消息
    waitfor
    (
    receive top(1)
    @Msg_Type_Name=message_type_name,
    @ConversationHandle=[conversation_handle],
    @Msg_Body=message_body
    from dbo.[DBFrom_DataSyncQueue]
    ),timeout 1000
    –如果接收到消息处理,否则跳过
    if(@@ROWCOUNT<=0)
    break;
    if @Msg_Type_Name=’http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog
    end conversation @ConversationHandle;
    else if @Msg_Type_Name=’http://oa.founder.com/Data/Sync/Error
    begin
    declare @DataSource xml;
    set @DataSource=Convert(xml,@Msg_Body);
    insert into dbo.SyncException(ErrorID,ConversationHandleID,ErrorNumber,ErrorSeverity,ErrorState,ErrorProcedure,ErrorLine,ErrorMessage,
    PrimaryKeyField,TableName,DMLType,MessageContent,DBName,CreateDate)
    select
    NEWID(),@ConversationHandle,
    T.c.value(‘./@ErrNumber’,’INT’),
    T.c.value(‘./@ErrSeverity’,’INT’),
    T.c.value(‘./@ErrState’,’INT’),
    T.c.value(‘./@ErrProcedure’,’Nvarchar(126)’),
    T.c.value(‘./@ErrLine’,’INT’),
    T.c.value(‘./@ErrMessage’,’nvarchar(2048)’),
    T.c.value(‘./@PrimaryKeyField’,’nvarchar(128)’),
    T.c.value(‘./@TableName’,’nvarchar(128)’),
    T.c.value(‘./@DMLType’,’char(1)’),
    T.c.value(‘./@MessageContent’,’nvarchar(max)’),
    T.c.value(‘./@DBName’,’nvarchar(128)’),
    GETDATE()
    from @DataSource.nodes(‘/row’) as T(c);
    end
    else if @Msg_Type_Name=’http://schemas.microsoft.com/SQL/ServiceBroker/Error
    end conversation @ConversationHandle;
    commit Transaction;
    end
    end
    commit;
    go

    3.2.10对Service Broker队列使用内部激活,并指定将调用的存储过程

    这里主要用来激活源数据库的消息队列,并为其指定调用的存储过程,即上边3.2.9 中创建的存储过程。
    

    –对Service Broker队列使用内部激活,并指定将调用的存储过程
    use DBFrom
    go
    alter queue dbo.DBFrom_DataSyncQueue with activation
    (
    status=on,
    max_queue_Readers=1,
    procedure_name=UP_SyncDataFeedback,
    execute as owner
    );
    Go

    3.2.11 在源数据库中为需要同步的数据表创建触发器

    这里就以用户表为例,具体操作如下,这里通过查询系统的Inserted和Deleted临时表来判断执行同步的操作类型是更新(U)、新增(A)还是删除(D),最后调用3.2.7 中创建的存储过程来对数据进行处理并发送。

    use DBFrom
    Go
    –用户信息同步
    Create Trigger UT_DataSync_Users
    on dbo.Org_Users
    after insert,update,delete
    as
    set nocount on ;
    –变量声明
    declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@DMLType char(1);
    declare @InsertCount int ,@DeleteCount int ;
    declare @XMLData xml;
    –变量赋值
    set @PrimaryKeyField=’ID’ –组合主键,多个主键使用”,”隔开
    set @TableName=’Org_Users’
    set @InsertCount=(select COUNT(*) from inserted)
    set @DeleteCount=(select COUNT(*) from deleted)
    if @InsertCount=@DeleteCount and @InsertCount<>0 —-Update
    begin
    select @XMLData=(select * from inserted For xml raw,binary base64,ELEMENTS XSINIL);
    set @DMLType=’U’;
    end
    else if(@InsertCount<>0 and @DeleteCount=0) —-Insert
    begin
    select @XMLData=(select * from inserted for xml raw ,Binary base64,ELEMENTS XSINIL)
    set @DMLType=’A’;
    end
    else—-Delete
    begin
    select @XMLData=(select *from deleted for xml raw,binary base64,ELEMENTS XSINIL)
    set @DMLType=’D’;
    end
    if(@XMLData is not null)
    begin
    exec UP_SyncDataSendMsg @PrimaryKeyField,@TableName,@DMLType,@XMLData;
    end
    go

    3.2.12 目标数据库中创建,字符分割函数

    该函数主要是用来进行字符分割,用来处理主键有多个字段的情况。

    –目标数据库

    use DBTo
    go
    –转换用‘,’分割的字符串@str
    create Function dbo.uf_SplitString
    (
    @str nvarchar(max),
    @Separator nchar(1)=’,’
    )
    returns nvarchar(2000)
    as
    begin
    declare @Fields xml;–结果字段列表
    declare @Num int;—–记录循环次数
    declare @Pos int;—–记录开始搜索位置
    declare @NextPos int;–搜索位置临时变量
    declare @FieldValue nvarchar(256);–搜索结果
    set @Num=0;
    set @Pos=1;
    set @Fields=CONVERT(xml,’‘);
    while (@Pos<=LEN(@Str))
    begin
    select @NextPos=CHARINDEX(@Separator,@Str,@Pos)
    if(@NextPos=0 OR @NextPos is null)
    select @NextPos=LEN(@Str)+1;
    select @FieldValue=RTRIM(ltrim(substring(@Str,@Pos,@NextPos-@Pos)))
    select @Pos=@NextPos+1
    set @Num=@Num+1;
    if @FieldValue<> ”
    begin
    set @Fields.modify(‘insert {sql:variable(“@FieldValue”)} as last into /Fields[1]’);
    end
    end
    return Convert(nvarchar(2000),@Fields);
    end
    go

    3.2.13 将解析过的消息信息,根据操作类型的不同同步到数据表中

    这是所有的数据同步中最关键也是最复杂的一步了,在整个开发的过程中,大部分时间都花在这上边了,具体的操作都在下面解释的很清楚了。
    

    –将XML数据源中的数据同步到数据表中(包括增删改)

    Use DBTo
    go
    create function dbo.UF_XMLDataSourceToSQL
    (
    @DataSource XML,–数据源
    @TableName varchar(128),–同步数据表名称
    @PrimaryKeyField varchar(128),–需要同步的表的主键,主键为多个时用‘,’隔开
    @DMLType char(1) –A:新建;U:编辑;D:删除
    )
    returns nvarchar(4000)
    as
    begin
    –变量声明及数据初始化
    –声明数据表@TableName列Column相关信息变量
    declare @ColumnName nvarchar(128),@DataType nvarchar(128),@MaxLength int;
    –声明用于拼接SQL的变量
    declare @FieldsList nvarchar(4000),@QueryStatement nvarchar(4000);
    declare @Sql nvarchar(4000);
    declare @StrLength int;
    –变量初始化
    set @FieldsList=’ ‘;–初始化变量不为null,否则对变量使用’+=’操作符无效
    set @QueryStatement=’ ‘;
    –主键信息,根据参数求解如:ID1ID2
    declare @PKs xml;
    –当前字段是否主键-在‘更新’,‘删除’同步数据时使用
    declare @IsPK nvarchar(128);
    –初始化游标–游标内容包括目标数据表TableName列信息
    DECLARE ColumnNameList CURSOR FOR SELECT COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@TableName AND
    DATA_TYPE<>’xml’;
    –数据处理
    if @DMLType=’A’–插入数据
    begin
    open ColumnNameList
    fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
    while @@FETCH_STATUS=0
    begin
    –判断数据源列中是否存在属性:@ColumnName
    –判断数据源列中是否存在–元素:@ColumnName
    If @DataSource.exist(‘/row/*[local-name()=sql:variable(“@ColumnName”)]’)=1
    begin
    –拼接SQL
    set @FieldsList+=(@ColumnName+’,’);
    set @QueryStatement+=(‘T.c.value(”(./’+@ColumnName+’[not(@xsi:nil)])[1]”,”’+@DataType);–元素读取(包含空值情况)
    if @MaxLength is not null and @MaxLength<>-1
    begin
    set @QueryStatement+=’(‘+CONVERT(nvarchar,@MaxLength)+’)’;
    end
    else if @MaxLength=-1 and @DataType<>’xml’–已调整
    begin
    set @QueryStatement+=’(MAX)’;
    end
    set @QueryStatement+=(”’) as ‘+@ColumnName+’,’);
    end
    fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
    end
    close ColumnNameList;
    deallocate ColumnNameList;
    set @StrLength=LEN(@FieldsList);
    –去掉@FieldsList结尾的’,’
    set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);
    set @StrLength=LEN(@QueryStatement);
    –去掉@QueryStatement结尾的’,’
    set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
    set @Sql=N’insert into ‘+@TableName+’(‘+@FieldsList+’) select ‘+@QueryStatement+’ from @DataSource.nodes(”/row/”) as T(c)’;
    end

    else if @DMLType='U'--更新数据
        begin
           --更新语句where 后的条件表达式
           declare @Condition nvarchar(1000);
           set @Condition='  ';
           set @PKs=CONVERT(xml,dbo.uf_SplitString(@PrimaryKeyField,','));
           Open ColumnNameList
                fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
               while @@FETCH_STATUS=0
                begin
                --判断数据源列中是否存在元素:@ColumnName
                  if @DataSource.exist('/row/*[local-name()=sql:variable("@ColumnName")]')=1
                  begin
                     set @IsPK=null;
                     SELECT @IsPk=Fs.F FROM (SELECT T.c.value('.[text()]','Nvarchar(128)') AS F FROM @PKs.nodes('/Fields/Field') AS T(c))Fs Where Fs.F=@ColumnName
                     if @IsPK is null or @IsPK=''
                     begin
                       --非主键,更新字段值
                       set @FieldsList+=(@ColumnName+'=Source.'+@ColumnName+',');
                     end
                     else
                     begin
                        --主键,作为要更新条件
                       set @Condition+=@TableName+'.'+@ColumnName+'=Source.'+@ColumnName+' And ';
                     end
                     --XML查询
                     set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素读取(包含空值情况)
                     if @MaxLength is not null and @MaxLength<>-1
                        begin
                           set @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';
                        end
                     else if @MaxLength=-1 and @DataType<>'xml'
                        begin
                           set @QueryStatement+='(max)';
                        end
                      set @QueryStatement+=(''') as '+@ColumnName+',');
                  end
                  fetch ColumnNameList Into @ColumnName,@DataType,@MaxLength
                end
            close ColumnNameList;
            Deallocate ColumnNameList;          
            --去掉@FieldsList结尾的','
            set @StrLength=LEN(@FieldsList);
            set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);        
         --去掉@QueryStatement结尾的','
         set @StrLength=LEN(@QueryStatement);
         set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
         --去掉@Condition结尾的‘and'
         set @StrLength=LEN(rtrim(@Condition));
         set @Condition=SUBSTRING(rtrim(@Condition),1,@StrLength-3);           
            set @Sql=N'USE DBTo ; update '+@TableName+' set '+@FieldsList+' from (select '+@QueryStatement+' 
            from @DataSource.nodes(''/row'') as T(c)) Source where '+@Condition;
    end  
    else if @DMLType='D' --删除数据
       begin
         --更新语句where后的条件表达式
         declare @LinkField nvarchar(1000);
         set @LinkField='  ';
         set @PKs=CONVERT(xml,dbo.uf_SplistString(@PrimaryKeyField,','));
         open ColumnNameList
            fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
            while @@FETCH_STATUS=0
            begin
            if @DataSource.exist('row/*[local-name()=sql:variable("@ColumnName")]')=1
             begin
              set @IsPK=null;--初始化
              --当前字段是否为主键
              select @IsPK=Fs.F from (select T.c.value('.[text()]','nvarchar(128)') as F from @PKs.nodes('/Fields/Field') as T(c))Fs where Fs.F=@ColumnName
              --主键
              if @IsPK is not null and @IsPK<>''
              begin
                 --主键删除条件
                 set @LinkField+='Target.'+@ColumnName+'=Source.'+@ColumnName+' And ';
                 --XML 查询
                 set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素读取(包含空值情况)
                if(@MaxLength is not null and @MaxLength<>-1)
                   begin
                      set @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';
                   end
                else if @MaxLength=-1 and @DataType<>'xml'
                   begin
                   set @QueryStatement+='(max)';
                  end
                set @QueryStatement+=(''') as '+@ColumnName+',');
              end 
             end
            fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
            end
            close ColumnNameList;
            deallocate ColumnNameList;         
            --去除@QueryStateMent结尾的','
            set @StrLength=LEN(@QueryStatement);
            set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);          
            --去除@LinkField 结尾的’Add‘
            set @StrLength=LEN(rtrim(@LinkField));
            set @LinkField=SUBSTRING(rtrim(@LinkField),1,@StrLength-3);         
            set @Sql=N'Delete from '+@TableName+' from '+@TableName+' as Target inner join (select '+@QueryStatement+ ' from @DataSource.nodes(''/row'') as T(c))
    

    Source on ‘+@LinkField;
    end
    Return @Sql–‘hello’
    end
    go

    3.2.14 解析并处理从队列中读取的消息

    这里主要用来读取队列中的消息,并将消息进行处理,最终处理成一定的格式,并调用3.2.13中的存储过程,将数据同步到数据库中。

    –将数据同步到数据表中
    create procedure UP_SyncDataToTable
    as
    begin
    set nocount on
    –会话变量声明
    declare @ConversationHandle uniqueidentifier;–会话句柄
    declare @Msg_Body nvarchar(max);
    declare @Msg_Type_Name sysname;
    declare @ErrorNumber int ;
    –变量赋值
    while(1=1)
    begin
    begin transaction
    –从队列中接收消息
    waitfor
    (
    receive top(1)
    @Msg_Type_Name=message_type_name,
    @ConversationHandle=[conversation_handle],
    @Msg_Body=message_body
    – from dbo.[DBTo_DataSyncQueue]
    from dbo.[DBFrom_DataSyncQueue]
    ),timeout 500
    –如果接收到消息-处理,否则跳过
    if @@ROWCOUNT<=0
    begin
    break;
    end
    if @Msg_Type_Name=’http://oa.founder.com/Data/Sync
    begin
    –声明变量
    declare @DMLType char(1);
    declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@Sql nvarchar(4000);
    declare @DataSource xml
    –受影响的行数
    declare @EffectRowCount int;
    declare @ErrMsg xml;
    begin try
    –变量赋值
    set @DataSource=convert(xml,@Msg_Body);–数据源
    set @PrimaryKeyField=@DataSource.value(‘(/PrimaryKeyField)[1][text()]’,’nvarchar(128)’);–主键列表
    set @TableName=@DataSource.value(‘(/Table)[1][text()]’,’nvarchar(128)’);–操作数据表
    set @DMLType=@DataSource.value(‘/DMLType[1][text()]’,’char(1)’);–操作类型
    set @Sql=dbo.UF_XMLDataSourceToSQL(@DataSource,@TableName,@PrimaryKeyField,@DMLType);
    exec sp_executesql @Sql,
    N’@DataSource XML’,
    @DataSource;
    end try
    begin catch
    declare @DBName nvarchar(128)
    select @DBName=Name from master..SysDataBases where dbid=(select dbid from master..sysprocesses where spid=@@SPID)
    set @ErrorNumber=ERROR_NUMBER();
    set @ErrMsg=(select ERROR_NUMBER() as ErrNumber,
    ERROR_SEVERITY() as ErrSeverity,
    ERROR_STATE() as ErrState,
    ERROR_PROCEDURE() as ErrProcedure,
    ERROR_LINE() as ErrLine,
    ERROR_MESSAGE() as ErrMessage,
    @PrimaryKeyField as PrimaryKeyField,
    @TableName as TableName,
    @DMLType as DMLType,
    @Msg_Body as MessageContent,
    @DBName as DBName
    for XML raw);
    –GOTO 错误处理标签
    goto Err_Handle;
    end catch
    –结束会话
    End Conversation @ConversationHandle
    if @ErrorNumber is not null
    begin
    –错误处理区域
    Err_Handle:
    if @ErrMsg is not null
    begin
    declare @test nvarchar(128);
    –发送失败消息
    send on conversation @ConversationHandle
    message type http://oa.founder.com/Data/Sync/Error
    end
    –结束会话
    end conversation @ConversationHandle
    –break;
    –回滚–不可回滚,否则将无法发送失败消息
    –GoTO Err_Lab;
    end
    end
    commit transaction
    end
    end
    go

    3.2.15 对目标数据库的消息队列进行内部激活

    这里主要是用来激活目标数据库的消息队列,主要用来实现数据的同步以及同步出错的错误信息的反馈。

    –对Service Broker队列使用内部激活,并指定将要调用的存储过程
    use DBTo
    go
    –alter Queue dbo.[DBTo_DataSyncQueue] with activation
    alter Queue dbo.[DBFrom_DataSyncQueue] with activation
    (
    status=on,
    max_queue_readers=1,
    Procedure_name=UP_SyncDataToTable,
    Execute as self
    )
    Go

    完成以上这些步骤以后,就可以实现同一数据库实例上两个不同的数据库之间的数据同步。即如果DBFrom数据库中的Org_Users中的某一条信息发生变化,会马上的自动同步到DBTo数据库中的Org_Users 表。如果是想要实现不同的数据库实例间的数据库的表的同步,则可以参考以下链接:
    

    http://www.cnblogs.com/downmoon/archive/2011/05/05/2037830.html

    在创建启用传输安全、对话安全,创建路由、远程服务绑定等额外的操作之后,剩下的操作跟在同一数据库实例中的操作是一样的。

       此外,本文还参考了如下的链接:
    

    http://www.cnblogs.com/downmoon/archive/2011/04/05/2005900.html

       希望可以给大家一些启发和帮助。具体的源码有兴趣的朋友可以留下邮箱。
    
    展开全文
  • c#定时服务数据同步源代码

    热门讨论 2013-01-22 12:04:08
    c#定时数据同步,用户可以设置每天,每时分秒,按照指定规则 同步数据
  • Java实现Mysql数据同步

    千次阅读 2018-11-09 17:01:27
    离线应用程序数据同步到服务器端 服务器端数据同步到离线应用程序 同步记录表设计: 名 类型 不是null 主键 备注 id int √ √ 主键id start_id int     被同步表数据...

    应用场景:

    • 离线应用程序数据同步到服务器端
    • 服务器端数据同步到离线应用程序

    同步记录表设计:

    类型不是null主键备注
    idint主键id
    start_idint  被同步表数据,开始id
    end_idint  被同步表数据,结束id
    end_upate_timetimestamp  同步结束时的时间(被同步表最后一条同步数据创建时间)
    sync_typevarchar  同步类型
    create_timetimestamp 创建时间

     

    创建同步记录表sql文件:

    CREATE TABLE `sync_record` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
      `start_id` int(11) DEFAULT NULL COMMENT '被同步表数据,开始id',
      `end_id` int(11) DEFAULT NULL COMMENT '被同步表数据,结束id',
      `end_upate_time` timestamp(4) NULL DEFAULT NULL COMMENT '同步结束时的时间(被同步表最后一条同步数据创建时间)',
      `sync_type` varchar(3) DEFAULT NULL COMMENT '同步类型',
      `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8 COMMENT='同步记录表';

    本篇博客介绍的是Java程序实现Mysql数据同步,要对抽象类有深刻的理解,不然会对代码逻辑很懵懂,不懂得同学可以看我这篇博客回忆一下Java基础知识:

    Java抽象类

    编写同步数据逻辑抽象类代码:

    • 根据主键id同步

     AbstractSyncByIdService.java(抽象类)

    @Service
    public abstract class AbstractSyncByIdService {
    
        private static final Logger logger = LoggerFactory.getLogger(AbstractSyncByIdService.class);
    
        @Autowired
        private SyncDao syncDao;
    
    
        /**
         * 获取同步的上一个id
         *
         * @author HeLiu
         * @date 2018/7/18 11:20
         */
        public Integer queryPreviousId(String syncType) {
            return syncDao.queryPreviousId(syncType);
        }
    
        /**
         * 异常或者结束时,保存或者更新本次的同步记录
         *
         * @author HeLiu
         * @date 2018/7/18 11:39
         */
        protected void saveOrUpdateSyncRecord(Integer startId, Integer endId, String syncType) {
            boolean exsitFlag = syncDao.queryExsitBySyncType(syncType);
            //如果存在该同步类型的 同步记录,则只更新endId ; 不存在,则插入该类型的同步记录
            if (exsitFlag) {
                syncDao.updateEndIdBySyncType(syncType, endId);
            } else {
                syncDao.saveSyncRecord(syncType, startId, endId);
            }
        }
    
        /**
         * 执行同步,同步中的业务逻辑,数据之间的同步先后关系,都在这里编写
         *
         * @author HeLiu
         * @date 2018/7/18 11:36
         */
        public void excuteSync(String syncType, String pcId) {
    
            logger.info(".......start .excuteSync  ..syncType:{}..", EnumSyncType.enumOfByCode(syncType).desc);
    
            // 获取开始id
            Integer previousId = queryPreviousId(syncType);
    
            // 每次都会执行方法,判断是否有需要同步的数据
            Pair<Boolean, Object> resultPair = exsitNeedSync(previousId, pcId);
            while (resultPair.getLeft()) {
                // 设置最已同步的id 为前一个id
                Integer syncEndId = previousId;
                try {
                    // 同步数据,并返回结束时,本次同步数据的id,
                    // 没有异常,则表示同步成功
                    syncEndId = syncData(resultPair.getRight());
                    logger.info(".......同步数据id:{}.. .excuteSync  ..syncType:{}..", syncEndId, syncType);
                    // 同步成功 最新同步成功的id , 则变成上一个id
                    previousId = syncEndId;
    
                    resultPair = exsitNeedSync(previousId, pcId);
                } catch (Exception e) {
                    logger.info(".excuteSync..excetption..previousId:{}...syncType.:{}", previousId, syncType);
                    logger.error("excuteSync..excetption.", e);
    
                } finally {
                    // 保存同步记录,
                    // 每次同步成功一条数据,都需要更新最新已同步的id
                    logger.info("..saveOrUpdateSyncRecord...........");
                    saveOrUpdateSyncRecord(previousId, syncEndId, syncType);
                }
            }
            logger.info(".......end .excuteSync  ..syncType:{}..", EnumSyncType.enumOfByCode(syncType).desc);
    
    
        }
    
        /**
         * 根据同步开始id,同步数据, 返回结束时的id, 不同模块,实现不一样,这里抽象出来
         *
         * @author HeLiu
         * @date 2018/7/18 11:32
         */
        protected abstract Integer syncData(Object data) throws  Exception;
    
        /**
         * 根据同步id ,查询是否有需要同步的数据,true 表示有, false 表示没有
         *
         * @author HeLiu
         * @date 2018/7/18 16:21
         */
        public abstract Pair<Boolean, Object> exsitNeedSync(Integer previousId, String pcId);
    
    
    }
    •  根据创建时间同步

      AbstractSyncByTimeService.java(抽象类)

    @Service
    public abstract class AbstractSyncByTimeService {
    
        private static final Logger logger = LoggerFactory.getLogger(AbstractSyncByTimeService.class);
    
        @Autowired
        private SyncDao syncDao;
    
        /**
         * 获取最后一次的更新时间
         *
         * @author HeLiu
         * @date 2018/7/18 11:20
         */
        public String queryPreviousEndUpdateTime(String syncType) {
            return syncDao.queryPreviousEndUpdateTime(syncType);
        }
    
        /**
         * 异常或者结束时,保存或者更新本次的同步记录
         *
         * @author HeLiu
         * @date 2018/7/18 11:39
         */
        protected void saveOrUpdateSyncRecord(String endUpdateTime, String syncType) {
            boolean exsitFlag = syncDao.queryExsitBySyncType(syncType);
            // 如果存在该同步类型的 同步记录,则只更新同步数据的创建时间; 不存在,则插入该类型的同步记录
            if (exsitFlag) {
                syncDao.updateEndUpdateTimeBySyncType(syncType, endUpdateTime);
            } else {
                syncDao.saveEndUpdateTimeBySyncType(syncType, endUpdateTime);
            }
        }
    
        /**
         * 执行同步,同步中的业务逻辑,数据之间的同步先后关系,都在这里编写
         *
         * @author HeLiu
         * @date 2018/7/18 11:36
         */
        public void excuteSync(String syncType, String pcId) {
    
            logger.info(".......start .excuteSync  ..syncType:{}..", EnumSyncType.enumOfByCode(syncType).desc);
    
            // 获取开始同步时间
            String endUpdateTime = queryPreviousEndUpdateTime(syncType);
    
            // 每次都会执行方法,判断是否有需要同步的数据
            Pair<Boolean, Object> resultPair = exsitNeedSync(endUpdateTime, pcId);
            while (resultPair.getLeft()) {
                // 设置已同步的时间 为前一个时间
                String syncEndUpdateTime = endUpdateTime;
                try {
                    // 同步数据,并返回结束时,本次同步数据的创建时间,
                    // 没有异常,则表示同步成功
                    syncEndUpdateTime = syncData(resultPair.getRight());
                    logger.info(".......同步数据endUpdateTime:{}.. .excuteSync  ..syncType:{}..", syncEndUpdateTime, syncType);
                    // 同步成功 最新同步成功的创建时间 , 则变成上一个创建时间
                    endUpdateTime = syncEndUpdateTime;
    
                    resultPair = exsitNeedSync(endUpdateTime, pcId);
                } catch (Exception e) {
                    logger.info(".excuteSync..excetption..previousId:{}...syncType.:{}", endUpdateTime, EnumSyncType.enumOfByCode(syncType).desc);
                    logger.error("excuteSync..excetption.", e);
    
                } finally {
                    // 保存同步记录,
                    // 每次同步成功一条数据,都需要更新最新已同步的创建时间
                    saveOrUpdateSyncRecord(endUpdateTime, syncType);
                }
            }
            logger.info(".......end .excuteSync  ..syncType:{}..", EnumSyncType.enumOfByCode(syncType).desc);
    
    
        }
    
        /**
         * 根据同步开始时间,同步数据, 返回结束时的时间, 不同模块,实现不一样,这里抽象出来
         *
         * @author HeLiu
         * @date 2018/7/18 11:32
         */
        protected abstract String syncData(Object data) throws  Exception;
    
        /**
         * 根据同步开始时间 ,查询是否有需要同步的数据,true 表示有, false 表示没有
         *
         * @author HeLiu
         * @date 2018/7/18 16:21
         */
        public abstract Pair<Boolean, Object> exsitNeedSync(String endUpdateTime, String pcId);
    
    
    }

     注意:

    • 两者同步逻辑都是一样的一个根据主键id,前提是你的主键id是数字递增类型的不是UUID之类的,另一个根据数据的创建时间,利用时间有先后的原理。这二者同步的区别要区分好。
    • 根据你同步数据设置好区分的类别也就是syncType,例如:人员-'1';视频-'2'......,怎么开心怎么来。
    • 然后编写你自己的同步数据逻辑层一定要继承该类(AbstractSyncByIdService / AbstractSyncByTimeService,重写抽象类里面的方法,自定义你自己的业务代码,因为不同的同步数据,业务的代码不一样。
    • 这两个抽象类一定要仔细看,有详细的注解。
    • 两个抽象方法至关重要,一定要理解这两个方法的用处。

    代码补充: 

     SyncDao.java

    @Repository
    public class SyncDao {
    
        private static final String  name_space = "syncRecord" + SPOT;
    
        @Autowired
        private DaoClient daoClient;
    
    
        /**
         *  根据同步类型,查询出,原数据表中,开始同步的id
         * @date 2018/7/18 14:18
         */
        public Integer queryPreviousId(String syncType){
            String sqlId = name_space + "queryPreviousId";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            return daoClient.queryForObject(sqlId, param, Integer.class);
        }
    
        /**
         *  判断该种类型的同步信息是否存在
         * @author liuao
         * @date 2018/7/18 15:16
         */
        public boolean queryExsitBySyncType(String syncType){
            String sqlId = name_space + "queryExsitBySyncType";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            int count =  daoClient.queryForObject(sqlId, param, Integer.class);
            return count > 0 ? true : false ;
        }
    
        /**
         *  根据同步类型更新同步结束时的id
         * @author liuao
         * @date 2018/7/18 15:24
         */
        public int updateEndIdBySyncType(String syncType, Integer endId){
            String sqlId = name_space + "updateEndIdBySyncType";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            param.put("endId", endId);
            return daoClient.excute(sqlId, param);
        }
    
        /**
         *  根据同步类型更新同步结束时的id
         * @author liuao
         * @date 2018/7/18 15:24
         */
        public int updateEndUpdateTimeBySyncType(String syncType, String endUpdateTime){
            String sqlId = name_space + "updateEndUpdateTimeBySyncType";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            param.put("endUpdateTime", endUpdateTime);
            return daoClient.excute(sqlId, param);
        }
    
        /**
         *  根据同步类型保存同步结束时的更新时间
         * @author liuao
         * @date 2018/7/18 15:24
         */
        public int saveEndUpdateTimeBySyncType(String syncType, String endUpdateTime){
            String sqlId = name_space + "saveEndUpdateTimeBySyncType";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            param.put("endUpdateTime", endUpdateTime);
            return daoClient.insertAndGetId(sqlId, param);
        }
    
    
        /**
         *  保存同步记录
         * @date 2018/7/18 15:28
         */
        public int saveSyncRecord(String syncType, Integer startId ,Integer endId){
            String sqlId = name_space + "saveSyncRecord";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            param.put("startId", startId);
            param.put("endId", endId);
            return daoClient.excute(sqlId, param);
        }
    
        /**
         *  查询出最后一次的更新时间
         * @date 2018/8/2 19:48
         */
        public String queryPreviousEndUpdateTime(String syncType) {
            String sqlId = name_space + "queryPreviousEndUpdateTime";
            Map<String,Object> param = new HashMap<>();
            param.put("syncType", syncType);
            return daoClient.queryForObject(sqlId, param, String.class);
        }
    }

    sql语句:

    <sqltemplate id="queryPreviousId">
            <![CDATA[
                 SELECT
                    IFNULL (MAX(end_id),0) lastId
                FROM SYNC_RECORD
                WHERE SYNC_TYPE = :syncType
    		]]>
        </sqltemplate>
    
    
        <sqltemplate id="queryExsitBySyncType">
            <![CDATA[
                 SELECT
                    count(id)
                FROM SYNC_RECORD
                WHERE SYNC_TYPE = :syncType
    		]]>
        </sqltemplate>
    
        <sqltemplate id="updateEndIdBySyncType">
            <![CDATA[
                UPDATE SYNC_RECORD
                SET
                    END_ID = :endId
                WHERE SYNC_TYPE = :syncType
    		]]>
        </sqltemplate>
    
    
        <sqltemplate id="saveSyncRecord">
            <![CDATA[
                    INSERT INTO SYNC_RECORD
                    SET
                        START_ID = :startId ,
                        END_ID = :endId ,
                        SYNC_TYPE = :syncType
    
    		]]>
        </sqltemplate>
    
        <sqltemplate id="updateEndUpdateTimeBySyncType">
            <![CDATA[
                    update  SYNC_RECORD
                    SET
                        end_upate_time = :endUpdateTime
                        where SYNC_TYPE = :syncType
    
    		]]>
        </sqltemplate>
    
    
        <sqltemplate id="saveEndUpdateTimeBySyncType">
            <![CDATA[
                    INSERT INTO SYNC_RECORD
                    SET
                        END_UPATE_TIME = :endUpdateTime ,
                        SYNC_TYPE = :syncType
    		]]>
        </sqltemplate>
    
        <sqltemplate id="queryPreviousEndUpdateTime">
            <![CDATA[
                 SELECT
                    IFNULL (MAX(end_upate_time),'2018-01-01 00:00:00') lastId
                FROM SYNC_RECORD
                WHERE SYNC_TYPE = :syncType
    		]]>
        </sqltemplate>

    注意:代码是死的人是活的,灵活使用,不要被代码局限了,这个只是提供一下思路,具体怎么使用可以自己根据实际需求改和优化,深刻理解设计思路和对抽象类的一个灵活使用。

    这里的Dao层和sql的写法是jdbctemplate的封装,可以借鉴我的一篇博客——Java基于jdbctemplate数据持久层操作封装

    刚开始肯定有点难理解,耐下心仔细看,欢迎相互讨论——QQ:892715310,WX:Miss5202468。

    展开全文
  • 1.选择(工具--- 数据传输) 2.选择高级,高级里面有很多选择(例如:使用完整的插入语句) 3.结构同步 4.比对 5.勾选自己想要同步的表结构选项即可
  • 1、 早期关系型数据库之间的数据同步 1)、全量同步 比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页...
  • 数据迁移&数据同步

    千次阅读 2018-12-17 11:22:22
    文章目录历史数据迁移实时数据同步 由于老系统满足不了业务需求,因此需要开发新系统,并且使用新的语言和架构,老系统的数据库数据肯定是不能弃掉,需要平滑迁移。而且还由于老系统的开发人员不在和严重不足,无法...
  • mysql之间的数据同步

    千次阅读 2018-06-21 17:31:50
    环境要求: Windows 操作系统 ...假设数据库A为主机,数据库B为从机(A向B提供同步服务,即B中的数据来自A) A机器:IP=10.10.151.166 B机器:IP=10.10.151.156 下面看单向同步的配置步骤:...
  • 网络游戏,最重要的部分应该就是数据同步了。一个游戏的数据同步质量,直接影响这个网游的操作体验。做出一个一步一卡的游戏,一定会成为其最大的槽点。 记得我当年最早玩的网络游戏《传奇》,那真是处处可卡bug满天...
  • ES多集群间数据同步

    万次阅读 2018-05-16 17:49:02
    ES多集群间数据同步1.引言 自己在google上搜了一下,自己总结了一下集群中某节点要访问远程集群节点中的数据,并保证数据的一致性和稳定性。举个例子,现有三个集群分别是:集群A、集群B和集群C,每个集群对应的有...
  • SpringBoot 定时任务实现数据同步方法

    千次阅读 热门讨论 2020-12-07 11:48:02
    方案一:通过轮询接口的方式执行pullData()方法实现数据同步 该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询...
  • 本文主要介绍源表为单表时,数据增量抽取的情况。当源表为多表时,后面的文章会继续介绍。 一、抽取情况说明 将源数据库S中的A表(将此表称为源表),通过ETL工具抽取至目标数据库T的A表(将此表称为目标表)。假设...
  • FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾)。...
  • 需要跨网络:从阿里云服务器上的数据库,通过网闸使用ftp传文件的方式,将数据同步到业主的专网中;阿里云跟业主专网不能直连; 定时数据增量同步,具体同步哪些表,需要可配置; 节约工作量,最大限度上不改变...
  • 数据同步之全量数据同步(一)

    千次阅读 2019-12-20 10:52:11
    本文主要介绍源表为单表时,数据全量的情况。当源表为多表时,跟单表的情况差不多。 一、抽取情况说明 将源数据库S中的A表(将此表称为源表),通过ETL工具抽取至目标数据库T的A表(将此表称为目标表)。假设源表A的...
  • SQL Server数据同步

    千次阅读 2018-08-31 10:37:50
    SQL Server数据库数据同步的步骤 1、设置登录名密码、查看登录名状态、查看服务器名称 1)选择用户名“sa”右键选择“属性”,如图1 : 图1  2)设置密码,如图2 : 图2 3)设置状态、“确认”,如图3...
  • 数据同步两种方式

    千次阅读 2020-03-06 14:46:28
    如果数据要存储多份的时候,为了保证数据的准备性,我们需要保证数据更新的同步同步方式 1.全量同步:就是每天定时(避开业务高峰期)或者周期性全部把数据从一个地方拷贝到另一地方。(全部的数据) 2.增量...
  • 工作心得之接口数据同步

    千次阅读 2019-05-10 22:37:17
    在开发过程中避免不了调用其他第三... 本次同步数据主要采用resetful接口调用第三方接口把本系统的业务数据同步到第三接口,基本流程如下: 1、书写调用第三方接口的工具类,通过该工具类获取第三方接口的token ...
  • ES数据同步方案

    万次阅读 2019-03-06 08:41:11
    当业务量上升后,由于mysql对全文检索或模糊查询支持的能力不强,在系统中查询的地方,往往会出现慢sql等...接下来,就结合工作中实际用到的场景,对数据从mysql到es的同步进行一些分析。 在实践中我总结出了以下几...
  • 公众号推文规则变了,点击上方"数据社"关注,设为星标后台回复【加群】,申请加入数据学习交流群大数据集群内部都有节点级别和机架级别的容错机制(存储层对应的就是传统的三副...
  • 基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    千次阅读 多人点赞 2019-07-15 10:24:28
    一、背景 随着马蜂窝的逐渐发展,我们的...而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到...
  • 利用Kettle进行数据同步(上)

    万次阅读 2018-06-04 09:02:33
    写这篇文章,是源于公司内部的一个常见需求:将生产环境的数据同步到测试环境,以便更方便的进行测试和bug定位。 起初是用的Navicat Premium这款DB管理软件,功能非常强大了,足以满足开发人员的日常工作需求,也...
  • 有个需求需要查询远程数据库,然后将远程数据库中的数据抓取到本地,远程数据库中的数据是每天都增加的,所以就需要写个程序自动实现实时抓取。 这里我用到的框架是Springboot2.0+Mybatis+Mapper,涉及的数据库有...
  • C#——数据同步方法

    千次阅读 2016-08-30 16:09:26
    数据同步是并发编程不可避免的话题,今天我们就来谈谈C#中的数据同步方法。 进程内数据同步 数据主要通过同步原语来进行同步,先来看看C#中的两种最简单的数据同步方式 (1)lock关键字 lock关键字很简单,简单的...
  • redis数据同步时的过程

    千次阅读 2019-04-27 14:35:20
    今天我们来探讨一下Redis的主从复制的数据同步阶段的全量复制和增量复制.
  • zookeeper数据同步

    千次阅读 2019-01-31 20:36:37
    整个集群完成Leader选举后,Learner会向Leader进行注册,当Learner向Leader完成注册后,就进入数据同步环节,同步过程就是Leader将那些没有在Learner服务器上提交过的事务请求同步给Learner服务器,大体过程如下 ...
  • sql server作业实现数据同步

    千次阅读 2018-11-11 17:59:16
     客服QQ1793040 ----------------------------------------------------------     ...关于HKROnline SyncNavigator 注册机价格...HKROnline SyncNavigator 8.4.1 企业版数据同步软件 自2009年第一个版本开发...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,585,056
精华内容 634,022
关键字:

数据同步