Elasticsearch——Document
  vxNQtvtQlfbi 2023年11月02日 28 0

Document:文档

  • Elasticsearch是面向文档的,文档是所有可搜索数据的最小基础信息单元。
  • 一个Document就像数据库中的一行记录,文档会被序列化成JSON格式,保持在Elasticsearch中,多个Document存储于一个索引(Index)中。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个到处存在的互联网数据交互格式。
  • 每一个文档都有一个UniqueID

文档的元数据

  • 元数据:用于标注稳定的相关信息
  • _index:文档所属的索引名
  • _type:文档所属的类型名
  • _id:文档的主键,在写入的时候,可以指定该Doc的ID值,如果不指定,则系统自动生成一个唯一的UUID值。
  • _source:文档的原始Json数据
  • _version:文档的版本信息,Elasticsearch通过使用version来保证对文档的变更能以正确的顺序执行,避免乱序造成的数据丢失。
  • _score:相关性打分。
  • _seq_no:严格递增的顺序号,每个文档一个,Shard级别严格递增,保证后写入的Doc的_seq_no大于先写入的Doc的_seq_no。
  • primary_term:primary_term也和_seq_no一样是一个整数,每当Primary Shard发生重新分配时,比如重启,Primary选举等,_primary_term会递增1。
  • found:查询的ID正确那么ture, 如果 Id 不正确,就查不到数据,found字段就是false。

生成文档id

手动生成

场景:从数据库或其他系统导入时,本身有唯一主键。 用法:PUT /index/_doc/id

PUT /test_index/_doc/1

{
	"test_field": "test"
}

自动生成

用法:PUT /index/_doc

PUT /test_index/_doc

{
	"test_field": "test"
}

自动id特点:长度为20个字符,URL安全,base64编码,GUID,分布式生成不冲突。

_source字段

含义:插入数据时的所有字段和值,在get获取数据时,在_source字段中原样返回。

定制返回字段:GET /index/_doc/id?_source_includes=field1,field2

文档的替换与删除

全量替换

执行两次,返回结果中版本号(_version)在不断上升,此过程为全量替换。

PUT /test_index/_doc/1

{
	"test_field": "test"
}

实质:旧文档的内容不会立即删除,只是标记为delete,适当的时机,集群会将这些文档删除。

强制创建

为防止覆盖原有数据,我们在新增时,设置为强制创建,就不会覆盖原有文档。 语法:PUT /index/_doc/id/_create

PUT /test_index/_doc/1/_create

{
	"test_field": "test"
}

删除

语法:DELETE /index/_doc/id 实质:旧文档的内容不会立即删除,只是标记为deleted,适当的时机,集群会将这些文档删除,lazy delete。

局部替换 partial update

使用PUT /index/type/id为文档全量替换,需要将文档所有数据提交。 partial update局部替换则只修改变动字段。

用法:

PUT /index/type/id/_update

{
	"doc": {
		"field": "value"
	}
}

内部原理

内部与全量替换是一样的,旧文档标记为删除,新建一个文档。

步骤:

  • 1、es获取内部旧文档。
  • 2、将传来的文档field更新到旧数据(内存中)
  • 3、将旧文档标记为delete
  • 4、创建新文档。

优点:

  • 大大减少网络传输次数和流量,提升性能。
  • 减少并发冲突发生的概率。

使用脚本更新

es可以内置脚本执行复杂操作,例如painless脚本。 注意:groovy脚本在es6以后就不支持了,原因是耗内存,不安全远程注入漏洞。

内置脚本

插入数据

PUT /test_index/_doc/6

{
	"num": 0
}

执行脚本操作:

PUT /test_index/_doc/6/_update

{
	"script": "ctx._source.num+=1"
}

外部脚本

Painless是内置的,脚本内容可以通过多种途径传给es,包括rest接口,或者放到config/scripts目录等,默认开启。

注意:脚本性能底下,容易发生注入。

官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html

elasticsearch的并发问题

如同秒杀,多线程情况下,es同样会出现并发冲突问题。

实现基于_version的版本控制

