  • Socket source analysis: accept()


    Socket source analysis: accept()

    Based on kernel 3.10

    I analyzed the TCP accept() implementation once before, but that write-up was buried in the code itself and never related accept() to how applications actually use it.

    We want to answer these questions:
    1. How accept() is implemented, including how it dequeues a sock from the accept (full) queue.
    2. How a process blocked in accept() gets woken up.
    3. How accept() avoids the thundering herd.
    4. With several processes blocked in accept(), which one is woken first.

    Implementation of accept()

    The logic of accept() is fairly simple.

    If no fully established TCP connection is available, a blocking socket blocks, while a non-blocking socket returns -EAGAIN.

    In short, there are four cases to consider (see the sketch after this list):
    1. The accept queue already holds a socket: accept() returns an fd for it immediately.
    2. The queue is empty and the socket is blocking: the process simply sleeps.
    3. The queue is empty and the socket is non-blocking: accept() returns -EAGAIN immediately.
    4. For a blocking listenfd, the current process is hung on the wait queue of the listenfd's socket, yields the CPU, and waits to be woken.
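
    From user space these cases look roughly like the minimal sketch below (a hypothetical non-blocking accept loop, not from the original article; error handling trimmed):

    /* Sketch: how the four cases above appear from user space.
     * Assumes listen_fd is a listening TCP socket with O_NONBLOCK set. */
    #include <errno.h>
    #include <sys/socket.h>
    #include <unistd.h>
    
    int try_accept(int listen_fd)
    {
    	for (;;) {
    		int fd = accept(listen_fd, NULL, NULL);	/* case 1: queue non-empty */
    		if (fd >= 0)
    			return fd;
    		if (errno == EAGAIN || errno == EWOULDBLOCK) {
    			/* case 3: queue empty on a non-blocking socket; a blocking
    			 * socket would instead sleep in the kernel (cases 2 and 4). */
    			usleep(1000);
    			continue;
    		}
    		if (errno == EINTR)
    			continue;	/* interrupted by a signal, retry */
    		return -1;	/* real error */
    	}
    }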

    The accept() call chain

    sys_accept->sys_accept4->inet_accept->inet_csk_accept
    inet_csk_accept holds the core logic; it handles cases 1 and 3 above.

    
    /*
     * This will accept the next outstanding connection.
     */
    struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
    {
    	struct inet_connection_sock *icsk = inet_csk(sk);
    	struct request_sock_queue *queue = &icsk->icsk_accept_queue;
    	struct sock *newsk;
    	struct request_sock *req;
    	int error;
    
    	lock_sock(sk);
    
    	/* We need to make sure that this socket is listening,
    	 * and that it has something pending.
    	 */
        
        //only a socket in TCP_LISTEN state may call accept
    	error = -EINVAL;
    	if (sk->sk_state != TCP_LISTEN)
    		goto out_err;
    
    	/* Find already established connection */
        
        //if the accept queue already holds connections that completed the three-way handshake, we skip this if and fall through to dequeue one
    	if (reqsk_queue_empty(queue)) {
    		long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
    
    		/* If this is a non blocking socket don't sleep */
            //non-blocking socket: just return
    		error = -EAGAIN;
    		if (!timeo)
    			goto out_err;
    
            //blocking socket: call inet_csk_wait_for_connect, discussed below
    		error = inet_csk_wait_for_connect(sk, timeo);
            
    		if (error)
    			goto out_err;
    	}
        
        //reaching here means the queue is non-empty; dequeue a request
    	req = reqsk_queue_remove(queue);
    	newsk = req->sk;
    
    	sk_acceptq_removed(sk);
    	if (sk->sk_protocol == IPPROTO_TCP && queue->fastopenq != NULL) {
    		spin_lock_bh(&queue->fastopenq->lock);
    		if (tcp_rsk(req)->listener) {
    			/* We are still waiting for the final ACK from 3WHS
    			 * so can't free req now. Instead, we set req->sk to
    			 * NULL to signify that the child socket is taken
    			 * so reqsk_fastopen_remove() will free the req
    			 * when 3WHS finishes (or is aborted).
    			 */
    			req->sk = NULL;
    			req = NULL;
    		}
    		spin_unlock_bh(&queue->fastopenq->lock);
    	}
    out:
    	release_sock(sk);
    	if (req)
    		__reqsk_free(req);
    	return newsk;
    out_err:
    	newsk = NULL;
    	req = NULL;
    	*err = error;
    	goto out;
    }
    
    

    inet_csk_wait_for_connect() handles cases 2 and 4.

    
    /*
     * Wait for an incoming connection, avoid race conditions. This must be called
     * with the socket locked.
     */
    static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
    {
    	struct inet_connection_sock *icsk = inet_csk(sk);
    	DEFINE_WAIT(wait);
    	int err;
    
    	/*
    	 * True wake-one mechanism for incoming connections: only
    	 * one process gets woken up, not the 'whole herd'.
    	 * Since we do not 'race & poll' for established sockets
    	 * anymore, the common case will execute the loop only once.
    	 *
    	 * Subtle issue: "add_wait_queue_exclusive()" will be added
    	 * after any current non-exclusive waiters, and we know that
    	 * it will always _stay_ after any new non-exclusive waiters
    	 * because all non-exclusive waiters are added at the
    	 * beginning of the wait-queue. As such, it's ok to "drop"
    	 * our exclusiveness temporarily when we get woken up without
    	 * having to remove and re-insert us on the wait queue.
    	 */
    	for (;;) {
            //prepare_to_wait_exclusive is important: it hangs `wait` on this sk's wait queue.
    		prepare_to_wait_exclusive(sk_sleep(sk), &wait,
    					  TASK_INTERRUPTIBLE);
    		release_sock(sk);
            //icsk_accept_queue is the accept (full) queue
    		if (reqsk_queue_empty(&icsk->icsk_accept_queue))
    			timeo = schedule_timeout(timeo);//when blocking, we resume only after someone explicitly wakes this process.
    		lock_sock(sk);
    		err = 0;
            
            //if we return from schedule_timeout without the timeout expiring, the accept queue must be non-empty.
    		if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
    			break;//the path every normal wakeup takes
    		err = -EINVAL;
    		if (sk->sk_state != TCP_LISTEN)
    			break;
    		err = sock_intr_errno(timeo);
            
            //a signal or an exhausted timeout exits the loop; otherwise go back to sleep.
    		if (signal_pending(current))
    			break;
    		err = -EAGAIN;
    		if (!timeo)
    			break;
    	}
    	finish_wait(sk_sleep(sk), &wait);
    	return err;
    }
    
    

    First, why a loop at all? It is historical. Consider the case where the sleep ends early, i.e. schedule_timeout() returns a value greater than 0. When does that happen? Either the process caught a signal, or the listenfd's accept queue became non-empty. Ignoring signals, suppose the queue gained entries: historically Linux accept() had the thundering-herd problem, and every waiting process was woken when the queue became non-empty. Inevitably some of them grabbed a socket from the queue and some got nothing, and those that got nothing had woken early, so they had to go back to sleep; hence the loop.
    On the kernel analyzed here (3.10), however, only one process is woken when the queue becomes non-empty, so this for loop runs only once.

    prepare_to_wait_exclusive() is the key: it adds the current context to the wait queue of the listenfd's socket. With multiple processes, that wait queue holds one context per process.

    How multi-process accept() handles the thundering herd

    Leaving SO_REUSEPORT aside, multi-process accept() only arises when parent and child call accept() on the same listenfd. As noted above, prepare_to_wait_exclusive() adds the calling process's context to the listenfd's wait queue, so both the parent's and the child's contexts end up on the socket's wait queue. The core question is how they are woken: a thundering herd simply means waking every process on that wait queue.
    So let's look at how the wakeup happens.

    int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
    {
    	struct sock *rsk;
    
        ......
    	if (sk->sk_state == TCP_LISTEN) {
    		struct sock *nsk = tcp_v4_hnd_req(sk, skb);
    		if (!nsk)
    			goto discard;
    
    		if (nsk != sk) {
    			sock_rps_save_rxhash(nsk, skb);
                //when the client's ACK completing the three-way handshake arrives, we go through tcp_child_process
    			if (tcp_child_process(sk, nsk, skb)) {
    				rsk = nsk;
    				goto reset;
    			}
    			return 0;
    		}
    	}
        ......
    }
    

    tcp_child_process:

    int tcp_child_process(struct sock *parent, struct sock *child,
    		      struct sk_buff *skb)
    {
    	int ret = 0;
    	int state = child->sk_state;
    
    	if (!sock_owned_by_user(child)) {
    		ret = tcp_rcv_state_process(child, skb, tcp_hdr(skb),
    					    skb->len);
    		/* Wakeup parent, send SIGIO */
    		if (state == TCP_SYN_RECV && child->sk_state != state)
    			parent->sk_data_ready(parent, 0);//wake the process blocked in accept(); this calls sock_def_readable
    	} else {
    		/* Alas, it is possible again, because we do lookup
    		 * in main socket hash table and lock on listening
    		 * socket does not protect us more.
    		 */
    		__sk_add_backlog(child, skb);
    	}
    
    	bh_unlock_sock(child);
    	sock_put(child);
    	return ret;
    }
    
    

    parent->sk_data_ready:

    static void sock_def_readable(struct sock *sk, int len)
    {
    	struct socket_wq *wq;
    
    	rcu_read_lock();
    	wq = rcu_dereference(sk->sk_wq);
        //since accept() joined the queue via `prepare_to_wait_exclusive`, the wakeup goes through wake_up_interruptible_sync_poll
    	if (wq_has_sleeper(wq))
    		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
    						POLLRDNORM | POLLRDBAND);
    	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    	rcu_read_unlock();
    }
    
    #define wake_up_interruptible_sync_poll(x, m)				\
    	__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
    
    

    Note that the third argument to __wake_up_sync_key is 1:

    void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
    			int nr_exclusive, void *key)
    {
    	unsigned long flags;
    	int wake_flags = WF_SYNC;
    
    	if (unlikely(!q))
    		return;
    
    	if (unlikely(!nr_exclusive))
    		wake_flags = 0;
    
    	spin_lock_irqsave(&q->lock, flags);
        //mode is TASK_INTERRUPTIBLE, nr_exclusive is 1
    	__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    	spin_unlock_irqrestore(&q->lock, flags);
    }
    

    __wake_up_common:

    static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
    			int nr_exclusive, int wake_flags, void *key)
    {
    	wait_queue_t *curr, *next;
    
    	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
    		unsigned flags = curr->flags;
    
            //prepare_to_wait_exclusive set WQ_FLAG_EXCLUSIVE on the waiter, and nr_exclusive is 1, so we break after a single wakeup.
    		if (curr->func(curr, mode, wake_flags, key) &&
    				(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
    			break;
    	}
    }
    

    So with multiple processes blocked in accept(), the kernel wakes exactly one waiter, and the wakeup order is FIFO.
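
    A quick way to observe this wake-one behavior (a hypothetical demo program, not from the original article): fork several children that all block in accept() on one listening socket, then connect repeatedly (e.g. with nc 127.0.0.1 12345) and watch exactly one pid print per connection.

    /* Hypothetical demo: each incoming connection wakes exactly one child. */
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>
    
    int main(void)
    {
    	int lfd = socket(AF_INET, SOCK_STREAM, 0);
    	struct sockaddr_in addr;
    
    	memset(&addr, 0, sizeof(addr));
    	addr.sin_family = AF_INET;
    	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    	addr.sin_port = htons(12345);
    	bind(lfd, (struct sockaddr *)&addr, sizeof(addr));
    	listen(lfd, 128);
    
    	for (int i = 0; i < 4; i++) {
    		if (fork() == 0) {	/* each child inherits lfd */
    			for (;;) {
    				/* all children sleep exclusively on lfd's wait queue */
    				int cfd = accept(lfd, NULL, NULL);
    				printf("pid %d accepted fd %d\n", getpid(), cfd);
    				close(cfd);
    			}
    		}
    	}
    	pause();	/* parent just waits */
    	return 0;
    }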

  • Java Socket source code


    Java Socket source code

    
    public
    class Socket implements java.io.Closeable {
        /**
         * Various states of this socket.
         */
        private boolean created = false;
        private boolean bound = false;
        private boolean connected = false;
        private boolean closed = false;
        private Object closeLock = new Object();
        private boolean shutIn = false;
        private boolean shutOut = false;
    
      
        SocketImpl impl;
    
       
        private boolean oldImpl = false;
    
       
        public Socket() {
            setImpl();
        }
        public Socket(Proxy proxy) {
            // Create a copy of Proxy as a security measure
            if (proxy == null) {
                throw new IllegalArgumentException("Invalid Proxy");
            }
            Proxy p = proxy == Proxy.NO_PROXY ? Proxy.NO_PROXY : sun.net.ApplicationProxy.create(proxy);
            if (p.type() == Proxy.Type.SOCKS) {
                SecurityManager security = System.getSecurityManager();
                InetSocketAddress epoint = (InetSocketAddress) p.address();
                if (epoint.getAddress() != null) {
                    checkAddress (epoint.getAddress(), "Socket");
                }
                if (security != null) {
                    if (epoint.isUnresolved())
                        epoint = new InetSocketAddress(epoint.getHostName(), epoint.getPort());
                    if (epoint.isUnresolved())
                        security.checkConnect(epoint.getHostName(), epoint.getPort());
                    else
                        security.checkConnect(epoint.getAddress().getHostAddress(),
                                      epoint.getPort());
                }
                impl = new SocksSocketImpl(p);
                impl.setSocket(this);
            } else {
                if (p == Proxy.NO_PROXY) {
                    if (factory == null) {
                        impl = new PlainSocketImpl();
                        impl.setSocket(this);
                    } else
                        setImpl();
                } else
                    throw new IllegalArgumentException("Invalid Proxy");
            }
        }
    
        protected Socket(SocketImpl impl) throws SocketException {
            this.impl = impl;
            if (impl != null) {
                checkOldImpl();
                this.impl.setSocket(this);
            }
        }
    
        public Socket(String host, int port)
            throws UnknownHostException, IOException
        {
            this(host != null ? new InetSocketAddress(host, port) :
                 new InetSocketAddress(InetAddress.getByName(null), port),
                 (SocketAddress) null, true);
        }
    
        public Socket(InetAddress address, int port) throws IOException {
            this(address != null ? new InetSocketAddress(address, port) : null,
                 (SocketAddress) null, true);
        }
    
        public Socket(String host, int port, InetAddress localAddr,
                      int localPort) throws IOException {
            this(host != null ? new InetSocketAddress(host, port) :
                   new InetSocketAddress(InetAddress.getByName(null), port),
                 new InetSocketAddress(localAddr, localPort), true);
        }
    
        public Socket(InetAddress address, int port, InetAddress localAddr,
                      int localPort) throws IOException {
            this(address != null ? new InetSocketAddress(address, port) : null,
                 new InetSocketAddress(localAddr, localPort), true);
        }
    
        @Deprecated
        public Socket(String host, int port, boolean stream) throws IOException {
            this(host != null ? new InetSocketAddress(host, port) :
                   new InetSocketAddress(InetAddress.getByName(null), port),
                 (SocketAddress) null, stream);
        }
    
        @Deprecated
        public Socket(InetAddress host, int port, boolean stream) throws IOException {
            this(host != null ? new InetSocketAddress(host, port) : null,
                 new InetSocketAddress(0), stream);
        }
    
        private Socket(SocketAddress address, SocketAddress localAddr,
                       boolean stream) throws IOException {
            setImpl();
    
            // backward compatibility
            if (address == null)
                throw new NullPointerException();
    
            try {
                createImpl(stream);
                if (localAddr != null)
                    bind(localAddr);
                if (address != null)
                    connect(address);
            } catch (IOException e) {
                close();
                throw e;
            }
        }
    
         void createImpl(boolean stream) throws SocketException {
            if (impl == null)
                setImpl();
            try {
                impl.create(stream);
                created = true;
            } catch (IOException e) {
                throw new SocketException(e.getMessage());
            }
        }
    
        private void checkOldImpl() {
            if (impl == null)
                return;
            // SocketImpl.connect() is a protected method, therefore we need to use
            // getDeclaredMethod, therefore we need permission to access the member
    
            oldImpl = AccessController.doPrivileged
                                    (new PrivilegedAction<Boolean>() {
                public Boolean run() {
                    Class[] cl = new Class[2];
                    cl[0] = SocketAddress.class;
                    cl[1] = Integer.TYPE;
                    Class clazz = impl.getClass();
                    while (true) {
                        try {
                            clazz.getDeclaredMethod("connect", cl);
                            return Boolean.FALSE;
                        } catch (NoSuchMethodException e) {
                            clazz = clazz.getSuperclass();
                            // java.net.SocketImpl class will always have this abstract method.
                            // If we have not found it by now in the hierarchy then it does not
                            // exist, we are an old style impl.
                            if (clazz.equals(java.net.SocketImpl.class)) {
                                return Boolean.TRUE;
                            }
                        }
                    }
                }
            });
        }
    
        void setImpl() {
            if (factory != null) {
                impl = factory.createSocketImpl();
                checkOldImpl();
            } else {
                // No need to do a checkOldImpl() here, we know it's an up to date
                // SocketImpl!
                impl = new SocksSocketImpl();
            }
            if (impl != null)
                impl.setSocket(this);
        }
    
        SocketImpl getImpl() throws SocketException {
            if (!created)
                createImpl(true);
            return impl;
        }
    
        public void connect(SocketAddress endpoint) throws IOException {
            connect(endpoint, 0);
        }
    
        public void connect(SocketAddress endpoint, int timeout) throws IOException {
            if (endpoint == null)
                throw new IllegalArgumentException("connect: The address can't be null");
    
            if (timeout < 0)
              throw new IllegalArgumentException("connect: timeout can't be negative");
    
            if (isClosed())
                throw new SocketException("Socket is closed");
    
            if (!oldImpl && isConnected())
                throw new SocketException("already connected");
    
            if (!(endpoint instanceof InetSocketAddress))
                throw new IllegalArgumentException("Unsupported address type");
    
            InetSocketAddress epoint = (InetSocketAddress) endpoint;
            InetAddress addr = epoint.getAddress ();
            int port = epoint.getPort();
            checkAddress(addr, "connect");
    
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                if (epoint.isUnresolved())
                    security.checkConnect(epoint.getHostName(), port);
                else
                    security.checkConnect(addr.getHostAddress(), port);
            }
            if (!created)
                createImpl(true);
            if (!oldImpl)
                impl.connect(epoint, timeout);
            else if (timeout == 0) {
                if (epoint.isUnresolved())
                    impl.connect(addr.getHostName(), port);
                else
                    impl.connect(addr, port);
            } else
                throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
            connected = true;
            /*
             * If the socket was not bound before the connect, it is now because
             * the kernel will have picked an ephemeral port & a local address
             */
            bound = true;
        }
    
        public void bind(SocketAddress bindpoint) throws IOException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!oldImpl && isBound())
                throw new SocketException("Already bound");
    
            if (bindpoint != null && (!(bindpoint instanceof InetSocketAddress)))
                throw new IllegalArgumentException("Unsupported address type");
            InetSocketAddress epoint = (InetSocketAddress) bindpoint;
            if (epoint != null && epoint.isUnresolved())
                throw new SocketException("Unresolved address");
            if (epoint == null) {
                epoint = new InetSocketAddress(0);
            }
            InetAddress addr = epoint.getAddress();
            int port = epoint.getPort();
            checkAddress (addr, "bind");
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkListen(port);
            }
            getImpl().bind (addr, port);
            bound = true;
        }
    
        private void checkAddress (InetAddress addr, String op) {
            if (addr == null) {
                return;
            }
            if (!(addr instanceof Inet4Address || addr instanceof Inet6Address)) {
                throw new IllegalArgumentException(op + ": invalid address type");
            }
        }
    
        final void postAccept() {
            connected = true;
            created = true;
            bound = true;
        }
    
        void setCreated() {
            created = true;
        }
    
        void setBound() {
            bound = true;
        }
    
        void setConnected() {
            connected = true;
        }
    
        public InetAddress getInetAddress() {
            if (!isConnected())
                return null;
            try {
                return getImpl().getInetAddress();
            } catch (SocketException e) {
            }
            return null;
        }
    
        public InetAddress getLocalAddress() {
            // This is for backward compatibility
            if (!isBound())
                return InetAddress.anyLocalAddress();
            InetAddress in = null;
            try {
                in = (InetAddress) getImpl().getOption(SocketOptions.SO_BINDADDR);
    
                if (!NetUtil.doRevealLocalAddress()) {
                    SecurityManager sm = System.getSecurityManager();
                    if (sm != null)
                        sm.checkConnect(in.getHostAddress(), -1);
                }
                if (in.isAnyLocalAddress()) {
                    in = InetAddress.anyLocalAddress();
                }
            } catch (SecurityException e) {
                in = InetAddress.getLoopbackAddress();
            } catch (Exception e) {
                in = InetAddress.anyLocalAddress(); // "0.0.0.0"
            }
            return in;
        }
    
        public int getPort() {
            if (!isConnected())
                return 0;
            try {
                return getImpl().getPort();
            } catch (SocketException e) {
                // Shouldn't happen as we're connected
            }
            return -1;
        }
    
        public int getLocalPort() {
            if (!isBound())
                return -1;
            try {
                return getImpl().getLocalPort();
            } catch(SocketException e) {
                // shouldn't happen as we're bound
            }
            return -1;
        }
    
        public SocketAddress getRemoteSocketAddress() {
            if (!isConnected())
                return null;
            return new InetSocketAddress(getInetAddress(), getPort());
        }
    
        public SocketAddress getLocalSocketAddress() {
            if (!isBound())
                return null;
            return new InetSocketAddress(getLocalAddress(), getLocalPort());
        }
    
        public SocketChannel getChannel() {
            return null;
        }
    
        public InputStream getInputStream() throws IOException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!isConnected())
                throw new SocketException("Socket is not connected");
            if (isInputShutdown())
                throw new SocketException("Socket input is shutdown");
            final Socket s = this;
            InputStream is = null;
            try {
                is = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<InputStream>() {
                        public InputStream run() throws IOException {
                            return impl.getInputStream();
                        }
                    });
            } catch (java.security.PrivilegedActionException e) {
                throw (IOException) e.getException();
            }
            return is;
        }
    
        public OutputStream getOutputStream() throws IOException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!isConnected())
                throw new SocketException("Socket is not connected");
            if (isOutputShutdown())
                throw new SocketException("Socket output is shutdown");
            final Socket s = this;
            OutputStream os = null;
            try {
                os = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<OutputStream>() {
                        public OutputStream run() throws IOException {
                            return impl.getOutputStream();
                        }
                    });
            } catch (java.security.PrivilegedActionException e) {
                throw (IOException) e.getException();
            }
            return os;
        }
    
        public void setTcpNoDelay(boolean on) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
        }
    
        public boolean getTcpNoDelay() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            return ((Boolean) getImpl().getOption(SocketOptions.TCP_NODELAY)).booleanValue();
        }
    
        public void setSoLinger(boolean on, int linger) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!on) {
                getImpl().setOption(SocketOptions.SO_LINGER, new Boolean(on));
            } else {
                if (linger < 0) {
                    throw new IllegalArgumentException("invalid value for SO_LINGER");
                }
                if (linger > 65535)
                    linger = 65535;
                getImpl().setOption(SocketOptions.SO_LINGER, new Integer(linger));
            }
        }
    
        public int getSoLinger() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            Object o = getImpl().getOption(SocketOptions.SO_LINGER);
            if (o instanceof Integer) {
                return ((Integer) o).intValue();
            } else {
                return -1;
            }
        }
    
        public void sendUrgentData (int data) throws IOException  {
            if (!getImpl().supportsUrgentData ()) {
                throw new SocketException ("Urgent data not supported");
            }
            getImpl().sendUrgentData (data);
        }
    
        public void setOOBInline(boolean on) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.SO_OOBINLINE, Boolean.valueOf(on));
        }
    
        public boolean getOOBInline() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            return ((Boolean) getImpl().getOption(SocketOptions.SO_OOBINLINE)).booleanValue();
        }
    
        public synchronized void setSoTimeout(int timeout) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (timeout < 0)
              throw new IllegalArgumentException("timeout can't be negative");
    
            getImpl().setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout));
        }
    
        public synchronized int getSoTimeout() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            Object o = getImpl().getOption(SocketOptions.SO_TIMEOUT);
            /* extra type safety */
            if (o instanceof Integer) {
                return ((Integer) o).intValue();
            } else {
                return 0;
            }
        }
    
        public synchronized void setSendBufferSize(int size)
        throws SocketException{
            if (!(size > 0)) {
                throw new IllegalArgumentException("negative send size");
            }
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.SO_SNDBUF, new Integer(size));
        }
    
        public synchronized int getSendBufferSize() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            int result = 0;
            Object o = getImpl().getOption(SocketOptions.SO_SNDBUF);
            if (o instanceof Integer) {
                result = ((Integer)o).intValue();
            }
            return result;
        }
    
        public synchronized void setReceiveBufferSize(int size)
        throws SocketException{
            if (size <= 0) {
                throw new IllegalArgumentException("invalid receive size");
            }
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.SO_RCVBUF, new Integer(size));
        }
    
        public synchronized int getReceiveBufferSize()
        throws SocketException{
            if (isClosed())
                throw new SocketException("Socket is closed");
            int result = 0;
            Object o = getImpl().getOption(SocketOptions.SO_RCVBUF);
            if (o instanceof Integer) {
                result = ((Integer)o).intValue();
            }
            return result;
        }
    
        public void setKeepAlive(boolean on) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on));
        }
    
        public boolean getKeepAlive() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            return ((Boolean) getImpl().getOption(SocketOptions.SO_KEEPALIVE)).booleanValue();
        }
    
        public void setTrafficClass(int tc) throws SocketException {
            if (tc < 0 || tc > 255)
                throw new IllegalArgumentException("tc is not in range 0 -- 255");
    
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.IP_TOS, new Integer(tc));
        }
    
        public int getTrafficClass() throws SocketException {
            return ((Integer) (getImpl().getOption(SocketOptions.IP_TOS))).intValue();
        }
    
        public void setReuseAddress(boolean on) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.SO_REUSEADDR, Boolean.valueOf(on));
        }
    
        public boolean getReuseAddress() throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            return ((Boolean) (getImpl().getOption(SocketOptions.SO_REUSEADDR))).booleanValue();
        }
    
        public synchronized void close() throws IOException {
            synchronized(closeLock) {
                if (isClosed())
                    return;
                if (created)
                    impl.close();
                closed = true;
            }
        }
    
        public void shutdownInput() throws IOException
        {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!isConnected())
                throw new SocketException("Socket is not connected");
            if (isInputShutdown())
                throw new SocketException("Socket input is already shutdown");
            getImpl().shutdownInput();
            shutIn = true;
        }
    
        public void shutdownOutput() throws IOException
        {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!isConnected())
                throw new SocketException("Socket is not connected");
            if (isOutputShutdown())
                throw new SocketException("Socket output is already shutdown");
            getImpl().shutdownOutput();
            shutOut = true;
        }
    
        public String toString() {
            try {
                if (isConnected())
                    return "Socket[addr=" + getImpl().getInetAddress() +
                        ",port=" + getImpl().getPort() +
                        ",localport=" + getImpl().getLocalPort() + "]";
            } catch (SocketException e) {
            }
            return "Socket[unconnected]";
        }
    
        public boolean isConnected() {
            // Before 1.3 Sockets were always connected during creation
            return connected || oldImpl;
        }
    
        public boolean isBound() {
            // Before 1.3 Sockets were always bound during creation
            return bound || oldImpl;
        }
    
        public boolean isClosed() {
            synchronized(closeLock) {
                return closed;
            }
        }
    
        public boolean isInputShutdown() {
            return shutIn;
        }
    
        public boolean isOutputShutdown() {
            return shutOut;
        }
    
        private static SocketImplFactory factory = null;
    
        public static synchronized void setSocketImplFactory(SocketImplFactory fac)
            throws IOException
        {
            if (factory != null) {
                throw new SocketException("factory already defined");
            }
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkSetFactory();
            }
            factory = fac;
        }
    
        public void setPerformancePreferences(int connectionTime,
                                              int latency,
                                              int bandwidth)
        {
            /* Not implemented yet */
        }
    }
    


  • Android Socket source: communicating with a PC

    Socket programming code for Android-to-PC communication; Java source, for reference only. The source is split into two folders, Socket_Client and Socket_Server.
  • VC++ Socket source

    Contains two parts: a server and a client.
  • Using the HpSocket source


    Notes: unnecessary files have been removed; tcpserver, tcpclient, and connectionless udpServer and udpclient classes are already set up, so you can copy them straight into a project and just add your own handlers in the callback functions. See the notes inside the archive for detailed usage.

    https://download.csdn.net/download/qq_36626674/12569126

  • Skynet socket source: the epoll and socket_server layers

    References: Skynet服务器框架(六) Socket服务源码剖析和应用 (linshuhe1's column), plus Skynet 源码学习 -- Socket Server 和 Skynet_socket (cchd0001's column).

    After using Skynet's socket interface, I wanted to see the underlying implementation. Just as I suspected, Skynet's network concurrency on Linux is indeed built on epoll.




    The epoll wrapper layer:


    ./skynet-src/socket_poll.h gives the answer:

    #ifndef socket_poll_h
    #define socket_poll_h
    
    #include <stdbool.h>
    
    typedef int poll_fd;
    
    struct event {
    	void * s;
    	bool read;
    	bool write;
    };
    
    static bool sp_invalid(poll_fd fd);
    static poll_fd sp_create();
    static void sp_release(poll_fd fd);
    static int sp_add(poll_fd fd, int sock, void *ud);
    static void sp_del(poll_fd fd, int sock);
    static void sp_write(poll_fd, int sock, void *ud, bool enable);
    static int sp_wait(poll_fd, struct event *e, int max);
    static void sp_nonblocking(int sock);
    
    #ifdef __linux__
    #include "socket_epoll.h"
    #endif
    
    #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
    #include "socket_kqueue.h"
    #endif
    
    #endif

    So Skynet manages network concurrency with epoll on Linux, and with kqueue on FreeBSD-family platforms. The header defines a struct event; as we will see, it is a thin wrapper around epoll's epoll_event that drops rarely used events such as EPOLLPRI and EPOLLERR and keeps only EPOLLIN (read) and EPOLLOUT (write), marked by the two bools read and write. The functions declared above are implemented in socket_epoll.h and socket_kqueue.h.
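
    To make the abstraction concrete, here is a sketch of how these sp_* functions compose into a minimal event loop (a hypothetical driver, assuming socket_poll.h is included and some_fd is a connected socket):

    #include "socket_poll.h"
    
    #define MAX_EV 64
    
    static void poll_loop(int some_fd) {
    	struct event ev[MAX_EV];
    	poll_fd efd = sp_create();	// epoll_create() under the hood on Linux
    	if (sp_invalid(efd))
    		return;
    	sp_nonblocking(some_fd);
    	sp_add(efd, some_fd, NULL);	// watch read events only at first
    	for (;;) {
    		int n = sp_wait(efd, ev, MAX_EV);	// blocks in epoll_wait()
    		if (n < 0)
    			break;
    		for (int i = 0; i < n; i++) {
    			if (ev[i].read) {
    				/* read(some_fd, ...) here */
    			}
    			if (ev[i].write) {
    				/* flush pending writes, then
    				 * sp_write(efd, some_fd, NULL, false)
    				 * to stop watching writability */
    			}
    		}
    	}
    	sp_release(efd);
    }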



    Overall, Skynet stacks five layers of wrapping on top of epoll. This post covers the bottom two; the next will cover the top three.



    First comes ./skynet-src/socket_epoll.h, a thin wrapper over epoll.

    // checks whether the produced epoll fd is valid
    static bool 
    sp_invalid(int efd) {
    	return efd == -1;
    }
    
    // creates an epoll fd; 1024 is a hint for how many fds the kernel should expect. Since Linux 2.6.8 the argument is ignored, so any value > 0 works.
    static int
    sp_create() {
    	return epoll_create(1024);
    }
    
    // releases the epoll fd
    static void
    sp_release(int efd) {
    	close(efd);
    }
    
    
    /*
     * Adds a file descriptor for epoll to watch; read events only.
     * efd   : value returned by sp_create()
     * sock  : descriptor to watch
     * ud    : user-data pointer attached to the event
     *       : returns 0 on success, 1 on failure
     */
    static int 
    sp_add(int efd, int sock, void *ud) {
    	struct epoll_event ev;
    	ev.events = EPOLLIN;
    	ev.data.ptr = ud;
    	if (epoll_ctl(efd, EPOLL_CTL_ADD, sock, &ev) == -1) {
    		return 1;
    	}
    	return 0;
    }
    
    /*
     * Removes a watched fd from epoll.
     * efd   : fd created by sp_create()
     * sock  : fd to remove
     */
    static void 
    sp_del(int efd, int sock) {
    	epoll_ctl(efd, EPOLL_CTL_DEL, sock , NULL);
    }
    
    /*
     * Modifies the events watched for an fd already in epoll.
     * efd   : epoll fd
     * sock  : fd to modify
     * ud    : user-data pointer
     * enable: true also watches for writability, false watches reads only
     */
    static void 
    sp_write(int efd, int sock, void *ud, bool enable) {
    	struct epoll_event ev;
    	ev.events = EPOLLIN | (enable ? EPOLLOUT : 0);
    	ev.data.ptr = ud;
    	epoll_ctl(efd, EPOLL_CTL_MOD, sock, &ev);
    }
    
    /*
     * Polls for fd events.
     * efd   : fd created by sp_create()
     * e     : start of a struct event array
     * max   : capacity of that array
     *       : returns the number of fds with events; each entry's read/write
     *         flags are true for the events that fired
     */
    static int 
    sp_wait(int efd, struct event *e, int max) {
    	struct epoll_event ev[max];
    	int n = epoll_wait(efd , ev, max, -1);
    	int i;
    	for (i=0;i<n;i++) {
    		e[i].s = ev[i].data.ptr;
    		unsigned flag = ev[i].events;
    		e[i].write = (flag & EPOLLOUT) != 0;
    		e[i].read = (flag & EPOLLIN) != 0;
    	}
    
    	return n;
    }
    
    /*
     * Sets an fd to non-blocking mode.
     */
    static void
    sp_nonblocking(int fd) {
    	int flag = fcntl(fd, F_GETFL, 0);
    	if ( -1 == flag ) {
    		return;
    	}
    
    	fcntl(fd, F_SETFL, flag | O_NONBLOCK);
    }


    Next is ./skynet-src/socket_server.c.

    This layer's wrapping of the one below is considerably more involved.




    The socket_server wrapper:


    First, a few important structures:

    struct write_buffer {
    	struct write_buffer * next;
    	void *buffer;
    	char *ptr;
    	int sz;
    	bool userobject;
    	uint8_t udp_address[UDP_ADDRESS_SIZE];
    };
    
    #define SIZEOF_TCPBUFFER (offsetof(struct write_buffer, udp_address[0]))
    #define SIZEOF_UDPBUFFER (sizeof(struct write_buffer))
    
    //write-buffer queue
    struct wb_list {
        struct write_buffer * head; //head of the write buffers
        struct write_buffer * tail; //tail of the write buffers
    };
    
    struct socket {
        uintptr_t opaque;   //handle of the owning service in skynet
        struct wb_list high;//high-priority write queue
        struct wb_list low; //low-priority write queue
        int64_t wb_size;    //bytes buffered for write but not yet sent
        int fd;             
        int id;             //index into socket_server's slot array
        uint16_t protocol;  //protocol in use (TCP/UDP)
        uint16_t type;      //socket type/state (read, write, listen, ...)
        union {
            int size;       //estimated read-buffer size needed
            uint8_t udp_address[UDP_ADDRESS_SIZE];
        } p;
    };
    
    struct socket_server {
    	int recvctrl_fd;			// read end of the pipe
    	int sendctrl_fd;			// write end of the pipe
    	int checkctrl;				
    	poll_fd event_fd;			// the epoll/kqueue fd
    	int alloc_id;				
    	int event_n;				// number of events returned by epoll_wait
    	int event_index;			// index of the event currently being processed
    	struct socket_object_interface soi;
    	struct event ev[MAX_EVENT];		// events returned by epoll_wait
    	struct socket slot[MAX_SOCKET];		// a socket_server holds many sockets; slot stores them
    	char buffer[MAX_INFO];
    	uint8_t udpbuffer[MAX_UDP_PACKAGE];
    	fd_set rfds;				// fd set watched by select
    };
    
    struct request_open {
    	int id;					// locates the corresponding socket in socket_server's slot
    	int port;
    	uintptr_t opaque;
    	char host[1];
    };
    
    
    struct request_send {
    	int id;
    	int sz;
    	char * buffer;
    };
    
    
    struct request_send_udp {
    	struct request_send send;
    	uint8_t address[UDP_ADDRESS_SIZE];
    };
    
    
    struct request_setudp {
    	int id;
    	uint8_t address[UDP_ADDRESS_SIZE];
    };
    
    
    struct request_close {
    	int id;
    	uintptr_t opaque;
    };
    
    
    struct request_listen {
    	int id;
    	int fd;
    	uintptr_t opaque;
    	char host[1];
    };
    
    
    struct request_bind {
    	int id;
    	int fd;
    	uintptr_t opaque;
    };
    
    
    struct request_start {
    	int id;
    	uintptr_t opaque;
    };
    
    
    struct request_setopt {
    	int id;
    	int what;
    	int value;
    };
    
    
    struct request_udp {
    	int id;
    	int fd;
    	int family;
    	uintptr_t opaque;
    };
    
    
    /*
    	The first byte is TYPE
    
    
    	S Start socket
    	B Bind socket
    	L Listen socket
    	K Close socket
    	O Connect to (Open)
    	X Exit
    	D Send package (high)
    	P Send package (low)
    	A Send UDP package
    	T Set opt
    	U Create UDP socket
    	C set udp address
     */
    
    
    struct request_package {
    	uint8_t header[8];	// 6 bytes dummy
    	union {
    		char buffer[256];
    		struct request_open open;
    		struct request_send send;
    		struct request_send_udp send_udp;
    		struct request_close close;
    		struct request_listen listen;
    		struct request_bind bind;
    		struct request_start start;
    		struct request_setopt setopt;
    		struct request_udp udp;
    		struct request_setudp set_udp;
    	} u;
    	uint8_t dummy[256];
    };
    
    struct socket_message {
    	int id;
    	uintptr_t opaque;	// handle of the corresponding Actor entity in skynet
    	int ud;			// for accept, ud is the id of the new connection; for data, ud is the data size
    	char * data;
    };




    Besides these, a few macros:

    // message types returned by socket_server_poll()
    #define SOCKET_DATA 0      //data arrived
    #define SOCKET_CLOSE 1     //connection closed
    #define SOCKET_OPEN 2      //used in several places, see the code
    #define SOCKET_ACCEPT 3    //passive connection established
    #define SOCKET_ERROR 4     //error
    #define SOCKET_EXIT 5      //socket exit
    #define SOCKET_UDP 6       //udp traffic
    
    // socket states
    #define SOCKET_TYPE_INVALID 0		// invalid socket
    #define SOCKET_TYPE_RESERVE 1		// reserved, about to be used
    #define SOCKET_TYPE_PLISTEN 2		// listening socket, not yet under epoll
    #define SOCKET_TYPE_LISTEN 3		// listening socket, already under epoll
    #define SOCKET_TYPE_CONNECTING 4	// socket with a connect in progress
    #define SOCKET_TYPE_CONNECTED 5		// connected socket (active or passive)
    #define SOCKET_TYPE_HALFCLOSE 6		// upper layer asked to close, but the send buffer is not drained yet, so close() has not been called
    #define SOCKET_TYPE_PACCEPT 7		// socket returned by accept(), not yet under epoll
    #define SOCKET_TYPE_BIND 8		// a bound descriptor of another kind, e.g. stdin, stdout



    Now the layer's initialization function:

    struct socket_server *
    socket_server_create() {
    	int i;
    	int fd[2];			
    	poll_fd efd = sp_create();	// create the epoll instance for listening -- very important!
    	if (sp_invalid(efd)) {
    		fprintf(stderr, "socket-server: create event pool failed.\n");
    		return NULL;
    	}
    	if (pipe(fd)) {			// create the pipe
    		sp_release(efd);
    		fprintf(stderr, "socket-server: create socket pair failed.\n");
    		return NULL;
    	}
    	if (sp_add(efd, fd[0], NULL)) {	// put the pipe's read end under epoll; note pipe messages carry no socket*, so ud is NULL
    		// add recvctrl_fd to event poll
    		fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
    		close(fd[0]);
    		close(fd[1]);
    		sp_release(efd);
    		return NULL;
    	}
    
    	struct socket_server *ss = MALLOC(sizeof(*ss));	// allocate the socket_server instance, then initialize it
    	ss->event_fd = efd;
    	ss->recvctrl_fd = fd[0];
    	ss->sendctrl_fd = fd[1];
    	ss->checkctrl = 1;
    
    	for (i=0;i<MAX_SOCKET;i++) {
    		struct socket *s = &ss->slot[i];
    		s->type = SOCKET_TYPE_INVALID;		// every socket starts as SOCKET_TYPE_INVALID
    		clear_wb_list(&s->high);
    		clear_wb_list(&s->low);
    	}
    	ss->alloc_id = 0;
    	ss->event_n = 0;
    	ss->event_index = 0;
    	memset(&ss->soi, 0, sizeof(ss->soi));
    	FD_ZERO(&ss->rfds);
    	assert(ss->recvctrl_fd < FD_SETSIZE);
    
    	return ss;
    }




    Next comes the core of this layer. This function acts as the hub, governing all data flowing in and out; the epoll fd and the pipe created during initialization both play major roles here. With this code at the core, the original post sketches the data flow in a diagram (not reproduced here).


    As the source below shows, socket_server_poll() is the dispatcher that handles every event of a socket_server. Control commands travel over the pipe; other events are picked up by epoll. Either way, socket_server_poll() routes each event to the right handler. The "other descriptor events" are mainly socket events: an asynchronous TCP connect completing, a socket becoming readable or writable, and so on.

    // return type
    int
    socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    	for (;;) {
    		if (ss->checkctrl) {						// reset to 1 after each batch of epoll events
    			if (has_cmd(ss)) {					// is the pipe's read end readable?
    				//printf("has_cmd = 1\n");
    				int type = ctrl_cmd(ss, result);		// handle the control command
    				if (type != -1) {
    					clear_closed_event(ss, result, type);
    					return type;
    				} else
    					continue;
    			} else {
    				//printf("has_cmd = 0\n");
    				ss->checkctrl = 0;					// nothing in the pipe: set to 0. New socket events can now be fetched via sp_wait();
    			}								// once the whole batch is processed, checkctrl goes back to 1 and the non-blocking
    											// select checks the pipe again. A batch of events is processed straight through,
    											// without being interrupted by pipe events.
    		}
    		printf("event: %d %d\n", ss->event_index, ss->event_n);
    		if (ss->event_index == ss->event_n) {					// equal means the batch is done; call sp_wait() for new events
    			ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);		// epoll watches many things: the pipe read end, stdout, listen_fd, connect_fd, etc.
    			printf("now we get: %d\n", ss->event_n);
    			ss->checkctrl = 1;
    			if (more) {
    				*more = 0;
    			}
    			ss->event_index = 0;
    			if (ss->event_n <= 0) {
    				ss->event_n = 0;
    				return -1;
    			}
    		}
    		struct event *e = &ss->ev[ss->event_index++];
    		struct socket *s = e->s;
    		if (s == NULL) {					// s == NULL marks a pipe message: skip it for now; has_cmd/ctrl_cmd handle it after this batch finishes
    			// dispatch pipe message at beginning
    			continue;
    		}
    		printf("get fd: %d, opa: %d, type: %d, read: %d, write: %d\n", s->fd, s->opaque, s->type, e->read, e->write);
    		switch (s->type) {					// handle the epoll event
    		case SOCKET_TYPE_CONNECTING:				// with async TCP connect, the client's connect_fd becomes writable once connected
    			return report_connect(ss, s, result);
    		case SOCKET_TYPE_LISTEN:
    			if (report_accept(ss, s, result)) {		// with async TCP connect, the server's listen_fd becomes readable on a new connection.
    				return SOCKET_ACCEPT;
    			}
    			break;
    		case SOCKET_TYPE_INVALID:
    			fprintf(stderr, "socket-server: invalid socket\n");
    			break;
    		default:
    			if (e->read) {
    				int type;
    				if (s->protocol == PROTOCOL_TCP) {
    					type = forward_message_tcp(ss, s, result);
    				} else {
    					type = forward_message_udp(ss, s, result);
    					if (type == SOCKET_UDP) {
    						// try read again
    						--ss->event_index;
    						return SOCKET_UDP;
    					}
    				}
    				if (e->write) {
    					// Try to dispatch write message next step if write flag set.
    					e->read = false;
    					--ss->event_index;
    				}
    				if (type == -1)
    					break;
    				clear_closed_event(ss, result, type);
    				return type;
    			}
    			if (e->write) {
    				int type = send_buffer(ss, s, result);
    				if (type == -1)
    					break;
    				clear_closed_event(ss, result, type);
    				return type;
    			}
    			break;
    		}
    	}
    }




    Besides that, two helper functions assist it:

    /*
     * Uses a non-blocking select to probe the pipe's read end: once data is written into the pipe it becomes readable. Returns 1 if readable, 0 otherwise.
     */
    static int
    has_cmd(struct socket_server *ss) {
    	struct timeval tv = {0,0};
    	int retval;
    
    	FD_SET(ss->recvctrl_fd, &ss->rfds);
    
    	retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv);
    	if (retval == 1) {
    		return 1;
    	}
    	return 0;
    }
    
    /*
     * Reads from the pipe: first a 2-byte header carrying the message type and size, then that many bytes of payload, which is dispatched to the handler
     * for that type. result comes in from socket_server_poll() and is filled by start_socket, bind_socket, etc., depending on the message type.
     */
    // return type
    static int
    ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
    	int fd = ss->recvctrl_fd;
    	// the length of message is one byte, so 256+8 buffer size is enough.
    	uint8_t buffer[256];
    	uint8_t header[2];
    	block_readpipe(fd, header, sizeof(header));
    	int type = header[0];
    	int len = header[1];
    	block_readpipe(fd, buffer, len);
    	// ctrl command only exist in local fd, so don't worry about endian.
    	switch (type) {
    	case 'S':
    		return start_socket(ss,(struct request_start *)buffer, result);
    	case 'B':
    		return bind_socket(ss,(struct request_bind *)buffer, result);
    	case 'L':
    		return listen_socket(ss,(struct request_listen *)buffer, result);
    	case 'K':
    		return close_socket(ss,(struct request_close *)buffer, result);
    	case 'O':
    		return open_socket(ss, (struct request_open *)buffer, result);
    	case 'X':
    		result->opaque = 0;
    		result->id = 0;
    		result->ud = 0;
    		result->data = NULL;
    		return SOCKET_EXIT;
    	case 'D':
    		return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL);
    	case 'P':
    		return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL);
    	case 'A': {
    		struct request_send_udp * rsu = (struct request_send_udp *)buffer;
    		return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
    	}
    	case 'C':
    		return set_udp_address(ss, (struct request_setudp *)buffer, result);
    	case 'T':
    		setopt_socket(ss, (struct request_setopt *)buffer);
    		return -1;
    	case 'U':
    		add_udp_socket(ss, (struct request_udp *)buffer);
    		return -1;
    	default:
    		fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type);
    		return -1;
    	};
    
    	return -1;
    }



    The layer's core interface:

    If you wanted to use socket_server directly from C, these wrapped interfaces are basically all you would need:

    // Since every implementation lives in the header, everything is declared static.
    
    //create a socket_server
    struct socket_server * socket_server_create();
    
    //release a socket_server's resources
    void socket_server_release(struct socket_server *);
    
    /*
    * The wrapped epoll/kqueue: fetches the socket's network events or messages
    * (usually called in a loop to keep listening for network messages)
    * socket_server : the instance returned by socket_server_create()
    * result        : pointer to where the result is stored
    *               : returns the message type, one of the SOCKET_DATA-style macros above
    */
    int socket_server_poll(struct socket_server *, struct socket_message *result, int *more);
    
    //exit the socket_server
    void socket_server_exit(struct socket_server *);
    
    /*  
    * close a socket
    * socket_server : the instance returned by socket_server_create()
    * opaque        : handle of the service in skynet
    * id            : id returned by socket_server_listen()
    */
    void socket_server_close(struct socket_server *, uintptr_t opaque, int id);
    
    /*  
    * shut down a socket
    * socket_server : the instance returned by socket_server_create()
    * opaque        : handle of the service in skynet
    * id            : socket handle
    */
    void socket_server_shutdown(struct socket_server *, uintptr_t opaque, int id);
    
    /*  
    * Puts the socket under epoll (before starting, open it via socket_server_listen(), which performs the TCP socket(), bind(), listen() steps),
    * or puts a socket obtained from report_accept() under epoll. In short, to send or receive on a socket's fd you must call socket_server_start() first.
    * socket_server : the instance returned by socket_server_create()
    * opaque        : handle of the service in skynet
    * id            : id returned by socket_server_listen()
    */
    void socket_server_start(struct socket_server *, uintptr_t opaque, int id);
    
    /*
    * send data
    * socket_server : the instance returned by socket_server_create()
    * buffer        : data to send
    * sz            : size of the data
    * id            : id returned by socket_server_listen()
    *               : returns -1 on error
    */
    int64_t socket_server_send(struct socket_server *, int id, const void * buffer, int sz);
    
    void socket_server_send_lowpriority(struct socket_server *, int id, const void * buffer, int sz);
    
    /*  
    * Opens a TCP listener: performs the socket(), bind(), listen() steps
    * socket_server : the instance returned by socket_server_create()
    * opaque        : handle of the service in skynet
    * addr          : ip address
    * port          : port number
    *               : returns an id used as the handle for this listener        
    */
    int socket_server_listen(struct socket_server *, uintptr_t opaque, const char * addr, int port, int backlog);
    
    /*  
    * Connects to a server in a non-blocking way
    * socket_server : the instance returned by socket_server_create()
    * opaque        : handle of the service in skynet
    * addr          : ip address
    * port          : port number
    *               : returns an id used as the handle for this connection        
    */
    int socket_server_connect(struct socket_server *, uintptr_t opaque, const char * addr, int port);
    
    /*  
    * Does not correspond to bind(); rather, it puts descriptors such as stdin and stdout under epoll's management
    * socket_server : the instance returned by socket_server_create()
    * opaque        : handle of the service in skynet
    * fd            : the file descriptor       
    */
    int socket_server_bind(struct socket_server *, uintptr_t opaque, int fd);
    
    // for tcp
    void socket_server_nodelay(struct socket_server *, int id);
    
    /*
    * Creates a UDP socket and binds it to a skynet service handle; unlike TCP, UDP needs no socket_server_start() before it can receive messages.
    * If port != 0, binds the socket; if addr == NULL, binds ipv4 0.0.0.0. For ipv6, use the address "::" and set port to 0.
    */
    int socket_server_udp(struct socket_server *, uintptr_t opaque, const char * addr, int port);
    
    // sets the default destination address; returns 0 on success
    int socket_server_udp_connect(struct socket_server *, int id, const char * addr, int port);
    
    /*
    * If socket_udp_address is NULL, the address passed in the last socket_server_udp_connect() call is used instead.
    * socket_server_send can also be used to send udp data.
    */
    int64_t socket_server_udp_send(struct socket_server *, int id, const struct socket_udp_address *, const void *buffer, int sz);
    
    // gets the IP address of an incoming message; the socket_message * passed in must be of type SOCKET_UDP
    const struct socket_udp_address * socket_server_udp_address(struct socket_server *, struct socket_message *, int *addrsz);
    
    // if you send package sz == -1, use soi.
    void socket_server_userobject(struct socket_server *, struct socket_object_interface *soi);

    Some of these functions call reserve_id() to claim one socket from socket_server's slot[]; that socket will carry a lot of related state. reserve_id() only initializes the socket struct, however; many other fields are still unset at that point.

    These functions are not the real executors. They write a task message into the pipe, and socket_server_poll() reads the pipe and hands the message to the actual executor. Mirroring the earlier reserve_id(), some of those executors call new_fd() to finish filling in the socket that reserve_id() claimed and, where needed, put the fd under epoll's management, exactly as analyzed in socket_server_poll() above. The messages are distinguished by a character (a sketch of the pipe write follows the command list below):

    The common commands are: 

    S Start socket
    B Bind socket
    L Listen socket
    K Close socket
    O Connect to (Open)
    X Exit
    D Send package (high priority)
    P Send package (low priority; less common, also used for sending data)
    A Send UDP package
    T Set opt
    U Create UDP socket
    C Set UDP address
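
    As a concrete illustration of the pipe protocol (a simplified sketch in the spirit of skynet's send_request(), not a verbatim copy): the writer emits one type byte and one length byte, then the payload, which is exactly what ctrl_cmd() above reads back out. This assumes the surrounding socket_server.c context (errno.h, assert.h, etc. included).

    /* Sketch: writing a command into the pipe. header[6] holds the type
     * character (e.g. 'L' for listen) and header[7] the payload length;
     * ctrl_cmd() reads these two bytes first, then `len` bytes of payload. */
    static void
    send_request(struct socket_server *ss, struct request_package *request,
                 char type, int len) {
    	request->header[6] = (uint8_t)type;
    	request->header[7] = (uint8_t)len;
    	for (;;) {
    		int n = write(ss->sendctrl_fd, &request->header[6], len+2);
    		if (n<0) {
    			if (errno != EINTR) {
    				fprintf(stderr, "socket-server: send ctrl command error %s.\n", strerror(errno));
    			}
    			continue;	/* retry on EINTR */
    		}
    		assert(n == len+2);
    		return;
    	}
    }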


    Executing these functions drives the type field of the custom socket struct through its states; the original post charts the transitions in a figure (not reproduced here).




    Now let's try these APIs out:

    Take 云风 (cloudwu)'s socket-server as the example. It ships with a test.c, slightly modified here to ease the analysis:

    #include "socket_server.h"
    
    #include <sys/socket.h>
    #include <pthread.h>
    #include <sys/select.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <signal.h>
    #include <string.h>
    
    int c;
    static void *
    _poll(void * ud) {
    	struct socket_server *ss = ud;
    	struct socket_message result;
    	for (;;) {
    		int type = socket_server_poll(ss, &result, NULL);
    		// DO NOT use any ctrl command (socket_server_close , etc. ) in this thread.
    		switch (type) {
    		case SOCKET_EXIT:
    			return NULL;
    		case SOCKET_DATA:
    			printf("message(%lu) [id=%d] size=%d data= %s\n",result.opaque,result.id, result.ud, result.data);
    			free(result.data);
    			break;
    		case SOCKET_CLOSE:
    			printf("close(%lu) [id=%d]\n",result.opaque,result.id);
    			break;
    		case SOCKET_OPEN:
    			printf("open(%lu) [id=%d] %s\n",result.opaque,result.id,result.data);
    			break;
    		case SOCKET_ERROR:
    			printf("error(%lu) [id=%d]\n",result.opaque,result.id);
    			break;
    		case SOCKET_ACCEPT:
    			printf("accept(%lu) [id=%d %s] from [%d]\n",result.opaque, result.ud, result.data, result.id);
    			break;
    		}
    	}
    }
    
    static void
    test(struct socket_server *ss) {
    	pthread_t pid;
    	pthread_create(&pid, NULL, _poll, ss);
    
    	/*
    	int c = socket_server_connect(ss,100,"127.0.0.1",80);
    	printf("connecting %d\n",c);
    	*/
    	int l = socket_server_listen(ss,200,"127.0.0.1",8888,32);		// open a TCP listener on 127.0.0.1:8888
    	printf("listening %d\n",l);
    	socket_server_start(ss,201,l);						// put that listener under epoll
    	int b = socket_server_bind(ss,300,1);					// put fd 1 (stdout) under epoll
    	printf("binding stdin %d\n",b);
    	int i;
    
    	c = socket_server_connect(ss, 400, "127.0.0.1", 8888);			// asynchronously connect to 127.0.0.1:8888
    	//sleep(2);	
    	char *data = (char *) malloc(sizeof(char) * 20);
    	memcpy(data, "hello world", 12);	// 12 bytes: the literal plus its terminating NUL (copying 20 would read past the literal)
    	socket_server_send(ss, c, data, strlen(data));				// send the data
    	/*
    	for (i=0;i<100;i++) {
    		socket_server_connect(ss, 400+i, "127.0.0.1", 8888);
    	}
    	
    	socket_server_exit(ss);
    	*/	
    	pthread_join(pid, NULL); 
    }
    
    int
    main() {
    	struct sigaction sa;
    	sa.sa_handler = SIG_IGN;
    	sigaction(SIGPIPE, &sa, 0);
    
    	struct socket_server * ss = socket_server_create();
    	test(ss);
    	socket_server_release(ss);
    
    	return 0;
    }

    Compile and run, and the data never reaches the server. After much digging, the cause is that report_accept() does not put the fd newly created by accept() under epoll. To simplify the analysis, we temporarily change this so that every newly accepted fd is watched by epoll. The change is a single line in report_accept():
    struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);

    becomes:

    struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, true);
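
    For context, that final boolean is the `add` parameter of new_fd(): when it is true, the fd is registered with epoll immediately (excerpt from socket_server.c, trimmed):

    static struct socket *
    new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool add) {
    	struct socket * s = &ss->slot[HASH_ID(id)];
    	assert(s->type == SOCKET_TYPE_RESERVE);
    	if (add) {
    		if (sp_add(ss->event_fd, fd, s)) {	// put the fd under epoll right away
    			s->type = SOCKET_TYPE_INVALID;
    			return NULL;
    		}
    	}
    	/* ... fills in fd, id, protocol, opaque and the send buffers ... */
    	return s;
    }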


    Recompile and run, and this time the data comes through. The output looks like this (it includes some debug prints I added myself o(╯□╰)o):

    checkctrl = 1
    has_cmd = 1
    LLLLLLLLLLLLL
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 0 0
    listening 1
    binding stdin 2
    now we get: 1
    checkctrl = 1
    has_cmd = 1
    SSSSSSSSSSSS
    s->opaque old: 200, new: 201
    oooooooppppppppp3333333
    open(201) [id=1] start
    checkctrl = 1
    has_cmd = 1
    BBBBBBBBBBBB
    oooooooppppppppp222222
    open(300) [id=2] binding
    checkctrl = 1
    has_cmd = 1
    OOOOOOOOOOOOOO
    checkctrl = 1
    has_cmd = 1
    DDDDDDDDDDDDDD
    s->type: 4, id = 3
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1
    now we get: 2
    get fd: 6, opa: 201, type: 3, read: 1, write: 0
    client_fd: 8
    accept(201) [id=4 127.0.0.1:58840] from [1]
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 2
    get fd: 7, opa: 400, type: 4, read: 0, write: 1
    Error: 0
    PTR: 1207962912, 0AAAAAAAAAAAAAAAAAAAAA
    oooooooppppppppp5555
    open(400) [id=3] 127.0.0.1
    checkctrl = 0
    event: 2 2
    now we get: 1
    get fd: 7, opa: 400, type: 5, read: 0, write: 1
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1
    now we get: 1
    get fd: 8, opa: 201, type: 7, read: 1, write: 0
    message(201) [id=4] size=11 data= hello world			// the data arrives correctly
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1



    skynet does its reads and writes asynchronously, which makes the API easy to use: callers never have to think about synchronization, because skynet resolves it internally. We already met the asynchronous connect; now let's look briefly at the asynchronous write. As test.c shows, we call socket_server_send() without waiting for the earlier socket_server_connect() to complete, which is exactly the convenience an asynchronous API buys. socket_server_send() merely writes a send message into the pipe (a sketch of that side follows); the real work is then done by send_socket(), whose code is shown after the sketch:
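
    The pipe side is short. Roughly (trimmed from socket_server.c; helper definitions such as free_buffer() and send_request() are omitted, and the exact return type varies between versions):

    int64_t
    socket_server_send(struct socket_server *ss, int id, const void *buffer, int sz) {
    	struct socket *s = &ss->slot[HASH_ID(id)];
    	if (s->id != id || s->type == SOCKET_TYPE_INVALID) {
    		free_buffer(ss, buffer, sz);	// dead socket: drop the payload
    		return -1;
    	}
    	struct request_package request;
    	request.u.send.id = id;
    	request.u.send.sz = sz;
    	request.u.send.buffer = (char *)buffer;
    	send_request(ss, &request, 'D', sizeof(request.u.send));
    	return s->wb_size;			// bytes still queued on this socket
    }

    Note that the payload is not copied: ownership of `buffer` transfers to the socket server, which is why test.c must malloc() it rather than pass a stack array.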

    /*
    	When send a package , we can assign the priority : PRIORITY_HIGH or PRIORITY_LOW
    
    	If socket buffer is empty, write to fd directly.
    		If write a part, append the rest part to high list. (Even priority is PRIORITY_LOW)
    	Else append package to high (PRIORITY_HIGH) or low (PRIORITY_LOW) list.
     */
    static int
    send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) {
    	int id = request->id;
    	struct socket * s = &ss->slot[HASH_ID(id)];
    	struct send_object so;
    	send_object_init(ss, &so, request->buffer, request->sz);
    	if (s->type == SOCKET_TYPE_INVALID || s->id != id
    		|| s->type == SOCKET_TYPE_HALFCLOSE
    		|| s->type == SOCKET_TYPE_PACCEPT) {
    		so.free_func(request->buffer);
    		return -1;
    	}
    	assert(s->type != SOCKET_TYPE_PLISTEN && s->type != SOCKET_TYPE_LISTEN);
    	if (send_buffer_empty(s) && s->type == SOCKET_TYPE_CONNECTED) {
    		printf("s->type: %d, id = %d\n", s->type, id);
    		if (s->protocol == PROTOCOL_TCP) {
    			int n = write(s->fd, so.buffer, so.sz);
    			if (n<0) {
    				switch(errno) {
    				case EINTR:
    				case EAGAIN:
    					n = 0;
    					break;
    				default:
    					fprintf(stderr, "socket-server: write to %d (fd=%d) error :%s.\n",id,s->fd,strerror(errno));
    					force_close(ss,s,result);
    					return SOCKET_CLOSE;
    				}
    			}
    			if (n == so.sz) {
    				so.free_func(request->buffer);
    				return -1;
    			}
    			append_sendbuffer(ss, s, request, n);	// add to high priority list, even priority == PRIORITY_LOW
    		} else {
    			// udp
    			if (udp_address == NULL) {
    				udp_address = s->p.udp_address;
    			}
    			union sockaddr_all sa;
    			socklen_t sasz = udp_socket_address(s, udp_address, &sa);
    			int n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
    			if (n != so.sz) {
    				append_sendbuffer_udp(ss,s,priority,request,udp_address);
    			} else {
    				so.free_func(request->buffer);
    				return -1;
    			}
    		}
    		sp_write(ss->event_fd, s->fd, s, true);
    	} else {
    		printf("s->type: %d, id = %d\n", s->type, id);
    		if (s->protocol == PROTOCOL_TCP) {
    			if (priority == PRIORITY_LOW) {
    				append_sendbuffer_low(ss, s, request);
    			} else {
    				append_sendbuffer(ss, s, request, 0);
    			}
    		} else {
    			if (udp_address == NULL) {
    				udp_address = s->p.udp_address;
    			}
    			append_sendbuffer_udp(ss,s,priority,request,udp_address);
    		}
    	}
    	return -1;
    }
    

    send_socket() handles both UDP and TCP data, not just one of them. It first checks the socket's state: if the connection is not yet established (for instance the socket is still CONNECTING), the data is stashed via the append_* family of functions and sent once the connection comes up. The pairing of sp_write(ss->event_fd, s->fd, s, true) and sp_write(ss->event_fd, s->fd, s, false) deserves particular care. Under epoll, EPOLLOUT fires whenever the send buffer has room to accept data, which with level triggering means it fires almost continuously; sp_write() is therefore the switch that turns EPOLLOUT monitoring on and off. The source shows, for example, that if we call send before the connection is established, epoll keeps watching EPOLLOUT until every buffered byte has been written out (driven by send_buffer() inside socket_server_poll()), and only then calls sp_write(ss->event_fd, s->fd, s, false) to stop watching. send_buffer() is what checks whether the send buffers in the socket struct have been fully drained. A reference sketch of sp_write() follows:
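
    The epoll flavor of sp_write() in socket-server's socket_epoll.h is tiny; the boolean simply decides whether EPOLLOUT joins the interest set (shown lightly reformatted, with the poll_fd typedef written as int):

    #include <stdbool.h>
    #include <sys/epoll.h>

    static void
    sp_write(int efd, int sock, void *ud, bool enable) {
    	struct epoll_event ev;
    	ev.events = EPOLLIN | (enable ? EPOLLOUT : 0);
    	ev.data.ptr = ud;			// carries the struct socket * back out of epoll_wait()
    	epoll_ctl(efd, EPOLL_CTL_MOD, sock, &ev);
    }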


    The example above only has the client sending to the server; how do we get data flowing in both directions? Look closely at send_socket(): it only lets sockets in the CONNECTING and CONNECTED states through and filters out every other type. With the code as originally written, the server-side socket sits in the PACCEPT state after report_accept(). Does that mean only the initiator of a connection can send? That cannot be right. Another look at start_socket() clears things up:

    static int
    start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
    	int id = request->id;
    	result->id = id;
    	result->opaque = request->opaque;
    	result->ud = 0;
    	result->data = NULL;
    	struct socket *s = &ss->slot[HASH_ID(id)];
    	if (s->type == SOCKET_TYPE_INVALID || s->id !=id) {
    		return SOCKET_ERROR;
    	}
    	if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
    		if (sp_add(ss->event_fd, s->fd, s)) {
    			s->type = SOCKET_TYPE_INVALID;
    			return SOCKET_ERROR;
    		}
    		s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
    		s->opaque = request->opaque;
    		result->data = "start";
    		return SOCKET_OPEN;
    	} else if (s->type == SOCKET_TYPE_CONNECTED) {
    		s->opaque = request->opaque;
    		result->data = "transfer";
    		return SOCKET_OPEN;
    	}
    	return -1;
    }

    start_socket() not only moves a socket from PLISTEN to LISTEN; it also moves one from PACCEPT to CONNECTED, and in both cases it puts the fd under epoll's watch. start_socket() thus acts as an enable switch, a point we will revisit when analyzing the upper layers. This also explains why report_accept() does not register the newly accepted fd with epoll: before you can use that socket for communication, you must enable it with socket_server_start(). Conclusion: the sockets created by socket_server_listen() and by report_accept() both need to be enabled through socket_server_start() before they can receive data. So we roll the report_accept() change back to the original:

    struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
    

    Then, when a SOCKET_ACCEPT message arrives, we call socket_server_start(). The modified _poll() looks like this:

    int c;
    static void *
    _poll(void * ud) {
    	struct socket_server *ss = ud;
    	struct socket_message result;
    	for (;;) {
    		int type = socket_server_poll(ss, &result, NULL);
    		// DO NOT use any ctrl command (socket_server_close , etc. ) in this thread.
    		switch (type) {
    		case SOCKET_EXIT:
    			return NULL;
    		case SOCKET_DATA:
    			printf("message(%lu) [id=%d] size=%d data= %s\n",result.opaque,result.id, result.ud, result.data);
    			free(result.data);
    			char *data = (char *) malloc(sizeof(char) * 20);			// added
    			memcpy(data, "hello cxl", 10);						// added (10 bytes, including the NUL)
    			socket_server_send(ss, result.id, data, strlen(data));			// added
    			break;
    		case SOCKET_CLOSE:
    			printf("close(%lu) [id=%d]\n",result.opaque,result.id);
    			break;
    		case SOCKET_OPEN:
    			printf("open(%lu) [id=%d] %s\n",result.opaque,result.id,result.data);
    			break;
    		case SOCKET_ERROR:
    			printf("error(%lu) [id=%d]\n",result.opaque,result.id);
    			break;
    		case SOCKET_ACCEPT:
    			printf("accept(%lu) [id=%d %s] from [%d]\n",result.opaque, result.ud, result.data, result.id);
    			socket_server_start(ss, 600, result.ud);				// added
    			break;
    		}
    	}
    }

    With that, the two sides can talk to each other. (Written this way it is an endless echo: the peers bounce the message back and forth and never stop...) Note the garbled tail in the `hello cxlld` lines below: only 9 bytes arrive and result.data is not NUL-terminated, so %s prints whatever happens to follow in the receive buffer. The output:

    listening 1
    binding stdin 2
    checkctrl = 1
    has_cmd = 1
    LLLLLLLLLLLLL
    checkctrl = 1
    has_cmd = 1
    SSSSSSSSSSSS
    s->opaque old: 200, new: 201
    oooooooppppppppp3333333
    open(201) [id=1] start
    checkctrl = 1
    has_cmd = 1
    BBBBBBBBBBBB
    error(300) [id=2]
    checkctrl = 1
    has_cmd = 1
    OOOOOOOOOOOOOO
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 0 0
    now we get: 2
    get fd: 6, opa: 201, type: 3, read: 1, write: 0
    client_fd: 8
    accept(201) [id=4 127.0.0.1:56814] from [1]
    checkctrl = 1
    has_cmd = 1
    SSSSSSSSSSSS
    s->opaque old: 201, new: 600
    oooooooppppppppp3333333
    open(600) [id=4] start
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 2
    get fd: 7, opa: 400, type: 4, read: 0, write: 1
    Error: 0
    PTR: 0, 0AAAAAAAAAAAAAAAAAAAAA
    oooooooppppppppp5555
    open(400) [id=3] 127.0.0.1
    checkctrl = 0
    event: 2 2
    now we get: 1
    checkctrl = 1
    has_cmd = 1
    DDDDDDDDDDDDDD
    s->type111111111111: 5, id = 3
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1
    now we get: 1
    get fd: 8, opa: 600, type: 5, read: 1, write: 0
    message(600) [id=4] size=11 data= hello world
    checkctrl = 1
    has_cmd = 1
    DDDDDDDDDDDDDD
    s->type111111111111: 5, id = 4
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1
    now we get: 1
    get fd: 7, opa: 400, type: 5, read: 1, write: 0
    message(400) [id=3] size=9 data= hello cxlld
    checkctrl = 1
    has_cmd = 1
    DDDDDDDDDDDDDD
    s->type111111111111: 5, id = 3
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1
    now we get: 1
    get fd: 8, opa: 600, type: 5, read: 1, write: 0
    message(600) [id=4] size=9 data= hello cxlld
    checkctrl = 1
    has_cmd = 1
    DDDDDDDDDDDDDD
    s->type111111111111: 5, id = 4
    checkctrl = 1
    has_cmd = 0
    checkctrl = 0
    event: 1 1
    now we get: 1
    get fd: 7, opa: 400, type: 5, read: 1, write: 0
    message(400) [id=3] size=9 data= hello cxlld
    .
    .
    .



    Appendix:

    For more analysis of the socket_server source, you can also refer to the video embedded in the original post (the embed is not reproduced here).