SocketAsyncEventArgs(IOCP) 기반 고성능 TCP 서버 구현 (一) - SocketAsyncEventArgs 봉인

19377 단어 C#TCP

최근 한 가지 수요에 부딪힌 것은 수천 대의 장치가 있는데 이 장치들은 모두 운영자의 네트워크를 통해 TCP/IP 프로토콜을 바탕으로 서버에 정보를 보내는 것이다. 그리고 이 장치들은 단방향으로 발송할 뿐이고 서버가 정보를 되돌릴 필요가 없다. 장치의 정보 발송 빈도는 1초에 한 번이다.서버 측이 받아들인 후에 정보를 분석하고 입고합니다.이것은 정상적인 조작이다.그래서 서버 측의 정보 수용 소프트웨어에 대해 비교적 높은 요구를 제기했다. 이렇게 많은 설비, 이렇게 높은 주파수는 서버 측이 소프트웨어를 받아들이는 건장함을 확보하고 붕괴해서는 안 된다.인터넷에서 관련 글을 찾아보니 Socket Async Event Args라는 종류가 상기 내가 원하는 기능을 실현할 수 있다는 것을 발견했다.다른 대신의 문장을 참고한 후에야 비로소 이곳에서 재능을 마구 뽐내지 않으셨으니 양해해 주십시오.

1. 변수와 보조 클래스 정의


Socket Async Event Args는 마이크로소프트에서 제공하는 클래스입니다.https://docs.microsoft.com/zh-cn/dotnet/api/system.net.sockets.socketasynceventargs?redirectedfrom=MSDN&view=netframework- 4.8 홈페이지 보기.그러나 마이크로소프트의 예는 비교적 거칠어서 내가 원하는 목적에 도달하지 못했기 때문에 나는 이곳에서 약간의 개조를 진행했다.우선 클래스를 만듭니다. 여기는 SocketServer라고 합니다.
클래스 Socket Server에서 먼저 변수를 정의하여 이 서버의 수신 소프트웨어가 비교적 많은 클라이언트, 즉 장치를 지원할 수 있도록 합니다.
        private int m_maxConnectNum;    //   
        private int m_revBufferSize;    //   
        BufferManager m_bufferManager; // 
        const int opsToAlloc = 2;
        Socket listenSocket;            // Socket  
        SocketEventPool m_pool;
        int m_clientCount;              //   
        Semaphore m_maxNumberAcceptedClients;
        List m_clients; //   

그 중에서 Buffer Manager라는 종류는 관리에 사용되고 클라이언트가 보낸 정보는 특별히 중요한 것이 아니다. 구체적인 코드 네트워크에서 이미 누군가가 실현했다. 여기서 나는 직접 붙였다.
    // This class creates a single large buffer which can be divided up 
    // and assigned to SocketAsyncEventArgs objects for use with each 
    // socket I/O operation.  
    // This enables bufffers to be easily reused and guards against 
    // fragmenting heap memory.
    // 
    // The operations exposed on the BufferManager class are not thread safe.
    class BufferManager
    {
        int m_numBytes;                 // the total number of bytes controlled by the buffer pool
        byte[] m_buffer;                // the underlying byte array maintained by the Buffer Manager
        Stack m_freeIndexPool;     // 
        int m_currentIndex;
        int m_bufferSize;

        public BufferManager(int totalBytes, int bufferSize)
        {
            m_numBytes = totalBytes;
            m_currentIndex = 0;
            m_bufferSize = bufferSize;
            m_freeIndexPool = new Stack();
        }

        // Allocates buffer space used by the buffer pool
        public void InitBuffer()
        {
            // create one big large buffer and divide that 
            // out to each SocketAsyncEventArg object
            m_buffer = new byte[m_numBytes];
        }

        // Assigns a buffer from the buffer pool to the 
        // specified SocketAsyncEventArgs object
        //
        // true if the buffer was successfully set, else false
        public bool SetBuffer(SocketAsyncEventArgs args)
        {

            if (m_freeIndexPool.Count > 0)
            {
                args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
            }
            else
            {
                if ((m_numBytes - m_bufferSize) < m_currentIndex)
                {
                    return false;
                }
                args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
                m_currentIndex += m_bufferSize;
            }
            return true;
        }

        // Removes the buffer from a SocketAsyncEventArg object.  
        // This frees the buffer back to the buffer pool
        public void FreeBuffer(SocketAsyncEventArgs args)
        {
            m_freeIndexPool.Push(args.Offset);
            args.SetBuffer(null, 0, 0);
        }

    }

