本文共 13838 字,大约阅读时间需要 46 分钟。
if (isPeerRecovery(shardRouting)) { ......}else { //走的这个分支 indexService.shard(shardId).recoverFromStore(shardRouting, new StoreRecoveryService.RecoveryListener() {}
public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) { ...... final boolean shouldExist = shard.allocatedPostIndexCreate(); storeRecoveryService.recover(this, shouldExist, recoveryListener); }
if (indexShard.routingEntry().restoreSource() != null) { indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource()); } else { indexShard.recovering("from store", RecoveryState.Type.STORE, clusterService.localNode()); }
threadPool.generic().execute(new Runnable() { @Override public void run() { try { final RecoveryState recoveryState = indexShard.recoveryState(); if (indexShard.routingEntry().restoreSource() != null) { restore(indexShard, recoveryState); } else { recoverFromStore(indexShard, indexShouldExists, recoveryState); }
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);indexShard.finalizeRecovery();继续进入 indexShard.performTranslogRecovery 方法: public Map performTranslogRecovery(boolean indexExists) { if (indexExists == false) { final RecoveryState.Translog translogStats = recoveryState().getTranslog(); translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0); } final Map recoveredTypes = internalPerformTranslogRecovery(false, indexExists); return recoveredTypes; }
engineConfig.setEnableGcDeletes(false);engineConfig.setCreate(indexExists == false);
org.elasticsearch.index.shard.TranslogRecoveryPerformer.getRecoveredTypes
if (skipTranslogRecovery == false) { markLastWrite(); } createNewEngine(skipTranslogRecovery, engineConfig); return engineConfig.getTranslogRecoveryPerformer(). getRecoveredTypes();
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);indexShard.finalizeRecovery();
try { if (skipInitialTranslogRecovery) { commitIndexWriter(writer, translog, lastCommittedSegmentInfos. getUserData(). get(SYNC_COMMIT_ID)); } else { recoverFromTranslog(engineConfig, translogGeneration); } } catch (IOException | EngineException ex) { ....... }
final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { opsRecovered = handler.recoveryFromSnapshot(this, snapshot); } catch (Throwable e) { throw new EngineException(shardId, "failed to recover from translog", e); }
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) { try { switch (operation.opType()) { case CREATE: Translog.Create create = (Translog.Create) operation; Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()), source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()), create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false); maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates); if (logger.isTraceEnabled()) { logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id()); } engine.create(engineCreate); break;
if (opsRecovered > 0) { logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration()); flush(true, true); } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); }
indexShard.finalizeRecovery(); String indexName = indexShard.shardId().index().name(); for (Map.Entry entry : typesToUpdate.entrySet()) { validateMappingUpdate(indexName, entry.getKey(), entry.getValue()); } indexShard.postRecovery("post recovery from shard_store");
if (isPeerRecovery(shardRouting)) { //走的这个分支.....RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA; recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));...... }else { ......}
org.elasticsearch.indices.recovery.RecoveryTarget
threadPool.generic().execute(new RecoveryRunner(recoveryId));
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());recoveryStatus.indexShard().prepareForIndexRecovery(); recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() { @Override public void run() throws InterruptedException { responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { @Override public RecoveryResponse newInstance() { return new RecoveryResponse(); } }).txGet()); } });
class StartRecoveryTransportRequestHandler extends TransportRequestHandler{ @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception { RecoveryResponse response = recover(request); channel.sendResponse(response); } }
private RecoveryResponse recover(final StartRecoveryRequest request) { ..... if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) { handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger); } else { handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger); } ongoingRecoveries.add(shard, handler); try { return handler.recoverToTarget(); } finally { ongoingRecoveries.remove(shard, handler); } }
public RecoveryResponse recoverToTarget() { final Engine engine = shard.engine(); assert engine.getTranslog() != null : "translog must not be null"; try (Translog.View translogView = engine.getTranslog().newView()) { final SnapshotIndexCommit phase1Snapshot; phase1Snapshot = shard.snapshotIndex(false); phase1(phase1Snapshot, translogView); try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) { phase2(phase2Snapshot); } catch (Throwable e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } finalizeRecovery(); } return response; }
try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) { phase2(phase2Snapshot); }
int totalOperations = sendSnapshot(snapshot);
cancellableThreads.execute(new Interruptable() { @Override public void run() throws InterruptedException { final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } });
@Override public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); final RecoveryStatus recoveryStatus = statusRef.status(); final RecoveryState.Translog translog = recoveryStatus.state().getTranslog(); translog.totalOperations(request.totalTranslogOps()); assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state(); try { recoveryStatus.indexShard().performBatchRecovery(request.operations());
// org.elasticsearch.index.translog.TranslogService: INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval"; INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops"; INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = "index.translog.flush_threshold_period"; INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
//每隔interval的时间,就去检查下面三个条件决定是不是要进行flush,//默认5s。时间过长,会超出下面阈值比较大。index.translog.interval //超过多少条日志后需要flush,默认Int的最大值index.translog.flush_threshold_ops //定时flush,默认30m 可动态设置index.translog.flush_threshold_period//translog 大小超过多少后flush,默认512m index.translog.flush_threshold_size
转载地址:http://wurql.baihongyu.com/