-
C++ IOCP windows服务器
2021-01-20 03:33:57首先,启动主线程,接收来自客户端的请求。... IOCP服务器实现 #pragma once #include #include #include <Windows> #include #include #include CThreadPool.h #include WorkA.h #include Wor -
IOCP windows套接字使用完成端口
2020-09-17 16:48:39本文的代码源自《游戏服务端IOCP模型,自己封装的一个类,3行代码搞定服务端》,我改进过了,希望作者不要说我侵权,我声明这段代码是作者的劳动结晶,我只不过是在此基础上进行了些修改和调试。 windows里有如同...本文的代码源自《游戏服务端IOCP模型,自己封装的一个类,3行代码搞定服务端》,我改进过了,希望作者不要说我侵权,我声明这段代码是作者的劳动结晶,我只不过是在此基础上进行了些修改和调试。
windows里有如同Linux中的epoll一般强大的套接字管理功能,即socket编程模型。
我们面对服务器端编程时,往往希望一台主机能同时承接成千上万个客户端连接,只要我们的CPU和内存足够处理业务即可。但对于socket,如果使用select管理,在windows里有最多管理64个套接字的上限,毕竟都是依靠轮询来反馈事件的。如果要管理上百个套接字,我们就需要考虑使用IOCP(完成端口)模型了,见《Windows网络编程》5.2.6 完成端口模型一节的内容。
在经历了2天各种百度学习的情况下,我发现网上对于这个完成端口描述大多都是照本宣科,而且逻辑不完整,同样,书中也有不完整的地方,所以我总结此文,并附带可用的代码供大家参考学习,其中如果有不对的地方,望留言指正!直接上代码,再说明用法,希望理解完成端口逻辑的同学可以看书或百度:
#pragma once #include <WinSock2.h> #include <afxmt.h> #include <afxtempl.h> #define ULONG_PTR ULONG #define PULONG_PTR ULONG* #define BUFFER_SIZE 1024 #define SOCK_TIMEOUT_SECONDS 60 class Iocp; typedef enum { OP_READ = 1, OP_WRITE = 2, OP_ACCEPT = 3, OP_CLOSE = 100, OP_DO_WORK = 101 } SOCKET_STATE; typedef struct { OVERLAPPED oOverlapped; WSABUF wsBuffer; CHAR szBuffer[BUFFER_SIZE]; DWORD dSend; DWORD dRecv; SOCKET_STATE sState; } PER_IO_DATA, *LPPER_IO_DATA; /*传送给处理函数的参数*/ typedef struct { SOCKET sSocket; // 客户端socket描述符 int index; // 序号,用于索引 CHAR key[32]; // ip:port CHAR szClientIP[24]; // 客户端IP字符串 UINT uiClientPort; // 客户端端口 time_t lastReceiveTime; // 最后接收时间 time_t connectedTime; // 创建链接的时间(如果超过这个时间还没有收到有效的ID,那么关闭) LPPER_IO_DATA lpIOData; // 释放内存用 Iocp *pIocp; // ServerScanThread要用 CMutex *lpMutex; } IOCPClient, *LPIOCPClient; typedef struct { int index; // 同IOCPClient的index CMap<CString, LPCTSTR, IOCPClient*, IOCPClient*> sockets; } STRU_MAP_ClientSockets; typedef void (*ReadProc)(LPIOCPClient lpData, LPPER_IO_DATA lpPER_IO_DATA); typedef VOID (*ScanProc)(LPIOCPClient lpClientSocket); class Iocp { public: Iocp(const CHAR *host, UINT port); ~Iocp(void); VOID SetThreadNums(); UINT GetThreadNums(); VOID SetPort(UINT port); UINT GetPort(); BOOL ListenEx(UINT backlog); VOID Close(); VOID Iocp::CreateScanThreads(); static VOID ServerWorkThread(VOID *_this); static VOID ServerScanThread(VOID *s); static VOID FreeClientSocket(Iocp *lpIocp, LPIOCPClient lpClientSocket); static int Send(SOCKET sockfd, const char *buff, const unsigned int size); static VOID SetClientSocketCountText(unsigned int count); static VOID OutPutLog(const char *szFormat, ...); VOID SetReadFunc(VOID *lprFun); VOID SetScanFunc(VOID *lprFun); int m_ThreadNums; // 线程数量,用于将socket分割到多个区域,扫描时每次只扫描一个区域 int m_AcceptClientIndex; // 接受连接的socket的序号,跟m_ThreadNums取余 STRU_MAP_ClientSockets *m_Sockets; // 因为需要根据线程数动态分配内存,所以不能是静态变量 unsigned int m_SocketCount; // 已连接客户端的数量 ReadProc m_ReadFun; // 读数据回调函数 ScanProc 
m_ScanFun; // 扫描socket回调函数 HANDLE m_cpHandle; // IO完成端口句柄 // 扩展的接受连接,放在线程里了 static VOID AcceptEx(VOID *_this); // 监听套接字,即服务端套接字 SOCKET m_ListenSocketID; };
#include "stdafx.h"
#include "Iocp.h"
#include <stdlib.h>
#include <process.h>
#include "resource.h"

#pragma comment(lib, "ws2_32.lib")

extern void DoRxTxWork(LPIOCPClient lpClientSocket);

/*
 * Constructor: initialises Winsock, creates and binds the overlapped
 * listening socket, creates the completion port and spawns CPUs*2 worker
 * threads.  Throws int(1) on any setup failure.
 */
Iocp::Iocp(const CHAR *host, UINT port)
    : m_ListenSocketID(INVALID_SOCKET)
    , m_AcceptClientIndex(0)
{
    SetClientSocketCountText((m_SocketCount = 0));

    WSADATA wsaData;
    DWORD dwRet = WSAStartup(0x0202, &wsaData);
    if (0 != dwRet) {
        WSACleanup();
        throw 1;
    }

    SOCKADDR_IN sockAddr;
    memset(&sockAddr, 0, sizeof(SOCKADDR_IN));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr.s_addr = inet_addr(host);
    sockAddr.sin_port = htons(port);

    /* listening socket must be overlapped to live on the completion port */
    m_ListenSocketID = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (m_ListenSocketID == INVALID_SOCKET) {
        throw 1;
    }

    /* allow quick restarts on the same port */
    CHAR opt = 1;
    BOOL ret = setsockopt(m_ListenSocketID, SOL_SOCKET, SO_REUSEADDR, (const CHAR *)&opt, sizeof(opt));
    if (ret != 0) {
        throw 1;
    }

    if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&sockAddr, sizeof(struct sockaddr))) {
        throw 1;
    }

    /* the completion port the worker threads will block on */
    m_cpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_cpHandle == NULL) {
        throw 1;
    }

    SYSTEM_INFO mySysInfo;
    GetSystemInfo(&mySysInfo);
    m_ThreadNums = (int)mySysInfo.dwNumberOfProcessors * 2;  // conventional IOCP pool size

    m_Sockets = new STRU_MAP_ClientSockets[m_ThreadNums];    // one scan bucket per thread
    for (int i = 0; i < m_ThreadNums; i++) {
        m_Sockets[i].index = i;
        _beginthread(Iocp::ServerWorkThread, 0, (VOID *)this);
    }
    TRACE("工作线程准备完成(%d个)\n", m_ThreadNums);
    OutPutLog("工作线程准备完成(%d个)\n", m_ThreadNums);
}

Iocp::~Iocp(void)
{
    // Fix: close the listening socket so the accept thread can exit; the
    // original leaked it and only called WSACleanup().
    if (m_ListenSocketID != INVALID_SOCKET) {
        closesocket(m_ListenSocketID);
    }
    WSACleanup();
    // NOTE(review): worker/scan threads are still running and m_Sockets is
    // deliberately not deleted here — a complete shutdown protocol would
    // have to stop those threads first.  Left as in the original otherwise.
}

