/*
 * Decompiled with CFR 0.152.
 */
package jp.ossc.nimbus.service.publish.tcp;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import jp.ossc.nimbus.core.ServiceName;
import jp.ossc.nimbus.daemon.Daemon;
import jp.ossc.nimbus.daemon.DaemonControl;
import jp.ossc.nimbus.daemon.DaemonRunnable;
import jp.ossc.nimbus.service.io.Externalizer;
import jp.ossc.nimbus.service.log.Logger;
import jp.ossc.nimbus.service.publish.Client;
import jp.ossc.nimbus.service.publish.Message;
import jp.ossc.nimbus.service.publish.MessageCreateException;
import jp.ossc.nimbus.service.publish.MessageException;
import jp.ossc.nimbus.service.publish.MessageSendException;
import jp.ossc.nimbus.service.publish.ServerConnection;
import jp.ossc.nimbus.service.publish.ServerConnectionListener;
import jp.ossc.nimbus.service.publish.tcp.AddMessage;
import jp.ossc.nimbus.service.publish.tcp.ClientMessage;
import jp.ossc.nimbus.service.publish.tcp.IdMessage;
import jp.ossc.nimbus.service.publish.tcp.MessageImpl;
import jp.ossc.nimbus.service.publish.tcp.RemoveMessage;
import jp.ossc.nimbus.service.publish.tcp.StartReceiveMessage;
import jp.ossc.nimbus.service.queue.AbstractDistributedQueueSelectorService;
import jp.ossc.nimbus.service.queue.AsynchContext;
import jp.ossc.nimbus.service.queue.DefaultQueueService;
import jp.ossc.nimbus.service.queue.DistributedQueueHandlerContainerService;
import jp.ossc.nimbus.service.queue.Queue;
import jp.ossc.nimbus.service.queue.QueueHandler;
import jp.ossc.nimbus.service.queue.QueueHandlerContainerService;
import jp.ossc.nimbus.util.SynchronizeMonitor;
import jp.ossc.nimbus.util.WaitSynchronizeMonitor;
import jp.ossc.nimbus.util.net.SocketFactory;

