精华内容
下载资源
问答
  • java数据实时同步系统

    2015-01-30 14:39:09
    java数据实时同步系统,把远程数据库数据实时同步到本地数据库
  • 多源异构数据库实时同步解决方案

    千次阅读 2021-03-07 15:57:37
    1 需求概述 将企业个业务系统(Oracle、SQL Server、MySQL...采用灵蜂数据集成软件BeeDI在异构库间进行实时数据同步,通过ETL全量同步历史数据,通过日志解析方式实时同步增量数据,BeeDI提供Oracle、SQL Server、M.

    1 需求概述

    将企业多个业务系统(Oracle、SQL Server、MySQL、DB2、PostgreSQL等)产生的交易数据流实时同步到数据仓库或大数据平台,通过对交易数据的联机实时分析,快速制定或调整商业计划,提升企业的核心竞争力。

    需求关键点:同步过程不可影响或中断业务系统正常运行、同步延时3级以内。

     

    2 技术原理

    采用灵蜂数据集成软件BeeDI在异构库间进行实时数据同步,通过ETL全量同步历史数据,通过日志解析方式实时同步增量数据,BeeDI提供Oracle、SQL Server、MySQL、DB2、PostgreSQL数据库日志解析功能。

    根据生产环境日志空间的大小,可分批增量或一次性同步历史数据。

    在日志空间受限的情况下,如果将所有历史数据一次性同步,全量过程产生的日志超出预留空间会被删除,造成实时日志解析任务数据缺失,需要分批增量同步历史数据,基本操作如下:

    a) 全量同步部分表。

    b) 在实时日志解析任务中添加已全量同步完成的表并启动任务,当源库和目标库两端对应表数据一致时,停止日志解析任务。

    c) 重复上面a)和b)步骤,直到所有表都加入实时日志解析任务。

    如果日志空间充足,则可一次性同步所有历史数据,然后启用实时日志解析任务。

     

    3 数据库环境配置

    3.1 启用日志捕获

    在源库创建同步用户,该用户可以读取所有对象、系统字典及数据库日志权限。

    在源库开启数据库日志功能,如Orcale开启归档日志、SQL Server开启完整日志、MySQL开启BinLog。

    3.2 存储空间

    根据源库历史数据量及日增数据量,评估目标库需要的空间大小,在目标库预留足够的表空间。

     

    4 BeeDI同步操作

    4.1 配置全量同步任务

    为优化数据抽取性能,建议在抽取组件的【选项】对话框中设置【异步】抽取模式。

    为优化数据加载性能,建议在装载组件的【选项】对话框中选择【批量】装载模式。

    全量同步任务配置完成如下:

    按照以上方式,创建多个ETL任务,其中每个ETL任务对应一张同步表。

    4.2 配置实时日志解析任务

    实时解析任务同一数据源对应一个任务,在其中指定所有要同步的表。

    在各个抽取组件的【选项】对话框中,设置增量抽取,指定日志模式。

    完成后的ETL任务如下:

    4.3 执行全量同步任务

    可以同时启动多个全量同步任务,只要数据库服务器资源及BeeDI所在机器资源充足。

    4.4 执行实时日志解析任务

    当所有全量同步任务运行结束后,编辑实时日志解析任务,在其中添加所有全量同步完成的表;将最先运行的全量同步任务的启动时间指定为日志解析点,日志解析点只需在任务初次执行时设置一次,以后任务运行将自动管理解析点。

    指定实时任务按秒定时运行,运行周期1秒。

    实时任务运行后,在日志窗口输出数据库日志解析信息,包含每分钟读取的日志记录,最近解析日志时间点。

     

    4.5 添加更多同步表

    当实时任务对应的作业状态频繁出现定时图标时,表明任务进入实时状态,此时源表和目标表数据一致,停止实时任务,配置运行其它表的全量同步任务(参考4.1和4.3)。当全量任务结束后,编辑实时任务,增加已全量同步完成的其他表,启动实时任务(参考4.2和4.4)。

     

    5 校验同步数据一致性

    依次在源库和目标库执行 select count(*) from [表] 比较表记录数是否相等。

    依次在源库和目标库执行 select sum([数值列]) from [表] 比较指定字段算术和是否相等。

    展开全文
  • 摘要: 这段时间负责一个老项目开发的数据库管理工作,这个项目中开发库与测试数据库分离,其中有些系统表数据与基础资料数据经常...Oracle 数据实时同步到 MySQL,目前支持的工具还比较少,一款免费的数据同步...

      摘要:

              这段时间负责一个老项目开发的数据库管理工作,这个项目中开发库与测试数据库分离,其中有些系统表数据与基础资料数据经常需要进行同步,相信很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据、表多、数据量大等情况就难以同步。我自己亲测了一种方式,可以非常方便地完成 Oracle 数据实时同步到 MySQL,跟大家分享一下,希望对大家有帮助。

    Oracle 数据实时同步到 MySQL,目前支持的工具还比较少,一款免费的数据同步工具 Tapdata Cloud 是支持的。跟前面分享到的其他数据库同步的操作方式类似。

     其他数据库的同步操作

    Oracle 数据怎么实时同步到 Kafka | 亲测干货分享建议收藏

    Oracle 数据怎么实时同步到 SQL Server | 亲测干货分享建议收藏

    Oracle 数据怎么实时同步到 DM DB 达梦数据库 | 亲测干货建议收藏

    Oracle 数据怎么实时同步到 Elasticsearch   |  亲测干货建议收藏

    Oracle 数据怎么实时同步到 MongoDB | 亲测干货分享建议收藏

    Oracle 数据怎么实时同步到 PgSQL | 亲测干货分享建议收藏

    第一步:配置 Oracle 连接

    1. 点击 Tapdata Cloud 操作后台左侧菜单栏的【连接管理】,然后点击右侧区域【连接列表】右上角的【创建连接】按钮,打开连接类型选择页面,然后选择MySQL

    2. 在打开的连接信息配置页面依次输入需要的配置信息

    【连 接 名 称】:设置连接的名称,多个连接的名称不能重复

    【数据库地址】:数据库 IP / Host

    【端          口】:数据库端口

    【数据库名称】:tapdata 数据库连接是以一个 db 为一个数据源。这里的 db 是指一个数据库实例中的 database,而不是一个 schema。

    【账          号】:可以访问数据库的账号

    【密          码】:数据库账号对应的密码

    【时 间 时 区】:默认使用该数据库的时区;若指定时区,则使用指定后的时区设置

    第二步:配置 Kafka 连接

    3. 同第一步操作,点击左侧菜单栏的【连接管理】,然后点击右侧区域【连接列表】右上角的【创建连接】按钮,打开连接类型选择页面,然后选择 Kafka

     4.在打开的连接信息配置页面依次输入需要的配置信息,配置完成后测试连接保存即可。

    第三步:选择同步模式-全量/增量/全+增

    进入Tapdata Cloud 操作后台任务管理页面,点击添加任务按钮进入任务设置流程

    根据刚才建好的连接,选定源端与目标端。  

     根据数据需求,选择需要同步的库、表,如果你对表名有修改需要,可以通过页面中的表名批量修改功能对目标端的表名进行批量设置.

    在以上选项设置完毕后,下一步选择同步类型,平台提供全量同步、增量同步、全量+增量同步,设定写入模式和读取数量。

    如果选择的是全量+增量同步,在全量任务执行完毕后,Tapdata Agent 会自动进入增量同步状态。在该状态中,Tapdata Agent 会持续监听源端的数据变化(包括:写入、更新、删除),并实时的将这些数据变化写入目标端。

    点击任务监控可以打开任务执行详情页面,可以查看任务执行的具体信息。

    第四部:进行数据校验

    一般同步完成后,我都习惯性进行一下数据校验,防止踩坑。

    Tapdata Cloud 有三种校验模式,我常用最快的快速count校验 ,只需要选择到要校验的表,不用设置其他复杂的参数和条件,简单方便。

     如果觉得不够用,也可以选择表全字段值校验 ,这个除了要选择待校验表外,还需要针对每一个表设置索引字段。

     在进行表全字段值校验时,还支持进行高级校验。通过高级校验可以添加JS校验逻辑,可以对源和目标的数据进行校验。

     还有一个校验方式关联字段值校验 ,创建关联字段值校验时,除了要选择待校验表外,还需要针对每一个表设置索引字段。

    以上就是 Oracle 数据实时同步到 MySQL的操作分享,希望上面的操作分享对你有帮助!码字不易,转载请注明出处~

     其他数据库的同步操作

    Oracle 数据怎么实时同步到 Kafka | 亲测干货分享建议收藏

    Oracle 数据怎么实时同步到 SQL Server | 亲测干货分享建议收藏

    Oracle 数据怎么实时同步到 DM DB 达梦数据库 | 亲测干货建议收藏

    Oracle 数据怎么实时同步到 Elasticsearch   |  亲测干货建议收藏

    Oracle 数据怎么实时同步到 MongoDB | 亲测干货分享建议收藏

    Oracle 数据怎么实时同步到 PgSQL | 亲测干货分享建议收藏

    java项目精品实战案例分享系列 《100套》推荐阅读

    MySQL 数据实时同步到 Elasticsearch 、MongoDB、SQL Server、PostgreSQL、DM DB 达梦数据库、Kafka 的方式也都是先配置源和目标的连接,然后新建任务选择同步模式:全量/增量/全量+增量,因为步骤相同,其他就不再贴图说明了。创建连接的时候,有没有发现:DB2、Sybase、Gbase 几个数据库现在是灰色锁定状态,应该是在开发中了,可能后续也会支持这些数据库的同步功能。

     打卡 文章 更新  35  /  100天

    大家可以点赞、收藏、关注、评论我啦 、有数据库相关的问题随时联系我或交流哟~!

    在这里插入图片描述

    展开全文
  • Mysql数据实时同步实践

    千次阅读 2019-11-14 17:01:03
    关于小米内部使用的数据库你知道多少?(文末有福利)背景MySQL由于自身简单、高效、可靠的特点,成为小米内部使用最广泛的数据库,但是当数据量达到千万/亿级别的时候,M...


      关于小米内部使用的数据库你知道多少?(文末有福利)

    往期文章回顾:Flink流式计算在节省资源方面的简单分析

    背景

    MySQL由于自身简单、高效、可靠的特点,成为小米内部使用最广泛的数据库,但是当数据量达到千万/亿级别的时候,MySQL的相关操作会变的非常迟缓;如果这时还有实时BI展示的需求,对于mysql来说是一种灾难。
    为了解决sql查询慢,查不了的业务痛点,我们探索出一套完整的实时同步,即席查询的解决方案,本文主要从实时同步的角度介绍相关工作。

    早期业务借助Sqoop将Mysql中的数据同步到Hive来进行数据分析,使用过程中也带来了一些问题:

    • 虽然Sqoop支持增量同步但还属于粗粒度的离线同步,无法满足实时性的需求

    • 每次同步Sqoop以sql的方式向Mysql发出数据请求也在一定程度上对Mysql带来一定的压力

    • 同时Hive对数据更新的支持也相对较弱

    为了更有效地连接前端业务数据系统(MySQL)和后端统计分析系统(查询分析引擎),我们需要一套实时同步MySQL数据的解决方案。

    小米内部实践

    如何能够做到数据的实时同步呢?我们想到了MySQL主从复制时使用的binlog日志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗时间

    下面来看一下MySQL主从复制的原理,主要有以下几个步骤:

    1. master(主库)在每次准备提交事务完成数据更新前,将改变记录到二进制日志(binary log)中

    2. slave(从库)发起连接,连接到master,请求获取指定位置的binlog文件

    3. master创建dump线程,推送binlog的slave

    4. slave启动一个I/O线程来读取主库上binary log中的事件,并记录到slave自己的中继日志(relay log)中

    5. slave还会起动一个SQL线程,该线程从relay log中读取事件并在备库执行,完成数据同步

    6. slave记录自己的binlog   

    binlog记录了Mysql数据的实时变化,是数据同步的基础,服务需要做的就是遵守Mysql的协议,将自己伪装成Mysql的slave来监听业务从库,完成数据实时同步。

    结合小米内部系统特点,构建了Mysql数据同步服务–-LCSBinlog,作为一种独立的数据接入方式整合在Talos Platform中,Talos Platform作为大数据集成的基础解决方案,以自研消息队列Talos为数据总线,连接各种系统为主要目标,提供丰富的数据Source输入和数据Sink输出,并且Talos天然支持流式计算,因此业务可以充分利用Talos Platform互联互通的特性,并结合自身的业务需求实现更加高阶的业务场景。

    上图是Talos Platform中的整体流程架构,其中标红部分是目前LCSBinlog在小米内部使用最广泛的一条链路:Mysql --->  Talos  --->   Kudu  --->   BI,数据同步到kudu后借助Sparksql查询引擎为上层BI系统提供即席查询服务,Kudu和Sparksql的整合细节可以参见往期内容:告别”纷纷扰扰”—小米OLAP服务架构演进

    LCSBinlog服务的主体架构

    服务一共有两种角色

       Master :主要负责作业的调度,

       Worker: 主要完成具体的数据同步任务

    在Worker上运行两种作业:

    1. BinlogSyncJob:每一个mysql库都会对应这样一个Job,将binlog日志完整地写入到服务创建的Talos topic中

    2. MysqlSyncJob:同步历史数据,消费binlog数据,过滤特定库表数据实时同步至用户配置的topic中

    服务整体依赖于Zookeeper来同步服务状态,记录作业调度信息和标记作业运行状态;在kudu表中记录作业同步进度

    控制流程如下:

    1. Worker节点通过在Zookeeper上注册告知自己可以被调度

    2. 通过在Zookeeper上抢占EPHEMERAL临时节点实现Master的HA

    3. 用户在融合云(Web)上注册BinlogSource同步任务

    4. Master周期性从配置服务读取Binlog同步作业配置

    5. Master更新Zookeeper中的调度信息

    6. Worker节点 根据Zookeeper上的调度信息启动新分配任务,停止配置失效任务;作业启动后完成数据实时同步并周期性将同步进度记录在kudu中

    7. 服务上报监控信息到Falcon平台,作业异常退出发送报警邮件

     如何保障数据正确性

    >>>>

    顺序性

    用户配置的每一个BinlogSource 都会绑定一个Talos的topic,在进行消费的时候需要保证同一条mysql记录操作的顺序性,消息队列Talos是无法保证全局消息有序的,只能保证partition内部有序。

    对于配置分库分表或者多库同步任务的BinlogSource,服务会根据库表信息进行hash,将数据写入相应的partiton,保证同一张表的数据在一个partition中,使得下游消费数据的顺序性;

    对于单表同步的作业目前使用一个partition保证其数据有序。

    >>>>

    一致性

    如何保证在作业异常退出后,作业重新启动能够完整地将mysql中的数据同步到下游系统,主要依赖于以下三点

    1. 服务会记录作业同步的offset,重启后从上次commit的offset继续消费   

    2. Binlog数据的顺序性保证了即便数据被重复消费(未commit的数据),也能对同一条记录的操作以相同的顺序执行

    3. 下游存储系统kudu,Es ,Redis基于主键的操作能够保证binlog重复回放后数据的最终一致性

    应用场景  

    有了这份数据我们可以做些什么事情呢,本节例举了几种常见的应用场景     

     

    >>>>

    实时更新缓存


    业务查询类服务往往会在mysql之上架设一个缓存,减少对底层数据库的访问;当mysql库数据变化时,如果缓存还没有过期那么就会拿到过期的数据,业务期望能够实时更新缓存;

    利用binlog服务,根据策略实时将数据同步到redis中,这样就能够保证了缓存中数据有效性,减少了对数据库的调用,从而提高整体性能。

    >>>>

    异步处理,系统解耦

    随着业务的发展,同一份数据可能有不同的分析用途,数据成功写入到mysql的同时也需要被同步到其他系统;如果用同步的方式处理,一方面拉长了一次事务整个流程,另一方面系统间也会相互影响

    数据在mysql中操作成功后才会记录在binlog中,保证下游处理到时的一致性;使用binlog服务完成数据的下发,有助于系统的解耦

    关于异步处理,系统解耦在消息队列价值思考一文中有更深入的解读 

    >>>>

    即席查询的BI系统

    就如文章开篇提到的,mysql在一定场景下的性能瓶颈,mysql数据同步到kudu后可以借助sparksql完成性能的提升

    因为同样是sql接口,对使用者的切换成本也是较低的,数据同步到更适合的存储中进行查询,也能够避免因大查询而对原mysql库其他查询的影响

    目前小米内部稳定运行3000+的同步作业,使用binlog服务同步数据到kudu中;小米内部BI明星产品XDATA借助整套同步流程很好地支持了运营、sql分析同学日常统计分析的需求

    如何使用Binlog数据

    用户接入数据的时候要求mysql库开启binlog日志格式必须为Row模式:记录的是每一行记录的每个字段变化前后的值,虽然会造成binlog数据量的增多,但是能够确保每一条记录准确性,避免数据同步不一致情况的出现

    最终通过监听binlog日志,LCSBinlog服务将数据转换成如下的数据结构,写入用户注册的Topic中, 目前Sink服务使用SparkStreaming实时转储数据到kudu中,后续也将逐步迁移到Flink上以提升资源利用、降低延迟

    业务用户也可以根据我们提供的数据格式,实时消费Talos数据以实现更复杂的业务逻辑,下表为每一种数据操作,是否保存修改前后的列表    

     疑难杂症

    下面分享2个上线后遇到的有趣问题

    >>>>

    数据不一致问题,业务使用唯一索引

    业务接入一段时间后, 发现部分表会偶尔存在kudu表的数据条目数多于同步的mysql表的数据条目数,我们将多出来的数据与mysql产生的binlog日志经过一一对比,发现用户在mysql表中设置了唯一索引,通过唯一索引修改了主键,而kudu中的数据是通过主键标识或更新一条记录的,于是update操作变成了insert操作,这就造成了原来的1条记录变成了2条。

    解决办法:对于这种类型的表,LCSBinlog服务会把一次Update操作转换成一条Delete数据和一条Insert数据

    >>>>

    Full Dump同步历史数据时,客户端超时

    服务刚上线的时候,通过jdbc 执行sql的方式完成全量历史数据的同步,在同步的过程中会发现dump任务会卡顿很长时间才会返回结果,当数据量很大会出现超时同步失败的情况,会造成数据的延迟。调研后发现使用mysql官方jdbc在客户端查询数据的时候,默认为从服务器一次取出所有数据放在客户端内存中,fetch size参数不起作用,当一条SQL返回数据量较大时可能会出现OOM

    解决办法:当statement设置以下属性时,采用的是流数据接收方式,每次只从服务器接收部份数据,直到所有数据处理完毕。优化后历史数据同步稳定运行,对mysql端的压力也很小        

                        总结                 

    MySQL以Binlog日志的方式记录数据变化,基于流式数据的Change Data Caputre (CDC)机制实现了LCSBinlog服务,

    本文主要对LCSBinlog的服务架构、应用场景以及在小米内部的实践经验进行了介绍,也和大家分享了我们实际中遇到的问题和解决方案,希望能够帮助到大家理解服务的原理,带来启发,也欢迎大家和我们一起交流。

    文末福利

    雷军曾说:

    “一本书、一个人改变了我一辈子,这使得我上大学一年级的时候,就想建一家世界一流的公司。那是80年代的一本书,印得很粗糙,翻译也跟今天不太一样,但看得我激动的不行。”

    雷军讲话中提到的那个人是乔布斯,

    那本书就是《硅谷之火》。

    那么请问对你影响最大的书是哪一本呢?

    请在留言区写下书名并说说这本书是如何影响你的

    留言点赞数量最多的人,将获得这本《硅谷之火》

    小编剧透:

    我们将从大家留下的书名中选一本作为下期福利

    梦想还是要有的,万一实现了呢~!快来参与吧~!


    我就知道你“在看”

    展开全文
  • 项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送...

            项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送消息,让B系统后台进行自动数据信息同步。

            这个可以根据各企业各自的系统间通讯方式来灵活处理。这里我介绍我运用的处理方式,作为经验总结记录和分享。

            深谙spring的实现原理:使用dom4j技术对xml文件进行解析读取扫描注解通过反射机制来进行对象的创建,于是解决上述需求的方案由此得到启发。对于我们实际系统来说,这就是一个小框架,扩展性非常好,后来者只需要专注业务逻辑的实现即可完成数据同步的需求。

            下面先贴目录结构

             


    这里运用先缓存业务逻辑处理方法的策略,即在服务器启动的时候就将写好的业务逻辑处理方法缓存到内存中,通过监听器监听到其他系统有发送同步消息时自动调用相应的处理方法来进行数据同步。

    要缓存服务,需要用到注解和反射

    下面贴上两个自定义注解:分别是类注解和方法注解

    package com.zy.isc.common;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * <p>Title: IscClassMapping</p>
     * <p>Description: 消息业务处理类注解
     * 用于标识类为消息处理类,和IscMethodMapping方法注解配合使用
     * spring容器加载完成后会将具休的业务方法缓存起来,用于处理消息。
     * </p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface IscClassMapping {
    
    }
    package com.zy.isc.common;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * <p>Title: IscMethodMapping</p>
     * <p>Description: 消息业务处理方法注解</p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface IscMethodMapping {
    	public String name();
    	public String desc() default "";
    }
    

    然后定义一个通用的业务处理类,通过这个类来保存注解类对象,然后运用反射机制来调用具体的业务逻辑处理方法

    package com.zy.isc.common;
    
    import java.io.Serializable;
    import java.lang.reflect.Method;
    
    /**
     * <p>Title: ServiceBean</p>
     * <p>Description: 保存到map中的业务bean</p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    public class ServiceBean implements Serializable{
    	
    	private static final long serialVersionUID = 7453372917648514518L;
    	
    	private Method method;
    	private Object object;
    	private String desc;
    	
    	public Method getMethod() {
    		return method;
    	}
    	public void setMethod(Method method) {
    		this.method = method;
    	}
    	public Object getObject() {
    		return object;
    	}
    	public void setObject(Object object) {
    		this.object = object;
    	}
    	public String getDesc() {
    		return desc;
    	}
    	public void setDesc(String desc) {
    		this.desc = desc;
    	}
    
    }
    

    spring容器初始化时还需要做的另一件事——将带有注解的类和方法缓存在map中,key值就是方法上面的注解value值,key对应的value就是带注解的对应的业务处理类对象实例

    package com.zy.isc.core_receive;
    
    import java.lang.reflect.Method;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ApplicationContext;
    
    import com.zy.isc.common.IscClassMapping;
    import com.zy.isc.common.IscMethodMapping;
    import com.zy.isc.common.ServiceBean;
    
    /**
     * <p>Title: InitServiceMethodMapping</p>
     * <p>Description: spring容器启动时调用这里的初始化方法,将带有自定义注解的类和方法缓存在map中</p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    public class InitServiceMethodMapping {
    
    	private static Logger logger = LoggerFactory.getLogger(InitServiceMethodMapping.class);
    	private static Map<String,ServiceBean> map = null;
    	
    	private InitServiceMethodMapping(){}
    	
    	public static Map<String,ServiceBean> getMethodMap(){
    		return map;
    	}
    	
    	public static synchronized void init() throws Exception{
    		if(map == null){
    			logger.info("initialize biz interface object and save into map start");
    			map = new HashMap<String, ServiceBean>();
    			ApplicationContext context = SpringContextReceiveUtil.getApplicationContext();
    			for(String s : context.getBeanDefinitionNames()){
    				Class<?> c = context.getBean(s).getClass();
    				if(c.getAnnotation(IscClassMapping.class)!=null){
    					Method[]method = c.getDeclaredMethods();
    					ServiceBean serviceBean = null;
    					for(Method m : method){
    						IscMethodMapping mksIscMethodMapping = m.getAnnotation(IscMethodMapping.class);
    						if(mksIscMethodMapping!=null){
    							if(!map.containsKey(mksIscMethodMapping.name())){
    								serviceBean = new ServiceBean();
    								serviceBean.setObject(context.getBean(s));
    								serviceBean.setMethod(m);
    								serviceBean.setDesc(mksIscMethodMapping.desc());
    								map.put(mksIscMethodMapping.name(),serviceBean);
    								logger.info("@biz interface name:["+mksIscMethodMapping.name()+"],already saved in cached map@");
    							}else{
    								throw new Exception("initialize biz interface failed, name:["+mksIscMethodMapping.name()+"]repeated,please modify then try,classpath:"+c.getName());
    							}
    						}
    					}
    				}
    			}
    			logger.info("initialize biz interface object and save into map start,total biz interface count:"+map.size());
    		}
    	}
    }
    

    然后在spring容器启动的时候调用上述类中的初始化方法和启动消息监听器

    package com.zy.isc.core_receive;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    import com.pingan.isc.ISCMessageBroker;
    import com.zy.isc.handler.ServerHandler;
    
    public class SpringContextReceiveUtil implements ApplicationContextAware{
    
    	private Logger logger = LoggerFactory.getLogger(SpringContextReceiveUtil.class);
    	private static ApplicationContext applicationContext;
    	
    	@Override
    	public void setApplicationContext(ApplicationContext applicationContext)throws BeansException {
    		logger.info("initialize spring context start:"+applicationContext);
    		SpringContextReceiveUtil.applicationContext = applicationContext;
    		logger.info("initialize spring context end, bean count:"+applicationContext.getBeanDefinitionCount());
    		try {
    			//这里缓存业务接口时容器还没有完全启动完成,所以使用纯线程来启动消息中心监听程序,以免影响启动超时的情况
    			InitServiceMethodMapping.init();
    			//启动消息监听
    			initMessageHandler();
    		} catch (Exception e) {
    			logger.error("initialize spring context failed",e);
    		}
    	}
    
    	public static ApplicationContext getApplicationContext(){
    		return applicationContext;
    	}
    
    	public void initMessageHandler() {
    		try {
    			logger.info("initialize MSG listener start");
    			//启动消息监听
    			int corePoolSize = 10;
    			int maximumPoolSize = 20;
    			int keepAliveTime = 300;
    			int queueSize = 100;
    			ServerHandler handler = new ServerHandler();
    			ISCMessageBroker.MessageExecutor(corePoolSize, maximumPoolSize, keepAliveTime, queueSize, handler);
    			logger.info("initialize MSG listener end");
    		} catch (Exception e) {
    			logger.error("initialize MSG listener exception",e);
    		}
    	}
    }
    

    消息监听器启动时需要指定消息处理器,这个处理器实现了MessageHandler接口,一旦有消息从其他系统发过来,监听器监听到消息到来就会调用messageReceived(Object arg0)这个方法,参数即为接收到的消息

    package com.zy.isc.handler;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.pingan.isc.core.MessageHandler;
    import com.zy.isc.core_receive.ServiceReceiveExecutor;
    
    public class ServerHandler implements MessageHandler {
    	private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class);
    	
    	@Override
    	public void messageReceived(Object arg0) throws Exception{
    		try {
    			logger.info("=======invoke biz method start=======");
    			long start = System.currentTimeMillis();
    			ServiceReceiveExecutor.execute(arg0);
    			long end = System.currentTimeMillis();
    			logger.info("=======invoke biz method end=======");
    			logger.info("time cost:"+(end-start)/1000);
    		}catch (Exception e) {
    			logger.error("Message Received Exception"+arg0,e);
    		}
    	}
    }
    

    然后在这个类的messageReceived(Object arg0)方法中再调用接收消息执行器将接收到的消息进行处理,解析消息内容得到里面用来标记具体业务逻辑处理方法的值,然后将该值与缓存在map中的key值比对,找到对应的方法用反射来调用。

    package com.zy.isc.core_receive;
    
    import java.util.Enumeration;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.ResourceBundle;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.dc.eai.data.CompositeData;
    import com.dc.eai.data.Field;
    import com.dcfs.esb.client.converter.PackUtil;
    import com.zy.isc.common.ServiceBean;
    
    /**
     * <p>Title: ServiceReceiveExcutor</p>
     * <p>Description:接收ISC消息执行具体的业务方法 </p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    public class ServiceReceiveExecutor {
    
    	private static final Logger logger = LoggerFactory.getLogger(ServiceReceiveExecutor.class);
    	
    	//读取配置文件
    	private static ResourceBundle resource;
    	static{
    		resource = ResourceBundle.getBundle("IscConfigPlus");
    	}
    	
    	public static String getValue(String key){
    		return resource.getString(key);
    	}
    	
    	/**
    	 * <p>Description:根据消息子主题和业务码执行具体的业务方法 </p>
    	 * @param message
    	 * @return
    	 */
    	public static Object execute(Object message) throws Exception{
    		logger.info("===== unpack message start =====");
    		Map<String, Object> map = unpackMSG(message);
    		String uniqueId = null;
    		CompositeData compositeData = null;
    		CompositeData body = null;
    		if (map != null && map.size() > 0) {
    			uniqueId = (String) map.get("uniqueId");
    			compositeData = (CompositeData) map.get("compositeData");
    			body = compositeData.getStruct("BODY");
    		}else {
    			logger.info("message is null");
    			return null;
    		}
    		logger.info("===== unpack message end =====");
    		try {
    			if(StringUtils.isBlank(uniqueId)){
    				logger.error("uniqueId is null,no method matches , message infomation:\r\n"+message);
    				throw new Exception("uniqueId is null,no method matches , message infomation:\r\n"+message);
    			}
    			
    			boolean isContainsKey = InitServiceMethodMapping.getMethodMap().containsKey(uniqueId);
    			if (isContainsKey) {
    				ServiceBean serviceBean = InitServiceMethodMapping.getMethodMap().get(uniqueId);
    				logger.info("request biz interface's ID:["+uniqueId+"],biz interface description["+serviceBean.getDesc()+"]");
    				return serviceBean.getMethod().invoke(serviceBean.getObject(),body);
    			}else {
    				logger.info("no method maches the request,message information\r\n" + compositeData );
    			}
    		} catch (Exception e) {
    			logger.error("biz method exception,args:\r\n",e);
    			throw e;
    		}
    		return null;
    	}
    	
    	/**
    	 * <p>Description: 标准报文体解包,将报文中的消息子主题和交易码拼接后作为业务逻辑方法的唯一标识</p>
    	 * @param message
    	 * @return
    	 */
    	public static Map<String,Object> unpackMSG(Object message){
    		Map<String, Object> retMap = new HashMap<String, Object>();
    		if (message != null) {
    			//解析出报文体,存到map中
    			CompositeData compositeData  = PackUtil.unpackXmlStr((String)message);
    			//打印此日志方便查看报文,生产环境去掉
    			logger.debug("message content:\r\n" + compositeData);
    			retMap.put("compositeData", compositeData);
    			CompositeData body = compositeData.getStruct("BODY");
    			retMap.put("body", body);
    			
    			Map<String, Object>dataMap = new HashMap<String, Object>();
    			Enumeration<String> keys = resource.getKeys();
    			while (keys.hasMoreElements()) {
    				String key = (String) keys.nextElement();
    				String value = getValue(key);
    				CompositeData struct = compositeData.getStruct(value);
    				if (struct != null && struct.size()>0) {
    					logger.debug("key:value ——> " + key+":"+value);
    					dataMap.put(key, struct);
    				}
    			}
    			logger.debug("dataMap:\r\n"+dataMap);
    			if (dataMap != null && dataMap.size()>0) {
    				CompositeData iscSysHeadCompositeData = (CompositeData) dataMap.get("iscSysHead");
    				CompositeData iscPubHeadCompositeData = (CompositeData) dataMap.get("iscPubHead");
    				
    				Field subTopicField = iscSysHeadCompositeData.getField("SUB_TOPIC");
    				Field serviceCodeField = iscPubHeadCompositeData.getField("SERVICE_CODE");
    				if (subTopicField != null) {
    					String subTopic = subTopicField.getValue().toString();
    					logger.debug("message subtopic: " + subTopic);
    					retMap.put("subTopic", subTopic);
    				}else {
    					retMap.put("subTopic", "");
    				}
    				if (serviceCodeField != null) {
    					String serviceCode = serviceCodeField.getValue().toString();
    					logger.debug("message serviceCode: " + serviceCode);
    					retMap.put("serviceCode", serviceCode);
    				}else {
    					retMap.put("serviceCode", "");
    				}
    				
    				String subTopic = retMap.get("subTopic").toString();
    				String serviceCode = retMap.get("serviceCode").toString();
    				String uniqueId = subTopic + serviceCode;
    				retMap.put("uniqueId", uniqueId);
    			}else {
    				logger.info("dataMap is null,uniqueId is null");
    			}
    		}
    		return retMap;
    	}
    }
    

    这个ServiceReceiveExecutor类会将消息中解析出来的报文body通过反射参数传到具体的业务逻辑处理类中,最后就是具体的业务逻辑处理类了,这个类或者方法可以按相同的方式进行任意扩展

           

    package com.zy.isc.service;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import com.dc.eai.data.CompositeData;
    import com.dc.eai.data.Field;
    import com.zy.isc.common.IscClassMapping;
    import com.zy.isc.common.IscMethodMapping;
    
    /**
     * <p>Title: UserInfoService</p>
     * <p>Description: 接收BECIF广播用户信息Service</p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    @Service
    @IscClassMapping
    public class UserInfoService {
    	
    	private static final Logger logger = LoggerFactory.getLogger(UserInfoService.class);
    	
    	//服务类
    	//@Resource(name = "userService")
    	//private UserService userService;
    
    	//name 唯一标识 = 子主题(20005)+ 交易码(000012)
    	@IscMethodMapping(name="20005000012",desc="xxx业务需求描述")
    	public void userInfoCombine(CompositeData compositeData) throws Exception{
    		Field clientNoField = compositeData.getField("CLIENT_NO");
    		Field clientNo1Field = compositeData.getField("CLIENT_NO1");
    		String clientNo = null;
    		String clientNo1 = null;
    		if (clientNoField != null) {
    			clientNo = clientNoField.getValue().toString();
    			logger.info("combine becif:" + clientNo);
    		}
    		if (clientNo1Field != null) {
    			clientNo1 = clientNo1Field.getValue().toString();
    			logger.info("combined becif: " + clientNo1);
    		}  
    		if (StringUtils.isNotBlank(clientNo1) && StringUtils.isNotBlank(clientNo)) {
    			Map<String, Object> paramMap = new HashMap<String, Object>();
    			paramMap.put("clientNo", clientNo);
    			paramMap.put("aClentNo", clientNo1);
    			//boolean flag = scfpUserService.checkBecifExist(clientNo1);
    			boolean flag = false;
    			logger.info("becif exist:" + flag);
    			if (flag) {
    				try {
    					//scfpUserService.combineUserBecif(paramMap);
    				} catch (Exception e) {
    					logger.error("userInfoCombine() method exception:" + e.getMessage());
    				}
    			}
    		}
    	}
    }
    

    最后还有一个容器销毁时释放缓存的监听器

    package com.zy.isc.listener;
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.pingan.isc.ISCMessageBroker;
    
    /**
     * <p>Title: StartupListener</p>
     * <p>Description:</p>
     * <p>Company: 
     * @author kjkfb_zy  2017-7-31 
     * <p>Just do it!!!</p>
     * @version v1.0
     */
    public class StartupListener implements ServletContextListener {
    	private static final Logger LOGGER = LoggerFactory.getLogger(StartupListener.class);
    	
    	@Override
    	public void contextInitialized(ServletContextEvent arg0) {
    		LOGGER.info("===== MSG listener preparation =====");
    	}
    	
    	@Override
    	public void contextDestroyed(ServletContextEvent arg0) {
    		ISCMessageBroker.destroyed();
    		LOGGER.info("===== ISCMessageBroker destroyed,resource release =====");
    	}
    
    }
    
    这样整个小框架就完毕了,使用时只需要在spring的配置文件中将SpringContextReceiveUtil这个类的bean配置好,在web.xml中配置StartupListener这个监听器就可以使用了。接收消息按UserInfoService类方法上面注解唯一标识来区分。后续还有其他消息要接收,直接按照这种注解方式在UserInfoService类中扩展或者另外新增类似UserInfoService类都可以。
    展开全文
  • 上海立邦TU报销系统,是上海立邦集团针对内部的报销的业务,编写的一套系统,此系统主要特点是和Web、SAP和Notes等系统实现无缝对接,从而完成整体业务的流转,目前立邦已经存在SAP报销系统、Notes系统,情况如下: ...
  • 摘要: 很 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对...我亲测的数据同步工具是 Tapdata Cloud,对注册用户是永久免费的,同步效率极高。 第一步:配置 Oracle 连接 ...
  • Linux下Rsync+sersync实现数据实时同步

    万次阅读 2019-09-02 11:33:09
    2、sersync可以记录下被监听目录中发生变化的(包括增加、删除、修改)具体某一个文件或某一个目录的名字,然后使用rsync同步的时候,只同步发生变化的这个文件或者这个目录。 二、Rsync+Inotify-tools与Rsync+...
  • 数据实时同步之MongoDB

    千次阅读 2020-07-07 08:31:00
    转载本文需注明出处:微信公众号EAWorld,违者必究。前言:随着传统企业的发展,企业数据呈现多样化,海量化,难以实现数据快速分析。MongoDB是当前很企业使用的,当日积月累数据很大...
  • 两个系统之间数据同步

    千次阅读 2020-05-23 21:50:40
    本文所讨论的数据同步是指分别部署的系统之间的数据同步数据同步可分为三类:实时同步、定时同步、手动同步。 同步方案可分为:数据库同步、读写文件同步、直接调用接口同步。 2.数据同步 同步讲究数据...
  • 系统数据同步方案

    千次阅读 2019-01-24 23:21:58
    一.RabbitMQ分布式集群架构 ...RabbitMQ可以通过三种方法来部署分布式集群系统,分别是:cluster,federation,shovel cluster: 不支持跨网段,用于同一个网段内的局域网 可以随意的动态增加或者减少 节点之间需要...
  • 1、 早期关系型数据库之间的数据同步 1)、全量同步 比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页...
  • 台SQLServer数据实时同步工具

    万次阅读 2018-11-11 17:56:17
    因为这个版本的syncnavigator注册机是程序员自己开发的,因而成本比以前官方成本要小,并且没有做过多市场开发营销,所以价格相对以前来说优惠很,这对于有数据同步需求的公司和团队来说,...
  • 数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ODS(Operational Data Store)数据。在互联网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类。对于业务DB数据来说,从MySQL等...
  • 使用WebSocket解决页面数据实时同步

    万次阅读 2018-11-20 11:06:21
    一个页面需要在不同的PC端访问,在某一PC端对网页内容发生改变时,其他PC端页面数据实时更新显示. 实现: 采用webSocket+AOP通知的方式实现 思路: 当页面数据修改时,会通过后端保存方法存进数据库,这样我们就要一个...
  • shell mysql数据实时同步脚本

    千次阅读 2018-03-04 15:54:24
    1. 背景 公司大佬需要报表实时更新,从业务报表同步数据仓库,可根据需要设置同步时间几个crontab 或者while true,用shell脚本获取增量数据实时更新,业务系统存在物理删除的场景,这就需要去监控业务库删除...
  •  JD_databus是为满足多数据中心项目的mysql在数据中心间复制的需求所产生的。最开始JD_databus是在LinkedIn的databus的基础上开发的,本次设计考虑到可维护性、代码的简洁、需求的快速迭代,决定重新开发。设计和...
  • 采用DataX实现表增量数据同步

    千次阅读 2020-12-21 14:39:17
    这两天验证了一下阿里的DataX数据同步工具,觉得DataX可以用来做管理数据的多级数据同步。DataX用来做批量数据迁移很适合,能够保证数据的一致性,性能也很好,结合时间戳字段,用来实现数据定时增量同步也是可以的...
  • SQL server数据实时同步到mysql

    万次阅读 2018-12-07 17:04:47
    本文在...在同步的前提下,环境一定要搭好,测试的时候应为mysql安装的一些bug导致失败了很次,又重装过 ---安装安装mysqlconnector http://www.mysql.com/products/connecto...
  • 2、sersync可以记录下被监听目录中发生变化的(包括增加、删除、修改)具体某一个文件或者某一个目录的名字,然后使用rsync同步的时候,只同步发生变化的文件或者目录 二、rsync+inotify-tools与rsync+sersync架构...
  • ES数据同步方案

    万次阅读 2019-03-06 08:41:11
    当业务量上升后,由于mysql对全文检索或模糊查询支持的能力不强,在系统中查询的地方,往往会出现慢sql等...接下来,就结合工作中实际用到的场景,对数据从mysql到es的同步进行一些分析。 在实践中我总结出了以下几...
  • 情况大数据集群需要获取业务数据,用于...第二种则是通过数据同步的方式,将关系型数据同步到大数据集群,可以是存储在 hdfs 上,使用 hive 进行分析,或者是直接存储到 hbase 中。 其中数据同步又可以大致分...
  • Linux系统实时数据同步和数据转发

    千次阅读 2013-12-23 22:24:52
    linux以其高性能、任务、高并发等特点而著称,在服务器市场上,linux系统占据着越来越的市场份额,特别是在云计算风起云涌的年代,linux系统更是在众多系统中脱颖而出。 今天要讲的是云存储,云存储简单点说就是...
  • SqlServer实时数据同步到MySql

    千次阅读 2018-11-11 17:58:23
     客服QQ1793040 ----------------------------------------------------------     ...关于HKROnline SyncNavigator 注册机价格...HKROnline SyncNavigator 8.4.1 企业版数据同步软件 自2009年第一个版本开发...
  • 大数据开发平台-数据同步服务

    万次阅读 2017-09-21 13:38:35
    同步一切
  • 产品编号:MR-MRS产品类别:超宽带高速记录回放系统 Product introduction 产品简介 主要用于对光纤信号进行长时间高速连续实时采集记录和回放产生,适用于雷达、无线通信、软件无线电、电子对抗、电子侦察、...
  • 如何实现两个系统之间的数据同步

    万次阅读 2017-02-25 15:54:58
    1、实现原理图: 2、涉及技术 a.Dubbo接口的注册与调用 b.使用jms异步消息传递实现...学习过程中遇到什么问题或者想获取学习资源的话,欢迎加入Java学习交流群,群号码:543120397 我们一起学Java!......
  • 企业系统之间数据同步处理

    千次阅读 2015-12-11 10:48:04
    数据同步一般是指一个数据源的数据发生改变时,其他相关的数据源的数据也发生相应变化。数据同步可以有五种实现方案,根据具体需求不同,可以采取不同方案。 1. 触发器:在源数据库建立增、删、改触发器,每当源...
  • 最近写一个出版社的投标文件,其中涉及到线下ERP系统需要与淘宝店铺库存进行实时同步,所以将当时的解决方案和大家分享下,也方便以后查阅使用。   项目需求   目前此项目存在库存不统一的问题,随着信息时代...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,086,616
精华内容 434,646
关键字:

多系统数据实时同步