/*
 * Accept loop (own thread, started by ListenEx).  For every accepted client:
 * allocate the per-connection and per-I/O contexts, attach the socket to the
 * completion port with the client context as completion key, then post the
 * first overlapped WSARecv and register the client in its scan bucket.
 */
VOID Iocp::AcceptEx(VOID *_this)
{
    SOCKET acSocket;
    DWORD dwRecvBytes;
    Iocp *pIocp = (Iocp *)_this;
    SOCKADDR_IN sAddr;
    INT uiClientSize = sizeof(sAddr);
    TRACE("服务器已就绪, 套接字=%u ...\n", pIocp->m_ListenSocketID);
    OutPutLog("服务器已就绪, 套接字=%u ...\n", pIocp->m_ListenSocketID);
    while (TRUE) {
        acSocket = WSAAccept(pIocp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0);
        if (acSocket == INVALID_SOCKET) {
            // Fix: WSAAccept signals failure with INVALID_SOCKET (the
            // original compared against SOCKET_ERROR), and a transient
            // failure must not kill the accept thread.
            int err = WSAGetLastError();
            TRACE("接受连接发生错误: %d\n", err);
            if (WSAENOTSOCK == err || WSAEINVAL == err || WSAEINTR == err) {
                return;  // the listening socket is gone — stop accepting
            }
            continue;    // transient (e.g. WSAECONNRESET) — keep accepting
        }
        LPIOCPClient lpClientSocket = (LPIOCPClient)malloc(sizeof(IOCPClient));
        if (NULL == lpClientSocket) {
            TRACE("Error while malloc lpClientSocket\n");
            closesocket(acSocket);  // fix: don't leak the accepted socket
            continue;
        }
        memset(lpClientSocket, 0, sizeof(IOCPClient));
        LPPER_IO_DATA lpIOData = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
        if (lpIOData == NULL) {
            TRACE("Error while malloc lpIOData\n");
            closesocket(acSocket);
            free(lpClientSocket);
            continue;
        }
        memset(lpIOData, 0, sizeof(PER_IO_DATA));

        lpClientSocket->connectedTime = lpClientSocket->lastReceiveTime = time(NULL);
        lpClientSocket->lpIOData = lpIOData;  // owned; released in FreeClientSocket
        lpClientSocket->sSocket = acSocket;
        lpClientSocket->pIocp = pIocp;
        strcpy(lpClientSocket->szClientIP, inet_ntoa(sAddr.sin_addr));
        // Fix: sin_port is in network byte order; convert before keying/logging.
        lpClientSocket->uiClientPort = ntohs(sAddr.sin_port);
        _snprintf(lpClientSocket->key, sizeof lpClientSocket->key, "%s:%d", lpClientSocket->szClientIP, lpClientSocket->uiClientPort);
        lpClientSocket->lpMutex = new CMutex(FALSE, lpClientSocket->key);

        /* completion key = the per-connection context */
        if (CreateIoCompletionPort((HANDLE)acSocket, pIocp->m_cpHandle, (ULONG_PTR)lpClientSocket, 0) == NULL) {
            TRACE("Error while CreateIoCompletionPort\n");
            closesocket(acSocket);
            delete lpClientSocket->lpMutex;
            free(lpIOData);
            free(lpClientSocket);
            continue;
        }
        TRACE("客户端已连接:%s:%u\n", lpClientSocket->szClientIP, lpClientSocket->uiClientPort);
        OutPutLog("客户端已连接:%s:%u\n", lpClientSocket->szClientIP, lpClientSocket->uiClientPort);

        /* post the first overlapped read */
        lpIOData->dSend = 0;
        lpIOData->dRecv = 0;
        lpIOData->wsBuffer.len = BUFFER_SIZE - 1;  // keep 1 byte spare for a trailing NUL
        lpIOData->wsBuffer.buf = lpIOData->szBuffer;
        lpIOData->sState = OP_READ;
        DWORD flags = 0;
        int rc = WSARecv(acSocket, &(lpIOData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIOData->oOverlapped), NULL);
        if (rc == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING) {
            TRACE("Error ERROR_IO_PENDING\n");
            closesocket(acSocket);
            delete lpClientSocket->lpMutex;
            free(lpIOData);
            free(lpClientSocket);
            continue;
        }
        // Fix: the original only registered the client when WSARecv returned
        // ERROR_IO_PENDING.  A read that completes immediately (rc == 0)
        // still queues a completion packet, and that client was never put in
        // the scan map nor counted.  Register on both success paths.
        pIocp->m_AcceptClientIndex = (pIocp->m_AcceptClientIndex + 1) % pIocp->m_ThreadNums;  // round-robin bucket
        lpClientSocket->index = pIocp->m_AcceptClientIndex;
        pIocp->m_Sockets[lpClientSocket->index].sockets[lpClientSocket->key] = lpClientSocket;
        SetClientSocketCountText(++pIocp->m_SocketCount);
        TRACE("客户端异步读取已完成,等待读取数据...\n");
        OutPutLog("客户端异步读取已完成,等待读取数据...\n");
    }
}

/* Start listening and spawn the accept thread. */
BOOL Iocp::ListenEx(UINT backlog)
{
    if (SOCKET_ERROR == listen(m_ListenSocketID, backlog)) {
        return FALSE;
    }
    if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this)) {
        return FALSE;
    }
    return TRUE;
}

/*
 * Worker loop: dequeues completion packets.  The completion key is the
 * LPIOCPClient; the OVERLAPPED is embedded in a PER_IO_DATA whose sState
 * identifies the kind of packet.
 */
VOID Iocp::ServerWorkThread(VOID *_this)
{
    Iocp *lpIocp = (Iocp *)_this;
    HANDLE hPlePort = (HANDLE)lpIocp->m_cpHandle;
    DWORD dwBytes;
    LPIOCPClient lpClientSocket = NULL;
    LPPER_IO_DATA lpIOData = NULL;
    LPOVERLAPPED lpOverlapped = NULL;
    DWORD recvBytes = 0;
    DWORD dwFlag = 0;
    while (TRUE) {
        if (0 == GetQueuedCompletionStatus(hPlePort, &dwBytes, (PULONG_PTR)&lpClientSocket, &lpOverlapped, INFINITE)) {
            // Fix: a FALSE return with lpOverlapped == NULL means the wait
            // itself failed and NO packet was dequeued — the original
            // unconditionally freed lpClientSocket, which then held a stale
            // value.  Only clean up when a failed I/O was actually dequeued.
            if (lpOverlapped == NULL) {
                continue;
            }
            lpIOData = (LPPER_IO_DATA)CONTAINING_RECORD(lpOverlapped, PER_IO_DATA, oOverlapped);
            if (lpIOData->sState == OP_WRITE) {
                free(lpIOData);                           // aborted send context (see Iocp::Send)
            } else {
                FreeClientSocket(lpIocp, lpClientSocket); // aborted read: release the client
            }
            continue;
        }
        lpIOData = (LPPER_IO_DATA)CONTAINING_RECORD(lpOverlapped, PER_IO_DATA, oOverlapped);
        /* zero bytes on a read/write completion: the peer closed the connection */
        if (0 == dwBytes && (lpIOData->sState == OP_READ || lpIOData->sState == OP_WRITE)) {
            TRACE("客户端断开了连接:%s\n", lpClientSocket->key);
            OutPutLog("客户端断开了连接:%s\n", lpClientSocket->key);
            closesocket(lpClientSocket->sSocket);
            if (lpIOData->sState == OP_WRITE) {
                free(lpIOData);  // the per-send context would otherwise leak
            }
            FreeClientSocket(lpIocp, lpClientSocket);
            continue;
        }
        switch (lpIOData->sState) {
        case OP_READ:
            /* hand the data to the user callback, then re-arm the read */
            lpIOData->dRecv = dwBytes;
            lpClientSocket->lastReceiveTime = time(NULL);  // idle-timeout bookkeeping
            lpIocp->m_ReadFun(lpClientSocket, lpIOData);
            lpIOData->dRecv = 0;
            ZeroMemory(&(lpIOData->oOverlapped), sizeof(OVERLAPPED));
            lpIOData->wsBuffer.len = BUFFER_SIZE - 1;
            lpIOData->wsBuffer.buf = lpIOData->szBuffer;
            lpIOData->sState = OP_READ;
            if (WSARecv(lpClientSocket->sSocket, &(lpIOData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIOData->oOverlapped), NULL) == SOCKET_ERROR) {
                if (WSAGetLastError() != ERROR_IO_PENDING) {
                    // Fix: the original returned here, killing the whole
                    // worker thread; drop just this client instead.
                    closesocket(lpClientSocket->sSocket);
                    FreeClientSocket(lpIocp, lpClientSocket);
                }
            }
            break;
        case OP_WRITE:
            // Completion of a send posted by Iocp::Send(); release the
            // per-send context allocated there.
            free(lpIOData);
            break;
        case OP_DO_WORK:
            /* posted by ServerScanThread: run the user scan callback */
            lpIocp->m_ScanFun(lpClientSocket);
            break;
        case OP_CLOSE:
            TRACE("主动断开长期无响应的客户端:%s\n", lpClientSocket->key);
            OutPutLog("主动断开长期无响应的客户端:%s\n", lpClientSocket->key);
            // Do not free here: closesocket makes the pending read complete
            // with an error, and the failed-I/O path above releases memory.
            closesocket(lpClientSocket->sSocket);
            break;
        default:
            break;
        }
    }
}