public class ServerConnectionImpl
implements ServerConnection {
    // Blocking-mode listening socket; assigned only by the ServerSocket constructor.
    private ServerSocket serverSocket;
    // NIO listening channel; assigned only by the ServerSocketChannel constructor.
    private ServerSocketChannel serverSocketChannel;
    // Selector the listening channel is registered with (NIO mode only).
    private Selector selector;
    // Known clients, iterated in insertion order (LinkedHashSet behind a synchronized wrapper).
    private Set clients = Collections.synchronizedSet(new LinkedHashSet());
    // Clients indexed by the id they announced.
    private Map clientMap = Collections.synchronizedMap(new HashMap());
    // Upper bound on send retries; also pushed into the queue handler containers.
    private int maxSendRetryCount;
    // Optional logger for send failures; null disables logging.
    private Logger logger;
    // Log message id written when a send fails and retries remain.
    private String sendErrorMessageId;
    // Log message id written when a send fails and retries are exhausted.
    private String sendErrorRetryOverMessageId;
    // Daemon running the ClientAcceptor.
    private Daemon clientAcceptor;
    // Worker pool for send(); created only when the send thread size is 2 or more.
    private QueueHandlerContainerService sendQueueHandlerContainer;
    // Distributes asynchronous send requests across queues, keyed by client.
    private ClientDistributedQueueSelector queueSelector;
    // Worker pool for sendAsynch(); created only when the asynchronous thread size is positive.
    private DistributedQueueHandlerContainerService asynchSendQueueHandlerContainer;
    // Number of messages added to the send buffer since the last reset.
    private long sendCount;
    // Accumulated wall-clock milliseconds spent in send() since the last reset.
    private long sendProcessTime;
    // Lazily created listener list; null when no listener is registered.
    private List serverConnectionListeners;
    // Optional serializer; when null, plain Java serialization is used for incoming messages.
    private Externalizer externalizer;
    // Stored by the NIO constructor; its use is not visible in this part of the file.
    private SocketFactory socketFactory;
    // Recently sent messages, oldest first.
    private List sendBufferList = Collections.synchronizedList(new ArrayList());
    // Age in milliseconds beyond which buffered messages are dropped.
    private long sendBufferTime;

    public ServerConnectionImpl(ServerSocket serverSocket, Externalizer ext, int sendThreadSize, ServiceName sendQueueServiceName, int asynchSendThreadSize, ServiceName asynchSendQueueServiceName) throws Exception {
        this.serverSocket = serverSocket;
        this.externalizer = ext;
        this.initSend(sendQueueServiceName, sendThreadSize);
        this.initAsynchSend(asynchSendQueueServiceName, asynchSendThreadSize);
        this.initClientAcceptor(serverSocket.getLocalSocketAddress());
    }

    public ServerConnectionImpl(ServerSocketChannel ssc, Externalizer ext, int sendThreadSize, ServiceName sendQueueServiceName, int asynchSendThreadSize, ServiceName asynchSendQueueServiceName, SocketFactory sf) throws Exception {
        this.serverSocketChannel = ssc;
        this.socketFactory = sf;
        this.externalizer = ext;
        this.initSend(sendQueueServiceName, sendThreadSize);
        this.initAsynchSend(asynchSendQueueServiceName, asynchSendThreadSize);
        this.selector = Selector.open();
        this.serverSocketChannel.register(this.selector, 16, null);
        this.initClientAcceptor(this.serverSocketChannel.socket().getLocalSocketAddress());
    }

    /**
     * Creates and starts the daemon thread that runs the client acceptor.
     *
     * @param localAddress local listening address, used only in the thread name
     */
    private void initClientAcceptor(SocketAddress localAddress) {
        final Daemon acceptor = new Daemon(new ClientAcceptor());
        // Publish the field before the thread starts.
        this.clientAcceptor = acceptor;
        final String threadName = "Nimbus Publish(TCP) ServerConnection ClientAcceptor " + localAddress;
        acceptor.setName(threadName);
        acceptor.setDaemon(true);
        acceptor.start();
    }

    /**
     * Sets up the worker pool used by {@link #send(Message)}. Nothing is created
     * when fewer than two send threads are requested; in that case sends run on
     * the calling thread.
     *
     * @param sendQueueServiceName queue service to use, or {@code null} to create a private queue
     * @param sendThreadSize number of queue handler threads
     * @throws Exception if the queue or the container cannot be created or started
     */
    private void initSend(ServiceName sendQueueServiceName, int sendThreadSize) throws Exception {
        if (sendThreadSize < 2) {
            return;
        }
        final QueueHandlerContainerService container = new QueueHandlerContainerService();
        this.sendQueueHandlerContainer = container;
        container.create();
        if (sendQueueServiceName != null) {
            container.setQueueServiceName(sendQueueServiceName);
        } else {
            // No external queue configured: back the container with a private one.
            final DefaultQueueService privateQueue = new DefaultQueueService();
            privateQueue.create();
            privateQueue.start();
            container.setQueueService(privateQueue);
        }
        container.setQueueHandlerSize(sendThreadSize);
        container.setQueueHandler(new SendQueueHandler());
        container.start();
    }

    /**
     * Sets up the distributed queue and handler container used by
     * {@link #sendAsynch(Message)}. Nothing is created when the distribution size
     * is zero or negative.
     *
     * @param queueFactoryServiceName queue factory service for the selector, or {@code null}
     * @param clientQueueDistributedSize number of distributed queues
     * @throws Exception if the selector or the container cannot be created or started
     */
    private void initAsynchSend(ServiceName queueFactoryServiceName, int clientQueueDistributedSize) throws Exception {
        if (clientQueueDistributedSize <= 0) {
            return;
        }
        final ClientDistributedQueueSelector distributedSelector = new ClientDistributedQueueSelector();
        this.queueSelector = distributedSelector;
        distributedSelector.create();
        distributedSelector.setDistributedSize(clientQueueDistributedSize);
        if (queueFactoryServiceName != null) {
            distributedSelector.setQueueFactoryServiceName(queueFactoryServiceName);
        }
        distributedSelector.start();

        final DistributedQueueHandlerContainerService container = new DistributedQueueHandlerContainerService();
        this.asynchSendQueueHandlerContainer = container;
        container.create();
        container.setDistributedQueueSelector(this.queueSelector);
        container.setQueueHandler(new SendQueueHandler());
        container.start();
    }

    /**
     * Sets the maximum number of send retries and forwards the value to whichever
     * queue handler containers exist.
     *
     * @param count maximum retry count
     */
    public void setMaxSendRetryCount(int count) {
        maxSendRetryCount = count;
        if (sendQueueHandlerContainer != null) {
            sendQueueHandlerContainer.setMaxRetryCount(maxSendRetryCount);
        }
        if (asynchSendQueueHandlerContainer != null) {
            asynchSendQueueHandlerContainer.setMaxRetryCount(maxSendRetryCount);
        }
    }

    /**
     * Sets the logger used to report send failures.
     *
     * @param logger logger, or {@code null} to disable logging
     */
    public void setLogger(final Logger logger) {
        this.logger = logger;
    }

    /**
     * Sets the log message id written when a send fails but retries remain.
     *
     * @param id log message id, or {@code null} to suppress the log entry
     */
    public void setSendErrorMessageId(final String id) {
        sendErrorMessageId = id;
    }

    /**
     * Sets the log message id written when a send fails and no retries remain.
     *
     * @param id log message id, or {@code null} to suppress the log entry
     */
    public void setSendErrorRetryOverMessageId(final String id) {
        sendErrorRetryOverMessageId = id;
    }

    /**
     * Sets how long sent messages stay in the send buffer.
     *
     * @param time retention time in milliseconds
     */
    public void setSendBufferTime(final long time) {
        sendBufferTime = time;
    }

    /**
     * Creates a new message addressed to the given subject and key.
     *
     * @param subject message subject
     * @param key message key
     * @return a new {@link MessageImpl}
     * @throws MessageCreateException declared by the interface; not thrown by this implementation
     */
    @Override
    public Message createMessage(String subject, String key) throws MessageCreateException {
        final MessageImpl created = new MessageImpl();
        created.setSubject(subject, key);
        return created;
    }

    /**
     * Converts a foreign message into this connection's message type.
     * A {@link MessageImpl} is returned unchanged. Any other message is copied
     * into a new one, using its serialized bytes when present and its object
     * otherwise.
     *
     * @param message message to convert
     * @return the same instance, or a converted copy
     * @throws MessageException if the copy cannot be created or populated
     */
    @Override
    public Message castMessage(Message message) throws MessageException {
        if (!(message instanceof MessageImpl)) {
            final Message converted = createMessage(message.getSubject(), message.getKey());
            if (message.getSerializedBytes() == null) {
                converted.setObject(message.getObject());
            } else {
                converted.setSerializedBytes(message.getSerializedBytes());
            }
            return converted;
        }
        return message;
    }

    /**
     * Sends a message to every client that has started receiving and subscribes
     * to it, blocking until every delivery has succeeded or failed.
     * <p>
     * The message is always added to the send buffer first. Without a worker
     * pool the clients are written to on the calling thread, with failed
     * clients retried up to the configured retry count. With a worker pool one
     * request per client is queued and this method waits for one response per
     * request.
     *
     * @param message message to send; must be a {@link MessageImpl}
     * @throws MessageSendException if delivery to at least one client failed
     */
    @Override
    public void send(Message message) throws MessageSendException {
        final long startTime = System.currentTimeMillis();
        this.addSendBuffer((MessageImpl)message);
        if (this.clients.size() == 0) {
            return;
        }
        try {
            ClientImpl[] clientArray = this.clients.toArray(new ClientImpl[this.clients.size()]);
            if (this.sendQueueHandlerContainer == null) {
                // Direct path: deliver on the calling thread.
                ArrayList<ClientImpl> pending = new ArrayList<ClientImpl>();
                for (ClientImpl candidate : clientArray) {
                    if (candidate.isStartReceive() && candidate.isTargetMessage(message)) {
                        pending.add(candidate);
                    }
                }
                // Starts at -1 so the first pass is the initial attempt, not a retry.
                int retryCount = -1;
                while (!pending.isEmpty() && retryCount < this.maxSendRetryCount) {
                    Iterator<ClientImpl> itr = pending.iterator();
                    while (itr.hasNext()) {
                        ClientImpl client = itr.next();
                        try {
                            client.send(message);
                            itr.remove();
                        }
                        catch (MessageSendException e) {
                            if (this.logger == null) {
                                continue;
                            }
                            boolean retryOver = retryCount + 1 >= this.maxSendRetryCount;
                            String messageId = retryOver ? this.sendErrorRetryOverMessageId : this.sendErrorMessageId;
                            if (messageId != null) {
                                this.logger.write(messageId, new Object[]{client, message}, (Throwable)e);
                            }
                        }
                    }
                    ++retryCount;
                }
                if (!pending.isEmpty()) {
                    throw new MessageSendException("Send error : clients=" + pending + ", message=" + message);
                }
                return;
            }

            // Pooled path: queue one request per client and collect the responses.
            DefaultQueueService responseQueue = new DefaultQueueService();
            try {
                responseQueue.create();
                responseQueue.start();
            }
            catch (Exception e) {
                throw new MessageSendException(e);
            }
            responseQueue.accept();
            int requestCount = 0;
            for (ClientImpl candidate : clientArray) {
                if (!candidate.isStartReceive() || !candidate.isTargetMessage(message)) {
                    continue;
                }
                this.sendQueueHandlerContainer.push(new AsynchContext(new SendRequest(candidate, message), responseQueue));
                ++requestCount;
            }
            ArrayList<ClientImpl> errorClients = new ArrayList<ClientImpl>();
            for (int received = 0; received < requestCount; ++received) {
                AsynchContext response = (AsynchContext)responseQueue.get();
                if (response.getThrowable() != null) {
                    errorClients.add(((SendRequest)response.getInput()).client);
                }
            }
            if (!errorClients.isEmpty()) {
                throw new MessageSendException("Send error : clients=" + errorClients + ", message=" + message);
            }
        }
        finally {
            this.sendProcessTime += System.currentTimeMillis() - startTime;
        }
    }

    /**
     * Queues a message for delivery to every client that has started receiving
     * and subscribes to it, without waiting for the result.
     *
     * @param message message to send; must be a {@link MessageImpl}
     * @throws UnsupportedOperationException if asynchronous sending was not configured
     */
    @Override
    public void sendAsynch(Message message) {
        if (asynchSendQueueHandlerContainer == null) {
            throw new UnsupportedOperationException();
        }
        addSendBuffer((MessageImpl)message);
        if (this.clients.size() == 0) {
            return;
        }
        ClientImpl[] clientArray = this.clients.toArray(new ClientImpl[this.clients.size()]);
        for (ClientImpl candidate : clientArray) {
            if (candidate.isStartReceive() && candidate.isTargetMessage(message)) {
                asynchSendQueueHandlerContainer.push(new AsynchContext(new SendRequest(candidate, message)));
            }
        }
    }

    /**
     * Returns the number of messages added to the send buffer since the last reset.
     *
     * @return send count
     */
    public long getSendCount() {
        return sendCount;
    }

    /**
     * Resets both the send count and the accumulated send processing time to zero.
     */
    public void resetSendCount() {
        sendProcessTime = 0L;
        sendCount = 0L;
    }

    /**
     * Returns the average time spent per send since the last reset.
     *
     * @return accumulated send time divided by send count, in milliseconds; 0 when nothing was sent
     */
    public long getAverageSendProcessTime() {
        if (this.sendCount == 0L) {
            return 0L;
        }
        return this.sendProcessTime / this.sendCount;
    }

    /**
     * Returns the live, synchronized set of clients. The returned set is the
     * internal collection, not a copy.
     *
     * @return the client set
     */
    public Set getClients() {
        return clients;
    }

    /**
     * Returns the number of clients currently held by this connection.
     *
     * @return client count
     */
    @Override
    public int getClientCount() {
        return clients.size();
    }

    /**
     * Returns a snapshot of the ids of all clients that have announced an id.
     *
     * @return a new set containing the current client ids
     */
    @Override
    public Set getClientIds() {
        // Iterating a view of a Collections.synchronizedMap requires holding the
        // map's own lock; otherwise a concurrent connect or close can make the
        // copy fail with ConcurrentModificationException.
        synchronized (this.clientMap) {
            return new HashSet(this.clientMap.keySet());
        }
    }

    /**
     * Returns the ids of all clients whose subscriptions match the message.
     * Unlike {@link #send(Message)}, this does not check whether the client has
     * started receiving.
     *
     * @param message message to match against the client subscriptions
     * @return a new set of matching client ids
     */
    @Override
    public Set getReceiveClientIds(Message message) {
        ClientImpl[] clientArray = this.clients.toArray(new ClientImpl[this.clients.size()]);
        HashSet<Object> matchingIds = new HashSet<Object>();
        for (ClientImpl candidate : clientArray) {
            if (candidate.isTargetMessage(message)) {
                matchingIds.add(candidate.getId());
            }
        }
        return matchingIds;
    }

    /**
     * Returns the subjects the given client is registered for.
     *
     * @param id client id
     * @return the client's subjects, or {@code null} if no client has that id
     */
    @Override
    public Set getSubjects(Object id) {
        final ClientImpl found = (ClientImpl)clientMap.get(id);
        return found == null ? null : found.getSubjects();
    }

    /**
     * Returns the keys the given client is registered for under a subject.
     *
     * @param id client id
     * @param subject subject to look up
     * @return the client's keys for the subject, or {@code null} if no client has that id
     */
    @Override
    public Set getKeys(Object id, String subject) {
        final ClientImpl found = (ClientImpl)clientMap.get(id);
        return found == null ? null : found.getKeys(subject);
    }

    /**
     * Stamps the message with the current time, appends it to the send buffer,
     * drops buffered messages older than the retention time, and increments the
     * send counter. All buffer work happens under the buffer's lock.
     *
     * @param message message being sent
     */
    private void addSendBuffer(MessageImpl message) {
        final long now = System.currentTimeMillis();
        message.setSendTime(now);
        synchronized (this.sendBufferList) {
            this.sendBufferList.add(message);
            // The buffer is oldest-first, so stop at the first entry still within the window.
            int remaining = this.sendBufferList.size();
            while (remaining > 0) {
                final MessageImpl oldest = (MessageImpl)this.sendBufferList.get(0);
                if (now - oldest.getSendTime() <= this.sendBufferTime) {
                    break;
                }
                this.sendBufferList.remove(0);
                --remaining;
            }
            ++this.sendCount;
        }
    }

    /**
     * Returns the buffered messages whose send time is at or after {@code from},
     * oldest first.
     * <p>
     * The buffer is scanned backwards from the newest entry and the scan stops
     * at the first message older than {@code from}.
     *
     * @param from lower bound on the send time, in milliseconds
     * @return a new list of matching messages in buffer order; empty if none match
     */
    private List getSendMessages(long from) {
        synchronized (this.sendBufferList) {
            final int size = this.sendBufferList.size();
            // Find the start of the matching tail first, then append in order;
            // this avoids repeated front insertion, which shifts the whole list each time.
            int start = size;
            while (start > 0 && ((MessageImpl)this.sendBufferList.get(start - 1)).getSendTime() >= from) {
                --start;
            }
            final ArrayList<MessageImpl> result = new ArrayList<MessageImpl>(size - start);
            for (int i = start; i < size; ++i) {
                result.add((MessageImpl)this.sendBufferList.get(i));
            }
            return result;
        }
    }

    /**
     * Returns the number of messages currently held in the send buffer.
     *
     * @return send buffer size
     */
    public int getSendBufferSize() {
        return sendBufferList.size();
    }

    /**
     * Shuts this connection down: notifies clients, stops the acceptor and the
     * send workers, releases the listening socket or channel, closes every
     * client, and finally closes the selector.
     * <p>
     * Every step is best effort; failures while notifying clients or closing
     * sockets are ignored so that the remaining resources are still released.
     */
    public void close() {
        try {
            // NOTE(review): MessageImpl(true) is presumably the close notification
            // for clients - confirm against MessageImpl.
            this.send(new MessageImpl(true));
        }
        catch (MessageSendException ignored) {
            // Best effort: unreachable clients must not prevent shutdown.
        }
        if (this.clientAcceptor != null) {
            this.clientAcceptor.stopNoWait();
            this.clientAcceptor = null;
        }
        if (this.sendQueueHandlerContainer != null) {
            this.sendQueueHandlerContainer.stop();
            this.sendQueueHandlerContainer.destroy();
            this.sendQueueHandlerContainer = null;
        }
        if (this.asynchSendQueueHandlerContainer != null) {
            this.asynchSendQueueHandlerContainer.stop();
            this.asynchSendQueueHandlerContainer.destroy();
            this.asynchSendQueueHandlerContainer = null;
        }
        if (this.queueSelector != null) {
            this.queueSelector.stop();
            this.queueSelector.destroy();
            this.queueSelector = null;
        }
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            }
            catch (IOException ignored) {
                // Nothing useful can be done if closing fails.
            }
        }
        // NIO mode: release the listening channel too, otherwise the port stays bound.
        // Closing an already closed channel has no effect.
        if (this.serverSocketChannel != null) {
            try {
                this.serverSocketChannel.close();
            }
            catch (IOException ignored) {
                // Nothing useful can be done if closing fails.
            }
        }
        ClientImpl[] clientArray = this.clients.toArray(new ClientImpl[this.clients.size()]);
        for (int i = 0; i < clientArray.length; ++i) {
            clientArray[i].close();
        }
        // The selector was opened by this connection's constructor, so it is released here.
        // Closing an already closed selector has no effect.
        if (this.selector != null) {
            try {
                this.selector.close();
            }
            catch (IOException ignored) {
                // Nothing useful can be done if closing fails.
            }
        }
    }

    /**
     * Registers a listener. The listener list is created on first use and a
     * listener that is already registered is not added twice.
     *
     * @param listener listener to register
     */
    @Override
    public void addServerConnectionListener(ServerConnectionListener listener) {
        if (serverConnectionListeners == null) {
            serverConnectionListeners = new ArrayList();
        }
        if (serverConnectionListeners.contains(listener)) {
            return;
        }
        serverConnectionListeners.add(listener);
    }

    /**
     * Unregisters a listener. The listener list is discarded once it becomes empty.
     *
     * @param listener listener to remove
     */
    @Override
    public void removeServerConnectionListener(ServerConnectionListener listener) {
        if (serverConnectionListeners != null) {
            serverConnectionListeners.remove(listener);
            if (serverConnectionListeners.size() == 0) {
                serverConnectionListeners = null;
            }
        }
    }

    /**
     * Returns the default object string followed by the local listening address.
     * <p>
     * The address comes from the blocking server socket when present, otherwise
     * from the NIO server channel's socket. {@code server=null} is printed only
     * when neither exists.
     *
     * @return a description of the form {@code <default>{server=<address>}}
     */
    public String toString() {
        SocketAddress localAddress = null;
        if (this.serverSocket != null) {
            localAddress = this.serverSocket.getLocalSocketAddress();
        } else if (this.serverSocketChannel != null) {
            // NIO mode has no ServerSocket field; use the channel's socket instead.
            localAddress = this.serverSocketChannel.socket().getLocalSocketAddress();
        }
        StringBuilder buf = new StringBuilder();
        buf.append(super.toString());
        buf.append('{');
        buf.append("server=").append(localAddress);
        buf.append('}');
        return buf.toString();
    }

    /**
     * Queue selector that distributes asynchronous send requests by target
     * client: the distribution key of a request is its client.
     */
    private class ClientDistributedQueueSelector
    extends AbstractDistributedQueueSelectorService {
        private static final long serialVersionUID = 8050312124454494504L;

        private ClientDistributedQueueSelector() {
        }

        /**
         * Extracts the distribution key from a queued element.
         *
         * @param obj an {@link AsynchContext} whose input is a {@link SendRequest}
         * @return the client the request is addressed to
         */
        @Override
        protected Object getKey(Object obj) {
            final Object input = ((AsynchContext)obj).getInput();
            return ((SendRequest)input).client;
        }
    }

    /**
     * Queue handler that performs the actual send for queued {@link SendRequest}s
     * and reports the outcome to the response queue, when one is attached.
     */
    private class SendQueueHandler
    implements QueueHandler {
        private SendQueueHandler() {
        }

        /**
         * Sends the queued message to its client if that client has started
         * receiving, then pushes the context to the response queue.
         *
         * @param obj the dequeued {@link AsynchContext}; {@code null} is ignored
         * @throws Throwable if the send fails
         */
        @Override
        public void handleDequeuedObject(Object obj) throws Throwable {
            if (obj == null) {
                return;
            }
            final AsynchContext context = (AsynchContext)obj;
            final SendRequest request = (SendRequest)context.getInput();
            if (request.client.isStartReceive()) {
                request.client.send(request.message);
            }
            respond(context);
        }

        /**
         * Logs a failed attempt and asks for a retry.
         *
         * @param obj the {@link AsynchContext} that failed
         * @param th the failure
         * @return always {@code true}
         */
        @Override
        public boolean handleError(Object obj, Throwable th) throws Throwable {
            final AsynchContext context = (AsynchContext)obj;
            logFailure(ServerConnectionImpl.this.sendErrorMessageId, context, th);
            return true;
        }

        /**
         * Logs the final failure, records it on the context, and pushes the
         * context to the response queue.
         *
         * @param obj the {@link AsynchContext} that failed
         * @param th the failure
         */
        @Override
        public void handleRetryOver(Object obj, Throwable th) throws Throwable {
            final AsynchContext context = (AsynchContext)obj;
            logFailure(ServerConnectionImpl.this.sendErrorRetryOverMessageId, context, th);
            context.setThrowable(th);
            respond(context);
        }

        /** Writes a failure log entry when both a logger and a message id are configured. */
        private void logFailure(String messageId, AsynchContext context, Throwable th) {
            if (ServerConnectionImpl.this.logger == null || messageId == null) {
                return;
            }
            final SendRequest request = (SendRequest)context.getInput();
            ServerConnectionImpl.this.logger.write(messageId, new Object[]{request.client, request.message}, th);
        }

        /** Pushes the context back to its response queue, if it has one. */
        private void respond(AsynchContext context) {
            if (context.getResponseQueue() != null) {
                context.getResponseQueue().push(context);
            }
        }
    }

    /**
     * A single unit of send work: one message addressed to one client.
     */
    private class SendRequest {
        /** Client the message is addressed to. */
        public ClientImpl client;
        /** Message to deliver. */
        public Message message;

        /**
         * @param client target client
         * @param message message to deliver
         */
        public SendRequest(final ClientImpl client, final Message message) {
            this.message = message;
            this.client = client;
        }
    }

    public class ClientImpl
    implements DaemonRunnable,
    Client {
        // NIO channel to the client; null for clients created from a plain Socket.
        private SocketChannel socketChannel;
        // Underlying socket (for NIO clients, the channel's socket); set to null on close.
        private Socket socket;
        // Reader daemon for blocking-socket clients; not created for NIO clients.
        private Daemon requestDispatcher;
        // Registered subscriptions: subject -> Set of keys.
        private Map subjects;
        // Messages written to this client since the last reset.
        private long sendCount;
        // Accumulated milliseconds spent writing to this client since the last reset.
        private long sendProcessTime;
        // When false, send() silently does nothing.
        private boolean isEnabled = true;
        // Id announced by the client through an IdMessage.
        private Object id;
        // 4-byte buffer that accumulates the length prefix of an incoming frame (NIO mode).
        private ByteBuffer lengthBuffer;
        // Buffer that accumulates the payload of the frame being read; null between frames.
        private ByteBuffer dataBuffer;
        // Payload length of the frame currently being read.
        private int dataLength = 0;
        // Defaults to -1; where it is used is not visible in this part of the file.
        private long fromTime = -1L;
        // True once the client has started receiving; reset to false on close.
        private boolean isStartReceive = false;
        // Signalled by setSendKey(); send() waits on it before writing to the channel.
        private SynchronizeMonitor selectMonitor;
        // Signalled by send() after writing; waitSend() waits on it.
        private SynchronizeMonitor sendMonitor;
        // Selection key handed over by setSendKey(); its selector is reused in waitSend().
        private SelectionKey sendKey;

        /**
         * Creates a client backed by an NIO channel. No reader thread is started
         * for such clients.
         *
         * @param sc channel connected to the client
         */
        public ClientImpl(SocketChannel sc) {
            this.socketChannel = sc;
            this.socket = sc.socket();
            this.lengthBuffer = ByteBuffer.allocate(4);
            this.subjects = Collections.synchronizedMap(new HashMap());
            this.sendMonitor = new WaitSynchronizeMonitor();
            this.selectMonitor = new WaitSynchronizeMonitor();
        }

        public ClientImpl(Socket sock) throws IOException {
            this.socket = sock;
            this.subjects = Collections.synchronizedMap(new HashMap());
            this.requestDispatcher = new Daemon(this);
            this.requestDispatcher.setName("Nimbus Publish(TCP) ServerConnection ClientRequestDisptcher " + this.socket.getRemoteSocketAddress());
            this.requestDispatcher.setDaemon(true);
            this.requestDispatcher.start();
        }

        /**
         * Returns the NIO channel of this client.
         *
         * @return the channel, or {@code null} for blocking-socket or closed clients
         */
        public SocketChannel getSocketChannel() {
            return socketChannel;
        }

        /**
         * Returns the socket of this client.
         *
         * @return the socket, or {@code null} once the client is closed
         */
        public Socket getSocket() {
            return socket;
        }

        /**
         * Tells whether sending to this client is enabled.
         *
         * @return {@code true} if messages are sent to this client
         */
        public boolean isEnabled() {
            return isEnabled;
        }

        /**
         * Enables or disables sending to this client. While disabled,
         * {@code send} returns without writing anything.
         *
         * @param isEnabled {@code true} to enable sending
         */
        public void setEnabled(final boolean isEnabled) {
            this.isEnabled = isEnabled;
        }

        /**
         * Tells whether this client has started receiving messages.
         *
         * @return {@code true} if the client has started receiving
         */
        public boolean isStartReceive() {
            return isStartReceive;
        }

        /**
         * Decides whether a message should be delivered to this client.
         * <p>
         * The message must list this client's id as a destination, and for at
         * least one of its subjects the client must have registered either the
         * message's key for that subject or the {@code null} key.
         *
         * @param message message to test
         * @return {@code true} if this client should receive the message
         */
        public boolean isTargetMessage(Message message) {
            if (!message.containsDestinationId(this.getId())) {
                return false;
            }
            if (message.getSubject() == null) {
                return false;
            }
            for (String sbj : message.getSubjects()) {
                final Set registeredKeys = (Set)this.subjects.get(sbj);
                final String messageKey = message.getKey(sbj);
                if (registeredKeys != null
                        && (registeredKeys.contains(null) || registeredKeys.contains(messageKey))) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Serializes a message and writes it to this client as one frame: a
         * 4-byte length followed by the payload.
         * <p>
         * Does nothing when the client is disabled or already closed. On a socket
         * or I/O error other than a timeout the client is closed before the
         * exception is rethrown.
         *
         * @param message message to send; must be a {@link MessageImpl}
         * @throws MessageSendException if writing to the client fails
         */
        public synchronized void send(Message message) throws MessageSendException {
            // Disabled or closed clients are skipped silently.
            if (!this.isEnabled || this.socketChannel == null && this.socket == null) {
                return;
            }
            long startTime = System.currentTimeMillis();
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ((MessageImpl)message).write(baos, ServerConnectionImpl.this.externalizer);
                byte[] bytes = baos.toByteArray();
                if (this.socketChannel != null) {
                    // NIO path: build the whole frame (length prefix + payload) in one buffer.
                    ByteBuffer buf = ByteBuffer.allocate(bytes.length + 4);
                    buf.putInt(bytes.length);
                    buf.put(bytes);
                    buf.flip();
                    // Hand this client to the acceptor and block until setSendKey() signals
                    // selectMonitor. Presumably the acceptor does that once the channel is
                    // ready for writing - confirm in ClientAcceptor.
                    this.selectMonitor.initMonitor();
                    ((ClientAcceptor)ServerConnectionImpl.this.clientAcceptor.getDaemonRunnable()).addSendClient(this);
                    this.selectMonitor.waitMonitor();
                    // NOTE(review): a single write() on a non-blocking channel may write only
                    // part of the buffer; any remainder is not retried here.
                    this.socketChannel.write(buf);
                    // Lets waitSend() continue.
                    this.sendMonitor.notifyAllMonitor();
                } else {
                    // Blocking path: same frame layout through a DataOutputStream.
                    DataOutputStream dos = new DataOutputStream(this.socket.getOutputStream());
                    dos.writeInt(bytes.length);
                    dos.write(bytes);
                    dos.flush();
                }
                ++this.sendCount;
                this.sendProcessTime += System.currentTimeMillis() - startTime;
            }
            catch (InterruptedException e) {
                // Interrupted while waiting for the acceptor: give up without reporting an error.
                return;
            }
            catch (SocketTimeoutException e) {
                // A timeout leaves the client open so the caller may retry.
                throw new MessageSendException(e);
            }
            catch (SocketException e) {
                this.close();
                throw new MessageSendException(e);
            }
            catch (IOException e) {
                this.close();
                throw new MessageSendException(e);
            }
        }

        /**
         * Stores the selection key for a pending send and wakes the thread blocked
         * in {@code send}.
         *
         * @param key selection key associated with the pending send
         */
        public void setSendKey(SelectionKey key) {
            this.sendKey = key;
            // Arm sendMonitor before releasing the sender, so that the sender's
            // notification after writing is not issued before the monitor is initialized.
            this.sendMonitor.initMonitor();
            this.selectMonitor.notifyAllMonitor();
        }

        /**
         * Waits until the pending send has written its data, then registers the
         * channel for read events on the send key's selector. The registration
         * happens even if the wait is interrupted.
         *
         * @throws IOException if the channel cannot be registered
         */
        public void waitSend() throws IOException {
            try {
                this.sendMonitor.waitMonitor();
            }
            catch (InterruptedException e) {
                // Fall through; the channel is still re-registered below.
            }
            finally {
                this.socketChannel.register(this.sendKey.selector(), SelectionKey.OP_READ, this);
            }
        }

        /**
         * Reads whatever is available from the channel and, once a complete frame
         * (4-byte length plus payload) has arrived, deserializes and handles it.
         * <p>
         * Partial frames are kept in {@code lengthBuffer} and {@code dataBuffer}
         * between calls. On end of stream or any other I/O error the key is
         * cancelled and the client is closed.
         *
         * @param key selection key of this client's channel
         */
        public void receive(SelectionKey key) {
            try {
                int readLength;
                if (this.dataBuffer == null) {
                    // No payload in progress: read, or continue reading, the length prefix.
                    readLength = this.socketChannel.read(this.lengthBuffer);
                    if (readLength == 0) {
                        return;
                    }
                    if (readLength == -1) {
                        throw new EOFException("EOF in reading length.");
                    }
                    if (this.lengthBuffer.position() < 4) {
                        // Length prefix still incomplete; wait for the next call.
                        return;
                    }
                    this.lengthBuffer.rewind();
                    this.dataLength = this.lengthBuffer.getInt();
                    this.lengthBuffer.clear();
                    if (this.dataLength <= 0) {
                        throw new IOException("DataLength is illegal." + this.dataLength);
                    }
                    // NOTE(review): the length comes from the peer and has no upper bound,
                    // so a large value allocates a correspondingly large buffer.
                    this.dataBuffer = ByteBuffer.allocate(this.dataLength);
                }
                if ((readLength = this.socketChannel.read(this.dataBuffer)) == 0) {
                    return;
                }
                if (readLength == -1) {
                    throw new EOFException("EOF in reading data.");
                }
                if (this.dataBuffer.position() < this.dataLength) {
                    // Payload still incomplete; wait for the next call.
                    return;
                }
                // Full frame received: copy it out and reset the state for the next frame.
                this.dataBuffer.rewind();
                byte[] dataBytes = new byte[this.dataLength];
                this.dataBuffer.get(dataBytes);
                this.dataBuffer = null;
                this.lengthBuffer.clear();
                ByteArrayInputStream is = new ByteArrayInputStream(dataBytes);
                boolean isClosed = false;
                if (ServerConnectionImpl.this.externalizer == null) {
                    // NOTE(review): native Java deserialization of bytes received from the
                    // network; consider an ObjectInputFilter if peers are not trusted.
                    ObjectInputStream ois = new ObjectInputStream(is);
                    isClosed = this.handleMessage((ClientMessage)ois.readObject());
                } else {
                    isClosed = this.handleMessage((ClientMessage)ServerConnectionImpl.this.externalizer.readExternal(is));
                }
                if (isClosed) {
                    key.cancel();
                }
            }
            catch (ClassNotFoundException e) {
                // The frame is dropped; the connection stays open.
                e.printStackTrace();
            }
            catch (SocketTimeoutException e) {
                // Ignored; the connection stays open.
            }
            catch (IOException e) {
                key.cancel();
                this.close();
            }
        }

        /**
         * Returns the number of messages written to this client since the last reset.
         *
         * @return send count
         */
        public long getSendCount() {
            return sendCount;
        }

        /**
         * Resets this client's send count and accumulated send time to zero.
         */
        public void resetSendCount() {
            sendProcessTime = 0L;
            sendCount = 0L;
        }

        /**
         * Returns the average time spent per send to this client since the last reset.
         *
         * @return accumulated send time divided by send count, in milliseconds; 0 when nothing was sent
         */
        public long getAverageSendProcessTime() {
            if (this.sendCount == 0L) {
                return 0L;
            }
            return this.sendProcessTime / this.sendCount;
        }

        /**
         * Closes this client: stops its reader daemon, releases both monitors,
         * closes the channel and socket, removes the client from the connection's
         * registries, and notifies the connection listeners.
         * <p>
         * I/O errors while closing are ignored.
         */
        public synchronized void close() {
            // Capture the id up front; it is needed below to remove the clientMap entry.
            Object id = this.getId();
            if (this.requestDispatcher != null) {
                this.requestDispatcher.stopNoWait();
                this.requestDispatcher = null;
            }
            // Release both monitors before discarding them.
            if (this.selectMonitor != null) {
                this.selectMonitor.releaseMonitor();
                this.selectMonitor = null;
            }
            if (this.sendMonitor != null) {
                this.sendMonitor.releaseMonitor();
                this.sendMonitor = null;
            }
            if (this.socketChannel != null) {
                try {
                    this.socketChannel.finishConnect();
                    this.socketChannel.close();
                }
                catch (IOException e) {
                    // Ignored: nothing more can be done for a channel that fails to close.
                }
                this.socketChannel = null;
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                }
                catch (IOException e) {
                    // Ignored: nothing more can be done for a socket that fails to close.
                }
                this.socket = null;
            }
            ServerConnectionImpl.this.clients.remove(this);
            ServerConnectionImpl.this.clientMap.remove(id);
            // Listeners are called while this client's lock is still held.
            if (ServerConnectionImpl.this.serverConnectionListeners != null) {
                int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
                for (int i = 0; i < imax; ++i) {
                    ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onClose(this);
                }
            }
            this.isStartReceive = false;
        }

        /**
         * Returns the default object string followed by the client's remote
         * address, its subscriptions and its enabled flag.
         *
         * @return a description of this client
         */
        public String toString() {
            final Object remoteAddress = this.socket == null ? null : this.socket.getRemoteSocketAddress();
            final StringBuilder text = new StringBuilder(super.toString());
            text.append('{')
                .append("client=").append(remoteAddress)
                .append(", subject=").append(this.subjects)
                .append(", isEnabled=").append(this.isEnabled)
                .append('}');
            return text.toString();
        }

        /**
         * Daemon start callback; performs no work.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onStart() {
            return true;
        }

        /**
         * Daemon stop callback; performs no work.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onStop() {
            return true;
        }

        /**
         * Daemon suspend callback; performs no work.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onSuspend() {
            return true;
        }

        /**
         * Daemon resume callback; performs no work.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onResume() {
            return true;
        }

        /**
         * Reads one frame (4-byte length plus payload) from the blocking socket and
         * deserializes it.
         * <p>
         * The return value tells {@code consume} what to do: a deserialized
         * message is handled, {@code null} closes the client, and {@code this}
         * means "nothing to handle this round".
         *
         * @param ctrl daemon control (unused)
         * @return the deserialized object; {@code null} on end of stream, socket
         *         error or an illegal frame length; {@code this} on a timeout,
         *         an unknown class or another I/O error
         * @throws Throwable if deserialization fails in a way not handled here
         */
        @Override
        public Object provide(DaemonControl ctrl) throws Throwable {
            try {
                DataInputStream dis = new DataInputStream(this.socket.getInputStream());
                int length = dis.readInt();
                if (length <= 0) {
                    // A non-positive length cannot describe a valid frame, and a negative
                    // one would make the array allocation below throw. Close the client,
                    // as receive() does for the same condition.
                    return null;
                }
                byte[] bytes = new byte[length];
                dis.readFully(bytes);
                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                if (ServerConnectionImpl.this.externalizer == null) {
                    // NOTE(review): native Java deserialization of bytes received from the
                    // network; consider an ObjectInputFilter if peers are not trusted.
                    ObjectInputStream ois = new ObjectInputStream(bais);
                    return ois.readObject();
                }
                return ServerConnectionImpl.this.externalizer.readExternal(bais);
            }
            catch (ClassNotFoundException e) {
                return this;
            }
            catch (SocketTimeoutException e) {
                return this;
            }
            catch (SocketException e) {
                return null;
            }
            catch (EOFException e) {
                return null;
            }
            catch (IOException e) {
                return this;
            }
        }

        /**
         * Handles whatever {@code provide} produced.
         *
         * A {@code null} value closes this client. A {@link ClientMessage} is
         * passed to {@code handleMessage}. Any other object is ignored.
         *
         * @param paramObj the object returned by {@code provide}, possibly {@code null}
         * @param ctrl daemon control handle (not used here)
         * @throws Throwable if closing the client or handling the message fails
         */
        @Override
        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            if (paramObj == null) {
                this.close();
            } else if (paramObj instanceof ClientMessage) {
                this.handleMessage((ClientMessage)paramObj);
            }
        }

        /**
         * Dispatches a control message received from this client to the handler
         * for its message type.
         *
         * The numeric types are taken from the casts and actions in this method:
         * 1 carries an {@link IdMessage}, 2 an {@link AddMessage}, 3 a
         * {@link RemoveMessage}, 5 a {@link StartReceiveMessage}, 6 stops
         * receiving and 4 closes this client. Unknown types are ignored.
         *
         * @param message the message to handle
         * @return {@code true} only when the message closed this client (type 4)
         */
        private boolean handleMessage(ClientMessage message) {
            boolean closed = false;
            switch (message.getMessageType()) {
                case 1: {
                    this.handleIdMessage((IdMessage)message);
                    break;
                }
                case 2: {
                    this.handleAddMessage((AddMessage)message);
                    break;
                }
                case 3: {
                    this.handleRemoveMessage((RemoveMessage)message);
                    break;
                }
                case 5: {
                    this.handleStartReceiveMessage((StartReceiveMessage)message);
                    break;
                }
                case 6: {
                    this.handleStopReceiveMessage();
                    break;
                }
                case 4: {
                    this.close();
                    closed = true;
                    break;
                }
            }
            return closed;
        }

        /**
         * Re-registers this client in the client map under the id carried by the
         * message, then notifies listeners through {@code onConnect}.
         */
        private void handleIdMessage(IdMessage idMessage) {
            // Remove the entry stored under the previous id before switching to the new one.
            ServerConnectionImpl.this.clientMap.remove(this.getId());
            this.id = idMessage.getId();
            ServerConnectionImpl.this.clientMap.put(this.getId(), this);
            if (ServerConnectionImpl.this.serverConnectionListeners == null) {
                return;
            }
            int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
            for (int i = 0; i < imax; ++i) {
                ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onConnect(this);
            }
        }

        /**
         * Adds the message's keys to the subject's key set, creating the set on
         * first use, and reports the keys that were actually new through
         * {@code onAddSubject}. A {@code null} key array registers the
         * {@code null} key.
         */
        private void handleAddMessage(AddMessage addMessage) {
            Set<String> keySet = (Set<String>)this.subjects.get(addMessage.getSubject());
            if (keySet == null) {
                keySet = Collections.synchronizedSet(new HashSet<String>());
                this.subjects.put(addMessage.getSubject(), keySet);
            }
            // Method-local scratch list; it never leaves this method, so it needs no synchronization.
            List<String> addKeysList = new ArrayList<String>();
            String[] keys = addMessage.getKeys();
            if (keys == null) {
                if (keySet.add(null)) {
                    addKeysList.add(null);
                }
            } else {
                for (int i = 0; i < keys.length; ++i) {
                    if (keySet.add(keys[i])) {
                        addKeysList.add(keys[i]);
                    }
                }
            }
            if (ServerConnectionImpl.this.serverConnectionListeners == null || addKeysList.isEmpty()) {
                return;
            }
            String[] addkeys = addKeysList.toArray(new String[0]);
            int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
            for (int i = 0; i < imax; ++i) {
                ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onAddSubject(this, addMessage.getSubject(), addkeys);
            }
        }

        /**
         * Removes the message's keys from the subject's key set, drops the
         * subject once its set is empty, and reports the keys that were actually
         * removed through {@code onRemoveSubject}. Does nothing when the subject
         * is not registered. A {@code null} key array removes the {@code null} key.
         */
        private void handleRemoveMessage(RemoveMessage removeMessage) {
            Set<String> keySet = (Set<String>)this.subjects.get(removeMessage.getSubject());
            if (keySet == null) {
                return;
            }
            // Method-local scratch list; it never leaves this method, so it needs no synchronization.
            List<String> removeKeysList = new ArrayList<String>();
            String[] keys = removeMessage.getKeys();
            if (keys == null) {
                if (keySet.remove(null)) {
                    removeKeysList.add(null);
                }
            } else {
                for (int i = 0; i < keys.length; ++i) {
                    if (keySet.remove(keys[i])) {
                        removeKeysList.add(keys[i]);
                    }
                }
            }
            // Both branches above end with the same cleanup, so it is done once here.
            if (keySet.size() == 0) {
                this.subjects.remove(removeMessage.getSubject());
            }
            if (ServerConnectionImpl.this.serverConnectionListeners == null || removeKeysList.isEmpty()) {
                return;
            }
            String[] removeKeys = removeKeysList.toArray(new String[0]);
            int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
            for (int i = 0; i < imax; ++i) {
                ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onRemoveSubject(this, removeMessage.getSubject(), removeKeys);
            }
        }

        /**
         * Records the requested start time. When it is non-negative, first sends
         * the messages returned by {@code getSendMessages(fromTime)}. Then marks
         * this client as receiving and notifies listeners through
         * {@code onStartReceive}. A failed send is logged when a logger and
         * message id are configured, and the remaining messages are still sent.
         */
        private void handleStartReceiveMessage(StartReceiveMessage startMessage) {
            this.fromTime = startMessage.getFrom();
            if (this.fromTime >= 0L) {
                List messages = ServerConnectionImpl.this.getSendMessages(this.fromTime);
                for (int i = 0; i < messages.size(); ++i) {
                    Message msg = (Message)messages.get(i);
                    try {
                        this.send(msg);
                    }
                    catch (MessageSendException e) {
                        if (ServerConnectionImpl.this.logger != null && ServerConnectionImpl.this.sendErrorRetryOverMessageId != null) {
                            ServerConnectionImpl.this.logger.write(ServerConnectionImpl.this.sendErrorRetryOverMessageId, new Object[]{this, msg}, (Throwable)e);
                        }
                    }
                }
            }
            this.isStartReceive = true;
            if (ServerConnectionImpl.this.serverConnectionListeners == null) {
                return;
            }
            int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
            for (int i = 0; i < imax; ++i) {
                ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onStartReceive(this, this.fromTime);
            }
        }

        /**
         * Marks this client as no longer receiving and notifies listeners
         * through {@code onStopReceive}.
         */
        private void handleStopReceiveMessage() {
            this.isStartReceive = false;
            if (ServerConnectionImpl.this.serverConnectionListeners == null) {
                return;
            }
            int imax = ServerConnectionImpl.this.serverConnectionListeners.size();
            for (int i = 0; i < imax; ++i) {
                ((ServerConnectionListener)ServerConnectionImpl.this.serverConnectionListeners.get(i)).onStopReceive(this);
            }
        }

        /**
         * Daemon garbage callback. Intentionally empty: this client does nothing here.
         */
        @Override
        public void garbage() {
        }

        /**
         * Returns the subjects this client is registered for.
         *
         * @return the key set of the subject map (a live view, not a copy), or
         *         {@code null} when the subject map itself is {@code null}
         */
        @Override
        public Set getSubjects() {
            return this.subjects == null ? null : this.subjects.keySet();
        }

        /**
         * Returns the keys registered for one subject.
         *
         * @param subject the subject to look up
         * @return the key set stored for {@code subject}, or {@code null} when the
         *         subject map is {@code null} or holds no entry for it
         */
        @Override
        public Set getKeys(String subject) {
            return this.subjects == null ? null : (Set)this.subjects.get(subject);
        }

        /**
         * Returns the identifier of this client.
         *
         * @return the explicitly assigned id when there is one; otherwise the
         *         socket's remote address, or {@code null} when there is no socket
         */
        @Override
        public Object getId() {
            if (this.id != null) {
                return this.id;
            }
            if (this.socket == null) {
                return null;
            }
            return this.socket.getRemoteSocketAddress();
        }
    }

    /**
     * Daemon task that accepts client connections for the enclosing server
     * connection.
     *
     * It works in one of two modes, chosen by whether the enclosing instance has
     * a selector:
     * <ul>
     * <li>Selector mode: {@code provide} waits on the selector and
     * {@code consume} accepts new channels, forwards readable keys to their
     * client and lets writable clients send.</li>
     * <li>Blocking mode: {@code provide} calls {@code accept()} on the server
     * socket and {@code consume} wraps each accepted socket in a client.</li>
     * </ul>
     */
    private class ClientAcceptor
    implements DaemonRunnable {
        /** Clients waiting to be registered for writing; created only in selector mode. */
        private Queue sendClientQueue;
        /** Clients whose key was writable in the current pass; created only in selector mode. */
        private List sendClients;

        /**
         * Creates the acceptor. In selector mode it also creates and starts the
         * queue of clients waiting to send.
         */
        public ClientAcceptor() {
            if (ServerConnectionImpl.this.selector != null) {
                DefaultQueueService sendQueue = new DefaultQueueService();
                try {
                    sendQueue.create();
                    sendQueue.start();
                }
                catch (Exception ignored) {
                    // Best effort, as in the original: a queue that failed to start is
                    // still installed below.
                    // NOTE(review): consider logging this failure.
                }
                this.sendClientQueue = sendQueue;
                this.sendClients = new ArrayList();
            }
        }

        /**
         * Queues a client that has data to send and wakes the selector so the
         * client is registered on the next pass.
         *
         * Dereferences the send queue and the selector without a null check; the
         * constructor creates the queue only when a selector exists.
         *
         * @param client the client that wants to send
         */
        public void addSendClient(ClientImpl client) {
            this.sendClientQueue.push(client);
            ServerConnectionImpl.this.selector.wakeup();
        }

        /**
         * Daemon start callback: calls {@code accept()} on the send queue when
         * there is one.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onStart() {
            if (this.sendClientQueue != null) {
                this.sendClientQueue.accept();
            }
            return true;
        }

        /**
         * Daemon stop callback: calls {@code release()} on the send queue when
         * there is one.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onStop() {
            if (this.sendClientQueue != null) {
                this.sendClientQueue.release();
            }
            return true;
        }

        /**
         * Daemon suspend callback. Performs no work.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onSuspend() {
            return true;
        }

        /**
         * Daemon resume callback. Performs no work.
         *
         * @return always {@code true}
         */
        @Override
        public boolean onResume() {
            return true;
        }

        /**
         * Waits for the next unit of work.
         *
         * In selector mode this selects for up to one second. In blocking mode
         * it accepts on the server socket.
         *
         * @param ctrl daemon control handle (not used here)
         * @return in selector mode, the selected-key set when at least one key is
         *         ready; in blocking mode, the accepted {@link Socket}. Returns
         *         {@code this} when nothing is ready (no keys, accept timeout, or
         *         a generic I/O error), and {@code null} when the selector is
         *         closed or the server socket raised a {@link SocketException}
         * @throws Throwable any failure not mapped above
         */
        @Override
        public Object provide(DaemonControl ctrl) throws Throwable {
            if (ServerConnectionImpl.this.selector != null) {
                try {
                    return ServerConnectionImpl.this.selector.select(1000L) > 0 ? ServerConnectionImpl.this.selector.selectedKeys() : this;
                }
                catch (ClosedSelectorException e) {
                    return null;
                }
                catch (IOException e) {
                    return this;
                }
            }
            try {
                return ServerConnectionImpl.this.serverSocket.accept();
            }
            catch (SocketTimeoutException e) {
                return this;
            }
            catch (SocketException e) {
                return null;
            }
            catch (IOException e) {
                return this;
            }
        }

        /**
         * Processes the value produced by {@code provide}.
         *
         * A {@code null} value closes the enclosing server connection. In
         * selector mode, queued send clients are first registered for read and
         * write, then each selected key is handled and the writable clients are
         * told to send. In blocking mode, an accepted socket that is bound and
         * open is wrapped in a new client and registered.
         *
         * @param paramObj the value returned by {@code provide}
         * @param ctrl daemon control handle (not used here)
         * @throws Throwable if registering, accepting, receiving or sending fails
         */
        @Override
        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            if (paramObj == null) {
                ServerConnectionImpl.this.close();
                return;
            }
            if (ServerConnectionImpl.this.selector != null) {
                ClientImpl client = null;
                boolean isAddKey = false;
                // Register every client queued by addSendClient for both reading and writing.
                while ((client = (ClientImpl)this.sendClientQueue.get(0L)) != null) {
                    client.getSocketChannel().register(ServerConnectionImpl.this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, client);
                    isAddKey = true;
                }
                // New registrations may already be ready; pick them up without waiting.
                if (isAddKey && ServerConnectionImpl.this.selector.selectNow() > 0) {
                    paramObj = ServerConnectionImpl.this.selector.selectedKeys();
                }
                if (!(paramObj instanceof Set)) {
                    return;
                }
                Set keys = (Set)paramObj;
                Iterator itr = keys.iterator();
                while (itr.hasNext()) {
                    SelectionKey key = (SelectionKey)itr.next();
                    // Selected keys must be removed by hand or they stay in the set.
                    itr.remove();
                    try {
                        if (key.isAcceptable()) {
                            ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                            SocketChannel channel = ssc.accept();
                            if (channel == null) {
                                continue;
                            }
                            channel.configureBlocking(false);
                            if (ServerConnectionImpl.this.socketFactory != null) {
                                ServerConnectionImpl.this.socketFactory.applySocketProperties(channel.socket());
                            }
                            client = new ClientImpl(channel);
                            channel.register(key.selector(), SelectionKey.OP_READ, client);
                            ServerConnectionImpl.this.clients.add(client);
                            ServerConnectionImpl.this.clientMap.put(client.getId(), client);
                        } else if (key.isReadable()) {
                            client = (ClientImpl)key.attachment();
                            client.receive(key);
                        } else if (key.isWritable()) {
                            client = (ClientImpl)key.attachment();
                            this.sendClients.add(client);
                            client.setSendKey(key);
                        } else if (!key.isValid()) {
                            key.cancel();
                        }
                    }
                    catch (CancelledKeyException e) {
                        // The key was cancelled while being inspected; skip it.
                    }
                }
                // Drain the writable clients collected above, in arrival order.
                int imax = this.sendClients.size();
                for (int i = 0; i < imax; ++i) {
                    ((ClientImpl)this.sendClients.remove(0)).waitSend();
                }
            } else {
                if (!(paramObj instanceof Socket)) {
                    return;
                }
                Socket socket = (Socket)paramObj;
                if (!socket.isBound() || socket.isClosed()) {
                    return;
                }
                ClientImpl client = new ClientImpl(socket);
                ServerConnectionImpl.this.clients.add(client);
                ServerConnectionImpl.this.clientMap.put(client.getId(), client);
            }
        }

        /**
         * Daemon garbage callback. Intentionally empty.
         */
        @Override
        public void garbage() {
        }
    }
}

