package com.amazon.opendistroforelasticsearch.ad.task;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.breaker.ADCircuitBreakerService;
import com.amazon.opendistroforelasticsearch.ad.common.exception.ADTaskCancelledException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.feature.FeatureManager;
import com.amazon.opendistroforelasticsearch.ad.feature.SinglePointFeatures;
import com.amazon.opendistroforelasticsearch.ad.indices.ADIndex;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingModel;
import com.amazon.opendistroforelasticsearch.ad.model.ADTask;
import com.amazon.opendistroforelasticsearch.ad.model.ADTaskState;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectionDateRange;
import com.amazon.opendistroforelasticsearch.ad.model.FeatureData;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.InternalStatNames;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodeResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.ExceptionUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
import com.amazon.randomcutforest.RandomCutForest;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.InternalMin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

/* loaded from: input_file:com/amazon/opendistroforelasticsearch/ad/task/ADBatchTaskRunner.class */
public class ADBatchTaskRunner {
    private final Logger logger = LogManager.getLogger(ADBatchTaskRunner.class);
    private final RateLimiter rateLimiter = RateLimiter.create(1.0d);
    private final ThreadPool threadPool;
    private final Client client;
    private final ADStats adStats;
    private final DiscoveryNodeFilterer nodeFilter;
    private final ClusterService clusterService;
    private final FeatureManager featureManager;
    private final ADCircuitBreakerService adCircuitBreakerService;
    private final ADTaskManager adTaskManager;
    private final AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler;
    private final IndexNameExpressionResolver indexNameExpressionResolver;
    private AnomalyDetectionIndices anomalyDetectionIndices;
    private final ADTaskCacheManager adTaskCacheManager;
    private final TransportRequestOptions option;
    private volatile Integer maxAdBatchTaskPerNode;
    private volatile Integer pieceSize;
    private volatile Integer pieceIntervalSeconds;

