package com.amazon.opendistroforelasticsearch.search.asynchronous.processor;

import com.amazon.opendistroforelasticsearch.search.asynchronous.context.AsynchronousSearchContextId;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveContext;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchContextClosedException;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.persistence.AsynchronousSearchPersistenceModel;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchContextEvent;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchState;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchStateMachine;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.AsynchronousSearchStateMachineClosedException;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.event.BeginPersistEvent;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.event.SearchFailureEvent;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.event.SearchResponsePersistFailedEvent;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.event.SearchResponsePersistedEvent;
import com.amazon.opendistroforelasticsearch.search.asynchronous.context.state.event.SearchSuccessfulEvent;
import com.amazon.opendistroforelasticsearch.search.asynchronous.response.AsynchronousSearchResponse;
import com.amazon.opendistroforelasticsearch.search.asynchronous.service.AsynchronousSearchPersistenceService;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;

/* loaded from: input_file:com/amazon/opendistroforelasticsearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.class */
/**
 * Performs the post processing of an asynchronous search once the underlying search has
 * finished, either successfully or with a failure.
 *
 * <p>For a context that is still present in the active store, the matching state machine event is
 * triggered and, when the context reports that it should be persisted, the persist phase is
 * started. When persistence is not required, or once a persist attempt has finished, the context
 * is handed to the {@code freeActiveContextConsumer} supplied at construction time.
 */
public class AsynchronousSearchPostProcessor {
    private static final Logger logger = LogManager.getLogger(AsynchronousSearchPostProcessor.class);
    private final AsynchronousSearchPersistenceService asynchronousSearchPersistenceService;
    private final AsynchronousSearchActiveStore asynchronousSearchActiveStore;
    private final AsynchronousSearchStateMachine asynchronousSearchStateMachine;
    private final Consumer<AsynchronousSearchActiveContext> freeActiveContextConsumer;
    private final ThreadPool threadPool;

    /**
     * Creates the post processor.
     *
     * @param asynchronousSearchPersistenceService service used to store the final response
     * @param asynchronousSearchActiveStore store used to look up active contexts by id
     * @param asynchronousSearchStateMachine state machine on which lifecycle events are triggered
     * @param consumer callback invoked with an active context once post processing no longer needs it
     * @param threadPool thread pool whose thread context is stashed while the response is stored
     * @param clusterService not used by this class; retained so existing callers keep compiling
     */
    public AsynchronousSearchPostProcessor(AsynchronousSearchPersistenceService asynchronousSearchPersistenceService, AsynchronousSearchActiveStore asynchronousSearchActiveStore, AsynchronousSearchStateMachine asynchronousSearchStateMachine, Consumer<AsynchronousSearchActiveContext> consumer, ThreadPool threadPool, ClusterService clusterService) {
        this.asynchronousSearchActiveStore = asynchronousSearchActiveStore;
        this.asynchronousSearchPersistenceService = asynchronousSearchPersistenceService;
        this.asynchronousSearchStateMachine = asynchronousSearchStateMachine;
        this.freeActiveContextConsumer = consumer;
        this.threadPool = threadPool;
    }

    /**
     * Handles a failed search.
     *
     * @param exc the failure reported by the search
     * @param asynchronousSearchContextId id used to look up the active context
     * @return the response held by the active context after the failure event has been triggered,
     *         or a best-effort {@code FAILED} response (ids set to {@code -1}) when the context is
     *         no longer in the active store or the state machine reports the context as closed
     */
    public AsynchronousSearchResponse processSearchFailure(Exception exc, AsynchronousSearchContextId asynchronousSearchContextId) {
        Optional<AsynchronousSearchActiveContext> context = this.asynchronousSearchActiveStore.getContext(asynchronousSearchContextId);
        try {
            if (context.isPresent()) {
                AsynchronousSearchActiveContext activeContext = context.get();
                this.asynchronousSearchStateMachine.trigger(new SearchFailureEvent(activeContext, exc));
                handlePersist(activeContext);
                return activeContext.getAsynchronousSearchResponse();
            }
        } catch (AsynchronousSearchStateMachineClosedException ignored) {
            // The context was closed underneath us; fall through to the best-effort response.
        }
        // Best effort: no usable active context, so answer from the failure alone.
        return new AsynchronousSearchResponse(AsynchronousSearchState.FAILED, -1L, -1L, null, ExceptionsHelper.convertToElastic(exc));
    }

    /**
     * Handles a successfully completed search.
     *
     * @param searchResponse the response produced by the search
     * @param asynchronousSearchContextId id used to look up the active context
     * @return the response held by the active context after the success event has been triggered,
     *         or a best-effort {@code SUCCEEDED} response (ids set to {@code -1}) wrapping
     *         {@code searchResponse} when the context is no longer in the active store or the
     *         state machine reports the context as closed
     */
    public AsynchronousSearchResponse processSearchResponse(SearchResponse searchResponse, AsynchronousSearchContextId asynchronousSearchContextId) {
        Optional<AsynchronousSearchActiveContext> context = this.asynchronousSearchActiveStore.getContext(asynchronousSearchContextId);
        try {
            if (context.isPresent()) {
                AsynchronousSearchActiveContext activeContext = context.get();
                this.asynchronousSearchStateMachine.trigger(new SearchSuccessfulEvent(activeContext, searchResponse));
                handlePersist(activeContext);
                return activeContext.getAsynchronousSearchResponse();
            }
        } catch (AsynchronousSearchStateMachineClosedException ignored) {
            // The context was closed underneath us; fall through to the best-effort response.
        }
        // Best effort: no usable active context, so answer from the search response alone.
        return new AsynchronousSearchResponse(AsynchronousSearchState.SUCCEEDED, -1L, -1L, searchResponse, null);
    }

