package com.amazon.opendistroforelasticsearch.ad.task;

import com.amazon.opendistroforelasticsearch.ad.cluster.HashRing;
import com.amazon.opendistroforelasticsearch.ad.common.exception.ADTaskCancelledException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.DuplicateTaskException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure;
import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.ADTask;
import com.amazon.opendistroforelasticsearch.ad.model.ADTaskAction;
import com.amazon.opendistroforelasticsearch.ad.model.ADTaskProfile;
import com.amazon.opendistroforelasticsearch.ad.model.ADTaskState;
import com.amazon.opendistroforelasticsearch.ad.model.ADTaskType;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorFunction;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskRequest;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.ExceptionUtil;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.commons.authuser.User;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

/* loaded from: input_file:com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.class */
public class ADTaskManager {
    private final Logger logger = LogManager.getLogger(getClass());
    private final Client client;
    private final ClusterService clusterService;
    private final NamedXContentRegistry xContentRegistry;
    private final AnomalyDetectionIndices detectionIndices;
    private final DiscoveryNodeFilterer nodeFilter;
    private final ADTaskCacheManager adTaskCacheManager;
    private final HashRing hashRing;
    private volatile Integer maxAdTaskDocsPerDetector;
    private volatile Integer pieceIntervalSeconds;
    private volatile TimeValue requestTimeout;