    public ADBatchTaskRunner(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, DiscoveryNodeFilterer discoveryNodeFilterer, IndexNameExpressionResolver indexNameExpressionResolver, ADCircuitBreakerService aDCircuitBreakerService, FeatureManager featureManager, ADTaskManager aDTaskManager, AnomalyDetectionIndices anomalyDetectionIndices, ADStats aDStats, AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler, ADTaskCacheManager aDTaskCacheManager) {
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.client = client;
        this.anomalyResultBulkIndexHandler = anomalyResultBulkIndexHandler;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.nodeFilter = discoveryNodeFilterer;
        this.adStats = aDStats;
        this.adCircuitBreakerService = aDCircuitBreakerService;
        this.adTaskManager = aDTaskManager;
        this.featureManager = featureManager;
        this.anomalyDetectionIndices = anomalyDetectionIndices;
        this.option = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout((TimeValue) AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings)).build();
        this.adTaskCacheManager = aDTaskCacheManager;
        this.maxAdBatchTaskPerNode = (Integer) AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE, num -> {
            this.maxAdBatchTaskPerNode = num;
        });
        this.pieceSize = (Integer) AnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE, num2 -> {
            this.pieceSize = num2;
        });
        this.pieceIntervalSeconds = (Integer) AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS, num3 -> {
            this.pieceIntervalSeconds = num3;
        });
    }

    public void run(ADTask aDTask, TransportService transportService, ActionListener<ADBatchAnomalyResultResponse> actionListener) {
        HashMap hashMap = new HashMap();
        hashMap.put("state", ADTaskState.INIT.name());
        hashMap.put("init_progress", Float.valueOf(0.0f));
        ActionListener wrap = ActionListener.wrap(aDBatchAnomalyResultResponse -> {
            actionListener.onResponse(aDBatchAnomalyResultResponse);
        }, exc -> {
            actionListener.onFailure(exc);
            handleException(aDTask, exc);
        });
        this.adTaskManager.updateADTask(aDTask.getTaskId(), hashMap, ActionListener.wrap(updateResponse -> {
            dispatchTask(aDTask, ActionListener.wrap(discoveryNode -> {
                if (this.clusterService.localNode().getId().equals(discoveryNode.getId())) {
                    this.logger.info("execute AD task {} locally on node {} for detector {}", aDTask.getTaskId(), discoveryNode.getId(), aDTask.getDetectorId());
                    startADBatchTask(aDTask, false, transportService, wrap);
                } else {
                    this.logger.info("execute AD task {} remotely on node {} for detector {}", aDTask.getTaskId(), discoveryNode.getId(), aDTask.getDetectorId());
                    transportService.sendRequest(discoveryNode, ADBatchTaskRemoteExecutionAction.NAME, new ADBatchAnomalyResultRequest(aDTask), this.option, new ActionListenerResponseHandler(wrap, ADBatchAnomalyResultResponse::new));
                }
            }, exc2 -> {
                wrap.onFailure(exc2);
            }));
        }, exc2 -> {
            wrap.onFailure(exc2);
        }));
    }

    private void dispatchTask(ADTask aDTask, ActionListener<DiscoveryNode> actionListener) {
        ADStatsRequest aDStatsRequest = new ADStatsRequest(this.nodeFilter.getEligibleDataNodes());
        aDStatsRequest.addAll(ImmutableSet.of(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName(), InternalStatNames.JVM_HEAP_USAGE.getName()));
        this.client.execute(ADStatsNodesAction.INSTANCE, aDStatsRequest, ActionListener.wrap(aDStatsNodesResponse -> {
            List list = (List) aDStatsNodesResponse.getNodes().stream().filter(aDStatsNodeResponse -> {
                return ((Long) aDStatsNodeResponse.getStatsMap().get(InternalStatNames.JVM_HEAP_USAGE.getName())).longValue() < 85;
            }).collect(Collectors.toList());
            if (list.size() == 0) {
                String str = "All nodes' memory usage exceeds limitation85. No eligible node to run detector " + aDTask.getDetectorId();
                this.logger.warn(str);
                actionListener.onFailure(new LimitExceededException(aDTask.getDetectorId(), str));
                return;
            }
            List list2 = (List) list.stream().filter(aDStatsNodeResponse2 -> {
                return ((Long) aDStatsNodeResponse2.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())).longValue() < ((long) this.maxAdBatchTaskPerNode.intValue());
            }).collect(Collectors.toList());
            if (list2.size() != 0) {
                actionListener.onResponse(((ADStatsNodeResponse) list2.stream().sorted((aDStatsNodeResponse3, aDStatsNodeResponse4) -> {
                    int compareTo = ((Long) aDStatsNodeResponse3.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())).compareTo((Long) aDStatsNodeResponse4.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName()));
                    return compareTo == 0 ? ((Long) aDStatsNodeResponse3.getStatsMap().get(InternalStatNames.JVM_HEAP_USAGE.getName())).compareTo((Long) aDStatsNodeResponse4.getStatsMap().get(InternalStatNames.JVM_HEAP_USAGE.getName())) : compareTo;
                }).findFirst().get()).getNode());
                return;
            }
            String str2 = "All nodes' executing historical detector count exceeds limitation. No eligible node to run detector " + aDTask.getDetectorId();
            this.logger.warn(str2);
            actionListener.onFailure(new LimitExceededException(aDTask.getDetectorId(), str2));
        }, exc -> {
            this.logger.error("Failed to get node's task stats", exc);
            actionListener.onFailure(exc);
        }));
    }

    public void startADBatchTask(ADTask aDTask, boolean z, TransportService transportService, ActionListener<ADBatchAnomalyResultResponse> actionListener) {
        try {
            checkClusterState(aDTask);
            this.threadPool.executor(AnomalyDetectorPlugin.AD_BATCH_TASK_THREAD_POOL_NAME).execute(() -> {
                ActionListener<String> internalBatchTaskListener = internalBatchTaskListener(aDTask, transportService);
                try {
                    executeADBatchTask(aDTask, internalBatchTaskListener);
                } catch (Exception e) {
                    internalBatchTaskListener.onFailure(e);
                }
            });
            actionListener.onResponse(new ADBatchAnomalyResultResponse(this.clusterService.localNode().getId(), z));
        } catch (Exception e) {
            this.logger.error("Fail to start AD batch task " + aDTask.getTaskId(), e);
            actionListener.onFailure(e);
        }
    }

    private ActionListener<String> internalBatchTaskListener(ADTask aDTask, TransportService transportService) {
        String taskId = aDTask.getTaskId();
        return ActionListener.wrap(str -> {
            this.adTaskCacheManager.remove(taskId);
            this.adStats.getStat(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName()).decrement();
            this.adTaskManager.cleanDetectorCache(aDTask, transportService, () -> {
                this.adTaskManager.updateADTask(taskId, ImmutableMap.of("state", ADTaskState.FINISHED.name()));
            });
        }, exc -> {
            this.adTaskCacheManager.remove(taskId);
            this.adStats.getStat(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName()).decrement();
            this.adTaskManager.cleanDetectorCache(aDTask, transportService, () -> {
                handleException(aDTask, exc);
            });
        });
    }

    private void handleException(ADTask aDTask, Exception exc) {
        if (exc instanceof ADTaskCancelledException) {
            this.adStats.getStat(StatNames.AD_CANCELED_BATCH_TASK_COUNT.getName()).increment();
        } else if (ExceptionUtil.countInStats(exc)) {
            this.adStats.getStat(StatNames.AD_BATCH_TASK_FAILURE_COUNT.getName()).increment();
        }
        this.adTaskManager.handleADTaskException(aDTask, exc);
    }

    private void executeADBatchTask(ADTask aDTask, ActionListener<String> actionListener) {
        this.adStats.getStat(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName()).increment();
        this.adStats.getStat(StatNames.AD_TOTAL_BATCH_TASK_EXECUTION_COUNT.getName()).increment();
        this.adTaskCacheManager.add(aDTask);
        runFirstPiece(aDTask, Instant.now(), actionListener);
    }

    private void checkClusterState(ADTask aDTask) {
        checkADPluginEnabled(aDTask.getDetectorId());
        checkCircuitBreaker(aDTask);
    }

    private void checkADPluginEnabled(String str) {
        if (!EnabledSetting.isADPluginEnabled()) {
            throw new EndRunException(str, CommonErrorMessages.DISABLED_ERR_MSG, true).countedInStats(false);
        }
    }

    private void checkCircuitBreaker(ADTask aDTask) {
        String taskId = aDTask.getTaskId();
        if (this.adCircuitBreakerService.isOpen().booleanValue()) {
            this.logger.error("AD task: {}, {}", taskId, "Circuit breaker is open");
            throw new LimitExceededException(aDTask.getDetectorId(), "Circuit breaker is open", true);
        }
    }

    private void runFirstPiece(ADTask aDTask, Instant instant, ActionListener<String> actionListener) {
        try {
            ADTaskManager aDTaskManager = this.adTaskManager;
            String taskId = aDTask.getTaskId();
            ImmutableMap of = ImmutableMap.of("state", ADTaskState.INIT.name(), ADTask.CURRENT_PIECE_FIELD, Long.valueOf(aDTask.getDetector().getDetectionDateRange().getStartTime().toEpochMilli()), ADTask.TASK_PROGRESS_FIELD, Float.valueOf(0.0f), "init_progress", Float.valueOf(0.0f), ADTask.WORKER_NODE_FIELD, this.clusterService.localNode().getId());
            CheckedConsumer checkedConsumer = updateResponse -> {
                try {
                    checkIfADTaskCancelled(aDTask.getTaskId());
                    getDateRangeOfSourceData(aDTask, (l, l2) -> {
                        long millis = ((IntervalTimeConfiguration) aDTask.getDetector().getDetectionInterval()).toDuration().toMillis();
                        DetectionDateRange detectionDateRange = aDTask.getDetector().getDetectionDateRange();
                        long epochMilli = detectionDateRange.getStartTime().toEpochMilli();
                        long epochMilli2 = detectionDateRange.getEndTime().toEpochMilli();
                        if (l.longValue() >= epochMilli2 || l2.longValue() <= epochMilli) {
                            actionListener.onFailure(new ResourceNotFoundException(aDTask.getDetectorId(), "There is no data in the detection date range"));
                            return;
                        }
                        if (l.longValue() > epochMilli) {
                            epochMilli = l.longValue();
                        }
                        if (l2.longValue() < epochMilli2) {
                            epochMilli2 = l2.longValue();
                        }
                        long j = epochMilli - (epochMilli % millis);
                        long j2 = epochMilli2 - (epochMilli2 % millis);
                        if (j2 - j < AnomalyDetectorSettings.THRESHOLD_MODEL_TRAINING_SIZE * millis) {
                            actionListener.onFailure(new AnomalyDetectionException("There is no enough data to train model").countedInStats(false));
                            return;
                        }
                        long min = Math.min(j + (this.pieceSize.intValue() * millis), j2);
                        this.logger.debug("start first piece from {} to {}, interval {}, dataStartTime {}, dataEndTime {}, detectorId {}, taskId {}", Long.valueOf(j), Long.valueOf(min), Long.valueOf(millis), Long.valueOf(j), Long.valueOf(j2), aDTask.getDetectorId(), aDTask.getTaskId());
                        getFeatureData(aDTask, j, min, j, j2, millis, instant, actionListener);
                    }, actionListener);
                } catch (Exception e) {
                    actionListener.onFailure(e);
                }
            };
            Objects.requireNonNull(actionListener);
            aDTaskManager.updateADTask(taskId, of, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
        } catch (Exception e) {
            actionListener.onFailure(e);
        }
    }

    private void getDateRangeOfSourceData(ADTask aDTask, BiConsumer<Long, Long> biConsumer, ActionListener actionListener) {
        this.client.search(new SearchRequest().indices((String[]) aDTask.getDetector().getIndices().toArray(new String[0])).source(new SearchSourceBuilder().aggregation(AggregationBuilders.min(CommonName.AGG_NAME_MIN_TIME).field(aDTask.getDetector().getTimeField())).aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(aDTask.getDetector().getTimeField())).size(0)), ActionListener.wrap(searchResponse -> {
            InternalMin internalMin = searchResponse.getAggregations().get(CommonName.AGG_NAME_MIN_TIME);
            InternalMax internalMax = searchResponse.getAggregations().get(CommonName.AGG_NAME_MAX_TIME);
            double value = internalMin.getValue();
            double value2 = internalMax.getValue();
            if (value == Double.POSITIVE_INFINITY) {
                actionListener.onFailure(new ResourceNotFoundException(aDTask.getDetectorId(), "There is no data in the time field"));
            } else {
                biConsumer.accept(Long.valueOf((long) value), Long.valueOf((long) value2));
            }
        }, exc -> {
            actionListener.onFailure(exc);
        }));
    }

    private void getFeatureData(ADTask aDTask, long j, long j2, long j3, long j4, long j5, Instant instant, ActionListener<String> actionListener) {
        this.featureManager.getFeatureDataPointsByBatch(aDTask.getDetector(), j, j2, new ThreadedActionListener<>(this.logger, this.threadPool, AnomalyDetectorPlugin.AD_BATCH_TASK_THREAD_POOL_NAME, ActionListener.wrap(map -> {
            try {
                if (map.size() == 0) {
                    this.logger.debug("No data in current piece with end time: " + j2);
                    runNextPiece(aDTask, j2, j3, j4, j5, actionListener);
                } else {
                    detectAnomaly(aDTask, map, j, j2, j3, j4, j5, instant, actionListener);
                }
            } catch (Exception e) {
                actionListener.onFailure(e);
            }
        }, exc -> {
            this.logger.debug("Fail to get feature data by batch for this piece with end time: " + j2);
            actionListener.onFailure(exc);
        }), false));
    }

    private void detectAnomaly(ADTask aDTask, Map<Long, Optional<double[]>> map, long j, long j2, long j3, long j4, long j5, Instant instant, ActionListener<String> actionListener) {
        String taskId = aDTask.getTaskId();
        RandomCutForest rcfModel = this.adTaskCacheManager.getRcfModel(taskId);
        ThresholdingModel thresholdModel = this.adTaskCacheManager.getThresholdModel(taskId);
        Deque<Map.Entry<Long, Optional<double[]>>> shingle = this.adTaskCacheManager.getShingle(taskId);
        ArrayList arrayList = new ArrayList();
        long j6 = j;
        for (int i = 0; i < this.pieceSize.intValue() && j6 < j4; i++) {
            Optional<double[]> empty = map.containsKey(Long.valueOf(j6)) ? map.get(Long.valueOf(j6)) : Optional.empty();
            j6 += j5;
            SinglePointFeatures shingledFeatureForHistoricalDetector = this.featureManager.getShingledFeatureForHistoricalDetector(aDTask.getDetector(), shingle, empty, j6);
            List<FeatureData> featureData = shingledFeatureForHistoricalDetector.getUnprocessedFeatures().isPresent() ? ParseUtils.getFeatureData(shingledFeatureForHistoricalDetector.getUnprocessedFeatures().get(), aDTask.getDetector()) : null;
            if (shingledFeatureForHistoricalDetector.getProcessedFeatures().isPresent()) {
                double[] dArr = shingledFeatureForHistoricalDetector.getProcessedFeatures().get();
                double anomalyScore = rcfModel.getAnomalyScore(dArr);
                rcfModel.update(dArr);
                double d = 0.0d;
                double d2 = 0.0d;
                if (this.adTaskCacheManager.isThresholdModelTrained(taskId)) {
                    d = thresholdModel.grade(anomalyScore);
                    d2 = thresholdModel.confidence();
                    if (anomalyScore > 0.0d) {
                        thresholdModel.update(anomalyScore);
                    }
                } else if (this.adTaskCacheManager.getThresholdModelTrainingDataSize(taskId) >= AnomalyDetectorSettings.THRESHOLD_MODEL_TRAINING_SIZE) {
                    this.logger.debug("training threshold model");
                    thresholdModel.train(this.adTaskCacheManager.getThresholdModelTrainingData(taskId));
                    this.adTaskCacheManager.setThresholdModelTrained(taskId, true);
                } else if (anomalyScore > 0.0d) {
                    this.adTaskCacheManager.addThresholdModelTrainingData(taskId, anomalyScore);
                }
                arrayList.add(new AnomalyResult(aDTask.getDetectorId(), taskId, Double.valueOf(anomalyScore), Double.valueOf(d), Double.valueOf(d2), featureData, Instant.ofEpochMilli(j6 - j5), Instant.ofEpochMilli(j6), instant, Instant.now(), null, null, aDTask.getDetector().getUser(), Integer.valueOf(this.anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT))));
            } else {
                arrayList.add(new AnomalyResult(aDTask.getDetectorId(), taskId, Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), featureData, Instant.ofEpochMilli(j6 - j5), Instant.ofEpochMilli(j6), instant, Instant.now(), shingledFeatureForHistoricalDetector.getUnprocessedFeatures().isPresent() ? "No full shingle in current detection window" : "No data in current detection window", null, aDTask.getDetector().getUser(), Integer.valueOf(this.anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT))));
            }
        }
        this.anomalyResultBulkIndexHandler.bulkIndexAnomalyResult(arrayList, new ThreadedActionListener(this.logger, this.threadPool, AnomalyDetectorPlugin.AD_BATCH_TASK_THREAD_POOL_NAME, ActionListener.wrap(bulkResponse -> {
            try {
                runNextPiece(aDTask, j2, j3, j4, j5, actionListener);
            } catch (Exception e) {
                actionListener.onFailure(e);
            }
        }, exc -> {
            this.logger.error("Fail to bulk index anomaly result", exc);
            actionListener.onFailure(exc);
        }), false));
    }

    private void runNextPiece(ADTask aDTask, long j, long j2, long j3, long j4, ActionListener<String> actionListener) {
        String taskId = aDTask.getTaskId();
        float calculateInitProgress = calculateInitProgress(taskId);
        String name = calculateInitProgress >= 1.0f ? ADTaskState.RUNNING.name() : ADTaskState.INIT.name();
        if (j >= j3) {
            this.logger.info("AD task finished for detector {}, task id: {}", aDTask.getDetectorId(), taskId);
            this.adTaskCacheManager.remove(taskId);
            this.adTaskManager.updateADTask(taskId, ImmutableMap.of(ADTask.CURRENT_PIECE_FIELD, Long.valueOf(j3), ADTask.TASK_PROGRESS_FIELD, Float.valueOf(1.0f), "execution_end_time", Long.valueOf(Instant.now().toEpochMilli()), "init_progress", Float.valueOf(calculateInitProgress)), ActionListener.wrap(updateResponse -> {
                actionListener.onResponse("task execution done");
            }, exc -> {
                actionListener.onFailure(exc);
            }));
            return;
        }
        checkClusterState(aDTask);
        long intValue = j + (this.pieceSize.intValue() * j4);
        long j5 = intValue > j3 ? j3 : intValue;
        for (int i = 0; i < this.pieceIntervalSeconds.intValue(); i++) {
            checkIfADTaskCancelled(taskId);
            this.rateLimiter.acquire(1);
        }
        this.logger.debug("start next piece start from {} to {}, interval {}", Long.valueOf(j), Long.valueOf(j5), Long.valueOf(j4));
        this.adTaskManager.updateADTask(taskId, ImmutableMap.of("state", name, ADTask.CURRENT_PIECE_FIELD, Long.valueOf(j), ADTask.TASK_PROGRESS_FIELD, Float.valueOf(((float) (j - j2)) / ((float) (j3 - j2))), "init_progress", Float.valueOf(calculateInitProgress)), ActionListener.wrap(updateResponse2 -> {
            getFeatureData(aDTask, j, j5, j2, j3, j4, Instant.now(), actionListener);
        }, exc2 -> {
            actionListener.onFailure(exc2);
        }));
    }

    private float calculateInitProgress(String str) {
        RandomCutForest rcfModel = this.adTaskCacheManager.getRcfModel(str);
        if (rcfModel == null) {
            return 0.0f;
        }
        float totalUpdates = ((float) rcfModel.getTotalUpdates()) / 128.0f;
        if (totalUpdates > 1.0f) {
            return 1.0f;
        }
        return totalUpdates;
    }

    private void checkIfADTaskCancelled(String str) {
        if (this.adTaskCacheManager.contains(str) && this.adTaskCacheManager.isCancelled(str)) {
            throw new ADTaskCancelledException(this.adTaskCacheManager.getCancelReason(str), this.adTaskCacheManager.getCancelledBy(str));
        }
    }
}