    /**
     * Stores the final response of the given context through the persistence service.
     *
     * <p>All context permits are requested first (with a 120 second timeout). Once they are
     * granted, {@code shouldPersist()} is checked again; if it is now {@code false} the permits
     * are released and nothing is stored. Otherwise the response is stored with the thread
     * context stashed, and when the store call completes (success or failure) the corresponding
     * state machine event is triggered, the context is handed to the free-context consumer
     * exactly once, and the permits are released.
     *
     * <p>If the permits cannot be acquired, the failure is logged and the context is handed to
     * the free-context consumer.
     *
     * @param asynchronousSearchActiveContext the active context whose response is persisted
     * @param asynchronousSearchPersistenceModel the model passed to the persistence service
     */
    public void persistResponse(AsynchronousSearchActiveContext asynchronousSearchActiveContext, AsynchronousSearchPersistenceModel asynchronousSearchPersistenceModel) {
        asynchronousSearchActiveContext.acquireAllContextPermits(ActionListener.wrap(releasable -> {
            // Re-check after the permits were granted: the decision may have changed while waiting.
            if (!asynchronousSearchActiveContext.shouldPersist()) {
                logger.debug("Async search context [{}] has been closed while waiting to acquire permits for post processing", asynchronousSearchActiveContext.getAsynchronousSearchId());
                releasable.close();
                return;
            }
            logger.debug("Persisting response for asynchronous search id [{}]", asynchronousSearchActiveContext.getAsynchronousSearchId());
            // The stashed thread context is restored when this try block exits, on success or failure.
            try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) {
                this.asynchronousSearchPersistenceService.storeResponse(
                    asynchronousSearchActiveContext.getAsynchronousSearchId(),
                    asynchronousSearchPersistenceModel,
                    // runAfter: the permits are released after either callback below has run.
                    ActionListener.runAfter(ActionListener.wrap(indexResponse -> {
                        logger.debug("Successfully persisted response for asynchronous search id [{}]", asynchronousSearchActiveContext.getAsynchronousSearchId());
                        try {
                            this.asynchronousSearchStateMachine.trigger(new SearchResponsePersistedEvent(asynchronousSearchActiveContext));
                        } catch (AsynchronousSearchStateMachineClosedException e) {
                            logContextClosedWhileTriggering(asynchronousSearchActiveContext, SearchResponsePersistedEvent.class);
                        } finally {
                            // Single release point: runs once whatever trigger() did.
                            this.freeActiveContextConsumer.accept(asynchronousSearchActiveContext);
                        }
                    }, exc -> {
                        try {
                            this.asynchronousSearchStateMachine.trigger(new SearchResponsePersistFailedEvent(asynchronousSearchActiveContext));
                        } catch (AsynchronousSearchStateMachineClosedException e) {
                            logContextClosedWhileTriggering(asynchronousSearchActiveContext, SearchResponsePersistFailedEvent.class);
                        } finally {
                            // Single release point: runs once whatever trigger() did.
                            this.freeActiveContextConsumer.accept(asynchronousSearchActiveContext);
                        }
                        // Logged after the release so a logging problem cannot cause a second release.
                        logger.error(() -> new ParameterizedMessage("Failed to persist final response for [{}] due to [{}]", asynchronousSearchActiveContext.getAsynchronousSearchId(), exc));
                    }), releasable::close));
            }
        }, exc -> {
            Throwable cause = ExceptionsHelper.unwrapCause(exc);
            // A closed context or a timeout is logged at DEBUG; anything else at WARN.
            boolean expected = (cause instanceof AsynchronousSearchContextClosedException) || (cause instanceof TimeoutException);
            logger.log(expected ? Level.DEBUG : Level.WARN, () -> new ParameterizedMessage("Exception  occured while acquiring the permit for asynchronousSearchContext [{}]", asynchronousSearchActiveContext.getAsynchronousSearchId()), exc);
            this.freeActiveContextConsumer.accept(asynchronousSearchActiveContext);
        }), TimeValue.timeValueSeconds(120L), "persisting response");
    }

    /**
     * Starts the persist phase for the context, or hands the context to the free-context consumer
     * when it should not be persisted or the state machine reports it as closed.
     */
    private void handlePersist(AsynchronousSearchActiveContext asynchronousSearchActiveContext) {
        if (!asynchronousSearchActiveContext.shouldPersist()) {
            this.freeActiveContextConsumer.accept(asynchronousSearchActiveContext);
            return;
        }
        try {
            this.asynchronousSearchStateMachine.trigger(new BeginPersistEvent(asynchronousSearchActiveContext));
        } catch (AsynchronousSearchStateMachineClosedException e) {
            this.freeActiveContextConsumer.accept(asynchronousSearchActiveContext);
        }
    }

    /** Logs a warning that the context was found closed while the given event type was being triggered. */
    private static void logContextClosedWhileTriggering(AsynchronousSearchActiveContext context, Class<?> eventClass) {
        logger.warn("Unexpected state, possibly caused by external task cancellation, context with id [{}] closed while triggering event [{}]", context.getAsynchronousSearchId(), eventClass.getName());
    }
}
