Completion Port (Overlapped) - Win32
PostQueuedCompletionStatus()
GetQueuedCompletionStatus()
WSAGetOverlappedResult()
BindIoCompletionCallback() // Windows 2000
#define MAX_COMPLETION_THREAD_COUNT 32 // Maximum number of completion threads allowed
//
// Function: HandleIo
//
// Description:
//    Handles one completed overlapped operation on a socket.
//      - OP_ACCEPT: the accepted client is associated with the completion
//        port, the data received with the accept is echoed back, receives
//        are posted on the new connection, and the AcceptEx is re-posted.
//      - OP_READ:   the received data is queued to be sent back and the
//        receive is posted again; a zero-byte receive on a non-UDP socket
//        marks the socket as closing.
//      - OP_WRITE:  the completed send buffer is freed.
//    The socket object is closed and freed once it is marked closing and
//    has no outstanding operations or out-of-order sends left.
//
// Parameters:
//    sock            - socket object the completion belongs to
//    buf             - per-I/O buffer object of the completed operation
//    CompPort        - completion port that accepted clients are bound to
//    BytesTransfered - number of bytes moved by the completed operation
//    error           - NO_ERROR, or the error the operation completed with
//
void HandleIo(SOCKET_OBJ *sock, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
{
    SOCKET_OBJ *clientobj=NULL;     // New client object for accepted connections
    BUFFER_OBJ *recvobj=NULL,       // Used to post new receives on accepted connections
               *sendobj=NULL;       // Used to post new sends for data received
    BOOL        bCleanupSocket;
    char       *tmp;
    int         i;

    if (error != 0)
        dbgprint("OP = %d; Error = %d\n", buf->operation, error);

    bCleanupSocket = FALSE;

    if ((error != NO_ERROR) && (gProtocol == IPPROTO_TCP))
    {
        // An error occurred on a TCP socket, free the associated per I/O buffer
        // and see if there are any more outstanding operations. If so we must
        // wait until they are complete as well.
        //
        FreeBufferObj(buf);
        if (InterlockedDecrement(&sock->OutstandingOps) == 0)
        {
            FreeSocketObj(sock);
        }
        return;
    }

    // Everything below runs under the per-socket critical section; every exit
    // path from here on must leave it again.
    EnterCriticalSection(&sock->SockCritSec);

    if (buf->operation == OP_ACCEPT)
    {
        HANDLE            hrc;
        SOCKADDR_STORAGE *LocalSockaddr=NULL,
                         *RemoteSockaddr=NULL;
        int               LocalSockaddrLen,
                          RemoteSockaddrLen;

        // Extract the local and remote addresses from the accept buffer. The
        // buffer is laid out as data followed by two address slots of
        // sizeof(SOCKADDR_STORAGE) + 16 bytes each.
        sock->lpfnGetAcceptExSockaddrs(
                buf->buf,
                buf->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
                sizeof(SOCKADDR_STORAGE) + 16,
                sizeof(SOCKADDR_STORAGE) + 16,
                (SOCKADDR **)&LocalSockaddr,
                &LocalSockaddrLen,
                (SOCKADDR **)&RemoteSockaddr,
                &RemoteSockaddrLen
                );

        // Get a new SOCKET_OBJ for the client connection
        clientobj = GetSocketObj(buf->sclient, sock->af);

        // Associate the new connection to our completion port
        hrc = CreateIoCompletionPort(
                (HANDLE)buf->sclient,
                CompPort,
                (ULONG_PTR)clientobj,
                0
                );
        if (hrc == NULL)
        {
            fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n",
                    GetLastError());
            // The critical section was entered above; release it before
            // bailing out so other completion threads servicing this socket
            // do not block forever on it.
            // NOTE(review): clientobj and the accept buffer are not recycled
            // on this path and OutstandingOps is not decremented -- confirm
            // whether that is intended.
            LeaveCriticalSection(&sock->SockCritSec);
            return;
        }

        // Get a BUFFER_OBJ to echo the data received with the accept back to the client
        sendobj = GetBufferObj(clientobj, BytesTransfered);

        // Copy the buffer to the sending object
        memcpy(sendobj->buf, buf->buf, BytesTransfered);

        // Post the send
        if (PostSend(clientobj, sendobj) == NO_ERROR)
        {
            // Now post some receives on this new connection
            for(i=0; i < gOverlappedCount ;i++)
            {
                recvobj = GetBufferObj(clientobj, gBufferSize);
                if (PostRecv(clientobj, recvobj) != NO_ERROR)
                {
                    // If for some reason the recv call fails, clean up the connection
                    FreeBufferObj(recvobj);
                    error = SOCKET_ERROR;
                    break;
                }
            }
        }
        else
        {
            // If for some reason the send call fails, clean up the connection
            FreeBufferObj(sendobj);
            error = SOCKET_ERROR;
        }

        // Re-post the AcceptEx
        PostAccept(sock, buf);

        if (error != NO_ERROR)
        {
            // The failure concerns the client connection only: tear it down
            // now if nothing is pending on it, otherwise flag it as closing.
            if (clientobj->OutstandingOps == 0)
            {
                closesocket(clientobj->s);
                clientobj->s = INVALID_SOCKET;
                FreeSocketObj(clientobj);
            }
            else
            {
                clientobj->bClosing = TRUE;
            }
            // Reset so the listening socket itself is not marked closing below
            error = NO_ERROR;
        }
    }
    else if ((buf->operation == OP_READ) && (error == NO_ERROR))
    {
        //
        // Receive completed successfully
        //
        if ((BytesTransfered > 0) || (gProtocol == IPPROTO_UDP))
        {
            // Create a buffer to send
            sendobj = GetBufferObj(sock, gBufferSize);

            if (gProtocol == IPPROTO_UDP)
            {
                // Reply to the address the datagram came from
                memcpy(&sendobj->addr, &buf->addr, buf->addrlen);
            }

            // Swap the buffers (i.e. buffer we just received becomes the send buffer)
            tmp              = sendobj->buf;
            sendobj->buflen  = BytesTransfered;
            sendobj->buf     = buf->buf;
            sendobj->IoOrder = buf->IoOrder;

            buf->buf    = tmp;
            buf->buflen = gBufferSize;

            InsertPendingSend(sock, sendobj);

            if (DoSends(sock) != NO_ERROR)
            {
                error = SOCKET_ERROR;
            }
            else
            {
                // Post another receive
                if (PostRecv(sock, buf) != NO_ERROR)
                {
                    // In the event the recv fails, clean up the connection
                    FreeBufferObj(buf);
                    error = SOCKET_ERROR;
                }
            }
        }
        else
        {
            // Got 0 byte receive
            // Graceful close - the receive returned 0 bytes read
            sock->bClosing = TRUE;

            // Free the receive buffer
            FreeBufferObj(buf);

            if (DoSends(sock) != NO_ERROR)
            {
                dbgprint("0: cleaning up in zero byte handler\n");
                error = SOCKET_ERROR;
            }

            // If this was the last outstanding operation on socket, clean it up
            if ((sock->OutstandingOps == 0) && (sock->OutOfOrderSends == NULL))
            {
                // cleaning up in zero byte handler
                bCleanupSocket = TRUE;
            }
        }
    }
    else if ((buf->operation == OP_READ) && (error != NO_ERROR) && (gProtocol == IPPROTO_UDP))
    {
        // If for UDP, a receive completes with an error, we ignore it and re-post the recv
        if (PostRecv(sock, buf) != NO_ERROR)
        {
            error = SOCKET_ERROR;
        }
    }
    else if (buf->operation == OP_WRITE)
    {
        // The send is done; release its buffer and let DoSends continue with
        // whatever is still queued on this socket.
        FreeBufferObj(buf);

        if (DoSends(sock) != NO_ERROR)
        {
            // Cleaning up inside OP_WRITE handler
            error = SOCKET_ERROR;
        }
    }

    if (error != NO_ERROR)
    {
        sock->bClosing = TRUE;
    }

    //
    // Check to see if socket is closing
    //
    if ( (InterlockedDecrement(&sock->OutstandingOps) == 0) &&
         (sock->bClosing) &&
         (sock->OutOfOrderSends == NULL) )
    {
        bCleanupSocket = TRUE;
    }
    else
    {
        if (DoSends(sock) != NO_ERROR)
        {
            bCleanupSocket = TRUE;
        }
    }

    LeaveCriticalSection(&sock->SockCritSec);

    // Close and free the socket object only after the lock has been released
    if (bCleanupSocket)
    {
        closesocket(sock->s);
        sock->s = INVALID_SOCKET;

        FreeSocketObj(sock);
    }

    return;
}
//
// Function: CompletionThread
//
// Description:
//    Worker thread that services the completion port. It loops on
//    GetQueuedCompletionStatus and passes every dequeued completion to
//    HandleIo together with its socket object, buffer object, byte count
//    and error code.
//
//    The loop ends (and the thread exits) when no completion packet could
//    be dequeued, or when a packet without an OVERLAPPED pointer arrives.
//
// Parameters:
//    lpParam - the completion port handle, passed as the thread argument
//
// Returns:
//    0 when the thread exits.
//
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
    SOCKET_OBJ  *sockobj=NULL;          // Per socket object for completed I/O
    BUFFER_OBJ  *bufobj=NULL;           // Per I/O object for completed I/O
    OVERLAPPED  *lpOverlapped=NULL;     // Pointer to overlapped structure for completed I/O
    HANDLE       CompletionPort;        // Completion port handle
    DWORD        BytesTransfered,       // Number of bytes transfered
                 Flags;                 // Flags for completed I/O
    int          rc,
                 error;

    CompletionPort = (HANDLE)lpParam;
    while (1)
    {
        error = NO_ERROR;

        // Start from a known state so values left over from the previous
        // completion are never mistaken for a new one.
        sockobj      = NULL;
        lpOverlapped = NULL;

        rc = GetQueuedCompletionStatus(
                CompletionPort,
               &BytesTransfered,
                (PULONG_PTR)&sockobj,
               &lpOverlapped,
                INFINITE
                );

        if (lpOverlapped == NULL)
        {
            // Nothing usable was dequeued: either the call itself failed
            // (e.g. the port was closed) or a packet was posted without an
            // OVERLAPPED. There is no BUFFER_OBJ to recover in either case,
            // so stop servicing the port instead of dereferencing garbage.
            if (rc == FALSE)
            {
                dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d\n",
                        GetLastError());
            }
            break;
        }

        // The OVERLAPPED is embedded in the BUFFER_OBJ as member 'ol'
        bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);

        if (rc == FALSE)
        {
            // If the call fails, call WSAGetOverlappedResult to translate the
            // error code into a Winsock error code.
            dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d\n",
                    GetLastError());
            rc = WSAGetOverlappedResult(
                    sockobj->s,
                   &bufobj->ol,
                   &BytesTransfered,
                    FALSE,
                   &Flags
                    );
            if (rc == FALSE)
            {
                error = WSAGetLastError();
            }
        }

        // Handle the IO operation
        HandleIo(sockobj, bufobj, CompletionPort, BytesTransfered, error);
    }

    ExitThread(0);
    return 0;
}
//
// Function: main
//
// Description:
//    Starts the completion port server: loads Winsock, creates the
//    completion port and one completion thread per processor (capped at
//    MAX_COMPLETION_THREAD_COUNT), then creates, binds and arms a socket for
//    every local address returned by getaddrinfo. For TCP the AcceptEx
//    extension functions are loaded and AcceptEx operations are posted; for
//    UDP receives are posted. Afterwards the main thread waits on the
//    completion threads and periodically closes accepted-but-silent client
//    sockets.
//
// Returns:
//    0 on normal shutdown, -1 on a setup or runtime failure.
//
// NOTE(review): CompletionPort, CompThreads, sysinfo, hints, hrc, i,
// interval, endpointcount, szPort, gProtocol, gOverlappedCount, gBufferSize
// and ListenSockets are not declared in this function; they are presumably
// declared at file scope outside this excerpt -- confirm.
//
int __cdecl main(int argc, char **argv)
{
    WSADATA          wsd;
    SOCKET_OBJ      *sockobj=NULL;
    int              rc;
    struct addrinfo *res=NULL,
                    *ptr=NULL;

    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    // Create the completion port used by this server
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
    if (CompletionPort == NULL)
    {
        fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
        return -1;
    }

    // Find out how many processors are on this system
    GetSystemInfo(&sysinfo);
    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
    {
        sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
    }

    // Create the worker threads to service the completion notifications
    for(i=0; i < (int)sysinfo.dwNumberOfProcessors ;i++)
    {
        CompThreads[i] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
        if (CompThreads[i] == NULL)
        {
            fprintf(stderr, "CreatThread failed: %d\n", GetLastError());
            return -1;
        }
    }

    // Resolve the local (passive) addresses to listen on
    memset(&hints, 0, sizeof(hints));
    hints.ai_flags    = AI_PASSIVE;
    hints.ai_family   = AF_UNSPEC;      // unspecified
    hints.ai_socktype = SOCK_STREAM;    // TCP socket type
    hints.ai_protocol = IPPROTO_TCP;    // TCP protocol

    rc = getaddrinfo(
            NULL,       // local interface to bind to
            szPort,
           &hints,
           &res
            );
    if (rc != 0)
    {
        // No node name is passed to getaddrinfo, so the port string is the
        // only input worth reporting.
        fprintf(stderr, "getaddrinfo failed for port %s: %d\n", szPort, rc);
        return -1;
    }

    // For each local address returned, create a listening/receiving socket
    ptr = res;
    while (ptr)
    {
        sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);

        // create the socket
        sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (sockobj->s == INVALID_SOCKET)
        {
            fprintf(stderr,"socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Associate the socket and its SOCKET_OBJ to the completion port
        hrc = CreateIoCompletionPort((HANDLE)sockobj->s, CompletionPort, (ULONG_PTR)sockobj, 0);
        if (hrc == NULL)
        {
            fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
            return -1;
        }

        // bind the socket to a local address and port
        // (ai_addrlen is a size_t; bind takes an int length)
        rc = bind(sockobj->s, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        if (gProtocol == IPPROTO_TCP)
        {
            BUFFER_OBJ *acceptobj=NULL;
            GUID        guidAcceptEx = WSAID_ACCEPTEX,
                        guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
            DWORD       bytes;

            // Need to load the Winsock extension functions from each provider
            //    -- e.g. AF_INET and AF_INET6.
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                   &guidAcceptEx,
                    sizeof(guidAcceptEx),
                   &sockobj->lpfnAcceptEx,
                    sizeof(sockobj->lpfnAcceptEx),
                   &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                        WSAGetLastError());
                return -1;
            }

            rc = WSAIoctl(
                    sockobj->s,
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                   &guidGetAcceptExSockaddrs,
                    sizeof(guidGetAcceptExSockaddrs),
                   &sockobj->lpfnGetAcceptExSockaddrs,
                    sizeof(sockobj->lpfnGetAcceptExSockaddrs),
                   &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
                        WSAGetLastError());
                return -1;
            }

            // For TCP sockets, we need to "listen" on them
            rc = listen(sockobj->s, 100);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
                return -1;
            }

            // Keep track of the pending AcceptEx operations
            sockobj->PendingAccepts = (BUFFER_OBJ **)HeapAlloc(
                    GetProcessHeap(),
                    HEAP_ZERO_MEMORY,
                    (sizeof(BUFFER_OBJ *) * gOverlappedCount));
            if (sockobj->PendingAccepts == NULL)
            {
                fprintf(stderr, "HeapAlloc failed: %d\n", GetLastError());
                ExitProcess(-1);
            }

            // Post the AcceptEx(s)
            for(i=0; i < gOverlappedCount ;i++)
            {
                sockobj->PendingAccepts[i] = acceptobj = GetBufferObj(sockobj, gBufferSize);
                PostAccept(sockobj, acceptobj);
            }

            //
            // Maintain a list of the listening socket structures
            // (new entries are pushed onto the head of the list)
            //
            if (ListenSockets == NULL)
            {
                ListenSockets = sockobj;
            }
            else
            {
                sockobj->next = ListenSockets;
                ListenSockets = sockobj;
            }
        }
        else
        {
            BUFFER_OBJ *recvobj=NULL;
            DWORD       bytes;
            int         optval;

            // Turn off UDP errors resulting from ICMP messages (port/host unreachable, etc)
            optval = 0;
            rc = WSAIoctl(
                    sockobj->s,
                    SIO_UDP_CONNRESET,
                   &optval,
                    sizeof(optval),
                    NULL,
                    0,
                   &bytes,
                    NULL,
                    NULL
                    );
            if (rc == SOCKET_ERROR)
            {
                // Not fatal: report it and carry on
                fprintf(stderr, "WSAIoctl: SIO_UDP_CONNRESET failed: %d\n",
                        WSAGetLastError());
            }

            // For UDP, simply post some receives
            for(i=0; i < gOverlappedCount ;i++)
            {
                recvobj = GetBufferObj(sockobj, gBufferSize);
                PostRecv(sockobj, recvobj);
            }
        }

        endpointcount++;
        ptr = ptr->ai_next;
    }

    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);

    interval = 0;
    while (1)
    {
        // Wait (up to 5 seconds at a time) for all completion threads
        rc = WSAWaitForMultipleEvents(
                sysinfo.dwNumberOfProcessors,
                CompThreads,
                TRUE,
                5000,
                FALSE
                );
        if (rc == WAIT_FAILED)
        {
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
        }
        else if (rc == WAIT_TIMEOUT)
        {
            // 12 timeouts of 5 seconds each: run the sweep once a minute
            interval++;
            if (interval == 12)
            {
                SOCKET_OBJ *listenptr=NULL;
                int         optval,
                            optlen;

                // For TCP, cycle through all the outstanding AcceptEx operations
                //   to see if any of the client sockets have been connected but
                //   haven't received any data. If so, close them as they could be
                //   a denial of service attack.
                listenptr = ListenSockets;
                while (listenptr)
                {
                    for(i=0; i < gOverlappedCount ;i++)
                    {
                        optlen = sizeof(optval);
                        rc = getsockopt(
                                listenptr->PendingAccepts[i]->sclient,
                                SOL_SOCKET,
                                SO_CONNECT_TIME,
                                (char *)&optval,
                               &optlen
                                );
                        if (rc == SOCKET_ERROR)
                        {
                            fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                    WSAGetLastError());
                            return -1;
                        }

                        // If the socket has been connected for more than 5 minutes,
                        //    close it. If closed, the AcceptEx call will fail in the
                        //    completion thread.
                        if ((optval != 0xFFFFFFFF) && (optval > 300))
                        {
                            closesocket(listenptr->PendingAccepts[i]->sclient);
                        }
                    }
                    listenptr = listenptr->next;
                }
                interval = 0;
            }
        }
        else
        {
            // The wait-all was satisfied, i.e. every completion thread handle
            // is signaled (the threads have exited). Waiting again would
            // return immediately each time and spin, so shut down instead.
            break;
        }
    }

    WSACleanup();
    return 0;
}
.StartServer()
.LoadClient() // virtual
DtServerSocketClient
HandleConnect
OnDisconnect
OnReceive
OnError
.GetState()
.IsConnected()
.Disconnect()
.InternalInit();
.Init() // virtual
참조 사이트: