/*
 * Decompiled with CFR 0.152.
 */
package jp.ossc.nimbus.service.queue;

import jp.ossc.nimbus.core.ServiceBase;
import jp.ossc.nimbus.core.ServiceManagerFactory;
import jp.ossc.nimbus.core.ServiceName;
import jp.ossc.nimbus.daemon.Daemon;
import jp.ossc.nimbus.daemon.DaemonControl;
import jp.ossc.nimbus.daemon.DaemonRunnable;
import jp.ossc.nimbus.service.queue.DistributedQueueHandlerContainerServiceMBean;
import jp.ossc.nimbus.service.queue.DistributedQueueSelector;
import jp.ossc.nimbus.service.queue.Queue;
import jp.ossc.nimbus.service.queue.QueueHandler;
import jp.ossc.nimbus.service.queue.QueueHandlerContainer;

/**
 * Queue handler container that fans out over the queues of a
 * {@link DistributedQueueSelector}.
 *
 * <p>On start, one {@link QueueReceiver} wrapped in one {@link Daemon} is created per
 * queue returned by the selector. Each receiver takes objects from its queue and
 * passes them to the configured {@link QueueHandler}, applying the retry settings
 * of this service when the handler throws.</p>
 *
 * <p>{@link #push(Object)} asks the selector which queue an item belongs to and
 * pushes it there. The single-queue read operations ({@code get}, {@code peek},
 * {@code remove}) are not supported.</p>
 */
