CreateIoCompletionPort()
PostQueuedCompletionStatus()
GetQueuedCompletionStatus()
WSAGetOverlappedResult()
BindIoCompletionCallback() // Windows 2000
// Upper bound applied to the number of completion threads that get created
#define MAX_COMPLETION_THREAD_COUNT 32
//
// Function: HandleIo
//
// Description:
// This function handles the IO on a socket. In the event of a receive, the
// completed receive is posted again. For completed accepts, another AcceptEx
// is posted. For completed sends, the buffer is freed.
//
// Parameters:
// sock - per-socket object the completed operation was issued on
// buf - per-I/O buffer object; buf->operation selects the handler below
// CompPort - completion port that an accepted client socket is associated with
// BytesTransfered - byte count used as the length of the data echoed back
// error - completion status of the operation (NO_ERROR means success)
//
// NOTE(review): the body below does not read as one complete definition -- as
// the braces are written, the function is closed before the UDP-error and
// OP_WRITE handlers, and no re-post of AcceptEx is visible. Confirm against
// the complete source before relying on the description above.
//
void HandleIo(SOCKET_OBJ *sock, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
{
SOCKET_OBJ *clientobj=NULL; // New client object for accepted connections
BUFFER_OBJ *recvobj=NULL, // Used to post new receives on accepted connections
*sendobj=NULL; // Used to post new sends for data received
// NOTE(review): bCleanupSocket has no initializer and nothing visible here
// ever sets it to FALSE, yet it is tested after the critical section is left.
// Confirm it is initialized in the complete source.
BOOL bCleanupSocket;
// Scratch pointer used when the receive and send buffers are swapped
char *tmp;
int i;
if ((error != NO_ERROR) && (gProtocol == IPPROTO_TCP))
{
// An error occurred on a TCP socket, free the associated per I/O buffer
// and see if there are any more outstanding operations. If so we must
// wait until they are complete as well.
//
FreeBufferObj(buf);
// Only the call that drops the outstanding-operation count to zero frees the socket object
if (InterlockedDecrement(&sock->OutstandingOps) == 0)
{
FreeSocketObj(sock);
}
return;
}
// Everything from here until LeaveCriticalSection runs with sock->SockCritSec held
EnterCriticalSection(&sock->SockCritSec);
if (buf->operation == OP_ACCEPT)
{
HANDLE hrc;
// NOTE(review): the two sockaddr pointers and their lengths are declared but
// never referenced in the visible code.
SOCKADDR_STORAGE *LocalSockaddr=NULL,
*RemoteSockaddr=NULL;
int LocalSockaddrLen,
RemoteSockaddrLen;
// Get a new SOCKET_OBJ for the client connection
// NOTE(review): neither clientobj nor sendobj (below) is checked before use;
// confirm GetSocketObj/GetBufferObj cannot return NULL.
clientobj = GetSocketObj(buf->sclient, sock->af);
// Associate the new connection to our completion port
// (clientobj is passed as the completion key for the accepted socket)
hrc = CreateIoCompletionPort(
(HANDLE)buf->sclient,
CompPort,
(ULONG_PTR)clientobj,
0
);
if (hrc == NULL)
{
fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n",
GetLastError());
// NOTE(review): this return leaves sock->SockCritSec entered (no
// LeaveCriticalSection on this path) and does not release clientobj.
return;
}
// Get a BUFFER_OBJ to echo the data received with the accept back to the client
sendobj = GetBufferObj(clientobj, BytesTransfered);
// Copy the buffer to the sending object
memcpy(sendobj->buf, buf->buf, BytesTransfered);
// Post the send
if (PostSend(clientobj, sendobj) == NO_ERROR)
{
// Now post some receives on this new connection
for(i=0; i < gOverlappedCount ;i++)
{
recvobj = GetBufferObj(clientobj, gBufferSize);
if (PostRecv(clientobj, recvobj) != NO_ERROR)
{
// If for some reason the recv call fails, clean up the connection
FreeBufferObj(recvobj);
error = SOCKET_ERROR;
break;
}
}
}
else
{
// If for some reason the send call fails, clean up the connection
FreeBufferObj(sendobj);
error = SOCKET_ERROR;
}
// NOTE(review): from here to the end of this branch the code operates on sock
// (not clientobj) and reuses sendobj, including on the path where it was just
// passed to FreeBufferObj. This looks like the body of a separate receive
// handler whose opening lines are missing -- verify against the complete source.
if (gProtocol == IPPROTO_UDP)
{
memcpy(&sendobj->addr, &buf->addr, buf->addrlen);
}
// Swap the buffers (i.e. buffer we just received becomes the send buffer)
tmp = sendobj->buf;
sendobj->buflen = BytesTransfered;
sendobj->buf = buf->buf;
sendobj->IoOrder = buf->IoOrder;
buf->buf = tmp;
buf->buflen = gBufferSize;
// Queue the send on the socket, then attempt to issue queued sends
InsertPendingSend(sock, sendobj);
if (DoSends(sock) != NO_ERROR)
{
error = SOCKET_ERROR;
}
else
{
// Post another receive
if (PostRecv(sock, buf) != NO_ERROR)
{
// In the event the recv fails, clean up the connection
FreeBufferObj(buf);
error = SOCKET_ERROR;
}
}
}
else
{
// Got 0 byte receive
// Graceful close - the receive returned 0 bytes read
// NOTE(review): as written this else pairs with the OP_ACCEPT test above, so
// it runs for every non-accept operation; the byte-count test the comments
// refer to is not visible here.
sock->bClosing = TRUE;
// Free the receive buffer
FreeBufferObj(buf);
if (DoSends(sock) != NO_ERROR)
{
dbgprint("0: cleaning up in zero byte handler\n");
error = SOCKET_ERROR;
}
// If this was the last outstanding operation on socket, clean it up
if ((sock->OutstandingOps == 0) && (sock->OutOfOrderSends == NULL))
{
// cleaning up in zero byte handler
bCleanupSocket = TRUE;
}
}
// NOTE(review): counting braces, the next line closes the function body, which
// leaves the else-if chain and the cleanup code below outside any function.
}
else if ((buf->operation == OP_READ) && (error != NO_ERROR) && (gProtocol == IPPROTO_UDP))
{
// If for UDP, a receive completes with an error, we ignore it and re-post the recv
if (PostRecv(sock, buf) != NO_ERROR)
{
error = SOCKET_ERROR;
}
}
else if (buf->operation == OP_WRITE)
{
// A send completed: release its buffer and try to issue any queued sends
FreeBufferObj(buf);
if (DoSends(sock) != NO_ERROR)
{
// Cleaning up inside OP_WRITE handler
error = SOCKET_ERROR;
}
}
// Any failure recorded by the handlers above marks the socket as closing
if (error != NO_ERROR)
{
sock->bClosing = TRUE;
}
//
// Check to see if socket is closing
//
// The decrement always happens; the other two conditions are only evaluated
// when the outstanding-operation count has reached zero.
if ( (InterlockedDecrement(&sock->OutstandingOps) == 0) &&
(sock->bClosing) &&
(sock->OutOfOrderSends == NULL) )
{
bCleanupSocket = TRUE;
}
else
{
// Otherwise call DoSends once more; a failure there also requests cleanup
if (DoSends(sock) != NO_ERROR)
{
bCleanupSocket = TRUE;
}
}
LeaveCriticalSection(&sock->SockCritSec);
// Cleanup is done after the lock is released: close the handle, then free the object
if (bCleanupSocket)
{
closesocket(sock->s);
sock->s = INVALID_SOCKET;
FreeSocketObj(sock);
}
return;
}
//
// Function: CompletionThread
//
// Description:
// This is the completion thread which services our completion port. One of
// these threads is created per processor on the system. The thread sits in
// an infinite loop calling GetQueuedCompletionStatus and handling socket
// IO that completed.
//
// Parameters:
// lpParam - not referenced in the visible body; the CreateThread call below
// passes the completion port handle in this slot
//
// Returns:
// DWORD; the visible error paths return -1 (and NULL in one place)
//
// NOTE(review): no GetQueuedCompletionStatus loop is visible in the body below.
// What is visible is start-up code (Winsock initialization, completion port and
// worker thread creation, listening socket setup) that uses many names not
// declared in this block (wsd, sysinfo, i, CompThreads, szPort, hints, res,
// addr, ptr, hrc, bytes, acceptobj, ListenSockets, interval, ...). It presumably
// belongs to a different function -- confirm against the complete source.
//
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
SOCKET_OBJ *sockobj=NULL; // Per socket object for completed I/O
BUFFER_OBJ *bufobj=NULL; // Per I/O object for completed I/O
OVERLAPPED *lpOverlapped=NULL; // Pointer to overlapped structure for completed I/O
HANDLE CompletionPort; // Completion port handle
DWORD BytesTransfered, // Number of bytes transferred
Flags; // Flags for completed I/O
int rc,
error;
// Request Winsock version 2.2
if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
{
fprintf(stderr, "unable to load Winsock!\n");
return -1;
}
// Create the completion port used by this server
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
if (CompletionPort == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// Find out how many processors are on this system
GetSystemInfo(&sysinfo);
// Clamp the processor count so no more than MAX_COMPLETION_THREAD_COUNT threads are created
if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
{
sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
}
// Create the worker threads to service the completion notifications
// (each thread runs CompletionThread and receives the port handle as its parameter)
for(i=0; i < (int)sysinfo.dwNumberOfProcessors ;i++)
{
CompThreads[i] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
if (CompThreads[i] == NULL)
{
// NOTE(review): the message below misspells "CreateThread"
fprintf(stderr, "CreatThread failed: %d\n", GetLastError());
return -1;
}
}
// Resolve the local addresses to listen on for the configured port
rc = getaddrinfo(
NULL, // local interface to bind to
szPort,
&hints,
&res
);
if (rc != 0)
{
// NOTE(review): the message prints addr although NULL was passed as the node
// name, and this path returns NULL while every other failure returns -1.
printf("Invalid address %s, getaddrinfo failed: %d\n", addr, rc);
return NULL;
}
// For each local address returned, create a listening/receiving socket
// NOTE(review): ptr is never advanced inside the loop as shown (no assignment
// from ptr->ai_next is visible), and no call that creates sockobj->s is visible
// either -- confirm against the complete source.
ptr = res;
while (ptr)
{
sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);
// Associate the socket and its SOCKET_OBJ to the completion port
hrc = CreateIoCompletionPort((HANDLE)sockobj->s, CompletionPort, (ULONG_PTR)sockobj, 0);
if (hrc == NULL)
{
fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
return -1;
}
// bind the socket to a local address and port
rc = bind(sockobj->s, ptr->ai_addr, ptr->ai_addrlen);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
return -1;
}
// Need to load the Winsock extension functions from each provider
// -- e.g. AF_INET and AF_INET6.
// First lookup: the AcceptEx pointer, stored on the socket object
rc = WSAIoctl(
sockobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
sizeof(guidAcceptEx),
&sockobj->lpfnAcceptEx,
sizeof(sockobj->lpfnAcceptEx),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
WSAGetLastError());
return -1;
}
// Second lookup: the GetAcceptExSockaddrs pointer, stored on the socket object
rc = WSAIoctl(
sockobj->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockaddrs,
sizeof(guidGetAcceptExSockaddrs),
&sockobj->lpfnGetAcceptExSockaddrs,
sizeof(sockobj->lpfnGetAcceptExSockaddrs),
&bytes,
NULL,
NULL
);
if (rc == SOCKET_ERROR)
{
// NOTE(review): the message below misspells "failed"
fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
WSAGetLastError());
return -1;
}
// For TCP sockets, we need to "listen" on them
rc = listen(sockobj->s, 100);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
return -1;
}
// Keep track of the pending AcceptEx operations
// (zero-filled array with one BUFFER_OBJ pointer per overlapped accept)
sockobj->PendingAccepts = (BUFFER_OBJ **)HeapAlloc(
GetProcessHeap(),
HEAP_ZERO_MEMORY,
(sizeof(BUFFER_OBJ *) * gOverlappedCount));
if (sockobj->PendingAccepts == NULL)
{
// Unlike the other failures, allocation failure terminates the whole process
fprintf(stderr, "HeapAlloc failed: %d\n", GetLastError());
ExitProcess(-1);
}
// Post the AcceptEx(s)
// NOTE(review): the result of PostAccept is not checked.
for(i=0; i < gOverlappedCount ;i++)
{
sockobj->PendingAccepts[i] = acceptobj = GetBufferObj(sockobj, gBufferSize);
PostAccept(sockobj, acceptobj);
}
//
// Maintain a list of the listening socket structures
//
// New entries are linked in at the head of the ListenSockets list
if (ListenSockets == NULL)
{
ListenSockets = sockobj;
}
else
{
sockobj->next = ListenSockets;
ListenSockets = sockobj;
}
}
// NOTE(review): this else directly follows the closing brace of the while loop;
// the if it belongs to is not visible here.
else
{
BUFFER_OBJ *recvobj=NULL;
DWORD bytes;
int optval;
// NOTE(review): the meaning of interval and of the value 12 is not visible here;
// it looks like a periodic-check counter -- confirm against the complete source.
if (interval == 12)
{
SOCKET_OBJ *listenptr=NULL;
// NOTE(review): this optval shadows the one declared in the enclosing block
int optval,
optlen;
// For TCP, cycle through all the outstanding AcceptEx operations
// to see if any of the client sockets have been connected but
// haven't received any data. If so, close them as they could be
// a denial of service attack.
listenptr = ListenSockets;
while (listenptr)
{
for(i=0; i < gOverlappedCount ;i++)
{
optlen = sizeof(optval);
rc = getsockopt(
listenptr->PendingAccepts[i]->sclient,
SOL_SOCKET,
SO_CONNECT_TIME,
(char *)&optval,
&optlen
);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
WSAGetLastError());
return -1;
}
// If the socket has been connected for more than 5 minutes,
// close it. If closed, the AcceptEx call will fail in the
// completion thread.
// optval is an int, so the 0xFFFFFFFF test relies on the usual conversion
// to unsigned (an optval of -1 compares equal); 300 is the 5-minute limit
// in seconds.
if ((optval != 0xFFFFFFFF) && (optval > 300))
{
closesocket(listenptr->PendingAccepts[i]->sclient);
}
}
listenptr = listenptr->next;
}
// Reset the counter once the sweep over all listening sockets is done
interval = 0;
}
}
}