精华内容
下载资源
问答
  • canal是阿里巴巴mysql数据库binlog的增量订阅消费组件。 工作原理 全量 暴露的Http接口(接口定义见 ),待调用后开启后台线程,通过主键分批同步指定数据库中数据到Elasticsearch 读取数据库会加读锁主键必须为...
  • 【ElasticSearch】进阶电商检索实战必备:进阶电商检索实战必备:分词-同义词-联想词-增量同步-全量同步-持续更新】ElasticSearch电商检索实战必备---持续更新1、ES中的几个重要概念:集群、节点、文档、索引、分片...

    ElasticSearch电商检索实战必备—持续更新


    下面是大体思路,本人会持续更新,有问题请留言或者私信。


    1、ES中的几个重要概念:集群、节点、文档、索引、分片及插件ik分词器

    2、商品检索模糊匹配、过滤、排序、分页、高亮及聚合分析

    3、es中商品数据的增删改查,canal增量同步

    4、nested 内嵌属性的保存,使用script进行增删改查操作

    5、搜索联想词suggestor的使用,汉语拼音插件的安装

    6、同义词分析检索,同义词插件的安装及词典的更新与维护

    7、商品数据全量同步到es服务器利器:BulkProcessor

    展开全文
  • 增量同步和全量同步是数据库同步的两种方式。 全量同步是一次性同步全部数据,增量同步则只同步两个数据库不同的部分。 多表同步大家肯定都会想用最省事的方法,比如就建立一个公共的Json模板,将读库(reader)和写...

    简介

    本文主要讲解DataX的全量和增量同步实现方式,有具体代码可参考。增量同步时,将日志按天写入日志文件中

    增量同步和全量同步是数据库同步的两种方式。

    全量同步是一次性同步全部数据,增量同步则只同步两个数据库不同的部分。

    多表同步大家肯定都会想用最省事的方法,比如就建立一个公共的Json模板,将读库(reader)和写库(writer)的连接地址、端口、账号、密码、表名都动态传入,然后字段用*号代替。那博主就告诉你,后续出错和维护的坑你得走一遍了,并不是说不行,具体还是看业务场景来。

    避免大家踩坑,这里的多表同步会采用脚本进行动态参数传入,建议每个表对应一个Json文件,每个字段都单独写,并且字段用`符号包起来(如果某个字段是MySQL关键字会报错),不要嫌麻烦,不然后面出问题了问题更大,你还得重来一遍,如果涉及几十上百张表,你会懂那个痛苦的,频繁的Ctrl+C和Ctrl+V让你怀疑人生,不多说了,都是泪 。

    在这里插入图片描述

    话不多说,直接开撸


    一、全量同步

    1.示例Shell脚本文件:allSyncTask.sh
    #!/bin/bash
    . /etc/profile
    # 读库的IP
    r_ip="127.0.0.1"
    # 读库的端口
    r_port="3306"
    # 读库的数据库名称
    r_dbname="datax"
    # 读库的账号
    r_username="root"
    # 读库的密码
    r_password="123456"
    # 写库的IP
    w_ip="127.0.0.1"
    # 写库的端口
    w_port="3306"
    # 写库的数据库名称
    w_dbname="datax2"
    # 写库的账号
    w_username="root"
    # 写库的密码
    w_password="123456"
    # DataX全量同步(多个文件直接写多个执行命令)
    python /opt/datax/bin/datax.py /opt/datax/job/table1.json -p "-Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
    python /opt/datax/bin/datax.py /opt/datax/job/table2.json -p "-Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
    

    2.示例Json文件(这里随便写一个,大家当做模板参考就行):table1.json

    因为是多表同步,会执行多个文件,我这里的channel参数设置为10。

    {
        "job": {
    		"setting": {
                "speed": {
                    "channel": 10
                },
    			"errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": [
    							"`id`",
    							"`name`"
    						], 
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://${r_ip}:${r_port}/${r_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull"],
                                    "table": ["tabel1"]
                                }
                            ], 
    						"username": "${r_username}",
                            "password": "${r_password}"
                        }
                    }, 
                    "writer": {
                        "name": "mysqlwriter", 
                        "parameter": {
    						"writeMode": "update",
                            "column": [
                                "`id`",
    							"`name`"
                            ],
                            "session": [
                            	"set session sql_mode='ANSI'"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://${w_ip}:${w_port}/${w_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull", 
                                    "table": ["tabel1"]
                                }
                            ],
    						"username": "${w_username}",
                            "password": "${w_password}"
                        }
                    }
                }
            ]
        }
    }
    

    3.运行脚本即可

    运行脚本之前先将放入的脚本进行授权,否则不能执行,对文件进行最高级别授权命令:

    chmod 777 allSyncTask.sh
    

    allSyncTask.sh放入指定文件夹,运行命令:

    ./allSyncTask.sh
    

    至此全量同步完成


    二、定时增量同步

    相比全量同步,增量同步增加了用某个字段来判断改条数据是否被更新,具体体现在 Shell 脚本和 Json 文件中,下面会提到。


    1.准备工作:

    给需要同步的表增加用于增量同步判断的字段,可用ID也可以用时间,个人建议用时间,更方便。

    给读库的table1表加上实时更新的时间记录字段(可根据需要是否给写库也加上,我这里是加上的/font>):

    alter table table1 add column `curr_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP  COMMENT '最后更新时间(DataX数据采集使用)';
    

    具体可参考我的其他博客的第二点:MySQL | 设置时间字段为自动更新(CURRENT_TIMESTAMP).


    2.示例Shell脚本文件(相比于全量同步,这里使用了当前时间做条件传入):incrSyncTask.sh
    #!/bin/bash
    . /etc/profile
    # 当前时间(用于增量同步判断条件)
    curr_time=$(date +%Y-%m-%d)
    # 读库的IP
    r_ip="127.0.0.1"
    # 读库的端口
    r_port="3306"
    # 读库的数据库名称
    r_dbname="datax"
    # 读库的账号
    r_username="root"
    # 读库的密码
    r_password="123456"
    # 写库的IP
    w_ip="127.0.0.1"
    # 写库的端口
    w_port="3306"
    # 写库的数据库名称
    w_dbname="datax2"
    # 写库的账号
    w_username="root"
    # 写库的密码
    w_password="123456"
    # DataX全量同步(多个文件直接写多个执行命令)
    python /opt/datax/bin/datax.py /opt/datax/job/incr_table1.json -p "-Dcurr_time=$curr_time -Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
    python /opt/datax/bin/datax.py /opt/datax/job/incr_table2.json -p "-Dcurr_time=$curr_time -Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password"
    

    至于为什么加. /etc/profile,参考此文章的第二点:DataX踩坑2: | 定时任务crontab不执行或报错:/bin/sh: java: command not found.


    3.示例Json文件(相比于全量同步,这里使用了在读库(reader)加了一个where条件,条件就是传入的当前日期):incr_table1.json
    {
        "job": {
    		"setting": {
                "speed": {
                    "channel": 10
                },
    			"errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": [
    							"`id`",
    							"`name`",
    							"`curr_time`"
    						], 
    						"where": "curr_time between DATE_SUB('${curr_time} 05:00:00', INTERVAL 1 DAY) and '${curr_time} 04:59:59'",
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://${r_ip}:${r_port}/${r_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull"],
                                    "table": ["tabel1"]
                                }
                            ], 
    						"username": "${r_username}",
                            "password": "${r_password}"
                        }
                    }, 
                    "writer": {
                        "name": "mysqlwriter", 
                        "parameter": {
    						"writeMode": "update",
                            "column": [
                                "`id`",
    							"`name`",
    							"`curr_time`"
                            ],
                            "session": [
                            	"set session sql_mode='ANSI'"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://${w_ip}:${w_port}/${w_dbname}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull", 
                                    "table": ["tabel1"]
                                }
                            ],
    						"username": "${w_username}",
                            "password": "${w_password}"
                        }
                    }
                }
            ]
        }
    }
    

    简单说一下上面的增量逻辑:在 Shell 脚本中传入当前日期给where条件时间,我会让定时器在凌晨五点执行,所以查询的是昨天凌晨5点到今天04:59:59有修改的数据,查询条件就是第1点说到的实时修改时间


    4.添加定时器,这里使用的是crontab

    运行脚本之前先将放入的脚本进行授权,否则不能执行,对文件进行最高级别授权命令:

    chmod 777 incrSyncTask.sh
    

    4.1.编辑定时任务crontab,使用以下命令进入crontab任务文件:

    crontab -e
    

    4.2.进行vi编辑即可,添加crontab定时任务(每天凌晨5点执行,跟cron表达式类似):

    0 5 * * * /opt/datax/bin/incrSyncTask.sh >/dev/null 2>&1
    

    建议加上>/dev/null 2>&1,因为crontab会把日志都发给mail,这里是直接丢弃一样
    ,也可以指定输出到某个文件中,参考如下命令,每天执行后生成日志文件:

    0 5 * * * /opt/datax/bin/incrSyncTask.sh > /opt/datax/log/incrSyncTask_$(date +\%Y\%m\%d).log 2>&1
    

    这里需要用斜杠"\"对百分号进行转义

    4.3.查看crontab日志:

    tail -f /var/log/cron
    或
    tail -f /var/spool/mail/root
    

    如果找不到/var/spool/mail/root文件,参考:DataX踩坑2: | 定时任务crontab不执行或报错:/bin/sh: java: command not found.

    定时任务跑了之后,会将日志文件输出到上面的位置,根据登录宿主机的用户不同,文件名不同。这里的发送到邮箱有些坑,日志量过大问题、找不到邮箱问题,参考:DataX踩坑2 | 定时任务crontab不执行或报错:/bin/sh: java: command not found.


    三、注意点

    1.关于脚本运行后连接数据库报连接失败,重新连接等错误(\r换行符的问题)

    其实脚本中的所有数据库参数都是正确的,但就是报错,可以看日志,发现每个参数后面都携带了\r换行符
    在这里插入图片描述
    解决方法:Shell 脚本参数传递时有 \r 换行符问题.


    2.批量执行多文件命令报错

    建议将speed中的channel参数设置大一点,如10。
    由于DataX读取数据和写入数据需要占用服务器性能,所以需要考虑服务器性能,channel的值并不是越大越好。
    具体可参考结尾的性能优化文章。


    3.执行命令报:FileNotFoundException

    本来想将全量同步 Json 文件和增量同步 Json 文件放入不同的文件夹A和B,便于区分,直接放在 DataX 安装路径的 job 目录中,结果执行命令时找不到 job 目录下的A、B文件夹中的Json文件


    4.定时任务crontab不执行 或 报错:/bin/sh: java: command not found

    参考文章:DataX踩坑2: | 定时任务crontab不执行或报错:/bin/sh: java: command not found.


    相关文章


    参考博客


    展开全文
  • 数据同步-全量和增量

    千次阅读 2018-12-26 11:36:27
    因为公司的需求人员说的含糊其词的,原先我以为是关于本身业务的,但是 昨天赶上当当的满100-50我在挑书看到了... 数据同步一般分为两种方式:全量和增量。 1.1 全量 全量,这个很好理解。就是每天定时(避开...

    因为公司的需求人员说的含糊其词的,原先我以为是关于本身业务的,但是 昨天赶上当当的满100-50我在挑书看到了技术书里面有一章是介绍的是数据同步这个概念,于是上网搜关于这种的资料。

    参考https://www.cnblogs.com/big1987/p/8522884.html

    1.同步方式

     数据同步一般分为两种方式:全量和增量。

    1.1  全量

    全量,这个很好理解。就是每天定时(避开业务高峰期)或者周期性全量把数据从一个地方拷贝到另外一个地方;

    全量的话,可以采用直接全部覆盖(使用“新”数据覆盖“旧”数据);或者走更新逻辑(覆盖前判断下,如果新旧不一致,就更新);

     

    1.2 增量

    增量的基础是全量,就是你要使用某种方式先把全量数据拷贝过来,然后再采用增量方式同步更新。

     

    展开全文
  • Solr的增量更新和全量更新对比

    千次阅读 2020-08-31 17:41:39
    而当数据库的数据发生改变我们需要来数据库的数据进行同步的时候就用到了全量更新和增量更新 先来说一下全量更新 全量更新顾名思义就是全部都更新,当我们的索引库需要更新数据库数据的时候,就把数据全部拿过来...

    Solr的增量更新和全量更新

    solr的简介

    solr是一个采用java语言,基于Lucene的全文搜索引擎。
    

    solr的一些原理

    solr通过数据库的数据导入到索引库中,并且给数据创建相应的索引,用来提高搜索的速度。而当数据库的数据发生改变我们需要来和数据库的数据进行同步的时候就用到了全量更新增量更新

    先来说一下全量更新

    全量更新顾名思义就是全部都更新,当我们的索引库需要更新数据库数据的时候,就把数据全部拿过来进行更新同步。

    使用全量更新

    (注意这里—你的文件名可能会因为solr版本问题而和我的文件名不一样)
    1.配置solrconfig.xml文件
    在这里插入图片描述
    2.配置solr-data-config.xml文件(数据源配置文件)
    在这里插入图片描述
    3.配置managed-schema文件
    在这里插入图片描述
    然后启动项目
    在这里插入图片描述

    以上就是全量更新

    再来说一下增量更新
    增量更新简单来说就是,比如数据库的数据发生修改我们就只导入更新修改的数据,比全量更新操作量少

    操作步骤也很简单
    1.配置solr-data-config.xml文件(数据源配置文件)
    在这里插入图片描述
    2.重启项目
    在这里插入图片描述
    在这里我数据库的数据没有改变,所以没有显示更新了几条数据

    以上就是增量更新和全量更新

    展开全文
  • Redis主从同步:全量同步 增量同步

    千次阅读 2018-06-06 21:28:16
    日常工作中使用的Redis集群是一主多从的架构模型(如下图,主1+主2组成一套完整的数据),当主1发生宕机时,会切写到从(为了方便制作预案主1主2会同时切,分别切向从1从2作为新的...Redis主从同步分为两种机制:全...
  • mysql增量备份和全量备份

    千次阅读 2017-03-08 11:34:36
    项目需要对mysql数据库每天进行一次增量备份,一周进行一次全量备份,研究了大半天,整理出来的脚本,其实停简单的,理解了binlog其实就是so easy. 1.前提 必须要开启bin-log才可以。 开启的方式在my.ini或my.cnf...
  • 增量和全量

    2021-01-25 14:45:37
    增量同步的前提是全量,然后再根据规则增量同步; 增量的基础是全量,就是你要使用某种方式先把全量数据拷贝过来,然后再采用增量方式同步更新。 增量的话,就是指抓取某个时刻(更新时间)或者检查点(checkpoint)...
  • Redis的增量复制和全量复制

    千次阅读 2018-06-16 10:30:26
    1. redis什么时候会发生全量复制?a) redis slave首启动或者重启后,连接到master时b) redis slave进程没重启,但是掉线了,重连后不满足部分复制条件2. redis什么时候会发生部分复制?先来看部分复制需要的条件a) ...
  • (1)全量同步 什么是全量同步:将一个mysql的整个表的所有数据都同步到es中 常用插件是logstash-input-jdbc,logstash通过sql语句分区间对数据进行查询,然后输出到es进行实现。 logstash-input-jdbc同步插件原理:...
  • 数据的增量采集与全量采集
  • 目前的ETL种类繁多,可选择的工具也有很多,比如使用Sqoop, DataX做离线的T+1数据同步, Spark或者Flink做T+0的实时数据同步等。 目前有很多公司业务是T+1的,每天需要同步昨天的业务库(MySQL、mongodb等)的数据到...
  • DataLink是一个满足各种异构数据源之间的实时增量同步、离线全量同步,分布式、可扩展的数据交换平台。 项目背景 着眼于未来,我们的目标是打造一个平台,满足各种异构数据源之间的实时增量同步和离线全量同步,支撑...
  • 数据库做同步全量+增量
  • 全量同步增量同步的区别 全量同步:就是每天定时(避开高峰期)或者采用一个周期实现将数据拷贝到一个地方也就是Rdb存储。 增量同步:比如采用对行为的操作实现对数据的同步,也就是AOF。 全量与增量的比较:增量...
  • 全量复制 master 执行 bgsave ,在本地生成一份 rdb 快照文件。 master node 将 rdb 快照文件发送给 slave node,如果 rdb 复制时间超过 60秒(repl-timeout),那么 slave node 就会认为复制失败,可以适当调大这个...
  • logstash-input-jdbc增量全量数据同步

    千次阅读 2018-11-27 17:37:05
    笔者在mysql数据同步到ES中,发现第一次同步时需要全量的数据,之后则需要定时去同步增量数据,所以笔者提供增量和全量同步的conf供读者参考 二、解决方案 1、全量数据同步 具体如何执行可参考...
  • ETL过程中的全量和增量同步 一. 数据的同步方式主要分为两种方式 全量同步 ​ 对源数据进行全部抽取到目的源上,相当于将文件完全复制一份到目的地,该方式可以完全保证数据的一致性问题。 增量同步 ​ 对源...
  • Redis的全量同步和增量同步

    千次阅读 2017-10-16 09:52:07
    mysqlhotcopy使用lock tables、flush tablescp或scp来快速备份数据库.它是备份数据库或单个表最快的途径,完全属于物理备份, 但只能用于备份MyISAM存储引擎运行在数据库目录所在的机器上.与mysqldump备份不同,...
  • SyncNavigator是一款功能强大的数据库同步软件,适用于SQLSERVER, MySQL,具有自动/定时同步数据、无人值守、故障自动恢复、同构/异构数据库同步、断点续传和增量同步等功能,支持Windows xp以上所有操作系统,适用...
  • logstash-input-mysql 全量和增量同步到ES

    千次阅读 2019-01-31 17:49:56
    全量同步增量同步 全量同步是指全部将数据同步到es,通常是刚建立es,第一次同步时使用。增量同步是指将后续的更新、插入记录同步到es。 2、常用的一些ES同步方法 1)、 elasticsearch-jdbc : 严格意义上它...
  • logstash全量和增量同步数据到mysql

    千次阅读 2019-04-23 22:55:24
    一、场景 在mysql数据同步到ES中,发现第一次同步时需要全量的...2、支持每次全量同步或按照特定字段(如递增ID、修改时间)增量同步; 3、同步频率可控,最快同步频率每分钟一次(如果对实效性要求较高,慎用);...
  • ES与MySQL数据同步全量增量

    万次阅读 2020-02-11 23:41:36
    考虑ES如何与数据库实现同步? ES如何查询多个字段? 如何构建商品服务(包含搜索功能)? ES肯定是集群的,如何集群? 一个项目当它做大做当后都可能会需要将数据从传统的数据库同步到另一种数据集合中,一般...
  • solr数据同步:把数据库数据同步到solr,这里使用的数据库是postgresql,如果其他的数据库,配置步骤也一样,只是驱动包不一样而已 1. 上传驱动jar包 把postgresql-42.2.5.jar包放到 /home/solr/solr-8.5.1/server...
  • 1、数据导入功能,存在全量更新/增量更新...增量更新和全量更新 该语法用于: 判断B表A表是否满足ON中条件,如果满足则用B表去更新A表(或其他操作), 如果不满足,则将B表数据插入A表但是有很多可选项(或其他操

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 20,768
精华内容 8,307
关键字:

增量同步和全量同步