精华内容
下载资源
问答
  • ES导入MySql数据

    2020-01-03 21:13:05
    1. logstash导入MySQL数据 要使用logstash导入数据的时候,首先需要将mysql的驱动包(注意版本)加入到logstash的home目录下logstash-core\lib\jars下。 在config目录下创建logstash-mysql.conf文件,配置如下: ...

    1. logstash导入MySQL数据

    1. 要使用logstash导入数据的时候,首先需要将mysql的驱动包(注意版本)加入到logstash的home目录下logstash-core\lib\jars下。
    2. 在config目录下创建logstash-mysql.conf文件,配置如下:
    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/es?useSSL=false&serverTimezone=UTC"
        jdbc_user => es
        jdbc_password => "123456"
        #启用追踪,如果为true,则需要指定tracking_column
        use_column_value => false
        #指定追踪的字段,
        tracking_column => "id"
        #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
        tracking_column_type => "numeric"
        #记录最后一次运行的结果
        record_last_run => true
        #上面运行结果的保存位置
        last_run_metadata_path => "mysql-position.txt"
        statement => "SELECT * FROM news where tags is not null"
        #表示每天的 17:57分执行
        schedule => " 0 57 17 * * *"
      }
    }
    
    filter {
      mutate {
        split => { "tags" => ","}
      }
    }
    output {
      elasticsearch {
        document_id => "%{id}"
        document_type => "_doc"
        index => "news"
        hosts => ["http://localhost:9200"]
      }
      stdout{
        codec => rubydebug
      }
    }
    
    1. 运行如下命令导入数据集
    D:\logstash-datas\bin>logstash.bat -f ../config/logstash-mysql.conf
    

    注意: 在导入数据时可能会遇到内存溢出问题

    1. cmd运行jvisualvm命令,然后安装Visual GC插件,可以查看本地程序使用的内存
      Eden新生代
      Metaspace元空间

    2. 数据导入时elasticsearch出现内存溢出=> 调整JVM堆内存大小

    config/jvm.options 
        -Xms3g(初始化最小内存大小) 
        -Xmx3g(初始化最大内存大小),最小和最大值要一致
    或者尝试删除logstash下的data文件夹中的内容,我这里修改为3G
    
    -XX:NewRatio=4(新生代与老年代内存大小比例为1:4),默认为2
    
    -XX:MaxDirectMemorySize=128m
     控制 堆外内存区 的大小 虽然不隶属于JVM,但是还是由JVM来回收的
    

    Settiong the heap size
    为什么给的内存大小不能超过50%?因为Elasticsearch的文件系统也需要内存

    http://developer.ibm.com/answers/questions/398383/summary-of-understanding-xxmaxdirectmemorysize-set/

    Netty 堆外内存,也叫直接内存,涉及到用户态、内核态、NIO
    堆外内存
    Java本身对文件是没有读取能力的(不能直接操作硬件层面),Java之所以能以输入流和输出流读取、写入文件,是借助于操作系统层面的东西。
    Java调用JVM,JVM使用C++语言写的,而C++语言是可以直接操作操作系统的。Java中调用OutputStream输出流和InputStream输入流,是间接调用了操作系统层面的东西,涉及到了状态的变化。

    Java调用InputStream输入流,首先会将硬盘中的文件读取到真实物理内存中,接着把真实物理内存中的数据读取到JVM的内存中,这个过程有个状态的变化,从内核态切换到用户态。
    Java调用OutputStream输出流,是用户态切换到内核态。
    以上,经历了5个步骤,浪费了系统的性能。
    所以,ElasticSearch、Hadoop、Dubbo等都使用了Netty。Netty是一个高性能的IO通信框架,高性能体现在IO的处理上。Netty可以让数据不切换到用户态,直接再内核态完成数据的输入和输出。
    Netty对IO的处理实际上是NIO来做的,NIO中有一个非常重要的概念:零拷贝,即不需要拷贝到JVM的内存中。

    -XX:MaxDirectMemorySize配置的是堆外内存的大小,即Netty对文件处理的内存空间大小。如果堆外内存过小会报出direct buffer memory错误,提示直接内存不足
    传统JVM文件的读写.png

    展开全文
  • logstash数据从mysql导入es所需mysql的jar包
  • ES数据大批量导入MySQL

    2021-06-01 16:21:35
    1、大批量导入的问题 ES大批量导入,一般默认为10条,设置size的话,一般最大也是1w条,如果几十万以上,那么效率就很低了,所以需要scroll方法进行导入。 2、导入的代码示例 import os import pymysql from elastic...

    1、大批量导入的问题

    ES大批量导入,一般默认为10条,设置size的话,一般最大也是1w条,如果几十万以上,那么效率就很低了,所以需要scroll方法进行导入。

    2、导入的代码示例

    
    import os
    
    import pymysql
    from elasticsearch import Elasticsearch
    import datetime
    import requests
    import json
    
    
    class DBHelper:
        def __init__(self, dbName, env):
            if env == "dev":
                self.host = "192.168.1.19"
                self.port = 3306
                self.user = "root"
                self.password = "password"
                self.name = dbName
            else:
                self.host = "test.ali"
                self.port = 3306
                self.user = "clod"
                self.password = "passowrd"
                self.name = dbName
    
        def db_config(self):
            config = {
                'host': self.host,
                'port': self.port,
                'user': self.user,
                'password': self.password,
                'db': self.name,
                'charset': 'utf8mb4',
                'cursorclass': pymysql.cursors.DictCursor,
            }
            return config
    
        def connectionbase(self, config):
            connection = pymysql.connect(**config)
    
            return connection
    
    
    def to_mysql(db, question, answer, env):
        db = DBHelper(db, env)
        connection = db.connectionbase(db.db_config())
        sql = "INSERT INTO library(question,answer,create_time) VALUES('" + str(question) + "','" + str(
            answer) + "','" + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + "')"
        with connection.cursor() as cursor:
            cursor.execute(sql)
            connection.commit()
            cursor.close()
        connection.close()
    
    
    def del_mysql(db, env):
        db = DBHelper(db, env)
        connection = db.connectionbase(db.db_config())
        sql = "truncate table library"
        with connection.cursor() as cursor:
            cursor.execute(sql)
            connection.commit()
            cursor.close()
        connection.close()
    
    
    def get_all_es_data(es):
        result = []
        hits_all = []
        data = es.search(index="index", doc_type="medical", scroll='10m', timeout='10s', size=10000)
        hits = data["hits"]["hits"]
        hits_all.extend(hits)
        if not hits:
            print("result is empty!")
        scroll_id = data["_scroll_id"]
        total = data["hits"]["total"]
        print("ES中共计读取数据量:{}".format(total))
        page = total if int(total / 10000) == 0 else int(total / 10000)
        for i in range(page):
            res = es.scroll(scroll_id=scroll_id, scroll='10m')
            hitscroll = res["hits"]["hits"]
            if  len(hitscroll) > 0:
                hits_all.extend(hitscroll)
        for hit in hits_all:
            source = hit["_source"]
            result.append(source)
        print("已获取数据量:{}".format(len(result)))
        return result
    
    
    def read_es_to_mysql(host, port, user, password, env):
        if user is None or password is None:
            es = Elasticsearch(hosts="192.168.1.10", port=9203)
            result = get_all_es_data(es)
            start_idx = 0
            cnt = len(result)
            del_mysql("pla", env)
            for source in result:
                question = source["question"]
                answer = source["answer"]
                if question is None or answer is None or len(question) == 0 or len(answer) == 0:
                    continue
                question = question.replace(",", ",")
                answer = answer.replace(",", ",")
                to_mysql("plat", question, answer, env)
                start_idx += 1
                if start_idx % 1000 == 0:
                    print("数据库写入进度:{:.2f}%".format(start_idx * 100.0 / cnt))
        else:
            es = Elasticsearch(hosts=host, port=port, http_auth=(user, password))
            result = get_all_es_data(es)
            start_idx = 0
            cnt = len(result)
            del_mysql("plat", env)
            for source in result:
                question = source["question"]
                answer = source["answer"]
                if question is None or answer is None or len(question) == 0 or len(answer) == 0:
                    continue
                question = question.replace(",", ",").replace("'","")
                answer = answer.replace(",", ",").replace("'","")
                to_mysql("pla", question, answer, env)
                start_idx += 1
                if start_idx % 1000 == 0:
                    print("数据库写入进度:{:.2f}%".format(start_idx * 100.0 / cnt))
    
    
    if __name__ == '__main__':
        #read_es_to_mysql(None, None, None, None, "dev")
        read_es_to_mysql("es-cn-com", "9200", "clod", "password", "prod")
    展开全文
  • 要使用logstash服务. 它的作用就是将mysqles的建立一个通道,实现一个全量索引构建。编写导入mysql导入 es的规则和sql
  • mysql 导入es

    2021-04-02 12:42:06
    https://blog.csdn.net/liyongbing1122/article/details/85096187

    https://blog.csdn.net/liyongbing1122/article/details/85096187

    展开全文
  • 今天项目经理给了我一份js文件,让我导入到mongodb里,然后转成mysql,最后放到es里 我直接用navicat for mongodb 运行脚本文件,直接导入了 然后用 mongodb 自带的 mongodbExport 工具导出为了csv文件 先去自己...

    记一下今天踩得坑

    今天项目经理给了我一份js文件,让我导入到mongodb里,然后转成mysql,最后放到es里
    我直接用navicat for mongodb 运行脚本文件,直接导入了
    然后用 mongodb 自带的 mongodbExport 工具导出为了csv文件
    先去自己mongodb 安装目录下找到bin,点进去以后有一个mongoexport
    然后执行这条命令即可导出

    mongoexport -d food_nutrient -c food_nutrient --csv -f food_id,name,heart,unit,type,picture,nutr -o food_nutrient.csv

    -d:指明使用的库,本例中为food_nutrient

    -c:指明要导出的集合,本例中为food_nutrient

    -o:指明要导出的文件名,本例中为food_nutrient.csv

    -csv:指明要导出为csv格式

    -f:指明需要导出 food_id,name,heart,unit,type,picture,nutr 这几列的数据

    我这里导出时多加了一个 _id 字段,这个字段我记得是mongodb自己带的,我这里多导出了这个字段,导致我后面导入到es时报错

    导出 csv 文件后,要将文件导入到mysql中,这里我也是直接用navicat了,先建好表,然后右键表导入向导,里面有个csv文件类型,直接下一步就可以了

    导入到mysql后,要往es里导入数据,这里我直接用之前用过的一个导数据的项目导的
    这里有个坑,数据一直导不进去,后来我仔细看了日志发现请求报错了
    报了这个错
    Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters

    说 _id 这个字段有问题,其实就是我表里有 _id 这个字段和es的冲突了,然后我只好改了查询的sql,这里其实在导出的时候就不该导出 _id 这个字段了

    展开全文
  • DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步...
  • 一、向 es 中安装ik分词器插件 第一步:去https://github.com/medcl/elasticsearch-analysis-ik/releases?after=v5.5.3该网址下载对应版本的ik分词器(下zip版,不要下成source版) 第二步:在es的plugins下新建ik...
  • id => "%{id}" } #stdout输出到标准输出,一般用于调试 stdout { codec => json_lines } } 注意点: mysql-connector-java-8.0.13.jar驱动库需要自己下载上传,地址要正确,我是放在了pipeline文件夹下的。...
  • Logstash mysql导入es注意

    2020-11-25 00:53:07
    "D:/lmhworksapce/DBCPP/WebRoot/WEB-INF/lib/mysql-connector-java-8.0.11.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" # 8.0以上版本:一定要把serverTimezone=UTC天加上 jdbc_connection_string =...
  • canal_mysql_elasticsearch_sync 支持请star:sparkles: 基于 canal 的 Mysql 与 Elasticsearch 实时同步的 javaweb 服务。 canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。 工作原理 全量 暴露Http接口...
  • logstash将mysql数据导入es

    千次阅读 2019-03-07 11:40:27
    Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到指定的存储位置(可以使es、json、mysql等)。本文主要介绍mysql数据导入ElasticSearch 2、下载并安装logstash ...
  • ElasticSearch-Logstash6.0.0安装以及mysql数据导入ESLogstash安装(6.0.0)mysql数据导入ES验证mysql导入的数据 Logstash安装(6.0.0) 下载logstash到linux 我直接给你们百度云 链接:...
  • MySQL数据库数据导入ES

    千次阅读 2020-06-11 14:10:04
    需要安装 mysql数据导入ES中,用到的有logstash,ESmysql的连接包。链接:https://pan.baidu.com/s/1xopMMUtPir12zrQfYcwBZg 提取码:me8g。这里的logstash和ES我用的版本是6.4.3的,还有mysql连接包,ik分词器...
  • 单文件执行  bin/logstash -f ../... #设定ES索引类型  } }   output {  elasticsearch {  hosts => "10.100.11.231:9200"  index => "cxx_info_index"  document_id => "%{id}"  } }  
  • 使用 DataX将数据从MySQL导入ES 介绍 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS ...
  • 使用py脚本,连接MySQLES,读取MySQL中表数据导入ES中作为文档存储。 注意该方案效率不高,不可作为实际业务同步数据的解决方案。 """ 读取MySQL中表数据导入ES中作为文档存储 """ import pymysql from ...
  • logstash conf的output内容 output { if [type]=="bbs" { elasticsearch { hosts => ["http://es.service:9200"] user => "" password =>...
  • 本文说明使用logstash来实现全量和增量将mysql数据导入es中。每一分钟执行一次。有时效性高的要求可以提高频率。 2.logstash同步mysql数据到elasticsearch logstash-plugin install logstash-output-elastic...
  • 下载logstash-6.8.6,我的es是6.8.6 1.解压logstash unzip logstash-6.8.6 2.安装 logstash-input-jdbc 插件 logstash-input-jdbc插件是logstash 的一个个插件 使用ruby语言开发。下载插件过程中最大的坑是下载插件...
  • 配置logstash增量导入mysql数据到es #logstash挂载目录结构 [root@iZbp18drdmy0c96u5xnc1wZ logstash]# ls -R .: config mysql pipeline template ./config: jvm.options log4j2.properties logstash-sample.conf ...

空空如也

空空如也

1 2 3 4 5 ... 14
收藏数 264
精华内容 105
关键字:

es导入mysql

mysql 订阅