精华内容
下载资源
问答
  • 最近在学es,在数据方面碰到第一个问题是怎么将postgres中的数据库同步es中,在网上找了下相关文档,只有logstash-input-jdbc这个插件还在维护,而且在es中logstash高版本已经集成了这一插件,所以就省去了安装...

    最近在学es,在数据方面碰到第一个问题是怎么将postgres中的数据库中同步到es中,在网上找了下相关文档,只有logstash-input-jdbc这个插件还在维护,而且在es中logstash高版本已经集成了这一插件,所以就省去了安装ruby和安装插件这一步了

    1 安装elasticsearch logstash kibana 三件套

    2 下载数据库驱动

    图方便的话可以直接拷贝maven仓库里面的即可

    3 添加 .conf文件

    input {
        jdbc {
          # mysql 数据库链接,shop为数据库名
          jdbc_connection_string => "jdbc:postgresql://ip:5432/chongqing_gis"
          # 用户名和密码
          jdbc_user => ""
          jdbc_password => ""
          # 驱动
          jdbc_driver_library => "E:/ES/logstash-7.8.0/postgres/postgresql-42.2.9.jar"
          # 驱动类名
          jdbc_driver_class => "org.postgresql.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # 执行的sql 文件路径+名称
          statement_filepath => "E:/ES/logstash-7.8.0/postgres/jdbc.sql"
          # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          # schedule => "* * * * *"
        }
    }
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
        elasticsearch {
            # ES的IP地址及端口
            hosts => ["localhost:9200"]
            # 索引名称
            index => "test_index"
            # 需要关联的数据库中有有一个id字段,对应类型中的id
            document_id => "%{gid}"
        }
        stdout {
            # JSON格式输出
            codec => json_lines
        }
    }

    修改输入参数:数据库ip、端口、账号、密码等

    修改输出参数:es的ip、端口、索引、document_id等

    输出参数中索引如果还没有创建,在启动logstash时会自动根据默认模板创建索引,其中有些教程中出现了index_type的设置,这个类型在es的高版本中已经取消,不需要再设置,如果设置了,启动logstash会报错

    statement_filepath 保存准备执行的sql文件

    jdbc.sql文件:

    select gid,name,address from qtpoi

    在qtpoi表中有个字段是保存的空间地理信息geom字段,当我加上这个时,启动logstash一直报错,可能对空间字段需要做进一步的处理

    Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.postgresql.util.PGobject, simple name=PGobject

    4 启动logstash即可同步

    bin/logstash -f config/postgres.conf

    5 打开kibana即可查看到刚同步的数据

    GET test_index/_doc/_search

    6 如果设置了定时任务,logstash会定时去访问数据同步到es中,但是上面jdbc.sql文件中获取的是整张表的数据,也就是说每次同步都会对全表进行同步,但是我的需求是只需要第一次同步整张表后面只对更新的和修改的数据做同步,

    在网上找了下,思路大概是给表增加一个新的字段,保存当前创建的时间或者是当前更新的时间,然后根据:sql_last_value这个函数获取大于这个函数的时间,就过滤出新增的数据和更新的数据,实现对增量数据同步到es,:sql_last_value这个函数官网也没说的太清楚,我大致认为是最后一个更新的值或者最后一次更新的时间

    参考链接:

    https://blog.csdn.net/laoyang360/article/details/51747266

    https://www.cnblogs.com/xuwenjin/p/8987546.html

    https://blog.csdn.net/qq_31871785/article/details/89533058

    官方文档:

    https://www.elastic.co/guide/en/logstash/7.8/index.html

     

     

     

    展开全文
  • oracle数据库同步ES

    2019-11-06 17:23:52
    oracle同步ES 需求: 1、全量、增量都具备 ...2、当数据库中有修改删除操作时,ES也能做相应变动 ...logstash我已经试过了,虽然全量、增量可以实现,但是当数据库中有字段变动时,无法同步ES
  •   ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程...

    思想

      ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程间隔10s检查一次PG数据库是否有modify_time > 记录的最后修改时间。如果有,将最新的数据同步到ES数据库,并修改记录时间。如果没有,继续每隔10s检查一次。

    最终的实现效果:

    程序实现的主要代码

    def update_pg2es():
        global last_modify_time
        print(type(last_modify_time), last_modify_time)
        print("检测PG数据库是否有数据更新...")
        #连接数据库
        pg_conn = psycopg2.connect(config.CONNECT_PG_DB_URL)
        pg_cursor = pg_conn.cursor()
    
        select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)
        pg_cursor.execute(select_sql)
        datas = pg_cursor.fetchall()
        # 关闭连接
        pg_conn.close
        pg_cursor.close
        es_data_list = []
        for row in datas:
            # 获取当前行数据的列与值得字典
            column_values = get_row_colum_info(pg_cursor, row)
            es_data = translate_entity_date_to_es(column_values)
            es_data_list.append(es_data)
            last_modify_time = column_values['modify_time']
    
        index_entity = config.entity_index
        type_entity = config.entity_type
        num = 10000  # 批量存入的个数
        while (len(es_data_list) / num >= 0):
            if (math.floor(len(es_data_list) / num) == 0):  # 最后了
                es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:])  # 取剩下的
                print("存入{}个数据到ES".format(len(es_data_list)))
                break
            print("存入{}个数据到ES".format(num))
            print("前五个数据:")
            print(index_entity, type_entity, es_data_list[:5])
            es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:num])  # 存储一定的数量
            es_data_list = es_data_list[num:]  # 取剩下的
    
            if "ok" != es_result:
                print("存储实体数据到es中失败")
                break  # 如果某一次存入有问题,直接退出
    
        # 线程定时执行
        t = threading.Timer(10, update_pg2es)
        t.start()
        
        
    if __name__ == '__main__':
    	update_pg2es()
    

      注意:

    • select now()::timestamp(6)without time zone SQL语句可以查询当前时间(不带时区,精度为秒后面保存6位)
    • select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)将sql语句赋值给select_sql,sql语句的意思为选择出修改时间大于记录时间的数据,并将数据按照修改时间进行排序。此处注意记录时间两边需要加单引号才能保证sql语句的正常运行。
    展开全文
  • 数据库同步es报错(ArgumentError) invalid byte sequence in US-ASCII 今天在做mysql同步到es的时候出现了一个戏剧性的错误,配置文件,mysql版本万无一失,可还是同步失败,报错信息如下: invalid byte sequence ...

    数据库同步es报错(ArgumentError) invalid byte sequence in US-ASCII

    今天在做mysql同步到es的时候出现了一个戏剧性的错误,配置文件,mysql版本万无一失,可还是同步失败,报错信息如下:
    invalid byte sequence in US-ASCII (ArgumentError)

    后面经过一番折腾,发现是我 logstash 文件存放的地址含有中文,随即我把路径改成了全英文。至此,问题迎刃而解!
    在将logstash路径改为英文以后,再次执行	logstash -f ./mysql_info.conf,就成功同步了
    以上就是我对这个问题的个人解决方案,欢迎各位大佬补充和指点!

    展开全文
  • ## es 7.8.1 系列 (将mysql数据库同步es) 安装在win 10 上 准备工作: 安装 1.elasticsearch-7.8.1 2.elasticsearch-head-5.0.0 (可视化工具,方便查看es数据结构) 3.kibana-7.8.1-windows-x86_64(可是化工具...

    ## es 7.8.1 系列 (将mysql数据库同步到es) 安装在win 10 上
    准备工作:
    安装 
    1.elasticsearch-7.8.1 
    2.elasticsearch-head-5.0.0 (可视化工具,方便查看es数据结构)
    3.kibana-7.8.1-windows-x86_64(可是化工具,方便查看es数据结构,带图形)
    4.logstash-7.8.1(同步mysql数据到es插件)mysql-connector-java-5.1.49.jar
    启动
    1.es启动:bin下的elasticsearch.bat,浏览器输入localhost:9200 响应了就启动成功了
    2.es_head 启动:需要下载node.js 然后 npm i,npm run start 浏览器输入localhost:9100 响应了就启动成功了
    3.kibana启动:先在config下找到kibana.yml文件添加上

    server.port: 5601
    elasticsearch.hosts: ["http://localhost:9200"]
    kibana.index: ".kibana"

    然后在bin下的kibana.bat,显示:"Server running at http://localhost:5601",就启动成功了
    4.logstash启动:在bin下添加sync-conf文件夹,然后将mysql驱动放入,编写一个自己的业务sql文件,然后还要有一个sync.conf配置文件,在bin目录下执行 logstash.bat -f sync-conf/sync.conf 没有[ERROR]就是启动成功了
    配置文件请参考:"https://www.elastic.co/guide/en/logstash/7.x/configuration.html","https://zhuanlan.zhihu.com/p/162903896"

    mysql数据如下:

    DROP TABLE IF EXISTS `user`;
    CREATE TABLE `user`  (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `update_time` timestamp(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
    -- ----------------------------
    -- Records of user
    -- ----------------------------
    INSERT INTO `user` VALUES (1, 'zhangsan', '2020-09-04 11:21:43');
    INSERT INTO `user` VALUES (2, 'lisi', '2020-09-30 11:21:53');
    SET FOREIGN_KEY_CHECKS = 1;

    遇到的问题

    [ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 1, column 1 (byte 1)", 

      我将自己的配置文件改成utf-8后还是不行,卧槽。。。。我也是服了。然后看了一篇帖子说要改成utf-8 无 BOM 格式编码
    然后竟然可以了。。。
    帖子地址:"http://doc.primeton.com/pages/viewpage.action?pageId=22257515"
    elasticsearch-head连接不上
      在elasticsearch的安装目录下找到config文件夹,找到elasticsearch.yml文件,打开编辑它,加上如下这两行配置

    http.cors.enabled: true
    http.cors.allow-origin: "*"

    启动成功后可以在 kibana里查看 dev toos 命令:GET /索引名称/_search

    也可以在elasticsearch-head里查看:数据浏览找到对应的索引

    1

    2

     

    这是我遇到的问题,软件地址不知道在哪里的可以私信我 

    展开全文
  • 前段时间搭建了Elasticsearch 和 Kibana,顺手把Logstash给装了,这篇文章是把我在用Logstash同步数据库数据到ES中遇到的问题记录下来 准备 一个至少装有 Elasticsearch 和 Logstash 的主机 同步准备 创建一个存放...
  • 使用logstash 同步数据库es

    千次阅读 2019-05-29 11:24:41
    # 需要关联的数据库中有有一个id字段,对应类型中的id document_id => "%{id}" document_type => "user" } stdout { # JSON格式输出 codec => json_lines } } 最后 上传/Users/liyaochu/important/...
  • Logstash 实现ES数据库同步: 使用定时器(使用sql 定时的去查询数据进行同步)、实现方式比较简单。 MQ 实现 ES数据库同步: 实时性,消息放到MQ中,消费者会自动的消费,复杂性更高。 MQ方式保持数据一致...
  • logstash 相当于一个管道,链接关系型数据库ES ,现在咱们来安装并来实际操作一番。 步骤 下载logstash ,可以去我上篇给的华为镜像里下载。 然后进入目录下 我们需要 安装插件 bin/logstash-plugin install ...
  • 在前面的博客,已经实现了利用Logstash自动同步数据库内容到ES,除了Logstash同步的方式,还可以使用MQ的形式来同步,他们有什么区别呢?本文来讲解下。 阅读前,有兴趣的小伙伴可以看之前写的两篇博客: 《淘东...
  • bin/logstash 就是用来运行任务的命令 1、创建配置文件.conf es:timestamp mysql:datatime ...es中的@timestamp字段是logstash记录的入库时间 ...2、准备好数据库的驱动文件(就是写程序是用到的数据库的jar包) ...
  • es数据库同步方案

    万次阅读 2018-04-19 14:16:42
    我们采取MySQL作为主要的数据存储,利用MySQL的事务特性维护数据一致性,使用ElasticSearch进行数据汇集和查询,此时es数据库同步方案就尤为重要。 保证es数据库同步方案: 1、首先添加商品入数据库,添加...
  • logstash要与es版本一致。 2、将logstash 解压到/usr/local 将mysql驱动复制到/usr/local 命令:tar -zxvf logstash-6.4.3.tar.gz -C /usr/local/ 命令:cp mysql-connector-java-5.1.41.jar /usr/local/ 3、...
  • 自动将数据从源SQL数据库同步到目标ElasticSearch存储库。 sqldb2es是一个Java应用程序,它从JDBC数据源(表格数据)中获取数据,以供以结构化(JSon)数据格式进行索引。 sqldb2es允许通过将ElasticSearch JSon...
  • 题记一次同步多张表是开发中的一般需求。...1、同步原理原有ES专栏中有详解,不再赘述。详细请参考我的专栏: 深入详解Elasticsearch 以下是通过ES5.4.0, logstash5.4.1 验证成功。 可以确认的是2.X版本...
  • 选择与你的ES相同版本的logstash的tar.gz包,上传解压,进入解压目录下测试下。 使用bin/logstash -e 'input { stdin { } } output { stdout {} }',启动后输入任意内容后,如果有返回则表示安装成功 使用...
  • logstash同步数据库数据到ES

    千次阅读 2019-05-22 17:37:40
    logstash同步数据到ESlogstash配置说明环境准备配置文件启动方式踩过的坑 logstash配置说明 环境准备 在logstash目录中新建文件夹driver; 把数据库驱动包复制到driver文件夹下(ojdbc6.jar); 配置文件 新建配置...
  • 但很多情况下,我们的需求是:现在的数据存储在mysql、oracle等关系型传统数据库中,如何尽量不改变原有数据库表结构,将这些数据的insert,update,delete操作结果实时同步到elasticsearch(简称ES)呢? 本文基于以上...
  • # 多张表的同步只需要设置多个jdbc的模块就行了 jdbc { # mysql 数据库链接,shop为数据库名 jdbc_connection_string => "jdbc:mysql://localhost:3306/库名称?useUnicode=true&characterEncoding=utf8&...
  • 本文主要讲解如何通过logstash-output-mongodb插件实现Mysql与Mongodb数据的同步。源数据存储在Mysql,目标数据库为非关系型数据库Mongodb。0、前提1)已经安装好源数据库:Mysql; 2)已经安装好目的数据库:Mongodb...
  • 安装和ES版本相同的logstash,然后在bin的同级目录创建一个文件夹,名字自起: 此文件夹中主要存放需要同步的文件: 以下是jdbc1101.conf文件的数据: input { stdin { } jdbc { # mysql 数据库链接 ...
  •  对于历史数据,mongo-connector工具不能同步ES中,根因是本身工具不支持(初步界定),还是没有这种场景,待查(进一步研究后再更新)。1. mongo-connector 地址:https://github.com/mongodb-labs/mong...
  • 前言:logstash-input-jdbc实现mysql 与elasticsearch的解读... 目标:实现了oracle与ES同步增、删、改、查。 1、配置文件[root@5b9dbaaa148a logstash_jdbc_test]# cat jdbc_oracle.confinput { stdin { } jd...
  • 启动logstash时,进入logstash的bin,使用指定的配置运行即可同步,数据量大的时候可能需要等待一下。 启动命令:logstash.bat -f D:\ELK\logstash-6.2.4\myconfig\config\logstash_add.conf 请注意:像文件中的...
  • 本文主要实现将Elasticsearch中的索引数据Index同步到Mongodb中的集合collection中。0、前提1)已经安装好源数据库:elasticsearch V2.X; 2)已经安装好目的数据库:Mongodb; 3)已经安装好logstash及相关插件...
  • 测试表明:该插件优点:能实现同步增、删、改、查操作。不足之处(待完善的地方): 1、仍处理开发、相对不稳定阶段; 2、没有日志,不便于排查问题及查看同步结果。 本文深入详解了插件的安装、使用、增删改查...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 441
精华内容 176
关键字:

数据库同步es