/* Remove the client from its scan bucket and release everything it owns. */
VOID Iocp::FreeClientSocket(Iocp *lpIocp, LPIOCPClient lpClientSocket)
{
    if (NULL == lpIocp || NULL == lpClientSocket) {
        return;
    }
    lpIocp->m_Sockets[lpClientSocket->index].sockets.RemoveKey(lpClientSocket->key);
    SetClientSocketCountText(--lpIocp->m_SocketCount);
    delete lpClientSocket->lpMutex;  // fix: the CMutex from AcceptEx was leaked
    free(lpClientSocket->lpIOData);
    free(lpClientSocket);
    TRACE("内存已经释放!\n");
}

/* Install the read-completion callback. */
VOID Iocp::SetReadFunc(VOID *lprFun)
{
    m_ReadFun = (ReadProc)lprFun;
}

/* Install the scan callback and start the scan threads. */
VOID Iocp::SetScanFunc(VOID *lprFun)
{
    m_ScanFun = (ScanProc)lprFun;
    CreateScanThreads();
}

/* One scan thread per bucket. */
VOID Iocp::CreateScanThreads()
{
    STRU_MAP_ClientSockets *sock;
    for (int i = 0; i < m_ThreadNums; i++) {
        sock = &m_Sockets[i];
        _beginthread(Iocp::ServerScanThread, 0, (VOID *)sock);
    }
}

/*
 * Scan loop: every 5 seconds, posts an OP_DO_WORK packet for each client in
 * this thread's bucket so a worker thread runs the scan callback on it.
 * NOTE(review): the static IOData is shared by every scan thread and every
 * posted packet; this only works because all packets carry the same state
 * (OP_DO_WORK) and zero bytes — confirm before changing.
 */
VOID Iocp::ServerScanThread(VOID *s)
{
    static PER_IO_DATA IOData;
    POSITION pos;
    CString key;
    IOCPClient *lpClientSocket;
    STRU_MAP_ClientSockets *mapSock = (STRU_MAP_ClientSockets *)s;
    int index = mapSock->index;
    int doCount = 0;
    CMap<CString, LPCTSTR, IOCPClient*, IOCPClient*> *serverSockets = &mapSock->sockets;
    while (1) {
        Sleep(5000);
        //OutPutLog("序号[%d]定时器开始处理...", index);
        doCount = 0;
        pos = serverSockets->GetStartPosition();
        while (pos) {
            doCount++;
            serverSockets->GetNextAssoc(pos, key, lpClientSocket);
            memset(&IOData, 0, sizeof(PER_IO_DATA));
            IOData.sState = OP_DO_WORK;
            PostQueuedCompletionStatus(lpClientSocket->pIocp->m_cpHandle, 0, (ULONG_PTR)lpClientSocket, &IOData.oOverlapped);
        }
        //OutPutLog("序号[%d]定时器处理了%d个客户端", index, doCount);
    }
}

/* Show the current client count in the dialog. */
void Iocp::SetClientSocketCountText(unsigned int count)
{
    CString countStr;
    countStr.Format("客户端数量: %u", count);
    CWnd *pWnd = AfxGetMainWnd();
    if (NULL == pWnd) {
        return;  // fix: called from the constructor, possibly before the main window exists
    }
    HWND hHwnd = pWnd->m_hWnd;
    ::SetDlgItemText(hHwnd, IDC_CLIENT_COUNT, countStr);
}

/*
 * Append a timestamped, printf-formatted line to the log edit control,
 * trimming the oldest line once the control exceeds 100 lines.
 * NOTE(review): the static buffer makes this non-reentrant; concurrent
 * callers from the worker threads can interleave — confirm if that matters.
 */
void Iocp::OutPutLog(const char *szFormat, ...)
{
    static char szLogBuffer[1024];
    SYSTEMTIME curTime;
    GetLocalTime(&curTime);
    CString strTime;
    strTime.Format(_T("[%04d-%02d-%02d %02d:%02d:%02d] "), curTime.wYear, curTime.wMonth, curTime.wDay, curTime.wHour, curTime.wMinute, curTime.wSecond);
    strTime += szFormat;  // prepend the timestamp to the caller's format string
    va_list pArgList;
    va_start(pArgList, szFormat);
    int len = _vsntprintf(szLogBuffer, sizeof szLogBuffer - 2, strTime, pArgList);
    va_end(pArgList);
    if (len <= 0) {
        return;  // fix: a failed/truncated format used to index szLogBuffer[-1]
    }
    /* make sure the line ends with CRLF for the edit control */
    if (szLogBuffer[len - 1] == '\n') {
        if (szLogBuffer[len - 2] != '\r') {
            szLogBuffer[len - 1] = '\r';
            szLogBuffer[len] = '\n';
            szLogBuffer[len + 1] = '\0';
        }
    } else {
        szLogBuffer[len] = '\r';
        szLogBuffer[len + 1] = '\n';
        szLogBuffer[len + 2] = '\0';
    }
    CWnd *pWnd = AfxGetMainWnd();
    if (NULL == pWnd) {
        return;  // fix: may be called before the main window exists
    }
    CEdit *pEdit = (CEdit *)pWnd->GetDlgItem(IDC_OUTLOG_EDIT);
    if (NULL == pEdit) return;
    int iTextLen = pEdit->GetWindowTextLength();
    pEdit->SetRedraw(FALSE);
    pEdit->SetReadOnly(FALSE);
    pEdit->SetSel(iTextLen, iTextLen, TRUE);
    pEdit->ReplaceSel(szLogBuffer);  // ReplaceSel writes at the caret position
    int lineCount = pEdit->GetLineCount();
    if (lineCount > 100)  // too many log lines: drop the first one
    {
        pEdit->GetWindowText(szLogBuffer, 1024 - 1);  // only the head of the text is needed
        CString tmp(szLogBuffer);
        int it1 = tmp.Find("\r\n") + 2;  // end of the first line
        pEdit->SetSel(0, it1);           // select the first line
        pEdit->ReplaceSel("");           // and delete it
    }
    pEdit->LineScroll(lineCount);  // keep the newest line visible
    pEdit->SetReadOnly(TRUE);
    pEdit->SetRedraw(TRUE);
}

