/*
 * Decompiled with CFR 0.152.
 */
package jp.ossc.nimbus.service.publish.tcp;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import jp.ossc.nimbus.core.ServiceName;
import jp.ossc.nimbus.daemon.Daemon;
import jp.ossc.nimbus.daemon.DaemonControl;
import jp.ossc.nimbus.daemon.DaemonRunnable;
import jp.ossc.nimbus.service.io.Externalizer;
import jp.ossc.nimbus.service.log.Logger;
import jp.ossc.nimbus.service.publish.Client;
import jp.ossc.nimbus.service.publish.Message;
import jp.ossc.nimbus.service.publish.MessageCreateException;
import jp.ossc.nimbus.service.publish.MessageException;
import jp.ossc.nimbus.service.publish.MessageSendException;
import jp.ossc.nimbus.service.publish.ServerConnection;
import jp.ossc.nimbus.service.publish.ServerConnectionListener;
import jp.ossc.nimbus.service.publish.tcp.AddMessage;
import jp.ossc.nimbus.service.publish.tcp.ClientMessage;
import jp.ossc.nimbus.service.publish.tcp.IdMessage;
import jp.ossc.nimbus.service.publish.tcp.MessageImpl;
import jp.ossc.nimbus.service.publish.tcp.RemoveMessage;
import jp.ossc.nimbus.service.publish.tcp.StartReceiveMessage;
import jp.ossc.nimbus.service.queue.AbstractDistributedQueueSelectorService;
import jp.ossc.nimbus.service.queue.AsynchContext;
import jp.ossc.nimbus.service.queue.DefaultQueueService;
import jp.ossc.nimbus.service.queue.DistributedQueueHandlerContainerService;
import jp.ossc.nimbus.service.queue.Queue;
import jp.ossc.nimbus.service.queue.QueueHandler;
import jp.ossc.nimbus.service.queue.QueueHandlerContainerService;
import jp.ossc.nimbus.util.net.SocketFactory;

