使用 scroll 方式迁移数据,类似于 GitHub 上用 Node.js 编写的 elasticsearch-dump。
依赖包
# 我这里演示的ES是7.x的,如果下面的脚本运行报错,请考虑调整这里的python的elasticsearch包版本
pip install elasticsearch==7.13.1
配置文件
vim configs.py
# -*- coding: utf-8 -*-
# Connection info for the source Elasticsearch cluster.
es_source_host = ['127.0.0.1:9200'] # multiple nodes may be listed, comma-separated
es_source_index = "index-test1"
# Connection info for the destination Elasticsearch cluster.
es_dest_host = ['127.0.0.1:9200']
es_dest_index = "index-test2"
# Number of documents fetched per scroll round.
batch_size = 2000
# Seconds to sleep between scroll rounds (throttle; 0 = no pause).
sleep_time = 0
主程序
vim run.py
# -*- coding: utf-8 -*-
import json
import time
import configs
from elasticsearch import Elasticsearch
# Clients for the source and destination clusters (connection-pool size 16).
src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)
start_ts = time.time()
scroll_time = '5m' # how long each scroll context is kept alive on the server
src_index_name = configs.es_source_index
dest_index_name = configs.es_dest_index
def create_dest_index(number_of_shards=4):
    """Create the destination index.

    Args:
        number_of_shards: primary shard count for the new index
            (defaults to 4, the value previously hard-coded here).

    Any error (typically "index already exists") is printed and swallowed
    so the migration can proceed against a pre-existing index.
    """
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": number_of_shards}}},
        )
    except Exception as e:
        print(str(e))
def update_dest_index_setting(time_dur, replicas):
    """Update refresh interval and replica count on the destination index.

    Args:
        time_dur: refresh interval string, e.g. "60s" or "1s".
        replicas: number of replica shards.

    Errors are printed and swallowed (best-effort, consistent with the
    other helpers in this script).
    """
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            # Consistency fix: the original mixed the dotted form
            # "index.refresh_interval" with a bare "number_of_replicas" key;
            # both settings now live under one nested "index" object.
            body={"index": {"refresh_interval": time_dur, "number_of_replicas": replicas}},
        )
        print(res)
    except Exception as e:
        print(str(e))
def update_dest_index_mapping():
    """Copy the mapping of the source index onto the destination index.

    The get_mapping call is inside the try block so a failure reading the
    source mapping is reported instead of crashing the script (the original
    guarded only the put_mapping call).
    """
    try:
        dest_mapping = src_es.indices.get_mapping(index=configs.es_source_index)[configs.es_source_index]["mappings"]
        res = dest_es.indices.put_mapping(body=dest_mapping, index=configs.es_dest_index)
        print(res)
    except Exception as e:
        print(str(e))
def _build_bulk_actions(hits):
    """Turn a list of search hits into the action/source pairs bulk() expects."""
    actions = []
    for hit in hits:
        actions.append({"index": {"_index": dest_index_name, "_id": hit["_id"]}})
        actions.append(hit["_source"])
    return actions


def _bulk_write(actions):
    """Send one bulk request to the destination index and report per-item failures."""
    dest_res = dest_es.bulk(index=dest_index_name, body=actions)
    if dest_res["errors"]:
        # BUG FIX: the original iterated response["items"] — the *search*
        # response — which has no "items" key; per-item error details live
        # in the bulk response.
        for item in dest_res["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")


def migrate():
    """Copy every document from the source index to the destination index
    using the scroll API for reads and the bulk API for writes.

    Pages of configs.batch_size documents are pulled until a scroll round
    returns no hits; configs.sleep_time seconds of throttle are applied
    between rounds.
    """
    query = {
        "query": {
            "match_all": {}  # copy all documents
        }
    }
    # Number of scroll rounds, reported at the end.
    count = 0
    # Open the scroll context and fetch the first page.
    response = src_es.search(index=src_index_name, scroll=scroll_time, body=query, size=configs.batch_size)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']
    # Robustness fix: an empty source index would otherwise send an empty
    # bulk body, which the bulk API rejects.
    if hits:
        _bulk_write(_build_bulk_actions(hits))
        count += 1
    # Scroll through the remaining pages until one comes back empty.
    # (The original's "if len(hits) < 0: break" guard could never fire.)
    while hits:
        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        hits = response['hits']['hits']
        if not hits:
            break
        _bulk_write(_build_bulk_actions(hits))
        time.sleep(configs.sleep_time)
        count += 1
    # Release the scroll context on the source cluster instead of letting
    # it hold resources until the 5m timeout expires.
    try:
        src_es.clear_scroll(scroll_id=scroll_id)
    except Exception as e:
        print(str(e))
    stop_ts = time.time()
    print('scroll 遍历的次数: ', count, '耗时(秒):', int(stop_ts - start_ts))
if __name__ == '__main__':
    create_dest_index() # create the destination index
    update_dest_index_setting("60s",0) # temporarily relax refresh/replicas to speed up bulk writes
    update_dest_index_mapping() # copy the mapping from the source index
    migrate() # copy the data
    update_dest_index_setting("1s",1) # restore refresh/replicas for durability
执行
python run.py
效率
测试下来,速度还是很给力的。
测试数据集:
docs: 639566
primary size: 179.78MB
耗时:
elasticsearch-dump迁移耗时7分钟。
python脚本迁移耗时 4分钟(可能是因为我脚本里面的迁移前先调大refresh的功劳?)。
20230805 更新
受打击了。
最近在研究极限网关的日志记录的时候,顺手又刷到了medcl大神当年的工具esm,试了下esm的迁移速度大约比我的py脚本快4倍。
因此,生产如果要迁移数据的话,推荐使用medcl的esm工具。
地址:https://github.com/medcl/esm
wget https://github.com/medcl/esm/releases/download/v0.6.1/migrator-linux-amd64
mv migrator-linux-amd64 /bin/esm
我这里只列一个命令,其余的可以参考官方文档
$ /bin/esm -s http://localhost:9200 -x "index-t3" -y "index-t3-new" -d http://localhost:9200 -c 10000 --shards=5 --copy_settings --copy_mappings --force --refresh
index-t3
[08-05 10:18:42] [INF] [main.go:349,main] start settings/mappings migration..
[08-05 10:18:42] [ERR] [http.go:302,Request] {"acknowledged":true,"shards_acknowledged":true,"index":"index-t3-new"}
[08-05 10:18:42] [ERR] [http.go:302,Request] {"acknowledged":true}
[08-05 10:18:42] [INF] [main.go:469,main] settings/mappings migration finished.
[08-05 10:18:42] [INF] [main.go:480,main] start data migration..
Scroll 180030 / 180030 [==============================================================================================================================================================================] 100.00% 39s
Bulk 180030 / 180030 [================================================================================================================================================================================] 100.00% 39s
[08-05 10:19:22] [INF] [main.go:509,main] data migration finished.
[08-05 10:19:22] [ERR] [http.go:302,Request] {"acknowledged":true}