/*
 * Post an overlapped send.
 * Fix: the original used a single static PER_IO_DATA shared by every
 * concurrent send — the OVERLAPPED and the buffer pointer were reused while
 * previous sends could still be in flight, and the caller's buffer had to
 * outlive the operation.  Each send now gets its own heap context (freed by
 * the worker's OP_WRITE case, or here if the send fails outright) and the
 * payload is copied into it.
 * Returns the WSASend result (0, or SOCKET_ERROR with ERROR_IO_PENDING for
 * a pending send); SOCKET_ERROR for invalid/oversized input.
 */
int Iocp::Send(SOCKET sockfd, const char *buff, const unsigned int size)
{
    if (NULL == buff || 0 == size || size > BUFFER_SIZE) {
        return SOCKET_ERROR;
    }
    LPPER_IO_DATA lpIOData = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
    if (NULL == lpIOData) {
        return SOCKET_ERROR;
    }
    memset(lpIOData, 0, sizeof(PER_IO_DATA));
    lpIOData->sState = OP_WRITE;
    memcpy(lpIOData->szBuffer, buff, size);  // own the payload for the lifetime of the I/O
    lpIOData->wsBuffer.len = size;
    lpIOData->wsBuffer.buf = lpIOData->szBuffer;
    DWORD byteSend = 0;
    int ErrorCode;
    int result = WSASend(sockfd, &lpIOData->wsBuffer, 1, &byteSend, 0, &lpIOData->oOverlapped, NULL);
    if (SOCKET_ERROR == result && ERROR_IO_PENDING != (ErrorCode = WSAGetLastError())) {
        TRACE("发送数据出错,错误码: %d\n", ErrorCode);
        free(lpIOData);  // no completion packet will ever be queued for it
    } else {
        TRACE("成功发送数据: %d字节,返回值:%d\n", byteSend, result);
    }
    return result;
}
// 回调1:客户端的发送的数据会在这个函数通知 void OnRead(LPIOCPClient lpClientSocket, LPPER_IO_DATA lpIOData) { if (NULL == lpClientSocket || NULL == lpIOData) { return; } int RxCount = (int) lpIOData->dRecv; char *RxBuff = lpIOData->szBuffer; RxBuff[RxCount] = '\0'; // 务必保证接收时留1个字节余量给这个结尾的0 Iocp::OutPutLog("%s:%d: %s\n", lpClientSocket->szClientIP, lpClientSocket->uiClientPort, RxBuff); Iocp::Send(lpClientSocket->sSocket, RxBuff, RxCount); } // 回调2:扫描套接字,目的是关闭闲置套接字,或定时发送心跳包(业务逻辑上要求对方回答) // 其实这个函数可以直接关闭套接字,只不过通过单独的CLOSE通知会对业务处理更灵活和方便 // 如果你在业务中体会不到,可以直接调用closesocket即可。 VOID OnScan(LPIOCPClient lpClientSocket) { static PER_IO_DATA IOData; if (NULL == lpClientSocket) { return; } if (time(NULL) - lpClientSocket->lastReceiveTime > SOCK_TIMEOUT_SECONDS) { memset(&IOData, 0, sizeof(PER_IO_DATA)); IOData.sState = OP_CLOSE; PostQueuedCompletionStatus(lpClientSocket->pIocp->m_cpHandle, 0, (ULONG_PTR)lpClientSocket, &IOData.oOverlapped); } } // 调用的位置,在MFC项目里 void CIOCPSocketDlg::OnRunServer() { static Iocp *g_IocpServer = NULL; if (NULL == g_IocpServer) { g_IocpServer = new Iocp("0.0.0.0", 8888); g_IocpServer->SetReadFunc(OnRead); // 回调1,读取套接字发来的内容 g_IocpServer->SetScanFunc(OnScan); // 回调2,定期扫描套接字,可能是业务逻辑要求发心跳包,这个步骤可以免去 g_IocpServer->ListenEx(10); } }
代码在MFC项目里,所以还有设置窗体内容的逻辑,大家修改成自己的即可。
补充代码逻辑(书上没讲到的):如果要主动关闭套接字,直接调用closesocket函数即可,因为调用此函数会导致GetQueuedCompletionStatus函数返回0,在返回0的逻辑里释放两个malloc的变量即可。而如果是客户端断开了,GetQueuedCompletionStatus返回的不是0,但满足0 == dwBytes并且是读状态,在这里则除了调用closesocket以外,还要释放malloc的变量(书上讲到了)。
另外:对于同一个套接字,应该不会同时在多个线程里出发读取完成操作,但是很可能在多个线程里出发读取和扫描通知(OP_DO_WORK),所以在业务中,如有必要,要考虑给每一个客户端一个mutex,加锁处理。
使用时,只要实现read,scan两个方法,如果确定不要scan(无法处理从网络中消失的客户端,比如客户端突然死机或断网,或服务器断网一段时间,在此期间客户端主动断开了),那么就不挂载scan函数即可,看SetScanFunc的实现里有新建线程的操作哦:VOID Iocp::SetScanFunc(VOID *lprFun) { m_ScanFun = (ScanProc)lprFun; CreateScanThreads(); }
-
IOCP in Windows
2021-01-09 06:47:42<div><p>One drawback of glib is that it does not support IOCP in Windows. I hope you will consider about implementing it. </p><p>该提问来源于开源项目:tboox/tbox</p></div> -
python iocp_Windows之IOCP
2021-02-09 03:27:42IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll,关于epoll可以参考1. 简介IOCP模型属于一种通讯模型,适用于Windows平台下高负载服务器的一个技术。在处理大量用户...IOCP全称I/O Completion Port,中文译为I/O完成端口。IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll,关于epoll可以参考
1. 简介
IOCP模型属于一种通讯模型,适用于Windows平台下高负载服务器的一个技术。在处理大量用户并发请求时,如果采用一个用户一个线程的方式那将造成CPU在这成千上万的线程间进行切换,后果是不可想象的。而IOCP完成端口模型则完全不会如此处理,它的理论是并行的线程数量必须有一个上限-也就是说同时发出500个客户请求,不应该允许出现500个可运行的线程。目前来说,IOCP完成端口是Windows下性能最好的I/O模型,同时它也是最复杂的内核对象。它避免了大量用户并发时原有模型采用的方式,极大的提高了程序的并行处理能力。
2. 原理图
一共包括三部分:完成端口(存放重叠的I/O请求),客户端请求的处理,等待者线程队列(一定数量的工作者线程,一般采用CPU*2个)
完成端口中所谓的[端口]并不是我们在TCP/IP中所提到的端口,可以说是完全没有关系。它其实就是一个通知队列,由操作系统把已经完成的重叠I/O请求的通知放入其中。当某项I/O操作一旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知。
通常情况下,我们会在创建一定数量的工作者线程来处理这些通知,也就是线程池的方法。线程数量取决于应用程序的特定需要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的CPU时间,在此期间该线程可以运行,然后另一个线程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它线程来充分利用这些时间片。
3. IOCP优点
基于IOCP的开发是异步IO的,决定了IOCP所实现的服务器的高吞吐量。
通过引入IOCP,会大大减少Thread切换带来的额外开销,最小化的线程上下文切换,减少线程切换带来的巨大开销,让CPU把大量的事件用于线程的运行。当与该完成端口相关联的可运行线程的总数目达到了该并发量,系统就会阻塞任何与该完成端口相关联的后续线程的执行,直到与该完成端口相关联的可运行线程数目下降到小于该并发量为止。
4. IOCP应用
4.1 创建和关联完成端口
//功能:创建完成端口和关联完成端口
HANDLE WINAPI CreateIoCompletionPort(* __in HANDLE FileHandle, //已经打开的文件句柄或者空句柄,一般是客户端的句柄
* __in HANDLE ExistingCompletionPort, //已经存在的IOCP句柄
* __in ULONG_PTR CompletionKey, //完成键,包含了指定I/O完成包的指定文件
* __in DWORD NumberOfConcurrentThreads //真正并发同时执行最大线程数,一般推介是CPU核心数*2
* );
//创建完成端口句柄
HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
4.2 与socket进行关联
typedef struct{
SOCKET socket;//客户端socket
SOCKADDR_STORAGE ClientAddr;//客户端地址
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
//与socket进行关联
CreateIoCompletionPort((HANDLE)(PerHandleData -> socket),
completionPort, (DWORD)PerHandleData, 0);
4.3 获取队列完成状态
//功能:获取队列完成状态/*返回值:
调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、
lpoverlapped变量中。失败则返回零值。*/BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,//完成端口句柄
LPDWORD lpNumberOfBytes, //一次I/O操作所传送的字节数
PULONG_PTR lpCompletionKey, //当文件I/O操作完成后,用于存放与之关联的CK
LPOVERLAPPED *lpOverlapped, //IOCP特定的结构体
DWORD dwMilliseconds); //调用者的等待时间/*
4.4 用于IOCP的特定函数
//用于IOCP的特定函数
typedef struct _OVERLAPPEDPLUS{
OVERLAPPED ol;//一个固定的用于处理网络消息事件返回值的结构体变量
SOCKET s, sclient; int OpCode; //用来区分本次消息的操作类型(在完成端口的操作里面,
是以消息通知系统,读数据/写数据,都是要发这样的
消息结构体过去的)
WSABUF wbuf; //读写缓冲区结构体变量
DWORD dwBytes, dwFlags; //一些在读写时用到的标志性变量
}OVERLAPPEDPLUS;
4.5 投递一个队列完成状态
//功能:投递一个队列完成状态
BOOL PostQueuedCompletionStatus(
HANDLE CompletionPort, //指定想向其发送一个完成数据包的完成端口对象
DWORD dwNumberOfBytesTransferred, //指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数
DWORD dwCompletionKey, //指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数
LPOVERLAPPED lpOverlapped); //指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数
5. 示例
#include #include#include#include
using namespacestd;#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"kernel32.lib")HANDLE g_hIOCP;enumIO_OPERATION{IO_READ,IO_WRITE};structIO_DATA{
OVERLAPPED Overlapped;
WSABUF wsabuf;intnBytes;
IO_OPERATION opCode;
SOCKET client;
};char buffer[1024];
DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {
IO_DATA*lpIOContext =NULL;
DWORD nBytes= 0;
DWORD dwFlags= 0;int nRet = 0;
DWORD dwIoSize= 0;void * lpCompletionKey =NULL;
LPOVERLAPPED lpOverlapped=NULL;while(1){
GetQueuedCompletionStatus(g_hIOCP,&dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
lpIOContext= (IO_DATA *)lpOverlapped;if(dwIoSize == 0)
{
cout<< "Client disconnect" <
closesocket(lpIOContext->client);deletelpIOContext;continue;
}if(lpIOContext->opCode == IO_READ) //a read operation complete
{
ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
lpIOContext->wsabuf.buf =buffer;
lpIOContext->wsabuf.len = strlen(buffer)+1;
lpIOContext->opCode =IO_WRITE;
lpIOContext->nBytes = strlen(buffer)+1;
dwFlags= 0;
nBytes= strlen(buffer)+1;
nRet=WSASend(
lpIOContext->client,&lpIOContext->wsabuf, 1, &nBytes,
dwFlags,&(lpIOContext->Overlapped), NULL);if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING !=WSAGetLastError()) ) {
cout<< "WASSend Failed::Reason Code::"<< WSAGetLastError() <
closesocket(lpIOContext->client);deletelpIOContext;continue;
}
memset(buffer, NULL,sizeof(buffer));
}else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
{//Write operation completed, so post Read operation.
lpIOContext->opCode =IO_READ;
nBytes= 1024;
dwFlags= 0;
lpIOContext->wsabuf.buf =buffer;
lpIOContext->wsabuf.len =nBytes;
lpIOContext->nBytes =nBytes;
ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
nRet=WSARecv(
lpIOContext->client,&lpIOContext->wsabuf, 1, &nBytes,&dwFlags,&lpIOContext->Overlapped, NULL);if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING !=WSAGetLastError()) ) {
cout<< "WASRecv Failed::Reason Code1::"<< WSAGetLastError() <
closesocket(lpIOContext->client);deletelpIOContext;continue;
}
cout<wsabuf.buf<
}
}return 0;
}voidmain ()
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2,2), &wsaData);
SOCKET m_socket= WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
sockaddr_in server;
server.sin_family=AF_INET;
server.sin_port= htons(6000);
server.sin_addr.S_un.S_addr=htonl(INADDR_ANY);
bind(m_socket ,(sockaddr*)&server,sizeof(server));
listen(m_socket,8);
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;
g_hIOCP= CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount);//CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);
for( int i=0;i < g_ThreadCount; ++i){
HANDLE hThread;
DWORD dwThreadId;
hThread= CreateThread(NULL, 0, WorkerThread, 0, 0, &dwThreadId);
CloseHandle(hThread);
}while(1)
{
SOCKET client=accept( m_socket, NULL, NULL );
cout<< "Client connected." <
cout<< "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() <
closesocket(client);
}else { //post a recv request
IO_DATA * data = newIO_DATA;
memset(buffer, NULL ,1024);
memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
data->opCode =IO_READ;
data->nBytes = 0;
data->wsabuf.buf =buffer;
data->wsabuf.len = sizeof(buffer);
data->client =client;
DWORD nBytes= 1024 ,dwFlags=0;int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes,&dwFlags,&data->Overlapped, NULL);if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING !=WSAGetLastError())){
cout<< "WASRecv Failed::Reason Code::"<< WSAGetLastError() <
closesocket(client);deletedata;
}
cout<wsabuf.buf<
}
}
closesocket(m_socket);
WSACleanup();
}
Server.cpp
#include #include
using namespacestd;#pragma comment(lib,"ws2_32.lib")
voidmain()
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2,2), &wsaData);
sockaddr_in server;
server.sin_family=AF_INET;
server.sin_port= htons(6000);
server.sin_addr.S_un.S_addr= inet_addr("127.0.0.1");
SOCKET client=socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);intflag;
flag= connect(client, (sockaddr*)&server, sizeof(server));if(flag < 0){
cout<
}while(1){
cout<
strcpy(buffer,"hello");
send(client, buffer,1024, 0);
memset(buffer, NULL,sizeof(buffer));
cout<
cout<
cout<
Sleep(10000);
}
closesocket(client);
WSACleanup();
}
client.cpp
-
windows IOCP
2012-02-23 09:42:25windows IOCP的一个封装,介绍大体的封装框架,Packet的处理还有逻辑bug -
Windows IOCP
2015-06-12 14:26:25Windows IOCP IOCP全称I/O Completion Port,中文译为I/O完成端口。IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll。 简介 IOCP模型属于一种通讯模型,适用于...Windows IOCP
IOCP全称I/O Completion Port,中文译为I/O完成端口。IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll。
简介
IOCP模型属于一种通讯模型,适用于Windows平台下高负载服务器的一个技术。在处理大量用户并发请求时,如果采用一个用户一个线程的方式那将造成CPU在这成千上万的线程间进行切换,后果是不可想象的。而IOCP完成端口模型则完全不会如此处理,它的理论是并行的线程数量必须有一个上限-也就是说同时发出500个客户请求,不应该允许出现500个可运行的线程。目前来说,IOCP完成端口是Windows下性能最好的I/O模型,同时它也是最复杂的内核对象。它避免了大量用户并发时原有模型采用的方式,极大的提高了程序的并行处理能力。
原理图
从图中可以看到,一共包括三部分:完成端口(存放重叠的I/O请求),客户端请求的处理,等待者线程队列(一定数量的工作者线程,一般采用CPU*2个)。
完成端口中所谓的[端口]并不是我们在TCP/IP中所提到的端口,可以说是完全没有关系。它其实就是一个通知队列,由操作系统把已经完成的重叠I/O请求的通知放入其中。当某项I/O操作一旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知。
通常情况下,我们会在创建一定数量的工作者线程来处理这些通知,也就是线程池的方法。线程数量取决于应用程序的特定需要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的CPU时间,在此期间该线程可以运行,然后另一个线程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它线程来充分利用这些时间片。
IOCP的优点
基于IOCP的开发是异步IO的,决定了IOCP所实现的服务器的高吞吐量。
完成端口的线程并发量可以在创建该完成端口时指定,从而限制了与该完成端口相关联的可运行线程的数目。
通过引入IOCP,会大大减少Thread切换带来的额外开销,最小化的线程上下文切换,减少线程切换带来的巨大开销,让CPU把大量的事件用于线程的运行。当与该完成端口相关联的可运行线程的总数目达到了该并发量,系统就会阻塞任何与该完成端口相关联的后续线程的执行,直到与该完成端口相关联的可运行线程数目下降到小于该并发量为止。
Select是先查询再发起IO请求,IOCP是先发起IO请求再接收通知。但是Select方式在处理大量非活动连接时是比较低效的,因为每次Select需要对所有的Socket状态进行查询,而对非活动的Socket查询是没有意义的浪费,另外由于Socket句柄不能设置用户私有数据,当查询返回Socket句柄时还需要一个额外的查询来找到关联的用户对象,这两点是Select低效的关键。
IOCP的具体实现步骤
TCP IOCP实现具体步骤:
- 创建好 IOCP
- 创建 Socket ( socket 可以是由 Accept 得到)
- 将 Socket 关联到 IOCP
- socket 向 IOCP 提交各种所需请求
- IOCP 操作完成之后将结果返回给 socket
- 重复步骤 3 和 4 ,直到 socket 关闭
IOCP中用到单个函数,分为用于创建关联完成端口、获取完成状态和投递完成状态,函数原型:
//功能:创建完成端口和关联完成端口 HANDLE WINAPI CreateIoCompletionPort( * __in HANDLE FileHandle, // 已经打开的文件句柄或者空句柄,一般是客户端的句柄 * __in HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄 * __in ULONG_PTR CompletionKey, // 完成键,包含了指定I/O完成包的指定文件 * __in DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2 * ); //例子 //创建完成端口句柄 HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); typedef struct{ SOCKET socket;//客户端socket SOCKADDR_STORAGE ClientAddr;//客户端地址 }PER_HANDLE_DATA, *LPPER_HANDLE_DATA; //与socket进行关联 CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);
//功能:获取队列完成状态 BOOL GetQueuedCompletionStatus( HANDLE CompletionPort, //完成端口句柄 LPDWORD lpNumberOfBytes, //一次I/O操作所传送的字节数 PULONG_PTR lpCompletionKey, //当文件I/O操作完成后,用于存放与之关联的CK LPOVERLAPPED *lpOverlapped, //IOCP特定的结构体 DWORD dwMilliseconds); //调用者的等待时间 /* 返回值: 调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpoverlapped变量中。失败则返回零值。 */ //用于IOCP的特定函数 typedef struct _OVERLAPPEDPLUS{ OVERLAPPED ol; //一个固定的用于处理网络消息事件返回值的结构体变量 SOCKET s, sclient; int OpCode; //用来区分本次消息的操作类型(在完成端口的操作里面,是以消息通知系统,读数据/写数据,都是要发这样的消息结构体过去的) WSABUF wbuf; //读写缓冲区结构体变量 DWORD dwBytes, dwFlags; //一些在读写时用到的标志性变量 }OVERLAPPEDPLUS;
//功能:投递一个队列完成状态 BOOL PostQueuedCompletionStatus( HANDLE CompletionPort, //指定想向其发送一个完成数据包的完成端口对象 DWORD dwNumberOfBytesTransferred, //指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数 DWORD dwCompletionKey, //指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数 LPOVERLAPPED lpOverlapped); //指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数
#include <winsock2.h>
#include <windows.h>
#include <string>
#include <iostream>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"kernel32.lib")