public class ServerConnectionImpl
implements ServerConnection {
    // Blocking-mode listening socket; assigned only by the ServerSocket constructor.
    private ServerSocket serverSocket;
    // NIO listening channel and the selector it is registered with; assigned only by
    // the ServerSocketChannel constructor.
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    // Connected clients iterated by send(); the code that adds/removes entries is outside this view.
    private Set clients = new LinkedHashSet();
    // Client lookup by id (used by getClientIds/getSubjects/getKeys).
    private Map clientMap = Collections.synchronizedMap(new HashMap());
    // Retry limit for send(); also forwarded to the queue handler containers.
    private int maxSendRetryCount;
    // Optional logger and the message ids passed to it; a null logger or id disables that log entry.
    private Logger logger;
    private String sendErrorMessageId;
    private String sendErrorRetryOverMessageId;
    private String clientConnectMessageId;
    private String clientClosedMessageId;
    private String clientCloseMessageId;
    // Daemon running the ClientAcceptor runnable.
    private Daemon clientAcceptor;
    // Multi-threaded synchronous send: container plus the queue send() waits on. Null when sendThreadSize < 2.
    private QueueHandlerContainerService sendQueueHandlerContainer;
    private DefaultQueueService sendResponseQueue;
    // Asynchronous send pipeline; all null when the asynchronous thread size is not positive.
    private ClientDistributedQueueSelector queueSelector;
    private QueueHandlerContainerService asynchAcceptQueueHandlerContainer;
    private DistributedQueueHandlerContainerService asynchSendQueueHandlerContainer;
    // Statistics: messages recorded by addSendMessageCache() and time accumulated by send().
    private long sendCount;
    private long sendProcessTime;
    // Lazily created by addServerConnectionListener(); reset to null when it becomes empty.
    private List serverConnectionListeners;
    private Externalizer externalizer;
    private SocketFactory socketFactory;
    // Recently sent messages, ordered by send time; trimmed using sendMessageCacheTime (milliseconds).
    private List sendMessageCache = Collections.synchronizedList(new ArrayList());
    private long sendMessageCacheTime;
    private boolean isAcknowledge;
    // Upper bound checked before an object is returned to one of the three recycle pools below.
    private int messageRecycleBufferSize = 100;
    // Recycle pools for MessageImpl, SendRequest and AsynchContext instances.
    private List messageBuffer;
    private List sendRequestBuffer;
    private List asynchContextBuffer;
    // Send-buffering settings; a SendBufferChecker daemon is started when bufferTime or bufferSize is positive.
    private long bufferTime;
    private long bufferSize;
    private long bufferTimeoutInterval = 1000L;
    private Daemon sendBufferChecker;

    public ServerConnectionImpl(ServerSocket serverSocket, Externalizer ext, int sendThreadSize, ServiceName sendQueueServiceName, int asynchSendThreadSize, ServiceName asynchSendQueueServiceName, ServiceName asynchSendQueueFactoryServiceName, long bufferTime, long bufferSize, long bufferTimeoutInterval) throws Exception {
        this.serverSocket = serverSocket;
        this.externalizer = ext;
        this.messageBuffer = new ArrayList();
        this.sendRequestBuffer = new ArrayList();
        this.asynchContextBuffer = new ArrayList();
        this.bufferTime = bufferTime;
        this.bufferSize = bufferSize;
        if (bufferTimeoutInterval > 0L) {
            this.bufferTimeoutInterval = bufferTimeoutInterval;
        }
        this.initSend(sendQueueServiceName, sendThreadSize);
        this.initAsynchSend(asynchSendQueueServiceName, asynchSendQueueFactoryServiceName, asynchSendThreadSize);
        this.initClientAcceptor(serverSocket.getLocalSocketAddress());
        this.initSendBufferChecker(serverSocket.getLocalSocketAddress());
    }

    public ServerConnectionImpl(ServerSocketChannel ssc, Externalizer ext, int sendThreadSize, ServiceName sendQueueServiceName, int asynchSendThreadSize, ServiceName asynchSendQueueServiceName, ServiceName asynchSendQueueFactoryServiceName, SocketFactory sf, long bufferTime, long bufferSize, long bufferTimeoutInterval) throws Exception {
        this.serverSocketChannel = ssc;
        this.socketFactory = sf;
        this.externalizer = ext;
        this.messageBuffer = new ArrayList();
        this.sendRequestBuffer = new ArrayList();
        this.asynchContextBuffer = new ArrayList();
        this.bufferTime = bufferTime;
        this.bufferSize = bufferSize;
        if (bufferTimeoutInterval > 0L) {
            this.bufferTimeoutInterval = bufferTimeoutInterval;
        }
        this.initSend(sendQueueServiceName, sendThreadSize);
        this.initAsynchSend(asynchSendQueueServiceName, asynchSendQueueFactoryServiceName, asynchSendThreadSize);
        this.selector = Selector.open();
        this.serverSocketChannel.register(this.selector, 16, null);
        this.initClientAcceptor(this.serverSocketChannel.socket().getLocalSocketAddress());
        this.initSendBufferChecker(this.serverSocketChannel.socket().getLocalSocketAddress());
    }

    /**
     * Creates, names and starts the daemon that runs the {@code ClientAcceptor}.
     *
     * @param localAddress local address of the listening socket, appended to the daemon name
     */
    private void initClientAcceptor(SocketAddress localAddress) {
        Daemon acceptor = new Daemon(new ClientAcceptor());
        // Publish the field before the daemon is started.
        clientAcceptor = acceptor;
        acceptor.setName("Nimbus Publish(TCP) ServerConnection ClientAcceptor " + localAddress);
        acceptor.setDaemon(true);
        acceptor.start();
    }

    /**
     * Starts the {@code SendBufferChecker} daemon when send buffering is configured,
     * i.e. when {@code bufferTime} or {@code bufferSize} is positive. Otherwise does nothing.
     *
     * @param localAddress local address of the listening socket, appended to the daemon name
     */
    private void initSendBufferChecker(SocketAddress localAddress) {
        if (bufferTime <= 0L && bufferSize <= 0L) {
            // Buffering disabled: no checker needed.
            return;
        }
        Daemon checker = new Daemon(new SendBufferChecker());
        sendBufferChecker = checker;
        checker.setName("Nimbus Publish(TCP) ServerConnection SendBufferChecker " + localAddress);
        checker.setDaemon(true);
        checker.start();
    }

    /**
     * Sets up the threaded synchronous send path. Nothing is created when
     * {@code sendThreadSize} is below 2; {@link #send(Message)} then sends on the caller's thread.
     *
     * @param sendQueueServiceName name of the queue service to use, or null to create a default queue
     * @param sendThreadSize number of queue handler threads
     * @throws Exception if a service cannot be created or started; failures of the
     *         response queue are wrapped in {@link MessageSendException}
     */
    private void initSend(ServiceName sendQueueServiceName, int sendThreadSize) throws Exception {
        if (sendThreadSize < 2) {
            return;
        }

        QueueHandlerContainerService container = new QueueHandlerContainerService();
        sendQueueHandlerContainer = container;
        container.create();
        if (sendQueueServiceName != null) {
            container.setQueueServiceName(sendQueueServiceName);
        } else {
            // No external queue configured: use a private default queue.
            DefaultQueueService defaultQueue = new DefaultQueueService();
            defaultQueue.create();
            defaultQueue.start();
            container.setQueueService(defaultQueue);
        }
        container.setQueueHandlerSize(sendThreadSize);
        container.setQueueHandler(new SendQueueHandler());
        container.setIgnoreNullElement(true);
        container.setWaitTimeout(1000L);
        container.start();

        // Queue on which send() waits for the per-client results.
        sendResponseQueue = new DefaultQueueService();
        try {
            sendResponseQueue.create();
            sendResponseQueue.start();
        }
        catch (Exception e) {
            throw new MessageSendException(e);
        }
        sendResponseQueue.accept();
    }

    /**
     * Sets up the asynchronous send pipeline: a single-threaded accept container, a
     * per-client distributed queue selector and the distributed send container.
     * Nothing is created when {@code clientQueueDistributedSize} is not positive.
     *
     * @param queueServiceName name of the accept queue service, or null to create a default queue
     * @param queueFactoryServiceName queue factory for the distributed selector, may be null
     * @param clientQueueDistributedSize distributed size passed to the queue selector
     * @throws Exception if a service cannot be created or started
     */
    private void initAsynchSend(ServiceName queueServiceName, ServiceName queueFactoryServiceName, int clientQueueDistributedSize) throws Exception {
        if (clientQueueDistributedSize <= 0) {
            return;
        }

        // Stage 1: container that accepts messages pushed by sendAsynch().
        QueueHandlerContainerService acceptContainer = new QueueHandlerContainerService();
        asynchAcceptQueueHandlerContainer = acceptContainer;
        acceptContainer.create();
        if (queueServiceName != null) {
            acceptContainer.setQueueServiceName(queueServiceName);
        } else {
            DefaultQueueService defaultQueue = new DefaultQueueService();
            defaultQueue.create();
            defaultQueue.start();
            acceptContainer.setQueueService(defaultQueue);
        }
        acceptContainer.setQueueHandlerSize(1);
        acceptContainer.setQueueHandler(new AsynchAcceptQueueHandler());
        acceptContainer.setIgnoreNullElement(true);
        acceptContainer.setWaitTimeout(1000L);
        acceptContainer.start();

        // Stage 2: selector that keys send requests by their client.
        ClientDistributedQueueSelector clientSelector = new ClientDistributedQueueSelector();
        queueSelector = clientSelector;
        clientSelector.create();
        clientSelector.setDistributedSize(clientQueueDistributedSize);
        if (queueFactoryServiceName != null) {
            clientSelector.setQueueFactoryServiceName(queueFactoryServiceName);
        }
        clientSelector.start();

        // Stage 3: distributed container that performs the actual sends.
        DistributedQueueHandlerContainerService sendContainer = new DistributedQueueHandlerContainerService();
        asynchSendQueueHandlerContainer = sendContainer;
        sendContainer.create();
        sendContainer.setDistributedQueueSelector(clientSelector);
        sendContainer.setQueueHandler(new SendQueueHandler());
        sendContainer.setIgnoreNullElement(true);
        sendContainer.setWaitTimeout(1000L);
        sendContainer.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Clears the given message and returns it to the message pool, unless the pool
     * already holds more than {@code messageRecycleBufferSize} entries.
     *
     * @param msg message to recycle; null is ignored
     */
    protected void recycleMessage(MessageImpl msg) {
        // Cheap unlocked pre-check; the size is verified again under the lock.
        if (msg == null || this.messageBuffer.size() > this.messageRecycleBufferSize) {
            return;
        }
        msg.clear();
        synchronized (this.messageBuffer) {
            if (this.messageBuffer.size() <= this.messageRecycleBufferSize) {
                this.messageBuffer.add(msg);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a message of the given type, reusing a pooled instance when one is
     * available and allocating a new one otherwise.
     *
     * @param type message type set on the returned message
     * @return a pooled or newly created message with its type set
     */
    protected MessageImpl createMessage(byte type) {
        MessageImpl result = null;
        // Cheap unlocked pre-check; emptiness is verified again under the lock.
        if (!this.messageBuffer.isEmpty()) {
            synchronized (this.messageBuffer) {
                // Take from the tail: removing index 0 of an ArrayList shifts every
                // remaining element while the lock is held.
                int last = this.messageBuffer.size() - 1;
                if (last >= 0) {
                    result = (MessageImpl)this.messageBuffer.remove(last);
                }
            }
        }
        if (result == null) {
            result = new MessageImpl();
        }
        result.setMessageType(type);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Clears the given send request and returns it to the request pool, unless the
     * pool already holds more than {@code messageRecycleBufferSize} entries.
     *
     * @param req request to recycle; null is ignored
     */
    protected void recycleSendRequest(SendRequest req) {
        // Cheap unlocked pre-check; the size is verified again under the lock.
        if (req == null || this.sendRequestBuffer.size() > this.messageRecycleBufferSize) {
            return;
        }
        req.clear();
        synchronized (this.sendRequestBuffer) {
            if (this.sendRequestBuffer.size() <= this.messageRecycleBufferSize) {
                this.sendRequestBuffer.add(req);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a send request for the given client and message, reusing a pooled
     * instance when one is available and allocating a new one otherwise.
     *
     * @param client destination client
     * @param message message to send
     * @return a pooled or newly created request holding {@code client} and {@code message}
     */
    protected SendRequest createSendRequest(ClientImpl client, MessageImpl message) {
        SendRequest result = null;
        // Cheap unlocked pre-check; emptiness is verified again under the lock.
        if (!this.sendRequestBuffer.isEmpty()) {
            synchronized (this.sendRequestBuffer) {
                // Take from the tail: removing index 0 of an ArrayList shifts every
                // remaining element while the lock is held.
                int last = this.sendRequestBuffer.size() - 1;
                if (last >= 0) {
                    result = (SendRequest)this.sendRequestBuffer.remove(last);
                    result.client = client;
                    result.message = message;
                }
            }
        }
        if (result == null) {
            result = new SendRequest(client, message);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recycles the send request carried by the given context, clears the context and
     * returns it to the context pool, unless the pool already holds more than
     * {@code messageRecycleBufferSize} entries.
     *
     * @param context context to recycle; null is ignored
     */
    protected void recycleAsynchContext(AsynchContext context) {
        // Cheap unlocked pre-check; the size is verified again under the lock.
        if (context == null || this.asynchContextBuffer.size() > this.messageRecycleBufferSize) {
            return;
        }
        SendRequest carried = (SendRequest)context.getInput();
        if (carried != null) {
            this.recycleSendRequest(carried);
        }
        context.clear();
        synchronized (this.asynchContextBuffer) {
            if (this.asynchContextBuffer.size() <= this.messageRecycleBufferSize) {
                this.asynchContextBuffer.add(context);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns an asynchronous context for the given request and response queue,
     * reusing a pooled instance when one is available and allocating a new one otherwise.
     *
     * @param input send request carried by the context
     * @param queue queue that receives the context once the request has been handled
     * @return a pooled or newly created context holding {@code input} and {@code queue}
     */
    protected AsynchContext createAsynchContext(SendRequest input, Queue queue) {
        AsynchContext result = null;
        // Cheap unlocked pre-check; emptiness is verified again under the lock.
        if (!this.asynchContextBuffer.isEmpty()) {
            synchronized (this.asynchContextBuffer) {
                // Take from the tail: removing index 0 of an ArrayList shifts every
                // remaining element while the lock is held.
                int last = this.asynchContextBuffer.size() - 1;
                if (last >= 0) {
                    result = (AsynchContext)this.asynchContextBuffer.remove(last);
                    result.setInput(input);
                    result.setResponseQueue(queue);
                }
            }
        }
        if (result == null) {
            result = new AsynchContext(input, queue);
        }
        return result;
    }

    /**
     * Sets the size limit checked before objects are returned to the recycle pools.
     *
     * @param size new limit
     */
    public void setMessageRecycleBufferSize(int size) {
        messageRecycleBufferSize = size;
    }

    /**
     * Sets the send retry limit and forwards it to the send queue handler containers
     * that exist.
     *
     * @param count maximum number of retries
     */
    public void setMaxSendRetryCount(int count) {
        maxSendRetryCount = count;
        if (sendQueueHandlerContainer != null) {
            sendQueueHandlerContainer.setMaxRetryCount(maxSendRetryCount);
        }
        if (asynchSendQueueHandlerContainer != null) {
            asynchSendQueueHandlerContainer.setMaxRetryCount(maxSendRetryCount);
        }
    }

    /**
     * Sets the logger used to report send errors; null disables logging.
     *
     * @param logger logger to use, may be null
     */
    public void setLogger(Logger logger) {
        ServerConnectionImpl.this.logger = logger;
    }

    /**
     * Sets the log message id written when a send attempt fails and will be retried.
     *
     * @param id log message id, or null to suppress the log entry
     */
    public void setSendErrorMessageId(String id) {
        sendErrorMessageId = id;
    }

    /**
     * Sets the log message id written when a send fails and the retry limit is exhausted.
     *
     * @param id log message id, or null to suppress the log entry
     */
    public void setSendErrorRetryOverMessageId(String id) {
        sendErrorRetryOverMessageId = id;
    }

    /**
     * Stores the client-connect log message id.
     *
     * @param id log message id
     */
    public void setClientConnectMessageId(String id) {
        clientConnectMessageId = id;
    }

    /**
     * Stores the client-closed log message id.
     *
     * @param id log message id
     */
    public void setClientClosedMessageId(String id) {
        clientClosedMessageId = id;
    }

    /**
     * Stores the client-close log message id.
     *
     * @param id log message id
     */
    public void setClientCloseMessageId(String id) {
        clientCloseMessageId = id;
    }

    /**
     * Sets how long sent messages stay in the send message cache; the value is
     * compared against {@code System.currentTimeMillis()} differences.
     *
     * @param time retention time in milliseconds
     */
    public void setSendMessageCacheTime(long time) {
        sendMessageCacheTime = time;
    }

    /**
     * Stores the acknowledge flag.
     *
     * @param isAck new flag value
     */
    public void setAcknowledge(boolean isAck) {
        isAcknowledge = isAck;
    }

    /**
     * Creates a message of type 0 addressed to the given subject and key.
     *
     * @param subject subject of the message
     * @param key key of the message
     * @return the new (possibly recycled) message
     * @throws MessageCreateException declared by the interface
     */
    @Override
    public Message createMessage(String subject, String key) throws MessageCreateException {
        MessageImpl created = createMessage((byte)0);
        created.setSubject(subject, key);
        return created;
    }

    /**
     * Converts a message into this connection's message implementation. A
     * {@code MessageImpl} is returned unchanged; any other message is copied into a
     * new message with the same subject and key, carrying the serialized bytes when
     * present and the payload object otherwise.
     *
     * @param message message to convert
     * @return {@code message} itself or an equivalent {@code MessageImpl}
     * @throws MessageException if the copy cannot be created or filled
     */
    @Override
    public Message castMessage(Message message) throws MessageException {
        if (!(message instanceof MessageImpl)) {
            Message converted = this.createMessage(message.getSubject(), message.getKey());
            if (message.getSerializedBytes() == null) {
                converted.setObject(message.getObject());
            } else {
                converted.setSerializedBytes(message.getSerializedBytes());
            }
            return converted;
        }
        return message;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a message to every client that has started receiving and whose
     * subscriptions match the message, and records the message in the send cache.
     *
     * <p>With no connected clients the message is only cached. When no send queue
     * handler container exists the message is sent on the calling thread, with failed
     * clients retried up to {@code maxSendRetryCount} more times. Otherwise one request
     * per target client is pushed to the send queue handler container and this method
     * waits for the same number of responses.
     *
     * @param message message to send; it is cast to {@code MessageImpl} unconditionally
     * @throws MessageSendException if at least one target client could not be reached,
     *         or if waiting for a send response returned no result
     */
    @Override
    public synchronized void send(Message message) throws MessageSendException {
        long startTime = System.currentTimeMillis();
        if (this.clients.size() == 0) {
            // Nobody to send to: only remember the message.
            this.addSendMessageCache((MessageImpl)message);
            return;
        }
        try {
            if (this.sendQueueHandlerContainer == null) {
                // Single-threaded path: send on the calling thread.
                ArrayList<ClientImpl> currentClients = new ArrayList<ClientImpl>();
                for (ClientImpl client : this.clients) {
                    if (!client.isStartReceive() || !client.isTargetMessage(message)) continue;
                    currentClients.add(client);
                }
                // retryCount starts at -1 so the first pass is the initial attempt, not a retry.
                for (int retryCount = -1; currentClients.size() != 0 && retryCount < this.maxSendRetryCount; ++retryCount) {
                    Iterator itr = currentClients.iterator();
                    while (itr.hasNext()) {
                        ClientImpl client = (ClientImpl)itr.next();
                        try {
                            client.send(message);
                            // Delivered: this client needs no further passes.
                            itr.remove();
                        }
                        catch (MessageSendException e) {
                            if (this.logger == null) continue;
                            if (retryCount + 1 >= this.maxSendRetryCount) {
                                // Last pass failed: log with the retry-over id.
                                if (this.sendErrorRetryOverMessageId == null) continue;
                                this.logger.write(this.sendErrorRetryOverMessageId, new Object[]{client, message}, (Throwable)e);
                                continue;
                            }
                            if (this.sendErrorMessageId == null) continue;
                            this.logger.write(this.sendErrorMessageId, new Object[]{client, message}, (Throwable)e);
                        }
                    }
                }
                if (currentClients.size() != 0) {
                    // Clients still in the list never received the message.
                    throw new MessageSendException("Send error : clients=" + currentClients + ", message=" + message);
                }
                ((MessageImpl)message).setSend(true);
            } else {
                // Multi-threaded path: hand one request per client to the handler container.
                HashMap<ClientImpl, AsynchContext> sendContexts = new HashMap<ClientImpl, AsynchContext>();
                for (ClientImpl client : this.clients) {
                    if (!client.isStartReceive() || !client.isTargetMessage(message)) continue;
                    SendRequest sendRequest = this.createSendRequest(client, (MessageImpl)message);
                    AsynchContext asynchContext = this.createAsynchContext(sendRequest, this.sendResponseQueue);
                    sendContexts.put(client, asynchContext);
                    this.sendQueueHandlerContainer.push(asynchContext);
                }
                Throwable th = null;
                int imax = sendContexts.size();
                // Wait for one response per pushed request.
                for (int i = 0; i < imax; ++i) {
                    AsynchContext asynchContext = (AsynchContext)this.sendResponseQueue.get();
                    if (asynchContext == null) {
                        // No response obtained: cancel every outstanding request and give up.
                        Iterator itr = sendContexts.values().iterator();
                        while (itr.hasNext()) {
                            ((AsynchContext)itr.next()).cancel();
                        }
                        throw new MessageSendException("Interrupted the waiting for a response sent : clients=" + sendContexts.keySet() + ", message=" + message, new InterruptedException());
                    }
                    if (asynchContext.isCancel()) {
                        // Cancelled contexts do not count as a response for this send.
                        --i;
                        continue;
                    }
                    if (asynchContext.getThrowable() == null) {
                        // Success: drop the client from the pending map.
                        sendContexts.remove(((SendRequest)asynchContext.getInput()).client);
                    } else {
                        // Failure: keep the client in the map and remember the last cause.
                        th = asynchContext.getThrowable();
                    }
                    this.recycleAsynchContext(asynchContext);
                }
                if (sendContexts.size() != 0) {
                    throw new MessageSendException("Send error : clients=" + sendContexts.keySet() + ", message=" + message, th);
                }
                ((MessageImpl)message).setSend(true);
            }
        }
        finally {
            // Runs on success and on failure: cache the message and accumulate the elapsed time.
            this.addSendMessageCache((MessageImpl)message);
            this.sendProcessTime += System.currentTimeMillis() - startTime;
        }
    }

    /**
     * Queues a message for asynchronous delivery. The message is dropped when no
     * client is connected.
     *
     * @param message message to send
     * @throws UnsupportedOperationException if asynchronous send was not configured
     */
    @Override
    public void sendAsynch(Message message) {
        if (this.asynchAcceptQueueHandlerContainer == null) {
            throw new UnsupportedOperationException();
        }
        if (!this.clients.isEmpty()) {
            this.asynchAcceptQueueHandlerContainer.push(message);
        }
    }

    /**
     * Returns the number of messages recorded in the send cache since the last reset.
     *
     * @return the send counter
     */
    public long getSendCount() {
        return sendCount;
    }

    /**
     * Resets the send counter and the accumulated send processing time to zero.
     */
    public void resetSendCount() {
        sendCount = 0L;
        sendProcessTime = 0L;
    }

    /**
     * Returns the accumulated send processing time divided by the send counter.
     *
     * @return average processing time in milliseconds, or 0.0 when nothing has been sent
     */
    public double getAverageSendProcessTime() {
        if (sendCount == 0L) {
            return 0.0;
        }
        return (double)sendProcessTime / (double)sendCount;
    }

    /**
     * Returns the total bytes sent divided by the total send count, summed over all
     * current clients.
     *
     * @return average bytes per send, or 0.0 when no client has sent anything
     */
    public double getAverageSendBytes() {
        long totalBytes = 0L;
        long totalSends = 0L;
        Iterator itr = this.clients.iterator();
        while (itr.hasNext()) {
            ClientImpl client = (ClientImpl)itr.next();
            totalBytes += client.getSendBytes();
            totalSends += client.getSendCount();
        }
        if (totalSends == 0L) {
            return 0.0;
        }
        return (double)totalBytes / (double)totalSends;
    }

    /**
     * Returns the set of connected clients. This is the internal set itself, not a copy.
     *
     * @return the client set
     */
    public Set getClients() {
        return clients;
    }

    /**
     * Returns the number of connected clients.
     *
     * @return size of the client set
     */
    @Override
    public int getClientCount() {
        return clients.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a snapshot of the ids of all registered clients.
     *
     * @return a new set holding the current keys of the client map
     */
    @Override
    public Set getClientIds() {
        Set ids;
        // Hold the map's lock while its key set is copied.
        synchronized (this.clientMap) {
            ids = new HashSet(this.clientMap.keySet());
        }
        return ids;
    }

    /**
     * Returns the ids of the clients that would currently receive the given message:
     * those that have started receiving and for which the message is a target.
     *
     * @param message message to test against each client
     * @return a new set of matching client ids, possibly empty
     */
    @Override
    public Set getReceiveClientIds(Message message) {
        Set receivers = new HashSet();
        Iterator itr = this.clients.iterator();
        while (itr.hasNext()) {
            ClientImpl client = (ClientImpl)itr.next();
            if (client.isStartReceive() && client.isTargetMessage(message)) {
                receivers.add(client.getId());
            }
        }
        return receivers;
    }

    /**
     * Returns the subjects registered by the client with the given id.
     *
     * @param id client id
     * @return the client's subjects, or null when no such client is registered
     */
    @Override
    public Set getSubjects(Object id) {
        ClientImpl found = (ClientImpl)this.clientMap.get(id);
        return found == null ? null : found.getSubjects();
    }

    /**
     * Returns the keys registered for a subject by the client with the given id.
     *
     * @param id client id
     * @param subject subject to look up
     * @return the client's keys for the subject, or null when no such client is registered
     */
    @Override
    public Set getKeys(Object id, String subject) {
        ClientImpl found = (ClientImpl)this.clientMap.get(id);
        return found == null ? null : found.getKeys(subject);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Discards every message held in the send message cache.
     */
    @Override
    public void reset() {
        synchronized (this.sendMessageCache) {
            this.sendMessageCache.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stamps the message with the current time, appends it to the send cache, evicts
     * entries at the head of the cache that are older than {@code sendMessageCacheTime}
     * and increments the send counter. Evicted messages that were marked as sent are
     * handed back to the message pool.
     *
     * @param message message that was (or was attempted to be) sent
     */
    private void addSendMessageCache(MessageImpl message) {
        long now = System.currentTimeMillis();
        message.setSendTime(now);
        synchronized (this.sendMessageCache) {
            this.sendMessageCache.add(message);
            // Inspect at most as many entries as the cache holds right now.
            int remaining = this.sendMessageCache.size();
            while (remaining-- > 0) {
                MessageImpl oldest = (MessageImpl)this.sendMessageCache.get(0);
                if (now - oldest.getSendTime() <= this.sendMessageCacheTime) {
                    // The head is still fresh; so is everything behind it.
                    break;
                }
                MessageImpl evicted = (MessageImpl)this.sendMessageCache.remove(0);
                if (evicted.isSend()) {
                    this.recycleMessage(evicted);
                }
            }
            ++this.sendCount;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns clones of the cached messages at the tail of the send cache whose send
     * time is not earlier than {@code from}, in cache order (oldest first).
     *
     * @param from lower bound (inclusive) for the send time, in milliseconds
     * @return a new list of cloned messages, possibly empty
     */
    private List getSendMessages(long from) {
        List result = new ArrayList();
        synchronized (this.sendMessageCache) {
            int end = this.sendMessageCache.size();
            // Walk back from the newest entry to find where the matching tail begins.
            int start = end;
            while (start > 0 && ((MessageImpl)this.sendMessageCache.get(start - 1)).getSendTime() >= from) {
                --start;
            }
            // Append in order; inserting each clone at index 0 was quadratic.
            for (int i = start; i < end; ++i) {
                result.add(((MessageImpl)this.sendMessageCache.get(i)).clone());
            }
        }
        return result;
    }

    /**
     * Returns the number of messages currently held in the send message cache.
     *
     * @return cache size
     */
    public int getSendMessageCacheSize() {
        return sendMessageCache.size();
    }

    /**
     * Shuts this connection down: stops accepting clients, closes the listening
     * socket or channel, sends a message of type 2 to the connected clients, closes
     * every client and releases the daemons, selector and queue handler services
     * created by this instance. Failures while closing sockets or sending the final
     * message are ignored so that the remaining resources are still released.
     */
    public synchronized void close() {
        if (this.clientAcceptor != null) {
            this.clientAcceptor.stopNoWait();
        }
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            }
            catch (IOException ignored) {
                // Best effort: nothing useful can be done if the socket fails to close.
            }
        }
        // NIO mode: the listening channel was previously left open.
        if (this.serverSocketChannel != null) {
            try {
                this.serverSocketChannel.close();
            }
            catch (IOException ignored) {
                // Best effort, as for the blocking socket above.
            }
        }
        try {
            this.send(this.createMessage((byte)2));
        }
        catch (MessageSendException ignored) {
            // Clients that cannot be notified are closed below anyway.
        }
        // Iterate over a snapshot so the loop does not depend on whether
        // ClientImpl.close() alters the clients set.
        Iterator itr = new ArrayList(this.clients).iterator();
        while (itr.hasNext()) {
            ((ClientImpl)itr.next()).close();
        }
        this.clientAcceptor = null;
        // The selector is closed after the clients so their channels stay registered
        // until they have been closed.
        if (this.selector != null) {
            try {
                this.selector.close();
            }
            catch (IOException ignored) {
                // Best effort.
            }
        }
        // The buffer checker daemon was previously never stopped.
        if (this.sendBufferChecker != null) {
            this.sendBufferChecker.stopNoWait();
            this.sendBufferChecker = null;
        }
        if (this.sendQueueHandlerContainer != null) {
            this.sendQueueHandlerContainer.stop();
            this.sendQueueHandlerContainer.destroy();
            this.sendQueueHandlerContainer = null;
        }
        if (this.asynchAcceptQueueHandlerContainer != null) {
            this.asynchAcceptQueueHandlerContainer.stop();
            this.asynchAcceptQueueHandlerContainer.destroy();
            this.asynchAcceptQueueHandlerContainer = null;
        }
        if (this.asynchSendQueueHandlerContainer != null) {
            this.asynchSendQueueHandlerContainer.stop();
            this.asynchSendQueueHandlerContainer.destroy();
            this.asynchSendQueueHandlerContainer = null;
        }
        if (this.queueSelector != null) {
            this.queueSelector.stop();
            this.queueSelector.destroy();
            this.queueSelector = null;
        }
    }

    /**
     * Registers a listener. The listener list is created on first use and a listener
     * that is already registered is not added again.
     *
     * @param listener listener to register
     */
    @Override
    public void addServerConnectionListener(ServerConnectionListener listener) {
        if (this.serverConnectionListeners == null) {
            this.serverConnectionListeners = new ArrayList();
        }
        boolean alreadyRegistered = this.serverConnectionListeners.contains(listener);
        if (!alreadyRegistered) {
            this.serverConnectionListeners.add(listener);
        }
    }

    /**
     * Unregisters a listener. The listener list is dropped once it becomes empty.
     *
     * @param listener listener to remove
     */
    @Override
    public void removeServerConnectionListener(ServerConnectionListener listener) {
        if (this.serverConnectionListeners != null) {
            this.serverConnectionListeners.remove(listener);
            if (this.serverConnectionListeners.isEmpty()) {
                this.serverConnectionListeners = null;
            }
        }
    }

    /**
     * Returns the default object description followed by the local address this
     * connection listens on, in the form {@code ...{server=<address>}}. The address
     * comes from the blocking server socket when present, otherwise from the NIO
     * server channel; it is {@code null} when neither exists.
     *
     * @return a description of this connection
     */
    public String toString() {
        SocketAddress localAddress = null;
        if (this.serverSocket != null) {
            localAddress = this.serverSocket.getLocalSocketAddress();
        } else if (this.serverSocketChannel != null) {
            // NIO mode: serverSocket is never assigned, so ask the channel's socket.
            localAddress = this.serverSocketChannel.socket().getLocalSocketAddress();
        }
        StringBuilder buf = new StringBuilder();
        buf.append(super.toString());
        buf.append('{');
        buf.append("server=").append(localAddress);
        buf.append('}');
        return buf.toString();
    }

    /**
     * Queue selector that distributes asynchronous send contexts by destination
     * client: the key of a context is the client of the send request it carries.
     */
    private class ClientDistributedQueueSelector
    extends AbstractDistributedQueueSelectorService {
        private static final long serialVersionUID = 8050312124454494504L;

        private ClientDistributedQueueSelector() {
        }

        /**
         * @param obj an {@link AsynchContext} whose input is a {@code SendRequest}
         * @return the client the request is addressed to
         */
        @Override
        protected Object getKey(Object obj) {
            Object input = ((AsynchContext)obj).getInput();
            return ((SendRequest)input).client;
        }
    }

    /**
     * Queue handler that performs the actual send of one {@code SendRequest} to its
     * client and reports the outcome on the context's response queue, if one is set.
     */
    private class SendQueueHandler
    implements QueueHandler {
        private SendQueueHandler() {
        }

        /**
         * Sends the request's message to its client when the client has started
         * receiving, then pushes the context to its response queue.
         *
         * @param obj an {@link AsynchContext} carrying a {@code SendRequest}; null is ignored
         * @throws Throwable whatever the client's send raises
         */
        @Override
        public void handleDequeuedObject(Object obj) throws Throwable {
            if (obj == null) {
                return;
            }
            AsynchContext context = (AsynchContext)obj;
            SendRequest toSend = (SendRequest)context.getInput();
            if (toSend.client.isStartReceive()) {
                toSend.client.send(toSend.message);
            }
            this.respond(context);
        }

        /**
         * Logs the failure with the send-error message id and asks for a retry.
         *
         * @return always true
         */
        @Override
        public boolean handleError(Object obj, Throwable th) throws Throwable {
            AsynchContext context = (AsynchContext)obj;
            this.logFailure(ServerConnectionImpl.this.sendErrorMessageId, context, th);
            return true;
        }

        /**
         * Logs the failure with the retry-over message id, records the cause on the
         * context and pushes the context to its response queue.
         */
        @Override
        public void handleRetryOver(Object obj, Throwable th) throws Throwable {
            AsynchContext context = (AsynchContext)obj;
            this.logFailure(ServerConnectionImpl.this.sendErrorRetryOverMessageId, context, th);
            context.setThrowable(th);
            this.respond(context);
        }

        /** Writes a log entry for a failed request when both logger and message id are set. */
        private void logFailure(String messageId, AsynchContext context, Throwable th) {
            if (ServerConnectionImpl.this.logger != null && messageId != null) {
                SendRequest failed = (SendRequest)context.getInput();
                ServerConnectionImpl.this.logger.write(messageId, new Object[]{failed.client, failed.message}, th);
            }
        }

        /** Pushes the context to its response queue when it has one. */
        private void respond(AsynchContext context) {
            if (context.getResponseQueue() != null) {
                context.getResponseQueue().push(context);
            }
        }
    }

    /**
     * Queue handler behind {@code sendAsynch}: for each accepted message it pushes
     * one send request per target client to the distributed send container, waits
     * for the responses on a private queue and then records the message in the send
     * cache.
     */
    private class AsynchAcceptQueueHandler
    implements QueueHandler {
        private DefaultQueueService responseQueue;

        public AsynchAcceptQueueHandler() {
            this.renewResponseQueue();
        }

        /**
         * Replaces the response queue with a freshly created, started and accepting
         * one. Exceptions from create/start are ignored.
         */
        private void renewResponseQueue() {
            this.responseQueue = new DefaultQueueService();
            try {
                this.responseQueue.create();
                this.responseQueue.start();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.responseQueue.accept();
        }

        /**
         * Distributes one message to all matching clients and waits for completion.
         *
         * @param obj the {@code MessageImpl} to send; null is ignored
         */
        @Override
        public void handleDequeuedObject(Object obj) throws Throwable {
            MessageImpl message = (MessageImpl)obj;
            if (message == null) {
                return;
            }
            if (ServerConnectionImpl.this.clients.size() == 0) {
                // No clients: mark as sent without caching.
                message.setSend(true);
                return;
            }

            Map pendingContexts = new HashMap();
            Iterator itr = ServerConnectionImpl.this.clients.iterator();
            while (itr.hasNext()) {
                ClientImpl client = (ClientImpl)itr.next();
                if (client.isStartReceive() && client.isTargetMessage(message)) {
                    SendRequest sendRequest = ServerConnectionImpl.this.createSendRequest(client, message);
                    AsynchContext pushed = ServerConnectionImpl.this.createAsynchContext(sendRequest, this.responseQueue);
                    pendingContexts.put(client, pushed);
                    ServerConnectionImpl.this.asynchSendQueueHandlerContainer.push(pushed);
                }
            }

            // Collect one response per pushed request.
            int outstanding = pendingContexts.size();
            while (outstanding-- > 0) {
                AsynchContext answered = (AsynchContext)this.responseQueue.get();
                if (answered == null) {
                    // No response obtained: start over with a new queue and stop waiting.
                    this.renewResponseQueue();
                    break;
                }
                ServerConnectionImpl.this.recycleAsynchContext(answered);
            }
            this.responseQueue.clear();
            message.setSend(true);
            ServerConnectionImpl.this.addSendMessageCache(message);
        }

        /**
         * @return always false
         */
        @Override
        public boolean handleError(Object obj, Throwable th) throws Throwable {
            return false;
        }

        /** Does nothing. */
        @Override
        public void handleRetryOver(Object obj, Throwable th) throws Throwable {
        }
    }

    /**
     * Pairs a message with the client it is to be delivered to.
     */
    private class SendRequest {
        /** Destination client; {@code null} after {@link #clear()}. */
        public ClientImpl client;
        /** Message to deliver; {@code null} after {@link #clear()}. */
        public Message message;

        public SendRequest(ClientImpl client, MessageImpl message) {
            this.message = message;
            this.client = client;
        }

        /** Drops both references held by this request. */
        public void clear() {
            this.message = null;
            this.client = null;
        }
    }

    public class ClientImpl
    implements DaemonRunnable,
    Client {
        // NIO channel of this client; assigned only by the SocketChannel constructor, nulled by close.
        private SocketChannel socketChannel;
        // Underlying socket (taken from the channel, or passed in directly); nulled by close.
        private Socket socket;
        // Daemon running this client as a DaemonRunnable; created only by the Socket constructor.
        private Daemon requestDispatcher;
        // Synchronized map: subject -> synchronized Set of keys (a null key is stored when a subject is added without keys).
        private Map subjects;
        // Number of messages accounted as sent by writeSendBuffer since the last resetSendCount.
        private long sendCount;
        // Accumulated milliseconds spent inside writeSendBuffer.
        private long sendProcessTime;
        // Accumulated byte count accounted as sent by writeSendBuffer.
        private long sendBytes;
        // When false, send() returns without doing anything.
        private boolean isEnabled = true;
        // Id announced by the client through an IdMessage; getId() falls back to the remote address while null.
        private Object id;
        // Receive buffer used by receive(); replaced by a larger one when a frame exceeds its capacity.
        private ByteBuffer byteBuffer;
        // Length of the frame currently being received; negative while the 4-byte length has not been read yet.
        private int dataLength = -1;
        // "from" value of the last StartReceiveMessage.
        private long fromTime = -1L;
        // True between a start-receive and a stop-receive/close.
        private boolean isStartReceive = false;
        // Framed messages waiting to be flushed by writeSendBuffer.
        private ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
        // Number of bytes currently held in sendBuffer.
        private int sendBufferSize;
        // Number of messages currently held in sendBuffer.
        private long sendBufferCount;
        // Time at which buffering of the current batch started; -1 when no batch is open.
        private long bufferStartTime = -1L;
        // Scratch stream reused by send() for serializing and framing a single message.
        private ByteArrayOutputStream baos = new ByteArrayOutputStream();

        /**
         * Creates a client served through a selector-driven channel.
         *
         * @param sc the accepted channel; its socket is kept as well
         */
        public ClientImpl(SocketChannel sc) {
            this.byteBuffer = ByteBuffer.allocate(1024);
            this.subjects = Collections.synchronizedMap(new HashMap());
            this.socketChannel = sc;
            this.socket = sc.socket();
        }

        public ClientImpl(Socket sock) throws IOException {
            this.socket = sock;
            this.subjects = Collections.synchronizedMap(new HashMap());
            this.requestDispatcher = new Daemon(this);
            this.requestDispatcher.setName("Nimbus Publish(TCP) ServerConnection ClientRequestDisptcher " + this.socket.getRemoteSocketAddress());
            this.requestDispatcher.setDaemon(true);
        }

        /** Starts the request dispatcher daemon created by the {@code Socket} constructor. */
        public void startRequestDispatcher() {
            requestDispatcher.start();
        }

        /** Returns the number of messages accounted as sent since the last {@code resetSendCount()}. */
        public long getSendCount() {
            return sendCount;
        }

        /** Returns the accumulated time, in milliseconds, spent flushing the send buffer. */
        public long getSendProcessTime() {
            return sendProcessTime;
        }

        /** Returns the accumulated number of bytes accounted as sent. */
        public long getSendBytes() {
            return sendBytes;
        }

        /** Returns the client's channel, or {@code null} for socket-based or closed clients. */
        public SocketChannel getSocketChannel() {
            return socketChannel;
        }

        /** Returns the client's socket, or {@code null} once the client has been closed. */
        public Socket getSocket() {
            return socket;
        }

        /** Tells whether sending to this client is enabled; {@code send} does nothing while disabled. */
        public boolean isEnabled() {
            return isEnabled;
        }

        /**
         * Enables or disables sending to this client.
         *
         * @param isEnabled {@code false} makes {@code send} return without doing anything
         */
        public void setEnabled(boolean isEnabled) {
            this.isEnabled = isEnabled;
        }

        /** Tells whether the client has requested delivery to start and has not stopped it since. */
        public boolean isStartReceive() {
            return isStartReceive;
        }

        /**
         * Decides whether the message should be delivered to this client.
         * The client must be one of the message's destination ids and must have
         * registered, for at least one of the message's subjects, either the
         * {@code null} key or the message's key for that subject.
         *
         * @param message the candidate message
         * @return {@code true} when the message is addressed to this client
         */
        public boolean isTargetMessage(Message message) {
            if (!message.containsDestinationId(this.getId())) {
                return false;
            }
            if (message.getSubject() == null) {
                return false;
            }
            for (String subject : message.getSubjects()) {
                Set registeredKeys = (Set)this.subjects.get(subject);
                String messageKey = message.getKey(subject);
                if (registeredKeys == null) {
                    continue;
                }
                if (registeredKeys.contains(null) || registeredKeys.contains(messageKey)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Serializes the message, prefixes it with its 4-byte length and appends
         * the frame to the send buffer. Messages of type 0 may stay buffered
         * while the configured buffer time / buffer size limits are not reached;
         * otherwise the buffer is flushed, either by registering the channel for
         * writing with the selector or, for socket clients, by writing directly.
         *
         * @param message the message to send; must be a {@code MessageImpl}
         * @throws MessageSendException when serialization fails or the channel is closed
         */
        public synchronized void send(Message message) throws MessageSendException {
            boolean hasTransport = this.socketChannel != null || this.socket != null;
            if (!this.isEnabled || !hasTransport) {
                return;
            }
            try {
                // serialize the payload, then rebuild it as [length][payload]
                this.baos.reset();
                MessageImpl impl = (MessageImpl)message;
                impl.write(this.baos, ServerConnectionImpl.this.externalizer);
                byte[] payload = this.baos.toByteArray();
                this.baos.reset();
                DataOutputStream frameOut = new DataOutputStream(this.baos);
                frameOut.writeInt(payload.length);
                frameOut.write(payload);
                frameOut.flush();
                byte[] frame = this.baos.toByteArray();
                this.baos.reset();

                boolean keepBuffering = impl.getMessageType() == 0
                    && (ServerConnectionImpl.this.bufferTime > 0L || ServerConnectionImpl.this.bufferSize > 0L);
                if (keepBuffering) {
                    long now = System.currentTimeMillis();
                    if (this.bufferStartTime == -1L) {
                        this.bufferStartTime = now;
                    }
                    boolean timeReached = ServerConnectionImpl.this.bufferTime > 0L
                        && ServerConnectionImpl.this.bufferTime <= now - this.bufferStartTime;
                    boolean sizeReached = ServerConnectionImpl.this.bufferSize > 0L
                        && ServerConnectionImpl.this.bufferSize <= (long)(this.sendBufferSize + frame.length);
                    keepBuffering = !timeReached && !sizeReached;
                }

                this.sendBuffer.write(frame);
                this.sendBufferSize += frame.length;
                ++this.sendBufferCount;
                if (keepBuffering) {
                    return;
                }
                if (this.socketChannel == null) {
                    this.writeSendBuffer(null);
                    return;
                }
                try {
                    // the selector thread performs the actual write
                    this.socketChannel.register(ServerConnectionImpl.this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
                    ServerConnectionImpl.this.selector.wakeup();
                }
                catch (ClosedChannelException e) {
                    throw new MessageSendException(e);
                }
            }
            catch (SocketTimeoutException e) {
                throw new MessageSendException(e);
            }
            catch (SocketException e) {
                this.close(false, e);
                throw new MessageSendException(e);
            }
            catch (IOException e) {
                this.close(true, e);
                throw new MessageSendException(e);
            }
        }

        /**
         * Flushes the send buffer when buffered messages have waited long enough.
         * With a buffer time configured, the batch is flushed once that time has
         * elapsed since buffering started; with only a buffer size configured, it
         * is flushed when buffering started before {@code lastCheckTime}; with
         * neither configured it is always flushed.
         *
         * @param lastCheckTime time of the checker's previous pass
         */
        public synchronized void checkSendBuffer(long lastCheckTime) {
            if (this.sendBufferCount == 0L) {
                return;
            }
            boolean flush;
            if (ServerConnectionImpl.this.bufferTime > 0L) {
                flush = ServerConnectionImpl.this.bufferTime <= System.currentTimeMillis() - this.bufferStartTime;
            } else if (ServerConnectionImpl.this.bufferSize > 0L) {
                flush = lastCheckTime > this.bufferStartTime;
            } else {
                flush = true;
            }
            if (!flush) {
                return;
            }
            if (this.socketChannel == null) {
                this.writeSendBuffer(null);
                return;
            }
            try {
                this.socketChannel.register(ServerConnectionImpl.this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
                ServerConnectionImpl.this.selector.wakeup();
            }
            catch (ClosedChannelException ignored) {
                // a closed channel is silently skipped here
            }
        }

        /**
         * Writes the pending send buffer to the client and updates the send statistics.
         * <p>
         * For channel clients the write may be partial (the channel is configured
         * non-blocking by the acceptor). Bytes that could not be written are put
         * back at the front of the send buffer and write interest is re-armed so
         * the selector calls this method again; the buffered messages are only
         * counted as sent once the buffer has drained completely.
         * <p>
         * On an I/O failure the key (if any) is cancelled, the client is closed
         * and the pending data is dropped. If the client is already closed, the
         * pending data is dropped without touching the statistics.
         *
         * @param key selection key that triggered the write, or {@code null} when
         *            the method is invoked outside the selector loop
         */
        public synchronized void writeSendBuffer(SelectionKey key) {
            if (this.socketChannel == null && this.socket == null) {
                // Already closed: nothing can be delivered any more.
                this.sendBuffer.reset();
                this.sendBufferSize = 0;
                this.sendBufferCount = 0L;
                this.bufferStartTime = -1L;
                return;
            }
            long startTime = System.currentTimeMillis();
            long flushedBytes = this.sendBufferSize;
            boolean hasRemainder = false;
            try {
                byte[] bytes = this.sendBuffer.toByteArray();
                this.sendBuffer.reset();
                if (this.socketChannel != null) {
                    ByteBuffer buf = ByteBuffer.wrap(bytes);
                    this.socketChannel.write(buf);
                    if (buf.hasRemaining()) {
                        // Partial write: ask the selector to call us again when the socket is writable.
                        if (key != null) {
                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                        } else {
                            this.socketChannel.register(ServerConnectionImpl.this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
                            ServerConnectionImpl.this.selector.wakeup();
                        }
                        // Keep the unsent tail; we hold the monitor, so nothing was appended meanwhile.
                        this.sendBuffer.write(bytes, buf.position(), buf.remaining());
                        flushedBytes = buf.position();
                        hasRemainder = true;
                    }
                } else {
                    OutputStream os = this.socket.getOutputStream();
                    os.write(bytes);
                    os.flush();
                }
            }
            catch (SocketException e) {
                if (key != null) {
                    key.cancel();
                }
                this.close(false, e);
            }
            catch (IOException e) {
                if (key != null) {
                    key.cancel();
                }
                this.close(true, e);
            }
            finally {
                this.sendBytes += flushedBytes;
                this.sendProcessTime += System.currentTimeMillis() - startTime;
                if (hasRemainder) {
                    // messages stay pending until the rest of the buffer has gone out
                    this.sendBufferSize = this.sendBuffer.size();
                } else {
                    this.sendCount += this.sendBufferCount;
                    this.sendBufferSize = 0;
                    this.sendBufferCount = 0L;
                    this.bufferStartTime = -1L;
                }
            }
        }

        /**
         * Reads whatever the channel currently offers and handles every complete
         * request frame found in the receive buffer. A frame is a 4-byte length
         * followed by that many bytes holding a serialized {@code ClientMessage}.
         * Incomplete frames stay in the buffer until a later call.
         * <p>
         * NOTE(review): the frame length comes from the peer and is used directly
         * to size the buffer, and without an externalizer the payload is read with
         * plain Java deserialization; both deserve a limit/filter if peers are not trusted.
         *
         * @param key the readable selection key of this client; cancelled when the
         *            client closes or a socket/EOF/I-O error occurs
         */
        public void receive(SelectionKey key) {
            try {
                int readLength = this.socketChannel.read(this.byteBuffer);
                if (readLength == 0) {
                    return;
                }
                if (readLength == -1) {
                    throw new EOFException("EOF in reading length.");
                }
                do {
                    if (this.dataLength < 0) {
                        // Length header not read yet: need at least 4 buffered bytes.
                        if (this.byteBuffer.position() < 4) {
                            return;
                        }
                        this.byteBuffer.flip();
                        this.dataLength = this.byteBuffer.getInt();
                        this.byteBuffer.compact();
                        if (this.dataLength <= 0) {
                            throw new IOException("DataLength is illegal." + this.dataLength);
                        }
                        if (this.dataLength > this.byteBuffer.capacity()) {
                            // Frame does not fit: move what is buffered into a buffer of exactly the frame size.
                            this.byteBuffer.flip();
                            ByteBuffer newByteBuffer = ByteBuffer.allocate(this.dataLength);
                            newByteBuffer.put(this.byteBuffer);
                            this.byteBuffer = newByteBuffer;
                        }
                    }
                    if (this.byteBuffer.position() < this.dataLength) {
                        // Body still incomplete; wait for the next read.
                        return;
                    }
                    this.byteBuffer.flip();
                    byte[] dataBytes = new byte[this.dataLength];
                    this.byteBuffer.get(dataBytes);
                    this.dataLength = -1;
                    // Keep any bytes belonging to the next frame at the start of the buffer.
                    this.byteBuffer.compact();
                    ByteArrayInputStream is = new ByteArrayInputStream(dataBytes);
                    boolean isClosed = false;
                    if (ServerConnectionImpl.this.externalizer == null) {
                        ObjectInputStream ois = new ObjectInputStream(is);
                        isClosed = this.handleMessage((ClientMessage)ois.readObject());
                    } else {
                        isClosed = this.handleMessage((ClientMessage)ServerConnectionImpl.this.externalizer.readExternal(is));
                    }
                    if (!isClosed) continue;
                    key.cancel();
                    return;
                } while (this.byteBuffer.position() != 0);
            }
            catch (ClassNotFoundException e) {
                // The frame has already been consumed, so only this message is lost.
                e.printStackTrace();
            }
            catch (SocketTimeoutException e) {
            }
            catch (SocketException e) {
                key.cancel();
                this.close(false, e);
            }
            catch (EOFException e) {
                key.cancel();
                this.close(false, e);
            }
            catch (IOException e) {
                key.cancel();
                this.close(true, e);
            }
        }

        /** Resets the send counter, the accumulated send time and the accumulated byte count to zero. */
        public void resetSendCount() {
            sendBytes = 0L;
            sendProcessTime = 0L;
            sendCount = 0L;
        }

        /** Returns the send time per counted message in milliseconds, or {@code 0.0} when nothing has been counted. */
        public double getAverageSendProcessTime() {
            if (this.sendCount == 0L) {
                return 0.0;
            }
            return (double)this.sendProcessTime / (double)this.sendCount;
        }

        /** Returns the byte count per counted message, or {@code 0.0} when nothing has been counted. */
        public double getAverageSendBytes() {
            if (this.sendCount == 0L) {
                return 0.0;
            }
            return (double)this.sendBytes / (double)this.sendCount;
        }

        /** Closes this client without a reason; equivalent to {@code close(false, null)}. */
        public void close() {
            close(false, null);
        }

        /**
         * Tears this client down: writes the log entry, notifies the listeners
         * (subject removal, stop-receive, close), stops the request dispatcher,
         * closes channel and socket, and removes the client from the server's
         * client set and client map.
         * <p>
         * The registered subjects are copied while holding the synchronized map's
         * monitor, because iterating a {@code Collections.synchronizedMap} view is
         * not thread-safe on its own; listeners are then called outside that monitor.
         *
         * @param isClose {@code true} logs with {@code clientCloseMessageId},
         *                {@code false} with {@code clientClosedMessageId}
         * @param reason  throwable handed to the logger; may be {@code null}
         */
        protected synchronized void close(boolean isClose, Throwable reason) {
            if (ServerConnectionImpl.this.logger != null) {
                if (!isClose && ServerConnectionImpl.this.clientClosedMessageId != null) {
                    ServerConnectionImpl.this.logger.write(ServerConnectionImpl.this.clientClosedMessageId, new Object[]{this}, reason);
                } else if (isClose && ServerConnectionImpl.this.clientCloseMessageId != null) {
                    ServerConnectionImpl.this.logger.write(ServerConnectionImpl.this.clientCloseMessageId, new Object[]{this}, reason);
                }
            }
            if (this.subjects.size() != 0) {
                List entries;
                synchronized (this.subjects) {
                    entries = new ArrayList(this.subjects.entrySet());
                }
                for (Object element : entries) {
                    Map.Entry entry = (Map.Entry)element;
                    String subject = (String)entry.getKey();
                    Set keySet = (Set)entry.getValue();
                    if (ServerConnectionImpl.this.serverConnectionListeners == null || keySet.isEmpty()) {
                        continue;
                    }
                    String[] removeKeys = (String[])keySet.toArray(new String[0]);
                    int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                    for (int i = 0; i < imax; ++i) {
                        ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onRemoveSubject(this, subject, removeKeys);
                    }
                }
            }
            if (this.isStartReceive) {
                this.isStartReceive = false;
                if (ServerConnectionImpl.this.serverConnectionListeners != null) {
                    int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                    for (int i = 0; i < imax; ++i) {
                        ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onStopReceive(this);
                    }
                }
            }
            // Resolve the id before the socket is dropped: getId() falls back to the remote address.
            Object id = this.getId();
            if (this.requestDispatcher != null) {
                this.requestDispatcher.stopNoWait();
                this.requestDispatcher = null;
            }
            if (this.socketChannel != null) {
                try {
                    this.socketChannel.close();
                }
                catch (IOException ignored) {
                    // best-effort close
                }
                this.socketChannel = null;
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                }
                catch (IOException ignored) {
                    // best-effort close
                }
                this.socket = null;
            }
            Object acceptorLock = ServerConnectionImpl.this.clientAcceptor;
            if (acceptorLock != null) {
                synchronized (acceptorLock) {
                    this.removeFromClients();
                }
            } else {
                this.removeFromClients();
            }
            ServerConnectionImpl.this.clientMap.remove(id);
            if (ServerConnectionImpl.this.serverConnectionListeners != null) {
                int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                for (int i = 0; i < imax; ++i) {
                    ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onClose(this);
                }
            }
        }

        /** Replaces the server's client set with a copy that no longer contains this client. */
        private void removeFromClients() {
            LinkedHashSet<ClientImpl> newClients = new LinkedHashSet<ClientImpl>();
            newClients.addAll(ServerConnectionImpl.this.clients);
            newClients.remove(this);
            ServerConnectionImpl.this.clients = newClients;
        }

        /** Returns the default object string followed by the remote address, the subjects and the enabled flag. */
        public String toString() {
            SocketAddress remote = this.socket == null ? null : this.socket.getRemoteSocketAddress();
            return super.toString()
                + '{'
                + "client=" + remote
                + ", subject=" + this.subjects
                + ", isEnabled=" + this.isEnabled
                + '}';
        }

        /** Daemon start callback; unconditionally returns {@code true}. */
        @Override
        public boolean onStart() {
            return true;
        }

        /** Daemon stop callback; unconditionally returns {@code true}. */
        @Override
        public boolean onStop() {
            return true;
        }

        /** Daemon suspend callback; unconditionally returns {@code true}. */
        @Override
        public boolean onSuspend() {
            return true;
        }

        /** Daemon resume callback; unconditionally returns {@code true}. */
        @Override
        public boolean onResume() {
            return true;
        }

        /**
         * Reads one request frame (4-byte length followed by the serialized
         * message) from the client's socket and deserializes it.
         * <p>
         * A negative length is now raised as an {@code EOFException} and handled
         * like any other end-of-stream (the client is closed); previously the
         * exception object itself was returned and later failed the cast to
         * {@code ClientMessage} in {@code consume}.
         * <p>
         * NOTE(review): without an externalizer the payload is read with plain
         * Java deserialization; consider a filter if peers are not trusted.
         *
         * @param ctrl daemon control (unused)
         * @return the deserialized message, or {@code null} when nothing usable
         *         was read (timeout, unknown class, closed or failed connection)
         */
        @Override
        public Object provide(DaemonControl ctrl) throws Throwable {
            // close() may null the field from another thread; work on a snapshot.
            Socket sock = this.socket;
            if (sock == null) {
                return null;
            }
            try {
                DataInputStream dis = new DataInputStream(sock.getInputStream());
                int length = dis.readInt();
                if (length < 0) {
                    throw new EOFException("illegal data length. length=" + length);
                }
                byte[] bytes = new byte[length];
                dis.readFully(bytes);
                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                if (ServerConnectionImpl.this.externalizer == null) {
                    ObjectInputStream ois = new ObjectInputStream(bais);
                    return ois.readObject();
                }
                return ServerConnectionImpl.this.externalizer.readExternal(bais);
            }
            catch (ClassNotFoundException e) {
                // the whole frame was consumed, so the stream stays in sync
                return null;
            }
            catch (SocketTimeoutException e) {
                return null;
            }
            catch (SocketException e) {
                this.close(false, e);
                return null;
            }
            catch (EOFException e) {
                this.close(false, e);
                return null;
            }
            catch (IOException e) {
                this.close(true, e);
                return null;
            }
        }

        /**
         * Handles the message produced by {@code provide}.
         *
         * @param paramObj the received {@code ClientMessage}; {@code null} is ignored
         * @param ctrl     daemon control (unused)
         */
        @Override
        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            if (paramObj != null) {
                this.handleMessage((ClientMessage)paramObj);
            }
        }

        /**
         * Applies one client request, selected by its message type:
         * 1 = id announcement, 2 = add subject/keys, 3 = remove subject/keys,
         * 4 = close, 5 = start receiving, 6 = stop receiving. Unless the request
         * closed the client, an acknowledgement carrying the request id is sent
         * back when acknowledgement is enabled on the server.
         *
         * @param message the request received from the client
         * @return {@code true} when the request closed this client
         */
        private boolean handleMessage(ClientMessage message) {
            Set<String> keySet = null;
            String[] keys = null;
            boolean isClosed = false;
            switch (message.getMessageType()) {
                case 1: {
                    // Id announcement: re-register this client in the client map under its new id.
                    IdMessage idMessage = (IdMessage)message;
                    ServerConnectionImpl.this.clientMap.remove(this.getId());
                    this.id = idMessage.getId();
                    ServerConnectionImpl.this.clientMap.put(this.getId(), this);
                    if (ServerConnectionImpl.this.serverConnectionListeners != null) {
                        int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                        for (int i = 0; i < imax; ++i) {
                            ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onConnect(this);
                        }
                    }
                    if (ServerConnectionImpl.this.logger == null || ServerConnectionImpl.this.clientConnectMessageId == null) break;
                    ServerConnectionImpl.this.logger.write(ServerConnectionImpl.this.clientConnectMessageId, new Object[]{this});
                    break;
                }
                case 2: {
                    // Add subject: register the keys (or the null key when none are given);
                    // listeners are told only about keys that were not registered before.
                    List<String> addKeysList = Collections.synchronizedList(new ArrayList());
                    AddMessage addMessage = (AddMessage)message;
                    keySet = (Set<String>)this.subjects.get(addMessage.getSubject());
                    if (keySet == null) {
                        keySet = Collections.synchronizedSet(new HashSet());
                        this.subjects.put(addMessage.getSubject(), keySet);
                    }
                    if ((keys = addMessage.getKeys()) == null) {
                        if (keySet.add(null)) {
                            addKeysList.add(null);
                        }
                    } else {
                        for (int i = 0; i < keys.length; ++i) {
                            if (!keySet.add(keys[i])) continue;
                            addKeysList.add(keys[i]);
                        }
                    }
                    if (ServerConnectionImpl.this.serverConnectionListeners == null || addKeysList.isEmpty()) break;
                    String[] addkeys = addKeysList.toArray(new String[0]);
                    int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                    for (int i = 0; i < imax; ++i) {
                        ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onAddSubject(this, addMessage.getSubject(), addkeys);
                    }
                    break;
                }
                case 3: {
                    // Remove subject: unregister the keys (or the null key); the subject entry
                    // itself is dropped once its key set becomes empty.
                    List<String> removeKeysList = Collections.synchronizedList(new ArrayList());
                    RemoveMessage removeMessage = (RemoveMessage)message;
                    keySet = (Set)this.subjects.get(removeMessage.getSubject());
                    if (keySet == null) break;
                    keys = removeMessage.getKeys();
                    if (keys == null) {
                        if (keySet.remove(null)) {
                            removeKeysList.add(null);
                        }
                        if (keySet.size() == 0) {
                            this.subjects.remove(removeMessage.getSubject());
                        }
                    } else {
                        for (int i = 0; i < keys.length; ++i) {
                            if (!keySet.remove(keys[i])) continue;
                            removeKeysList.add(keys[i]);
                        }
                        if (keySet.size() == 0) {
                            this.subjects.remove(removeMessage.getSubject());
                        }
                    }
                    if (ServerConnectionImpl.this.serverConnectionListeners == null || removeKeysList.isEmpty()) break;
                    String[] removeKeys = removeKeysList.toArray(new String[0]);
                    int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                    for (int i = 0; i < imax; ++i) {
                        ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onRemoveSubject(this, removeMessage.getSubject(), removeKeys);
                    }
                    break;
                }
                case 5: {
                    // Start receiving. With a non-negative "from" time, the messages returned by
                    // getSendMessages(fromTime) that target this client are sent first, under this
                    // client's monitor.
                    StartReceiveMessage startMessage = (StartReceiveMessage)message;
                    this.fromTime = startMessage.getFrom();
                    if (this.fromTime >= 0L) {
                        ClientImpl i = this;
                        synchronized (i) {
                            this.isStartReceive = true;
                            List messages = ServerConnectionImpl.this.getSendMessages(this.fromTime);
                            for (int i2 = 0; i2 < messages.size(); ++i2) {
                                Message msg = (Message)messages.get(i2);
                                if (!this.isTargetMessage(msg)) continue;
                                try {
                                    this.send(msg);
                                    continue;
                                }
                                catch (MessageSendException e) {
                                    // A failed resend is logged (when configured) and the loop goes on.
                                    if (ServerConnectionImpl.this.logger == null || ServerConnectionImpl.this.sendErrorRetryOverMessageId == null) continue;
                                    ServerConnectionImpl.this.logger.write(ServerConnectionImpl.this.sendErrorRetryOverMessageId, new Object[]{this, msg}, (Throwable)e);
                                }
                            }
                        }
                    } else {
                        this.isStartReceive = true;
                    }
                    if (ServerConnectionImpl.this.serverConnectionListeners == null) break;
                    int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                    for (int i = 0; i < imax; ++i) {
                        ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onStartReceive(this, this.fromTime);
                    }
                    break;
                }
                case 6: {
                    // Stop receiving.
                    this.isStartReceive = false;
                    if (ServerConnectionImpl.this.serverConnectionListeners == null) break;
                    int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                    for (int i = 0; i < imax; ++i) {
                        ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onStopReceive(this);
                    }
                    break;
                }
                case 4: {
                    // Close requested by the client.
                    this.close();
                    isClosed = true;
                    break;
                }
            }
            if (!isClosed && ServerConnectionImpl.this.isAcknowledge) {
                // Acknowledge with a type-1 server message whose payload is the request id.
                MessageImpl response = ServerConnectionImpl.this.createMessage((byte)1);
                try {
                    response.setObject(new Short(message.getRequestId()));
                    this.send(response);
                }
                catch (MessageSendException messageSendException) {
                }
                catch (MessageException messageException) {
                    // a failed acknowledgement is not reported
                }
            }
            return isClosed;
        }

        /** Daemon callback; this client has nothing to do here. */
        @Override
        public void garbage() {
        }

        /**
         * Returns a snapshot of the subjects this client is registered for.
         * The copy is taken while holding the synchronized map's monitor, since
         * iterating a {@code Collections.synchronizedMap} view is not thread-safe
         * on its own and requests may modify the map concurrently.
         *
         * @return a new set holding the current subject names, or {@code null}
         *         when no subject map exists
         */
        @Override
        public Set getSubjects() {
            if (this.subjects == null) {
                return null;
            }
            synchronized (this.subjects) {
                return new HashSet(this.subjects.keySet());
            }
        }

        /**
         * Returns the key set registered for the given subject. The set is the
         * one held internally, not a copy.
         *
         * @param subject the subject name
         * @return the registered keys, or {@code null} when the subject is unknown
         */
        @Override
        public Set getKeys(String subject) {
            return this.subjects == null ? null : (Set)this.subjects.get(subject);
        }

        /**
         * Returns the id announced by the client or, while none has been
         * announced, the socket's remote address ({@code null} once the socket is gone).
         */
        @Override
        public Object getId() {
            if (this.id != null) {
                return this.id;
            }
            if (this.socket == null) {
                return null;
            }
            return this.socket.getRemoteSocketAddress();
        }
    }

    /**
     * Daemon task that sleeps for the configured buffer timeout interval and
     * then asks every client to check whether its send buffer should be flushed.
     */
    private class SendBufferChecker
    implements DaemonRunnable {
        /** Start time of the previous pass over the clients (0 before the first pass). */
        private long lastCheckTime;

        /** Always returns {@code true}. */
        @Override
        public boolean onStart() {
            return true;
        }

        /** Always returns {@code true}. */
        @Override
        public boolean onStop() {
            return true;
        }

        /** Always returns {@code true}. */
        @Override
        public boolean onSuspend() {
            return true;
        }

        /** Always returns {@code true}. */
        @Override
        public boolean onResume() {
            return true;
        }

        /** Sleeps for {@code bufferTimeoutInterval} and produces nothing. */
        @Override
        public Object provide(DaemonControl ctrl) throws Throwable {
            long interval = ServerConnectionImpl.this.bufferTimeoutInterval;
            ctrl.sleep(interval, true);
            return null;
        }

        /** Runs {@code checkSendBuffer} on every current client and remembers when this pass began. */
        @Override
        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            if (ServerConnectionImpl.this.clients.size() != 0) {
                long passStartedAt = System.currentTimeMillis();
                for (ClientImpl client : ServerConnectionImpl.this.clients) {
                    client.checkSendBuffer(this.lastCheckTime);
                }
                this.lastCheckTime = passStartedAt;
            }
        }

        /** Nothing to clean up. */
        @Override
        public void garbage() {
        }
    }

    private class ClientAcceptor
    implements DaemonRunnable {
        /** Daemon start callback; unconditionally returns {@code true}. */
        @Override
        public boolean onStart() {
            return true;
        }

        /** Daemon stop callback; unconditionally returns {@code true}. */
        @Override
        public boolean onStop() {
            return true;
        }

        /** Daemon suspend callback; unconditionally returns {@code true}. */
        @Override
        public boolean onSuspend() {
            return true;
        }

        /** Daemon resume callback; unconditionally returns {@code true}. */
        @Override
        public boolean onResume() {
            return true;
        }

        /**
         * Waits for work. With a selector: selects for up to one second and
         * returns the selected key set when at least one key is ready. Without a
         * selector: accepts one connection from the server socket.
         *
         * @return the selected keys or the accepted socket; this acceptor itself
         *         when there is nothing to process; {@code null} when the
         *         selector is closed or the server socket reports a {@code SocketException}
         */
        @Override
        public Object provide(DaemonControl ctrl) throws Throwable {
            if (ServerConnectionImpl.this.selector == null) {
                try {
                    return ServerConnectionImpl.this.serverSocket.accept();
                }
                catch (SocketException e) {
                    return null;
                }
                catch (IOException e) {
                    // includes SocketTimeoutException, which is not a SocketException
                    return this;
                }
            }
            try {
                int readyCount = ServerConnectionImpl.this.selector.select(1000L);
                if (readyCount > 0) {
                    return ServerConnectionImpl.this.selector.selectedKeys();
                }
                return this;
            }
            catch (ClosedSelectorException e) {
                return null;
            }
            catch (IOException e) {
                return this;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Processes one work item handed to this acceptor.
         *
         * <ul>
         * <li>{@code null}: the enclosing server connection is closed.</li>
         * <li>Selector mode (the enclosing connection has a selector): the item
         * must be a {@code Set} of {@link SelectionKey}s; anything else is
         * ignored. Each key is removed from the set and dispatched:
         * acceptable keys accept and register a new client, readable keys call
         * {@code receive}, writable keys drop write interest and call
         * {@code writeSendBuffer}, and any remaining invalid key is cancelled.
         * A {@link CancelledKeyException} raised while handling a key is
         * ignored and the loop moves on to the next key.</li>
         * <li>Blocking mode (no selector): the item must be a {@link Socket};
         * anything else, or a socket that is unbound or already closed, is
         * ignored. Otherwise a client is created, recorded and its request
         * dispatcher is started.</li>
         * </ul>
         *
         * @param paramObj the work item: {@code null}, a set of selection keys,
         *                 or an accepted socket
         * @param ctrl     daemon control handle; not used by this method
         * @throws Throwable any failure raised while accepting or servicing a
         *                   client, other than the ignored
         *                   {@code CancelledKeyException}
         */
        @Override
        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            if (paramObj == null) {
                ServerConnectionImpl.this.close();
                return;
            }
            if (ServerConnectionImpl.this.selector != null) {
                if (!(paramObj instanceof Set)) {
                    return;
                }
                Iterator<?> itr = ((Set<?>)paramObj).iterator();
                while (itr.hasNext()) {
                    SelectionKey key = (SelectionKey)itr.next();
                    // Remove the key from the set before handling it, so a failure
                    // below does not leave it behind for the next pass.
                    itr.remove();
                    try {
                        if (key.isAcceptable()) {
                            this.acceptChannel(key);
                        } else if (key.isReadable()) {
                            ((ClientImpl)key.attachment()).receive(key);
                        } else if (key.isWritable()) {
                            // Clear write interest before flushing the send buffer.
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            ((ClientImpl)key.attachment()).writeSendBuffer(key);
                        } else if (!key.isValid()) {
                            key.cancel();
                        }
                    }
                    catch (CancelledKeyException ignored) {
                        // The key was cancelled while being inspected; skip it.
                    }
                }
                return;
            }
            if (!(paramObj instanceof Socket)) {
                return;
            }
            Socket socket = (Socket)paramObj;
            if (!socket.isBound() || socket.isClosed()) {
                return;
            }
            ClientImpl client = new ClientImpl(socket);
            this.addAcceptedClient(client);
            client.startRequestDispatcher();
        }

        /**
         * Accepts a pending connection on the server channel behind {@code key},
         * switches it to non-blocking mode, applies the socket factory's
         * properties when a factory is set, and registers it for read events on
         * the key's selector with a new client as the attachment.
         *
         * If any setup step fails, the accepted channel is closed before the
         * failure is rethrown, so the connection is not leaked.
         */
        private void acceptChannel(SelectionKey key) throws Throwable {
            ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
            SocketChannel channel = ssc.accept();
            if (channel == null) {
                // Nothing was pending on the server channel.
                return;
            }
            ClientImpl client;
            try {
                channel.configureBlocking(false);
                if (ServerConnectionImpl.this.socketFactory != null) {
                    ServerConnectionImpl.this.socketFactory.applySocketProperties(channel.socket());
                }
                client = new ClientImpl(channel);
                channel.register(key.selector(), SelectionKey.OP_READ, client);
            }
            catch (Throwable failure) {
                try {
                    channel.close();
                }
                catch (IOException ignored) {
                    // Best effort only; the original failure is what gets reported.
                }
                throw failure;
            }
            this.addAcceptedClient(client);
        }

        /**
         * Records a newly accepted client: stores it in the id map, then, while
         * holding this acceptor's monitor, builds a new set holding the current
         * clients plus the new one and assigns it as the client set.
         */
        private void addAcceptedClient(ClientImpl client) {
            LinkedHashSet<ClientImpl> updated = new LinkedHashSet<ClientImpl>();
            ServerConnectionImpl.this.clientMap.put(client.getId(), client);
            synchronized (this) {
                updated.addAll(ServerConnectionImpl.this.clients);
                updated.add(client);
                ServerConnectionImpl.this.clients = updated;
            }
        }

        /**
         * No-op: this acceptor takes no action here.
         *
         * NOTE(review): presumably a daemon callback for work items left
         * unprocessed when the daemon stops; confirm against DaemonRunnable.
         */
        @Override
        public void garbage() {
            // Intentionally empty.
        }
    }
}

