//
// Function: GetBufferObj
//
// Description:
// Allocate a BUFFER_OBJ. Each send, receive, and accept posted by a
// by the server uses one of these objects. That is, there is one BUFFER_OBJ
// allocated per I/O operation. After the I/O is initiated it is assigned to
// one of the completion threads. To increase performance, a look aside list
// may be used to cache freed BUFFER_OBJ.
//
BUFFER_OBJ *GetBufferObj(SOCKET_OBJ *sock, int buflen)
{
BUFFER_OBJ *newobj=NULL;
// Create the event that is to be signed upon completion
newobj->ol.hEvent = WSACreateEvent();
if (newobj->ol.hEvent == NULL)
{
fprintf(stderr, "GetBufferObj: WSACreateEvent failed: %d\n", WSAGetLastError());
ExitProcess(-1);
}
return newobj;
}
//
// Function: FreeBufferObj
//
// Description:
// Free the buffer object.
//
void FreeBufferObj(BUFFER_OBJ *obj)
{
// Close the event
WSACloseEvent(obj->ol.hEvent);
obj->ol.hEvent = NULL;
// Free the buffers
HeapFree(GetProcessHeap(), 0, obj->buf);
HeapFree(GetProcessHeap(), 0, obj);
}
#define DEFAULT_OVERLAPPED_COUNT 5 // default number of overlapped recvs to post
int gOverlappedCount = DEFAULT_OVERLAPPED_COUNT;
//
// This is our per handle data. One of these structures is allocated for
// each socket created by our server.
//
typedef struct _SOCKET_OBJ
{
SOCKET s; // Socket handle for client connection
int af, // Address family of socket (AF_INET or AF_INET6)
bClosing; // Indicates socket is closing
volatile LONG OutstandingOps; // Number of outstanding overlapped ops
// Pointers to Microsoft specific extensions (listening socket only)
LPFN_ACCEPTEX lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
CRITICAL_SECTION SockCritSec; // Synchronize access to this SOCKET_OBJ
struct _SOCKET_OBJ *next; // Used to chain SOCKET_OBJ together
} SOCKET_OBJ;
//
// Function: GetSocketObj
//
// Description:
// Allocate a socket object and initialize its members. A socket object is
// allocated for each socket created (either by socket or accept).
//
SOCKET_OBJ *GetSocketObj(SOCKET s, int af)
{
SOCKET_OBJ *sockobj=NULL;
if (obj->OutstandingOps != 0)
{
return;
}
if (obj->s != INVALID_SOCKET)
{
closesocket(obj->s);
obj->s = INVALID_SOCKET;
}
DeleteCriticalSection(&obj->SockCritSec);
HeapFree(GetProcessHeap(), 0, obj);
}
//
// Allocated for each thread spawned. Each overlapped I/O issued on a socket is
// assigned to one of the threads in the thread pool.
//
typedef struct _THREAD_OBJ
{
BUFFER_OBJ *BufferList; // Linked list of all sockets allocated
int EventCount; // How many events are in the array to wait on?
HANDLE Event; // Used to signal new clients assigned
// to this thread
HANDLE Thread; // Handle to the curren thread
HANDLE Handles[MAXIMUM_WAIT_OBJECTS]; // Array of socket's event handles
CRITICAL_SECTION ThreadCritSec; // Protect access to SOCKET_OBJ lists
struct _THREAD_OBJ *next; // Next thread object in list
} THREAD_OBJ;
THREAD_OBJ *gChildThreads=NULL; // List of thread objects allocated
int gChildThreadsCount=0; // Number of child threads created
//
// Function: InsertBufferObjToThread
//
// Description:
// Insert a buffer object into the list of pending buffers objects for
// the given thread object. If the buffer can fit in the thread's queue,
// NO_ERROR is returned. If the thread is already waiting on the maximum
// allowable events, then SOCKET_ERROR is returned.
//
int InsertBufferObjToThread(THREAD_OBJ *thread, BUFFER_OBJ *buf)
{
int ret;
EnterCriticalSection(&thread->ThreadCritSec);
// See if the thread is full
if (thread->EventCount < MAXIMUM_WAIT_OBJECTS-1)
{
InsertBufferObj(&thread->BufferList, buf);
//
// Function: PostRecv
//
// Description:
// Post an overlapped receive operation on the socket.
//
int PostRecv(BUFFER_OBJ *recvobj)
{
WSABUF wbuf;
DWORD bytes,
flags;
int rc=NO_ERROR;
//
// Function: PostSend
//
// Description:
// Post an overlapped send operation on the socket.
//
int PostSend(BUFFER_OBJ *sendobj)
{
WSABUF wbuf;
DWORD bytes;
int rc;
//
// Function: PostAccept
//
// Description:
// Post an overlapped accept on a listening socket.
//
int PostAccept(BUFFER_OBJ *acceptobj)
{
DWORD bytes;
int rc=NO_ERROR;
//
// Function: RenumberEvents
//
// Description:
// This routine goes through the list of pending buffers within a thread
// and rebuilds the array of event handles that the thread waits on. When
// a new connection is accepted and several receive operations are posted,
// they are assigned to a thread and the thread is signaled to indicate
// new I/O has been placed in its queue. The thread needs to reinitialize
// its array so that it may be signaled for completion on that new I/O.
//
void RenumberEvents(THREAD_OBJ *thread)
{
BUFFER_OBJ *bptr=NULL;
int i;
//
// If index 0 is signaled then rebuild the array of event
// handles to wait on
EnterCriticalSection(&thread->ThreadCritSec);
i = 0;
bptr = thread->BufferList;
thread->EventCount = 1;
while (bptr)
{
thread->Handles[thread->EventCount++] = bptr->ol.hEvent;
//
// Function: HandleIo
//
// Description:
// This function handles the IO on a socket. First, the events signaled
// on the socket are enuemrated, then the appropriate handler routine
// for the event is called.
//
void HandleIo(BUFFER_OBJ *buf)
{
SOCKET_OBJ *sock=NULL,
*clientobj=NULL; // New client object for accepted connections
BUFFER_OBJ *recvobj=NULL, // Used to post new receives on accepted connections
*sendobj=NULL; // Used to post sends for data received
DWORD bytes,
flags;
BOOL bFreeSocketObj;
int error,
rc;
// Extract the SOCKET_OBJ from the BUFFER_OBJ for easy reference
sock = buf->Socket;
error = NO_ERROR;
bFreeSocketObj = FALSE;
InterlockedDecrement(&sock->OutstandingOps);
// Get the results of the overlapped operation that completed
rc = WSAGetOverlappedResult(
sock->s,
&buf->ol,
&bytes,
FALSE,
&flags
);
if (rc == FALSE)
{
error = WSAGetLastError();
fprintf(stderr, "HandleIo: WSAGetOverlappedResult failed: %d\n", error);
if (gProtocol == IPPROTO_TCP)
{
// An error occured on a TCP socket, so remove this I/O and if no
// more I/O is outstanding, free the socket object. Otherwise,
// wait for the remaining I/O on this socket to complete as well.
RemoveBufferFromThread(sock, buf);
FreeBufferObj(buf);
if (InterlockedDecrement(&sock->OutstandingOps) == 0)
{
printf("Freeing socket obj in GetOverlapepdResult\n");
FreeSocketObj(sock);
}
return;
}
}
if (buf->operation == OP_ACCEPT)
{
SOCKADDR_STORAGE *LocalSockaddr=NULL,
*RemoteSockaddr=NULL;
int LocalSockaddrLen,
RemoteSockaddrLen;
// Create a new SOCKET_OBJ for client socket
clientobj = GetSocketObj(buf->sclient, buf->Socket->af);
// Echo back any data received with the AcceptEx call
sendobj = GetBufferObj(clientobj, gBufferSize);
// Copy the data from the accept buffer to the send buffer
sendobj->buflen = bytes;
memcpy(sendobj->buf, buf->buf, bytes);
// Assign the send operation to a thread
AssignIoToThread(sendobj);
// Initiate the overlapped send
if (PostSend(sendobj) != NO_ERROR)
{
// In the event of an error, clean up the socket object
RemoveBufferFromThread(clientobj, sendobj);
FreeBufferObj(sendobj);
PostAccept(buf);
}
else if ((buf->operation == OP_READ) && (error == NO_ERROR))
{
//
// Receive compeleted successfully
//
if ((bytes > 0) || (gProtocol == IPPROTO_UDP))
{
// Create a buffer to send
sendobj = buf;
sendobj->buflen = bytes;
// Initiate the send
if (PostSend(sendobj) != NO_ERROR)
{
// In the event of an error, clean up the socket object
RemoveBufferFromThread(sock, sendobj);
FreeBufferObj(sendobj);
// Free the completed operation
RemoveBufferFromThread(sock, buf);
FreeBufferObj(buf);
// Check to see if there are more outstanding operations. If so, wait
// for them to complete; otherwise, clean up the socket object.
EnterCriticalSection(&sock->SockCritSec);
if (sock->OutstandingOps == 0)
{
closesocket(sock->s);
bFreeSocketObj = TRUE;
}
LeaveCriticalSection(&sock->SockCritSec);
}
}
else if ((buf->operation == OP_READ) && (error != NO_ERROR) && (gProtocol == IPPROTO_UDP))
{
// If a UDP receive failed, we really don't care. Just re-post it - that is
// we probably got an ICMP error.
if (PostRecv(buf) != NO_ERROR)
{
// In the event of an error, clean up the socket object
RemoveBufferFromThread(sock, buf);
FreeBufferObj(buf);
closesocket(sock->s);
sock->s = INVALID_SOCKET;
bFreeSocketObj = TRUE;
}
}
else if (buf->operation == OP_WRITE)
{
// See if the socket is closing, if so check to see if there are any outstanding
// operations. If not, clean up the connection; othewise wait for them
// to complete.
EnterCriticalSection(&sock->SockCritSec);
if (sock->bClosing && (sock->OutstandingOps == 0))
{
RemoveBufferFromThread(sock, buf);
closesocket(sock->s);
FreeBufferObj(buf);
// Free the send op that just completed
if (PostRecv(buf) != NO_ERROR)
{
RemoveBufferFromThread(sock, buf);
FreeBufferObj(buf);
}
}
LeaveCriticalSection(&sock->SockCritSec);
}
if (bFreeSocketObj)
{
FreeSocketObj(sock);
}
return;
}
//
// Function: IoThread
//
// Description:
// This is the I/O thread spawned to handle overlapped requests. When an
// overlapped operation is initialized, the I/O is first asisgned to a
// worker thread. This is the worker thread that waits for I/O to complete.
// Once an I/O operation is assigned to a thread, the thread's event is
// signaled which causes the thread to initialize its list of pending
// overlapped event handles to include any new operations assigned to it.
// Once one of the overlapepd I/O events is signaled, the thread calls the
// I/O handler routine to handle that particular operation and perform
// the necessariy steps.
//
DWORD WINAPI IoThread(LPVOID lpParam)
{
THREAD_OBJ *thread=NULL;
int index,
count,
rc,
i;
thread = (THREAD_OBJ *)lpParam;
// Initialize the event list to start with
RenumberEvents(thread);
while (1)
{
// Wait on the events
rc = WaitForMultipleObjects(
thread->EventCount,
thread->Handles,
FALSE,
INFINITE
);
if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
{
if (GetLastError() == ERROR_INVALID_HANDLE)
{
RenumberEvents(thread);
continue;
}
else
{
fprintf(stderr, "IoThread: WaitForMultipleObjects failed: %d\n",
GetLastError());
break;
}
}
// Iterate through the events to see if more than one were signaled
count = thread->EventCount;
for(i=0; i < count ;i++)
{
rc = WaitForSingleObject(
thread->Handles[i],
0
);
if (rc == WAIT_TIMEOUT)
{
// This event wasn't signaled continue to the next one
continue;
}
index = i;
// Reset the event first
WSAResetEvent(thread->Handles[index]);
if (index == 0)
{
// The thread's event was signaled indicating new I/O assigned
RenumberEvents(thread);
break;
}
else
{
// Otherwise, an overlapped I/O operation completed, service it
HandleIo(FindBufferObj(thread, thread->Handles[index]));
}
}
}
ExitThread(0);
return 0;
}
//
// Function: AssignIoToThread
//
// Description:
// This routine assigns a socket connection to an available child
// thread to handle any IO on it. If no threads are available, a
// new thread is spawned to handle the connection.
//
void AssignIoToThread(BUFFER_OBJ *buf)
{
THREAD_OBJ *thread=NULL;
EnterCriticalSection(&gThreadListCritSec);
thread = gChildThreads;
while (thread)
{
// If this routine returns something other than SOCKET_ERROR
// that it was successfully assigned to a child thread.
if (InsertBufferObjToThread(thread, buf) == NO_ERROR)
{
break;
}
thread = thread->next;
}
if (thread == NULL)
{
// No thread was found to assign the client socket to, create a new thread
//
thread = GetThreadObj();
rc = getaddrinfo(
NULL, // local interface to bind to
szPort,
&hints,
&res
);
if (rc != 0)
{
printf("Invalid address %s, getaddrinfo failed: %d\n", addr, rc);
return NULL;
}
// For each local address returned, create a listening/receiving socket
ptr = res;
while (ptr)
{
sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family);
// bind the socket to a local address and port
rc = bind(sockobj->s, ptr->ai_addr, ptr->ai_addrlen);
if (rc == SOCKET_ERROR)
{
fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
return -1;
}
// Allocate the overlapped structures for the accepts
sockobj->PendingAccepts = (BUFFER_OBJ **)HeapAlloc(
GetProcessHeap(),
HEAP_ZERO_MEMORY,
(sizeof(BUFFER_OBJ *) * gOverlappedCount));
if (sockobj->PendingAccepts == NULL)
{
fprintf(stderr, "HeapAlloc failed: %d\n", GetLastError());
ExitProcess(-1);
}
// Post the initial accepts
for(i=0; i < gOverlappedCount ;i++)
{
sockobj->PendingAccepts[i] = acceptobj = GetBufferObj(sockobj, gBufferSize);
acceptobj->Socket = sockobj;
AssignIoToThread(acceptobj);
if (PostAccept(acceptobj) == NO_ERROR)
{
// If we can't post accepts just bail
ExitProcess(-1);
}
}
//
// Maintain a list of the listening socket structures
//
if (ListenSockets == NULL)
{
ListenSockets = sockobj;
}
else
{
sockobj->next = ListenSockets;
ListenSockets = sockobj;
}
}
else
{
BUFFER_OBJ *recvobj=NULL;
DWORD bytes;
int optval;