HANDLE g_hIOCP;  // the completion port shared by all worker threads

enum IO_OPERATION{IO_READ,IO_WRITE};

// Per-operation context.  OVERLAPPED is the first member, so the
// LPOVERLAPPED returned by GetQueuedCompletionStatus can be cast back.
struct IO_DATA{
    OVERLAPPED Overlapped;
    WSABUF wsabuf;
    int nBytes;
    IO_OPERATION opCode;
    SOCKET client;
};

char buffer[1024];  // NOTE(review): one buffer shared by every connection —
                    // this demo appears correct only for one client at a time.

// Worker loop: dequeues completion packets and alternates echo-write and
// re-armed read for the client the packet belongs to.
DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {
    IO_DATA *lpIOContext = NULL;
    DWORD nBytes = 0;
    DWORD dwFlags = 0;
    int nRet = 0;
    DWORD dwIoSize = 0;
    void * lpCompletionKey = NULL;
    LPOVERLAPPED lpOverlapped = NULL;
    while(1){
        GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
        lpIOContext = (IO_DATA *)lpOverlapped;
        if(dwIoSize == 0)  // zero bytes on a stream socket: the peer closed
        {
            cout << "Client disconnect" << endl;
            closesocket(lpIOContext->client);
            delete lpIOContext;
            continue;
        }
        if(lpIOContext->opCode == IO_READ) // a read operation complete
        {
            // echo the received bytes (string-terminated) back to the client
            ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
            lpIOContext->wsabuf.buf = buffer;
            lpIOContext->wsabuf.len = strlen(buffer)+1;
            lpIOContext->opCode = IO_WRITE;
            lpIOContext->nBytes = strlen(buffer)+1;
            dwFlags = 0;
            nBytes = strlen(buffer)+1;
            nRet = WSASend(
                lpIOContext->client, &lpIOContext->wsabuf, 1, &nBytes,
                dwFlags, &(lpIOContext->Overlapped), NULL);
            if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
                cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
                closesocket(lpIOContext->client);
                delete lpIOContext;
                continue;
            }
            memset(buffer, NULL, sizeof(buffer));
        }
        else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
        {
            // Write operation completed, so post Read operation.
            lpIOContext->opCode = IO_READ;
            nBytes = 1024;
            dwFlags = 0;
            lpIOContext->wsabuf.buf = buffer;
            lpIOContext->wsabuf.len = nBytes;
            lpIOContext->nBytes = nBytes;
            ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
            nRet = WSARecv(
                lpIOContext->client, &lpIOContext->wsabuf, 1, &nBytes, &dwFlags, &lpIOContext->Overlapped, NULL);
            if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
                cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl;
                closesocket(lpIOContext->client);
                delete lpIOContext;
                continue;
            }
            cout<<lpIOContext->wsabuf.buf<<endl;
        }
    }
    return 0;
}

