/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointPlan;
import org.apache.flink.runtime.checkpoint.FinishedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider;
import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.Preconditions;

public class DefaultCheckpointPlan
implements CheckpointPlan {
    private final List<Execution> tasksToTrigger;
    private final List<Execution> tasksToWaitFor;
    private final List<ExecutionVertex> tasksToCommitTo;
    private final List<Execution> finishedTasks;
    private final boolean mayHaveFinishedTasks;
    private final Map<JobVertexID, ExecutionJobVertex> fullyFinishedOrFinishedOnRestoreVertices;
    private final IdentityHashMap<ExecutionJobVertex, Integer> vertexOperatorsFinishedTasksCount;

    DefaultCheckpointPlan(List<Execution> tasksToTrigger, List<Execution> tasksToWaitFor, List<ExecutionVertex> tasksToCommitTo, List<Execution> finishedTasks, List<ExecutionJobVertex> fullyFinishedJobVertex, boolean mayHaveFinishedTasks) {
        this.tasksToTrigger = Preconditions.checkNotNull(tasksToTrigger);
        this.tasksToWaitFor = Preconditions.checkNotNull(tasksToWaitFor);
        this.tasksToCommitTo = Preconditions.checkNotNull(tasksToCommitTo);
        this.finishedTasks = Preconditions.checkNotNull(finishedTasks);
        this.mayHaveFinishedTasks = mayHaveFinishedTasks;
        this.fullyFinishedOrFinishedOnRestoreVertices = new HashMap<JobVertexID, ExecutionJobVertex>();
        fullyFinishedJobVertex.forEach(jobVertex -> this.fullyFinishedOrFinishedOnRestoreVertices.put(jobVertex.getJobVertexId(), (ExecutionJobVertex)jobVertex));
        this.vertexOperatorsFinishedTasksCount = new IdentityHashMap();
    }

    @Override
    public List<Execution> getTasksToTrigger() {
        return this.tasksToTrigger;
    }

    @Override
    public List<Execution> getTasksToWaitFor() {
        return this.tasksToWaitFor;
    }

    @Override
    public List<ExecutionVertex> getTasksToCommitTo() {
        return this.tasksToCommitTo;
    }

    @Override
    public List<Execution> getFinishedTasks() {
        return this.finishedTasks;
    }

    @Override
    public Collection<ExecutionJobVertex> getFullyFinishedJobVertex() {
        return this.fullyFinishedOrFinishedOnRestoreVertices.values();
    }

    @Override
    public boolean mayHaveFinishedTasks() {
        return this.mayHaveFinishedTasks;
    }

    @Override
    public void reportTaskFinishedOnRestore(ExecutionVertex task) {
        this.fullyFinishedOrFinishedOnRestoreVertices.putIfAbsent(task.getJobvertexId(), task.getJobVertex());
    }

    @Override
    public void reportTaskHasFinishedOperators(ExecutionVertex task) {
        this.vertexOperatorsFinishedTasksCount.compute(task.getJobVertex(), (k, v) -> v == null ? 1 : v + 1);
    }

    @Override
    public void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState> operatorStates) {
        if (!this.mayHaveFinishedTasks) {
            return;
        }
        HashMap<JobVertexID, ExecutionJobVertex> partlyFinishedVertex = new HashMap<JobVertexID, ExecutionJobVertex>();
        for (Execution task : this.finishedTasks) {
            JobVertexID jobVertexId = task.getVertex().getJobvertexId();
            if (this.fullyFinishedOrFinishedOnRestoreVertices.containsKey(jobVertexId)) continue;
            partlyFinishedVertex.put(jobVertexId, task.getVertex().getJobVertex());
        }
        this.checkNoPartlyFinishedVertexUsedUnionListState(partlyFinishedVertex, operatorStates);
        this.checkNoPartlyOperatorsFinishedVertexUsedUnionListState(partlyFinishedVertex, operatorStates);
        this.fulfillFullyFinishedOrFinishedOnRestoreOperatorStates(operatorStates);
        this.fulfillSubtaskStateForPartiallyFinishedOperators(operatorStates);
    }

    private void checkNoPartlyFinishedVertexUsedUnionListState(Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex, Map<OperatorID, OperatorState> operatorStates) {
        for (ExecutionJobVertex vertex : partlyFinishedVertex.values()) {
            if (!this.hasUsedUnionListState(vertex, operatorStates)) continue;
            throw new FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException(String.format("The vertex %s (id = %s) has used UnionListState, but part of its tasks are FINISHED.", vertex.getName(), vertex.getJobVertexId()));
        }
    }

    private void checkNoPartlyOperatorsFinishedVertexUsedUnionListState(Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex, Map<OperatorID, OperatorState> operatorStates) {
        for (Map.Entry<ExecutionJobVertex, Integer> entry : this.vertexOperatorsFinishedTasksCount.entrySet()) {
            ExecutionJobVertex vertex = entry.getKey();
            if (partlyFinishedVertex.containsKey(vertex.getJobVertexId()) || entry.getValue().intValue() == vertex.getParallelism() || !this.hasUsedUnionListState(vertex, operatorStates)) continue;
            throw new FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException(String.format("The vertex %s (id = %s) has used UnionListState, but part of its tasks has called operators' finish method.", vertex.getName(), vertex.getJobVertexId()));
        }
    }

    private boolean hasUsedUnionListState(ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) {
        for (OperatorIDPair operatorIDPair : vertex.getOperatorIDs()) {
            OperatorState operatorState = operatorStates.get(operatorIDPair.getGeneratedOperatorID());
            if (operatorState == null) continue;
            for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
                boolean hasUnionListState = Stream.concat(operatorSubtaskState.getManagedOperatorState().stream(), operatorSubtaskState.getRawOperatorState().stream()).filter(Objects::nonNull).flatMap(operatorStateHandle -> operatorStateHandle.getStateNameToPartitionOffsets().values().stream()).anyMatch(stateMetaInfo -> stateMetaInfo.getDistributionMode() == OperatorStateHandle.Mode.UNION);
                if (!hasUnionListState) continue;
                return true;
            }
        }
        return false;
    }

    private void fulfillFullyFinishedOrFinishedOnRestoreOperatorStates(Map<OperatorID, OperatorState> operatorStates) {
        for (ExecutionJobVertex jobVertex : this.fullyFinishedOrFinishedOnRestoreVertices.values()) {
            for (OperatorIDPair operatorID : jobVertex.getOperatorIDs()) {
                OperatorState operatorState = operatorStates.get(operatorID.getGeneratedOperatorID());
                Preconditions.checkState(operatorState == null || !operatorState.hasSubtaskStates(), "There should be no states or only coordinator state reported for fully finished operators");
                operatorState = new FullyFinishedOperatorState(operatorID.getGeneratedOperatorID(), jobVertex.getParallelism(), jobVertex.getMaxParallelism());
                operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
            }
        }
    }

    private void fulfillSubtaskStateForPartiallyFinishedOperators(Map<OperatorID, OperatorState> operatorStates) {
        for (Execution finishedTask : this.finishedTasks) {
            ExecutionJobVertex jobVertex = finishedTask.getVertex().getJobVertex();
            for (OperatorIDPair operatorIDPair : jobVertex.getOperatorIDs()) {
                OperatorState operatorState = operatorStates.get(operatorIDPair.getGeneratedOperatorID());
                if (operatorState != null && operatorState.isFullyFinished()) continue;
                if (operatorState == null) {
                    operatorState = new OperatorState(operatorIDPair.getGeneratedOperatorID(), jobVertex.getParallelism(), jobVertex.getMaxParallelism());
                    operatorStates.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
                }
                operatorState.putState(finishedTask.getParallelSubtaskIndex(), FinishedOperatorSubtaskState.INSTANCE);
            }
        }
    }
}

