精华内容
下载资源
问答
  • C++ IOCP windows服务器

    2021-01-20 03:33:57
    首先,启动主线程,接收来自客户端的请求。...  IOCP服务器实现 #pragma once #include #include #include <Windows> #include #include #include CThreadPool.h #include WorkA.h #include Wor
  • windows IOCP示例代码

    2017-12-07 09:08:05
    使用Windows提供的IOCP技术,以及系统线程池,实现Windows服务器端对大并发的长连接的支持。
  • 本文的代码源自《游戏服务端IOCP模型,自己封装的一个类,3行代码搞定服务端》,我改进过了,希望作者不要说我侵权,我声明这段代码是作者的劳动结晶,我只不过是在此基础上进行了些修改和调试。 windows里有如同...

    本文的代码源自《游戏服务端IOCP模型,自己封装的一个类,3行代码搞定服务端》,我改进过了,希望作者不要说我侵权,我声明这段代码是作者的劳动结晶,我只不过是在此基础上进行了些修改和调试。
    windows里有如同Linux中的epoll一般强大的套接字管理功能,即socket编程模型。
    我们面对服务器端编程时,往往希望一台主机能同时承接成千上万个客户端连接,只要我们的CPU和内存足够处理业务即可。但对于socket,如果使用select管理,在windows里有最多管理64个套接字的上限,毕竟都是依靠轮询来反馈事件的。如果要管理上百个套接字,我们就需要考虑使用IOCP(完成端口)模型了,见《Windows网络编程》5.2.6 完成端口模型一节的内容。
    在经历了2天各种百度学习的情况下,我发现网上对于这个完成端口描述大多都是照本宣科,而且逻辑不完整,同样,书中也有不完整的地方,所以我总结此文,并附带可用的代码供大家参考学习,其中如果有不对的地方,望留言指正!

    直接上代码,再说明用法,希望理解完成端口逻辑的同学可以看书或百度:

    #pragma once
    #include <WinSock2.h>
    #include <afxmt.h>
    #include <afxtempl.h>
    
    #define ULONG_PTR ULONG
    #define PULONG_PTR ULONG*
    
    #define  BUFFER_SIZE 1024
    #define  SOCK_TIMEOUT_SECONDS 60
    
    class Iocp;
    
    typedef enum {
    	OP_READ   = 1,
    	OP_WRITE  = 2,
    	OP_ACCEPT = 3,
    	OP_CLOSE   = 100,
    	OP_DO_WORK = 101
    } SOCKET_STATE;
    
    typedef struct
    {
    	OVERLAPPED oOverlapped;
    	WSABUF wsBuffer;
    	CHAR szBuffer[BUFFER_SIZE];
    	DWORD dSend;
    	DWORD dRecv;
    	SOCKET_STATE sState;
    } PER_IO_DATA, *LPPER_IO_DATA;
    
    /*传送给处理函数的参数*/
    typedef struct
    {
    	SOCKET sSocket; // 客户端socket描述符
    	int    index;  // 序号,用于索引
    	CHAR   key[32]; // ip:port
    	CHAR   szClientIP[24]; // 客户端IP字符串
    	UINT   uiClientPort;  // 客户端端口
    	time_t lastReceiveTime; // 最后接收时间
    	time_t connectedTime; // 创建链接的时间(如果超过这个时间还没有收到有效的ID,那么关闭)
    	LPPER_IO_DATA lpIOData; // 释放内存用
    	Iocp *pIocp; // ServerScanThread要用
    	CMutex *lpMutex;
    } IOCPClient, *LPIOCPClient;
    
    typedef struct 
    {
    	int index; // 同IOCPClient的index
    	CMap<CString, LPCTSTR, IOCPClient*, IOCPClient*> sockets;
    } STRU_MAP_ClientSockets;
    
    typedef void (*ReadProc)(LPIOCPClient lpData, LPPER_IO_DATA lpPER_IO_DATA);
    typedef VOID (*ScanProc)(LPIOCPClient lpClientSocket);
    
    class Iocp
    {
    public:
    	Iocp(const CHAR *host, UINT port);
    	~Iocp(void);
    	
    	VOID SetThreadNums();
    	UINT GetThreadNums();
    	VOID SetPort(UINT port);
    	UINT GetPort();
    	BOOL ListenEx(UINT backlog);
    	VOID Close();
    	VOID Iocp::CreateScanThreads();
    	static VOID ServerWorkThread(VOID *_this);
    	static VOID ServerScanThread(VOID *s);
    	static VOID FreeClientSocket(Iocp *lpIocp, LPIOCPClient lpClientSocket);
    	static int Send(SOCKET sockfd, const char *buff, const unsigned int size);
    	static VOID SetClientSocketCountText(unsigned int count);
    	static VOID OutPutLog(const char *szFormat, ...);
    	VOID SetReadFunc(VOID *lprFun);
    	VOID SetScanFunc(VOID *lprFun);
    	
    	int m_ThreadNums; // 线程数量,用于将socket分割到多个区域,扫描时每次只扫描一个区域
    	int m_AcceptClientIndex; // 接受连接的socket的序号,跟m_ThreadNums取余
    	STRU_MAP_ClientSockets *m_Sockets; // 因为需要根据线程数动态分配内存,所以不能是静态变量
    	unsigned int m_SocketCount; // 已连接客户端的数量
    	ReadProc m_ReadFun; // 读数据回调函数
    	ScanProc m_ScanFun; // 扫描socket回调函数
    	HANDLE m_cpHandle;  // IO完成端口句柄
    	
    	// 扩展的接受连接,放在线程里了
    	static VOID AcceptEx(VOID  *_this);
    	
    	// 监听套接字,即服务端套接字
    	SOCKET  m_ListenSocketID;
    };
    
    
    #include "stdafx.h"
    #include "Iocp.h"
    #include <stdlib.h>
    #include <process.h>
    #include "resource.h"
    
    #pragma comment(lib, "ws2_32.lib")
    
    extern void DoRxTxWork(LPIOCPClient lpClientSocket);
    
    Iocp::Iocp(const CHAR *host, UINT port):
    	m_ListenSocketID(INVALID_SOCKET),
    	m_AcceptClientIndex(0)
    {
    	SetClientSocketCountText((m_SocketCount = 0));
    
    	WSADATA wsaData;
    	DWORD dwRet = WSAStartup( 0x0202, &wsaData );
    	if (0 != dwRet )
    	{
    		WSACleanup();
    		throw 1;
    	}
    
    	SOCKADDR_IN sockAddr;
    	memset( &sockAddr, 0, sizeof(SOCKADDR_IN) ) ;
    	sockAddr.sin_family = AF_INET;
    	sockAddr.sin_addr.s_addr = inet_addr(host);
    	sockAddr.sin_port = htons(port);
    
    	/*创建监听套接字*/
    	m_ListenSocketID = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
    	if ( m_ListenSocketID == INVALID_SOCKET )
    	{
    		throw 1;
    	}
    
    	/*设置套接字选项*/
    	CHAR opt = 1;
    	BOOL ret = setsockopt( m_ListenSocketID , SOL_SOCKET , SO_REUSEADDR , (const CHAR * )&opt , sizeof(opt) );
    	if ( ret != 0 )
    	{
    		throw 1 ;
    	}
    
    	/*绑定套接字*/
    	if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&sockAddr, sizeof(struct sockaddr)))
    	{
    		throw 1 ;
    	}
    
    	/*创建完成端口*/
    	m_cpHandle  = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
    	if ( m_cpHandle == NULL )
    	{
    		throw 1 ;
    	}
    
    	SYSTEM_INFO mySysInfo;
    	GetSystemInfo( &mySysInfo );
    	m_ThreadNums = (int)mySysInfo.dwNumberOfProcessors * 2;
    	//m_ThreadNums = 1;
    	m_Sockets = new STRU_MAP_ClientSockets[m_ThreadNums];
    	for ( int i = 0; i < m_ThreadNums; i++ )
    	{
    		m_Sockets[i].index = i;
    		_beginthread(Iocp::ServerWorkThread,  0,  (VOID *)this);
    	}
    	TRACE("工作线程准备完成(%d个)\n", m_ThreadNums);
    	OutPutLog("工作线程准备完成(%d个)\n", m_ThreadNums);
    }
    
    Iocp::~Iocp(void)
    {
    	WSACleanup();
    }
    
    VOID Iocp::AcceptEx(VOID  *_this)
    {
    	SOCKET acSocket;
    	DWORD dwRecvBytes;
    	Iocp * pIocp = (Iocp *)_this;
    	SOCKADDR_IN sAddr;
    	INT uiClientSize = sizeof(sAddr);
    
    	TRACE("服务器已就绪, 套接字=%u ...\n", pIocp->m_ListenSocketID);
    	OutPutLog("服务器已就绪, 套接字=%u ...\n", pIocp->m_ListenSocketID);
    	while (TRUE)
    	{
    		acSocket = WSAAccept( pIocp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0 );
    		if ( acSocket == SOCKET_ERROR )
    		{
    			TRACE("接受连接发生错误: %d\n", WSAGetLastError());
    			return;
    		}
    
    		LPIOCPClient lpClientSocket = (LPIOCPClient)malloc(sizeof(IOCPClient));
    		if ( NULL == lpClientSocket )
    		{
    			TRACE("Error while malloc lpClientSocket\n");
    			return;
    		}
    		memset(lpClientSocket, 0, sizeof(IOCPClient));
    
    		/*这里停止监听会有问题*/
    		LPPER_IO_DATA lpIOData = (LPPER_IO_DATA )malloc(sizeof(PER_IO_DATA));
    		if ( lpIOData == NULL )
    		{
    			TRACE("Error while malloc lpIOData\n");
    			return;
    		}
    		memset(lpIOData, 0, sizeof(PER_IO_DATA));
    		
    		lpClientSocket->connectedTime = lpClientSocket->lastReceiveTime = time(NULL);
    		lpClientSocket->lpIOData = lpIOData; // 释放内存用
    		lpClientSocket->sSocket = acSocket;
    		lpClientSocket->pIocp = pIocp;
    		strcpy(lpClientSocket->szClientIP, inet_ntoa(sAddr.sin_addr));
    		lpClientSocket->uiClientPort = sAddr.sin_port;
    		_snprintf(lpClientSocket->key, sizeof lpClientSocket->key, "%s:%d", lpClientSocket->szClientIP, lpClientSocket->uiClientPort);
    		lpClientSocket->lpMutex = new CMutex(FALSE, lpClientSocket->key);
    		if (CreateIoCompletionPort( (HANDLE)acSocket, pIocp->m_cpHandle, (ULONG_PTR)lpClientSocket, 0 ) == NULL)
    		{
    			TRACE("Error while CreateIoCompletionPort\n");
    			return;
    		}
    		TRACE("客户端已连接:%s:%u\n", lpClientSocket->szClientIP, lpClientSocket->uiClientPort);
    		OutPutLog("客户端已连接:%s:%u\n", lpClientSocket->szClientIP, lpClientSocket->uiClientPort);
    		
    		// 投递线程事件
    		lpIOData->dSend = 0;
    		lpIOData->dRecv = 0;
    		lpIOData->wsBuffer.len = BUFFER_SIZE - 1;
    		lpIOData->wsBuffer.buf = lpIOData->szBuffer;
    		lpIOData->sState = OP_READ;
    
    		DWORD flags = 0;
    		if (WSARecv(acSocket, &(lpIOData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIOData->oOverlapped), NULL) == SOCKET_ERROR)
    		{
    			if (WSAGetLastError() != ERROR_IO_PENDING )
    			{
    				TRACE("Error ERROR_IO_PENDING\n");
    				return;
    			}
    			else
    			{
    				
    				// 客户端按接受连接的顺序依次放入4个线程进行扫描处理
    				pIocp->m_AcceptClientIndex = (pIocp->m_AcceptClientIndex + 1) % pIocp->m_ThreadNums;
    				lpClientSocket->index = pIocp->m_AcceptClientIndex;
    				pIocp->m_Sockets[lpClientSocket->index].sockets[lpClientSocket->key] = lpClientSocket;
    				SetClientSocketCountText(++pIocp->m_SocketCount);
    				TRACE("客户端异步读取已完成,等待读取数据...\n");
    				OutPutLog("客户端异步读取已完成,等待读取数据...\n");
    			}
    		}
    	}
    }
    
    BOOL Iocp::ListenEx(UINT backlog)
    {
    	if (SOCKET_ERROR == listen(m_ListenSocketID, backlog))
    	{
    		return FALSE;
    	}
    	/*创建监听线程*/
    	if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this))
    	{
    		return FALSE;
    	}
    	return TRUE;
    }
    
    VOID Iocp:: ServerWorkThread( VOID * _this )
    {
    	Iocp * lpIocp = (Iocp *)_this;
    	HANDLE hPlePort  = (HANDLE)lpIocp->m_cpHandle;
    	DWORD dwBytes;
    	LPIOCPClient lpClientSocket = NULL;
    	LPPER_IO_DATA lpIOData = NULL;
    	LPOVERLAPPED lpOverlapped = NULL;
    	DWORD sendBytes = 0;
    	DWORD recvBytes = 0;
    	DWORD dwFlag = 0;
    	while (TRUE)
    	{
    		if (0 == GetQueuedCompletionStatus( hPlePort, &dwBytes, (PULONG_PTR)&lpClientSocket, &lpOverlapped, INFINITE ))
    		{
    			FreeClientSocket(lpIocp, lpClientSocket);
    			continue ;
    		}
    		lpIOData = (LPPER_IO_DATA)CONTAINING_RECORD(lpOverlapped, PER_IO_DATA, oOverlapped);
    		if (0 == dwBytes && (lpIOData->sState == OP_READ || lpIOData->sState == OP_WRITE))
    		{
    			TRACE("客户端断开了连接:%s\n", lpClientSocket->key);
    			OutPutLog("客户端断开了连接:%s\n", lpClientSocket->key);
    			closesocket(lpClientSocket->sSocket);
    			FreeClientSocket(lpIocp, lpClientSocket);
    			continue;
    		}
    
    		switch (lpIOData->sState) {
    		case OP_READ:
    			lpIOData->dRecv = dwBytes;
    			lpClientSocket->lastReceiveTime = time(NULL);
    
    			lpIocp->m_ReadFun(lpClientSocket, lpIOData);
    
    			lpIOData->dRecv = 0;
    			ZeroMemory( &(lpIOData->oOverlapped), sizeof( OVERLAPPED ) );
    			lpIOData->wsBuffer.len = BUFFER_SIZE - 1;
    			lpIOData->wsBuffer.buf = lpIOData->szBuffer;
    			lpIOData->sState = OP_READ;
    			if ( WSARecv( lpClientSocket->sSocket, &(lpIOData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIOData->oOverlapped), NULL ) == SOCKET_ERROR )
    			{
    				if ( WSAGetLastError() != ERROR_IO_PENDING )
    				{
    					return;
    				}
    			}
    			break;
    		case OP_WRITE:
    			// 什么也不用做
    			break;
    		case OP_DO_WORK:
    			lpIocp->m_ScanFun(lpClientSocket);
    			break;
    		case OP_CLOSE:
    			TRACE("主动断开长期无响应的客户端:%s\n", lpClientSocket->key);
    			OutPutLog("主动断开长期无响应的客户端:%s\n", lpClientSocket->key);
    			// 这里不能直接释放内存,因为还会触发一次GetQueuedCompletionStatus返回0,在返回0时释放内存
    			closesocket(lpClientSocket->sSocket); 
    			break;
    		default:
    			break;
    		}
    
    	}
    }
    
    VOID Iocp::FreeClientSocket(Iocp *lpIocp, LPIOCPClient lpClientSocket)
    {
    	if (NULL == lpIocp || NULL == lpClientSocket) {
    		return;
    	}
    	lpIocp->m_Sockets[lpClientSocket->index].sockets.RemoveKey(lpClientSocket->key);
    	SetClientSocketCountText(--lpIocp->m_SocketCount);
    	free(lpClientSocket->lpIOData);
    	free(lpClientSocket);
    	TRACE("内存已经释放!\n");
    }
    
    VOID Iocp::SetReadFunc(VOID *lprFun)
    {
    	m_ReadFun  = (ReadProc)lprFun;
    }
    
    VOID Iocp::SetScanFunc(VOID *lprFun)
    {
    	m_ScanFun  = (ScanProc)lprFun;
    	CreateScanThreads();
    }
    
    VOID Iocp::CreateScanThreads()
    {
    	STRU_MAP_ClientSockets *sock;
    	for (int i = 0; i < m_ThreadNums; i++) {
    		sock = &m_Sockets[i];
    		_beginthread(Iocp::ServerScanThread,  0,  (VOID *)sock);
    	}
    }
    
    VOID Iocp::ServerScanThread(VOID *s)
    {
    	static PER_IO_DATA IOData;
    	POSITION pos;
    	CString key;
    	IOCPClient *lpClientSocket;
    	STRU_MAP_ClientSockets *mapSock = (STRU_MAP_ClientSockets*)s;
    	int index = mapSock->index;
    	int doCount = 0;
    	CMap<CString, LPCTSTR, IOCPClient*, IOCPClient*> *serverSockets = &mapSock->sockets;
    	while (1) {
    		Sleep(5000);
    		//OutPutLog("序号[%d]定时器开始处理...", index);
    		doCount = 0;
    		pos = serverSockets->GetStartPosition();
    		while (pos) {
    			doCount++;
    			serverSockets->GetNextAssoc(pos, key, lpClientSocket);
    
    			memset(&IOData, 0, sizeof(PER_IO_DATA));
    			IOData.sState = OP_DO_WORK;
    			PostQueuedCompletionStatus(lpClientSocket->pIocp->m_cpHandle, 0, (ULONG_PTR)lpClientSocket, &IOData.oOverlapped);
    		}
    		//OutPutLog("序号[%d]定时器处理了%d个客户端", index, doCount);
    	}
    }
    
    void Iocp::SetClientSocketCountText(unsigned int count)
    {
    	CString countStr;
    	countStr.Format("客户端数量: %u", count);
    
    	CWnd *pWnd = AfxGetMainWnd();
    	HWND hHwnd = pWnd->m_hWnd;
    	::SetDlgItemText(hHwnd, IDC_CLIENT_COUNT, countStr);
    }
    
    void Iocp::OutPutLog(const char *szFormat, ...)
    {
    	static char szLogBuffer[1024];
    	
    	SYSTEMTIME curTime;
    	GetLocalTime(&curTime);
    	CString strTime;
    	strTime.Format(_T("[%04d-%02d-%02d %02d:%02d:%02d] "),
    		curTime.wYear,curTime.wMonth,curTime.wDay,
    		curTime.wHour,curTime.wMinute,curTime.wSecond);
    	strTime += szFormat;
    	
    	va_list pArgList;
    	va_start(pArgList, szFormat);
    	int len = _vsntprintf(szLogBuffer, sizeof szLogBuffer-2, strTime, pArgList);
    	va_end(pArgList);
    	if (szLogBuffer[len-1] == '\n') {
    		if (szLogBuffer[len-2] != '\r') {
    			szLogBuffer[len-1] = '\r';
    			szLogBuffer[len] = '\n';
    			szLogBuffer[len+1] = '\0';
    		}
    	} else {
    		szLogBuffer[len] = '\r';
    		szLogBuffer[len+1] = '\n';
    		szLogBuffer[len+2] = '\0';
    	}
    	
    	
    	CWnd *pWnd =  AfxGetMainWnd();
    	CEdit *pEdit = (CEdit*)pWnd->GetDlgItem(IDC_OUTLOG_EDIT);
    	if (NULL == pEdit) return;
    	
    	int iTextLen = pEdit->GetWindowTextLength();
    	pEdit->SetRedraw(FALSE);
    	pEdit->SetReadOnly(FALSE);
    	pEdit->SetSel(iTextLen, iTextLen, TRUE);
    	pEdit->ReplaceSel(szLogBuffer);     // 这个函数还是在光标的位置书写
    	int lineCount = pEdit->GetLineCount(); // m_prlog是绑定CEDIT控件的对象
        if(lineCount > 100) // 如果输出日志行太多,则删第一行
    	{
    		pEdit->GetWindowText(szLogBuffer,1024 - 1);//只取前100个字符
    		CString tmp(szLogBuffer);
    		int it1 = tmp.Find("\r\n") + 2; // 查找第一行的回车换行位置
    		pEdit->SetSel(0, it1); // 选择要删除的首行
    		pEdit->ReplaceSel(""); // 用空串替换掉首行
    	}
    	pEdit->LineScroll(lineCount);   //可用于水平滚动所有行最后一个字符,这只是设置edit进行滚动
    	pEdit->SetReadOnly(TRUE);
    	pEdit->SetRedraw(TRUE);
    }
    
    int Iocp::Send(SOCKET sockfd, const char *buff, const unsigned int size)
    {
    	static PER_IO_DATA PerIOData;
    
    	memset(&PerIOData, 0, sizeof(PER_IO_DATA));
    	PerIOData.sState = OP_WRITE;
    	PerIOData.wsBuffer.len = size;
    	PerIOData.wsBuffer.buf = (char *)buff;
    	DWORD byteSend = 0;
    	int ErrorCode;
    	int result = WSASend(sockfd, &PerIOData.wsBuffer, 1, &byteSend, 0, &PerIOData.oOverlapped, NULL);
    	if (SOCKET_ERROR == result && ERROR_IO_PENDING != (ErrorCode = WSAGetLastError())) {
    		TRACE("发送数据出错,错误码: %d\n", ErrorCode);
    	} else {
    		TRACE("成功发送数据: %d字节,返回值:%d\n", byteSend, result);
    	}
    	return result;
    }
    
    // 回调1:客户端的发送的数据会在这个函数通知
    void OnRead(LPIOCPClient lpClientSocket, LPPER_IO_DATA lpIOData)
    {
    	if (NULL == lpClientSocket || NULL == lpIOData) {
    		return;
    	}
        int RxCount = (int) lpIOData->dRecv;
        char *RxBuff  = lpIOData->szBuffer;
    	RxBuff[RxCount] = '\0'; // 务必保证接收时留1个字节余量给这个结尾的0
    	Iocp::OutPutLog("%s:%d: %s\n", lpClientSocket->szClientIP, lpClientSocket->uiClientPort, RxBuff);
    	Iocp::Send(lpClientSocket->sSocket, RxBuff, RxCount);
    }
    
    // 回调2:扫描套接字,目的是关闭闲置套接字,或定时发送心跳包(业务逻辑上要求对方回答)
    // 其实这个函数可以直接关闭套接字,只不过通过单独的CLOSE通知会对业务处理更灵活和方便
    // 如果你在业务中体会不到,可以直接调用closesocket即可。
    VOID OnScan(LPIOCPClient lpClientSocket)
    {
    	static PER_IO_DATA IOData;
    	if (NULL == lpClientSocket) {
    		return;
    	}
    	if (time(NULL) - lpClientSocket->lastReceiveTime > SOCK_TIMEOUT_SECONDS) {
    		memset(&IOData, 0, sizeof(PER_IO_DATA));
    		IOData.sState = OP_CLOSE;
    		PostQueuedCompletionStatus(lpClientSocket->pIocp->m_cpHandle, 0, (ULONG_PTR)lpClientSocket, &IOData.oOverlapped);
    	}
    }
    // 调用的位置,在MFC项目里
    void CIOCPSocketDlg::OnRunServer() 
    {
    	static Iocp *g_IocpServer = NULL;
    	if (NULL == g_IocpServer) {
    		g_IocpServer = new Iocp("0.0.0.0",  8888);
    		g_IocpServer->SetReadFunc(OnRead); // 回调1,读取套接字发来的内容
    		g_IocpServer->SetScanFunc(OnScan); // 回调2,定期扫描套接字,可能是业务逻辑要求发心跳包,这个步骤可以免去
    		g_IocpServer->ListenEx(10);
    	}
    }
    

    代码在MFC项目里,所以还有设置窗体内容的逻辑,大家修改成自己的即可。
    补充代码逻辑(书上没讲到的):如果要主动关闭套接字,直接调用closesocket函数即可,因为调用此函数会导致GetQueuedCompletionStatus函数返回0,在返回0的逻辑里释放两个malloc的变量即可。而如果是客户端断开了,GetQueuedCompletionStatus返回的不是0,但满足0 == dwBytes并且是读状态,在这里则除了调用closesocket以外,还要释放malloc的变量(书上讲到了)。
    另外:对于同一个套接字,应该不会同时在多个线程里出发读取完成操作,但是很可能在多个线程里出发读取和扫描通知(OP_DO_WORK),所以在业务中,如有必要,要考虑给每一个客户端一个mutex,加锁处理。
    使用时,只要实现read,scan两个方法,如果确定不要scan(无法处理从网络中消失的客户端,比如客户端突然死机或断网,或服务器断网一段时间,在此期间客户端主动断开了),那么就不挂载scan函数即可,看SetScanFunc的实现里有新建线程的操作哦:

    VOID Iocp::SetScanFunc(VOID *lprFun)
    {
    	m_ScanFun  = (ScanProc)lprFun;
    	CreateScanThreads();
    }
    
    展开全文
  • windowsiocp(完成端口)demo,包含客户端和服务器两个工程
  • WindowsIOCP踩过的一些坑

    千次阅读 2019-05-04 23:05:49
    IOCP目前是性能最好的模型,主要缺点是只能在windows平台下使用,一个IOCP对象,在操作系统中可以关联多个socket和(或)文件控制端。它主要是在内部封装了LIFO原则的请求队列、FIFO原则的完成包队列、多线程处理。...

    前段时间在搞win下面的IOCP服务器时发现了一些问题,有些也是折磨了好久才慢慢理解的,今天将这些踩过的坑记录下来,避免以后遇到同样的问题,又要折磨好久。

    IOCP目前是性能最好的网络模型,他与其他网络模型的的区别和优缺点就不做赘述了,这些网上随便一搜就有大量的文章去解释,主要说下IOCP的优缺点,一个IOCP对象,在操作系统中可以关联多个socket和(或)文件控制端。它主要是在内部封装了LIFO原则的请求队列、FIFO原则的完成队列、多线程处理,同样IOCP也是唯一个不需要考虑安全属性的Windows对象,因为IO完成端口在设计时就只能在一个进程中运行。IOCP也是异步模型,我们只能将IO操作的事件投递到IOCP上,实际上socket的send和recv操作均是由IOCP完成的,存在的问题就是,我们没办法控制send和recv的成功。还有一个缺点就是只能在Windows环境使用,这样就会存在一点的局限性。

    IOCP模型主要是利用了重叠I/O技术,最难的部分也是在重叠I/O的管理上面,因此我们将这块进行了简便处理,将结构体修改成类,这样会比较方便的进行线性管理。

    首先我们创建一个重叠I/O结构的类:

    #define MAX_BUFFER 8192	//buffer的最大长度,一般的HTTP POST数据用4个页字段完全够用,不够用的话按情况而定
    
    class CCPerIOData
    {
    public:
    	CCPerIOData();
    	~CCPerIOData();
    
    	void ResetIO();
    public:
    	
    	//下面是重叠I/O的内部结构
    	
    	WSAOVERLAPPED	m_Overlappend;
    	SOCKET			m_AcceptSocket;
    	WSABUF			m_wsaBuf;
    	DWORD			m_opType;
    	char            m_szBuffer[MAX_BUFFER];
    	
    	//定义重叠I/O的工作类型,方便操作
    	typedef enum
    	{
    		OP_ACCEPT,
    		OP_SEND,
    		OP_RECV,
    		OP_NULL
    	}OPERATION_TYPE;
    };
    

    为了线性管理重叠I/O结构,我们需要创建一个socket管理类。

    #include "periodata.h"
    
    using namespace std;
    
    class SocketHandle
    {
    public:
    
    	SocketHandle();
    	~SocketHandle();
    
    public:
    
    	SOCKET				m_Socket;			//客户端连接的socket
    	SOCKADDR_IN			m_ClientAddr;
    	volatile time_t		m_nTimer;			//socket最后一次活跃的时间,设计这个字段的主要目的是socket的关闭可能不及时,HTTP请求短连接情况下可能会存在端口不够用的情况,我们需要及时的关闭这些超过时间无响应的socket
    	vector<CCPerIOData*>	m_VSocketIoData;
    
    public:
    	
    	//从队列中删除一个重叠I/O
    	void 		RemoveIOData(CCPerIOData* pPerIoData);
    	CPerIOData* GetNewIOData();
    
    };
    
    SocketHandle::SocketHandle()
    {
    	m_Socket = INVALID_SOCKET;
    	ZeroMemory(&m_ClientAddr, sizeof(m_ClientAddr));
    	m_nTimer = time(0);
    }
    SocketHandle::~SocketHandle()
    {
    	//销毁创建的一些指针,注意,m_VSocketIoData这个vector里面存放的全部是CPerIOData类型的指针,因此需要全部清理,清理方法:
    	
    	//遍历释放
    
    	std::vector<CPerIOData*>::iterator iter = m_VSocketIoData.begin();
    	for(; iter != m_VSocketIoData.end(); iter++)
    	{
    		//delete ...
    	}
    	m_VSocketIoData.clear();
    }
    
    CPerIOData* SocketHandle::GetNewIOData()
    {
    	CPerIOData* p = new CPerIOData;
    	m_VSocketIoData.push_back(p);	//每次重新创建一个重叠I/O,加入vector
    	return p;
    }
    
    void SocketHandle::RemoveIOData(CPerIOData* pPerIoData)
    {
    	if(pPerIoData == NULL || m_VSocketIoData.empty())
    	{
    		return;
    	}
    	vector<CPerIOData*>::iterator iter = m_VSocketIoData.begin();
    	for(; iter != m_VSocketIoData.end(); iter++)
    	{
    		if(pPerIoData == *iter)
    		{
    			// do something ...
    			break;
    		}
    	}
    }
    

    接下来我们就要准备进入主题:

    要使用完成端口,首先需要注意以下几项:
    1、创建完成端口
    2、根据系统性能创建工作线程
    3、创建一个监听套接字,并将监听套接字和完成端口绑定
    4、创建n个供客户端连接的socket
    5、加载完成端口的函数

    为什么需要加载完成端口的函数呢,因为IOCP在设计时就已经确定了他不能直接在WindowsAPI下使用,并且由于winsock的版本原因,需要加载他的函数才能够正常的使用。

    先看一个简单的头文件定义:

    上面第五点一定要注意,由于winsock的版本问题,完成端口的函数不能够直接使

    #include "SocketHandle.h"
    class CHttpSerVer
    {
    public:
    	
    	CHttpSerVer();
    	virtual ~CHttpSerVer();
    	bool Start();
    	void Stop();
    	
    private:
    	static DWORD WINAPI WorkThread(LPVOID lpParam);			//工作线程,用来不断的循环等待IOCP的状态返回,并处理IOCP的请求
    	static DWORD WINAPI CheckClientThread(LPVOID lpParam);  //客户端检测线程,用来检测3秒内没有活跃的socket,并释放这些socket资源
    
    	//程序结束时的一些资源回收函数
    	void ClearClientList();
    	void DeInitialize();
    	void CheckClientList();
    
    	//接收的客户端管理函数
    	void AddToClientList(SocketHandle* pSocketHandle);
    	void RemoveSocketHandle(SocketHandle* pSocketHandle);
    	
    	//初始化IOCP和一些错误的处理
    	bool AssociateWithIOCP(SocketHandle* pSocketHandle);
    	bool HandleError(SocketHandle* pSocketHandle, DWORD dwError);
    
    	//IOCP工作流程的体现
    	bool PostAcceptEx(CPerIOData* pAcceptIoContext);
    	bool DoAcceptEx(CPerIOData* pIoContext);
    	void PostSend(SocketHandle* pSocketHandle, CPerIOData* pPerIOData);
    	bool DoSend(SocketHandle* pSocketHandle, CPerIOData* pPerIOData);
    	bool PostRecv(SocketHandle* pSocketHandle, CPerIOData* pPerIOData);
    	bool DoRecv(SocketHandle* pSocketHandle, CPerIOData* pPerIOData);
    
    protected:
    	static bool m_bIsRunning;
    	CMutex		m_ClientMutex; 		
    	CMutex		m_StartMutex;
    
    private:
    	
    	int m_nPort;					//监听端口
    	int m_nThreadNum;				//工作线程数目
    	int m_nSocketNum;				//设置每次能够监听的最大的客户端数量
    	int m_nCheckThreadRunTimeOut;	//检测是否无响应的超时时间
    
    	HANDLE*					m_pWorkerThreads;		//工作线程
    	HANDLE					m_pCheckClientThread;	//检测客户端是否在设置的时间内无响应的线程
    	HANDLE					m_hIOCompletionPort;	//IOCP完成端口
    	HANDLE					m_hShutdownEvent;		//程序结束的信号
    	SocketHandle*			m_pSocketHandle;		//定义的监听socket的管理
    	vector<SocketHandle*>	m_VClientSocket;		//管理已经连接的HTTP请求的客户端
    
    	//定义的IOCP的函数指针
    	LPFN_ACCEPTEX				m_pFnAcceptThread;
    	LPFN_GETACCEPTEXSOCKADDRS	m_pFnGetAcceptExSocketAddr;
    };
    
    bool CHttpSerVer::Start()
    {
    	//为了保证在有多线程调用时,只能启动一个监听线程
    	CSingleLock lock(&m_StartMutex);
    	if(!lock.Lock(1000))
    		return;
    	m_bIsRunning = true;	
    	
    	
    	m_nPort = atol(readconfig("监听端口", "80"));
    	m_nSocketNum = atol(readconfig("最大连接数", "1000"));		//设置同时能够连接的最大的客户端数
    
    	m_pSocketHandle = new SocketHandle();	
    	m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);	//退出事件信号
    	
    	/**创建完成端口**/
    	m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    	
    	/**根据系统的性能创建工作线程, 完成端口的最大性能的工作线程数为CPU核心数的2倍**/
    	SYSTEM_INFO sysTemInfo;
    	GetSystemInfo(&sysTemInfo);
    	m_nThreadNum = sysTemInfo.dwNumberOfProcessors * 2;
    	m_pWorkerThreads = new HANDLE[m_nThreadNum];
    	DWORD nThreadID;
    	for(int nIndex = 0; nIndex < m_nThreadNum; nIndex++)
    	{		
    		m_pWorkerThreads[nIndex] = ::CreateThread(0, 0, WorkThread, (void *)this, 0, &nThreadID);
    	}
    	
    	/**创建用于检测客户端连接的线程**/
    	m_pCheckClientThread = ::CreateThread(0, 0, CheckClientThread, (void *)this, 0, &nThreadID);
    	
    	
    	/**创建监听套接字**/
    	m_pSocketHandle->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    	
    	/**将监听套接字和完成端口绑定**/
    	CreateIoCompletionPort((HANDLE)m_pSocketHandle->m_Socket, m_hIOCompletionPort, (DWORD)m_pSocketHandle, 0);
    	
    	struct sockaddr_in SerVerAddrs;
    	memset(&SerVerAddrs, 0, sizeof(SerVerAddrs));
    	SerVerAddrs.sin_addr.S_un.S_addr = INADDR_ANY;
    	SerVerAddrs.sin_family = AF_INET;
    	SerVerAddrs.sin_port = htons(m_nPort);
    	
    	::bind(m_pSocketHandle->m_Socket, (sockaddr*)&SerVerAddrs, sizeof(SerVerAddrs));
    	
    	::listen(m_pSocketHandle->m_Socket, SOMAXCONN);
    	
    	//致此,监听socket已经在监听了
    	
    	/**由于IOCP的一些特殊的原因,我们必须首先加载IOCP的函数才能够使用IOCP**/
    	DWORD dwBytes = 0;
    	GUID guidAcceptEx = WSAID_ACCEPTEX;
    	GUID guidGetAcceptExSocketAddr = WSAID_GETACCEPTEXSOCKADDRS;
    	WSAIoctl(m_pSocketHandle->m_Socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, sizeof(guidAcceptEx), &m_pFnAcceptThread, sizeof(m_pFnAcceptThread),&dwBytes,NULL,NULL);
    	WSAIoctl(m_pSocketHandle->m_Socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidGetAcceptExSocketAddr, sizeof(guidGetAcceptExSocketAddr), &m_pFnGetAcceptExSocketAddr,sizeof(m_pFnGetAcceptExSocketAddr), &dwBytes, NULL, NULL)
    	
    	/**IOCP最大的好处是我们能够提前准备n个socket供客户端连接,这样可减少每个socket连接时创建socket的内存消耗**/
    	for(int nIndex = 0; nIndex < m_nSocketNum; nIndex++)
    	{
    		CPerIOData* pListenIOData = m_pSocketHandle->GetNewIOData();
    		//创建完成之后将这些socket投递给IOCP的AcceptEx函数等到连接
    		if(PostAcceptEx(pListenIOData) == false)
    		{
    			//投递失败时则要对这些socket做资源回收
    			//do something ...
    		}
    	}	
    }
    

    下面看看怎样提前将1000个套接字提前准备:

    bool CHttpSerVer::PostAcceptEx(CPerIOData* pPerIOData)
    {	
    	ASSERT( INVALID_SOCKET! = pPerIOData->m_AcceptSocket);
    	
    	pPerIOData->m_AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    	if( INVALID_SOCKET == pPerIOData->m_AcceptSocket)  
    	{  
    		//do something ...
    		return false;
    	}
    
    	//设置这个socket的属性,使其能够重复利用
    	int nReuseAddr = 1;
    	setsockopt(pPerIOData->m_AcceptSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&nReuseAddr, sizeof(nReuseAddr));
    	
    	pPerIOData->ResetIO();	//这一步主要是清理这个重叠I/O中的buffer缓存
    	DWORD dwBytes = 0;  
    	pPerIOData->m_opType = CPerIOData::OP_ACCEPT;  
    	
    	if(FALSE == m_pFnAcceptThread(m_pSocketHandle->m_Socket, pPerIOData->m_AcceptSocket, pPerIOData->m_wsaBuf.buf, 0, sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, &pPerIOData->m_Overlappend))  
    	{  
    		if(WSA_IO_PENDING != WSAGetLastError())  
    		{  
    			//说明投递失败了
    			return false;  
    		}  
    	} 
    	return true;
    }
    

    这样我们的端口监听就已经完成了,并且已经提前准备了1000socket套接字准备客户端连接,这样我们的效率会有很大的提升,接下来最重要的还是工作线程的处理:

    DWORD WINAPI CHttpSerVer::WorkThread(LPVOID pParam)
    {
    	CHttpSerVer* pHttpServer = (CHttpSerVer*)pParam;
    	SocketHandle*	pSocketHandle = NULL;
    	DWORD			dwIOSize = 0;
    	OVERLAPPED*		pOverlapped =NULL;
    
    	//循环不断的等待退出线程事件的信号,WaitForSingleObject函数的功能是在等待信号的触发,如果在设定的超时时间内指定的信号有触发,则立即返回,如果在超时时间内内有触发,则等待时间超时后立即返回,具体的函数解释可以看下Windows的API。
    	while (WAIT_OBJECT_0 != WaitForSingleObject(pHttpServer->m_hShutdownEvent, 0))	
    	{	
    		/**获取IOCP的状态**/
    		BOOL bRet = GetQueuedCompletionStatus(pHttpServer->m_hIOCompletionPort, &dwIOSize, (PULONG_PTR)&pSocketHandle, &pOverlapped, WSA_INFINITE);
    		
    		if ( EXIT_CODE == (DWORD)pSocketHandle)
    		{
    			break;		//如果收到的是退出请求,则停止循环,退出完成端口的监听
    		}
    		if(bRet == FALSE)	//返回false,则说明接收失败了,处理这个错误后继续循环
    		{
    			DWORD dwError = WSAGetLastError();
    			pHttpServer->HandleError(pSocketHandle, dwError);
    			continue;	
    		}
    		else
    		{
    			/**下面这行代码是将完成端口接收到的数据读出来**/
    			CPerIOData* pPerIOData = CONTAINING_RECORD(pOverlapped, CPerIOData, m_Overlappend);
    			if((0 == dwIOSize) && CPerIOData::OP_ACCEPT != pPerIOData->m_opType)  
    			{  
    				//满足该条件的时,说明完成端口接收到的是客户端退出的请求,需要将维护的客户端列表里面的该套接字做资源回收
    				pHttpServer->RemoveSocketHandle(pSocketHandle);	
    				continue;  
    			}
    			else
    			{
    				pSocketHandle->m_nTimer = time(0);	//每次有新数据来时,更新活跃时间
    				switch(pPerIOData->m_opType)
    				{
    					case CPerIOData::OP_ACCEPT:
    					{
    						pHttpServer->DoAcceptEx(pPerIOData);	//说明有新的客户端连接
    						break;
    					}
    					case CPerIOData::OP_SEND:
    					{
    						//do something ...
    						//send
    						//recv	//数据发送完后投递recv,这样就能保证下一次数据过来时能够直接读取到
    						break;
    					}
    					case CPerIOData::OP_RECV:
    					{
    						//do something ...
    						break;
    					}	
    					default:
    					{
    						break;
    					}
    				}
    			}
    		}
    	}
    	return 0;
    }
    

    接下来我们看下有新客户端连接时怎么处理:

    bool CHttpSerVer::DoAcceptEx(CPerIOData* pPerIOData)
    {
    	CString strTemp;
    	SOCKADDR_IN *addrClient = NULL;
    	SOCKADDR_IN	*addrLocal = NULL;
    	int nClientLen = sizeof(SOCKADDR_IN);
    	int nLocalLen = sizeof(SOCKADDR_IN);
    
    	//该函数能够将客户端、本地的相关信息获取到,同时也能够接收到客户端第一次发送的数据
    	//第二个参数设置为0, 则直接返回
    	//如果第二个参数设置为 pPerIOData->m_wsaBuf.buf - ((sizeof(SOCKADDR_IN) + 16) * 2), 则会在等到接收到客户端第一次发送的数据后才能返回
    	this->m_pFnGetAcceptExSocketAddr(pPerIOData->m_wsaBuf.buf,
    		0,
    		sizeof(SOCKADDR_IN) + 16,
    		sizeof(SOCKADDR_IN) + 16,
    		(LPSOCKADDR*)&addrLocal,
    		&nLocalLen,
    		(LPSOCKADDR*)&addrClient,
    		&nClientLen);
    	
    	//将监听的socket的数据复制出来,然后把这个socket继续投递到下一个监听
    	SocketHandle* pSocketHandle = new SocketHandle();
     	pSocketHandle->m_Socket = pPerIOData->m_AcceptSocket;
    	
    	int nReuseAddr = 1;
    	setsockopt(pSocketHandle->m_Socket, SOL_SOCKET, SO_REUSEADDR, (char*)&nReuseAddr, sizeof(nReuseAddr));
    
    	if(!AssociateWithIOCP(pSocketHandle))
    	{
    		DELETE_PT(pSocketHandle);
    		return false;
    	}
    	
    	//对复制之后的socket投递recv,因为刚建立连接时没有数据传过来,需要投递给IOCP去接收数据
    	CPerIOData* pConIOData = pSocketHandle->GetNewIOData();
    	if(!PostRecv(pSocketHandle, pConIOData))
    	{
    		//投递失败,则需要更新客户端列表
    		//do something ...
    		return false;
    	}
    	
    	//为了方便管理,将这个客户端的信息添加到列表
    	this->AddToClientList(pSocketHandle);
    
    	//然后将listenSocket的I/O数据清理一下,准备投递到下一次客户端的连接,这样做的好处是,我们一直会有1000个socket准备接收新的客户端连接
    	pPerIOData->ResetIO();	
    	PostAcceptEx(pPerIOData);
    
    	return true;
    }
    

    下面看下客户端的检测是怎么实现的,为什么需要检测呢,是因为http请求是短连接,如果发现客户端列表里面有超过3秒钟时间没有活跃的客户端,我们默认这个客户端已经被关闭了,只是在关闭的过程中由于一些原因,导致没有关闭完成,因此,我们给IOCP投递一个recv事件,让IOCP去关闭这个链接。

    DWORD WINAPI CHttpSerVer::CheckClientThread(LPVOID pParam)
    {
    	CHttpSerVer* pHttpServer = (CHttpSerVer*)pParam;
    	//设置该线程1s执行一次,如果m_hShutdownEvent有信号,则立即结束线程循环 pHttpServer->m_nCheckThreadRunTimeOut = 1000
    	while (WAIT_OBJECT_0 != WaitForSingleObject(pHttpServer->m_hShutdownEvent, pHttpServer->m_nCheckThreadRunTimeOut))
    	{
    		pHttpServer->CheckClientList();
    	}
    	return 0;
    }
    
    void CHttpSerVer::CheckClientList()
    {
    	CSingleLock lock(&m_ClientMutex);
    	if(!lock.Lock(1000))
    		return;
    	if(m_VClientSocket.empty())
    		return;
    	vector<SocketHandle*>::iterator iter = m_VClientSocket.begin();
    	for(; iter != m_VClientSocket.end(); iter++)
    	{
    		if((time(0) - (*iter)->m_nTimer) > 3)
    		{
    			//如果某一个socket的活跃时间已经超过了我们设置的超时时间,则将该socket投递给IOCP,投递类型是RECV,这样做的好处是,我们并没有直接去关闭这些socket,而是让IOCP去释放
    			CPerIOData* pPerIOData = (*iter)->GetNewIOData();
    			pPerIOData->m_opType = CPerIOData::OP_RECV;
    			PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)(*iter), (LPOVERLAPPED)pPerIOData);
    		}
    	}
    	return;
    }
    

    下面是有客户端数据传过来时的操作,接收数据,然后投递一个异步的发送事件

    bool CHttpSerVer::DoRecv(SocketHandle* pSocketHandle, CPerIOData* pPerIOData)
    {
    	if(pSocketHandle == NULL || pPerIOData == NULL)
    	{
    		return false;
    	}
    	char szResponse[70] = {0};
    
    	//该函数执行,说明已经读到数据了,我们按照自己的要求去解析数据,如果发现接收到的数据为空,则将这个socket投递给IOCP,投递类型为RECV,目的是为了关闭这个socket
    	if(!ParseRequest(pPerIOData->m_wsaBuf.buf, szResponse))
    	{
    		pPerIOData->m_opType = CPerIOData::OP_RECV;
    		PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)pSocketHandle, (LPOVERLAPPED)pPerIOData);
    		return false;
    	}
    
    	//解析完数据后,投递一个WSASend,给客户端一个回应
    	pPerIOData->m_wsaBuf.buf = szResponse;
    	pPerIOData->m_opType = CPerIOData::OP_SEND;
    	pPerIOData->m_wsaBuf.len = strlen(szResponse) + 1;
    	this->PostSend(pSocketHandle, pPerIOData);
    	return true;
    }
    
    void CHttpSerVer::PostSend(SocketHandle* pSocketHandle, CPerIOData* pPerIOData)
    {
    	DWORD dwFlags = 0;
    	DWORD dwIOSize = 0;
    	pPerIOData->ResetIO();
    	pPerIOData->m_opType = CPerIOData::OP_SEND;
    	if(SOCKET_ERROR == WSASend(pSocketHandle->m_Socket, &pPerIOData->m_wsaBuf, 1, &dwIOSize, dwFlags, &pPerIOData->m_Overlappend, NULL))
    	{
    		//说明投递WASSend失败了
    		//do something ...
    	}
    }
    

    下面看下数据的解析,只实现了POST数据的解析:

    bool CHttpSerVer::ParseRequest(const char* szRequest, char* szResponse)
    {
    	char szResponseHeader[70] = {0};
    	CString strRequest(szRequest);
    
    	CString strMethod = strRequest.Left(4);
    	if(strMethod != "POST")
    	{
    		sprintf(szResponseHeader, "HTTP/1.0 404 ERROR\r\nContent-Length: 0\r\nConnection:close\r\n\r\n");
    		strcpy(szResponse, szResponseHeader);
    		return false;
    	}
    	int nConLenStart = strRequest.Find("Content-Length: ");
    	int nConLenEnd = strRequest.Find("\r\n", nConLenStart);
    	int nConLenPos = nConLenStart + strlen("Content-Length: ");
    	
    	CString strDataLen = strRequest.Mid(nConLenPos, nConLenEnd - nConLenPos);
    	
    	long nDataLen = atoi(strDataLen);
    	
    	int nDataPos = strRequest.Find("\r\n\r\n") + strlen("\r\n\r\n");
    	CString strData = strRequest.Mid(nDataPos, nDataLen);
    	if(strData.IsEmpty())
    	{
    		sprintf(szResponseHeader, "HTTP/1.0 200 OK\r\nContent-Length: 9\r\nConnection:close\r\n\r\nmsg empty");
    		strcpy(szResponse, szResponseHeader);
    		return true;
    	}
    	//下面是接收到的数据调用自己的业务函数,该业务函数目前没有实现
    	if(this is you function (strData.GetBuffer(0), nDataLen))
    	{
    		sprintf(szResponseHeader, "HTTP/1.0 200 OK\r\nContent-Length: 0\r\nConnection:close\r\n\r\n"); 
    		strcpy(szResponse, szResponseHeader);
    	}
    	else
    	{
    		sprintf(szResponseHeader, "HTTP/1.0 404 ERROR\r\nContent-Length: 0\r\nConnection:close\r\n\r\n");
    		strcpy(szResponse, szResponseHeader);
    	}
    	return true;
    }
    

    在做这个模型的是中间遇到了一些问题,主要是服务端产生大量的CLOSE_WAIT状态,这样会极大的影响性能,或者在这些socket没有在完全关闭的情况下,会造成系统的崩溃。下面我们分析下产生这个问题的原因,看下下面的这张图基本就能够明白这种状态是怎么产生的。在这里插入图片描述
    产生这个问题主要是因为客户端关闭连接,服务器被关闭连接,而服务端并没有发送FIN包给客户端(可能是有一些send或者recv的操作没有完成),这时候服务端就会处于CLOS_WAIT状态,而客户端处于FIN_WAIT_1状态。

    问题产生的原因就是代码本身的问题:需要仔细检查代码,该关闭socket的时候就要主动去关闭,因此在本模块中才会有检测不活跃的socket线程出现。

    在这个状态下时:系统会在等待时间超过2MSL(报文最大生存时间)后自动回收,但系统的设置这个时间一般是30min,因此,需要修改系统时间:

    Linux系统下:
    	vim /etc/sysctl.conf
    
    	net.ipv4.tcp_tw_reuse=1
    	net.ipv4.tcp_tw_recycle=0
    	net.ipv4.tcp_fin_timeout=30
    	net.ipv4.tcp_max_tw_buckets=80000
    	net.ipv4.tcp_timestamps=1
    	net.ipv4.ip_local_port_range=10000 65535
    	
    	修改完后执行命令 sysctl -p 使其生效。
    
    window系统下:
    	在HKEY_LOCAL_MACHINE/SYSTEM/CurrentControlSet/Services/Tcpip/Parameters,添加名为TcpTimedWaitDelay的
    	DWORD键,设置为30,以缩短TIME_WAIT的等待时间 
    

    同时在做这个时候出现了一个很奇怪的现象,在Windows下面资源管理器中可以发现,每当有一个HTTP请求结束后,程序的句柄数会增加2,只增不减,这样的后果是一定量的HTTP请求后,系统可用的句柄数会消耗完,程序将无法正常工作,后来使用windbg调试,发现在代码中使用了一个无用的变量指针,这个指针是继承的,并且每一个HTTP请求都会new一个这个指针,调用结束后句柄数增加2,将这个指针采用全局对象的方式,所有HTTP请求共用同一个对象,句柄增加的问题解决了。

    展开全文
  • windows socket IOCP讲解和代码 打包了pdf和代码和visual studio 工程
  • windows完成端口IOCP SOCKET TCP服务器 源代码.zip
  • Rust-IOCP是用Rust编写的Windows输入/输出完成端口(IOCP)库。 此板条箱仅在Windows中编译-在其他操作系统中,板条箱为空。 安装 从git仓库: [dependencies.iocp] git = " ...
  • 最完整的IOCP编程例子,从控件封装、通讯协议、上传下载、SQL查询到日志查看,都有完整实现。程序架构实现了:可纵向调整程序性能、控制协议开关、日志配置等一序列实现。
  • windows IOCP高并发版

    2018-04-27 11:44:28
    windows IOCP高并发版 很不错的资源哟ss不错的资源哟ss
  • windows IOCP

    2012-02-23 09:42:25
    windows IOCP的一个封装,介绍大体的封装框架,Packet的处理还有逻辑bug
  • IOCP原理windows下实现

    2018-12-29 10:25:22
    此包为windows下高并发高性能网络库实现原理阐述,可以做为有封装需求的参考
  • IOCP在客服端的实现

    2019-04-11 23:09:00
    在Piggy的客户端和服务端的基础上 进行了增量开发,服务的增加了 WSASend,客户端使用了IOCP模式,利用ConnectEx函数和完成端口,实现简单的连接管理
  • windows IOCP完成端口服务器开发+普通socket客户端 源代码.zip
  • windows IOCP文档

    2015-02-01 19:34:02
    IOCP文档,基本详细介绍了IOCP机制
  • 使用VS2015 unicode重新编译,新增了服务端回传数据的功能,压测客户端增加了实际压测的功能,并统计显示QPS
  • 集成windows iocp到libevent
  • Windows高效通信模型之IOCP

    万次阅读 2017-05-01 01:35:58
    今晚复习计网去了...然后思考了一下怎么解决今天下午那份代码,客户端被子线程阻塞的问题。就像之前说的,那份代码是“One-thread-...于是上网找了一下相关的信息,发现Windows下有一种号称性能最好的通信模型,叫做IOC

    今晚复习计网去了...然后思考了一下怎么解决今天下午那份代码,客户端被子线程阻塞的问题。就像之前说的,那份代码是“One-thread-per-client“的模型,对每个是客户端的连接请求,都要临时创建一个socket来处理,这样就造成了系统开销比较大的问题,而且线程之间的互相阻塞也是影响效率的重要原因。


    于是上网找了一下相关的信息,发现Windows下有一种号称性能最好的通信模型,叫做IOCP,中文名称叫做完成端口模型。一开始听到这个名字也是云里雾里,为啥端口会和高效处理高并发任务请求扯上关系,难道是端口还有什么不为人知的秘密吗?


    后来看了这份教程 http://www.cnblogs.com/lancidie/archive/2011/12/19/2293773.html 里面讲得比较快详细了,这里总结一下。


    先拿目前的模型来开刀,每个线程处理一个客户端的连接请求,首先是没有办法处理大量的连接请求,比如我现在有10万个连接请求,服务器端这边就要创建对应的socket,显然这个开销是无法接受的。再者,这样的操作会导致CPU不断地进行上下文切换,这就会导致CPU的负载非常之大,显然也是不现实的。我们姑且称这种方式是”阻塞通信+多线程“,缺点是开销太大,对CPU不友好,阻塞操作太多,会导致用户体验很差。


    那么我们就需要一种异步处理网络操作的模型,它的核心作用是在异步处理网络操作,使得CPU在网络操作执行的时候可以去执行别的任务,直到网络操作完成,再去获取处理后的数据。这里就用到了IOCP模型。


    具体的介绍再上面贴的教程连链接里面已经写得很详细了,说白了,IOCP的核心思想跟操作系统里面引入的中断有点类似(这个比喻可能不恰当,但是在整个学习过程中我对IOCP的效果一直有一种既视感)。在执行I/O操作时,CPU不需要忙等待,一直轮询设备是否需要执行I/O操作,而是采用中断的机制,只有当产生I/O操作的时候才通知CPU去处理,其他时间CPU可以执行别的业务。


    而IOCP的核心就在于维护一个公共的消息队列。首先创建2*CPU核心数的线程,作为Worker,初始的时候它们是空闲,被挂起的。然后再开辟一个单独的线程,来监听连接请求,每当有连接请求,就accept,并将网络操作放入消息队列中。此时,Worker线程排队从队列中取出这些操作请求并处理。注意,整个过程中主线程是非常空闲的,完全有能力去执行其他的任务。也就是说,现在我们有一条主线程,若干条Worker线程,还有一条监听线程。主线程把网络请求扔给Worker线程去做,自己就可以同步做别的事情,而不用被阻塞挂起,等待外部I/O完成了。而这个公共消息队列,就叫做”完成端口“,因为它是”完成“网络操作(属于外部I/O)之后再通知主线程把数据拿走...大概就是这么个模型。


    实际上Windows的socket库已经提供了完整的完成端口API,我感觉只要学会调用各种API来搭建整个模型就够了,至于深入的实现,可能以后工作有需要的话再去探究。

    看来要找个时间来改进一下整个通信模型了...然而接下来一段时间有得忙了,嘛,不过所谓的成长就是这样吧(真的无比渴望有个大佬带我)...


    最后附上通过阻塞式的accept操作来实现IOCP模型的流程图吧,感觉已经写得非常清晰了:

    展开全文
  • ACE的Proactor框架在Windows底层下是采用IOCP来实现的,这里采用IOCP模仿实现了ACE的Proactor框架,对于学习和研究ACE的Proactor框架很有帮助.
  • windows iocp网络通讯库封装
  • windows 系统线程池和完成端口的使用,样例等技术文章,还算详细

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 9,856
精华内容 3,942
关键字:

iocpwindows