es对于文档的增删改都是基于_version版本号的。

  • 1、多次新增,返回版本号递增
PUT /test_index/_doc/3
{
  "test_field": "test"
}

  • 2、删除此文档
DELETE /test_index/_doc/3

返回

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "3",
  "_version" : 4,
  "result" : "deleted",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

  • 3、删除后新增

可以看到版本号依然递增,验证延迟删除策略。

如果删除一条数据,es立马删除的话,所有分片和副本都要立马删除,对es集群来说压力太大。

es内部主从同步并发控制

es内部主从同步时,是多线程异步,乐观锁机制。

java api实现文档管理

es技术特点

  • 1、es技术比较特殊,不像其他分布式,es代码层面很好写,难的是概念的理解。
  • 2、es最重要的是它的rest api,跨语言的,在真实生产中,查询数据、分析数据,使用rest api更方便。

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.3/java-rest-overview.html

java客户端简单获取数据

引入maven依赖

<dependencies>
	<dependency>
		<groupId>org.elasticsearch.client</groupId>
		<artifactId>elasticsearch-rest-high-level-client</artifactId>
		<version>7.3.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.elasticsearch</groupId>
				<artifactId>elasticsearch</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.elasticsearch</groupId>
		<artifactId>elasticsearch</artifactId>
		<version>7.3.0</version>
	</dependency>

	<dependency>
		<groupId>org.elasticsearch.client</groupId>
		<artifactId>elasticsearch-rest-client</artifactId>
		<version>7.3.0</version>
	</dependency>
</dependencies>

步骤

  • 1、获取连接的客户端
  • 2、构建请求
  • 3、执行
  • 4、获取结果
public class TestDemo {

    public static void main(String[] args) throws IOException {
        //1、获取连接的客户端
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("localhost",9200,"http")
        ));

        //2、构建请求
        GetRequest request = new GetRequest("book","1");

        //3、执行
        GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);

        //4、获取结果
        System.out.println(getResponse.getId());
        System.out.println(getResponse.getVersion());
        System.out.println(getResponse.getSourceAsString());
    }
}

结合spring-boot-starter-test测试文档查询

引入maven依赖

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.2.10.RELEASE</version>
	<relativePath/>
</parent>

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>

	<dependency>
		<groupId>org.elasticsearch.client</groupId>
		<artifactId>elasticsearch-rest-high-level-client</artifactId>
		<version>7.3.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.elasticsearch</groupId>
				<artifactId>elasticsearch</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.elasticsearch</groupId>
		<artifactId>elasticsearch</artifactId>
		<version>7.3.0</version>
	</dependency>

	<dependency>
		<groupId>org.elasticsearch.client</groupId>
		<artifactId>elasticsearch-rest-client</artifactId>
		<version>7.3.0</version>
	</dependency>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.74</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.12</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

配置application.properties

spring.application.name=service-search

#多个节点用逗号隔开
elasticsearch.hostlist=127.0.0.1:9200

创建配置类ElasticsearchConfig

@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.hostlist}")
    private String hostlist;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient(){
        String[] split = hostlist.split(",");
        HttpHost[] httpHost = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String[] item = split[i].split(":");
            httpHost[i] = new HttpHost(item[0],Integer.parseInt(item[1]),"http");
        }
        return new RestHighLevelClient(RestClient.builder(httpHost));
    }
}

编写测试代码

简单同步查询

@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestDocument {

    @Autowired
    private RestHighLevelClient client;

    @Test
    public void testGet() throws IOException {
        //1、构建请求
        GetRequest request = new GetRequest("book","1");

        //===========可选参数,可以设置很多================
        String[] includes = new String[]{"user","message"};//想要查询的字段
        String[] excludes = Strings.EMPTY_ARRAY;//不想要的字段,暂时设置空数组
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes,excludes);
        request.fetchSourceContext(fetchSourceContext);

        //2、执行
        //同步查询
        GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);

        //3、获取结果
        if(getResponse.isExists()){
            System.out.println(getResponse.getId());
            System.out.println(getResponse.getVersion());
            System.out.println(getResponse.getSourceAsString());//以String获取数据
            System.out.println(getResponse.getSourceAsBytes());//以bytes获取数据
            System.out.println(getResponse.getSourceAsMap());//以Map获取数据
        }else {
            log.info("没有获取到结果");
        }
    }
}

