/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.Config;
import com.sproutsocial.nsq.Connection;
import com.sproutsocial.nsq.FailedMessageHandler;
import com.sproutsocial.nsq.HostAndPort;
import com.sproutsocial.nsq.MessageHandler;
import com.sproutsocial.nsq.NSQMessage;
import com.sproutsocial.nsq.Subscriber;
import com.sproutsocial.nsq.Subscription;
import com.sproutsocial.nsq.Util;
import java.io.IOException;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubConnection
extends Connection {
    private final MessageHandler handler;
    private final FailedMessageHandler failedMessageHandler;
    private final Subscription subscription;
    private final String topic;
    private final int maxAttempts;
    private final int maxFlushDelayMillis;
    private int inFlight = 0;
    private int maxInFlight = 0;
    private int maxUnflushed = 0;
    private long finishedCount = 0L;
    private long requeuedCount = 0L;
    private static final Logger logger = LoggerFactory.getLogger(SubConnection.class);

    public SubConnection(Client client, HostAndPort host, Subscription subscription) {
        super(client, host);
        Subscriber subscriber = subscription.getSubscriber();
        this.handler = subscription.getHandler();
        this.failedMessageHandler = subscriber.getFailedMessageHandler();
        this.subscription = subscription;
        this.topic = subscription.getTopic();
        this.maxAttempts = subscriber.getMaxAttempts();
        this.maxFlushDelayMillis = subscriber.getMaxFlushDelayMillis();
        this.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                SubConnection.this.delayedFlush();
            }
        }, this.maxFlushDelayMillis / 2, this.maxFlushDelayMillis / 2, false);
    }

    public synchronized void finish(String id) {
        try {
            this.writeCommand("FIN", id);
            ++this.finishedCount;
            this.messageDone();
        }
        catch (IOException e) {
            logger.error("finish error. {}", (Object)this.stateDesc(), (Object)e);
            this.close();
        }
    }

    public synchronized void requeue(String id) {
        try {
            this.writeCommand("REQ", id, 0);
            ++this.requeuedCount;
            this.messageDone();
        }
        catch (IOException e) {
            logger.error("requeue error. {}", (Object)this.stateDesc(), (Object)e);
            this.close();
        }
    }

    @GuardedBy(value="this")
    private void messageDone() throws IOException {
        this.inFlight = Math.max(this.inFlight - 1, 0);
        if (this.inFlight == 0 && this.isStopping) {
            this.flushAndClose();
        } else {
            this.checkFlush();
        }
    }

    public synchronized void touch(String id) {
        try {
            this.writeCommand("TOUCH", id);
            this.checkFlush();
        }
        catch (IOException e) {
            logger.error("touch error. {}", (Object)this.stateDesc(), (Object)e);
            this.close();
        }
    }

    private synchronized void delayedFlush() {
        try {
            if (this.unflushedCount > 0 && Util.clock() - this.lastActionFlush > (long)(this.maxFlushDelayMillis / 2 + 10)) {
                this.flush();
            }
        }
        catch (Exception e) {
            logger.error("delayedFlush error. {}", (Object)this.stateDesc(), (Object)e);
            this.close();
        }
    }

    @GuardedBy(value="this")
    private void checkFlush() throws IOException {
        if (this.unflushedCount >= this.maxUnflushed) {
            this.flush();
        } else {
            ++this.unflushedCount;
        }
    }

    public synchronized void setMaxInFlight(int maxInFlight) {
        this.setMaxInFlight(maxInFlight, true);
    }

    public synchronized void setMaxInFlight(int maxInFlight, boolean isActive) {
        try {
            if (this.maxInFlight == maxInFlight) {
                return;
            }
            this.maxInFlight = maxInFlight;
            this.maxUnflushed = Math.min(maxInFlight / 3, 150);
            logger.debug("RDY:{} {}", (Object)maxInFlight, (Object)this.toString());
            this.writeCommand("RDY", maxInFlight);
            if (isActive) {
                this.flush();
            } else {
                this.out.flush();
            }
        }
        catch (IOException e) {
            logger.error("setMaxInFlight failed. con:{}", (Object)this.stateDesc(), (Object)e);
            this.close();
        }
    }

    public synchronized int getMaxInFlight() {
        return this.maxInFlight;
    }

    @Override
    public synchronized void connect(Config config) throws IOException {
        this.client.addSubConnection(this);
        super.connect(config);
        this.writeCommand("SUB", this.subscription.getTopic(), this.subscription.getChannel());
        this.flushAndReadOK();
    }

    private void failMessage(final NSQMessage msg) {
        if (this.failedMessageHandler != null) {
            this.handlerExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        SubConnection.this.failedMessageHandler.failed(SubConnection.this.subscription.getTopic(), SubConnection.this.subscription.getChannel(), msg);
                    }
                    catch (Throwable t) {
                        logger.error("failed message error", t);
                    }
                }
            });
        }
        this.finish(msg.getId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onMessage(long timestamp, int attempts, String id, byte[] data) {
        final NSQMessage msg = new NSQMessage(timestamp, attempts, id, data, this.topic, this);
        SubConnection subConnection = this;
        synchronized (subConnection) {
            ++this.inFlight;
        }
        if (msg.getAttempts() >= this.maxAttempts) {
            this.failMessage(msg);
        } else {
            this.handlerExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        SubConnection.this.handler.accept(msg);
                    }
                    catch (Throwable t) {
                        logger.error("message error", t);
                    }
                }
            });
        }
    }

    @Override
    public void close() {
        super.close();
        this.client.getSchedExecutor().execute(new Runnable(){

            @Override
            public void run() {
                SubConnection.this.subscription.connectionClosed(SubConnection.this);
                SubConnection.this.client.connectionClosed(SubConnection.this);
            }
        });
    }

    @Override
    public synchronized void stop() {
        super.stop();
        if (this.inFlight == 0) {
            this.flushAndClose();
        } else {
            this.setMaxInFlight(0);
        }
    }

    public String toString() {
        return String.format("SubCon:%s %s.%s", this.host.getHost(), this.subscription.getTopic(), this.subscription.getChannel());
    }

    @Override
    public synchronized String stateDesc() {
        return String.format("%s inFlight:%d maxInFlight:%d fin:%d req:%d", super.stateDesc(), this.inFlight, this.maxInFlight, this.finishedCount, this.requeuedCount);
    }
}

