/*
 * Decompiled with CFR 0.152.
 */
package jp.ossc.nimbus.service.queue;

import jp.ossc.nimbus.core.ServiceBase;
import jp.ossc.nimbus.core.ServiceManagerFactory;
import jp.ossc.nimbus.core.ServiceName;
import jp.ossc.nimbus.daemon.Daemon;
import jp.ossc.nimbus.daemon.DaemonControl;
import jp.ossc.nimbus.daemon.DaemonRunnable;
import jp.ossc.nimbus.service.queue.DefaultQueueService;
import jp.ossc.nimbus.service.queue.Queue;
import jp.ossc.nimbus.service.queue.QueueHandler;
import jp.ossc.nimbus.service.queue.QueueHandlerContainer;
import jp.ossc.nimbus.service.queue.QueueHandlerContainerServiceMBean;
import jp.ossc.nimbus.util.SynchronizeMonitor;
import jp.ossc.nimbus.util.WaitSynchronizeMonitor;

public class QueueHandlerContainerService
extends ServiceBase
implements QueueHandlerContainer,
QueueHandlerContainerServiceMBean {
    private static final long serialVersionUID = -6527205946658554031L;
    protected ServiceName queueServiceName;
    protected Queue requestQueue;
    protected Daemon[] daemons;
    protected QueueReceiver[] invokers;
    protected int queueHandlerSize = 1;
    protected ServiceName queueHandlerServiceName;
    protected QueueHandler queueHandler;
    protected boolean isDaemonQueueHandler = true;
    protected long waitTimeout = -1L;
    protected int maxRetryCount = 0;
    protected long retryInterval = 1000L;
    protected String handlingErrorMessageId = "QHC__00001";
    protected String retryOverErrorMessageId = "QHC__00002";
    protected int queueHandlerThreadPriority = -1;
    protected boolean isReleaseQueue = true;
    protected long count = 0L;
    protected boolean isQueueHandlerNowaitOnStop;
    protected boolean isGarbageQueueOnStop = true;
    protected boolean isSuspend;
    protected SynchronizeMonitor suspendMonitor = new WaitSynchronizeMonitor();

    /**
     * Sets the service name of the {@link Queue} that backs this container.
     * <p>
     * The name is looked up through {@link ServiceManagerFactory} when
     * {@link #startService()} runs.
     *
     * @param name service name of the queue; {@code null} means no lookup is performed
     */
    @Override
    public void setQueueServiceName(ServiceName name) {
        queueServiceName = name;
    }

    /**
     * Returns the configured service name of the backing {@link Queue}.
     *
     * @return the queue service name, or {@code null} if none was set
     */
    @Override
    public ServiceName getQueueServiceName() {
        return queueServiceName;
    }

    /**
     * Sets the service name used to look up the {@link QueueHandler}.
     * <p>
     * If no handler service name had been set before this call and receiver daemons
     * already exist, every daemon is resumed ({@link #startService()} suspends daemons
     * that were created while no handler was available).
     *
     * @param name service name of the queue handler
     */
    @Override
    public void setQueueHandlerServiceName(ServiceName name) {
        final boolean wasUnset = queueHandlerServiceName == null;
        queueHandlerServiceName = name;
        if (!wasUnset || daemons == null) {
            return;
        }
        for (Daemon daemon : daemons) {
            daemon.resume();
        }
    }

    /**
     * Returns the service name used to look up the {@link QueueHandler}.
     *
     * @return the handler service name, or {@code null} if none was set
     */
    @Override
    public ServiceName getQueueHandlerServiceName() {
        return queueHandlerServiceName;
    }

    /**
     * Sets how many receivers (and daemons) {@link #startService()} creates.
     * <p>
     * The value is only read while the service starts; the default is 1.
     *
     * @param size number of queue handler receivers
     */
    @Override
    public void setQueueHandlerSize(int size) {
        queueHandlerSize = size;
    }

    /**
     * Returns the configured number of queue handler receivers.
     *
     * @return the configured receiver count
     */
    @Override
    public int getQueueHandlerSize() {
        return queueHandlerSize;
    }

    /**
     * Sets whether {@link #stopService()} calls {@code release()} on the backing queue.
     *
     * @param isRelease {@code true} to release the queue on stop (the default)
     */
    @Override
    public void setReleaseQueue(boolean isRelease) {
        isReleaseQueue = isRelease;
    }

    /**
     * Tells whether the backing queue is released when the service stops.
     *
     * @return {@code true} if {@link #stopService()} releases the queue
     */
    @Override
    public boolean isReleaseQueue() {
        return isReleaseQueue;
    }

    /**
     * Sets the timeout each receiver passes to the queue's {@code get(long)} call.
     * <p>
     * The default is -1; how the queue interprets the value is defined by the
     * {@link Queue} implementation.
     *
     * @param timeout timeout value handed to {@code Queue.get(long)}
     */
    @Override
    public void setWaitTimeout(long timeout) {
        waitTimeout = timeout;
    }

    /**
     * Returns the timeout receivers pass to the queue's {@code get(long)} call.
     *
     * @return the configured wait timeout
     */
    @Override
    public long getWaitTimeout() {
        return waitTimeout;
    }

    /**
     * Sets the maximum number of retries for an item whose handling failed.
     * <p>
     * With a value of 0 or less no retry is attempted: a failing item is passed
     * straight to {@code QueueHandler.handleRetryOver}.
     *
     * @param count maximum retry count
     */
    @Override
    public void setMaxRetryCount(int count) {
        // The parameter shadows the "count" field; only maxRetryCount is written here.
        maxRetryCount = count;
    }

    /**
     * Returns the maximum number of retries for a failing item.
     *
     * @return the configured maximum retry count
     */
    @Override
    public int getMaxRetryCount() {
        return maxRetryCount;
    }

    /**
     * Sets the pause, in milliseconds, between two handling attempts of the same item.
     * <p>
     * The value is passed to {@link Thread#sleep(long)}; a value of 0 or less means
     * retries happen without pausing. The default is 1000.
     *
     * @param interval retry interval in milliseconds
     */
    @Override
    public void setRetryInterval(long interval) {
        retryInterval = interval;
    }

    /**
     * Returns the pause between two handling attempts of the same item.
     *
     * @return retry interval in milliseconds
     */
    @Override
    public long getRetryInterval() {
        return retryInterval;
    }

    /**
     * Sets the log message id written when {@code QueueHandler.handleError} itself throws.
     *
     * @param id log message id; the default is {@code "QHC__00001"}
     */
    @Override
    public void setHandlingErrorMessageId(String id) {
        handlingErrorMessageId = id;
    }

    /**
     * Returns the log message id written when {@code QueueHandler.handleError} itself throws.
     *
     * @return the configured log message id
     */
    @Override
    public String getHandlingErrorMessageId() {
        return handlingErrorMessageId;
    }

    /**
     * Sets the log message id written when {@code QueueHandler.handleRetryOver} itself throws.
     *
     * @param id log message id; the default is {@code "QHC__00002"}
     */
    @Override
    public void setRetryOverErrorMessageId(String id) {
        retryOverErrorMessageId = id;
    }

    /**
     * Returns the log message id written when {@code QueueHandler.handleRetryOver} itself throws.
     *
     * @return the configured log message id
     */
    @Override
    public String getRetryOverErrorMessageId() {
        return retryOverErrorMessageId;
    }

    /**
     * Sets whether a receiver's {@code garbage()} callback drains and handles the
     * items still left in the queue.
     *
     * @param isGarbage {@code true} to process the remaining items (the default)
     */
    @Override
    public void setGarbageQueueOnStop(boolean isGarbage) {
        isGarbageQueueOnStop = isGarbage;
    }

    /**
     * Tells whether a receiver's {@code garbage()} callback processes remaining queue items.
     *
     * @return {@code true} if remaining items are drained and handled
     */
    @Override
    public boolean isGarbageQueueOnStop() {
        return isGarbageQueueOnStop;
    }

    /**
     * Counts the receivers that are currently inside a handler call.
     *
     * @return number of receivers whose {@code isActive} flag is set, or 0 when no
     *         receivers exist (service not started)
     */
    @Override
    public int getActiveQueueHandlerSize() {
        if (invokers == null) {
            return 0;
        }
        int active = 0;
        for (QueueReceiver receiver : invokers) {
            if (receiver.isActive) {
                active++;
            }
        }
        return active;
    }

    /**
     * Counts the receivers that are not currently inside a handler call.
     *
     * @return number of receivers whose {@code isActive} flag is clear, or 0 when no
     *         receivers exist (service not started)
     */
    @Override
    public int getStandbyQueueHandlerSize() {
        if (invokers == null) {
            return 0;
        }
        int standby = 0;
        for (QueueReceiver receiver : invokers) {
            if (!receiver.isActive) {
                standby++;
            }
        }
        return standby;
    }

    /**
     * Sets the flag that {@link #startService()} passes to {@code Daemon.setDaemon}
     * for every receiver daemon.
     *
     * @param isDaemon daemon flag for the receiver daemons; the default is {@code true}
     */
    @Override
    public void setDaemonQueueHandler(boolean isDaemon) {
        isDaemonQueueHandler = isDaemon;
    }

    /**
     * Returns the daemon flag applied to the receiver daemons on start.
     *
     * @return the configured daemon flag
     */
    @Override
    public boolean isDaemonQueueHandler() {
        return isDaemonQueueHandler;
    }

    /**
     * Sets the priority applied to the receiver daemons.
     * <p>
     * {@link #startService()} only applies the value when it is greater than 0;
     * the default of -1 leaves the daemon priority untouched.
     *
     * @param newPriority priority for the receiver daemons
     */
    @Override
    public void setQueueHandlerThreadPriority(int newPriority) {
        queueHandlerThreadPriority = newPriority;
    }

    /**
     * Returns the priority configured for the receiver daemons.
     *
     * @return the configured priority; values of 0 or less are not applied
     */
    @Override
    public int getQueueHandlerThreadPriority() {
        return queueHandlerThreadPriority;
    }

    /**
     * Chooses how {@link #stopService()} stops the receiver daemons:
     * {@code Daemon.stopNoWait()} when {@code true}, {@code Daemon.stop()} otherwise.
     *
     * @param isNowait {@code true} to stop the daemons via {@code stopNoWait()}
     */
    @Override
    public void setQueueHandlerNowaitOnStop(boolean isNowait) {
        isQueueHandlerNowaitOnStop = isNowait;
    }

    /**
     * Tells whether the receiver daemons are stopped via {@code Daemon.stopNoWait()}.
     *
     * @return {@code true} if {@link #stopService()} uses {@code stopNoWait()}
     */
    @Override
    public boolean isQueueHandlerNowaitOnStop() {
        return isQueueHandlerNowaitOnStop;
    }

    /**
     * Returns the mean of the per-receiver average processing times.
     * <p>
     * Each receiver reports its own average (total processing time divided by the
     * number of items it received); this method averages those values over all
     * receivers using integer division.
     * <p>
     * The sum is accumulated in a {@code long}. The previous implementation kept it
     * in an {@code int} and narrowed every intermediate result, which silently
     * truncated large values even though the method returns {@code long}.
     *
     * @return the averaged processing time in milliseconds, or 0 when no receivers
     *         exist
     */
    @Override
    public long getAverageHandleProcessTime() {
        // Read the field once so the null check and the loop see the same array.
        final QueueReceiver[] receivers = this.invokers;
        if (receivers == null || receivers.length == 0) {
            return 0L;
        }
        long total = 0L;
        for (QueueReceiver receiver : receivers) {
            total += receiver.getAverageReceiveProcessTime();
        }
        return total / receivers.length;
    }

    /**
     * Starts the container.
     * <ol>
     *   <li>If a queue service name is configured, the queue is looked up through
     *       {@link ServiceManagerFactory}.</li>
     *   <li>If there is still no queue and {@code queueHandlerSize} is positive, a
     *       {@link DefaultQueueService} is created and started.</li>
     *   <li>Without a queue, the container works in direct mode and only requires a
     *       handler to be available.</li>
     *   <li>With a queue, the queue is opened via {@code accept()} and
     *       {@code queueHandlerSize} receivers are created, each driven by its own
     *       {@link Daemon}. A daemon whose receiver has no handler yet is suspended
     *       before it is started.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if neither a queue nor a handler is available,
     *         or if a queue is available and {@code queueHandlerSize} is negative
     * @throws Exception if creating or starting the default queue fails
     */
    @Override
    public void startService() throws Exception {
        if (getQueueServiceName() != null) {
            setQueueService((Queue) ServiceManagerFactory.getServiceObject(queueServiceName));
        }
        if (getQueueService() == null && queueHandlerSize > 0) {
            final DefaultQueueService defaultQueue = new DefaultQueueService();
            defaultQueue.create();
            defaultQueue.start();
            setQueueService(defaultQueue);
        }

        if (getQueueService() == null) {
            // Direct mode: push() invokes the handler on the caller's thread.
            if (getQueueHandler() == null) {
                throw new IllegalArgumentException("QueueHandler is null.");
            }
            return;
        }

        getQueueService().accept();
        if (queueHandlerSize < 0) {
            throw new IllegalArgumentException("queueHandlerSize < 0.");
        }
        invokers = new QueueReceiver[queueHandlerSize];
        daemons = new Daemon[queueHandlerSize];
        for (int index = 0; index < queueHandlerSize; index++) {
            final QueueReceiver receiver = new QueueReceiver();
            invokers[index] = receiver;
            receiver.handler = getQueueHandler();

            final Daemon daemon = new Daemon(receiver);
            daemons[index] = daemon;
            daemon.setDaemon(isDaemonQueueHandler);
            daemon.setName(getServiceNameObject() + " QueueReceiver" + (index + 1));
            if (queueHandlerThreadPriority > 0) {
                daemon.setPriority(queueHandlerThreadPriority);
            }
            if (receiver.handler == null) {
                // No handler yet: keep the daemon suspended until one is set.
                daemon.suspend();
            }
            daemon.start();
        }
    }

    /**
     * Stops the container.
     * <p>
     * Every receiver daemon is stopped, via {@code stopNoWait()} or {@code stop()}
     * depending on {@code isQueueHandlerNowaitOnStop}, and the daemon and receiver
     * slots are cleared. The queue is released if {@code isReleaseQueue} is set, and
     * the direct-mode item counter is reset.
     *
     * @throws Exception declared by the service lifecycle contract
     */
    @Override
    public void stopService() throws Exception {
        if (daemons != null) {
            for (int index = 0; index < daemons.length; index++) {
                final Daemon daemon = daemons[index];
                if (isQueueHandlerNowaitOnStop) {
                    daemon.stopNoWait();
                } else {
                    daemon.stop();
                }
                daemons[index] = null;
                invokers[index] = null;
            }
        }
        final Queue queue = getQueueService();
        if (queue != null && isReleaseQueue) {
            queue.release();
        }
        daemons = null;
        invokers = null;
        count = 0L;
    }

    /**
     * Sets the {@link Queue} instance that backs this container.
     *
     * @param queue the queue to use; {@code null} selects direct mode, in which
     *              {@code push} invokes the handler on the caller's thread
     */
    public void setQueueService(Queue queue) {
        requestQueue = queue;
    }

    /**
     * Returns the {@link Queue} instance that backs this container.
     *
     * @return the backing queue, or {@code null} if none is set
     */
    public Queue getQueueService() {
        return requestQueue;
    }

    /**
     * Sets the {@link QueueHandler} that processes dequeued items.
     * <p>
     * If no handler instance had been set before this call and receiver daemons
     * already exist, every daemon is resumed ({@link #startService()} suspends daemons
     * that were created while no handler was available).
     *
     * @param handler the handler to use
     */
    @Override
    public void setQueueHandler(QueueHandler handler) {
        final boolean wasUnset = queueHandler == null;
        queueHandler = handler;
        if (!wasUnset || daemons == null) {
            return;
        }
        for (Daemon daemon : daemons) {
            daemon.resume();
        }
    }

    /**
     * Resolves the {@link QueueHandler} to use.
     * <p>
     * A handler instance set directly takes precedence; otherwise the handler is
     * looked up by its configured service name.
     *
     * @return the handler, or {@code null} if neither an instance nor a service name
     *         is configured
     */
    @Override
    public QueueHandler getQueueHandler() {
        if (queueHandler != null) {
            return queueHandler;
        }
        if (queueHandlerServiceName == null) {
            return null;
        }
        return (QueueHandler) ServiceManagerFactory.getServiceObject(queueHandlerServiceName);
    }

    /**
     * Submits an item for handling.
     * <p>
     * With a backing queue the item is pushed onto it. Without one (direct mode) the
     * item counter is incremented and the handler is invoked on the calling thread.
     *
     * @param item the item to handle
     */
    @Override
    public void push(Object item) {
        final Queue queue = getQueueService();
        if (queue != null) {
            queue.push(item);
            return;
        }
        count++;
        handleDequeuedObjectWithLock(getQueueHandler(), item, null);
    }

    /**
     * Submits an item for handling, passing a timeout to the backing queue.
     * <p>
     * With a backing queue the call is delegated to {@code Queue.push(Object, long)}.
     * Without one (direct mode) the timeout is ignored: the item counter is
     * incremented and the handler is invoked on the calling thread.
     *
     * @param item    the item to handle
     * @param timeout timeout handed to the backing queue
     * @return the queue's result, or {@code true} in direct mode
     */
    @Override
    public boolean push(Object item, long timeout) {
        final Queue queue = getQueueService();
        if (queue != null) {
            return queue.push(item, timeout);
        }
        count++;
        handleDequeuedObjectWithLock(getQueueHandler(), item, null);
        return true;
    }

    /**
     * Not supported: this implementation always throws instead of returning an item.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object get() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported: this implementation always throws instead of returning an item.
     *
     * @param timeOutMs ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object get(long timeOutMs) {
        throw new UnsupportedOperationException();
    }

    /**
     * Delegates to {@code peek()} of the backing queue.
     *
     * @return the queue's result, or {@code null} when there is no backing queue
     */
    @Override
    public Object peek() {
        final Queue queue = getQueueService();
        return queue == null ? null : queue.peek();
    }

    /**
     * Delegates to {@code peek(long)} of the backing queue.
     *
     * @param timeOutMs timeout handed to the backing queue
     * @return the queue's result, or {@code null} when there is no backing queue
     */
    @Override
    public Object peek(long timeOutMs) {
        final Queue queue = getQueueService();
        return queue == null ? null : queue.peek(timeOutMs);
    }

    /**
     * Delegates to {@code remove(Object)} of the backing queue.
     *
     * @param item the item to remove
     * @return the queue's result, or {@code null} when there is no backing queue
     */
    @Override
    public Object remove(Object item) {
        final Queue queue = getQueueService();
        return queue == null ? null : queue.remove(item);
    }

    /**
     * Clears the backing queue; does nothing when there is no backing queue.
     */
    @Override
    public void clear() {
        final Queue queue = getQueueService();
        if (queue != null) {
            queue.clear();
        }
    }

    /**
     * Returns the size reported by the backing queue.
     *
     * @return the queue's size, or 0 when there is no backing queue
     */
    @Override
    public int size() {
        final Queue queue = getQueueService();
        return queue == null ? 0 : queue.size();
    }

    /**
     * Returns the number of items submitted so far.
     *
     * @return the backing queue's count, or, in direct mode, the number of items
     *         pushed through this container since the counter was last reset
     */
    @Override
    public long getCount() {
        final Queue queue = getQueueService();
        return queue == null ? count : queue.getCount();
    }

    /**
     * Returns the current depth of the backing queue, taken from its {@code size()}.
     *
     * @return the queue's size, or 0 when there is no backing queue
     */
    @Override
    public long getDepth() {
        final Queue queue = getQueueService();
        return queue == null ? 0L : queue.size();
    }

    /**
     * Delegates to {@code accept()} of the backing queue; does nothing when there is
     * no backing queue.
     */
    @Override
    public void accept() {
        final Queue queue = getQueueService();
        if (queue == null) {
            return;
        }
        queue.accept();
    }

    /**
     * Releases the backing queue, if any, and resets the direct-mode item counter.
     */
    @Override
    public void release() {
        final Queue queue = getQueueService();
        if (queue != null) {
            queue.release();
        }
        count = 0L;
    }

    /**
     * Resumes handling after {@link #suspend()}; a no-op when not suspended.
     * <p>
     * In direct mode (no backing queue) the suspend monitor is notified and released,
     * which is what callers parked in {@code handleDequeuedObjectWithLock} wait on.
     * With a backing queue every receiver daemon is resumed.
     */
    @Override
    public synchronized void resume() {
        if (!isSuspend) {
            return;
        }
        isSuspend = false;
        if (getQueueService() == null) {
            suspendMonitor.notifyAllMonitor();
            suspendMonitor.releaseAllMonitor();
            return;
        }
        if (daemons == null) {
            return;
        }
        for (Daemon daemon : daemons) {
            daemon.resume();
        }
    }

    /**
     * Suspends handling; a no-op when already suspended.
     * <p>
     * With a backing queue every existing receiver daemon is suspended. The suspend
     * flag is raised last; in direct mode that flag is what makes
     * {@code handleDequeuedObjectWithLock} wait before handling an item.
     */
    @Override
    public synchronized void suspend() {
        if (isSuspend) {
            return;
        }
        final boolean hasRunningDaemons = getQueueService() != null && daemons != null;
        if (hasRunningDaemons) {
            for (Daemon daemon : daemons) {
                daemon.suspend();
            }
        }
        isSuspend = true;
    }

    /**
     * Tells whether the container is currently suspended.
     *
     * @return {@code true} between a call to {@link #suspend()} and the next
     *         {@link #resume()}
     */
    @Override
    public boolean isSuspend() {
        return isSuspend;
    }

    /**
     * Handles an item on the calling thread, first waiting on the suspend monitor if
     * the container is suspended. Used by {@code push} in direct mode.
     * <p>
     * If the wait is interrupted the item is still handled, as before. The interrupt
     * is no longer swallowed: the thread's interrupt status is restored once handling
     * has finished, so the caller can observe it. It is restored afterwards rather
     * than immediately so that the handler runs under the same conditions as before.
     *
     * @param handler  handler to invoke; if {@code null} the item is dropped
     * @param dequeued the item to handle
     * @param receiver receiver whose active flag should track the call, or {@code null}
     */
    protected void handleDequeuedObjectWithLock(QueueHandler handler, Object dequeued, QueueReceiver receiver) {
        boolean interrupted = false;
        if (this.isSuspend) {
            try {
                this.suspendMonitor.initAndWaitMonitor();
            }
            catch (InterruptedException e) {
                // Remember the interrupt; it is re-asserted after the item is handled.
                interrupted = true;
            }
        }
        try {
            this.handleDequeuedObject(handler, dequeued, receiver, null);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Passes an item to the handler, retrying according to the retry settings.
     * <p>
     * Per attempt:
     * <ul>
     *   <li>The receiver, if given, is flagged active for the duration of the attempt.</li>
     *   <li>If the handler throws and retries remain ({@code maxRetryCount > 0} and
     *       fewer than {@code maxRetryCount} retries made so far),
     *       {@code handleError} decides whether to retry. If {@code handleError}
     *       itself throws, no retry happens and the original failure is logged with
     *       {@code handlingErrorMessageId}.</li>
     *   <li>Otherwise {@code handleRetryOver} is called. If that throws, the original
     *       failure is logged with {@code retryOverErrorMessageId}.</li>
     *   <li>After each attempt, if a daemon control is given and still running, the
     *       thread's interrupt status is cleared.</li>
     * </ul>
     * Before a retry the thread sleeps for {@code retryInterval} milliseconds when
     * that value is positive; an interrupt during the sleep cancels the retry.
     *
     * @param handler  handler to invoke; if {@code null} the method returns immediately
     * @param dequeued the item to handle
     * @param receiver receiver whose active flag should track the call, or {@code null}
     * @param ctrl     control of the daemon driving this call, or {@code null}
     */
    protected void handleDequeuedObject(QueueHandler handler, Object dequeued, QueueReceiver receiver, DaemonControl ctrl) {
        if (handler == null) {
            return;
        }
        boolean retry;
        int attempts = 0;
        do {
            retry = false;
            try {
                if (receiver != null) {
                    receiver.isActive = true;
                }
                try {
                    handler.handleDequeuedObject(dequeued);
                } catch (Throwable failure) {
                    final boolean retryAllowed = maxRetryCount > 0 && attempts < maxRetryCount;
                    if (retryAllowed) {
                        try {
                            retry = handler.handleError(dequeued, failure);
                        } catch (Throwable ignored) {
                            // The log records the original failure, not the one thrown by handleError.
                            retry = false;
                            getLogger().write(handlingErrorMessageId, dequeued, failure);
                        }
                    } else {
                        try {
                            handler.handleRetryOver(dequeued, failure);
                        } catch (Throwable ignored) {
                            // The log records the original failure, not the one thrown by handleRetryOver.
                            getLogger().write(retryOverErrorMessageId, dequeued, failure);
                        }
                    }
                }
            } finally {
                if (receiver != null) {
                    receiver.isActive = false;
                }
                if (ctrl != null && ctrl.isRunning()) {
                    // Clears any interrupt raised during handling while the daemon keeps running.
                    Thread.interrupted();
                }
            }
            if (retry && retryInterval > 0L) {
                try {
                    Thread.sleep(retryInterval);
                } catch (InterruptedException e) {
                    retry = false;
                }
            }
            attempts++;
        } while (retry);
    }

    /**
     * Worker run by a {@link Daemon}: takes items from the container's queue and
     * passes them to the handler, keeping simple throughput statistics.
     */
    protected class QueueReceiver
    implements DaemonRunnable {
        /** Handler used by this receiver; resolved lazily in {@link #provide} when unset. */
        protected QueueHandler handler;
        /** Set while this receiver is inside a handler call. */
        public boolean isActive;
        /** Number of items passed to {@link #consume}. */
        protected long receiveCount;
        /** Total time spent in {@link #consume}, in milliseconds. */
        protected long receiveProcessTime;

        /** Creates a receiver with no handler and zeroed statistics. */
        protected QueueReceiver() {
        }

        /**
         * Returns how many items this receiver has consumed.
         *
         * @return number of calls to {@link #consume}
         */
        public long getReceiveCount() {
            return receiveCount;
        }

        /**
         * Returns the average time spent per consumed item.
         *
         * @return total processing time divided by the receive count (integer
         *         division), or 0 if nothing was received yet
         */
        public long getAverageReceiveProcessTime() {
            if (receiveCount == 0L) {
                return 0L;
            }
            return receiveProcessTime / receiveCount;
        }

        /**
         * Start callback; nothing to prepare.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onStart() {
            return true;
        }

        /**
         * Stop callback; nothing to clean up.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onStop() {
            return true;
        }

        /**
         * Suspend callback; nothing to do.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onSuspend() {
            return true;
        }

        /**
         * Resume callback; nothing to do.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onResume() {
            return true;
        }

        /**
         * Fetches the next item from the container's queue.
         * <p>
         * If this receiver has no handler yet, the container's handler is resolved
         * first; while none is available nothing is taken from the queue.
         *
         * @param ctrl control of the driving daemon (unused)
         * @return the item returned by the queue's {@code get(waitTimeout)}, or
         *         {@code null} when no handler is available
         */
        @Override
        public Object provide(DaemonControl ctrl) {
            final QueueHandlerContainerService outer = QueueHandlerContainerService.this;
            if (handler == null) {
                handler = outer.getQueueHandler();
            }
            if (handler == null) {
                return null;
            }
            return outer.getQueueService().get(outer.waitTimeout);
        }

        /**
         * Handles one item and updates the statistics.
         * <p>
         * The receive count is incremented before handling; the elapsed time is added
         * afterwards, even if handling throws.
         *
         * @param dequeued the item to handle
         * @param ctrl     control of the driving daemon, or {@code null}
         */
        @Override
        public void consume(Object dequeued, DaemonControl ctrl) {
            receiveCount++;
            final long startedAt = System.currentTimeMillis();
            try {
                QueueHandlerContainerService.this.handleDequeuedObject(handler, dequeued, this, ctrl);
            } finally {
                receiveProcessTime += System.currentTimeMillis() - startedAt;
            }
        }

        /**
         * Drains the queue: while it reports a positive size, takes an item with
         * {@code get(0L)} and consumes it. Does nothing when the container has no
         * queue or {@code isGarbageQueueOnStop} is {@code false}.
         */
        @Override
        public void garbage() {
            final QueueHandlerContainerService outer = QueueHandlerContainerService.this;
            if (outer.getQueueService() == null || !outer.isGarbageQueueOnStop) {
                return;
            }
            while (outer.getQueueService().size() > 0) {
                consume(outer.getQueueService().get(0L), null);
            }
        }
    }
}