简单异步查询

@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestDocument {

    @Autowired
    private RestHighLevelClient client;

    @Test
    public void testGet() throws IOException {
        //1、构建请求
        GetRequest request = new GetRequest("book","1");

        //===========可选参数,可以设置很多================
        String[] includes = new String[]{"user","message"};//想要查询的字段
        String[] excludes = Strings.EMPTY_ARRAY;//不想要的字段,暂时设置空数组
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes,excludes);
        request.fetchSourceContext(fetchSourceContext);

        //2、执行
        //异步查询
        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
            //成功时的操作
            @Override
            public void onResponse(GetResponse getResponse) {
                System.out.println(getResponse.getId());
                System.out.println(getResponse.getVersion());
                System.out.println(getResponse.getSourceAsString());//以String获取数据
            }
            //失败时的操作
            @Override
            public void onFailure(Exception e) {
                log.error("error",e);
            }
        };
        client.getAsync(request, RequestOptions.DEFAULT,listener);

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

新增

4种构建文档数据的方法

  • 1、json字符串方式
IndexRequest request = new IndexRequest("book");
request.id("2");

String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
request.source(json, XContentType.JSON);
  • 2、文档源作为Map提供,可自动转换为JSON格式。
IndexRequest request = new IndexRequest("book");
request.id("2");

Map<String,Object> map = new HashMap<>();
map.put("user","tom");
map.put("postDate","2020-10-25");
map.put("message","trying out es");
request.source(map);
  • 3、文档源作为XContentBuilder对象提供,Elasticsearch内置辅助生成JSON内容。
IndexRequest request = new IndexRequest("book");
request.id("2");

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
	builder.field("user","tomas");
	builder.timeField("postDate","2020-10-25");
	builder.field("message","trying out es");
}
builder.endObject();
request.source(builder);
  • 4、文档源作为Object键值对提供,转换为JSON格式。
IndexRequest request = new IndexRequest("book");
request.id("2");

request.source("user","tomas",
		"postDate","2020-10-25",
		"message","trying out es");

同步新增

@Test
public void testAdd() throws IOException {
	//1、构建请求
	IndexRequest request = new IndexRequest("book");
	request.id("3");
	//===============构建文档数据4种方法=======================
	//方法1
	String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
	request.source(json, XContentType.JSON);

	//方法2
	/*Map<String,Object> map = new HashMap<>();
	map.put("user","tom");
	map.put("postDate","2020-10-25");
	map.put("message","trying out es");
	request.source(map);

	//方法3
	XContentBuilder builder = XContentFactory.jsonBuilder();
	builder.startObject();
	{
		builder.field("user","tomas");
		builder.timeField("postDate","2020-10-25");
		builder.field("message","trying out es");
	}
	builder.endObject();
	request.source(builder);

	//方法4
	request.source("user","tomas",
			"postDate","2020-10-25",
			"message","trying out es");*/

	//=====================可选参数========================
	//设置超时时间
	request.timeout("1s");
	request.timeout(TimeValue.timeValueSeconds(1));

	//手动维护版本号
	request.version(2);
	request.versionType(VersionType.EXTERNAL);

	//2、执行
	//同步操作
	IndexResponse response = client.index(request, RequestOptions.DEFAULT);

	//3、获取结果
	log.info("index: {}",response.getIndex());
	log.info("id: {}",response.getId());
	if(response.getResult() == DocWriteResponse.Result.CREATED){
		log.info("新增成功,result: {}",response.getResult());
	}else if(response.getResult() == DocWriteResponse.Result.UPDATED){
		log.info("更新成功,result: {}",response.getResult());
	}else {
		log.info("操作失败,result: {}",response.getResult());
	}

	ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
	if(shardInfo.getTotal() != shardInfo.getSuccessful()){
		log.info("处理成功的分片数少于总分片");
	}

	if(shardInfo.getFailed() > 0){
		for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
			String reason = failure.reason();//每一个错误的原因
			log.info(reason);
		}
	}
}

