package com.amazon.opendistroforelasticsearch.search.asynchronous.management;

import com.amazon.opendistroforelasticsearch.search.asynchronous.plugin.AsynchronousSearchPlugin;
import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AcknowledgedResponse;
import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchPersistenceService;
import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchService;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

/* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/management/AsynchronousSearchManagementService.class */
public class AsynchronousSearchManagementService extends AbstractLifecycleComponent implements ClusterStateListener {
    private final ClusterService clusterService;
    private final AsynchronousSearchPersistenceService asynchronousSearchPersistenceService;
    private final ThreadPool threadPool;
    private volatile Scheduler.Cancellable activeContextReaperScheduledFuture;
    private static final String RESPONSE_CLEANUP_SCHEDULING_EXECUTOR = "management";
    private AtomicReference<PersistedResponseCleanUpAndRescheduleRunnable> persistedResponseCleanUpRunnable = new AtomicReference<>();
    private AsynchronousSearchService asynchronousSearchService;
    private TransportService transportService;
    private TimeValue activeContextReaperInterval;
    private TimeValue persistedResponseCleanUpInterval;
    public static final String PERSISTED_RESPONSE_CLEANUP_ACTION_NAME = "indices:data/read/opendistro/asynchronous_search/response_cleanup";
    private static final Logger logger = LogManager.getLogger(AsynchronousSearchManagementService.class);
    public static final Setting<TimeValue> ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING = Setting.timeSetting("opendistro.asynchronous_search.active.context.reaper_interval", TimeValue.timeValueMinutes(5), TimeValue.timeValueSeconds(5), new Setting.Property[]{Setting.Property.NodeScope});
    public static final Setting<TimeValue> PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING = Setting.timeSetting("opendistro.asynchronous_search.expired.persisted_response.cleanup_interval", TimeValue.timeValueMinutes(5), TimeValue.timeValueSeconds(5), new Setting.Property[]{Setting.Property.NodeScope});

    /* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/management/AsynchronousSearchManagementService$ActiveContextReaper.class */
    class ActiveContextReaper implements Runnable {
        ActiveContextReaper() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                AsynchronousSearchManagementService.this.asynchronousSearchService.getContextsToReap().forEach(asynchronousSearchContext -> {
                    AsynchronousSearchManagementService.this.asynchronousSearchService.freeContext(asynchronousSearchContext.getAsynchronousSearchId(), asynchronousSearchContext.getContextId(), null, ActionListener.wrap(bool -> {
                        AsynchronousSearchManagementService.logger.debug("Successfully freed up context [{}] running duration [{}]", asynchronousSearchContext.getAsynchronousSearchId(), Long.valueOf(asynchronousSearchContext.getExpirationTimeMillis() - asynchronousSearchContext.getStartTimeMillis()));
                    }, exc -> {
                        AsynchronousSearchManagementService.logger.debug(() -> {
                            return new ParameterizedMessage("Failed to cleanup asynchronous search context [{}] running duration [{}] due to ", asynchronousSearchContext.getAsynchronousSearchId(), Long.valueOf(asynchronousSearchContext.getExpirationTimeMillis() - asynchronousSearchContext.getStartTimeMillis()));
                        }, exc);
                    }));
                });
            } catch (Exception e) {
                AsynchronousSearchManagementService.logger.error("Failed to free up overrunning asynchronous searches due to ", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/management/AsynchronousSearchManagementService$AsynchronousSearchCleanUpRequest.class */
    public static class AsynchronousSearchCleanUpRequest extends ActionRequest {
        private final long absoluteTimeInMillis;

        AsynchronousSearchCleanUpRequest(long j) {
            this.absoluteTimeInMillis = j;
        }

        AsynchronousSearchCleanUpRequest(StreamInput streamInput) throws IOException {
            super(streamInput);
            this.absoluteTimeInMillis = streamInput.readLong();
        }

        public void writeTo(StreamOutput streamOutput) throws IOException {
            super.writeTo(streamOutput);
            streamOutput.writeLong(this.absoluteTimeInMillis);
        }

        public ActionRequestValidationException validate() {
            return null;
        }

        public long getAbsoluteTimeInMillis() {
            return this.absoluteTimeInMillis;
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.absoluteTimeInMillis));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.absoluteTimeInMillis == ((AsynchronousSearchCleanUpRequest) obj).absoluteTimeInMillis;
        }

        public String toString() {
            return "[expirationTimeMillis] : " + this.absoluteTimeInMillis;
        }
    }