public class DistributedQueueHandlerContainerService
extends ServiceBase
implements QueueHandlerContainer,
DistributedQueueHandlerContainerServiceMBean {
    private static final long serialVersionUID = 4594481433048573418L;
    protected ServiceName distributedQueueSelectorServiceName;
    protected DistributedQueueSelector distributedQueueSelector;
    protected Daemon[] daemons;
    protected QueueReceiver[] invokers;
    protected ServiceName queueHandlerServiceName;
    protected QueueHandler queueHandler;
    protected boolean isDaemonQueueHandler = true;
    protected long waitTimeout = -1L;
    protected int maxRetryCount = 0;
    protected long retryInterval = 1000L;
    protected String handlingErrorMessageId = "QHC__00001";
    protected String retryOverErrorMessageId = "QHC__00002";
    protected int queueHandlerThreadPriority = -1;
    protected boolean isReleaseQueue = true;

    /**
     * Sets the service name of the {@link DistributedQueueSelector} to use.
     * Must be set before {@link #startService()}.
     *
     * @param name selector service name
     */
    public void setDistributedQueueSelectorServiceName(ServiceName name) {
        this.distributedQueueSelectorServiceName = name;
    }

    /**
     * @return the configured selector service name, or {@code null} if unset
     */
    public ServiceName getDistributedQueueSelectorServiceName() {
        return this.distributedQueueSelectorServiceName;
    }

    /**
     * Sets the service name used to look up the {@link QueueHandler}.
     *
     * <p>If no name was set before this call, all existing daemons are resumed
     * (they are suspended at start when no handler is available).</p>
     *
     * @param name queue handler service name
     */
    public void setQueueHandlerServiceName(ServiceName name) {
        boolean wasUnset = this.queueHandlerServiceName == null;
        this.queueHandlerServiceName = name;
        if (wasUnset) {
            this.resumeQueueReceiverDaemons();
        }
    }

    /**
     * @return the configured queue handler service name, or {@code null} if unset
     */
    public ServiceName getQueueHandlerServiceName() {
        return this.queueHandlerServiceName;
    }

    /**
     * Sets whether {@link #release()} is called on the queues when the service stops.
     *
     * @param isRelease {@code true} to release the queues in {@link #stopService()}
     */
    public void setReleaseQueue(boolean isRelease) {
        this.isReleaseQueue = isRelease;
    }

    /**
     * @return whether the queues are released when the service stops
     */
    public boolean isReleaseQueue() {
        return this.isReleaseQueue;
    }

    /**
     * Sets the timeout passed to {@code Queue.get(long)} by each receiver.
     *
     * @param timeout timeout value handed to the queue as-is
     */
    public void setWaitTimeout(long timeout) {
        this.waitTimeout = timeout;
    }

    /**
     * @return the timeout passed to {@code Queue.get(long)} by each receiver
     */
    public long getWaitTimeout() {
        return this.waitTimeout;
    }

    /**
     * Sets the maximum number of retries after a handler failure. A value of
     * zero or less disables retrying.
     *
     * @param count maximum retry count
     */
    public void setMaxRetryCount(int count) {
        this.maxRetryCount = count;
    }

    /**
     * @return the maximum retry count
     */
    public int getMaxRetryCount() {
        return this.maxRetryCount;
    }

    /**
     * Sets the sleep time between retries. A value of zero or less means no sleep.
     *
     * @param interval retry interval in milliseconds
     */
    public void setRetryInterval(long interval) {
        this.retryInterval = interval;
    }

    /**
     * @return the retry interval in milliseconds
     */
    public long getRetryInterval() {
        return this.retryInterval;
    }

    /**
     * Sets the log message id written when {@code QueueHandler.handleError} itself throws.
     *
     * @param id log message id
     */
    public void setHandlingErrorMessageId(String id) {
        this.handlingErrorMessageId = id;
    }

    /**
     * @return the log message id written when {@code QueueHandler.handleError} itself throws
     */
    public String getHandlingErrorMessageId() {
        return this.handlingErrorMessageId;
    }

    /**
     * Sets the log message id written when {@code QueueHandler.handleRetryOver} itself throws.
     *
     * @param id log message id
     */
    public void setRetryOverErrorMessageId(String id) {
        this.retryOverErrorMessageId = id;
    }

    /**
     * @return the log message id written when {@code QueueHandler.handleRetryOver} itself throws
     */
    public String getRetryOverErrorMessageId() {
        return this.retryOverErrorMessageId;
    }

    /**
     * Counts the receivers that are currently inside a handler invocation.
     *
     * @return number of receivers whose {@code isActive} flag is set; zero when
     *         the service has no receivers
     */
    public int getActiveQueueHandlerSize() {
        // Snapshot the array: stopService() nulls both the elements and the field.
        QueueReceiver[] receivers = this.invokers;
        if (receivers == null) {
            return 0;
        }
        int count = 0;
        for (int i = 0; i < receivers.length; ++i) {
            QueueReceiver receiver = receivers[i];
            if (receiver != null && receiver.isActive) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Sets the daemon flag applied to each receiver's {@link Daemon} at start.
     *
     * @param isDaemon value passed to {@code Daemon.setDaemon}
     */
    public void setDaemonQueueHandler(boolean isDaemon) {
        this.isDaemonQueueHandler = isDaemon;
    }

    /**
     * @return the daemon flag applied to each receiver's {@link Daemon}
     */
    public boolean isDaemonQueueHandler() {
        return this.isDaemonQueueHandler;
    }

    /**
     * Sets the priority applied to each receiver's {@link Daemon} at start.
     * Only values greater than zero are applied.
     *
     * @param newPriority priority passed to {@code Daemon.setPriority}
     */
    public void setQueueHandlerThreadPriority(int newPriority) {
        this.queueHandlerThreadPriority = newPriority;
    }

    /**
     * @return the configured receiver priority
     */
    public int getQueueHandlerThreadPriority() {
        return this.queueHandlerThreadPriority;
    }

    /**
     * Resolves the selector, calls {@link #accept()}, and starts one daemon per queue.
     *
     * <p>A daemon whose receiver has no handler yet is suspended before it is
     * started; it is resumed by {@link #setQueueHandler(QueueHandler)} or
     * {@link #setQueueHandlerServiceName(ServiceName)}.</p>
     *
     * @throws IllegalArgumentException if no selector service name is configured
     * @throws Exception if start-up fails
     */
    public void startService() throws Exception {
        if (this.distributedQueueSelectorServiceName == null) {
            throw new IllegalArgumentException("DistributedQueueSelectorServiceName must be specified.");
        }
        this.distributedQueueSelector = (DistributedQueueSelector)ServiceManagerFactory.getServiceObject(this.distributedQueueSelectorServiceName);
        this.accept();
        Queue[] queues = this.distributedQueueSelector.getQueues();
        if (queues == null) {
            // Every other method here treats a null queue array as "no queues"; do the same.
            queues = new Queue[0];
        }
        this.invokers = new QueueReceiver[queues.length];
        this.daemons = new Daemon[queues.length];
        for (int i = 0; i < queues.length; ++i) {
            QueueReceiver receiver = new QueueReceiver(queues[i]);
            receiver.handler = this.getQueueHandler();
            this.invokers[i] = receiver;
            Daemon daemon = new Daemon(receiver);
            this.daemons[i] = daemon;
            daemon.setDaemon(this.isDaemonQueueHandler);
            daemon.setName(this.getServiceNameObject() + " QueueReceiver" + (i + 1));
            if (this.queueHandlerThreadPriority > 0) {
                daemon.setPriority(this.queueHandlerThreadPriority);
            }
            if (receiver.handler == null) {
                daemon.suspend();
            }
            daemon.start();
        }
    }

    /**
     * Stops every daemon, optionally releases the queues, and discards the
     * daemon and receiver arrays.
     *
     * <p>Safe to call when {@link #startService()} failed before or while the
     * arrays were being filled.</p>
     *
     * @throws Exception if shutdown fails
     */
    public void stopService() throws Exception {
        Daemon[] runningDaemons = this.daemons;
        QueueReceiver[] receivers = this.invokers;
        if (runningDaemons != null) {
            for (int i = 0; i < runningDaemons.length; ++i) {
                if (runningDaemons[i] != null) {
                    runningDaemons[i].stop();
                }
                runningDaemons[i] = null;
                if (receivers != null && i < receivers.length) {
                    receivers[i] = null;
                }
            }
        }
        if (this.isReleaseQueue) {
            this.release();
        }
        this.daemons = null;
        this.invokers = null;
    }

    /**
     * Sets the {@link QueueHandler} directly.
     *
     * <p>If no handler was set before this call, all existing daemons are resumed.</p>
     *
     * @param handler the handler to use
     */
    public void setQueueHandler(QueueHandler handler) {
        boolean wasUnset = this.queueHandler == null;
        this.queueHandler = handler;
        if (wasUnset) {
            this.resumeQueueReceiverDaemons();
        }
    }

    /**
     * Returns the handler to use: the directly set handler if present, otherwise
     * the service looked up by the configured handler service name.
     *
     * @return the handler, or {@code null} if neither is configured
     */
    public QueueHandler getQueueHandler() {
        if (this.queueHandler != null) {
            return this.queueHandler;
        }
        if (this.queueHandlerServiceName != null) {
            return (QueueHandler)ServiceManagerFactory.getServiceObject(this.queueHandlerServiceName);
        }
        return null;
    }

    /**
     * Pushes an item onto the queue chosen by the selector for that item.
     *
     * @param item the object to enqueue
     */
    public synchronized void push(Object item) {
        Queue queue = this.distributedQueueSelector.selectQueue(item);
        queue.push(item);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Object get() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Object get(long timeOutMs) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Object peek() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Object peek(long timeOutMs) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void remove(Object item) {
        throw new UnsupportedOperationException();
    }

    /**
     * Calls {@code clear()} on every queue of the selector.
     */
    public void clear() {
        Queue[] queues = this.selectorQueues();
        for (int i = 0; i < queues.length; ++i) {
            queues[i].clear();
        }
    }

    /**
     * @return the sum of {@code size()} over every queue of the selector
     */
    public int size() {
        int total = 0;
        Queue[] queues = this.selectorQueues();
        for (int i = 0; i < queues.length; ++i) {
            total += queues[i].size();
        }
        return total;
    }

    /**
     * @return the sum of {@code getCount()} over every queue of the selector
     */
    public long getCount() {
        long total = 0L;
        Queue[] queues = this.selectorQueues();
        for (int i = 0; i < queues.length; ++i) {
            total += queues[i].getCount();
        }
        return total;
    }

    /**
     * Calls {@code accept()} on every queue of the selector.
     */
    public void accept() {
        Queue[] queues = this.selectorQueues();
        for (int i = 0; i < queues.length; ++i) {
            queues[i].accept();
        }
    }

    /**
     * Calls {@code release()} on every queue of the selector.
     */
    public void release() {
        Queue[] queues = this.selectorQueues();
        for (int i = 0; i < queues.length; ++i) {
            queues[i].release();
        }
    }

    /**
     * Returns the selector's queues, or an empty array when the selector has not
     * been resolved yet or reports no queues.
     */
    private Queue[] selectorQueues() {
        DistributedQueueSelector selector = this.distributedQueueSelector;
        Queue[] queues = selector == null ? null : selector.getQueues();
        return queues == null ? new Queue[0] : queues;
    }

    /**
     * Resumes every daemon that currently exists; does nothing before start or after stop.
     */
    private void resumeQueueReceiverDaemons() {
        // Snapshot the array: stopService() nulls both the elements and the field.
        Daemon[] currentDaemons = this.daemons;
        if (currentDaemons == null) {
            return;
        }
        for (int i = 0; i < currentDaemons.length; ++i) {
            Daemon daemon = currentDaemons[i];
            if (daemon != null) {
                daemon.resume();
            }
        }
    }

    /**
     * Daemon body bound to one queue: takes objects from that queue and passes
     * them to the {@link QueueHandler}.
     */
    protected class QueueReceiver
    implements DaemonRunnable {
        protected Queue queue;
        protected QueueHandler handler;
        // Written in consume() and read by getActiveQueueHandlerSize();
        // volatile on the assumption that those run on different threads.
        public volatile boolean isActive;

        /**
         * @param queue the queue this receiver reads from
         */
        public QueueReceiver(Queue queue) {
            this.queue = queue;
        }

        public boolean onStart() {
            return true;
        }

        public boolean onStop() {
            return true;
        }

        public boolean onSuspend() {
            return true;
        }

        public boolean onResume() {
            return true;
        }

        /**
         * Fetches the next object from the queue using the service's wait timeout.
         *
         * <p>If this receiver has no handler yet, it asks the enclosing service for
         * one first and returns {@code null} without touching the queue when none
         * is available.</p>
         *
         * @param ctrl daemon control (unused)
         * @return the dequeued object, or {@code null} when no handler is available
         */
        public Object provide(DaemonControl ctrl) {
            if (this.handler == null) {
                this.handler = DistributedQueueHandlerContainerService.this.getQueueHandler();
                if (this.handler == null) {
                    return null;
                }
            }
            return this.queue.get(DistributedQueueHandlerContainerService.this.waitTimeout);
        }

        /**
         * Passes a dequeued object to the handler, retrying on failure according to
         * the service's retry settings.
         *
         * <p>The loop ends as soon as an attempt succeeds, the handler declines a
         * retry, the retry limit is reached, or the sleep between retries is
         * interrupted. {@code isActive} is set for the duration of each attempt,
         * including the handler's error callbacks.</p>
         *
         * @param dequeued the object taken from the queue
         * @param ctrl daemon control (unused)
         */
        public void consume(Object dequeued, DaemonControl ctrl) {
            if (this.handler == null) {
                return;
            }
            boolean isRetry;
            int retryCount = 0;
            do {
                // Reset per attempt: a successful attempt must end the loop even if
                // an earlier attempt requested a retry.
                isRetry = false;
                try {
                    this.isActive = true;
                    try {
                        this.handler.handleDequeuedObject(dequeued);
                    }
                    catch (Throwable th) {
                        isRetry = this.handleFailure(dequeued, th, retryCount);
                    }
                }
                finally {
                    this.isActive = false;
                }
                if (isRetry && DistributedQueueHandlerContainerService.this.retryInterval > 0L) {
                    try {
                        Thread.sleep(DistributedQueueHandlerContainerService.this.retryInterval);
                    }
                    catch (InterruptedException e) {
                        // An interrupted wait abandons the remaining retries.
                        isRetry = false;
                    }
                }
                ++retryCount;
            } while (isRetry);
        }

        /**
         * Reports a failed attempt to the handler and decides whether to retry.
         *
         * @param dequeued the object whose handling failed
         * @param th the failure thrown by {@code handleDequeuedObject}
         * @param retryCount number of attempts already made before this one
         * @return {@code true} if another attempt should be made
         */
        private boolean handleFailure(Object dequeued, Throwable th, int retryCount) {
            if (DistributedQueueHandlerContainerService.this.maxRetryCount > 0) {
                if (retryCount >= DistributedQueueHandlerContainerService.this.maxRetryCount) {
                    try {
                        this.handler.handleRetryOver(dequeued, th);
                    }
                    catch (Throwable th2) {
                        // The original failure is logged; the callback's own exception is not.
                        DistributedQueueHandlerContainerService.this.getLogger().write(DistributedQueueHandlerContainerService.this.retryOverErrorMessageId, dequeued, th);
                    }
                    return false;
                }
                try {
                    return this.handler.handleError(dequeued, th);
                }
                catch (Throwable th2) {
                    // The original failure is logged; the callback's own exception is not.
                    DistributedQueueHandlerContainerService.this.getLogger().write(DistributedQueueHandlerContainerService.this.handlingErrorMessageId, dequeued, th);
                    // A failing error callback still results in a retry.
                    return true;
                }
            }
            // Retrying is disabled: notify the handler and ignore its answer.
            try {
                this.handler.handleError(dequeued, th);
            }
            catch (Throwable th2) {
                // The original failure is logged; the callback's own exception is not.
                DistributedQueueHandlerContainerService.this.getLogger().write(DistributedQueueHandlerContainerService.this.handlingErrorMessageId, dequeued, th);
            }
            return false;
        }

        /**
         * Drains the queue, passing every remaining object to {@link #consume}.
         */
        public void garbage() {
            if (this.queue != null) {
                while (this.queue.size() > 0) {
                    this.consume(this.queue.get(0L), null);
                }
            }
        }
    }
}