SocketEventPool 클래스는 클라이언트를 비동기적으로 관리하는 데 사용되는 코드입니다.
    class SocketEventPool
    {
        Stack m_pool;


        public SocketEventPool(int capacity)
        {
            m_pool = new Stack(capacity);
        }

        public void Push(SocketAsyncEventArgs item)
        {
            if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
            lock (m_pool)
            {
                m_pool.Push(item);
            }
        }

        // Removes a SocketAsyncEventArgs instance from the pool  
        // and returns the object removed from the pool  
        public SocketAsyncEventArgs Pop()
        {
            lock (m_pool)
            {
                return m_pool.Pop();
            }
        }

        // The number of SocketAsyncEventArgs instances in the pool  
        public int Count
        {
            get { return m_pool.Count; }
        }

        public void Clear()
        {
            m_pool.Clear();
        }
    }

클라이언트가 가져온 정보는 서버 수신단에서 모두 비동기적으로 처리되고 AsyncUserToken 클래스로 클라이언트(즉 장치)를 관리한다. 코드도 이미 실현된 사람이 있다. 아래에 코드를 붙인다.
    class AsyncUserToken
    {
        ///   
        ///  IP   
        ///   
        public IPAddress IPAddress { get; set; }

        ///   
        ///    
        ///   
        public EndPoint Remote { get; set; }

        ///   
        ///  SOKET  
        ///   
        public Socket Socket { get; set; }

        ///   
        ///    
        ///   
        public DateTime ConnectTime { get; set; }

        /////   
        /////    
        /////   
        //public UserInfoModel UserInfo { get; set; }


        ///   
        ///    
        ///   
        public List Buffer { get; set; }


        public AsyncUserToken()
        {
            this.Buffer = new List();
        }
    }

위의 이 두 단락의 코드는 모두 보조류로 그렇게 중요하지 않으니, 여기는 더 이상 말할 필요가 없다.다음은 우리 서버 수신단이 어떻게 실현했는지 중점적으로 이야기한다.우선 관심은 서버 측이 클라이언트가 보낸 정보를 수신하여 처리해야 한다는 것이다. 여기에 의뢰를 정의하여 우리를 돕는다. 즉, 서버가 정보를 수신하면 이 함수에 들어가 처리한다.
        ///   
        ///    
        ///   
        ///    
        ///    
        public delegate void OnReceiveData(AsyncUserToken token, byte[] buff);

        ///   
        ///    
        ///   
        public event OnReceiveData ReceiveClientData;

이렇게 하면 내가 실행한 서버 쪽을 호출할 때 사용자 정의 함수를 직접 등록하여 연결할 수 있다. 예를 들어 다음 코드:
         _socketServer.ReceiveClientData += onReceiveData;

        private void onReceiveData(AsyncUserToken token, byte[] buff)
        {
            //Dosomething;
        }

2. 사용자 정의 패키지 SocketServer 클래스 만들기


자, 위에서 몇 가지 파라미터를 정의했습니다. 그러면 SocketServer 클래스를 초기화할 때 이 파라미터를 초기화해야 합니다. 다음 코드를 보십시오.
        ///   
        ///    
        ///   
        ///    
        ///    
        public SocketServer(int numConnections, int receiveBufferSize)
        {
            m_clientCount = 0;
            m_maxConnectNum = numConnections;
            m_revBufferSize = receiveBufferSize;
            // allocate buffers such that the maximum number of sockets can have one outstanding read and   
            //write posted to the socket simultaneously    
            m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);

            m_pool = new SocketEventPool(numConnections);
            m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
        }

3. SocketServer 클래스의 초기화


