使用Python脚本实现ElasticSearch的在线数据迁移【文末彩蛋】
  jKnO7FXA2HI8 2023年11月02日 56 0

使用scroll方式迁移数据,类似于 github上nodejs写的elasticsearch-dump 。


依赖包

# 我这里演示的ES是7.x的,如果下面的脚本运行报错,请考虑调整这里的python的elasticsearch包版本
pip install elasticsearch==7.13.1


配置文件

vim configs.py

# -*- coding: utf-8 -*-

# es数据源的信息
es_source_host = ['127.0.0.1:9200']  # 支持多个节点间用逗号分隔
es_source_index = "index-test1"

# es目标库的信息
es_dest_host = ['127.0.0.1:9200']
es_dest_index = "index-test2"

# 每次取的条数
batch_size = 2000

# 每轮休眠的时间(单位秒)
sleep_time = 0


主程序

vim run.py

# -*- coding: utf-8 -*-
import json
import time
import configs

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)

start_ts = time.time()

scroll_time = '5m'  # 指定 Scroll 上下文的存活时间

src_index_name = configs.es_source_index
dest_index_name = configs.es_dest_index

def create_dest_index():
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )

    except Exception as e:
        print(str(e))

def update_dest_index_setting(time_dur,replicas):
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))


def update_dest_index_mapping():
    dest_mapping = src_es.indices.get_mapping(index=configs.es_source_index)[configs.es_source_index]["mappings"]
    try:
        res = dest_es.indices.put_mapping(body=dest_mapping, index=configs.es_dest_index)
        print(res)
    except Exception as e:
        print(str(e))



def migrate():
    query = {
        "query": {
            "match_all": {}  # 查询所有文档
        }
    }

    # 计数下,用于最后确认scroll的次数
    count = 0

    # 初始化 Scroll 上下文
    response = src_es.search(index=src_index_name, scroll=scroll_time, body=query,size=configs.batch_size)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

    # 处理第一批结果,拼装bulk需要的数据结构
    data_list1=[]
    for hit in hits:
        data1={}
        _id, _source = hit["_id"], hit["_source"]
        data1["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
        data_list1.append(data1["index"])
        data_list1.append(_source)

    # 把第一次找出的数据,拼装好的结果写入目标ES
    dest_res = dest_es.bulk(index=dest_index_name, body=data_list1)
    if dest_res["errors"]:
        for item in response["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")

    count += 1

    # 滚动获取剩余结果
    while True:
        if len(hits) < 0:
            break

        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        # print("scroll_id ---> ", scroll_id )
        hits = response['hits']['hits']

        # 拼装bulk需要的数据结构
        data_list2=[]
        for hit in hits:
            data2={}
            _id, _source = hit["_id"], hit["_source"]
            data2["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
            data_list2.append(data2["index"])
            data_list2.append(_source)

        if len(data_list2) <=0:
            break
        dest_res = dest_es.bulk(index=dest_index_name, body=data_list2)
        if dest_res["errors"]:
            for item in response["items"]:
                if "error" in item["index"]:
                    print(f"Failed operation: {item['index']}")
        else:
            print("Bulk operations completed successfully!")

        time.sleep(configs.sleep_time)

        count += 1


    stop_ts = time.time()
    print('scroll 遍历的次数: ', count, '耗时(秒):', int(stop_ts - start_ts))



if __name__ == '__main__':
    create_dest_index()  # 创建目标索引
    update_dest_index_setting("60s",0)  # 临时降低持久性,提升写入性能
    update_dest_index_mapping()  # 复制mapping
    migrate() # 数据同步
    update_dest_index_setting("1s",1)  # 提升持久性,确保数据安全性


执行

python run.py


效率

测试下来,速度还是很给力的。
测试数据集:
	docs: 639566
	primary size: 179.78MB

耗时:
elasticsearch-dump迁移耗时7分钟。
python脚本迁移耗时 4分钟(可能是因为我脚本里面的迁移前先调大refresh的功劳?)。



20230805 更新

受打击了。

最近在研究极限网关的日志记录的时候,顺手又刷到了medcl大神当年的工具esm,试了下esm的迁移速度大约比我的py脚本快4倍。

因此,生产如果要迁移数据的话,推荐使用medcl的esm工具。

地址:https://github.com/medcl/esm

wget https://github.com/medcl/esm/releases/download/v0.6.1/migrator-linux-amd64
mv migrator-linux-amd64 /bin/esm

我这里只列一个命令,其余的可以参考官方文档

$ /bin/esm -s http://localhost:9200 -x "index-t3" -y "index-t3-new"  -d http://localhost:9200  -c 10000 --shards=5  --copy_settings --copy_mappings --force  --refresh
index-t3
[08-05 10:18:42] [INF] [main.go:349,main] start settings/mappings migration..
[08-05 10:18:42] [ERR] [http.go:302,Request] {"acknowledged":true,"shards_acknowledged":true,"index":"index-t3-new"}
[08-05 10:18:42] [ERR] [http.go:302,Request] {"acknowledged":true}
[08-05 10:18:42] [INF] [main.go:469,main] settings/mappings migration finished.
[08-05 10:18:42] [INF] [main.go:480,main] start data migration..
Scroll 180030 / 180030 [==============================================================================================================================================================================] 100.00% 39s
Bulk 180030 / 180030 [================================================================================================================================================================================] 100.00% 39s
[08-05 10:19:22] [INF] [main.go:509,main] data migration finished.
[08-05 10:19:22] [ERR] [http.go:302,Request] {"acknowledged":true}




【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  eHipUjOuzYYH   2023年11月02日   36   0   0 sedelasticsearchIP