异步新增

@Test
public void testAdd() throws IOException {
	//1、构建请求
	IndexRequest request = new IndexRequest("book");
	request.id("3");
	//===============构建文档数据4种方法=======================
	//方法1
	String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
	request.source(json, XContentType.JSON);

	//方法2
	/*Map<String,Object> map = new HashMap<>();
	map.put("user","tom");
	map.put("postDate","2020-10-25");
	map.put("message","trying out es");
	request.source(map);

	//方法3
	XContentBuilder builder = XContentFactory.jsonBuilder();
	builder.startObject();
	{
		builder.field("user","tomas");
		builder.timeField("postDate","2020-10-25");
		builder.field("message","trying out es");
	}
	builder.endObject();
	request.source(builder);

	//方法4
	request.source("user","tomas",
			"postDate","2020-10-25",
			"message","trying out es");*/

	//=====================可选参数========================
	//设置超时时间
	request.timeout("1s");
	request.timeout(TimeValue.timeValueSeconds(1));

	//手动维护版本号
	request.version(2);
	request.versionType(VersionType.EXTERNAL);

	//2、执行

	//异步操作
	ActionListener<IndexResponse> actionListener = new ActionListener<IndexResponse>() {
		//成功时进行的操作
		@Override
		public void onResponse(IndexResponse indexResponse) {
			log.info("index: {}",response.getIndex());
			log.info("id: {}",response.getId());
			if(response.getResult() == DocWriteResponse.Result.CREATED){
				log.info("新增成功,result: {}",response.getResult());
			}else if(response.getResult() == DocWriteResponse.Result.UPDATED){
				log.info("更新成功,result: {}",response.getResult());
			}else {
				log.info("操作失败,result: {}",response.getResult());
			}
		}

		//失败时进行的操作
		@Override
		public void onFailure(Exception e) {
			log.error("error",e);
		}
	};
	client.indexAsync(request,RequestOptions.DEFAULT,actionListener);

	try {
		Thread.sleep(5000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

文档修改

全量替换:文档同一个Id多新增几次,就叫全量替换。

同步局部替换

@Test
public void testUpdate() throws IOException {
	//1、创建请求
	UpdateRequest request = new UpdateRequest("book","3");
	Map<String,Object> map = new HashMap<>();
	map.put("user","tomas Lee");
	request.doc(map);

	//=========可选参数==========
	request.timeout("1s");
	request.retryOnConflict(3);//重试次数

	//2、执行
	//同步操作
	UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

	//3、获取结果
	log.info("index: {}",updateResponse.getIndex());
	log.info("id: {}",updateResponse.getId());
	if(updateResponse.getResult() == DocWriteResponse.Result.CREATED){
		log.info("新增成功,result: {}",updateResponse.getResult());
	}else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
		log.info("更新成功,result: {}",updateResponse.getResult());
	}else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
		log.info("删除成功,result: {}",updateResponse.getResult());
	}else if(updateResponse.getResult() == DocWriteResponse.Result.NOOP){
		log.info("NOOP 不进行操作,result: {}",updateResponse.getResult());
	}
}

异步局部替换

@Test
public void testUpdate() throws IOException {
	//1、创建请求
	UpdateRequest request = new UpdateRequest("book","3");
	Map<String,Object> map = new HashMap<>();
	map.put("user","tomas Lee");
	request.doc(map);

	//=========可选参数==========
	request.timeout("1s");
	request.retryOnConflict(3);//重试次数

	//2、执行
	//异步操作
	ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
		@Override
		public void onResponse(UpdateResponse updateResponse) {
			log.info("index: {}",updateResponse.getIndex());
			log.info("id: {}",updateResponse.getId());
			if(updateResponse.getResult() == DocWriteResponse.Result.CREATED){
				log.info("新增成功,result: {}",updateResponse.getResult());
			}else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
				log.info("更新成功,result: {}",updateResponse.getResult());
			}else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
				log.info("删除成功,result: {}",updateResponse.getResult());
			}else if(updateResponse.getResult() == DocWriteResponse.Result.NOOP){
				log.info("NOOP 不进行操作,result: {}",updateResponse.getResult());
			}
		}

		@Override
		public void onFailure(Exception e) {
			log.info("error",e);
		}
	};
	client.updateAsync(request, RequestOptions.DEFAULT,listener);

	try {
		Thread.sleep(5000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

同步文档删除

@Test
public void testDelete() throws IOException {
	//1、创建请求
	DeleteRequest request = new DeleteRequest("book","3");

	//2、执行
	DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
	
	//3、获取结果
	log.info("index: {}",deleteResponse.getIndex());
	log.info("id: {}",deleteResponse.getId());
	log.info("result: {}",deleteResponse.getResult());
}

异步文档删除

@Test
public void testDelete() throws IOException {
	//1、创建请求
	DeleteRequest request = new DeleteRequest("book","3");

	//2、执行
	ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
		@Override
		public void onResponse(DeleteResponse deleteResponse) {
			log.info("index: {}",deleteResponse.getIndex());
			log.info("id: {}",deleteResponse.getId());
			log.info("result: {}",deleteResponse.getResult());
		}

		@Override
		public void onFailure(Exception e) {
			log.error("error",e);
		}
	};
	client.deleteAsync(request, RequestOptions.DEFAULT,listener);

	try {
		Thread.sleep(5000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

批量增删改bulk

@Test
public void testBulk() throws IOException {
	//1、创建请求
	BulkRequest bulkRequest = new BulkRequest();
	bulkRequest.add(new IndexRequest("book").id("2").source(XContentType.JSON,"field","1"));
	bulkRequest.add(new IndexRequest("book").id("3").source(XContentType.JSON,"field","2"));
	bulkRequest.add(new UpdateRequest("book","3").doc(XContentType.JSON,"field","3"));
	bulkRequest.add(new DeleteRequest("book").id("2"));

	//2、执行
	BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
	//3、获取结果
	for (BulkItemResponse bulkItemResponse : bulkResponse) {
		DocWriteResponse response = bulkItemResponse.getResponse();
		log.info("index: {}",response.getIndex());
		log.info("id: {}",response.getId());
		log.info("result: {}",response.getResult());

		if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.INDEX)){
			log.info("index: {}",response.getIndex());
			log.info("id: {}",response.getId());
			log.info("INDEX result: {}",response.getResult());
		}else if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.CREATE)){
			log.info("index: {}",response.getIndex());
			log.info("id: {}",response.getId());
			log.info("CREATE result: {}",response.getResult());
		}else if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.UPDATE)){
			log.info("index: {}",response.getIndex());
			log.info("id: {}",response.getId());
			log.info("UPDATE result: {}",response.getResult());
		}else if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.DELETE)){
			log.info("index: {}",response.getIndex());
			log.info("id: {}",response.getId());
			log.info("DELETE result: {}",response.getResult());
		}
	}
}

bulk api奇特的json格式

bulk api的语法

{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n

[{
  "action": {
 
  },
  "data": {

  }
}]
  • 1、bulk中的每个操作都可能要转发到不同的node的shard去执行
  • 2、如果采用比较良好的json数组格式

允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理:

  • 1)将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象

  • 2)解析json数组里的每个json,对每个请求中的document进行路由

  • 3)为路由到同一个shard上的多个请求,创建一个请求数组

  • 4)将这个请求数组序列化

  • 5)将序列化后的请求数组发送到对应的节点上去

  • 3、耗费更多内存,更多的jvm gc开销 我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。

 

占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降。

 

另外的话,占用内存更多,就会导致java虚拟机的垃圾回收次数更多,更频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多

现在的奇特格式

{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
  • 1)不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json。

  • 2)对每两个一组的json,读取meta,进行document路由。

  • 3)直接将对应的json发送到node上去。

  • 5、最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能。

 

参考: https://www.cnblogs.com/qdhxhz/p/11448451.html

https://www.cnblogs.com/Onlywjy/p/12194626.html

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  ehrZuhofWJiC   2024年04月26日   42   0   0 日志Java
vxNQtvtQlfbi