/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.pbcast;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Properties;
import java.util.Vector;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.protocols.pbcast.Digest;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Promise;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

public class STABLE
extends Protocol {
    // Address of this member; assigned in up() when an event of type 8 arrives.
    Address local_addr = null;
    // Current group members; replaced in down() whenever an event of type 6 (a View) passes through.
    Vector mbrs = new Vector();
    // Digest being accumulated for the current gossip round; rebuilt by initialize().
    Digest digest = new Digest();
    // Receives the digest delivered by an up event of type 58 (see receiveUpEvent() and getDigest()).
    Promise digest_promise = new Promise();
    // Members whose gossip has not yet been merged in this round: refilled from mbrs by
    // initialize(), entries removed by handleStableGossip().
    Vector heard_from = new Vector();
    // Maximum time (msecs) getDigest() waits on digest_promise.
    long digest_timeout = 60000L;
    // Scales the random gossip interval (see StableTask.computeSleepTime()); a value <= 0
    // keeps up()/down() from starting the stable task.
    long desired_avg_gossip = 20000L;
    // Upper bound handed to Util.random() when choosing the delay of a stability message.
    long stability_delay = 6000L;
    // Pending task that will send a stability message; guarded by stability_mutex.
    StabilitySendTask stability_task = null;
    Object stability_mutex = new Object();
    // Periodic gossip task; guarded by stable_task_mutex.
    StableTask stable_task = null;
    Object stable_task_mutex = new Object();
    // Scheduler taken from the protocol stack in start().
    TimeScheduler timer = null;
    // max_gossip_runs is the configured number of gossip runs; num_gossip_runs is the number
    // remaining (decremented by StableTask.run(), reset by startStableTask()). Both start at 3.
    int max_gossip_runs;
    int num_gossip_runs = this.max_gossip_runs = 3;
    // Name returned by getName() and used as the header key on messages.
    static final String name = "STABLE";
    // Byte threshold after which up() triggers a STABLE message; 0 disables the byte-based trigger.
    long max_bytes = 0L;
    // Bytes counted by up() since the last byte-triggered STABLE message.
    long num_bytes_received = 0L;
    // While true, gossip and stability messages are neither sent nor handled.
    boolean suspended = false;
    // Interval (as returned by SuspendTask.nextInterval()) after which SuspendTask clears 'suspended'.
    long max_suspend_time = 600000L;
    // Task that clears 'suspended' if resume() is never called; created in suspend().
    SuspendTask suspend_task = null;

    /**
     * Returns the name of this protocol, which is also the key under which
     * {@link StableHeader}s are attached to and looked up on messages.
     *
     * @return the constant {@code "STABLE"}
     */
    public String getName() {
        return STABLE.name;
    }

    /**
     * Returns the event types this protocol needs to have serviced.
     * NOTE(review): presumably these are events a protocol further down the stack
     * must answer - confirm against the Protocol base class.
     *
     * @return a vector holding the single event type 57, the same type that
     *         {@link #getDigest()} passes down to request a digest
     */
    public Vector requiredDownServices() {
        Vector<Integer> retval = new Vector<Integer>();
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        retval.addElement(Integer.valueOf(57));
        return retval;
    }

    /**
     * Configures this protocol from {@code props}. Recognized keys are
     * {@code digest_timeout}, {@code desired_avg_gossip}, {@code stability_delay},
     * {@code max_gossip_runs}, {@code max_bytes} and {@code max_suspend_time};
     * each recognized key is removed from {@code props} after it has been parsed.
     *
     * @param props the properties to consume; entries left over after the superclass
     *              and this method have taken theirs are reported as unrecognized
     * @return {@code true} if no unrecognized properties remain, {@code false} otherwise
     * @throws NumberFormatException if a recognized value is not a valid number
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str = props.getProperty("digest_timeout");
        if (str != null) {
            this.digest_timeout = Long.parseLong(str);
            props.remove("digest_timeout");
        }
        if ((str = props.getProperty("desired_avg_gossip")) != null) {
            this.desired_avg_gossip = Long.parseLong(str);
            props.remove("desired_avg_gossip");
        }
        if ((str = props.getProperty("stability_delay")) != null) {
            this.stability_delay = Long.parseLong(str);
            props.remove("stability_delay");
        }
        if ((str = props.getProperty("max_gossip_runs")) != null) {
            // The remaining-runs counter starts out equal to the configured maximum.
            this.num_gossip_runs = this.max_gossip_runs = Integer.parseInt(str);
            props.remove("max_gossip_runs");
        }
        if ((str = props.getProperty("max_bytes")) != null) {
            this.max_bytes = Long.parseLong(str);
            props.remove("max_bytes");
        }
        if ((str = props.getProperty("max_suspend_time")) != null) {
            this.max_suspend_time = Long.parseLong(str);
            props.remove("max_suspend_time");
        }
        if (!props.isEmpty()) {
            // Header and listing go to the same stream so the diagnostic stays together.
            System.err.println("STABLE.setProperties(): these properties are not recognized:");
            props.list(System.err);
            return false;
        }
        return true;
    }

    /**
     * Turns on the {@code suspended} flag (logging the transition at debug level)
     * and makes sure a live {@link SuspendTask} is scheduled on the timer.
     */
    void suspend() {
        if (!suspended) {
            suspended = true;
            if (log.isDebugEnabled()) {
                log.debug((Object)"suspending message garbage collection");
            }
        }
        // Schedule a fresh task unless one is already pending.
        boolean taskPending = suspend_task != null && !suspend_task.cancelled();
        if (!taskPending) {
            suspend_task = new SuspendTask();
            timer.add(suspend_task);
        }
    }

    /**
     * Turns off the {@code suspended} flag (logging the transition at debug level)
     * and stops and discards the pending {@link SuspendTask}, if any.
     */
    void resume() {
        if (suspended) {
            suspended = false;
            if (log.isDebugEnabled()) {
                log.debug((Object)"resuming message garbage collection");
            }
        }
        SuspendTask pending = suspend_task;
        if (pending != null) {
            pending.stop();
            suspend_task = null;
        }
    }

    /**
     * Fetches the timer from the protocol stack and stores it in {@code timer}.
     *
     * @throws Exception if the stack, or the stack's timer, is {@code null}
     */
    public void start() throws Exception {
        if (this.stack == null || this.stack.timer == null) {
            // The message previously named up() although the failure is raised here in start().
            throw new Exception("STABLE.start(): timer cannot be retrieved from protocol stack");
        }
        this.timer = this.stack.timer;
    }

    /**
     * Stops this protocol by cancelling the periodic stable task, if one is active.
     */
    public void stop() {
        stopStableTask();
    }

    /**
     * Handles an event travelling up the stack.
     * <p>
     * For events of type 1 (carrying a {@link Message}) the received bytes are counted when
     * {@code max_bytes} is positive, and a STABLE message is sent from a new thread once the
     * threshold is reached. If the message carries a {@link StableHeader}, the header is
     * removed and dispatched to the matching handler, and the event is consumed (not passed up).
     * For events of type 8 the argument is stored as the local address.
     * <p>
     * Every event that is not consumed is passed up; afterwards the stable task is (re)started
     * for event types 6 and 1 when {@code desired_avg_gossip} is positive.
     *
     * @param evt the event received from the protocol below
     */
    public void up(Event evt) {
        final int type = evt.getType();
        if (type == 1) {
            Message msg = (Message)evt.getArg();
            if (max_bytes > 0L) {
                // Each message is charged at least 24 bytes.
                long size = Math.max(msg.getLength(), 24);
                num_bytes_received += size;
                if (log.isTraceEnabled()) {
                    log.trace((Object)("received message of " + size + " bytes, total bytes received=" + num_bytes_received));
                }
                if (num_bytes_received >= max_bytes) {
                    if (log.isDebugEnabled()) {
                        log.debug((Object)("max_bytes has been exceeded (max_bytes=" + max_bytes + ", number of bytes received=" + num_bytes_received + "): sending STABLE message"));
                    }
                    // Gossip from a separate thread so the caller is not held up by the digest fetch.
                    new Thread() {
                        public void run() {
                            STABLE.this.initialize();
                            STABLE.this.sendStableMessage();
                        }
                    }.start();
                    num_bytes_received = 0L;
                }
            }
            Header candidate = msg.getHeader(getName());
            if (candidate instanceof StableHeader) {
                StableHeader hdr = (StableHeader)msg.removeHeader(getName());
                switch (hdr.type) {
                    case StableHeader.STABLE_GOSSIP:
                        handleStableGossip(msg.getSrc(), hdr.digest);
                        break;
                    case StableHeader.STABILITY:
                        handleStabilityMessage(hdr.digest);
                        break;
                    default:
                        if (log.isErrorEnabled()) {
                            log.error((Object)("StableHeader type " + hdr.type + " not known"));
                        }
                }
                // Messages addressed to this protocol stop here.
                return;
            }
        } else if (type == 8) {
            local_addr = (Address)evt.getArg();
        }
        passUp(evt);
        boolean gossipEnabled = desired_avg_gossip > 0L;
        if (gossipEnabled && (type == 6 || type == 1)) {
            startStableTask();
        }
    }

    /**
     * Intercepts events of type 58 and hands their argument to {@code digest_promise}
     * (which {@link #getDigest()} waits on); all other events are delegated to the superclass.
     *
     * @param evt the event arriving from below
     */
    protected void receiveUpEvent(Event evt) {
        if (evt.getType() != 58) {
            super.receiveUpEvent(evt);
            return;
        }
        digest_promise.setResult(evt.getArg());
    }

    /**
     * Handles an event travelling down the stack, then passes it on.
     * <ul>
     *   <li>type 6 (argument is a {@link View}): replaces {@code mbrs} with the view's members,
     *       drops non-members from {@code heard_from} and stops the stable task;</li>
     *   <li>type 65: stops the stable task and suspends;</li>
     *   <li>type 66: resumes.</li>
     * </ul>
     * For event types 6 and 1 the stable task is (re)started when
     * {@code desired_avg_gossip} is positive.
     *
     * @param evt the event received from the protocol above
     */
    public void down(Event evt) {
        final int type = evt.getType();
        if (type == 6) {
            View view = (View)evt.getArg();
            Vector members = view.getMembers();
            mbrs.removeAllElements();
            mbrs.addAll(members);
            heard_from.retainAll(members);
            stopStableTask();
        } else if (type == 65) {
            stopStableTask();
            suspend();
        } else if (type == 66) {
            resume();
        }
        boolean gossipEnabled = desired_avg_gossip > 0L;
        if (gossipEnabled && (type == 6 || type == 1)) {
            startStableTask();
        }
        passDown(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a fresh gossip round: while holding the digest's monitor, resets the digest to one
     * entry per current member (both seqnos set to -1) and refills {@code heard_from} with all
     * current members.
     */
    void initialize() {
        synchronized (this.digest) {
            this.digest.reset(mbrs.size());
            for (int idx = 0; idx < mbrs.size(); idx++) {
                Address member = (Address)mbrs.elementAt(idx);
                this.digest.add(member, -1L, -1L);
            }
            heard_from.removeAllElements();
            heard_from.addAll(mbrs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resets the remaining-runs counter to {@code max_gossip_runs} and, unless a stable task
     * that has not been cancelled already exists, creates a new {@link StableTask} and adds it
     * to the timer. The debug message is only logged when a new task was actually created.
     */
    void startStableTask() {
        num_gossip_runs = max_gossip_runs;
        synchronized (stable_task_mutex) {
            boolean alreadyActive = stable_task != null && !stable_task.cancelled();
            if (alreadyActive) {
                return;
            }
            stable_task = new StableTask();
            timer.add(stable_task, true);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops and discards the current stable task, if there is one, while holding
     * {@code stable_task_mutex}.
     */
    void stopStableTask() {
        synchronized (stable_task_mutex) {
            if (stable_task == null) {
                return;
            }
            stable_task.stop();
            stable_task = null;
        }
    }

    /**
     * Merges a gossiped digest into the local digest.
     * <p>
     * The gossip is ignored when an argument is {@code null}, when this protocol is suspended,
     * or when {@code sender} is not in {@code heard_from}. If the senders of {@code d} differ
     * from those of the local digest, the local digest is re-initialized and {@code d} is dropped.
     * Otherwise, per member, the high seqno becomes the minimum and the highest-seen seqno the
     * maximum of the local and received values (a negative local value is simply replaced by a
     * non-negative received one). Once {@code heard_from} is empty, a copy of the digest is
     * scheduled to be sent as a stability message and a new round is started.
     *
     * @param sender the member the gossip came from
     * @param d      the digest carried by the gossip
     */
    void handleStableGossip(Address sender, Digest d) {
        if (sender == null || d == null) {
            if (log.isErrorEnabled()) {
                log.error((Object)"digest or sender is null");
            }
            return;
        }
        if (suspended) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("STABLE message will not be handled as suspened=" + suspended));
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("received digest " + printStabilityDigest(d) + " from " + sender));
        }
        if (!heard_from.contains(sender)) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("already received gossip from " + sender));
            }
            return;
        }
        if (!digest.sameSenders(d)) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("received digest from " + sender + " (digest=" + d + ") which does not match my own digest (" + digest + "): ignoring digest and re-initializing own digest"));
            }
            initialize();
            return;
        }

        for (int idx = 0; idx < d.size(); idx++) {
            Address member = d.senderAt(idx);
            long theirHigh = d.highSeqnoAt(idx);
            long theirHighSeen = d.highSeqnoSeenAt(idx);
            if (digest.getIndex(member) == -1) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("sender " + member + " not found in stability vector"));
                }
                continue;
            }

            // High seqno: keep the minimum across members.
            long myHigh = digest.highSeqnoAt(member);
            if (myHigh >= 0L) {
                digest.setHighSeqnoAt(member, Math.min(myHigh, theirHigh));
            } else if (theirHigh >= 0L) {
                digest.setHighSeqnoAt(member, theirHigh);
            }

            // Highest seqno seen: keep the maximum across members.
            long myHighSeen = digest.highSeqnoSeenAt(member);
            if (myHighSeen >= 0L) {
                digest.setHighSeqnoSeenAt(member, Math.max(myHighSeen, theirHighSeen));
            } else if (theirHighSeen >= 0L) {
                digest.setHighSeqnoSeenAt(member, theirHighSeen);
            }
        }

        heard_from.removeElement(sender);
        if (heard_from.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("sending stability msg " + printStabilityDigest(digest)));
            }
            sendStabilityMessage(digest.copy());
            initialize();
        }
    }

    /**
     * Fetches the current digest via {@link #getDigest()} and, if it is non-empty, sends it
     * down the stack in a message carrying a {@link StableHeader} of type
     * {@link StableHeader#STABLE_GOSSIP}.
     * <p>
     * Nothing is fetched, logged as sent, or sent while this protocol is suspended. The flag
     * is checked before the digest fetch, because {@code getDigest()} can block for up to
     * {@code digest_timeout} while this method holds the instance monitor. It is checked
     * again afterwards, because it may have been set during that wait.
     */
    synchronized void sendStableMessage() {
        if (this.suspended) {
            if (this.log.isTraceEnabled()) {
                this.log.trace((Object)("will not send STABLE message as suspended=" + this.suspended));
            }
            return;
        }
        Digest d = this.getDigest();
        if (d == null || d.size() <= 0) {
            return;
        }
        // Re-check: suspension may have started while we were waiting for the digest.
        if (this.suspended) {
            if (this.log.isTraceEnabled()) {
                this.log.trace((Object)("will not send STABLE message as suspended=" + this.suspended));
            }
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug((Object)("mcasting digest " + d + " (num_gossip_runs=" + this.num_gossip_runs + ", max_gossip_runs=" + this.max_gossip_runs + ')'));
        }
        Message msg = new Message();
        msg.putHeader(this.getName(), new StableHeader(StableHeader.STABLE_GOSSIP, d));
        this.passDown(new Event(1, msg));
    }

    /**
     * Requests a digest by passing an event of type 57 down the stack and waits up to
     * {@code digest_timeout} on {@code digest_promise} for the answer.
     *
     * @return the digest obtained from the promise, or {@code null} if none was obtained
     *         (an error is logged in that case)
     */
    Digest getDigest() {
        passDown(new Event(57));
        Digest fetched = (Digest)digest_promise.getResult(digest_timeout);
        if (fetched == null) {
            if (log.isErrorEnabled()) {
                log.error((Object)("digest could not be fetched from below (timeout was " + digest_timeout + " msecs)"));
            }
        }
        return fetched;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules a {@link StabilitySendTask} that will send {@code tmp} after a random delay
     * bounded by {@code stability_delay}, unless a stability task that has not been cancelled
     * is already pending.
     * <p>
     * If no timer is available, an error is logged, the timer is re-read from the stack for
     * later calls, and this call returns without scheduling anything.
     *
     * @param tmp the digest to send in the stability message
     */
    void sendStabilityMessage(Digest tmp) {
        if (timer == null) {
            if (log.isErrorEnabled()) {
                log.error((Object)"timer is null, cannot schedule stability message to be sent");
            }
            if (stack != null) {
                timer = stack.timer;
            } else {
                timer = null;
            }
            return;
        }
        long delay = Util.random(stability_delay);
        if (log.isDebugEnabled()) {
            log.debug((Object)("stability_task=" + stability_task + ", delay is " + delay));
        }
        synchronized (stability_mutex) {
            boolean alreadyPending = stability_task != null && !stability_task.cancelled();
            if (alreadyPending) {
                return;
            }
            stability_task = new StabilitySendTask(this, tmp, delay);
            timer.add(stability_task, true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a received stability digest.
     * <p>
     * A {@code null} digest or a suspended protocol causes the message to be ignored. Otherwise
     * any pending local stability task is stopped and discarded, and then either the digest is
     * passed down in an event of type 30 (when its senders match the local digest's) or the
     * local digest is re-initialized and {@code d} is dropped.
     *
     * @param d the digest carried by the stability message
     */
    void handleStabilityMessage(Digest d) {
        if (d == null) {
            if (log.isErrorEnabled()) {
                log.error((Object)"stability vector is null");
            }
            return;
        }
        if (suspended) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("STABILITY message will not be handled as suspened=" + suspended));
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("stability vector is " + d.printHighSeqnos()));
        }

        // A stability message has arrived, so our own pending one is no longer needed.
        synchronized (stability_mutex) {
            if (stability_task != null) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("cancelling stability task (running=" + !stability_task.cancelled() + ')'));
                }
                stability_task.stop();
                stability_task = null;
            }
        }

        if (digest.sameSenders(d)) {
            passDown(new Event(30, d));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("received digest (digest=" + d + ") which does not match my own digest (" + digest + "): ignoring digest and re-initializing own digest"));
        }
        initialize();
    }

    /**
     * Formats a digest as a comma-separated list of
     * {@code sender#highSeqno (highSeqnoSeen)} entries.
     *
     * @param d the digest to format; may be {@code null}
     * @return the formatted entries, or an empty string if {@code d} is {@code null} or empty
     */
    String printStabilityDigest(Digest d) {
        if (d == null) {
            return "";
        }
        StringBuilder out = new StringBuilder();
        for (int idx = 0; idx < d.size(); idx++) {
            if (idx > 0) {
                out.append(", ");
            }
            out.append(d.senderAt(idx))
               .append("#")
               .append(d.highSeqnoAt(idx))
               .append(" (")
               .append(d.highSeqnoSeenAt(idx))
               .append(')');
        }
        return out.toString();
    }

    /**
     * Timer task that clears the enclosing protocol's {@code suspended} flag if it is still
     * set when the task runs, i.e. if {@code resume()} was not called in the meantime.
     * The task marks itself as cancelled after its first run.
     */
    private class SuspendTask
    implements TimeScheduler.Task {
        /** {@code true} until the task has run or has been stopped. */
        boolean running = true;

        SuspendTask() {
        }

        /** Marks this task as cancelled. */
        void stop() {
            this.running = false;
        }

        /** @return {@code true} once the task has run or has been stopped */
        public boolean cancelled() {
            return !this.running;
        }

        /** @return the enclosing protocol's {@code max_suspend_time} */
        public long nextInterval() {
            return STABLE.this.max_suspend_time;
        }

        /**
         * Clears the {@code suspended} flag if it is still set, logging a warning,
         * and then cancels this task.
         */
        public void run() {
            if (STABLE.this.suspended) {
                STABLE.this.suspended = false;
                // The flag is cleared here, so the message must say "false" (it used to say "true").
                STABLE.this.log.warn((Object)"Reset suspended flag to false, this should never happen: check why RESUME_STABLE has not been received");
            }
            this.stop();
        }
    }

    /**
     * One-shot timer task that sends a digest down the stack in a message carrying a
     * {@link StableHeader} of type {@link StableHeader#STABILITY}, unless it was stopped
     * beforehand or the enclosing protocol is suspended when it runs.
     */
    private class StabilitySendTask
    implements TimeScheduler.Task {
        /** Digest to send; cleared once it has been sent. */
        Digest d = null;
        /** Protocol whose {@code passDown()} is used to send the message. */
        Protocol stable_prot = null;
        /** Set by {@link #stop()} and at the end of every {@link #run()}. */
        boolean stopped = false;
        /** Value reported by {@link #nextInterval()}. */
        long delay = 2000L;

        /**
         * @param stable_prot protocol used to pass the message down
         * @param d           digest to send
         * @param delay       interval reported to the scheduler
         */
        public StabilitySendTask(Protocol stable_prot, Digest d, long delay) {
            this.stable_prot = stable_prot;
            this.d = d;
            this.delay = delay;
        }

        /** Marks this task as cancelled so that a later {@link #run()} sends nothing. */
        public void stop() {
            stopped = true;
        }

        /** @return {@code true} once the task was stopped or has run */
        public boolean cancelled() {
            return stopped;
        }

        /** @return the delay given at construction */
        public long nextInterval() {
            return delay;
        }

        /**
         * Sends the stability message if the protocol is not suspended, a digest is present and
         * the task has not been stopped. In every case the task ends up cancelled.
         */
        public void run() {
            if (STABLE.this.suspended) {
                if (STABLE.this.log.isDebugEnabled()) {
                    STABLE.this.log.debug((Object)("STABILITY message will not be sent as suspened=" + STABLE.this.suspended));
                }
                stopped = true;
                return;
            }
            boolean shouldSend = d != null && !stopped;
            if (shouldSend) {
                Message msg = new Message();
                msg.putHeader(STABLE.name, new StableHeader(StableHeader.STABILITY, d));
                stable_prot.passDown(new Event(1, msg));
                d = null;
            }
            stopped = true;
        }
    }

    /**
     * Periodic timer task that starts a new gossip round on each run: it re-initializes the
     * digest and sends a STABLE message. The task cancels itself once the enclosing protocol's
     * {@code num_gossip_runs} has dropped to zero or below.
     */
    private class StableTask
    implements TimeScheduler.Task {
        /** {@code true} once the task has been stopped. */
        boolean stopped = false;

        private StableTask() {
        }

        /** Clears the stopped flag. */
        public void reset() {
            this.stopped = false;
        }

        /** Marks this task as cancelled. */
        public void stop() {
            this.stopped = true;
        }

        /** @return {@code true} once the task has been stopped */
        public boolean cancelled() {
            return this.stopped;
        }

        /**
         * @return a random interval from {@link #computeSleepTime()}, or 10000 when that
         *         value is not positive (e.g. while there are no members)
         */
        public long nextInterval() {
            long interval = this.computeSleepTime();
            if (interval <= 0L) {
                return 10000L;
            }
            return interval;
        }

        /**
         * Runs one gossip round unless the protocol is suspended, then decrements the
         * remaining-runs counter and stops this task when no runs are left. A run skipped
         * because of suspension does not consume a gossip run.
         */
        public void run() {
            if (STABLE.this.suspended) {
                // Guarded like every other debug statement in this protocol, so the
                // message is not built when debug logging is off.
                if (STABLE.this.log.isDebugEnabled()) {
                    STABLE.this.log.debug((Object)("stable task will not run as suspended=" + STABLE.this.suspended));
                }
                return;
            }
            STABLE.this.initialize();
            STABLE.this.sendStableMessage();
            --STABLE.this.num_gossip_runs;
            if (STABLE.this.num_gossip_runs <= 0) {
                if (STABLE.this.log.isDebugEnabled()) {
                    STABLE.this.log.debug((Object)("stable task terminating (num_gossip_runs=" + STABLE.this.num_gossip_runs + ", max_gossip_runs=" + STABLE.this.max_gossip_runs + ')'));
                }
                this.stop();
            }
        }

        /**
         * @return a random value below {@code mbrs.size() * desired_avg_gossip * 2}
         */
        long computeSleepTime() {
            return this.getRandom((long)STABLE.this.mbrs.size() * STABLE.this.desired_avg_gossip * 2L);
        }

        /**
         * @param range exclusive upper bound of the result
         * @return a random value in {@code [0, range)}; 0 when {@code range} is 0, because the
         *         floating-point remainder is then NaN, which converts to 0
         */
        long getRandom(long range) {
            return (long)(Math.random() * (double)range % (double)range);
        }
    }

    /**
     * Header attached to messages exchanged by this protocol. It carries a type
     * ({@link #STABLE_GOSSIP} or {@link #STABILITY}) and a digest.
     */
    public static class StableHeader
    extends Header {
        /** Header type dispatched to {@code handleStableGossip()}. */
        static final int STABLE_GOSSIP = 1;
        /** Header type dispatched to {@code handleStabilityMessage()}. */
        static final int STABILITY = 2;
        int type = 0;
        Digest digest = null;

        /**
         * Creates an empty header (type 0, no digest).
         * NOTE(review): presumably required so the header can be re-created before
         * {@link #readExternal(ObjectInput)} is called - confirm.
         */
        public StableHeader() {
        }

        /**
         * @param type   {@link #STABLE_GOSSIP} or {@link #STABILITY}
         * @param digest the digest to carry
         */
        StableHeader(int type, Digest digest) {
            this.type = type;
            this.digest = digest;
        }

        /**
         * @param t a header type
         * @return the symbolic name of {@code t}, or {@code "<unknown>"} for any other value
         */
        static String type2String(int t) {
            if (t == STABLE_GOSSIP) {
                return "STABLE_GOSSIP";
            }
            if (t == STABILITY) {
                return "STABILITY";
            }
            return "<unknown>";
        }

        /** @return {@code "[<type name>]: digest is <digest>"} */
        public String toString() {
            return "[" + StableHeader.type2String(type) + "]: digest is " + digest;
        }

        /**
         * Writes the type followed by the digest.
         *
         * @param out the stream to write to
         * @throws IOException if writing fails
         */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(type);
            digest.writeExternal(out);
        }

        /**
         * Reads the type and then the digest into a newly created {@link Digest}.
         *
         * @param in the stream to read from
         * @throws IOException            if reading fails
         * @throws ClassNotFoundException declared by the signature; raised only if the
         *                                digest's own {@code readExternal} raises it
         */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type = in.readInt();
            Digest incoming = new Digest();
            digest = incoming;
            incoming.readExternal(in);
        }
    }
}

