/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.cluster;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.DiskUsage;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;

public class InternalClusterInfoService
implements ClusterInfoService,
ClusterStateListener {
    private static final Logger logger = LogManager.getLogger(InternalClusterInfoService.class);

    /** Name of the thread pool executor used both for immediate and for scheduled refreshes. */
    private static final String REFRESH_EXECUTOR = "management";

    /**
     * Dynamic, node-scoped setting {@code cluster.info.update.interval}: the delay between scheduled refreshes.
     * Declared with a default of 30 seconds and a 10-second lower bound.
     */
    public static final Setting<TimeValue> INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING = Setting.timeSetting(
        "cluster.info.update.interval",
        TimeValue.timeValueSeconds(30L),
        TimeValue.timeValueSeconds(10L),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /**
     * Dynamic, node-scoped setting {@code cluster.info.update.timeout}: how long a refresh waits for each stats
     * request. Declared with a default of 15 seconds.
     */
    public static final Setting<TimeValue> INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING = Setting.positiveTimeSetting(
        "cluster.info.update.timeout",
        TimeValue.timeValueSeconds(15L),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Delay used when (re)scheduling the periodic refresh; updated through the settings consumer. */
    private volatile TimeValue updateFrequency;

    /** Per-node-id disk usage of the path with the least available space, replaced wholesale on each refresh. */
    private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;

    /** Per-node-id disk usage of the path with the most available space, replaced wholesale on each refresh. */
    private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;

    /** Latest shard-level statistics (sizes, data paths, reserved space), published as one immutable object. */
    private volatile IndicesStatsSummary indicesStatsSummary;

    /**
     * The currently active periodic refresh task, or {@code null} when none is active on this node.
     * A scheduled task compares itself against this reference to detect that it has become stale.
     */
    private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();

    /** Mirrors the disk threshold "enabled" setting; when {@code false}, refreshes are skipped. */
    private volatile boolean enabled;

    /** Timeout applied to the node stats request and to each wait for a stats response. */
    private volatile TimeValue fetchTimeout;

    private final ThreadPool threadPool;
    private final Client client;

    /** Consumers notified with the new {@link ClusterInfo} at the end of every refresh. */
    private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();

    /**
     * Creates the service with empty disk usage and shard statistics, reads the initial values of the update
     * interval, fetch timeout and disk threshold "enabled" settings, and registers consumers so that later
     * changes to those three settings are applied to this instance.
     *
     * @param settings       settings from which the initial values are read
     * @param clusterService provides the {@link ClusterSettings} on which the update consumers are registered
     * @param threadPool     thread pool used to run and schedule refreshes
     * @param client         client used to issue the node stats and indices stats requests
     */
    public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
        // Nothing has been fetched yet, so every snapshot starts out empty.
        leastAvailableSpaceUsages = ImmutableOpenMap.of();
        mostAvailableSpaceUsages = ImmutableOpenMap.of();
        indicesStatsSummary = IndicesStatsSummary.EMPTY;

        this.threadPool = threadPool;
        this.client = client;

        // Initial values of the dynamic settings.
        updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
        fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
        enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);

        // Keep the fields above in sync with subsequent settings updates.
        final ClusterSettings dynamicSettings = clusterService.getClusterSettings();
        dynamicSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
        dynamicSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
        dynamicSettings.addSettingsUpdateConsumer(
            DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
            this::setEnabled
        );
    }

    /**
     * Settings-update consumer for the disk threshold "enabled" setting.
     *
     * @param enabled new value; when {@code false}, refresh tasks skip their work and cluster state changes
     *                no longer trigger refreshes or usage clean-up
     */
    private void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    /**
     * Settings-update consumer for {@code cluster.info.update.timeout}.
     *
     * @param fetchTimeout new timeout used for the node stats request and for waiting on stats responses
     */
    private void setFetchTimeout(TimeValue fetchTimeout) {
        this.fetchTimeout = fetchTimeout;
    }

    /**
     * Settings-update consumer for {@code cluster.info.update.interval}. The new value is picked up the next
     * time the periodic refresh is (re)scheduled.
     *
     * @param updateFrequency new delay between scheduled refreshes
     */
    void setUpdateFrequency(TimeValue updateFrequency) {
        this.updateFrequency = updateFrequency;
    }

    /**
     * Reacts to a cluster state change.
     * <ul>
     *   <li>If the local node is not the elected master, the periodic refresh task reference is cleared and
     *       nothing else happens.</li>
     *   <li>If the local node is master and no periodic task is registered yet, an immediate refresh is
     *       requested and a new periodic task is scheduled.</li>
     *   <li>While enabled, a refresh is requested when a data node joined, and cached disk usages of removed
     *       data nodes are dropped.</li>
     * </ul>
     *
     * @param event the cluster change being applied
     */
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        if (!event.localNodeMaster()) {
            // Clearing the reference makes any already-scheduled periodic task see itself as stale.
            refreshAndRescheduleRunnable.set(null);
            return;
        }

        if (refreshAndRescheduleRunnable.get() == null) {
            logger.trace("elected as master, scheduling cluster info update tasks");
            executeRefresh(event.state(), "became master");
            final RefreshAndRescheduleRunnable periodicTask = new RefreshAndRescheduleRunnable();
            refreshAndRescheduleRunnable.set(periodicTask);
            threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, periodicTask);
        }

        if (!enabled) {
            return;
        }

        // A single refresh is enough no matter how many data nodes were added.
        boolean dataNodeAdded = false;
        for (DiscoveryNode joined : event.nodesDelta().addedNodes()) {
            if (joined.isDataNode()) {
                dataNodeAdded = true;
                break;
            }
        }
        if (dataNodeAdded) {
            executeRefresh(event.state(), "data node added");
        }

        // Forget the cached usages of data nodes that left the cluster.
        for (DiscoveryNode departed : event.nodesDelta().removedNodes()) {
            if (departed.isDataNode()) {
                final String nodeId = departed.getId();
                logger.trace("Removing node from cluster info: {}", nodeId);
                if (leastAvailableSpaceUsages.containsKey(nodeId)) {
                    final ImmutableOpenMap.Builder<String, DiskUsage> leastCopy = ImmutableOpenMap.builder(leastAvailableSpaceUsages);
                    leastCopy.remove(nodeId);
                    leastAvailableSpaceUsages = leastCopy.build();
                }
                if (mostAvailableSpaceUsages.containsKey(nodeId)) {
                    final ImmutableOpenMap.Builder<String, DiskUsage> mostCopy = ImmutableOpenMap.builder(mostAvailableSpaceUsages);
                    mostCopy.remove(nodeId);
                    mostAvailableSpaceUsages = mostCopy.build();
                }
            }
        }
    }

    /**
     * Submits a one-off refresh to the refresh executor, but only when the given cluster state contains more
     * than one data node.
     *
     * @param clusterState state whose data node count decides whether a refresh is submitted
     * @param reason       short description of the trigger, used in log messages
     */
    private void executeRefresh(ClusterState clusterState, String reason) {
        final int dataNodeCount = clusterState.nodes().getDataNodes().size();
        if (dataNodeCount <= 1) {
            return;
        }
        logger.trace("refreshing cluster info in background [{}]", reason);
        final RefreshRunnable task = new RefreshRunnable(reason);
        threadPool.executor(REFRESH_EXECUTOR).execute(task);
    }

    /**
     * Builds a {@link ClusterInfo} from the most recently stored disk usages and shard statistics.
     * No request is issued; only the cached values are read.
     *
     * @return a new {@link ClusterInfo} reflecting the currently cached data
     */
    @Override
    public ClusterInfo getClusterInfo() {
        // Read the summary field once so all three shard-level maps come from the same summary object.
        final IndicesStatsSummary summary = this.indicesStatsSummary;
        return new ClusterInfo(
            this.leastAvailableSpaceUsages,
            this.mostAvailableSpaceUsages,
            summary.shardSizes,
            summary.shardRoutingToDataPath,
            summary.reservedSpace
        );
    }

    /**
     * Sends an asynchronous node stats request limited to the FS metric, using the current fetch timeout.
     *
     * @param listener receives the response or failure
     * @return a latch with a count of one that is handed, together with {@code listener}, to a
     *         {@link LatchedActionListener}; callers wait on it for the request to finish
     */
    protected CountDownLatch updateNodeStats(ActionListener<NodesStatsResponse> listener) {
        // "data:true" is the node selector passed to the request (presumably restricting it to data nodes).
        final NodesStatsRequest request = new NodesStatsRequest("data:true");
        // Reset the request first, then ask for the FS metric only.
        request.clear();
        request.addMetric(NodesStatsRequest.Metric.FS.metricName());
        request.timeout(this.fetchTimeout);

        final CountDownLatch responseLatch = new CountDownLatch(1);
        final LatchedActionListener<NodesStatsResponse> latched = new LatchedActionListener<>(listener, responseLatch);
        this.client.admin().cluster().nodesStats(request, latched);
        return responseLatch;
    }

    /**
     * Sends an asynchronous indices stats request with only the store statistics switched on, using
     * {@link IndicesOptions#STRICT_EXPAND_OPEN_CLOSED_HIDDEN}.
     *
     * @param listener receives the response or failure
     * @return a latch with a count of one that is handed, together with {@code listener}, to a
     *         {@link LatchedActionListener}; callers wait on it for the request to finish
     */
    protected CountDownLatch updateIndicesStats(ActionListener<IndicesStatsResponse> listener) {
        final IndicesStatsRequest request = new IndicesStatsRequest();
        // Reset the request first, then enable the store statistics only.
        request.clear();
        request.store(true);
        request.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN);

        final CountDownLatch responseLatch = new CountDownLatch(1);
        final LatchedActionListener<IndicesStatsResponse> latched = new LatchedActionListener<>(listener, responseLatch);
        this.client.admin().indices().stats(request, latched);
        return responseLatch;
    }

    /**
     * Hook applied to the node stats of a response before disk usages are derived from them.
     * This implementation returns the list unchanged.
     * NOTE(review): package-private and overridable, presumably so tests can substitute stats; confirm.
     *
     * @param nodeStats node stats as returned by the node stats request
     * @return the node stats to derive disk usages from
     */
    List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
        return nodeStats;
    }

    /**
     * Refreshes the cached cluster info and notifies all registered listeners.
     * <p>
     * A node stats request and an indices stats request are issued, and the calling thread then waits for each
     * of them in turn, for at most the fetch timeout per request. The cached values are updated by the response
     * listeners:
     * <ul>
     *   <li>on success the corresponding cached data is replaced;</li>
     *   <li>on a {@link ReceiveTimeoutTransportException} the previously cached data is kept;</li>
     *   <li>on any other failure the corresponding cached data is reset to empty.</li>
     * </ul>
     * Whatever is cached once the waits are over is wrapped in a {@link ClusterInfo} and passed to every
     * listener; an exception thrown by one listener is logged and does not stop the remaining ones.
     *
     * @return the {@link ClusterInfo} that was handed to the listeners
     */
    public final ClusterInfo refresh() {
        logger.trace("refreshing cluster info");
        final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {

            @Override
            public void onResponse(NodesStatsResponse nodesStatsResponse) {
                final ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
                final ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
                fillDiskUsagePerNode(
                    logger,
                    InternalClusterInfoService.this.adjustNodesStats(nodesStatsResponse.getNodes()),
                    leastAvailableUsagesBuilder,
                    mostAvailableUsagesBuilder
                );
                InternalClusterInfoService.this.leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
                InternalClusterInfoService.this.mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof ReceiveTimeoutTransportException) {
                    // A timeout leaves the previously cached usages in place.
                    logger.error("NodeStatsAction timed out for ClusterInfoUpdateJob", e);
                } else {
                    if (e instanceof ClusterBlockException) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
                        }
                    } else {
                        logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
                    }
                    // Any other failure: drop the cached usages rather than keep reporting them.
                    InternalClusterInfoService.this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
                    InternalClusterInfoService.this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
                }
            }
        });
        final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<IndicesStatsResponse>() {

            @Override
            public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                final ShardStats[] stats = indicesStatsResponse.getShards();
                final ImmutableOpenMap.Builder<String, Long> shardSizeByIdentifierBuilder = ImmutableOpenMap.builder();
                final ImmutableOpenMap.Builder<ShardRouting, String> dataPathByShardRoutingBuilder = ImmutableOpenMap.builder();
                final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = new HashMap<>();
                buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders);

                // Fully typed builder (was a raw type) so put/build are checked by the compiler.
                final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace = ImmutableOpenMap
                    .builder();
                reservedSpaceBuilders.forEach((nodeAndPath, builder) -> reservedSpace.put(nodeAndPath, builder.build()));

                // Publish all three maps at once through a single field write.
                InternalClusterInfoService.this.indicesStatsSummary = new IndicesStatsSummary(
                    shardSizeByIdentifierBuilder.build(),
                    dataPathByShardRoutingBuilder.build(),
                    reservedSpace.build()
                );
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof ReceiveTimeoutTransportException) {
                    // A timeout leaves the previously cached summary in place.
                    logger.error("IndicesStatsAction timed out for ClusterInfoUpdateJob", e);
                } else {
                    if (e instanceof ClusterBlockException) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
                        }
                    } else {
                        logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
                    }
                    // Any other failure: drop the cached shard statistics.
                    InternalClusterInfoService.this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
                }
            }
        });

        // Read the volatile timeout once so both waits, and the values they log, use the same setting value
        // even if the setting is updated while this refresh is in flight.
        final TimeValue timeout = this.fetchTimeout;
        awaitStatsResponse(nodeLatch, timeout, "Failed to update node information for ClusterInfoUpdateJob within {} timeout");
        awaitStatsResponse(indicesLatch, timeout, "Failed to update shard information for ClusterInfoUpdateJob within {} timeout");

        final ClusterInfo clusterInfo = getClusterInfo();
        boolean anyListeners = false;
        for (Consumer<ClusterInfo> listener : this.listeners) {
            anyListeners = true;
            try {
                logger.trace("notifying [{}] of new cluster info", listener);
                listener.accept(clusterInfo);
            } catch (Exception e) {
                // One failing listener must not prevent the others from being notified.
                logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
            }
        }
        assert anyListeners : "expected to notify at least one listener";
        return clusterInfo;
    }

    /**
     * Waits for {@code latch} for at most {@code timeout}. If the wait times out, {@code timeoutMessage} is
     * logged at WARN with the timeout as its single parameter. If the thread is interrupted, the interrupt flag
     * is restored and the method returns without logging.
     */
    private static void awaitStatsResponse(CountDownLatch latch, TimeValue timeout, String timeoutMessage) {
        try {
            if (!latch.await(timeout.getMillis(), TimeUnit.MILLISECONDS)) {
                logger.warn(timeoutMessage, timeout);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status for whoever runs this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a consumer that is called with the new {@link ClusterInfo} at the end of every refresh.
     *
     * @param clusterInfoConsumer the consumer to notify
     */
    @Override
    public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
        this.listeners.add(clusterInfoConsumer);
    }

    /**
     * Extracts shard-level information from the given shard stats into the supplied builders.
     * <p>
     * The data path of every shard is always recorded. The shard size is recorded only when store statistics
     * are present, and the reserved size is accumulated per (node id, data path) only when it is not
     * {@code -1}.
     *
     * @param logger                    logger used for trace output
     * @param stats                     shard stats to process
     * @param shardSizes                receives shard identifier to size in bytes
     * @param newShardRoutingToDataPath receives shard routing to data path
     * @param reservedSpaceByShard      receives, per node and data path, a builder accumulating reserved bytes
     */
    static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> shardSizes, ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard) {
        for (int i = 0; i < stats.length; i++) {
            final ShardStats shardStats = stats[i];
            final ShardRouting routing = shardStats.getShardRouting();
            final String dataPath = shardStats.getDataPath();
            newShardRoutingToDataPath.put(routing, dataPath);

            final StoreStats store = shardStats.getStats().getStore();
            if (store != null) {
                final long sizeInBytes = store.sizeInBytes();
                final long reservedBytes = store.getReservedSize().getBytes();
                final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(routing);
                logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, sizeInBytes, reservedBytes);
                shardSizes.put(shardIdentifier, sizeInBytes);

                // -1 is skipped; presumably it marks an unknown reserved size (TODO confirm against StoreStats).
                if (reservedBytes != -1L) {
                    final ClusterInfo.NodeAndPath nodeAndPath = new ClusterInfo.NodeAndPath(routing.currentNodeId(), dataPath);
                    final ClusterInfo.ReservedSpace.Builder accumulator = reservedSpaceByShard.computeIfAbsent(
                        nodeAndPath,
                        ignored -> new ClusterInfo.ReservedSpace.Builder()
                    );
                    accumulator.add(routing.shardId(), reservedBytes);
                }
            }
        }
    }

    /**
     * Computes, for every node in {@code nodeStatsArray}, the filesystem path with the least and the path with
     * the most available bytes, and records a {@link DiskUsage} for each under the node's id.
     * <p>
     * A node is skipped entirely (with a warning) when it reports no FS stats at all, or when its FS stats
     * contain no paths. Previously the latter case dereferenced a {@code null} path and aborted the processing
     * of all remaining nodes. A path whose total size is negative is not recorded.
     *
     * @param logger                  logger used for warnings and trace output
     * @param nodeStatsArray          node stats to process
     * @param newLeastAvailableUsages receives node id to usage of the path with the least available space
     * @param newMostAvailableUsages  receives node id to usage of the path with the most available space
     */
    static void fillDiskUsagePerNode(Logger logger, List<NodeStats> nodeStatsArray, ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvailableUsages, ImmutableOpenMap.Builder<String, DiskUsage> newMostAvailableUsages) {
        for (NodeStats nodeStats : nodeStatsArray) {
            final FsInfo fsInfo = nodeStats.getFs();
            if (fsInfo == null) {
                logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().getName());
                continue;
            }

            FsInfo.Path leastAvailablePath = null;
            FsInfo.Path mostAvailablePath = null;
            for (FsInfo.Path info : fsInfo) {
                if (leastAvailablePath == null) {
                    // First path seen: it is both the least and the most available so far.
                    assert mostAvailablePath == null;
                    mostAvailablePath = leastAvailablePath = info;
                } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
                    leastAvailablePath = info;
                } else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
                    mostAvailablePath = info;
                }
            }

            if (leastAvailablePath == null) {
                // FS stats were returned but list no paths: there is nothing to record for this node, and
                // continuing would dereference null below.
                logger.warn("Node FS stats for {} contain no filesystem paths, skipping", nodeStats.getNode().getName());
                continue;
            }

            final String nodeId = nodeStats.getNode().getId();
            final String nodeName = nodeStats.getNode().getName();
            if (logger.isTraceEnabled()) {
                logger.trace(
                    "node: [{}], most available: total disk: {}, available disk: {} / least available: total disk: {}, available disk: {}",
                    nodeId,
                    mostAvailablePath.getTotal(),
                    mostAvailablePath.getAvailable(),
                    leastAvailablePath.getTotal(),
                    leastAvailablePath.getAvailable()
                );
            }

            if (leastAvailablePath.getTotal().getBytes() < 0L) {
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "node: [{}] least available path has less than 0 total bytes of disk [{}], skipping",
                        nodeId,
                        leastAvailablePath.getTotal().getBytes()
                    );
                }
            } else {
                newLeastAvailableUsages.put(
                    nodeId,
                    new DiskUsage(
                        nodeId,
                        nodeName,
                        leastAvailablePath.getPath(),
                        leastAvailablePath.getTotal().getBytes(),
                        leastAvailablePath.getAvailable().getBytes()
                    )
                );
            }

            if (mostAvailablePath.getTotal().getBytes() < 0L) {
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "node: [{}] most available path has less than 0 total bytes of disk [{}], skipping",
                        nodeId,
                        mostAvailablePath.getTotal().getBytes()
                    );
                }
            } else {
                newMostAvailableUsages.put(
                    nodeId,
                    new DiskUsage(
                        nodeId,
                        nodeName,
                        mostAvailablePath.getPath(),
                        mostAvailablePath.getTotal().getBytes(),
                        mostAvailablePath.getAvailable().getBytes()
                    )
                );
            }
        }
    }

    /**
     * Immutable holder for the three shard-level maps derived from one indices stats response. Bundling them
     * in one object lets the enclosing service replace all three with a single field assignment.
     */
    private static class IndicesStatsSummary {
        /** Summary with three empty maps; used before the first response and after a failed request. */
        static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
        /** Shard identifier to shard size in bytes. */
        final ImmutableOpenMap<String, Long> shardSizes;
        /** Shard routing to the data path the shard is stored on. */
        final ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
        /** Reserved space, keyed by node and data path. */
        final ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace;

        /**
         * @param shardSizes             shard identifier to shard size in bytes
         * @param shardRoutingToDataPath shard routing to data path
         * @param reservedSpace          reserved space per node and data path
         */
        IndicesStatsSummary(ImmutableOpenMap<String, Long> shardSizes, ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath, ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace) {
            this.shardSizes = shardSizes;
            this.shardRoutingToDataPath = shardRoutingToDataPath;
            this.reservedSpace = reservedSpace;
        }
    }

    /**
     * Periodic refresh task. It performs the refresh and then schedules itself again, but only for as long as
     * it is the instance currently registered in the enclosing service; once the reference has been cleared or
     * replaced, the task does nothing and stops rescheduling.
     */
    private class RefreshAndRescheduleRunnable extends RefreshRunnable {
        RefreshAndRescheduleRunnable() {
            super("scheduled");
        }

        /** Returns whether this instance is still the periodic task registered in the enclosing service. */
        private boolean isStillRegistered() {
            return InternalClusterInfoService.this.refreshAndRescheduleRunnable.get() == this;
        }

        @Override
        protected void doRun() {
            if (!isStillRegistered()) {
                logger.trace("master changed, scheduled refresh job is stale");
                return;
            }
            super.doRun();
        }

        @Override
        public void onAfter() {
            if (!isStillRegistered()) {
                // Stale task: let the chain of reschedules end here.
                return;
            }
            logger.trace("scheduling next cluster info refresh in [{}]", InternalClusterInfoService.this.updateFrequency);
            InternalClusterInfoService.this.threadPool.scheduleUnlessShuttingDown(
                InternalClusterInfoService.this.updateFrequency,
                InternalClusterInfoService.REFRESH_EXECUTOR,
                this
            );
        }
    }

    /**
     * One-off refresh task: calls {@link InternalClusterInfoService#refresh()} when the service is enabled and
     * logs failures and rejections together with the reason the refresh was requested.
     */
    private class RefreshRunnable extends AbstractRunnable {
        /** Short description of what triggered this refresh; only used in log messages. */
        private final String reason;

        RefreshRunnable(String reason) {
            this.reason = reason;
        }

        @Override
        protected void doRun() {
            if (!InternalClusterInfoService.this.enabled) {
                logger.trace("skipping cluster info refresh [{}] since it is disabled", this.reason);
                return;
            }
            logger.trace("refreshing cluster info [{}]", this.reason);
            InternalClusterInfoService.this.refresh();
        }

        @Override
        public void onFailure(Exception e) {
            final ParameterizedMessage failureMessage = new ParameterizedMessage("refreshing cluster info failed [{}]", this.reason);
            logger.warn(failureMessage, e);
        }

        @Override
        public void onRejection(Exception e) {
            // A rejection caused by executor shutdown is logged at DEBUG; any other rejection at WARN.
            final Level level;
            if (e instanceof OpenSearchRejectedExecutionException && ((OpenSearchRejectedExecutionException) e).isExecutorShutdown()) {
                level = Level.DEBUG;
            } else {
                level = Level.WARN;
            }
            logger.log(level, "refreshing cluster info rejected [{}]", this.reason, e);
        }
    }
}