    /* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/management/AsynchronousSearchManagementService$PersistedResponseCleanUpAndRescheduleRunnable.class */
    private class PersistedResponseCleanUpAndRescheduleRunnable extends ResponseCleanUpRunnable {
        PersistedResponseCleanUpAndRescheduleRunnable() {
            super("scheduled");
        }

        @Override // com.amazon.opendistroforelasticsearch.search.asynchronous.management.AsynchronousSearchManagementService.ResponseCleanUpRunnable
        protected void doRun() {
            if (this == AsynchronousSearchManagementService.this.persistedResponseCleanUpRunnable.get()) {
                super.doRun();
            } else {
                AsynchronousSearchManagementService.logger.trace("master changed, scheduled cleanup job is stale");
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void onAfter() {
            if (this == AsynchronousSearchManagementService.this.persistedResponseCleanUpRunnable.get()) {
                AsynchronousSearchManagementService.logger.trace("scheduling next clean up job in [{}]", AsynchronousSearchManagementService.this.persistedResponseCleanUpInterval);
                AsynchronousSearchManagementService.this.threadPool.scheduleUnlessShuttingDown(AsynchronousSearchManagementService.this.persistedResponseCleanUpInterval, AsynchronousSearchManagementService.RESPONSE_CLEANUP_SCHEDULING_EXECUTOR, this);
            }
        }
    }

    /* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/management/AsynchronousSearchManagementService$PersistedResponseCleanUpTransportHandler.class */
    class PersistedResponseCleanUpTransportHandler implements TransportRequestHandler<AsynchronousSearchCleanUpRequest> {
        PersistedResponseCleanUpTransportHandler() {
        }

        public void messageReceived(AsynchronousSearchCleanUpRequest asynchronousSearchCleanUpRequest, TransportChannel transportChannel, Task task) {
            AsynchronousSearchManagementService asynchronousSearchManagementService = AsynchronousSearchManagementService.this;
            Objects.requireNonNull(transportChannel);
            asynchronousSearchManagementService.asyncCleanUpOperation(asynchronousSearchCleanUpRequest, task, ActionListener.wrap((v1) -> {
                r3.sendResponse(v1);
            }, exc -> {
                try {
                    transportChannel.sendResponse(exc);
                } catch (IOException e) {
                    AsynchronousSearchManagementService.logger.warn(() -> {
                        return new ParameterizedMessage("Failed to send cleanup error response for request [{}]", asynchronousSearchCleanUpRequest);
                    }, e);
                }
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/management/AsynchronousSearchManagementService$ResponseCleanUpRunnable.class */
    public class ResponseCleanUpRunnable extends AbstractRunnable {
        private final String reason;

        ResponseCleanUpRunnable(String str) {
            this.reason = str;
        }

        protected void doRun() {
            AsynchronousSearchManagementService.this.performCleanUp();
        }

        public void onFailure(Exception exc) {
            AsynchronousSearchManagementService.logger.warn(new ParameterizedMessage("sync search clean up job failed [{}]", this.reason), exc);
        }

        public void onRejection(Exception exc) {
            AsynchronousSearchManagementService.logger.log((exc instanceof EsRejectedExecutionException) && ((EsRejectedExecutionException) exc).isExecutorShutdown() ? Level.DEBUG : Level.WARN, "asynchronous search clean up job rejected [{}]", this.reason, exc);
        }
    }

    @Inject
    public AsynchronousSearchManagementService(Settings settings, ClusterService clusterService, ThreadPool threadPool, AsynchronousSearchService asynchronousSearchService, TransportService transportService, AsynchronousSearchPersistenceService asynchronousSearchPersistenceService) {
        this.clusterService = clusterService;
        this.threadPool = threadPool;
        this.clusterService.addListener(this);
        this.asynchronousSearchService = asynchronousSearchService;
        this.transportService = transportService;
        this.asynchronousSearchPersistenceService = asynchronousSearchPersistenceService;
        this.activeContextReaperInterval = (TimeValue) ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING.get(settings);
        this.persistedResponseCleanUpInterval = (TimeValue) PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING.get(settings);
        transportService.registerRequestHandler(PERSISTED_RESPONSE_CLEANUP_ACTION_NAME, "same", false, false, AsynchronousSearchCleanUpRequest::new, new PersistedResponseCleanUpTransportHandler());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void asyncCleanUpOperation(AsynchronousSearchCleanUpRequest asynchronousSearchCleanUpRequest, Task task, ActionListener<AcknowledgedResponse> actionListener) {
        this.transportService.getThreadPool().executor(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME).execute(() -> {
            performPersistedResponseCleanUpAction(asynchronousSearchCleanUpRequest, actionListener);
        });
    }

    private void performPersistedResponseCleanUpAction(AsynchronousSearchCleanUpRequest asynchronousSearchCleanUpRequest, ActionListener<AcknowledgedResponse> actionListener) {
        this.asynchronousSearchPersistenceService.deleteExpiredResponses(actionListener, asynchronousSearchCleanUpRequest.absoluteTimeInMillis);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
        if (!clusterChangedEvent.localNodeMaster() || this.persistedResponseCleanUpRunnable.get() != null) {
            if (clusterChangedEvent.localNodeMaster()) {
                return;
            }
            this.persistedResponseCleanUpRunnable.set(null);
        } else {
            logger.trace("elected as master, triggering response cleanup tasks");
            triggerCleanUp(clusterChangedEvent.state(), "became master");
            AbstractRunnable persistedResponseCleanUpAndRescheduleRunnable = new PersistedResponseCleanUpAndRescheduleRunnable();
            this.persistedResponseCleanUpRunnable.set(persistedResponseCleanUpAndRescheduleRunnable);
            this.threadPool.scheduleUnlessShuttingDown(this.persistedResponseCleanUpInterval, RESPONSE_CLEANUP_SCHEDULING_EXECUTOR, persistedResponseCleanUpAndRescheduleRunnable);
        }
    }

    private void triggerCleanUp(ClusterState clusterState, String str) {
        if (clusterState.nodes().getDataNodes().size() > 0) {
            logger.debug("triggering response cleanup in background [{}]", str);
            this.threadPool.executor(RESPONSE_CLEANUP_SCHEDULING_EXECUTOR).execute(new ResponseCleanUpRunnable(str));
        }
    }

    protected void doStart() {
        this.activeContextReaperScheduledFuture = this.threadPool.scheduleWithFixedDelay(new ActiveContextReaper(), this.activeContextReaperInterval, AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME);
    }

    protected void doStop() {
        this.persistedResponseCleanUpRunnable.set(null);
        this.activeContextReaperScheduledFuture.cancel();
    }

    protected void doClose() {
        this.persistedResponseCleanUpRunnable.set(null);
        this.activeContextReaperScheduledFuture.cancel();
    }

    public final void performCleanUp() {
        ThreadContext threadContext = this.threadPool.getThreadContext();
        try {
            ThreadContext.StoredContext stashContext = threadContext.stashContext();
            try {
                threadContext.markAsSystemContext();
                ImmutableOpenMap dataNodes = this.clusterService.state().nodes().getDataNodes();
                List list = (List) Stream.of(dataNodes.values().toArray(DiscoveryNode.class)).collect(Collectors.toList());
                if (list == null || list.isEmpty()) {
                    logger.debug("Found empty data nodes with asynchronous search enabled attribute [{}] for response clean up", dataNodes);
                    if (stashContext != null) {
                        stashContext.close();
                        return;
                    }
                    return;
                }
                final DiscoveryNode discoveryNode = (DiscoveryNode) list.get(Randomness.get().nextInt(list.size()));
                this.transportService.sendRequest(discoveryNode, PERSISTED_RESPONSE_CLEANUP_ACTION_NAME, new AsynchronousSearchCleanUpRequest(this.threadPool.absoluteTimeInMillis()), new TransportResponseHandler<AcknowledgedResponse>() { // from class: com.amazon.opendistroforelasticsearch.search.asynchronous.management.AsynchronousSearchManagementService.1
                    /* renamed from: read, reason: merged with bridge method [inline-methods] */
                    public AcknowledgedResponse m14read(StreamInput streamInput) throws IOException {
                        return new AcknowledgedResponse(streamInput);
                    }

                    public void handleResponse(AcknowledgedResponse acknowledgedResponse) {
                        AsynchronousSearchManagementService.logger.debug("Successfully executed clean up action on node [{}] with response [{}]", discoveryNode, Boolean.valueOf(acknowledgedResponse.isAcknowledged()));
                    }

                    public void handleException(TransportException transportException) {
                        AsynchronousSearchManagementService.logger.error(() -> {
                            return new ParameterizedMessage("Exception executing action [{}]", AsynchronousSearchManagementService.PERSISTED_RESPONSE_CLEANUP_ACTION_NAME);
                        }, transportException);
                    }

                    public String executor() {
                        return AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME;
                    }
                });
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            logger.error("Failed to schedule asynchronous search cleanup", e);
        }
    }
}