void main ()
{
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2,2), &wsaData);
    // overlapped socket so it can be attached to the completion port
    SOCKET m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
    sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(6000);
    server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    bind(m_socket ,(sockaddr*)&server,sizeof(server));
    listen(m_socket, 8);
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;  // conventional CPUs*2 pool
    g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount);
    //CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);
    for( int i=0;i < g_ThreadCount; ++i){
        HANDLE hThread;
        DWORD dwThreadId;
        hThread = CreateThread(NULL, 0, WorkerThread, 0, 0, &dwThreadId);
        CloseHandle(hThread);  // thread keeps running; only the handle is released
    }
    while(1)
    {
        SOCKET client = accept( m_socket, NULL, NULL );
        cout << "Client connected." << endl;
        // attach the accepted socket to the completion port
        if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){
            cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
            closesocket(client);
        } else { //post a recv request
            IO_DATA * data = new IO_DATA;
            memset(buffer, NULL ,1024);
            memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
            data->opCode = IO_READ;
            data->nBytes = 0;
            data->wsabuf.buf = buffer;
            data->wsabuf.len = sizeof(buffer);
            data->client = client;
            DWORD nBytes= 1024 ,dwFlags=0;
            int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes, &dwFlags, &data->Overlapped, NULL);
            if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())){
                cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
                closesocket(client);
                delete data;
            }
            cout<<data->wsabuf.buf<<endl;
        }
    }
    closesocket(m_socket);
    WSACleanup();
}
#include <iostream> #include <WinSock2.h> using namespace std; #pragma comment(lib,"ws2_32.lib") void main() { WSADATA wsaData; WSAStartup(MAKEWORD(2,2), &wsaData); sockaddr_in server; server.sin_family = AF_INET; server.sin_port = htons(6000); server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); SOCKET client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); int flag; flag = connect(client, (sockaddr*)&server, sizeof(server)); if(flag < 0){ cout<<"error!"<<endl; return; } while(1){ cout<<"sent hello!!!!"<<endl; char buffer[1024]; strcpy(buffer,"hello"); send(client, buffer, 1024, 0); memset(buffer, NULL, sizeof(buffer)); cout<<"recv: "<<endl; int rev = recv(client, buffer, 1024, 0); if(rev == 0) cout<<"recv nothing!"<<endl; cout<<buffer<<endl; Sleep(10000); } closesocket(client); WSACleanup(); }
参考
http://www.cnblogs.com/lidabo/archive/2012/12/10/2812230.html
http://www.codeproject.com/KB/IP/iocp-multicast-udp.aspx
http://blog.csdn.net/zhongguoren666/article/details/7386592
http://www.baike.com/wiki/%E5%AE%8C%E6%88%90%E7%AB%AF%E5%8F%A3%E6%A8%A1%E5%9E%8B
http://blog.csdn.net/neicole/article/details/7549497
转载
-
windows iocp
2019-01-30 15:52:32// test.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" // winsock 2 的头文件和库 #include <winsock2.h> #include <MSWSock.h&...windows.h&g...// test.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"
// winsock 2 的头文件和库
#include <winsock2.h>
#include <MSWSock.h>
#include <iostream>
#include <list>
#include <windows.h>using namespace std;
#pragma comment(lib,"ws2_32.lib")
enum IOOperType{
TYPE_RECV, //数据接收事件
TYPE_SEND, //数据发送事件
};struct IO_DATA
{
OVERLAPPED overLapped;
WSABUF wsabuf;
char buf[1024];
IOOperType operType;
SOCKET client;void Init(IOOperType type,const SOCKET& sck,int len = 0)
{
memset(&overLapped, 0, sizeof(overLapped));
operType = type;
wsabuf.buf = buf;
memset(buf, 0, sizeof(buf));
if (0 == len)
{
wsabuf.len = sizeof(buf);
}
else
{
wsabuf.len = len;
}
client = sck;
}
};list<IO_DATA*> g_listUnUseData;
list<IO_DATA*> g_listData;
CRITICAL_SECTION g_cs;
long g_nCount = 0;IO_DATA* GetOverlapped()
{
EnterCriticalSection(&g_cs);
IO_DATA* pTmp = NULL;
if (g_listUnUseData.size()>0)
{
pTmp = g_listUnUseData.back();
g_listUnUseData.pop_back();
}
else
{
pTmp = new(IO_DATA);
g_listData.push_front(pTmp);
}
LeaveCriticalSection(&g_cs);
return pTmp;
}void RecycleOverlapped(IO_DATA* pData)
{
EnterCriticalSection(&g_cs);
g_listUnUseData.push_front(pData);
LeaveCriticalSection(&g_cs);
}DWORD _stdcall WorkerThread(LPVOID lParam)
{
    HANDLE* pHandle = (HANDLE*)lParam; // points at the completion-port handle
    IO_DATA* lpIOContext = NULL;
    DWORD dwIoSize = 0;
    void* lpCompletionKey = NULL;
    LPOVERLAPPED lpOverlapped = NULL;
    while (1)
    {
        // BUG FIX: the original ignored the return value; when the dequeue
        // fails lpOverlapped may be NULL and casting/dereferencing it crashes.
        BOOL bOk = GetQueuedCompletionStatus(*pHandle, &dwIoSize,
            (LPDWORD)&lpCompletionKey, &lpOverlapped, INFINITE);
        if (lpOverlapped == NULL)
            continue; // port-level failure, no packet to clean up
        lpIOContext = (IO_DATA*)lpOverlapped; // OVERLAPPED is the first member
        if (!bOk || dwIoSize == 0)
        {
            // 0 bytes = graceful close; !bOk = the I/O was aborted.
            cout << "Client disconnect" << endl;
            closesocket(lpIOContext->client);
            RecycleOverlapped(lpIOContext);
            continue;
        }
        switch (lpIOContext->operType)
        {
        case TYPE_RECV:
        {
            // BUG FIX: use InterlockedIncrement's return value; re-reading
            // g_nCount afterwards races the other worker threads.
            LONG nCount = InterlockedIncrement(&g_nCount);
            if (0 == nCount % 100000)
                cout << nCount << endl;

            // BUG FIX: build the reply BEFORE re-arming this context.
            // The original called lpIOContext->Init(TYPE_RECV, ...) first,
            // which zeroes buf, so the echo always sent "hello " with an
            // empty payload — and read a buffer a newly posted WSARecv
            // could be writing into concurrently.
            IO_DATA* pData = GetOverlapped();
            pData->Init(TYPE_SEND, lpIOContext->client);
            sprintf_s(pData->buf, "hello %s", lpIOContext->buf);
            DWORD SendBytes = (DWORD)strlen(pData->buf);
            pData->wsabuf.len = SendBytes;

            // Re-post the receive on the (now safely reusable) context.
            lpIOContext->Init(TYPE_RECV, lpIOContext->client);
            DWORD nBytes = 1024, dwFlags = 0;
            int nRet = WSARecv(lpIOContext->client, &lpIOContext->wsabuf, 1,
                &nBytes, &dwFlags, &lpIOContext->overLapped, NULL);
            if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()))
            {
                cout << "WSARecv Failed::Reason Code::" << WSAGetLastError() << endl;
                closesocket(lpIOContext->client);
                RecycleOverlapped(lpIOContext);
                // BUG FIX: the original fell through and posted a send on
                // the socket it had just closed, leaking pData's packet.
                RecycleOverlapped(pData);
                break;
            }

            nRet = WSASend(pData->client, &pData->wsabuf, 1,
                &SendBytes, 0, &pData->overLapped, NULL);
            if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()))
            {
                cout << "WSASend Failed::Reason Code::" << WSAGetLastError() << endl;
                closesocket(pData->client);
                RecycleOverlapped(pData);
            }
        }
        break;
        case TYPE_SEND:
            // The reply went out; the context is free again.
            RecycleOverlapped(lpIOContext);
            break;
        default:
            break;
        }
    }
    return 0;
}

