A Brief Analysis of Adding a Document in ES
  OnDcysL4aAex, November 2, 2023

This article analyzes how ES writes a document and then synchronizes it to the replicas. Start with the official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-replication.html

Every indexing operation in Elasticsearch is first resolved to a replication group using routing, typically based on the document ID. 
Once the replication group has been determined, the operation is forwarded internally to the current primary shard of the group. 
This stage of indexing is referred to as the coordinating stage.
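The routing step can be made concrete with a minimal Java sketch. It maps a routing value (the document ID by default) to a shard number. It follows the formula described in the Elasticsearch `_routing` field documentation: routing_factor = num_routing_shards / num_primary_shards, then shard_num = (hash(_routing) % num_routing_shards) / routing_factor. The class `RoutingSketch` is hypothetical. As an assumption, it uses a 32-bit FNV-1a hash in place of the Murmur3 hash Elasticsearch actually applies, so the shard numbers it produces will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of document-to-shard routing; not Elasticsearch's own routing code.
final class RoutingSketch {
    private RoutingSketch() {}

    // ASSUMPTION: 32-bit FNV-1a over the UTF-8 bytes stands in for Elasticsearch's Murmur3 hash.
    static int hash(String routing) {
        int h = 0x811C9DC5;
        for (byte b : routing.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    // routing_factor = numRoutingShards / numPrimaryShards
    // shard          = floorMod(hash(routing), numRoutingShards) / routing_factor
    static int shardId(String routing, int numRoutingShards, int numPrimaryShards) {
        if (numPrimaryShards <= 0 || numRoutingShards <= 0
                || numRoutingShards % numPrimaryShards != 0) {
            throw new IllegalArgumentException(
                "numRoutingShards must be a positive multiple of numPrimaryShards");
        }
        int routingFactor = numRoutingShards / numPrimaryShards;
        // floorMod keeps the bucket non-negative even when the hash is negative
        return Math.floorMod(hash(routing), numRoutingShards) / routingFactor;
    }
}
```

When numRoutingShards equals numPrimaryShards, the routing factor is 1 and the formula reduces to a plain modulo. With a larger numRoutingShards, doubling the primary count sends every document of old shard k to new shard 2k or 2k+1. The split index API relies on this property.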

The next stage of indexing is the primary stage, performed on the primary shard. 
The primary shard is responsible for validating the operation and forwarding it to the other replicas. 
Since replicas can be offline, the primary is not required to replicate to all replicas. 
Instead, Elasticsearch maintains a list of shard copies that should receive the operation. 
This list is called the in-sync copies and is maintained by the master node. 
As the name implies, these are the set of "good" shard copies that are guaranteed to have processed all of the index and delete operations that have been acknowledged to the user. 
The primary is responsible for maintaining this invariant and thus has to replicate all operations to each copy in this set.
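The in-sync invariant can be illustrated with a toy model. This is a simplified sketch under stated assumptions, not Elasticsearch code:

- The `InSyncGroup` class is hypothetical, and copies are plain strings.
- The primary sends an operation to every replica in the in-sync set.
- Any replica that fails to apply the operation is dropped from the set before the write may be acknowledged, so every copy left in the set has processed the operation.
- In Elasticsearch the removal goes through the master node. Here it is just a local `Set.removeAll`.
- Offline copies are modelled simply by not being in the set.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical toy model of the in-sync copies invariant; not Elasticsearch code.
final class InSyncGroup {
    private final String primary;
    private final Set<String> inSync = new LinkedHashSet<>();

    InSyncGroup(String primary, Set<String> replicas) {
        this.primary = primary;
        inSync.add(primary);
        inSync.addAll(replicas);
    }

    Set<String> inSyncCopies() {
        return new LinkedHashSet<>(inSync);
    }

    // Sends one operation to every in-sync replica; applyOnReplica returns false when a
    // replica fails. Failed replicas are removed before the caller acknowledges the write,
    // so every copy still in the set has processed the operation. Returns the removed copies.
    Set<String> replicate(Predicate<String> applyOnReplica) {
        Set<String> failed = new LinkedHashSet<>();
        for (String copy : inSync) {
            if (!copy.equals(primary) && !applyOnReplica.test(copy)) {
                failed.add(copy);
            }
        }
        inSync.removeAll(failed); // in Elasticsearch the master records this removal
        return failed;
    }
}
```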

The flow is shown in the diagram below:

[Figure: flow diagram of adding a document in ES]

The main method with which the primary shard writes a document is:

1 // org.elasticsearch.index.engine.InternalEngine#index
  2 public IndexResult index(Index index) throws IOException {
  3     assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
  4     final boolean doThrottle = index.origin().isRecovery() == false;
  5     try (ReleasableLock releasableLock = readLock.acquire()) {
  6         ensureOpen();
  7         assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
  8         int reservedDocs = 0;
  9         try (
 10             Releasable ignored = versionMap.acquireLock(index.uid().bytes());
 11             Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
 12         ) {
 13             lastWriteNanos = index.startTime();
 14             final IndexingStrategy plan = indexingStrategyForOperation(index);
 15             reservedDocs = plan.reservedDocs;
 16 
 17             final IndexResult indexResult;
 18             if (plan.earlyResultOnPreFlightError.isPresent()) {
 19                 assert index.origin() == Operation.Origin.PRIMARY : index.origin();
 20                 indexResult = plan.earlyResultOnPreFlightError.get();
 21                 assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
 22             } else {
 23                 // generate or register sequence number
 24                 if (index.origin() == Operation.Origin.PRIMARY) {
 25                     index = new Index(
 26                         index.uid(),
 27                         index.parsedDoc(),
 28                         generateSeqNoForOperationOnPrimary(index),
 29                         index.primaryTerm(),
 30                         index.version(),
 31                         index.versionType(),
 32                         index.origin(),
 33                         index.startTime(),
 34                         index.getAutoGeneratedIdTimestamp(),
 35                         index.isRetry(),
 36                         index.getIfSeqNo(),
 37                         index.getIfPrimaryTerm()
 38                     );
 39 
 40                     final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
 41                     if (toAppend == false) {
 42                         advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
 43                     }
 44                 } else {
 45                     markSeqNoAsSeen(index.seqNo());
 46                 }
 47 
 48                 assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
 49 
 50                 if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
 51                     indexResult = indexIntoLucene(index, plan);
 52                 } else {
 53                     indexResult = new IndexResult(
 54                         plan.versionForIndexing,
 55                         index.primaryTerm(),
 56                         index.seqNo(),
 57                         plan.currentNotFoundOrDeleted,
 58                         index.id()
 59                     );
 60                 }
 61             }
 62             if (index.origin().isFromTranslog() == false) {
 63                 final Translog.Location location;
 64                 if (indexResult.getResultType() == Result.Type.SUCCESS) {
 65                     location = translog.add(new Translog.Index(index, indexResult));
 66                 } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
 67                     // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
 68                     final NoOp noOp = new NoOp(
 69                         indexResult.getSeqNo(),
 70                         index.primaryTerm(),
 71                         index.origin(),
 72                         index.startTime(),
 73                         indexResult.getFailure().toString()
 74                     );
 75                     location = innerNoOp(noOp).getTranslogLocation();
 76                 } else {
 77                     location = null;
 78                 }
 79                 indexResult.setTranslogLocation(location);
 80             }
 81             if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
 82                 final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
 83                 versionMap.maybePutIndexUnderLock(
 84                     index.uid().bytes(),
 85                     new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
 86                 );
 87             }
 88             localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
 89             if (indexResult.getTranslogLocation() == null) {
 90                 // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
 91                 assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
 92                 localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
 93             }
 94             indexResult.setTook(System.nanoTime() - index.startTime());
 95             indexResult.freeze();
 96             return indexResult;
 97         } finally {
 98             releaseInFlightDocs(reservedDocs);
 99         }
100     } catch (RuntimeException | IOException e) {
101         try {
102             if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
103                 failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
104             } else {
105                 maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
106             }
107         } catch (Exception inner) {
108             e.addSuppressed(inner);
109         }
110         throw e;
111     }
112 }

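Line 51 writes the document into Lucene, and line 65 records the operation in the translog. On the success path, Lucene is written first and the translog second.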
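The ordering in `InternalEngine#index` can be reduced to a short, single-threaded model. Every class in it is a hypothetical stand-in, not an Elasticsearch API:

- A counter stands in for `generateSeqNoForOperationOnPrimary`.
- A `HashMap` stands in for Lucene, and an `ArrayList` for the translog.
- A small tracker imitates the purpose of `localCheckpointTracker.markSeqNoAsProcessed`: it advances the local checkpoint to the highest sequence number at or below which every operation has been processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model of the primary write path; none of these classes are Elasticsearch APIs.
final class WritePathSketch {
    // Local checkpoint = highest seq no such that every seq no <= it has been processed.
    static final class LocalCheckpointTracker {
        private long checkpoint = -1; // -1 here means "no operations processed yet"
        private final TreeSet<Long> pending = new TreeSet<>(); // processed but beyond a gap

        void markSeqNoAsProcessed(long seqNo) {
            if (seqNo <= checkpoint) {
                return;
            }
            pending.add(seqNo);
            // advance while the next contiguous seq no has been processed
            while (pending.remove(checkpoint + 1)) {
                checkpoint++;
            }
        }

        long getProcessedCheckpoint() {
            return checkpoint;
        }
    }

    private long nextSeqNo = 0;
    final Map<String, String> lucene = new HashMap<>(); // stand-in for the Lucene index
    final List<String> translog = new ArrayList<>();    // stand-in for the translog
    final LocalCheckpointTracker tracker = new LocalCheckpointTracker();

    // Primary-side indexing; returns the assigned seq no.
    long indexOnPrimary(String id, String source) {
        long seqNo = nextSeqNo++;            // cf. generateSeqNoForOperationOnPrimary
        lucene.put(id, source);              // cf. indexIntoLucene(index, plan)
        translog.add(seqNo + ":" + id);      // cf. translog.add(new Translog.Index(...))
        tracker.markSeqNoAsProcessed(seqNo); // cf. localCheckpointTracker.markSeqNoAsProcessed
        return seqNo;
    }
}
```

Suppose seq no 1 is marked before seq no 0. The checkpoint then stays at -1 until 0 arrives, and at that point it jumps straight to 1. That is why the tracker holds out-of-order sequence numbers until the gap is filled.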

Once the primary shard has written the operation locally, it replicates it to the replicas:

// org.elasticsearch.action.support.replication.ReplicationOperation#performOnReplicas
private void performOnReplicas(
    final ReplicaRequest replicaRequest,
    final long globalCheckpoint,
    final long maxSeqNoOfUpdatesOrDeletes,
    final ReplicationGroup replicationGroup,
    final PendingReplicationActions pendingReplicationActions
) {
    // for total stats, add number of unassigned shards and
    // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
    totalShards.addAndGet(replicationGroup.getSkippedShards().size());

    final ShardRouting primaryRouting = primary.routingEntry();

    for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
        if (shard.isSameAllocation(primaryRouting) == false) {
            performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
        }
    }
}
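The control flow of `performOnReplicas` can be mirrored in a few lines. This hypothetical sketch makes two substitutions. Shard copies are identified by allocation-ID strings rather than `ShardRouting` objects, and a `Consumer<String>` stands in for `performOnReplica`. Skipped copies (unassigned copies, or initializing copies whose engine is not open yet) only add to the total shard count. The loop skips the primary's own allocation, which suggests the target list also contains the primary itself.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical mirror of ReplicationOperation#performOnReplicas; not Elasticsearch code.
final class FanOutSketch {
    final AtomicInteger totalShards = new AtomicInteger();

    void performOnReplicas(List<String> replicationTargets,
                           List<String> skippedShards,
                           String primaryAllocationId,
                           Consumer<String> performOnReplica) {
        // skipped copies are counted toward the total but never contacted
        totalShards.addAndGet(skippedShards.size());
        for (String target : replicationTargets) {
            // cf. shard.isSameAllocation(primaryRouting) == false
            if (!target.equals(primaryAllocationId)) {
                performOnReplica.accept(target);
            }
        }
    }
}
```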

 

Last edited on November 8, 2023