위 코드는 클래스 Socket Server의 구조 함수입니다. 구조 함수에서 최대 연결 수와 모든 메시지의 크기를 설정했습니다. 왜냐하면 우리는 서버 측 소프트웨어가 더 많은 클라이언트의 정보를 받아들일 수 있기를 바랍니다.구조 함수를 통해 매개 변수를 초기화한 후, 우리는 모든 클라이언트에게 또는 모든 Socket에 일정한 메모리를 분배해야 한다. 다음과 같은 코드.
        ///   
        ///    
        ///   
        public void Init()
        {
            // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   
            // against memory fragmentation  
            m_bufferManager.InitBuffer();
            m_clients = new List();
            // preallocate pool of SocketAsyncEventArgs objects  
            SocketAsyncEventArgs readWriteEventArg;

            for (int i = 0; i < m_maxConnectNum; i++)
            {
                readWriteEventArg = new SocketAsyncEventArgs();
                readWriteEventArg.Completed += new EventHandler(IO_Completed);
                readWriteEventArg.UserToken = new AsyncUserToken();

                // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  
                m_bufferManager.SetBuffer(readWriteEventArg);
                // add SocketAsyncEventArg to the pool  
                m_pool.Push(readWriteEventArg);
            }
        }

        void IO_Completed(object sender, SocketAsyncEventArgs e)
        {
            // determine which type of operation just completed and call the associated handler  
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }

        }

4. 서비스의 시작과 종료