// test.cpp / server entry point.
int _tmain(int argc, _TCHAR* argv[])
{
    InitializeCriticalSection(&g_cs);
    // The listen socket must be opened with WSA_FLAG_OVERLAPPED so that
    // operations on it can be serviced through the completion port.
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) // BUG FIX: was unchecked
    {
        cout << "WSAStartup error" << endl;
        return 1;
    }
    SOCKET hSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (hSocket == INVALID_SOCKET)
    {
        cout << "WSASocket create socket error" << WSAGetLastError() << endl;
        return 1;
    }
    // Create the completion port with the desired concurrency value.
    int nMaxConcurrency = 8;
    HANDLE hHandleIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);
    // BUG FIX: CreateIoCompletionPort signals failure by returning NULL,
    // not INVALID_HANDLE_VALUE, so the original check never fired.
    if (NULL == hHandleIocp)
    {
        cout << "IOCP create error,error code " << GetLastError() << endl;
        return 1;
    }
    sockaddr_in service;
    service.sin_family = AF_INET;
    service.sin_port = htons(6666);
    service.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (bind(hSocket, (sockaddr*)&service, sizeof(service)) == SOCKET_ERROR)
    {
        cout << "bind() error,error code " << WSAGetLastError() << endl;
        return 1;
    }
    cout << "bind ok!" << endl;
    if (listen(hSocket, SOMAXCONN) == SOCKET_ERROR)
    {
        cout << "listen() error,error code " << WSAGetLastError() << endl;
        return 1;
    }
    cout << "listen ok!" << endl;
    // Spin up the worker threads that service completion packets.
    HANDLE* hHandle = new HANDLE[nMaxConcurrency];
    for (int i = 0; i < nMaxConcurrency; ++i)
    {
        DWORD dwThreadID = 0;
        hHandle[i] = CreateThread(NULL, 0, WorkerThread, &hHandleIocp, 0, &dwThreadID);
    }
    // Accept loop: bind each new client to the completion port and post
    // its first overlapped receive.
    while (1)
    {
        SOCKET client = accept(hSocket, NULL, NULL);
        if (client == INVALID_SOCKET) // BUG FIX: was unchecked
        {
            cout << "accept() error,error code " << WSAGetLastError() << endl;
            continue;
        }
        if (CreateIoCompletionPort((HANDLE)client, hHandleIocp, 0, 0) == NULL)
        {
            cout << "Binding client socket to completionPort fail" << GetLastError() << endl;
            closesocket(client);
            continue;
        }
        IO_DATA* data = GetOverlapped();
        data->Init(TYPE_RECV, client);
        DWORD nBytes = 1024, dwFlags = 0;
        int nRet = WSARecv(client, &data->wsabuf, 1, &nBytes, &dwFlags, &data->overLapped, NULL);
        if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()))
        {
            cout << "WSARecv Failed::Reason Code::" << WSAGetLastError() << endl;
            closesocket(client);
            RecycleOverlapped(data);
        }
    }
    // Unreachable (the accept loop never exits); kept for completeness.
    closesocket(hSocket);
    WSACleanup();
    return 0;
}
// client.cpp : console application entry point.
//#include "stdafx.h"
#include <iostream>
#include <WinSock2.h>
using namespace std;#pragma comment(lib,"ws2_32.lib")
DWORD _stdcall WorkThread(LPVOID)
{
sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(6666);
server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");const int num = 200;
SOCKET client[num];for (int i = 0; i < num; ++i)
{
client[i] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
int flag;
flag = connect(client[i], (sockaddr*)&server, sizeof(server));
if (flag < 0){
cout << "error!" << endl;
return 0;
}
}
cout << "connect success" << endl;
while (1){
int nPos = rand() % 100;
char buffer[1024] = "JFKL;AJKFD;LASFKJDSA;KFDS";send(client[nPos], buffer, 1024, 0);
memset(buffer, NULL, sizeof(buffer));int rev = recv(client[nPos], buffer, 1024, 0);
if (rev == 0)
{
continue;
}}
for (int i = 0; i < num; ++i)
{
closesocket(client[i]);
}
return 0;
}int _tmain(int argc, _TCHAR* argv[])
{
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) // BUG FIX: was unchecked
    {
        cout << "WSAStartup error" << endl;
        return 1;
    }
    DWORD dwThreadID;
    HANDLE handle[5];
    for (int i = 0; i < 5; ++i)
    {
        handle[i] = CreateThread(NULL, 0, WorkThread, NULL, 0, &dwThreadID);
    }
    // INFINITE instead of -1: same value (0xFFFFFFFF), states the intent.
    WaitForMultipleObjects(5, handle, TRUE, INFINITE);
    for (int i = 0; i < 5; ++i)
    {
        CloseHandle(handle[i]); // BUG FIX: thread handles were leaked
    }
    WSACleanup();
    return 0;
}
windows IOCP文档
2015-02-01 19:34:02IOCP文档,基本详细介绍了IOCP机制 -
use IOCP in windows
2021-01-09 00:25:09<div><p>https://docs.python.org/3/library/asyncio-policy.html#asyncio.WindowsProactorEventLoopPolicy</p><p>该提问来源于开源项目:encode/uvicorn</p></div> -
IOCP原理windows下实现
2018-12-29 10:25:22此包为windows下高并发高性能网络库实现原理阐述,可以做为有封装需求的参考 -
Windows之IOCP
2018-07-03 15:05:00IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll,关于epoll可以参考linux之epoll 1. 简介 IOCP模型属于一种通讯模型,适用于Windows平台下高负载服务器的一个技术... -
windows IOCP示例代码
2017-12-07 09:08:05使用Windows提供的IOCP技术,以及系统线程池,实现Windows服务器端对大并发的长连接的支持。 -
windows IOCP封装
2012-02-22 17:32:46一个IOCP封装,未经测试,有兴趣的同学看下整体结构就行了。 一些逻辑上的bug待修正 -
windows IOCP模型
2017-02-21 19:51:35在老师分配任务(“尝试利用IOCP模型写出服务端和客户端的代码”)给我时,脑子一片空白,并不知道什么是IOCP模型,会不会是像软件设计模式里面的工厂模式,装饰模式之类的那些呢?嘿嘿,不过好像是一个挺好玩的... -
Windows IOCP框架
2010-08-17 23:10:25这是一个IOCP框架,易扩展、易复用、易维护。 代码已有所变更,详情请看: http://blog.csdn.net/chenyu2202863/archive/2010/08/17/5818920.aspx -
IOCP reactor Windows/Twisted backpressure issue
2020-11-30 02:08:14<div><p>This is an <strong>upstream (Twisted) issue</strong> effecting Crossbar.io on Windows when running IOCP reactor, in a backpressure situation. I dump the links here to track the issue on our ... -
windows IOCP高并发版
2018-04-27 11:44:28 windows IOCP高并发版,很不错的资源
Investigate using job objects and IOCP on Windows
2020-12-09 12:07:27<p>If you create a job object, you can associate an IOCP with it. If you then create processes in that job (generally using <a href="https://msdn.microsoft.com/en-us/library/windows/desktop/ms681949%... -
Windows IOCP是什么
2019-07-13 01:59:55欢迎阅读此篇IOCP教程。我将先给出IOCP的定义然后给出它的实现方法,最后剖析一个Echo程序来为您拨开IOCP的谜云,除去你心中对IOCP的烦恼。OK,但我不能保证你明白IOCP的一切,但我会尽我最大的努力。以下是我会在这... -
Assign an ID for IOCP on windows
2021-01-10 07:31:53error on Windows. <p>I have already refactored my library to use one single <code>EventLoop</code> so that the socket object won't move from one <code>EventLoop</code> to another. But I still got... -
windows—IOCP
2019-04-02 23:56:00#include <windows.h> 6 7 #define BUF_SIZE 128 8 #define READ 3 9 #define WRITE 5 10 void ErrorHandling( char * message); 11 unsigned int WINAPI EchoThreadMain(LPVOID ...