/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.Message;
import com.sproutsocial.nsq.MessageHandler;
import com.sproutsocial.nsq.Subscription;
import com.sproutsocial.nsq.Util;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MessageHandler} decorator that throttles message consumption after the wrapped
 * handler throws, and restores full speed once messages succeed again.
 *
 * <p>Behavior, as implemented below:
 * <ul>
 *   <li>Every failure requeues the message and puts the handler into backoff mode.</li>
 *   <li>The first failure records the subscription's current max-in-flight and starts with the
 *       initial delay. Each further failure while backing off doubles the delay (capped at the
 *       maximum delay) and pauses the subscription for that long.</li>
 *   <li>While backing off, each attempt waits until at least {@code delay} milliseconds have
 *       passed since the previous attempt.</li>
 *   <li>Each success while backing off halves the delay; once it drops below the initial delay
 *       the recorded max-in-flight is restored and backoff ends.</li>
 * </ul>
 *
 * <p>All mutable backoff state is read and written inside {@code synchronized} methods; the
 * {@code isBackoff} flag is additionally {@code volatile} so {@link #accept(Message)} can check
 * it without taking the lock.
 */
@ThreadSafe
public class BackoffHandler
implements MessageHandler {
    /** True while the handler is throttling; read lock-free in {@link #accept(Message)}. */
    private volatile boolean isBackoff = false;
    /** Subscription whose max-in-flight is adjusted; must be set before a failure is handled. */
    private Subscription subscription;
    /** The wrapped handler that does the real message processing. */
    private final MessageHandler handler;
    /** Delay (ms) used after the first failure and as the threshold for leaving backoff. */
    private final int initDelay;
    /** Upper bound (ms) for the doubled delay. */
    private final int maxDelay;
    /** {@code Util.clock()} value of the most recent attempt made during backoff. */
    private long lastAttempt;
    /** Current backoff delay in milliseconds. */
    private int delay;
    /** Number of failures since backoff was last fully cleared. */
    private int failCount;
    /** Max-in-flight captured at the first failure, restored when backoff ends. */
    private int fullSpeedMaxInFlight;
    private static final int DEFAULT_INIT_DELAY_MILLIS = 1000;
    private static final int DEFAULT_MAX_DELAY_MILLIS = 60000;
    private static final Logger logger = LoggerFactory.getLogger(BackoffHandler.class);

    /**
     * Creates a backoff wrapper with explicit delays.
     *
     * @param handler         the handler to delegate messages to
     * @param initDelayMillis delay applied after the first failure, in milliseconds
     * @param maxDelayMillis  cap for the doubled delay, in milliseconds
     */
    public BackoffHandler(MessageHandler handler, int initDelayMillis, int maxDelayMillis) {
        this.handler = handler;
        this.initDelay = initDelayMillis;
        this.maxDelay = maxDelayMillis;
    }

    /**
     * Creates a backoff wrapper using the default initial (1000 ms) and maximum (60000 ms) delays.
     *
     * @param handler the handler to delegate messages to
     */
    public BackoffHandler(MessageHandler handler) {
        this(handler, DEFAULT_INIT_DELAY_MILLIS, DEFAULT_MAX_DELAY_MILLIS);
    }

    /**
     * Delegates the message to the wrapped handler and finishes it on success.
     *
     * <p>Any {@link Exception} thrown inside the try block (by the wrapped handler, by the success
     * bookkeeping, or by {@code msg.finish()}) is treated as a failure: backoff state is updated
     * and the message is requeued.
     *
     * @param msg the message to process
     */
    @Override
    public void accept(Message msg) {
        // Snapshot the flag once so the pre- and post-processing steps agree for this message.
        boolean backoff = isBackoff;
        if (backoff) {
            attemptDuringBackoff();
        }
        try {
            handler.accept(msg);
            if (backoff) {
                successDuringBackoff();
            }
            msg.finish();
        }
        catch (Exception e) {
            failure(msg, e);
        }
    }

    /**
     * Sets the subscription whose max-in-flight this handler adjusts. Failure handling
     * dereferences it, so it has to be set before the first failure occurs.
     *
     * @param subscription the subscription delivering messages to this handler
     */
    synchronized void setSubscription(Subscription subscription) {
        this.subscription = subscription;
    }

    /**
     * Records a failed attempt, enters or deepens backoff, and requeues the message.
     *
     * @param msg the message whose processing failed
     * @param e   the exception that caused the failure (logged with the failure count)
     */
    private synchronized void failure(Message msg, Exception e) {
        isBackoff = true;
        ++failCount;
        logger.error("message error. failures:{}", failCount, e);
        if (failCount == 1) {
            // First failure: remember full speed so it can be restored, start at the initial delay.
            delay = initDelay;
            fullSpeedMaxInFlight = subscription.getMaxInFlight();
            lastAttempt = Util.clock();
        } else {
            // Double in long arithmetic: with a large delay, an int multiplication would wrap to
            // a negative value and Math.min would then select it. The result never exceeds
            // maxDelay, so narrowing back to int is lossless.
            delay = (int) Math.min((long) delay * 2L, (long) maxDelay);
            pauseSubscription();
        }
        msg.requeue();
    }

    /**
     * Sets max-in-flight to 0 and schedules it to be raised to 1 after the current delay,
     * unless the subscription is stopping by then.
     */
    private synchronized void pauseSubscription() {
        subscription.setMaxInFlight(0);
        subscription.getClient().schedule(new Runnable(){

            @Override
            public void run() {
                if (!BackoffHandler.this.subscription.isStopping) {
                    BackoffHandler.this.subscription.setMaxInFlight(1);
                }
            }
        }, delay);
    }

    /**
     * Ensures at least {@code delay} milliseconds separate consecutive attempts during backoff,
     * sleeping for the remainder when the previous attempt was too recent.
     *
     * <p>The sleep happens while this handler's monitor is held, so other callers of the
     * synchronized methods wait as well.
     */
    private synchronized void attemptDuringBackoff() {
        long now = Util.clock();
        // Keep the elapsed time as a long: narrowing it to int could wrap a large gap into a
        // negative value and trigger a spurious, overly long sleep.
        long waited = now - lastAttempt;
        if (waited < delay) {
            // Never sleep longer than one full delay, even if the clock appears to have gone
            // backwards (negative elapsed time). The clamp also keeps the value within int range.
            long remaining = Math.min((long) delay - waited, (long) delay);
            Util.sleepQuietly((int) remaining);
            lastAttempt = Util.clock();
        } else {
            lastAttempt = now;
        }
    }

    /**
     * Halves the delay after a successful attempt made during backoff. When the delay falls
     * below the initial delay, backoff state is cleared and the max-in-flight recorded at the
     * first failure is restored; otherwise the subscription is paused again for the new delay.
     */
    private synchronized void successDuringBackoff() {
        delay /= 2;
        if (delay < initDelay) {
            isBackoff = false;
            failCount = 0;
            delay = 0;
            subscription.setMaxInFlight(fullSpeedMaxInFlight);
        } else {
            pauseSubscription();
        }
    }
}