내용을 분배한 후에 모든 준비 작업을 하면 됩니다. 다음은 SocketServer와 같은 종류의 시작 함수를 직접 쓸 수 있습니다.
        ///   
        ///    
        ///   
        ///   
        public bool Start(IPEndPoint localEndPoint)
        {
            try
            {
                m_clients.Clear();
                listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(localEndPoint);
                // start the server with a listen backlog of 100 connections  
                listenSocket.Listen(m_maxConnectNum);
                // post accepts on the listening socket  
                StartAccept(null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        // Begins an operation to accept a connection request from the client   
        //  
        // The context object to use when issuing   
        // the accept operation on the server's listening socket  
        public void StartAccept(SocketAsyncEventArgs acceptEventArg)
        {
            if (acceptEventArg == null)
            {
                acceptEventArg = new SocketAsyncEventArgs();
                acceptEventArg.Completed += new EventHandler(AcceptEventArg_Completed);
            }
            else
            {
                // socket must be cleared since the context object is being reused  
                acceptEventArg.AcceptSocket = null;
            }

            m_maxNumberAcceptedClients.WaitOne();
            if (!listenSocket.AcceptAsync(acceptEventArg))
            {
                ProcessAccept(acceptEventArg);
            }
        }


        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            try
            {
                Interlocked.Increment(ref m_clientCount);
                // Get the socket for the accepted client connection and put it into the   
                //ReadEventArg object user token  
                SocketAsyncEventArgs readEventArgs = m_pool.Pop();
                AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;
                userToken.Socket = e.AcceptSocket;
                userToken.ConnectTime = DateTime.Now;
                userToken.Remote = e.AcceptSocket.RemoteEndPoint;
                userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;

                lock (m_clients) { m_clients.Add(userToken); }

                if (ClientNumberChange != null)
                    ClientNumberChange(1, userToken);
                if (!e.AcceptSocket.ReceiveAsync(readEventArgs))
                {
                    ProcessReceive(readEventArgs);
                }
            }
            catch (Exception me)
            {
                LogHelper.WriteLog(me.Message + "\r
" + me.StackTrace); } // Accept the next connection request if (e.SocketError == SocketError.OperationAborted) return; StartAccept(e); }

상술한 절차를 통해 우리는 IP 주소와 포트를 통해 SocketServer 서비스를 시작할 수 있다.SocketServer라는 서버의 수신 소프트웨어를 더욱 튼튼하게 하기 위해서, 우리는 서비스 정지 기능을 하나 더 추가합니다.
        ///   
        ///    
        ///   
        public void Stop()
        {
            foreach (AsyncUserToken token in m_clients)
            {
                try
                {
                    listenSocket.Shutdown(SocketShutdown.Both);
                }
                catch (Exception) { }
            }
            

            listenSocket.Close();
            int c_count = m_clients.Count;
            lock (m_clients) { m_clients.Clear(); }

            if (ClientNumberChange != null)
                ClientNumberChange(-c_count, null);
        }

상기 코드는 순환을 통해 모든 Socket 연결을 닫습니다.

5. 등록 서버 정보 수신 처리 함수


앞의 코드에서 초기화할 때 Socket Async Event Args 클래스의 Completed 이벤트를 등록했습니다. 이 이벤트는 서버가 수신이든 발송이든 완성된 함수는 IO_Completed.
         readWriteEventArg.Completed += new EventHandler(IO_Completed);

        void IO_Completed(object sender, SocketAsyncEventArgs e)
        {
            // determine which type of operation just completed and call the associated handler  
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }

        }

의뢰된 IO_Completed 함수에서 우리는 수신된 정보를 통해 수신인지 발송인지 판단합니다. 수신이라면 ProcessReceive(Socket Async Event Argse)라는 함수를 사용하여 수신된 정보를 처리합니다.
        // This method is invoked when an asynchronous receive operation completes.   
        // If the remote host closed the connection, then the socket is closed.    
        // If data was received then the data is echoed back to the client.  
        //  
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            try
            {
                // check if the remote host closed the connection  
                AsyncUserToken token = (AsyncUserToken)e.UserToken;
                if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
                {
                    //   
                    byte[] data = new byte[e.BytesTransferred];
                    Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
                    lock (token.Buffer)
                    {
                        token.Buffer.AddRange(data);
                    }
                    if (ReceiveClientData != null)
                    {
                        ReceiveClientData(token, data);
                    }

                    // .  , Socket.ReceiveAsync   
                    if (!token.Socket.ReceiveAsync(e))
                        this.ProcessReceive(e);
                }
                else
                {
                    CloseClientSocket(e);
                }
            }
            catch (Exception xe)
            {
                LogHelper.WriteLog(xe.Message + "\r
" + xe.StackTrace); } }

위의 코드를 보면 우리가 정보를 받은 후에 정보를 다른 등록 위탁 함수에 위탁하여 처리해야 한다는 것을 알 수 있다.
                    if (ReceiveClientData != null)
                    {
                        ReceiveClientData(token, data);
                    }

지금까지 우리는 SocketServer와 같은 수신 기능을 실현했다. 만약에 독자가 클라이언트가 정보를 보내는 것에 흥미가 있다면 아래의 코드를 직접 참고한다.

6. 등록 서버 정보 전송 처리 함수

        // This method is invoked when an asynchronous send operation completes.    
        // The method issues another receive on the socket to read any additional   
        // data sent from the client  
        //  
        //   
        private void ProcessSend(SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.Success)
            {
                // done echoing data back to the client  
                AsyncUserToken token = (AsyncUserToken)e.UserToken;
                // read the next block of data send from the client  
                bool willRaiseEvent = token.Socket.ReceiveAsync(e);
                if (!willRaiseEvent)
                {
                    ProcessReceive(e);
                }
            }
            else
            {
                CloseClientSocket(e);
            }
        }

        //   
        private void CloseClientSocket(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = e.UserToken as AsyncUserToken;

            lock (m_clients) { m_clients.Remove(token); }
            // , ,   
            if (ClientNumberChange != null)
                ClientNumberChange(-1, token);
            // close the socket associated with the client  
            try
            {
                token.Socket.Shutdown(SocketShutdown.Send);
            }
            catch (Exception) { }
            token.Socket.Close();
            // decrement the counter keeping track of the total number of clients connected to the server  
            Interlocked.Decrement(ref m_clientCount);
            m_maxNumberAcceptedClients.Release();
            // Free the SocketAsyncEventArg so they can be reused by another client  
            e.UserToken = new AsyncUserToken();
            m_pool.Push(e);
        }



        ///   
        ///  ,   
        ///   
        ///   
        ///   
        ///   
        public void SendMessage(AsyncUserToken token, byte[] message)
        {
            if (token == null || token.Socket == null || !token.Socket.Connected)
                return;
            try
            {
                // ,    
                SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
                sendArg.UserToken = token;
                sendArg.SetBuffer(byte, 0, byte.Length);  // .  
                token.Socket.SendAsync(sendArg);
            }
            catch (Exception e)
            {
                LogHelper.WriteLog("SendMessage - Error:" + e.Message);
            }
        }

위의 코드는 다운로드, 다운로드 주소를 제공합니다.
https://download.csdn.net/download/aplsc/11817545

좋은 웹페이지 즐겨찾기