    public ADTaskManager(Settings settings, ClusterService clusterService, Client client, NamedXContentRegistry namedXContentRegistry, AnomalyDetectionIndices anomalyDetectionIndices, DiscoveryNodeFilterer discoveryNodeFilterer, HashRing hashRing, ADTaskCacheManager aDTaskCacheManager) {
        this.client = client;
        this.xContentRegistry = namedXContentRegistry;
        this.detectionIndices = anomalyDetectionIndices;
        this.nodeFilter = discoveryNodeFilterer;
        this.clusterService = clusterService;
        this.adTaskCacheManager = aDTaskCacheManager;
        this.hashRing = hashRing;
        this.maxAdTaskDocsPerDetector = (Integer) AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR, num -> {
            this.maxAdTaskDocsPerDetector = num;
        });
        this.pieceIntervalSeconds = (Integer) AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS, num2 -> {
            this.pieceIntervalSeconds = num2;
        });
        this.requestTimeout = (TimeValue) AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.REQUEST_TIMEOUT, timeValue -> {
            this.requestTimeout = timeValue;
        });
    }

    public void startDetector(String str, IndexAnomalyDetectorJobActionHandler indexAnomalyDetectorJobActionHandler, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        getDetector(str, anomalyDetector -> {
            if (validateDetector(anomalyDetector, actionListener)) {
                indexAnomalyDetectorJobActionHandler.startAnomalyDetectorJob(anomalyDetector);
            }
        }, anomalyDetector2 -> {
            if (validateDetector(anomalyDetector2, actionListener)) {
                Optional<DiscoveryNode> owningNode = this.hashRing.getOwningNode(anomalyDetector2.getDetectorId());
                if (owningNode.isPresent()) {
                    forwardToCoordinatingNode(anomalyDetector2, user, ADTaskAction.START, transportService, owningNode.get(), actionListener);
                } else {
                    this.logger.debug("Can't find eligible node to run as AD task's coordinating node");
                    actionListener.onFailure(new ElasticsearchStatusException("No eligible node to run detector", RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                }
            }
        }, actionListener);
    }

    protected void forwardToCoordinatingNode(AnomalyDetector anomalyDetector, User user, ADTaskAction aDTaskAction, TransportService transportService, DiscoveryNode discoveryNode, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        transportService.sendRequest(discoveryNode, ForwardADTaskAction.NAME, new ForwardADTaskRequest(anomalyDetector, user, aDTaskAction), TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout(this.requestTimeout).build(), new ActionListenerResponseHandler(actionListener, AnomalyDetectorJobResponse::new));
    }

    public void stopDetector(String str, IndexAnomalyDetectorJobActionHandler indexAnomalyDetectorJobActionHandler, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        getDetector(str, anomalyDetector -> {
            indexAnomalyDetectorJobActionHandler.stopAnomalyDetectorJob(str);
        }, anomalyDetector2 -> {
            getLatestADTask(str, optional -> {
                stopHistoricalDetector(str, optional, user, actionListener);
            }, transportService, actionListener);
        }, actionListener);
    }

    public <T> void getDetector(String str, Consumer<AnomalyDetector> consumer, Consumer<AnomalyDetector> consumer2, ActionListener<T> actionListener) {
        this.client.get(new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(str), ActionListener.wrap(getResponse -> {
            if (!getResponse.isExists()) {
                actionListener.onFailure(new ElasticsearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND, new Object[0]));
                return;
            }
            try {
                XContentParser createXContentParserFromRegistry = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, getResponse.getSourceAsBytesRef());
                try {
                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                    AnomalyDetector parse = AnomalyDetector.parse(createXContentParserFromRegistry, getResponse.getId(), Long.valueOf(getResponse.getVersion()));
                    if (parse.isRealTimeDetector()) {
                        consumer.accept(parse);
                    } else {
                        consumer2.accept(parse);
                    }
                    if (createXContentParserFromRegistry != null) {
                        createXContentParserFromRegistry.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                String str2 = "Failed to start anomaly detector " + str;
                this.logger.error(str2, e);
                actionListener.onFailure(new ElasticsearchStatusException(str2, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
            }
        }, exc -> {
            actionListener.onFailure(exc);
        }));
    }

    public <T> void getLatestADTask(String str, Consumer<Optional<ADTask>> consumer, TransportService transportService, ActionListener<T> actionListener) {
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.filter(new TermQueryBuilder("detector_id", str));
        boolQueryBuilder.filter(new TermQueryBuilder(ADTask.IS_LATEST_FIELD, true));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQueryBuilder);
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.source(searchSourceBuilder);
        searchRequest.indices(new String[]{CommonName.DETECTION_STATE_INDEX});
        this.client.search(searchRequest, ActionListener.wrap(searchResponse -> {
            if (searchResponse == null || searchResponse.getHits().getTotalHits() == null || searchResponse.getHits().getTotalHits().value == 0) {
                consumer.accept(Optional.empty());
                return;
            }
            SearchHit at = searchResponse.getHits().getAt(0);
            try {
                XContentParser createXContentParserFromRegistry = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, at.getSourceRef());
                try {
                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                    ADTask parse = ADTask.parse(createXContentParserFromRegistry, at.getId());
                    if (isADTaskEnded(parse) || !lastUpdateTimeExpired(parse)) {
                        consumer.accept(Optional.of(parse));
                    } else {
                        getADTaskProfile(parse, ActionListener.wrap(aDTaskProfile -> {
                            if (aDTaskProfile.getNodeId() == null) {
                                resetTaskStateAsStopped(parse, transportService);
                                parse.setState(ADTaskState.STOPPED.name());
                            }
                            consumer.accept(Optional.of(parse));
                        }, exc -> {
                            this.logger.error("Failed to get AD task profile for task " + parse.getTaskId(), exc);
                            actionListener.onFailure(exc);
                        }));
                    }
                    if (createXContentParserFromRegistry != null) {
                        createXContentParserFromRegistry.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                String str2 = "Failed to parse AD task for detector " + str;
                this.logger.error(str2, e);
                actionListener.onFailure(new ElasticsearchStatusException(str2, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
            }
        }, exc -> {
            if (exc instanceof IndexNotFoundException) {
                consumer.accept(Optional.empty());
            } else {
                this.logger.error("Failed to search AD task for detector " + str, exc);
                actionListener.onFailure(exc);
            }
        }));
    }

    private void stopHistoricalDetector(String str, Optional<ADTask> optional, User user, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        if (!optional.isPresent()) {
            actionListener.onFailure(new ResourceNotFoundException(str, "Detector not started"));
        } else {
            if (isADTaskEnded(optional.get())) {
                actionListener.onFailure(new ResourceNotFoundException(str, "No running task found"));
                return;
            }
            String taskId = optional.get().getTaskId();
            this.client.execute(ADCancelTaskAction.INSTANCE, new ADCancelTaskRequest(str, user == null ? null : user.getName(), this.nodeFilter.getEligibleDataNodes()), ActionListener.wrap(aDCancelTaskResponse -> {
                actionListener.onResponse(new AnomalyDetectorJobResponse(taskId, 0L, 0L, 0L, RestStatus.OK));
            }, exc -> {
                this.logger.error("Failed to cancel AD task " + taskId + ", detector id: " + str, exc);
                actionListener.onFailure(exc);
            }));
        }
    }

    private boolean lastUpdateTimeExpired(ADTask aDTask) {
        return aDTask.getLastUpdateTime().plus(2 * this.pieceIntervalSeconds.intValue(), (TemporalUnit) ChronoUnit.SECONDS).isBefore(Instant.now());
    }

    public boolean isADTaskEnded(ADTask aDTask) {
        return ADTaskState.STOPPED.name().equals(aDTask.getState()) || ADTaskState.FINISHED.name().equals(aDTask.getState()) || ADTaskState.FAILED.name().equals(aDTask.getState());
    }

    private void resetTaskStateAsStopped(ADTask aDTask, TransportService transportService) {
        if (isADTaskEnded(aDTask)) {
            return;
        }
        cleanDetectorCache(aDTask, transportService, () -> {
            HashMap hashMap = new HashMap();
            hashMap.put("state", ADTaskState.STOPPED.name());
            updateADTask(aDTask.getTaskId(), hashMap);
            this.logger.debug("reset task as stopped, task id " + aDTask.getTaskId());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanDetectorCache(ADTask aDTask, TransportService transportService, AnomalyDetectorFunction anomalyDetectorFunction) {
        String coordinatingNode = aDTask.getCoordinatingNode();
        DiscoveryNode[] eligibleDataNodes = this.nodeFilter.getEligibleDataNodes();
        this.logger.debug("coordinatingNode is: " + coordinatingNode + " for task " + aDTask.getTaskId());
        DiscoveryNode discoveryNode = null;
        int length = eligibleDataNodes.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            DiscoveryNode discoveryNode2 = eligibleDataNodes[i];
            if (discoveryNode2.getId().equals(coordinatingNode)) {
                discoveryNode = discoveryNode2;
                break;
            }
            i++;
        }
        if (discoveryNode != null) {
            this.logger.debug("coordinatingNode found, will clean detector cache on it, detectorId: " + aDTask.getDetectorId());
            forwardToCoordinatingNode(aDTask.getDetector(), null, ADTaskAction.STOP, transportService, discoveryNode, ActionListener.wrap(anomalyDetectorJobResponse -> {
                anomalyDetectorFunction.execute();
            }, exc -> {
                this.logger.error("Failed to clear detector cache on coordinating node " + coordinatingNode, exc);
            }));
        } else {
            this.logger.warn("coordinating node" + coordinatingNode + " left cluster for detector " + aDTask.getDetectorId() + ", task id " + aDTask.getTaskId());
            anomalyDetectorFunction.execute();
        }
    }

    public void getLatestADTaskProfile(String str, TransportService transportService, ActionListener<DetectorProfile> actionListener) {
        getLatestADTask(str, optional -> {
            if (optional.isPresent()) {
                getADTaskProfile((ADTask) optional.get(), ActionListener.wrap(aDTaskProfile -> {
                    DetectorProfile.Builder builder = new DetectorProfile.Builder();
                    builder.adTaskProfile(aDTaskProfile);
                    actionListener.onResponse(builder.build());
                }, exc -> {
                    this.logger.error("Failed to get AD task profile for task " + ((ADTask) optional.get()).getTaskId(), exc);
                    actionListener.onFailure(exc);
                }));
            } else {
                actionListener.onFailure(new ResourceNotFoundException(str, "Can't find latest task for detector"));
            }
        }, transportService, actionListener);
    }

    private void getADTaskProfile(ADTask aDTask, ActionListener<ADTaskProfile> actionListener) {
        this.client.execute(ADTaskProfileAction.INSTANCE, new ADTaskProfileRequest(aDTask.getDetectorId(), this.nodeFilter.getEligibleDataNodes()), ActionListener.wrap(aDTaskProfileResponse -> {
            if (aDTaskProfileResponse.hasFailures()) {
                actionListener.onFailure((Exception) aDTaskProfileResponse.failures().get(0));
                return;
            }
            List list = (List) aDTaskProfileResponse.getNodes().stream().filter(aDTaskProfileNodeResponse -> {
                return aDTaskProfileNodeResponse.getAdTaskProfile() != null;
            }).map((v0) -> {
                return v0.getAdTaskProfile();
            }).collect(Collectors.toList());
            if (list.size() > 1) {
                String str = list.size() + " tasks running for detector " + aDTask.getDetectorId() + ". Please stop detector to kill all running tasks.";
                this.logger.error(str);
                actionListener.onFailure(new InternalFailure(aDTask.getDetectorId(), str));
            } else if (list.size() == 0) {
                actionListener.onResponse(new ADTaskProfile(aDTask, null, null, null, null, null, null));
            } else {
                ADTaskProfile aDTaskProfile = (ADTaskProfile) list.get(0);
                actionListener.onResponse(new ADTaskProfile(aDTask, aDTaskProfile.getShingleSize(), aDTaskProfile.getRcfTotalUpdates(), aDTaskProfile.getThresholdModelTrained(), aDTaskProfile.getThresholdModelTrainingDataSize(), aDTaskProfile.getModelSizeInBytes(), aDTaskProfile.getNodeId()));
            }
        }, exc -> {
            this.logger.error("Failed to get task profile for task " + aDTask.getTaskId(), exc);
            actionListener.onFailure(exc);
        }));
    }

    public ADTaskProfile getLocalADTaskProfileByDetectorId(String str) {
        ADTaskProfile aDTaskProfile = null;
        List<String> tasksOfDetector = this.adTaskCacheManager.getTasksOfDetector(str);
        if (tasksOfDetector.size() > 1) {
            String str2 = "Multiple tasks are running for detector: " + str + ". You can stop detector to kill all running tasks.";
            this.logger.warn(str2);
            throw new LimitExceededException(str2);
        }
        if (tasksOfDetector.size() == 1) {
            String str3 = tasksOfDetector.get(0);
            aDTaskProfile = new ADTaskProfile(Integer.valueOf(this.adTaskCacheManager.getShingle(str3).size()), Long.valueOf(this.adTaskCacheManager.getRcfModel(str3).getTotalUpdates()), Boolean.valueOf(this.adTaskCacheManager.isThresholdModelTrained(str3)), Integer.valueOf(this.adTaskCacheManager.getThresholdModelTrainingDataSize(str3)), Long.valueOf(this.adTaskCacheManager.getModelSize(str3)), this.clusterService.localNode().getId());
        }
        return aDTaskProfile;
    }

    private boolean validateDetector(AnomalyDetector anomalyDetector, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        String str = null;
        if (anomalyDetector.getFeatureAttributes().size() == 0) {
            str = "Can't start detector job as no features configured";
        } else if (anomalyDetector.getEnabledFeatureIds().size() == 0) {
            str = "Can't start detector job as no enabled features configured";
        }
        if (str == null) {
            return true;
        }
        actionListener.onFailure(new ElasticsearchStatusException(str, RestStatus.BAD_REQUEST, new Object[0]));
        return false;
    }

    public void startHistoricalDetector(AnomalyDetector anomalyDetector, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        try {
            if (this.detectionIndices.doesDetectorStateIndexExist()) {
                getLatestADTask(anomalyDetector.getDetectorId(), optional -> {
                    if (!optional.isPresent() || isADTaskEnded((ADTask) optional.get())) {
                        executeHistoricalDetector(anomalyDetector, user, actionListener);
                    } else {
                        actionListener.onFailure(new ElasticsearchStatusException(CommonErrorMessages.DETECTOR_IS_RUNNING, RestStatus.BAD_REQUEST, new Object[0]));
                    }
                }, transportService, actionListener);
            } else {
                this.detectionIndices.initDetectionStateIndex(ActionListener.wrap(createIndexResponse -> {
                    if (createIndexResponse.isAcknowledged()) {
                        this.logger.info("Created {} with mappings.", CommonName.DETECTION_STATE_INDEX);
                        executeHistoricalDetector(anomalyDetector, user, actionListener);
                    } else {
                        this.logger.warn("Create index .opendistro-anomaly-detection-state with mappings not acknowledged");
                        actionListener.onFailure(new ElasticsearchStatusException("Create index .opendistro-anomaly-detection-state with mappings not acknowledged", RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                    }
                }, exc -> {
                    if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) {
                        executeHistoricalDetector(anomalyDetector, user, actionListener);
                    } else {
                        this.logger.error("Failed to init anomaly detection state index", exc);
                        actionListener.onFailure(exc);
                    }
                }));
            }
        } catch (Exception e) {
            this.logger.error("Failed to start historical detector " + anomalyDetector.getDetectorId(), e);
            actionListener.onFailure(e);
        }
    }

    private void executeHistoricalDetector(AnomalyDetector anomalyDetector, User user, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
        updateByQueryRequest.indices(new String[]{CommonName.DETECTION_STATE_INDEX});
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.filter(new TermQueryBuilder("detector_id", anomalyDetector.getDetectorId()));
        boolQueryBuilder.filter(new TermQueryBuilder(ADTask.IS_LATEST_FIELD, true));
        updateByQueryRequest.setQuery(boolQueryBuilder);
        updateByQueryRequest.setRefresh(true);
        updateByQueryRequest.setScript(new Script("ctx._source.is_latest = false;"));
        this.client.execute(UpdateByQueryAction.INSTANCE, updateByQueryRequest, ActionListener.wrap(bulkByScrollResponse -> {
            List bulkFailures = bulkByScrollResponse.getBulkFailures();
            if (bulkFailures.isEmpty()) {
                createNewADTask(anomalyDetector, user, actionListener);
            } else {
                this.logger.error("Failed to update old task's state for detector: {}, response: {} ", anomalyDetector.getDetectorId(), bulkByScrollResponse.toString());
                actionListener.onFailure(((BulkItemResponse.Failure) bulkFailures.get(0)).getCause());
            }
        }, exc -> {
            this.logger.error("Failed to reset old tasks as not latest for detector " + anomalyDetector.getDetectorId(), exc);
            actionListener.onFailure(exc);
        }));
    }

    private void createNewADTask(AnomalyDetector anomalyDetector, User user, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        String name = user == null ? null : user.getName();
        Instant now = Instant.now();
        ADTask build = new ADTask.Builder().detectorId(anomalyDetector.getDetectorId()).detector(anomalyDetector).isLatest(true).taskType(ADTaskType.HISTORICAL.name()).executionStartTime(now).taskProgress(Float.valueOf(0.0f)).initProgress(Float.valueOf(0.0f)).state(ADTaskState.CREATED.name()).lastUpdateTime(now).startedBy(name).coordinatingNode(this.clusterService.localNode().getId()).user(user).build();
        IndexRequest indexRequest = new IndexRequest(CommonName.DETECTION_STATE_INDEX);
        try {
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
            try {
                indexRequest.source(build.toXContent(jsonBuilder, RestHandlerUtils.XCONTENT_WITH_TYPE)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                this.client.index(indexRequest, ActionListener.wrap(indexResponse -> {
                    onIndexADTaskResponse(indexResponse, build, (indexResponse, actionListener2) -> {
                        cleanOldAdTaskDocs(indexResponse, build, actionListener2);
                    }, actionListener);
                }, exc -> {
                    this.logger.error("Failed to create AD task for detector " + anomalyDetector.getDetectorId(), exc);
                    actionListener.onFailure(exc);
                }));
                if (jsonBuilder != null) {
                    jsonBuilder.close();
                }
            } finally {
            }
        } catch (Exception e) {
            this.logger.error("Failed to create AD task for detector " + anomalyDetector.getDetectorId(), e);
            actionListener.onFailure(e);
        }
    }

    private void onIndexADTaskResponse(IndexResponse indexResponse, ADTask aDTask, BiConsumer<IndexResponse, ActionListener<AnomalyDetectorJobResponse>> biConsumer, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        if (indexResponse == null || indexResponse.getResult() != DocWriteResponse.Result.CREATED) {
            actionListener.onFailure(new ElasticsearchStatusException(ExceptionUtil.getShardsFailure(indexResponse), indexResponse.status(), new Object[0]));
            return;
        }
        aDTask.setTaskId(indexResponse.getId());
        ActionListener<AnomalyDetectorJobResponse> wrap = ActionListener.wrap(anomalyDetectorJobResponse -> {
            actionListener.onResponse(anomalyDetectorJobResponse);
        }, exc -> {
            handleADTaskException(aDTask, exc);
            if (exc instanceof DuplicateTaskException) {
                actionListener.onFailure(new ElasticsearchStatusException(CommonErrorMessages.DETECTOR_IS_RUNNING, RestStatus.BAD_REQUEST, new Object[0]));
            } else {
                actionListener.onFailure(exc);
                this.adTaskCacheManager.removeDetector(aDTask.getDetectorId());
            }
        });
        try {
            this.adTaskCacheManager.add(aDTask.getDetectorId());
            if (biConsumer != null) {
                biConsumer.accept(indexResponse, wrap);
            }
        } catch (Exception e) {
            wrap.onFailure(e);
        }
    }

    private void cleanOldAdTaskDocs(IndexResponse indexResponse, ADTask aDTask, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.filter(new TermQueryBuilder("detector_id", aDTask.getDetectorId()));
        boolQueryBuilder.filter(new TermQueryBuilder(ADTask.IS_LATEST_FIELD, false));
        SearchRequest searchRequest = new SearchRequest();
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQueryBuilder).sort(ADTask.EXECUTION_START_TIME_FIELD, SortOrder.DESC).from(this.maxAdTaskDocsPerDetector.intValue() - 1).trackTotalHits(true).size(1);
        searchRequest.source(searchSourceBuilder).indices(new String[]{CommonName.DETECTION_STATE_INDEX});
        String detectorId = aDTask.getDetectorId();
        this.client.search(searchRequest, ActionListener.wrap(searchResponse -> {
            if (!searchResponse.getHits().iterator().hasNext()) {
                runBatchResultAction(indexResponse, aDTask, actionListener);
                return;
            }
            this.logger.debug("AD tasks count for detector {} is {}, exceeds limit of {}", detectorId, Long.valueOf(searchResponse.getHits().getTotalHits().value), this.maxAdTaskDocsPerDetector);
            SearchHit at = searchResponse.getHits().getAt(0);
            try {
                XContentParser createXContentParserFromRegistry = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, at.getSourceRef());
                try {
                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                    ADTask parse = ADTask.parse(createXContentParserFromRegistry, at.getId());
                    DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(new String[]{CommonName.DETECTION_STATE_INDEX});
                    RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(ADTask.EXECUTION_START_TIME_FIELD);
                    rangeQueryBuilder.lt(Long.valueOf(parse.getExecutionStartTime().toEpochMilli())).format(CommonName.EPOCH_MILLIS_FORMAT);
                    deleteByQueryRequest.setQuery(rangeQueryBuilder);
                    this.client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(bulkByScrollResponse -> {
                        this.logger.debug("Deleted {} old AD tasks started equals or before {} for detector {}", Long.valueOf(bulkByScrollResponse.getDeleted()), Long.valueOf(aDTask.getExecutionStartTime().toEpochMilli()), detectorId);
                        runBatchResultAction(indexResponse, aDTask, actionListener);
                    }, exc -> {
                        this.logger.warn("Failed to clean AD tasks for detector " + detectorId, exc);
                        actionListener.onFailure(exc);
                    }));
                    if (createXContentParserFromRegistry != null) {
                        createXContentParserFromRegistry.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                this.logger.warn("Failed to parse AD tasks for detector " + detectorId, e);
                actionListener.onFailure(e);
            }
        }, exc -> {
            this.logger.warn("Failed to search AD tasks for detector " + detectorId, exc);
            actionListener.onFailure(exc);
        }));
    }

    private void runBatchResultAction(IndexResponse indexResponse, ADTask aDTask, ActionListener<AnomalyDetectorJobResponse> actionListener) {
        this.client.execute(ADBatchAnomalyResultAction.INSTANCE, new ADBatchAnomalyResultRequest(aDTask), ActionListener.wrap(aDBatchAnomalyResultResponse -> {
            this.logger.info("AD task {} of detector {} dispatched to {} node {}", aDTask.getTaskId(), aDTask.getDetectorId(), aDBatchAnomalyResultResponse.isRunTaskRemotely() ? "remote" : "local", aDBatchAnomalyResultResponse.getNodeId());
            actionListener.onResponse(new AnomalyDetectorJobResponse(indexResponse.getId(), indexResponse.getVersion(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), RestStatus.OK));
        }, exc -> {
            actionListener.onFailure(exc);
        }));
    }

    public void handleADTaskException(ADTask aDTask, Exception exc) {
        String name = ADTaskState.FAILED.name();
        HashMap hashMap = new HashMap();
        if (exc instanceof DuplicateTaskException) {
            this.logger.warn("There is already one running task for detector, detectorId:" + aDTask.getDetectorId() + ". Will delete task " + aDTask.getTaskId());
            deleteADTask(aDTask.getTaskId());
            return;
        }
        if (exc instanceof ADTaskCancelledException) {
            this.logger.info("AD task cancelled, taskId: {}, detectorId: {}", aDTask.getTaskId(), aDTask.getDetectorId());
            name = ADTaskState.STOPPED.name();
            String cancelledBy = ((ADTaskCancelledException) exc).getCancelledBy();
            if (cancelledBy != null) {
                hashMap.put(ADTask.STOPPED_BY_FIELD, cancelledBy);
            }
        } else {
            this.logger.error("Failed to execute AD batch task, task id: " + aDTask.getTaskId() + ", detector id: " + aDTask.getDetectorId(), exc);
        }
        hashMap.put("error", ExceptionUtil.getErrorMessage(exc));
        hashMap.put("state", name);
        hashMap.put("execution_end_time", Long.valueOf(Instant.now().toEpochMilli()));
        updateADTask(aDTask.getTaskId(), hashMap);
    }

    public void updateADTask(String str, Map<String, Object> map) {
        updateADTask(str, map, ActionListener.wrap(updateResponse -> {
            if (updateResponse.status() == RestStatus.OK) {
                this.logger.debug("Updated AD task successfully: {}", updateResponse.status());
            } else {
                this.logger.error("Failed to update AD task {}, status: {}", str, updateResponse.status());
            }
        }, exc -> {
            this.logger.error("Failed to update task: " + str, exc);
        }));
    }

    public void updateADTask(String str, Map<String, Object> map, ActionListener<UpdateResponse> actionListener) {
        UpdateRequest updateRequest = new UpdateRequest(CommonName.DETECTION_STATE_INDEX, str);
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        hashMap.put("last_update_time", Long.valueOf(Instant.now().toEpochMilli()));
        updateRequest.doc(hashMap);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.update(updateRequest, actionListener);
    }

    public void deleteADTask(String str) {
        deleteADTask(str, ActionListener.wrap(deleteResponse -> {
            this.logger.info("Deleted AD task {} with status: {}", str, deleteResponse.status());
        }, exc -> {
            this.logger.error("Failed to delete AD task " + str, exc);
        }));
    }

    public void deleteADTask(String str, ActionListener<DeleteResponse> actionListener) {
        this.client.delete(new DeleteRequest(CommonName.DETECTION_STATE_INDEX, str), actionListener);
    }

    public ADTaskCancellationState cancelLocalTaskByDetectorId(String str, String str2, String str3) {
        ADTaskCancellationState cancelByDetectorId = this.adTaskCacheManager.cancelByDetectorId(str, str2, str3);
        this.logger.debug("Cancelled AD task for detector: {}, state: {}, cancelled by: {}, reason: {}", str, cancelByDetectorId, str3, str2);
        return cancelByDetectorId;
    }

    public void deleteADTasks(String str, AnomalyDetectorFunction anomalyDetectorFunction, ActionListener<DeleteResponse> actionListener) {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(new String[]{CommonName.DETECTION_STATE_INDEX});
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.filter(new TermQueryBuilder("detector_id", str));
        deleteByQueryRequest.setQuery(boolQueryBuilder);
        this.client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(bulkByScrollResponse -> {
            this.logger.info("AD tasks deleted for detector {}", str);
            anomalyDetectorFunction.execute();
        }, exc -> {
            if (exc instanceof IndexNotFoundException) {
                anomalyDetectorFunction.execute();
            } else {
                actionListener.onFailure(exc);
            }
        }));
    }

    public void removeDetectorFromCache(String str) {
        this.adTaskCacheManager.removeDetector(str);
    }
}
