/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.BasePubSub;
import com.sproutsocial.nsq.Batcher;
import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.HostAndPort;
import com.sproutsocial.nsq.NSQException;
import com.sproutsocial.nsq.PubConnection;
import com.sproutsocial.nsq.Util;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class Publisher
extends BasePubSub {
    private final HostAndPort nsqd;
    private final HostAndPort failoverNsqd;
    private PubConnection con;
    private boolean isFailover = false;
    private long failoverStart;
    private int failoverDurationSecs = 300;
    private final Map<String, Batcher> batchers = new HashMap<String, Batcher>();
    private ScheduledExecutorService batchExecutor;
    private static final int DEFAULT_MAX_BATCH_SIZE = 16384;
    private static final int DEFUALT_MAX_BATCH_DELAY = 300;
    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);

    public Publisher(Client client, String nsqd, String failoverNsqd) {
        super(client);
        this.nsqd = HostAndPort.fromString(nsqd).withDefaultPort(4150);
        this.failoverNsqd = failoverNsqd != null ? HostAndPort.fromString(failoverNsqd).withDefaultPort(4150) : null;
        client.addPublisher(this);
    }

    public Publisher(String nsqd, String failoverNsqd) {
        this(Client.getDefaultClient(), nsqd, failoverNsqd);
    }

    public Publisher(String nsqd) {
        this(nsqd, null);
    }

    @GuardedBy(value="this")
    private void checkConnection() throws IOException {
        if (this.con == null) {
            if (this.isStopping) {
                throw new NSQException("publisher stopped");
            }
            this.connect(this.nsqd);
        } else if (this.isFailover && Util.clock() - this.failoverStart > (long)(this.failoverDurationSecs * 1000)) {
            this.isFailover = false;
            this.connect(this.nsqd);
            logger.info("using primary nsqd");
        }
    }

    @GuardedBy(value="this")
    private void connect(HostAndPort host) throws IOException {
        if (this.con != null) {
            this.con.close();
        }
        this.con = new PubConnection(this.client, host, this);
        try {
            this.con.connect(this.config);
        }
        catch (IOException e) {
            this.con.close();
            this.con = null;
            throw e;
        }
        logger.info("publisher connected:{}", (Object)host);
    }

    public synchronized void connectionClosed(PubConnection closedCon) {
        if (this.con == closedCon) {
            this.con = null;
            logger.debug("removed closed publisher connection:{}", (Object)closedCon.getHost());
        }
    }

    public synchronized void publish(String topic, byte[] data) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        try {
            this.checkConnection();
            this.con.publish(topic, data);
        }
        catch (Exception e) {
            logger.error("publish error with:{}", (Object)(this.isFailover ? this.failoverNsqd : this.nsqd), (Object)e);
            this.publishFailover(topic, data);
        }
    }

    public synchronized void publishDeferred(String topic, byte[] data, long delay, TimeUnit unit) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        Util.checkArgument(delay > 0L);
        Util.checkNotNull((Object)unit);
        try {
            this.checkConnection();
            this.con.publishDeferred(topic, data, unit.toMillis(delay));
        }
        catch (Exception e) {
            throw new NSQException("deferred publish failed", e);
        }
    }

    public synchronized void publish(String topic, List<byte[]> dataList) {
        Util.checkNotNull(topic);
        Util.checkNotNull(dataList);
        Util.checkArgument(dataList.size() > 0);
        try {
            this.checkConnection();
            this.con.publish(topic, dataList);
        }
        catch (Exception e) {
            logger.error("publish error with:{}", (Object)(this.isFailover ? this.failoverNsqd : this.nsqd), (Object)e);
            for (byte[] data : dataList) {
                this.publishFailover(topic, data);
            }
        }
    }

    @GuardedBy(value="this")
    private void publishFailover(String topic, byte[] data) {
        try {
            if (this.failoverNsqd == null) {
                logger.warn("publish failed but no failoverNsqd configured. Will wait and retry once.");
                Util.sleepQuietly(10000);
                this.connect(this.nsqd);
            } else if (!this.isFailover) {
                this.failoverStart = Util.clock();
                this.isFailover = true;
                this.connect(this.failoverNsqd);
                logger.info("using failover nsqd:{}", (Object)this.failoverNsqd);
            }
            this.con.publish(topic, data);
        }
        catch (Exception e) {
            Util.closeQuietly(this.con);
            this.con = null;
            this.isFailover = false;
            throw new NSQException("publish failed", e);
        }
    }

    public synchronized void publishBuffered(String topic, byte[] data) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        Batcher batcher = this.batchers.get(topic);
        if (batcher == null) {
            batcher = new Batcher(this, topic, 16384, 300);
            this.batchers.put(topic, batcher);
        }
        batcher.publish(data);
    }

    public synchronized void setBatchConfig(String topic, int maxSizeBytes, int maxDelayMillis) {
        Batcher batcher = this.batchers.get(topic);
        if (batcher != null) {
            batcher.sendBatch();
        }
        batcher = new Batcher(this, topic, maxSizeBytes, maxDelayMillis);
        this.batchers.put(topic, batcher);
    }

    synchronized ScheduledExecutorService getBatchExecutor() {
        if (this.batchExecutor == null) {
            this.batchExecutor = Executors.newScheduledThreadPool(1, Util.threadFactory("nsq-batch"));
        }
        return this.batchExecutor;
    }

    @Override
    public synchronized void stop() {
        for (Batcher batcher : this.batchers.values()) {
            batcher.sendBatch();
        }
        super.stop();
        Util.closeQuietly(this.con);
        this.con = null;
        if (this.batchExecutor != null) {
            Util.shutdownAndAwaitTermination(this.batchExecutor, 40L, TimeUnit.MILLISECONDS);
        }
        if (this.client.isLonePublisher(this)) {
            Util.shutdownAndAwaitTermination(this.client.getSchedExecutor(), 40L, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized int getFailoverDurationSecs() {
        return this.failoverDurationSecs;
    }

    public synchronized void setFailoverDurationSecs(int failoverDurationSecs) {
        this.failoverDurationSecs = failoverDurationSecs;
    }
}

