精华内容
下载资源
问答
  • 一、问题描述在mysql主从配置搭建好以后,偶尔会出现从库无法同步主库数据的情况,经过测试,把产生主从数据同步的集中情况进行了归纳以及总结,问题如下:1、主库抛出异常, 例如主键冲突等情况,这是主从配置...

    一、问题描述

    在mysql主从配置搭建好以后,偶尔会出现从库无法同步主库数据的情况,经过测试,把产生主从数据不同步的集中情况进行了归纳以及总结,问题如下:

    1、主库抛出异常
    例如主键冲突等情况,这是主从配置就会失效,丛库就无法同步主库的数据了。如下:
    这里写图片描述

    2、重启主库的服务,
    这种情况在测试环境是常见的,比如关机重启,这样mysql的服务就会重启,也会导致同步点丢失,无法在进行主从同步,

    3、对从数据库进行的写操作,
    也就是在从数据库上进行了增、删、改操作后,也会导致主从同步失效

    以上是目前测试到的会导致主从同步失效的情况,如果有其他情况,欢迎留言分享
    解决方案有2种:
    a、先备份主库的所有数据到丛库,然后从新配置主从环境,参见前面的一片博客即可
    mysql主从配置教程

    此种解决方案,如果入在生产环境需要先讲主数据库备份到丛库

    b、在从数据库忽略错误

    进入从数据库命令行

    1、停止slave,命令:stop slave;

    2、跳过错误的次数,后面的数字可变
    set global sql_slave_skip_counter =1;

    3、重启slave
    start slave;

    设置后,数据库会自动更新下来,当然这种方案有风险就是不知道错误几次是几次,
    所以比较稳妥的做法是备份,重做主从

    展开全文
  • 1、 早期关系型数据库之间的数据同步 1)、全量同步 比如从oracle数据库中同步一张表的...2)、基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步,如果是不同的数据库,这种...

    1、 早期关系型数据库之间的数据同步

    1)、全量同步

    比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页查询时,一定要按照主键id来排序分页,避免重复插入。

    2)、基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步,如果是不同的数据库,这种方式可能会存在问题。

    3)、基于触发器的增量同步

    增量同步一般是做实时的同步,早期很多数据同步都是基于关系型数据库的触发器trigger来做的。

    使用触发器实时同步数据的步骤:

    A、 基于原表创触发器,触发器包含insert,modify,delete 三种类型的操作,数据库的触发器分Before和After两种情况,一种是在insert,modify,delete 三种类型的操作发生之前触发(比如记录日志操作,一般是Before),一种是在insert,modify,delete 三种类型的操作之后触发。

    B、 创建增量表,增量表中的字段和原表中的字段完全一样,但是需要多一个操作类型字段(分表代表insert,modify,delete 三种类型的操作),并且需要一个唯一自增ID,代表数据原表中数据操作的顺序,这个自增id非常重要,不然数据同步就会错乱。

    C、 原表中出现insert,modify,delete 三种类型的操作时,通过触发器自动产生增量数据,插入增量表中。

    D、处理增量表中的数据,处理时,一定是按照自增id的顺序来处理,这种效率会非常低,没办法做批量操作,不然数据会错乱。  有人可能会说,是不是可以把insert操作合并在一起,modify合并在一起,delete操作合并在一起,然后批量处理,我给的答案是不行,因为数据的增删改是有顺序的,合并后,就没有顺序了,同一条数据的增删改顺序一旦错了,那数据同步就肯定错了。

    市面上很多数据etl数据交换产品都是基于这种思想来做的。

    E、 这种思想使用kettle 很容易就可以实现,笔者曾经在自己的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/7360673.html

    4)、基于时间戳的增量同步

    A、首先我们需要一张临时temp表,用来存取每次读取的待同步的数据,也就是把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据

    B、我们还需要创建一个时间戳配置表,用于存放每次读取的处理完的数据的最后的时间戳。

    C、每次从原表中读取数据时,先查询时间戳配置表,然后就知道了查询原表时的开始时间戳。

    D、根据时间戳读取到原表的数据,插入到临时表中,然后再将临时表中的数据插入到目标表中。

    E、从缓存表中读取出数据的最大时间戳,并且更新到时间戳配置表中。缓存表的作用就是使用sql获取每次读取到的数据的最大的时间戳,当然这些都是完全基于sql语句在kettle中来配置,才需要这样的一张临时表。

    2、    大数据时代下的数据同步

    1)、基于数据库日志(比如mysql的binlog)的同步

    我们都知道很多数据库都支持了主从自动同步,尤其是mysql,可以支持多主多从的模式。那么我们是不是可以利用这种思想呢,答案当然是肯定的,mysql的主从同步的过程是这样的。

    A、master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);

    B、slave将master的binary log events拷贝到它的中继日志(relay log);

    C、slave重做中继日志中的事件,将改变反映它自己的数据。

    阿里巴巴开源的canal就完美的使用这种方式,canal 伪装了一个Slave 去喝Master进行同步。

    A、 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

    B、 mysql master收到dump请求,开始推送binary log给slave(也就是canal)

    C、 canal解析binary log对象(原始为byte流)

    另外canal 在设计时,特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

    canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

    canal c# 客户端: https://github.com/dotnetcore/CanalSharp

    canal go客户端: https://github.com/CanalClient/canal-go

    canal php客户端: https://github.com/xingwenge/canal-php、

    github的地址:https://github.com/alibaba/canal/

    另外canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ   https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    D、在使用canal时,mysql需要开启binlog,并且binlog-format必须为row,可以在mysql的my.cnf文件中增加如下配置

    log-bin=E:/mysql5.5/bin_log/mysql-bin.log
    binlog-format=ROW
    server-id=123、

    E、 部署canal的服务端,配置canal.properties文件,然后 启动 bin/startup.sh 或bin/startup.bat

    #设置要监听的mysql服务器的地址和端口
    canal.instance.master.address = 127.0.0.1:3306
    #设置一个可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    #连接的数据库
    canal.instance.defaultDatabaseName =test
    #订阅实例中所有的数据库和表
    canal.instance.filter.regex = .*\\..*
    #连接canal的端口
    canal.port= 11111
    #监听到的数据变更发送的队列
    canal.destinations= example

    F、 客户端开发,在maven中引入canal的依赖

    <dependency>
     <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.0.21</version>
    </dependency>

    代码示例:

    package com.example;
     
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;
     
    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
     
      
    public class CanalClientExample {
     
        public static void main(String[] args) {
            while (true) {
                //连接canal
                CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
                connector.connect();
                //订阅 监控的 数据库.表
                connector.subscribe("demo_db.user_tab");
                //一次取10条
                Message msg = connector.getWithoutAck(10);
     
                long batchId = msg.getId();
                int size = msg.getEntries().size();
                if (batchId < 0 || size == 0) {
                    System.out.println("没有消息,休眠5秒");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //
                    CanalEntry.RowChange row = null;
                    for (CanalEntry.Entry entry : msg.getEntries()) {
                        try {
                            row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                            List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                            for (CanalEntry.RowData rowdata : rowDatasList) {
                                List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                                Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                                if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                    //具体业务操作
                                    System.out.println(dataMap);
                                } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                    //具体业务操作
                                    System.out.println(dataMap);
                                } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                    List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                    for (CanalEntry.Column column : beforeColumnsList) {
                                        if ("id".equals(column.getName())) {
                                            //具体业务操作
                                            System.out.println("删除的id:" + column.getValue());
                                        }
                                    }
                                } else {
                                    System.out.println("其他操作类型不做处理");
                                }
     
                            }
     
                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                        }
                    }
                    //确认消息
                    connector.ack(batchId);
                }
     
     
            }
        }
     
        public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
            Map map = new HashMap();
            if (afterColumnsList != null && afterColumnsList.size() > 0) {
                for (CanalEntry.Column column : afterColumnsList) {
                    map.put(column.getName(), column.getValue());
                }
            }
            return map;
        }
     
    }

    2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase

    我们有两种方式可以实现,

    A、 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中

    但是这种做法,效率很低,而且大批量的数据同时插入Hbase,对Hbase的性能影响很大。

    在大数据量的情况下,使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。HFile的形式存在。Hfile的路径格式一般是这样的:

    /hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

    B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,生成Hfile可以使用hadoop的MapReduce来实现。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

    当然我们也可以不事先生成hfile,可以使用spark任务直接从hive中读取数据转换成RDD,然后使用HbaseContext的自动生成Hfile文件,部分关键代码如下:

    //将DataFrame转换bulkload需要的RDD格式
        val rddnew = datahiveDF.rdd.map(row => {
          val rowKey = row.getAs[String](rowKeyField)
      
          fields.map(field => {
            val fieldValue = row.getAs[String](field)
            (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
          })
        }).flatMap(array => {
          (array)
        })
    …
    //使用HBaseContext的bulkload生成HFile文件
        hbaseContext.bulkLoad[Put](rddnew.map(record => {
          val put = new Put(record._1)
          record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
          put
        }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
      
        val conn = ConnectionFactory.createConnection(hBaseConf)
        val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
        val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
        val realTable = conn.getTable(hbTableName)
        HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
      
        // bulk load start
        val loader = new LoadIncrementalHFiles(hBaseConf)
        val admin = conn.getAdmin()
        loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
      
        sc.stop()
      }
    …
      def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
        val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
        import scala.collection.JavaConversions._
        for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
          val family = cells.getKey
          for (value <- cells.getValue) {
            val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
            ret.+=((kfq, CellUtil.cloneValue(value)))
          }
        }
        ret.iterator
      }
    }

    C、pg_bulkload的使用

    这是一个支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过外部文件加载的方式,这个工具笔者没有亲自去用过,详细的介绍可以参考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/

    3)、基于sqoop的全量导入

    Sqoop 是hadoop生态中的一个工具,专门用于外部数据导入进入到hdfs中,外部数据导出时,支持很多常见的关系型数据库,也是在大数据中常用的一个数据导出导入的交换工具。

    Sqoop从外部导入数据的流程图如下:

    Sqoop将hdfs中的数据导出的流程如下:

    本质都是用了大数据的数据分布式处理来快速的导入和导出数据。

    4)、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新,但是需要注意

    A、hbase中的空cell在hive中会补null

    B、hive和hbase中不匹配的字段会补null

    我们可以在hbase的shell 交互模式下,创建一张hbse表

    create 'bokeyuan','zhangyongqing'

    使用这个命令,我们可以创建一张叫bokeyuan的表,并且里面有一个列族zhangyongqing,hbase创建表时,可以不用指定字段,但是需要指定表名以及列族

    我们可以使用的hbase的put命令插入一些数据

    put 'bokeyuan','001','zhangyongqing:name','robot'
    put 'bokeyuan','001','zhangyongqing:age','20'
    put 'bokeyuan','002','zhangyongqing:name','spring'
    put 'bokeyuan','002','zhangyongqing:age','18'

    可以通过hbase的scan 全表扫描的方式查看我们插入的数据

    scan ' bokeyuan'

    我们继续创建一张hive外部表

    create external table bokeyuan (id int, name string, age int) 
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age") 
    TBLPROPERTIES("hbase.table.name" = " bokeyuan");

    外部表创建好了后,我们可以使用HQL语句来查询hive中的数据了

    select * from classes;
    OK
    1 robot 20
    2 spring 18

    5)、Debezium+bireme:Debezium for PostgreSQL to Kafka  Debezium也是一个通过监控数据库的日志变化,通过对行级日志的处理来达到数据同步,而且Debezium 可以通过把数据放入到kafka,这样就可以通过消费kafka的数据来达到数据同步的目的。而且还可以给多个地方进行消费使用。

    Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。

    该项目的GitHub地址为:https://github.com/debezium/debezium   这是一个开源的项目。

    本来监控数据库,并且在数据变动的时候获得通知其实一直是一件很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。

     Debezium正好提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理系统,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利用数据库系统本身的特性来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。

    Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了自己的持久性、可靠性和容错性。每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,可以把对数据库的压力降到1)。另外,客户端可以随时停止消费,然后重启,从上次停止消费的地方接着消费。每个客户端可以自行决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发生的顺序被交付的。

    对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的Debezium connector引擎来直接在应用内部运行connector。这种应用仍需要消费数据库更改事件,但更希望connector直接传递给它,而不是持久化到Kafka里。

    更详细的介绍可以参考:https://www.jianshu.com/p/f86219b1ab98

    bireme 的github 地址  https://github.com/HashDataInc/bireme

    bireme 的介绍:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

    另外Maxwell也是可以实现MySQL到Kafka的消息中间件,消息格式采用Json:

    Download:
    https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz 
    Source:
    https://github.com/zendesk/maxwell 

    6)、datax

    datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。

    github地址:https://github.com/alibaba/DataX

    A、设计架构:

    数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步

    B、框架

    核心模块介绍:

    • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
    • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
    • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
    • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
    • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

    DataX调度流程:

    举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

    • DataXJob根据分库分表切分成了100个Task。
    • 根据20个并发,DataX计算共需要分配4个TaskGroup。
    • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

    优势:

    • 每种插件都有自己的数据转换策略,放置数据失真;
    • 提供作业全链路的流量以及数据量运行时监控,包括作业本身状态、数据流量、数据速度、执行进度等。
    • 由于各种原因导致传输报错的脏数据,DataX可以实现精确的过滤、识别、采集、展示,为用户提过多种脏数据处理模式;
    • 精确的速度控制
    • 健壮的容错机制,包括线程内部重试、线程级别重试;

    从插件视角看框架

    • Job:是DataX用来描述从一个源头到目的的同步作业,是DataX数据同步的最小业务单元;
    • Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;
    • TaskGroup:一组Task集合,在同一个TaskGroupContainer执行下的Task集合称为TaskGroup;
    • JobContainer:Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker;
    • TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TAskTacker。

        总之,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。

        物理执行有三种运行模式:

    • Standalone:单进程运行,没有外部依赖;
    • Local:单进程运行,统计信息,错误信息汇报到集中存储;
    • Distrubuted:分布式多线程运行,依赖DataX Service服务;

        总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。

    如果需要开发插件,可以看zhege这个插件开发指南:   https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md 

    数据源支持情况:

    类型 数据源 Reader(读) Writer(写) 文档
    RDBMS 关系型数据库 MySQL  、
                Oracle         √         √      、
      SQLServer  、
      PostgreSQL  、
      DRDS  、
      通用RDBMS(支持所有关系型数据库)  、
    阿里云数仓数据存储 ODPS  、
      ADS  
      OSS  、
      OCS  、
    NoSQL数据存储 OTS  、
      Hbase0.94  、
      Hbase1.1  、
      Phoenix4.x  、
      Phoenix5.x  、
      MongoDB  、
      Hive  、
    无结构化数据存储 TxtFile  、
      FTP  、
      HDFS  、
      Elasticsearch  
    时间序列数据库 OpenTSDB  
      TSDB  

    7)、OGG

    OGG 一般主要用于Oracle数据库。即Oracle GoldenGate是Oracle的同步工具 ,可以实现两个Oracle数据库之间的数据的同步,也可以实现Oracle数据同步到Kafka,相关的配置操作可以参考如下:

    https://blog.csdn.net/dkl12/article/details/80447154

    https://www.jianshu.com/p/446ed2f267fa

    http://blog.itpub.net/15412087/viewspace-2154644/

    8)、databus

    Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。 2011年在LinkedIn正式进入生产系统,2013年开源。

    Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更。

    Databus的传输层端到端延迟是微秒级的,每台服务器每秒可以处理数千次数据吞吐变更事件,同时还支持无限回溯能力和丰富的变更订阅功能。

    github:https://github.com/linkedin/databus

    databus架构设计:

    • 来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。
    • 可扩展、高度可用:Databus能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。
    • 事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。
    • 低延迟、支持多种订阅机制:数据源变更完成后,Databus能在微秒级内将事务提交给消费者。同时,消费者使用Databus中的服务器端过滤功能,可以只获取自己需要的特定数据。
    • 无限回溯:这是Databus最具创新性的组件之一,对消费者支持无限回溯能力。当消费者需要产生数据的完整拷贝时(比如新的搜索索引),它不会对数据库产生任何额外负担,就可以达成目的。当消费者的数据大大落后于来源数据库时,也可以使用该功能。

    Databus Relay中继的功能主要包括:

    • 从Databus来源读取变更行,并在内存缓存内将其序列化为Databus变更事件
    • 监听来自Databus客户端(包括Bootstrap Producer)的请求,并传输新的Databus数据变更事件

    Databus客户端的功能主要包括:

    • 检查Relay上新的数据变更事件,并执行特定业务逻辑的回调
    • 如果落后Relay太多,向Bootstrap Server发起查询
    • 新Databus客户端会向Bootstrap Server发起bootstrap启动查询,然后切换到向中继发起查询,以完成最新的数据变更事件
    • 单一客户端可以处理整个Databus数据流,或者可以成为消费者集群的一部分,其中每个消费者只处理一部分流数据

    Databus Bootstrap Producer的功能有:

    • 检查中继上的新数据变更事件
    • 将变更存储在MySQL数据库中
    • MySQL数据库供Bootstrap和客户端使用

    Databus Bootstrap Server的主要功能,监听来自Databus客户端的请求,并返回长期回溯数据变更事件。

    更多可以参考 databus社区wiki主页:https://github.com/linkedin/Databus/wiki

    Databus和canal的功能对比:

    对比项

     

    Databus

    canal

    结论

    支持的数据库

     

    mysql, oracle

    mysql(据说内部版本支持oracle)

    Databus目前支持的数据源更多

    业务开发

     

    业务只需要实现事件处理接口

    事件处理外,需要处理ack/rollback,

    反序列化异常等

    Databus开发接口用户友好度更高

    服务模型

     relay

    relay可以同时服务多个client

    一个server instance只能服务一个client

    (受限于server端保存拉取位点)

    Databus服务模式更灵活

     

    client

    client可以拉取多个relay的变更,

    访问的relay可以指定拉取某些表某些分片的变更

    client只能从一个server拉取变更,

    而且只能是拉取全量的变更

    可扩展性

     

    client可以线性扩展,处理能力也能线性扩展

    (Databus可识别pk,自动做数据分片)

    client无法扩展

    Databus扩展性更好

    可用性

    client ha

    client支持cluster模式,每个client处理一部分数据,

    某个client挂掉,其他client自动接管对应分片数据

    主备client模式,主client消费,

    如果主client挂掉,备client可自动接管

    Databus实时热备方案更成熟

     

    relay/server ha

    多个relay可连接到同一个数据库,

    client可以配置多个relay,relay故障启动切换

    主备relay模式,relay通过zk进行failover

    canal主备模式对数据库影响更小

     

    故障对上游

    数据库的影响

    client故障,bootstrap会继续拉取变更,

    client恢复后直接从bootstrap拉取历史变更

    client故障会阻塞server拉取变更,

    client恢复会导致server瞬时从数据库拉取大量变更

    Databus本身的故障对数据库影响几乎为0

    系统状态监控

     

    程序通过http接口将运行状态暴露给外部

    暂无

    Databus程序可监控性更好

    开发语言

     

    java,核心代码16w,测试代码6w

    java,4.2w核心代码,6k测试代码

    Databus项目更成熟,当然学习成本也更大

    9)、gobblin

    Gobblin是用来整合各种数据源的通用型ETL框架,在某种意义上,各种数据都可以在这里“一站式”的解决ETL整个过程,专为大数据采集而生,易于操作和监控,提供流式抽取支持。主要用于Kafka的数据同步到HDFS。

    该框架来源于kafka的东家LinkedIn。大体的架构如下:

    Gobblin的功能真的是非常的全。底层支持三种部署方式,分别是standalone,mapreduce,mapreduce on yarn。可以方便快捷的与Hadoop进行集成,上层有运行时任务调度和状态管理层,可以与Oozie,Azkaban进行整合,同时也支持使用Quartz来调度(standalone模式默认使用Quartz进行调度)。对于失败的任务还拥有多种级别的重试机制,可以充分满足我们的需求。再上层呢就是由6大组件组成的执行单元了。这6大组件的设计也正是Gobblin高度可扩展的原因。

    Gobblin组件

    Gobblin提供了6个不同的组件接口,因此易于扩展并进行定制化开发。分别是:

    • source
    • extractor
    • convertor
    • quality checker
    • writer
    • publisher

    Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。

    Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。

    Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。

    Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。

    Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。

    Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。

    Gobblin执行流程

    Job被创建后,Runtime就根据Job的部署方式进行执行。Runtime负责job/task的定时执行,状态管理,错误处理以及失败重试,监控和报告等工作。Gobblin存在分支的概念,从数据源获取的数据由不同的分支进行处理。每个分支都可以有自己的Converter,Quality Checker,Writer和Publisher。因此各个分支可以按不同的结构发布到不同的目标地址。单个分支任务失败不会影响其他分支。 同时每一次Job的执行都会将结果持久化到文件( SequenceFiles)中,以便下一次执行时可以读到上次执行的位置信息(例如offset),本次执行可以从上次offset开始执行本次Job。状态的存储会被定期清理,以免出现存储无限增长的情况。

     Gobblin详情参考:http://www.imooc.com/article/78811

    github源码:https://github.com/apache/incubator-gobblin

    10)、MongoShake

    MongoShake是阿里巴巴Nosql团队开源出来的一个项目,主要用于mongdb的数据同步到kafka或者其他的mongdb数据库中,MongoShake是一个以golang语言进行编写的通用的平台型服务,通过读取MongoDB集群的Oplog操作日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。日志可以提供很多场景化的应用,为此,我们在设计时就考虑了把MongoShake做成通用的平台型服务。通过操作日志,我们提供日志数据订阅消费PUB/SUB功能,可通过SDK、Kafka、MetaQ等方式灵活对接以适应不同场景(如日志订阅、数据中心同步、Cache异步淘汰等)。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。

    整体的架构图如下:

    应用场景举例

        1.  MongoDB集群间数据的异步复制,免去业务双写开销。

        2.  MongoDB集群间数据的镜像备份(当前1.0开源版本支持受限)

        3.  日志离线分析

        4.  日志订阅

        5.  数据路由。根据业务需求,结合日志订阅和过滤机制,可以获取关注的数据,达到数据路由的功能。

        6.  Cache同步。日志分析的结果,知道哪些Cache可以被淘汰,哪些Cache可以进行预加载,反向推动Cache的更新。

        7.  基于日志的集群监控

    功能介绍

    MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有通道类型有:

        1.  Direct:直接写入目的MongoDB

        2.  RPC:通过net/rpc方式连接

        3.  TCP:通过tcp方式连接

        4.  File:通过文件方式对接

        5.  Kafka:通过Kafka方式对接

        6.  Mock:用于测试,不写入tunnel,抛弃所有数据

    数据同步的架构如下图所示

    更多详细介绍可以参考官方提供的中文介绍文档:https://yq.aliyun.com/articles/603329

    总结:

    1、databus活跃度不高,datax和canal 相对比较活跃。

    2、datax 一般比较适合于全量数据同步,对全量数据同步效率很高(任务可以拆分,并发同步,所以效率高),对于增量数据同步支持的不太好(可以依靠时间戳+定时调度来实现,但是不能做到实时,延迟较大)。

    3、canal 、databus 等由于是通过日志抓取的方式进行同步,所以对增量同步支持的比较好。

    4、以上这些工具都缺少一个监控和任务配置调度管理的平台来进行支撑。

    作者的原创文章,转载须注明出处。原创文章归作者所有,欢迎转载,但是保留版权。对于转载了博主的原创文章,不标注出处的,作者将依法追究版权,请尊重作者的成果。

     

    展开全文
  • 工作总结7 大数据同步解决方案

    千次阅读 2014-04-09 19:28:53
    账套间大数据同步解决方案

    账套间大数据量同步的解决方案

         在忙于其他项目的开发过程中,突然有一家客户反馈一个大数据量问题,当物品基础数据等大数据同步到另一个账套时,系统就卡死,崩溃啦。

    我拿到用户的数据环境,用Visual GC监测工具,监测了一看皱眉


        整个的伊甸园内存区和旧时代内存区都爆满,jvm直接卡死,tomcat直接崩溃

        在同步的时候,瞬间内存快速爆满,当时把我吓了一跳。

        开始的时候,想当然的认为加大内存就可以解决此问题,于是jvm的最大内存由原来的1024M,扩到1500M,内存还是直接爆满。

        最好用Visual VM监测到,在同步的时候,产生了大量的实例对象,GC来不及回收,直接耗尽整个内存。

        于是确定,可能是程序出了问题。

        于是,开始从程序一步一步的查找,开始的第一个方法,就是查询物品的数据量比较大4.7W多条,直接耗尽了可怜的本身就不大的内存哭。通过在群里,开发人员的讨论和建议,对于大数据量的查询,我做了分页形式的批量查询,然后批量处理数据,批量同步数据。

        但是,再同步的过程中,会产生逻辑问题,为了不改变现有的业务逻辑和数据问题,我把处理后的数据,一次同步过去,监控了下同步的过程中,系统还是可以承受的。

        可是,同步的过程中,时间问题产生了,同步一次,大约要至少3个小时,这是多么可怕的事情,我又遇到了同步的时间问题,我在思考究竟哪里消耗这么长的时间,于是又重新回归程序的定位,查找,看了一遍代码后,看到代码中很多不足之处,进行了一一优化,最终发现了有个xml文件读取,解析放到了循环里面,导致耗费大部分时间。设置了一个全局变量,值解析一次xml文件,然后循环过程中,都从这个变量里去取相应的配置信息。

        通过测试以后,时间终于降下来了,现在时间大约10分钟左右,这令我相当的振奋不已。

        不过,在最后同步完数据的后处理时代,内存很快增高不少,于是定位到更新处理代码段的查询,也做了分页查询处理。眨眼

        另外,进行同步的过程中,发现JVM内存还是不是很稳定,总是升高,不往下降,为了保持内存跟趋于稳定,我每2W条的数据处理,就做一下 system.gc()垃圾回收,虽然会占用一点时间,但是对整体时间效率上还是可以接受的,这样做保证了系统趋于稳定。

        在往数据库同步的过程中,也做了基本的垃圾回收处理。

        优化后的处理结果如下图:大笑

        由于同步的业务逻辑校验,处理等都要耗费时间,在时间效率上还是稍微差点,但是整体上应该还是满足客户使用的。

        为了不使当前的页面卡死,把原来的同步提交,改成了异步提交处理。

        为了使同步的日志,加载到页面不至于卡死,改成了页面下载的模式,下载下来直接在txt文件里查看。

        以上,就是我做得一些基本优化。吐舌头


    展开全文
  • 最近两个项目之间的需要做数据同步,要求是A项目的数据以及图片文件同步到B项目,首先两个项目都是独立的采集录入信息的项目,数据库数据同步不说了,还有一些图片等文件也得需要同步, 首先想到的是B项目调用A...

    最近两个项目之间的需要做数据同步,要求是A项目的数据以及图片文件要同步到B项目,首先两个项目都是独立的采集录入信息的项目,数据库数据同步不说了,还有一些图片等文件也得需要同步, 首先想到的是B项目调用A项目的图片路径,因为两个项目的图片文件路径有很多种生成情况,两项目都在运行中,这种图片路径方法就排除了。 后面就想到了两种解决方法:

    1 socket 方法 就是在B项目添加侦听 B项目启动时候start, 在A项目中用Client 。

    2 在A项目中通过HttpURLConnection模拟post表单提交到B项目对应方法。(个人感觉这种比较好)

    HttpURLConnection:

    package test.httpUp;
    
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.SocketTimeoutException;
    import java.net.URL;
    import java.net.URLEncoder;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    
    import javax.imageio.ImageIO;
    import javax.imageio.ImageReader;
    import javax.imageio.stream.ImageInputStream;
    
    public class HttpPostUtil {
    	
    
    	URL url;
    	HttpURLConnection conn;
    	String boundary = "--------http";
    	Map<String, String> textParams = new HashMap<String, String>();
    	Map<String, File> fileparams = new HashMap<String, File>();
    	DataOutputStream ds;
    
    	public HttpPostUtil(String url) throws Exception {
    		this.url = new URL(url);
    	}
        //重新设置要请求的服务器地址,即上传文件的地址。
    	public void setUrl(String url) throws Exception {
    		this.url = new URL(url);
    	}
        //增加一个普通字符串数据到form表单数据中
    	public void addTextParameter(String name, String value) {
    		textParams.put(name, value);
    	}
        //增加一个文件到form表单数据中
    	public void addFileParameter(String name, File value) {
    		fileparams.put(name, value);
    	}
        // 清空所有已添加的form表单数据
    	public void clearAllParameters() {
    		textParams.clear();
    		fileparams.clear();
    	}
        // 发送数据到服务器,返回一个字节包含服务器的返回结果的数组
    	public byte[] send() throws Exception {
    		initConnection();
    		try {
    			conn.connect();
    		} catch (SocketTimeoutException e) {
    			// something
    			throw new RuntimeException();
    		}
    		ds = new DataOutputStream(conn.getOutputStream());
    		writeFileParams();
    		writeStringParams();
    		paramsEnd();
    		InputStream in = conn.getInputStream();
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		int b;
    		while ((b = in.read()) != -1) {
    			out.write(b);
    		}
    		conn.disconnect();
    		return out.toByteArray();
    	}
        //文件上传的connection的一些必须设置
    	private void initConnection() throws Exception {
    		conn = (HttpURLConnection) this.url.openConnection();
    		
    		conn.setDoOutput(true);
    		conn.setUseCaches(false);
    		conn.setConnectTimeout(10000); //连接超时为10秒
    		conn.setRequestMethod("POST");
    		conn.setRequestProperty("Content-Type",
    				"multipart/form-data; boundary=" + boundary);
    	}
        //普通字符串数据
    	private void writeStringParams() throws Exception {
    		Set<String> keySet = textParams.keySet();
    		for (Iterator<String> it = keySet.iterator(); it.hasNext();) {
    			String name = it.next();
    			String value = textParams.get(name);
    			ds.writeBytes("--" + boundary + "\r\n");
    			ds.writeBytes("Content-Disposition: form-data; name=\"" + name
    					+ "\"\r\n");
    			ds.writeBytes("\r\n");
    			ds.writeBytes(encode(value) + "\r\n");
    		}
    	}
        //文件数据
    	private void writeFileParams() throws Exception {
    		Set<String> keySet = fileparams.keySet();
    		for (Iterator<String> it = keySet.iterator(); it.hasNext();) {
    			String name = it.next();
    			File value = fileparams.get(name);
    			ds.writeBytes("--" + boundary + "\r\n");
    			ds.writeBytes("Content-Disposition: form-data; name=\"" + name
    					+ "\"; filename=\"" + encode(value.getName()) + "\"\r\n");
    			ds.writeBytes("Content-Type: " + getContentType(value) + "\r\n");
    			ds.writeBytes("\r\n");
    			ds.write(getBytes(value));
    			ds.writeBytes("\r\n");
    		}
    	}
        //获取文件的上传类型,图片格式为image/png,image/jpg等。非图片为application/octet-stream
    	private String getContentType(File f) throws Exception {
    		
    //		return "application/octet-stream";  // 此行不再细分是否为图片,全部作为application/octet-stream 类型
    		ImageInputStream imagein = ImageIO.createImageInputStream(f);
    		if (imagein == null) {
    			return "application/octet-stream";
    		}
    		Iterator<ImageReader> it = ImageIO.getImageReaders(imagein);
    		if (!it.hasNext()) {
    			imagein.close();
    			return "application/octet-stream";
    		}
    		imagein.close();
    		return "image/" + it.next().getFormatName().toLowerCase();//将FormatName返回的值转换成小写,默认为大写
    
    	}
        //把文件转换成字节数组
    	private byte[] getBytes(File f) throws Exception {
    		FileInputStream in = new FileInputStream(f);
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		byte[] b = new byte[1024];
    		int n;
    		while ((n = in.read(b)) != -1) {
    			out.write(b, 0, n);
    		}
    		in.close();
    		return out.toByteArray();
    	}
    	//添加结尾数据
    	private void paramsEnd() throws Exception {
    		ds.writeBytes("--" + boundary + "--" + "\r\n");
    		ds.writeBytes("\r\n");
    	}
    	// 对包含中文的字符串进行转码,此为UTF-8。服务器那边要进行一次解码
        private String encode(String value) throws Exception{
        	return URLEncoder.encode(value, "UTF-8");
        }
    	public static void main(String[] args) throws Exception {
    		HttpPostUtil u = new HttpPostUtil("http://xxxxxx.action");
    		u.addTextParameter("path", "qs/");
    		u.addTextParameter("fileName", "111.jpg");
    		u.addFileParameter("picFile", new File(
    				"D://272.jpg"));
    		u.addTextParameter("text", "utf-8");
    		byte[] b = u.send();
    		String result = new String(b);
    		System.out.println(result);
    
    	}
    
    }
    
    =====================
    socket   client

    package test.socket;
    
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.net.URL;
    
    /**
     * 文件发送客户端主程序
     * @author admin_Hzw
     *
     */
    public class BxClient {
    	
    	/**
    	 * 程序main方法
    	 * @param args
    	 * @throws IOException
    	 */
    	public static void main(String[] args) throws IOException {
    		
    		int length = 0;
    		double sumL = 0 ;
    		byte[] sendBytes = null;
    		Socket socket = null;
    		DataOutputStream dos = null;
    		FileInputStream fis = null;
    		boolean bool = false;
    		try {
    			File file = new File("d:/272.jpg"); //要传输的文件路径
    			long l = file.length(); 
    			socket = new Socket();  
    			socket.connect(new InetSocketAddress("192.168.1.110", 8877));
    			dos = new DataOutputStream(socket.getOutputStream());
    			fis = new FileInputStream(file);      
    			sendBytes = new byte[1024];  
    			while ((length = fis.read(sendBytes, 0, sendBytes.length)) > 0) {
    				sumL += length;  
    				System.out.println("已传输:"+((sumL/l)*100)+"%");
    				dos.write(sendBytes, 0, length);
    				dos.flush();
    			} 
    			//虽然数据类型不同,但JAVA会自动转换成相同数据类型后在做比较
    			if(sumL==l){
    				bool = true;
    			}
    		}catch (Exception e) {
    			System.out.println("客户端文件传输异常");
    			bool = false;
    			e.printStackTrace();  
    		} finally{  
    			if (dos != null)
    				dos.close();
    			if (fis != null)
    				fis.close();   
    			if (socket != null)
    				socket.close();    
    		}
    		System.out.println(bool?"成功":"失败");
    	}
    }
    socket server

    web.xml

     <listener>  
      <listener-class>XXX.web.listener.SocketServiceLoader</listener-class>  
    </listener>  

    
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    
    public class SocketServiceLoader implements ServletContextListener { //socket server 线程  
        private SocketThread socketThread;  
          
        @Override  
        public void contextDestroyed(ServletContextEvent arg0) {  
            if(null!=socketThread && !socketThread.isInterrupted())  
            {  
             socketThread.closeSocketServer();  
             socketThread.interrupt();  
            }  
     }  
      
        @Override  
        public void contextInitialized(ServletContextEvent arg0) {  
            // TODO Auto-generated method stub  
            if(null==socketThread)  
            {  
             //新建线程类  
             socketThread=new SocketThread(null);  
             //启动线程  
             socketThread.start();  
            }  
        }  }
    

    
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class SocketThread extends Thread{  
        private ServerSocket serverSocket = null;  
          
        public SocketThread(ServerSocket serverScoket){  
            try {  
                if(null == serverSocket){  
                    this.serverSocket = new ServerSocket(8877);  
                    System.out.println("socket start");  
                }  
            } catch (Exception e) {  
                System.out.println("SocketThread创建socket服务出错");  
                e.printStackTrace();  
            }  
      
        }  
          
        public void run(){  
            while(!this.isInterrupted()){  
                try {  
                    Socket socket = serverSocket.accept();  
                      
                    if(null != socket && !socket.isClosed()){     
                        //处理接受的数据  
                        new SocketOperate(socket).start();  
                    }  
                    socket.setSoTimeout(30000);  
                      
                }catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
          
          
        public void closeSocketServer(){  
           try {  
                if(null!=serverSocket && !serverSocket.isClosed())  
                {  
                 serverSocket.close();  
                }  
           } catch (IOException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
           }  
         }  
          
          
    }
    

    
    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.Socket;
    import java.util.Random;
    public class SocketOperate extends Thread{  
        private Socket socket;  
          
        public SocketOperate(Socket socket) {  
           this.socket=socket;  
        }  
        @SuppressWarnings("unused")  
       public void run()  
        {  
            try{   
    			System.out.println("开始监听...");
    			/*
    			 * 如果没有访问它会自动等待
    			 */
    			System.out.println("有链接");
    			receiveFile(socket);
    		} catch (Exception e) {
    			System.out.println("服务器异常");
    			e.printStackTrace();
    		}   
        } 
        
        
        /*public void run() {
    	}*/
    
    	/**
    	 * 接收文件方法
    	 * @param socket
    	 * @throws IOException
    	 */
    	public static void receiveFile(Socket socket) throws IOException {
    		byte[] inputByte = null;
    		int length = 0;
    		DataInputStream dis = null;
    		FileOutputStream fos = null;
    		String filePath = "D:/temp/"+new Random().nextInt(10000)+".jpg";
    		try {
    			try {
    				dis = new DataInputStream(socket.getInputStream());
    				File f = new File("D:/temp");
    				if(!f.exists()){
    					f.mkdir();  
    				}
    				/*  
    				 * 文件存储位置  
    				 */
    				fos = new FileOutputStream(new File(filePath));    
    				inputByte = new byte[1024];   
    				System.out.println("开始接收数据...");  
    				while ((length = dis.read(inputByte, 0, inputByte.length)) > 0) {
    					fos.write(inputByte, 0, length);
    					fos.flush();    
    				}
    				System.out.println("完成接收:"+filePath);
    			} finally {
    				if (fos != null)
    					fos.close();
    				if (dis != null)
    					dis.close();
    				if (socket != null)
    					socket.close(); 
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }  
    



    展开全文
  • 项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送...
  • 基于文件的离线数据同步方案

    千次阅读 2015-02-04 00:51:22
    一种基于文件的离线APP数据同步方案
  • 说明:由于本人的实际情况是不能修改线上对数据引擎的支持,并且只是为了同步部分表,因此没必要将两个库做主从,因此采用以下的方式进行解决对于跨服务器同步增量导数据的问题,本可以使用:select * into outfile ...
  • 数据量下高并发同步解决方案

    万次阅读 2016-07-04 11:52:32
    数据量下高并发同步的讲解(不看,保证你后悔)    对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了。而并发问题是绝大部分的程序员头疼的问题, 但话又说回来了...
  • 解决方案:使用jquery请求后台 步骤1:安装jquery $ npm install jquery --save 步骤2:修改配置文件vue.config.js 步骤3:配置完成后,在需要组件使用import $ from ‘jquery’; 就可以使用! jquery使用案列: $....
  • vue axios同步请求解决方案

    万次阅读 2017-09-28 21:53:01
    在vue项目里面,需要循环发送ajax请求,出现的问题就是循环结束,第一次服务器还没返回,导致数据处理错误,需要使用同步请求解决方案目前没有发现axios可以同步请求,所以只能使用jQuery,配置同步请求;...
  • MySQL 实时同步 解决方案

    千次阅读 2021-02-16 00:11:45
    目录 1 需求概述 2 技术原理 3 MySQL环境配置 ...将MySQL5.6生产库多张表的数据实时同步到Oracle11g数据仓库,MySQL历史数据700G,平均每天产生50G左右日志文件,MySQL日志空间50G,超过后滚动删除日志文件
  • 目录 一、 如何将被搜索的数据在ES上创建反向索引 二、 Java代码如何与ES交互 ElasticSearch作为搜索引擎,我们需要解决2大问题: 一、 如何将被搜索的数据在ES上... 1.2、如何同步增量数据 第二个大问题也...
  • kettle实现数据增量同步方案

    千次阅读 2019-12-12 14:37:29
    我司目前数据库之间的数据同步都是oracle goldengate(ogg)方案,该方案的特点: 优点: 基于数据库的变更日志同步(oracle redo\mysql binlog),速度很快,对数据库性能影响很小,适合大量数据同步的场景 缺点: ...
  • 音视频同步解决方案

    千次阅读 2014-03-02 21:33:19
    一、音视频同步问题概述: 音视频同步问题是可视对讲中的重点需要解决的问题之一,也是一直以来被模拟门禁产品厂商攻击的一个弱点,因为模拟可视对讲产品都采用专线传输,不存在这个 问题。解决同步问题的方法有很多...
  • 最近写一个出版社的投标文件,其中涉及到线下ERP系统需要与淘宝店铺库存进行实时同步,所以将当时的解决方案和大家分享下,也方便以后查阅使用。   项目需求   目前此项目存在库存不统一的问题,随着信息时代...
  • 需要跨网络:从阿里云服务器上的数据库,通过网闸使用ftp传文件的方式,将数据同步到业主的专网中;阿里云跟业主专网不能直连; 定时数据增量同步,具体同步哪些表,需要可配置; 节约工作量,最大限度上不改变...
  • 一、网站应用背景开发一个网站的...当问题的规模在经济条件下通过堆硬件的方式解决不了的时候,我们应该通过其他的思路去解决问题,互联网发展至今,已经提供了很多成熟的解决方案,但并不是都具有适用性,你把淘宝的技
  • 在XMLHttpRequest2里支持二进制数据的下载了,现分别以同步和异步两种方式分别介绍。 异步的方式下载: xmlRequest.open("GET", "0.jpg", true); xmlRequest.responseType = "blob";//这里是关键,它指明...
  • 业务场景描述: 在我们的项目中有些配置信息持久化在数据库中,这些配置信息又是在系统启动后自动加载并缓存在local或者...分布式集群的项目之间同步数据,我们来讲一下有哪些方案解决方案一:项目程序中对DB进行操
  • 运维基本功(三):Linux文件管理 运维基本功(四):Linux文件管理-Vim编辑器概述 运维基本功(五):Linux文件管理-用户管理 运维基本功(六):Linux用户管理-远程管理 运维基本功(七):Linux的权限管理操作 ...
  • 数据同步处理方案

    万次阅读 2014-05-28 10:27:28
     和我所维护的系统的batch处理方式一模一样。我们系统现在是csv35万条数据,也是遇到了处理速度的瓶颈,我是ERP。...②建临时表是很好的解决方案,不知你们为什么不允许,反正我们允许,效果也很
  • JAVA多线程之间实现同步+多线程并发同步解决方案

    万次阅读 多人点赞 2018-03-04 14:09:15
     当多个线程同时共享同一个全局变量或静态变量,做写的操作(修改变量值)时,可能会发生数据冲突问题,也就是线程安全问题。但是做读操作时不会发生数据冲突问题。案例:需求现在有100张火车票,有两个窗口同时抢...
  • 1)、基于数据库日志(比如mysql的binlog)的同步 (canal) 我们都知道很多数据库都支持了主从自动同步,尤其是mysql,可以支持多主多从的模式。那么我们是不是可以利用这种思想呢,答案当然是肯定的,mysql的主从...
  • 基于rsync的文件增量同步方案

    千次阅读 2017-04-27 18:21:00
    应该有上传、下载、创建、删除等动作,但在本文的叙述中,主要关注文件内容的传输,即上传、下载),如何快速高效地进行文件同步,就成了云盘亟需解决的技术难题。本文阐述的方案就是在这种场景下提出来的,我们希望...
  • 不同业务场景下数据同步方案设计

    千次阅读 2018-09-06 23:41:09
    在搜索数据前,我们需要先将数据从关系型数据库中同步至搜索引擎中。因此,整个业务搜索过程包含两个阶段,第一阶段,将数据从关系型数据库同步至搜索引擎;第二阶段,从搜索引擎搜索数据,并返回至...
  • 数据自动备份解决方案

    千次阅读 2011-10-21 00:51:21
    1:网盘自动备份(隔离备份)隔离备份介绍:直接在网盘内建立项目、文件进行稿写操作很可能会与网盘数据同步导致数据丢失完整性,对文件造成损坏,所以这种方式是不可取的。因此采用隔离备份,所谓隔离备份就是在A...
  • 谷歌浏览器无法同步问题解决方案

    万次阅读 2018-10-09 20:58:39
    问题 本人在VMware中使用google chrome,登陆谷歌账号时报错,无法同步(已出...解决方案 打开 chrome://flags/#account-consistency,把高亮的设置改为 disable,重启 Chrome 成功导入后可在将其设置为default ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 233,603
精华内容 93,441
关键字:

数据文件同步解决方案