精华内容
下载资源
问答
  • reindex()是pandas对象的一个重要方法,其作用是创建一个新索引的新对象。 一、对Series对象重新索引 se1=pd.Series([1,7,3,9],index=['d','c','a','f']) se1 代码结果: d 1 c 7 a 3 f 9 dtype: int64 ...
  • reindex更多的不是修改pandas对象的索引,而只是修改索引的顺序,如果修改的索引不存在就会使用默认的None代替此行。且不会修改原数组,要修改需要使用赋值语句。 series.reindex() import pandas as pd import ...
  • es-reindex-用于复制ElasticSearch索引的简单ruby脚本 简单的ruby脚本,用于复制和重新索引ElasticSearch索引,复制索引设置和映射。 在滚动过程中将显示进度和时间估计。 要求 需要Ruby 1.8.6或更高版本,为方便...
  • reindex

    2021-01-07 17:23:15
    ​ 使用reindex方案解决。代码层次连接索引库的别名。这样在代码层次不用变化,只需要将原索引的别名删除,将别名设置到新索引上就可以做到不停用es而新建索引或修改索引映射,并且不影响业务使用。 1、问题索引...

    问题:

    ​ aggregation_test索引由于个人意外操作,添加了两个无用字段query和sole,索引中也出现了对应脏数据,现需要去除该字段。

    解决方案:

    ​ 使用reindex方案解决。代码层次连接索引库的别名。这样在代码层次不用变化,只需要将原索引的别名删除,将别名设置到新索引上就可以做到不停用es而新建索引或修改索引映射,并且不影响业务使用。

    1、问题索引mapping(query、sole字段意外新增)+ 索引别名

    #索引映射
    GET aggregation_test/_mapping
    
    {
      "aggregation_test": {
        "mappings": {
          "doc": {
            "properties": {
              "color": {
                "type": "keyword"
              },
              "make": {
                "type": "keyword"
              },
              "price": {
                "type": "integer"
              },
              "query": {#错误无用字段
                "properties": {
                  "bool": {
                    "properties": {
                      "must": {
                        "properties": {
                          "term": {
                            "properties": {
                              "id": {
                                "properties": {
                                  "value": {
                                    "type": "text",
                                    "fields": {
                                      "keyword": {
                                        "type": "keyword",
                                        "ignore_above": 256
                                      }
                                    }
                                  }
                                }
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              },
              "sold": {
                "type": "date"
              },
              "sole": {#错误无用字段
                "type": "date"
              }
            }
          }
        }
      }
    }
    
    
    #原索引别名
    GET aggregation_test/_alias
    
    {
      "aggregation_test": {
        "aliases": {
          "aggregation_test_index": {}
        }
      }
    }
    

    2、查询原索引中的数据

    GET aggregation_test/_search
    {
      "size": 10, 
      "query": {
        "match_all": {}
      }
    }
    

    有两条是脏数据:
    在这里插入图片描述

    3、新建索引mapping

    PUT aggregation_test-v2
    {
      "settings": {
        "number_of_shards": "3",
        "number_of_replicas": 1
      },
      "mappings": {
        "doc": {
          "properties": {
            "color": {
              "type": "keyword"
            },
            "make": {
              "type": "keyword"
            },
            "price": {
              "type": "integer"
            },
            "sold": {
              "type": "date"
            }
          }
        }
      }
    }
    

    4、reindex第一次

    POST _reindex
    {
      "conflicts": "proceed",#报错跳过
      "source": {
        "index": "aggregation_test",#数据来源索引
        "size": 5,#每次数据复制数据量
        "_source": [#源数据中需要的字段
          "price",
          "color",
          "make",
          "sold"
        ],
        "query": {#只取查询条件中的数据(可以自己参考添加条件)
          "match_all": {}
        }
      },
      "dest": {
        "index": "aggregation_test-v2",#数据目标索引
        "op_type": "create"#是否是新数据(默认是以资源id来分辨的,如下来源数据id在目标索引中已存在,则跳过)
      },
      "script": {#同步过程中可以对数据进行处理
        "source": "ctx._source.price += 99",#价格字段值加99
        "lang": "painless"
      }
    }
    
    结果:数据少了???+并且报错脚本错误在这里插入图片描述
    {
      "error": {
        "root_cause": [
          {
            "type": "script_exception",
            "reason": "runtime error",
            "script_stack": [
              "ctx._source.price += 99", 
              "                     ^---- HERE"
            ],
            "script": "ctx._source.price += 99",
            "lang": "painless"
          }
        ],
        "type": "script_exception",
        "reason": "runtime error",
        "script_stack": [
          "ctx._source.price += 99",
          "                     ^---- HERE"
        ],
        "script": "ctx._source.price += 99",
        "lang": "painless",
        "caused_by": {
          "type": "null_pointer_exception",#因为有数据该字段值为空
          "reason": null
        }
      },
      "status": 500
    }
    

    4.1 删除新索引中的数据重新reindex,不使用脚本,防止报错

    #删除目标索引所有数据
    POST aggregation_test-v2/_delete_by_query?refresh
    {
      "query": {
        "match_all": {}
      }
    }
    
    #重新同步,不用脚本
    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "aggregation_test",
        "size": 5,
        "_source": [
          "price",
          "color",
          "make",
          "sold"
        ],
        "query": {
          "match_all": {}
        }
      },
      "dest": {
        "index": "aggregation_test-v2",
        "op_type": "create"
      }
    }
    
    #结果成功,同步数据9{
      "took": 1047,
      "timed_out": false,
      "total": 9,
      "updated": 0,
      "created": 9,
      "deleted": 0,
      "batches": 2,
      "version_conflicts": 0,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1,
      "throttled_until_millis": 0,
      "failures": []
    }
    
    
    成功数据:但是有脏数据?这个时候,我们可以在reindex的查询条件中过滤这些脏数据

    在这里插入图片描述

    4.2 重新reindex,不要脏数据

    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "aggregation_test",
        "size": 5,
        "_source": [
          "price",
          "color",
          "make",
          "sold"
        ],
        "query": {
          "bool": {
            "must_not": [
              {
                "exists": {
                  "field": "sole"
                }
              },
              {
                "exists": {
                  "field": "query.bool.must.term.id.value"
                }
              }
            ]
          }
        }
      },
      "dest": {
        "index": "aggregation_test-v2",
        "op_type": "create"
      }
    }
    
    #响应
    {
      "took": 880,
      "timed_out": false,
      "total": 7,
      "updated": 0,
      "created": 7,
      "deleted": 0,
      "batches": 2,
      "version_conflicts": 0,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1,
      "throttled_until_millis": 0,
      "failures": []
    }
    
    

    结果 ,符合预期,去掉两条脏数据

    在这里插入图片描述

    5、将新索引添加别名:aggregation_test_index

    #删除原索引别名
    DELETE aggregation_test/_alias/aggregation_test_index
    
    #给新索引添加别名
    PUT aggregation_test-v2/_alias/aggregation_test_index
    

    结果:符合预期

    在这里插入图片描述

    6、使用别名查询

    #使用别名查询
    POST aggregation_test_index/_search
    {
      "query": {"match_all": {}}
    }
    
    #结果
    {
      "took": 2,
      "timed_out": false,
      "_shards": {
        "total": 3,
        "successful": 3,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 7,
        "max_score": 1,
        "hits": [
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "BX7ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-07-02",
              "color": "blue",
              "price": 15000,
              "make": "toyota"
            }
          },
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "An7ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-10-28",
              "color": "red",
              "price": 10000,
              "make": "honda"
            }
          },
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "BH7ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-05-18",
              "color": "green",
              "price": 30000,
              "make": "ford"
            }
          },
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "B37ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-11-05",
              "color": "red",
              "price": 20000,
              "make": "honda"
            }
          },
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "CH7ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-01-01",
              "color": "red",
              "price": 80000,
              "make": "bmw"
            }
          },
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "Bn7ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-08-19",
              "color": "green",
              "price": 12000,
              "make": "toyota"
            }
          },
          {
            "_index": "aggregation_test-v2",
            "_type": "doc",
            "_id": "CX7ab3YBwI6V-XWfWiJZ",
            "_score": 1,
            "_source": {
              "sold": "2014-02-12",
              "color": "blue",
              "price": 25000,
              "make": "ford"
            }
          }
        ]
      }
    }
    

    7、回到script脚本有问题这里,修改脚本script

    #重新reindex成功,脚本判断price字段是否有值即可
    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "aggregation_test",
        "size": 5,
        "_source": [
          "price",
          "color",
          "make",
          "sold"
        ],
        "query": {
          "bool": {
            "must_not": [
              {
                "exists": {
                  "field": "sole"
                }
              },
              {
                "exists": {
                  "field": "query.bool.must.term.id.value"
                }
              }
            ]
          }
        }
      },
      "dest": {
        "index": "aggregation_test-v2",
        "op_type": "create"
      },
      "script": {
        "source": "if(ctx._source.price != null){ctx._source.price += 99}",
        "lang": "painless"
      }
    }
    
    #响应
    {
      "took": 964,
      "timed_out": false,
      "total": 7,
      "updated": 0,
      "created": 7,
      "deleted": 0,
      "batches": 2,
      "version_conflicts": 0,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1,
      "throttled_until_millis": 0,
      "failures": []
    }
    

    结果查看price是否+99

    [外链图片转存中...(img-XY9PnP5p-1610008448620)]

    展开全文
  • J-ES-Reindex Java Command Line tool for reindexing Elasticsearch Indices $ java -jar J-ES-Reindex.jar --------------------------------- Elasticsearch Java Reindex tool v0.5 ------------------------...
  • ES索引重建reindex详解

    2021-09-26 10:25:55
    详细介绍索引重建reindex的使用场景和各种用法。

    一、使用场景

    1.分片数变更:当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。
    2. mapping字段变更:当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,所以这种情况下也可以考虑尝试使用Reindex。
    3. 分词规则修改,比如使用了新的分词器或者对分词器自定义词库进行了扩展,而之前保存的数据都是按照旧的分词规则保存的,这时候必须进行索引重建。


    二、_reindex

    官方说明地址:reindex

    ES提供了_reindex这个API。相比于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。
    reindex的核心做跨索引、跨集群的数据迁移。

    Reindex 不会尝试设置目标索引。 它不会复制源索引的设置。 您应该在运行 _reindex 操作之前设置目标索引,包括设置映射、分片计数、副本等。

    先根据复制源索引创建新的目标索引,然后执行reindex命令。
    基础使用命令:

    POST _reindex
    {
      "source": {
        "index": "old_index"
      },
      "dest": {
        "index": "new_index"
      }
    }
    

    三、实战

    1、覆盖更新

    说明:
    "version_type": "internal",internal表示内部的,省略version_type或version_type设置为 internal 将导致 Elasticsearch 盲目地将文档转储到目标中,覆盖任何具有相同类型和 ID 的文件。
    这也是最常见的重建方式。

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "version_type": "internal"
      }
    }
    

    2、创建丢失的文档并更新旧版本的文档

    说明:
    "version_type": "external",external表示外部的,将 version_type 设置为 external 将导致 Elasticsearch 保留源中的版本,创建任何丢失的文档,并更新目标索引中版本比源索引中版本旧的任何文档。
    id不存在的文档会直接更新;id存在的文档会先判断版本号,只会更新版本号旧的文档。

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "version_type": "external"
      }
    }
    

    3、仅创建丢失的文档

    要创建的 op_type 设置将导致 _reindex 仅在目标索引中创建丢失的文档,所有存在的文档都会引起版本冲突。
    只要两个索引中存在id相同的记录,就会引起版本冲突

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "op_type": "create"
      }
    }
    

    4、冲突处理

    默认情况下,版本冲突会中止 _reindex 进程。 “冲突”请求正文参数可用于指示 _reindex 继续处理有关版本冲突的下一个文档。 需要注意的是,其他错误类型的处理不受“冲突”参数的影响。
    "conflicts": "proceed"在请求正文中设置时,_reindex 进程将继续处理版本冲突并返回遇到的版本冲突计数。

    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "op_type": "create"
      }
    }
    

    5、source中添加查询条件

    POST _reindex
    {
      "source": {
        "index": "twitter",
        "query": {
          "term": {
            "user": "kimchy"
          }
        }
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    

    6、source中包含多个源索引

    源中的索引可以是一个列表,允许您在一个请求中从多个源中复制。 这将从 twitter 和 blog 索引中复制文档:

    POST _reindex
    {
      "source": {
        "index": ["twitter", "blog"]
      },
      "dest": {
        "index": "all_together"
      }
    }
    

    也支持*号来匹配多个索引。

    POST _reindex
    {
      "source": {
        "index": "twitter*"
      },
      "dest": {
        "index": "all_together"
      }
    }
    

    7、限制处理的记录数

    通过设置size大小来限制处理文档的数量。

    POST _reindex
    {
      "size": 10000,
      "source": {
        "index": "twitter",
        "sort": { "date": "desc" }
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    

    8、从远程ES集群中重建索引

    POST _reindex
    {
      "source": {
        "remote": {
          "host": "http://otherhost:9200",
          "username": "user",
          "password": "pass",
          "socket_timeout": "1m",
          "connect_timeout": "10s"
        },
        "index": "source",
        "query": {
          "match": {
            "test": "data"
          }
        }
      },
      "dest": {
        "index": "dest"
      }
    }
    

    9、提取随机子集

    说明:从源索引中随机取10条数据到新索引中。

    POST _reindex
    {
      "size": 10,
      "source": {
        "index": "twitter",
        "query": {
          "function_score" : {
            "query" : { "match_all": {} },
            "random_score" : {}
          }
        },
        "sort": "_score"    
      },
      "dest": {
        "index": "random_twitter"
      }
    }
    

    10、修改字段名称

    原索引

    POST test/_doc/1?refresh
    {
      "text": "words words",
      "flag": "foo"
    }
    

    重建索引,将原索引中的flag字段重命名为tag字段。

    POST _reindex
    {
      "source": {
        "index": "test"
      },
      "dest": {
        "index": "test2"
      },
      "script": {
        "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
      }
    }
    

    结果:

    GET test2/_doc/1
    {
      "found": true,
      "_id": "1",
      "_index": "test2",
      "_type": "_doc",
      "_version": 1,
      "_seq_no": 44,
      "_primary_term": 1,
      "_source": {
        "text": "words words",
        "tag": "foo"
      }
    }
    

    四、性能优化

    常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢。
    数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?
    原因分析:
    reindex的核心做跨索引、跨集群的数据迁移。
    慢的原因及优化思路无非包括:
    1)批量大小值可能太小。需要结合堆内存、线程池调整大小;
    2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
    3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。
    可行方案:
    1)提升批量写入的大小值size
    2)通过设置sliced提高写入的并行度

    1、提升批量写入大小值

    默认情况下 _reindex 使用 1000 的滚动批次。可以使用源元素source中的 size 字段更改批次大小:

    POST _reindex
    {
      "source": {
        "index": "source",
        "size": 5000
      },
      "dest": {
        "index": "dest"
      }
    }
    

    2、提高scroll的并行度

    Reindex 支持 Sliced Scroll 来并行化重新索引过程。 这种并行化可以提高效率并提供一种将请求分解为更小的部分的便捷方式。
    每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。
    slicing的设定分为两种方式:手动设置分片、自动设置分片。

    自动设置分片如下:

    POST _reindex?slices=5&refresh
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    

    slices大小设置注意事项:
    1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
    2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
    3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。
    效果
    实践证明,比默认设置reindex速度能提升10倍+。

    五、超时问题

    es中的请求超时时间默认是1分钟,当重建索引的数据量太大时,经常会出现超时。这种情况可以增大超时时间,也可以添加wait_for_completion=false参数将请求转为异步任务。

    POST _reindex?slices=9&refresh&wait_for_completion=false
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    

    1、获取reindex任务列表

    GET _tasks?detailed=true&actions=*reindex
    

    2、根据任务id查看任务

    GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619
    

    3、取消任务

    POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel
    

    总结

    本文主要介绍了ES索引重建的常见使用场景以及典型的使用方法,并说明了相关性能优化的技巧和请求超时问题的处理方法。

    展开全文
  • Magento2 ReIndex数据(Wbs_Reindexer)v1.0.0 Magento 2扩展 该扩展程序有助于从admin重新编制数据索引,而无需执行命令行(CLI)。 特征: 便于使用 来自管理员的简单重新索引数据 快速索引 安装 运行以下命令: ...
  • Reindex API 详解

    2021-02-24 17:28:44
    在开始讲解具体的API的时候,有一点必须知道,Reindex不会尝试设置目标索引。它不会复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片计数,副本等。当然,最常见的做法不会是手动的...

    在开始讲解具体的API的时候,有一点必须知道,Reindex不会尝试设置目标索引。它不会复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片计数,副本等。当然,最常见的做法不会是手动的设置索引。而是使用索引模版。只要索引模版的匹配形式可以匹配上源索引和目标索引,则我们不需要去考虑索引配置的问题,模版会为我们解决对应的问题。

    另外,下文只针对5.x及以后版本。

    基本操作

    对于reindex API来说,最常见的,就是简单的把documents从一个index复制到另一个。下面的例子中,我们把twitter index的内容复制到new_twitter index中:

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    返回到结果如下:

    {
      "took" : 147,
      "timed_out": false,
      "created": 120,
      "updated": 0,
      "deleted": 0,
      "batches": 1,
      "version_conflicts": 0,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1.0,
      "throttled_until_millis": 0,
      "total": 120,
      "failures" : [ ]
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    version_type

    就像_update_by_query,_reindex会生成源索引的快照(snapshot),但它的目标必须是一个不同的索引,以便避免版本冲突。dest对象可以像index API一样进行配置,以乐观锁控制并发。像上面那样,不设置version_type或设置它将设置为internal。Elasticsearch将会直接将文档转储到dest中,覆盖任何发生的具有相同类型和id的document:

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "version_type": "internal"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    如果把version_type设置为external.则elasticsearch会从source读取version字段,当遇到具有相同类型和id的documents,只更新newer verion。

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "version_type": "external"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    上面说起来似乎有点不好理解。简单说来,就是在redinex的时候,你的dest index可以不是一个新的index,而是包含有数据的。如果你的source indexh和dest index里面有相同类型和id的document.对于使用internal,是直接覆盖。使用external的话,只有当source的version更加新的时候,才更新。

    op_type

    op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "op_type": "create"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    conflicts配置

    默认情况下,当发生version conflict的时候,_reindex会被abort。除非把conflicts设置为“proceed”:

    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "op_type": "create"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    query配置

    我们还可以通过query,把需要_reindex的document限定在一定的范围。下面的例子,就只copy了作者是’kimchy’的twitter到new_twitter:

    POST _reindex
    {
      "source": {
        "index": "twitter",
        "type": "tweet",
        "query": {
          "term": {
            "user": "kimchy"
          }
        }
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    将多个索引reindex到一个目标

    POST _reindex
    {
      "source": {
        "index": ["twitter", "blog"],
        "type": ["tweet", "post"]
      },
      "dest": {
        "index": "all_together"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里要注意的是,如果twitter和blog中有document的id是一样的,则无法保证最终出现在all_together里面的document是哪个,因为迭代是随机的。(最后一个会覆盖前一个)

    size设置

    通过size可以控制复制多少内容,下例只复制了一个document:

    POST _reindex
    {
      "size": 1,
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    sort配置

    下例,把时间排序前10000个document,复制到目标:

    POST _reindex
    {
      "size": 10000,
      "source": {
        "index": "twitter",
        "sort": { "date": "desc" }
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    只复制特定的field

    POST _reindex
    {
      "source": {
        "index": "twitter",
        "_source": ["user", "tweet"]
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    使用script

    POST _reindex
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "version_type": "external"
      },
      "script": {
        "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
        "lang": "painless"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    使用Ingest Node

    这个功能应该说是最好用的了。当你的source是因为不合理的结构,需要重新结构化所有的数据时,通过ingest node, 可以很方便的在新的index中获得不一眼的mapping和value:

    POST _reindex
    {
      "source": {
        "index": "source"
      },
      "dest": {
        "index": "dest",
        "pipeline": "some_ingest_pipeline"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    reindex远程服务器的索引到本地

    使用remote属性:

    POST _reindex
    {
      "source": {
        "remote": {
          "host": "http://otherhost:9200",
          "username": "user",
          "password": "pass"
        },
        "index": "source",
        "query": {
          "match": {
            "test": "data"
          }
        }
      },
      "dest": {
        "index": "dest"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    对于在其他集群上的index,就不存在本地镜像复制的便利。需要从网络上下载数据再写到本地,默认的,buffer的size是100M。在scroll size是1000的情况下,如果单个document的平均大小超过100Kb,则有可能会报错。因此在在遇到非常大的documents,需要减小batch的size:

    POST _reindex
    {
      "source": {
        "remote": {
          "host": "http://otherhost:9200"
        },
        "index": "source",
        "size": 100,
        "query": {
          "match": {
            "test": "data"
          }
        }
      },
      "dest": {
        "index": "dest"
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    Response body

    一般来说,是这样的:

    {
      "took" : 639,
      "updated": 0,
      "created": 123,
      "batches": 1,
      "version_conflicts": 2,
      "retries": {
        "bulk": 0,
        "search": 0
      }
      "throttled_millis": 0,
      "failures" : [ ]
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这里要注意的是failures,如果里面的值不为空,则代表本次_reindex是失败的,是被中途abort,一般都是因为发生了conflicts。前面已经说过,如何设置,在发生conflict的时候继续运行。

    使用Task API查询进度

    一般来说,如果你的source index很大,则可能需要比较长的时间来完成_reindex的工作。如果需要查看进度,可以通过_tasks API:

    GET _tasks?detailed=true&actions=*reindex
    • 1

    返回结果如下:

    {
      "nodes" : {
        "r1A2WoRbTwKZ516z6NEs5A" : {
          "name" : "r1A2WoR",
          "transport_address" : "127.0.0.1:9300",
          "host" : "127.0.0.1",
          "ip" : "127.0.0.1:9300",
          "attributes" : {
            "testattr" : "test",
            "portsfile" : "true"
          },
          "tasks" : {
            "r1A2WoRbTwKZ516z6NEs5A:36619" : {
              "node" : "r1A2WoRbTwKZ516z6NEs5A",
              "id" : 36619,
              "type" : "transport",
              "action" : "indices:data/write/reindex",
              "status" : { "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0 },
              "description" : "" }
          }
        }
      }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    上面表示,一共需要处理6154个document,现在已经update了3500个。

    取消reindex

    射出去的箭还是可以收回的,通过_tasks API:

    POST _tasks/task_id:1/_cancel
    • 1

    这里的task_id,可以通过上面的å_tasks API获得。

    展开全文
  • 1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。 2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入...

    一、应用背景

       ES在创建好索引后,mappingproperties属性类型是不能更改的只能添加。如果说需要修改字段就需要重新建立索引然后把旧数据导到新索引。

    • 1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex

    • 2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,所以这种情况下也可以考虑尝试使用Reindex

      5.X版本后新增_reindex APIReindex可以直接在Elasticsearch集群里面对数据进行重建。并且支持跨集群间的数据迁移。

    二、数据迁移步骤:

    2.0、源索引

    PUT /test_v1
    {
      "mappings": {
        "properties": {
          "id": {
            "type": "integer"
          },
          "name": {
            "type": "text",
            "fields": {
              "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
              }
            }
          },
          "age": {
            "type": "long"
          }
        }
      }, 
      "settings": {
        "number_of_replicas": 2, 
        "number_of_shards": 3
      },
      "aliases": {
        "test": {},
        "test_read": {},
        "test_write":{}
      }
    }
    

    2.1、创建目标索引(这步非常重要)

      为什么这么说? 因为一开始,在备份数据的场景下,我以为dest index不用创建,最后测试发现如果dest index不创建,对于非text类型数据,可能会变成text类型(不建议这样做,所以最好创建好dest index)。

    PUT /test_dest
    {
      "mappings": {
        "properties": {
          "id": {
            "type": "integer"
          },
          "name": {
            "type": "text",
            "fields": {
              "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
              }
            }
          },
          "age": {
            "type": "long"
          }
        }
      }, 
      "settings": {
        "number_of_replicas": 0,    
        "number_of_shards": 3,
        "refresh_interval": "-1"
      }
    }
    
    • 并修改了number_of_replicasrefresh_interval
    • 设置number_of_replicas0防止我们迁移文档的同时又发送到副本节点,影响性能
    • 设置refresh_interval-1是限制其刷新。默认是1秒
    • 当我们数据迁移完成再把上面两个值进行修改即可

    2.2、拷贝数据

    POST _reindex
    {
      "source": {
        "index": "old_index",
        "size":1000   //可选,每次批量提交1000个,可以提高效率,建议每次提交5-15M的数据
      },
      "dest": {
        "index": "new_index"
      }
    }
    

    这时候去看数据,是看不到数据的,因为还要刷新才行。

    2.3、恢复刷新和副本配置

    PUT /test_dest/_settings
    {
      "refresh_interval": "1s",
      "number_of_replicas": 2
    }
    

    更新副本数和刷新时间,自此数据迁移就完成了,因为之前的索引不用,但是接口都是指向之前的索引,我们就在新索引添加别名即可。

    2.4、如果迁移需要,切换别名

    校验完数据的完整性和正确性,再进行别名的切换。

     
    POST /_aliases 
    {
      "actions": [
        {
          "remove": {
            "index": "test_v1",
            "alias": "test"
          }
        },
        {
          "remove": {
            "index": "test_v1",
            "alias": "test_read"
          }
        },
        {
          "remove": {
            "index": "test_v1",
            "alias": "test_write"
          }
        },
        {
          "add": {
            "index": "test_dest",
            "alias": "test"
          }
        },
        {
          "add": {
            "index": "test_dest",
            "alias": "test_read"
          }
        },
        {
          "add": {
            "index": "test_dest",
            "alias": "test_write"
          }
        }
      ]
    }
    

    三、重要注意事项

    3.1、备份数据时,目标索引必须创建,否则改变了目标索引

    没有创建目标索引,导致id类型从integer变成了long, 分片和副本都变成1。
    在这里插入图片描述

    3.2、扩集群迁移

    • 和同集群数据迁移基本一样,就是多了一个设置白名单而已。
      • (如果是A集群 --> B集群,就需要在B中的elasticsearch.yml 设置A地址为白名单)
      • reindex.remote.whitelist: 172.16.1.236:9200
    • 设置好索引、number_of_replicas: 0、refresh_interval: -1
    • remote中设置远程集群的地址与账号密码(如果配置了的话)。
    • 也可以添加query属性,只查询符号条件的。
    
    
    POST /_reindex
    {
      "source": {
        "index": "test_v1",
        "remote": {
          "host": "http://destHost:9200",
          "username": "username",
          "password": "password"
        },
        "query": {
          "match_all": {}
        }
      },
      "dest": {
        "index": "test_remote"
      }
    }
    

    完成之后记得重新配置远程集群索引的number_of_replicas、refresh_interval

    3.3、数据迁移效率

    3.3.1、问题发现:

      常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢

      数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?

    3.3.2、原因分析:

    reindex的核心做跨索引跨集群的数据迁移。
    慢的原因及优化思路无非包括:

    • 1)批量大小值可能太小。需要结合堆内存、线程池调整大小;
    • 2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
    • 3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。

    3.3.3、可行方案:

    3.3.3.1、提升批量写入大小值

    默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size

    POST _reindex
    {
      "source": {
        "index": "source",
        "size": 5000
      },
      "dest": {
        "index": "dest"
      }
    }
    
    

    批量大小设置的依据:

    1、使用批量索引请求以获得最佳性能。

    批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB

    注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:

    • 1)每个1kb的1000个文档是1mb。

    • 2)每个100kb的1000个文档是100 MB。

    这些是完全不同的体积大小。

    2、逐步递增文档容量大小的方式调优。

    • 1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。

    • 2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。

    要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。

    3.3.3.2、借助scroll的sliced提升写入效率

    Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

    sliced原理(from medcl)

    • 1)用过Scroll接口吧,很慢?如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。
    • 2)每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。

    slicing使用举例

    slicing的设定分为两种方式:手动设置分片、自动设置分片
    手动设置分片参见官网。
    自动设置分片如下:

    POST _reindex?slices=5&refresh
    {
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter"
      }
    }
    

    slices大小设置注意事项:

    • 1)slices大小的设置可以手动指定,或者设置slices设置为autoauto的含义是:
      • 针对单索引,slices大小=分片数
      • 针对多索引,slices=分片的最小值
    • 2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
    • 3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能

    效果: 实践证明,比默认设置reindex速度能提升10倍+。

    关注我的公众号【宝哥大数据】,更多干货

    在这里插入图片描述

    展开全文
  • 11.reindex操作

    2020-10-20 11:09:49
    Reindex API简介2. 从远程集群重建索引3. URL参数4. 响应体5. 配合Task API使用1. 查找任务2. 配合取消任务API使用3. 重置节流阀4. 修改字段名6. 使用slice并行执行1. 手动切片2. 自动切片3. 挑选slice的数量8. ...
  • ES reindex介绍

    千次阅读 2020-07-28 16:44:19
    参考资料: https://gaoming.blog.csdn.net/article/details/82734667 ... ... https://www.elastic.co/guide/en/elasticsearch/reference/6.5/docs-reindex.html https://w
  • elasticsearch 基础 —— ReIndex

    万次阅读 2018-09-17 11:34:34
    Reindex会将一个索引的数据复制到另一个已存在的索引,但是并不会复制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。 一、reindex的常用操作 1、reindex基础实现  _reindex会将一个索引...
  • 按照业务规划设计进行创建步骤二、重建索引准备,禁用刷新和副本复制步骤三、重建索引,并通过参数设置加快索引重建效率步骤四、恢复新索引副本数和刷新间隔步骤五、检查索引重建情况elasticsearch _reindex api详细...
  • 在我们开发的过程中,我们有很多时候需要用到Reindex接口。它可以帮我们把数据从一个index到另外一个index进行重新reindex。这个对于特别适用于我们在修改我们数据的mapping后,需要重新把数据从现有的index转到新的...
  • Elasticsearch _reindex Alias使用

    千次阅读 2019-06-27 11:43:24
    ES数据库重建索引——Reindex(数据迁移) 应用背景: 1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。 2、当数据的mapping...
  • reindex的速率提升

    2021-03-24 21:12:45
    1、reindex的速率极慢,是否有办法改善? 以下问题来自社区:https://elasticsearch.cn/question/3782 reindex 问题1:reindex和snapshot的速率极慢,是否有办法改善? reindex和snapshot的速率比用filebeat或者...
  • es reindex使用

    2020-11-17 23:23:03
    es reindex使用 原因 因为之前在建索引的时候,仅仅只是考虑到了分片最大管理数量(2^31≈21亿)导致建立的索引都是写满了这些分片,也导致了每个分片的size出奇的大,有的在1tb左右的数据量了,严重的拖慢了速度,...
  • dataframe索引设定reindex

    2021-08-12 21:51:13
    reindex:索引对齐,重新排序 pandas中的reindex方法可以为series和dataframe添加或者删除索引,如果新添加的索引没有对应的值,则默认为nan。如果减少索引,就相当于一个切片操作。 df.reindex(index=[1101,1203,...
  • 参考链接:https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.reindex.html#pandas.DataFrame.reindexDataFrame.reindex(labels=None,index=None,columns=None,axis=None,method=None,c....
  • Reindex API3.1 两个注意点3.2. OP Type3.3. 跨集群 ReIndex3.4 查看 Task API4. 数据迁移5. 数据迁移效率 1. 应用场景: 当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时...
  • 在用pandas的时候,写了这一段语句 `nd = d.reindex(index = ni,columns = nc,method = 'bfill')`,报错index must be monotonic increasing or decreasing,没找到比较好的解释,就自己推了一下
  • pandas reindex重新索引

    2020-12-22 20:33:39
    reindex 是 pandas 对象的一个重要方法,其作用是创建一个适应新索引的新对象。例如:定义一个 Series 对象为 se1=pd.Series([1,4,-1,2],index=['a','e','d','c']),现在调用 Series 对象的重新索引 se1.reindex(['a...
  • pandas共有两种数据结构类型分别为series和dataframe,他们的区别和联系如下: 区别: series,只是一个一维数据结构,它由index和value组成。 dataframe,是一个二维结构,除了拥有index和value之外,还拥有column。...
  • # 使用 kibana 的案例数据来做演示。...POST _reindex?requests_per_second=100&slices=3 { "source": { "index": "kibana_sample_data_logs" }, "dest": { "index": "kibana_sample_data_logs_01" } }.
  • elasticsearch5.5 Reindex API 异步重建索引

    千次阅读 2019-02-19 17:23:14
    elasticsearch Reindex API 异步重建索引Reindex API注意基本用法设置version_type设置version_type为internal设置version_type为external设置op_type根据限制条件重建索引 Reindex API 这是针对Elasticsearch官方...
  • Python Reindex产生Nan

    2021-07-16 15:30:48
    Here is the code that I am working with:import pandas as pdtest3 = pd.Series([1,2,3], index = ['a','b','c'])test3 = test3.reindex(index = ['f','g','z'])So originally every thing is fine and test3 has ...
  • Reindex可以直接在Elasticsearch集群里面对数据进行重建。并且支持跨集群间的数据迁移。 三、实战 1、原索引 比如我现在有这么一个索引:topic,mapping信息如下: { "settings": { "number...
  • Python reindex使用详解

    千次阅读 2021-01-14 08:29:17
    首先介绍reindex,它的作用是创建一个新对象,新对象的数据符合新的索引。示例如下所示从上图可以看出,使用reindex后,索引进行了重排。如果某个索引值不存在,就会引入缺失值:如果不想使用缺失值,可以通过fill_...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,270
精华内容 8,508
关键字:

reindex

友情链接: erweibili.rar