-
WSAEventSelect() - IO 멀티플렉싱(Win32)
Platform/소켓 2009. 4. 18. 22:13
WSACreateEvent()
WSACloseEvent()
WSAResetEvent()
WSAEventSelect()
IO 호출에서 블록되기 전에, 소켓에서 발생한 네트워크 이벤트를 이벤트 객체를 통해 비동기적으로 통보받을 수 있다.
비동기 데이타 송수신이 제공되는 것은 아니다.
FD_CLOSE
WSAWaitForMultipleEvents()
FD_ACCEPT
FD_READ
FD_WRITE
WaitForSingleObject()
WaitForMultipleObjects()
MsgWaitForMultipleObjects()
SignalObjectAndWait()
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <winsock2.h>
#define BUFSIZE 100
void CompressSockets(SOCKET* hSockArray, int omitIndex, int total);
void CompressEvents(WSAEVENT* hEventArray, int omitIndex, int total);
void ErrorHandling(char *message);
int main(int argc, char **argv)
{
WSADATA wsaData;
SOCKET hServSock;
SOCKADDR_IN servAddr;
SOCKET hSockArray[WSA_MAXIMUM_WAIT_EVENTS];
SOCKET hClntSock;
int clntLen;
SOCKADDR_IN clntAddr;
WSAEVENT hEventArray[WSA_MAXIMUM_WAIT_EVENTS];
WSAEVENT newEvent;
WSANETWORKEVENTS netEvents;
int sockTotal=0;
int index, i;
char message[BUFSIZE];
int strLen;
if(argc!=2){
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
if(WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) /* Load Winsock 2.2 DLL */
ErrorHandling("WSAStartup() error!");
hServSock = socket(PF_INET, SOCK_STREAM, 0);
if(hServSock==INVALID_SOCKET)
ErrorHandling("socket() error");
servAddr.sin_family = AF_INET;
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
servAddr.sin_port = htons(atoi(argv[1]));
if(bind(hServSock, (struct sockaddr *) &servAddr, sizeof(servAddr))==SOCKET_ERROR)
ErrorHandling("bind() error");
newEvent = WSACreateEvent();
if(WSAEventSelect(hServSock, newEvent, FD_ACCEPT)==SOCKET_ERROR)
ErrorHandling("WSAEventSelect() error");
if(listen(hServSock, 5)==SOCKET_ERROR)
ErrorHandling("listen() error");
hSockArray[sockTotal]=hServSock;
hEventArray[sockTotal]=newEvent;
sockTotal++;
while(1)
{
index = WSAWaitForMultipleEvents(sockTotal, hEventArray, FALSE, WSA_INFINITE, FALSE);
index = index-WSA_WAIT_EVENT_0;
for(i=index; i<sockTotal; i++)
{
index=WSAWaitForMultipleEvents(1, &hEventArray[i], TRUE, 0, FALSE);
if((index==WSA_WAIT_FAILED || index==WSA_WAIT_TIMEOUT)) continue;
else
{
index=i;
WSAEnumNetworkEvents(hSockArray[index], hEventArray[index], &netEvents);
if(netEvents.lNetworkEvents & FD_ACCEPT) //연결 요청의 경우.
{
if(netEvents.iErrorCode[FD_ACCEPT_BIT] != 0){
puts("Accept Error");
break;
}
clntLen = sizeof(clntAddr);
hClntSock = accept(hSockArray[index], (SOCKADDR*)&clntAddr, &clntLen);
newEvent=WSACreateEvent();
WSAEventSelect(hClntSock, newEvent, FD_READ|FD_CLOSE);
hEventArray[sockTotal]=newEvent;
hSockArray[sockTotal]=hClntSock;
sockTotal++;
printf("새로 연결된 소켓의 핸들 %d \n", hClntSock);
} //if(NetworkEvents.lNetworkEvents & FD_ACCEPT) end
if(netEvents.lNetworkEvents & FD_READ) //데이터 전송의 경우.
{
if(netEvents.iErrorCode[FD_READ_BIT] != 0){
puts("Read Error");
break;
}
strLen=recv(hSockArray[index-WSA_WAIT_EVENT_0],
message, sizeof(message), 0);
send(hSockArray[index-WSA_WAIT_EVENT_0],
message, strLen, 0); // 에코 전송
}// if(netEvents.lNetworkEvents & FD_READ) end
if(netEvents.lNetworkEvents & FD_CLOSE) //연결 종료 요청의 경우.
{
if(netEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
puts("Close Error");
break;
}
WSACloseEvent(hEventArray[index]);
closesocket(hSockArray[index]);
printf("종료 된 소켓의 핸들 %d \n", hSockArray[index]);
sockTotal--;
CompressSockets(hSockArray, index, sockTotal); //배열 정리.
CompressEvents(hEventArray, index, sockTotal);
}// if(netEvents.lNetworkEvents & FD_CLOSE) end
} //else end
} //for(i=index; i<sockTotal; i++) end
} //while(1) end
WSACleanup();
return 0;
}
/*
 * Closes the gap left at omitIndex by shifting the following entries of
 * hSockArray one position toward the front.
 *
 * total is the number of entries that remain after the removal, i.e. the
 * element at index total is the last one that gets moved down.
 */
void CompressSockets(SOCKET* hSockArray, int omitIndex, int total)
{
    if (total > omitIndex)
    {
        memmove(&hSockArray[omitIndex],
                &hSockArray[omitIndex + 1],
                (size_t)(total - omitIndex) * sizeof(hSockArray[0]));
    }
}
/*
 * Closes the gap left at omitIndex by shifting the following entries of
 * hEventArray one position toward the front.
 *
 * total is the number of entries that remain after the removal, i.e. the
 * element at index total is the last one that gets moved down.
 */
void CompressEvents(WSAEVENT* hEventArray, int omitIndex, int total)
{
    if (total > omitIndex)
    {
        memmove(&hEventArray[omitIndex],
                &hEventArray[omitIndex + 1],
                (size_t)(total - omitIndex) * sizeof(hEventArray[0]));
    }
}
/*
 * Writes message followed by a newline to stderr and terminates the
 * process with exit status 1. Does not return.
 */
void ErrorHandling(char *message)
{
    fprintf(stderr, "%s\n", message);
    exit(1);
}
//
// Allocated for each thread spawned.
//
// Handles[0] always holds Event; the events of the sockets in SocketList
// occupy Handles[1..SocketCount] in list order.
//
typedef struct _THREAD_OBJ
{
SOCKET_OBJ *SocketList, // Head of the doubly linked list of sockets assigned to this thread
*SocketListEnd; // Tail of the socket list (new sockets are appended here)
int SocketCount; // Number of socket objects in list
HANDLE Event; // Signaled when the socket list changed so that
// this thread rebuilds its Handles array
HANDLE Thread; // Thread handle returned by CreateThread (NULL until a thread is created)
HANDLE Handles[MAXIMUM_WAIT_OBJECTS]; // Wait array: Event first, then each socket's event handle
CRITICAL_SECTION ThreadCritSec; // Protect access to SOCKET_OBJ lists
struct _THREAD_OBJ *next; // Next thread object in list
} THREAD_OBJ;
//
// Function: GetThreadObj
//
// Description:
//    Creates a zero-filled thread object on the process heap, gives it a
//    fresh event object (also stored in the first slot of the wait array)
//    and initializes its critical section. The process is terminated if
//    either the allocation or the event creation fails, so the returned
//    pointer is never NULL.
//
THREAD_OBJ *GetThreadObj()
{
    THREAD_OBJ *obj = (THREAD_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(THREAD_OBJ));

    if (!obj)
    {
        fprintf(stderr, "GetThreadObj: HeapAlloc failed: %d\n", GetLastError());
        ExitProcess(-1);
    }

    obj->Event = WSACreateEvent();
    if (!obj->Event)
    {
        fprintf(stderr, "GetThreadObj: WSACreateEvent failed: %d\n", WSAGetLastError());
        ExitProcess(-1);
    }

    InitializeCriticalSection(&obj->ThreadCritSec);

    // Slot 0 of the wait array is reserved for the thread's own event
    obj->Handles[0] = obj->Event;

    return obj;
}
//
// Function: FreeThreadObj
//
// Description:
//    Free a thread object and its member fields: the event object, the
//    thread handle (if one was ever created), the critical section and
//    finally the heap block itself. Passing NULL is a no-op.
//
void FreeThreadObj(THREAD_OBJ *thread)
{
    if (thread == NULL)
        return;

    if (thread->Event != NULL)
        WSACloseEvent(thread->Event);

    // Thread stays NULL for objects that never had a thread created for
    // them (the object is zero-initialized on allocation), so avoid
    // handing an invalid handle to CloseHandle.
    if (thread->Thread != NULL)
        CloseHandle(thread->Thread);

    DeleteCriticalSection(&thread->ThreadCritSec);
    HeapFree(GetProcessHeap(), 0, thread);
}
//
// Function: InsertSocketObj
//
// Description:
//    Appends a socket object to the tail of the given thread's socket list
//    and stores the socket's event in the next free slot of the thread's
//    wait array. Returns NO_ERROR on success, or SOCKET_ERROR when the
//    thread already holds MAXIMUM_WAIT_OBJECTS-1 sockets.
//
int InsertSocketObj(THREAD_OBJ *thread, SOCKET_OBJ *sock)
{
    int status = SOCKET_ERROR;

    EnterCriticalSection(&thread->ThreadCritSec);

    // One slot of the wait array is reserved for the thread's own event
    if (thread->SocketCount < MAXIMUM_WAIT_OBJECTS-1)
    {
        sock->next = NULL;
        sock->prev = NULL;

        if (thread->SocketList != NULL)
        {
            // Link behind the current tail
            sock->prev = thread->SocketListEnd;
            thread->SocketListEnd->next = sock;
        }
        else
        {
            // First element: it is also the head
            thread->SocketList = sock;
        }
        thread->SocketListEnd = sock;

        // Socket events start at index 1 of the wait array
        thread->SocketCount++;
        thread->Handles[thread->SocketCount] = sock->event;

        status = NO_ERROR;
    }

    LeaveCriticalSection(&thread->ThreadCritSec);

    return status;
}
//
// Function: RemoveSocketObj
//
// Description:
//    Unlinks a socket object from the given thread's socket list, lowers
//    the thread's socket count, signals the thread's event so that the
//    wait array gets rebuilt, and decrements the global connection counter.
//    The socket object itself is not freed here.
//
void RemoveSocketObj(THREAD_OBJ *thread, SOCKET_OBJ *sock)
{
    SOCKET_OBJ *before,
               *after;

    EnterCriticalSection(&thread->ThreadCritSec);

    before = sock->prev;
    after  = sock->next;

    // Bridge the neighbours over the removed node
    if (before != NULL)
        before->next = after;
    if (after != NULL)
        after->prev = before;

    // Fix up head/tail if the removed node was at either end
    if (thread->SocketList == sock)
        thread->SocketList = after;
    if (thread->SocketListEnd == sock)
        thread->SocketListEnd = before;

    --thread->SocketCount;

    // Signal thread to rebuild array of events
    WSASetEvent(thread->Event);

    InterlockedDecrement(&gCurrentConnections);

    LeaveCriticalSection(&thread->ThreadCritSec);
}
//
// Function: FindSocketObj
//
// Description:
//    Find a socket object within the list of sockets from a thread. The socket
//    object is found by index number -- this must be so because the index of
//    the event object in the thread's event array must match the order in which
//    the socket object appears in the thread's socket list.
//
//    Returns the socket object at the zero-based position 'index', or NULL
//    when the list holds fewer than index+1 entries (callers test for NULL).
//    An index of zero or less yields the list head.
//
SOCKET_OBJ *FindSocketObj(THREAD_OBJ *thread, int index)
{
    SOCKET_OBJ *ptr=NULL;
    int i;

    EnterCriticalSection(&thread->ThreadCritSec);

    ptr = thread->SocketList;
    // Stop at the end of the list instead of dereferencing NULL when the
    // requested index lies beyond the last element
    for(i=0; i < index && ptr != NULL ;i++)
    {
        ptr = ptr->next;
    }

    LeaveCriticalSection(&thread->ThreadCritSec);

    return ptr;
}
THREAD_OBJ *gChildThreads=NULL; // Head of the list of child thread objects (new threads are pushed at the front)
int gChildThreadsCount=0; // Number of child threads created
//
// Function: RenumberThreadArray
//
// Description:
//    Rebuilds the socket portion of the thread's wait array: walks the
//    socket list from the head and copies each socket's event into
//    consecutive slots starting at index 1 (slot 0 is left untouched).
//
void RenumberThreadArray(THREAD_OBJ *thread)
{
    SOCKET_OBJ *cur;
    int         slot = 1;

    EnterCriticalSection(&thread->ThreadCritSec);

    for (cur = thread->SocketList; cur != NULL; cur = cur->next)
    {
        thread->Handles[slot++] = cur->event;
    }

    LeaveCriticalSection(&thread->ThreadCritSec);
}
//
// Function: DiscardSocketObj
//
// Description:
//    Helper for HandleIo: removes the socket object from the thread's list,
//    frees it, and returns SOCKET_ERROR so callers can 'return' the result
//    directly from an error path.
//
static int DiscardSocketObj(THREAD_OBJ *thread, SOCKET_OBJ *sock)
{
    RemoveSocketObj(thread, sock);
    FreeSocketObj(sock);
    return SOCKET_ERROR;
}

//
// Function: HandleIo
//
// Description:
//    This function handles the IO on a socket. First, the events signaled
//    on the socket are enumerated, then the appropriate handler routine
//    for the event is called.
//
//    Returns NO_ERROR when all signaled events were processed, otherwise
//    SOCKET_ERROR. On every failure except a failing WSAEnumNetworkEvents
//    call, the socket object has been removed from the thread and freed,
//    so the caller must not touch 'sock' afterwards.
//
int HandleIo(THREAD_OBJ *thread, SOCKET_OBJ *sock)
{
    WSANETWORKEVENTS nevents;
    int              rc;

    // Enumerate the events
    rc = WSAEnumNetworkEvents(
            sock->s,
            sock->event,
            &nevents
            );
    if (rc == SOCKET_ERROR)
    {
        fprintf(stderr, "HandleIo: WSAEnumNetworkEvents failed: %d\n", WSAGetLastError());
        return SOCKET_ERROR;
    }

    if (nevents.lNetworkEvents & FD_READ)
    {
        // Check for read error
        if (nevents.iErrorCode[FD_READ_BIT] != 0)
        {
            fprintf(stderr, "HandleIo: FD_READ error %d\n",
                    nevents.iErrorCode[FD_READ_BIT]);
            return DiscardSocketObj(thread, sock);
        }

        rc = ReceivePendingData(sock);
        if (rc == -1)
            return DiscardSocketObj(thread, sock);

        rc = SendPendingData(sock);
        if (rc == -1)
            return DiscardSocketObj(thread, sock);
    }

    if (nevents.lNetworkEvents & FD_WRITE)
    {
        // Check for write error
        if (nevents.iErrorCode[FD_WRITE_BIT] != 0)
        {
            fprintf(stderr, "HandleIo: FD_WRITE error %d\n",
                    nevents.iErrorCode[FD_WRITE_BIT]);
            // Release the socket like every other error path does; otherwise
            // the failed socket would stay in the thread's list and never
            // be freed.
            return DiscardSocketObj(thread, sock);
        }

        rc = SendPendingData(sock);
        if (rc == -1)
            return DiscardSocketObj(thread, sock);
    }

    if (nevents.lNetworkEvents & FD_CLOSE)
    {
        // Check for close error
        if (nevents.iErrorCode[FD_CLOSE_BIT] != 0)
        {
            fprintf(stderr, "HandleIo: FD_CLOSE error %d\n",
                    nevents.iErrorCode[FD_CLOSE_BIT]);
            return DiscardSocketObj(thread, sock);
        }

        // Socket has been indicated as closing so make sure all the data
        // has been read: keep receiving until the call reports 0
        do
        {
            rc = ReceivePendingData(sock);
            if (rc == -1)
                return DiscardSocketObj(thread, sock);
        } while (rc != 0);

        // See if there is any data pending, if so try to send it
        rc = SendPendingData(sock);
        if (rc == -1)
            return DiscardSocketObj(thread, sock);
    }

    return NO_ERROR;
}
//
// Function: ChildThread
//
// Description:
//    This is the child thread that handles socket connections. Each thread
//    can only wait on a maximum of 63 sockets. The main thread will assign
//    each client connection to one of the child threads. If there is no
//    thread to handle the socket, a new thread is created to handle the
//    connection.
//
//    lpParam is the THREAD_OBJ this thread services. The thread waits on
//    the object's handle array (slot 0 = "list changed" event, slots 1..n =
//    socket events) and leaves its loop only when the wait itself fails.
//
DWORD WINAPI ChildThread(LPVOID lpParam)
{
    THREAD_OBJ *thread = (THREAD_OBJ *)lpParam;
    SOCKET_OBJ *sockobj = NULL;
    int         rc,
                i;

    while (1)
    {
        rc = WaitForMultipleObjects(
                thread->SocketCount + 1,
                thread->Handles,
                FALSE,
                INFINITE
                );
        if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
        {
            fprintf(stderr, "ChildThread: WaitForMultipleObjects failed: %d\n", GetLastError());
            break;
        }

        // Multiple events may be signaled at one time so check each
        // event to see if its signaled
        //
        for(i=0; i < thread->SocketCount + 1 ;i++)
        {
            rc = WaitForSingleObject(thread->Handles[i], 0);
            if (rc == WAIT_FAILED)
            {
                fprintf(stderr, "ChildThread: WaitForSingleObject failed: %d\n", GetLastError());
                ExitThread(-1);
            }
            else if (rc == WAIT_TIMEOUT)
            {
                // This event isn't signaled, continue to the next one
                continue;
            }

            if (i == 0)
            {
                // If index 0 is signaled then rebuild the array of event
                // handles to wait on. The loop increment then moves on to
                // slot 1, so the first socket is examined in this same pass
                // (forcing the index to 1 here would skip it).
                WSAResetEvent(thread->Handles[0]);

                RenumberThreadArray(thread);
            }
            else
            {
                // Otherwise, its an event associated with a socket that
                // was signaled. Handle the IO on that socket.
                // Socket events start at slot 1, list positions at 0.
                //
                sockobj = FindSocketObj(thread, i-1);
                if (sockobj != NULL)
                {
                    if (HandleIo(thread, sockobj) == SOCKET_ERROR)
                    {
                        RenumberThreadArray(thread);
                    }
                }
                else
                {
                    printf("Unable to find socket object!\n");
                }
            }
        }
    }

    ExitThread(0);
    return 0;
}
//
// Function: AssignToFreeThread
//
// Description:
//    Hands a socket connection to the first child thread that still has
//    room for it. When every existing thread refuses the socket (or there
//    are no threads yet), a new thread object and thread are created, the
//    socket is inserted there, and the new object is pushed onto the front
//    of the global thread list. In both cases the chosen thread's event is
//    signaled so that it rebuilds its wait array.
//
void AssignToFreeThread(SOCKET_OBJ *sock)
{
    THREAD_OBJ *worker;

    // InsertSocketObj returning anything but SOCKET_ERROR means the socket
    // was accepted by that thread
    for (worker = gChildThreads; worker != NULL; worker = worker->next)
    {
        if (InsertSocketObj(worker, sock) != SOCKET_ERROR)
            break;
    }

    if (!worker)
    {
        // No thread was found to assign the client socket to, create a new thread
        //
        printf("Creating new thread object\n");

        worker = GetThreadObj();

        worker->Thread = CreateThread(NULL, 0, ChildThread, (LPVOID)worker, 0, NULL);
        if (!worker->Thread)
        {
            fprintf(stderr, "AssignToFreeThread: CreateThread failed: %d\n", GetLastError());
            ExitProcess(-1);
        }

        InsertSocketObj(worker, sock);

        // Push the new thread object onto the front of the thread list
        if (gChildThreads != NULL)
            worker->next = gChildThreads;
        gChildThreads = worker;

        ++gChildThreadsCount;
    }

    // signal child thread to rebuild the event list
    WSASetEvent(worker->Event);
}
//
// Function: main
//
// Description:
//    Entry point of the multi-threaded WSAEventSelect server. Resolves the
//    local addresses for the configured port, creates one socket per
//    address and registers it with the main thread's THREAD_OBJ, then loops
//    waiting on those sockets. For TCP every accepted client is handed to a
//    child thread via AssignToFreeThread; otherwise the IO is handled right
//    here through HandleIo.
//
//    NOTE(review): szPort and gProtocol are not declared in this function;
//    they are expected to be defined at file scope elsewhere -- confirm.
//
//    Returns 0 after the wait loop ends, -1 on a setup failure.
//
int __cdecl main(int argc, char **argv)
{
    WSADATA          wsd;
    THREAD_OBJ      *thread=NULL;
    SOCKET_OBJ      *sockobj=NULL,
                    *newsock=NULL;
    struct addrinfo  hints,
                    *res=NULL,
                    *ptr=NULL;
    DWORD            wait;
    int              index,
                     rc;

    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    memset(&hints, 0, sizeof(hints));
    hints.ai_flags    = AI_PASSIVE;     // address is going to be bound
    hints.ai_family   = AF_UNSPEC;      // unspecified
    hints.ai_socktype = SOCK_STREAM;    // TCP socket type
    hints.ai_protocol = IPPROTO_TCP;    // TCP protocol

    rc = getaddrinfo(
            NULL,       // local interface to bind to
            szPort,
            &hints,
            &res
            );
    if (rc != 0)
    {
        fprintf(stderr, "getaddrinfo failed for port %s: %d\n", szPort, rc);
        return -1;
    }

    thread = GetThreadObj();

    // For each local address returned, create a listening/receiving socket
    ptr = res;
    while (ptr)
    {
        sockobj = GetSocketObj(INVALID_SOCKET, (gProtocol == IPPROTO_TCP) ? TRUE : FALSE);

        // create the socket
        sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (sockobj->s == INVALID_SOCKET)
        {
            fprintf(stderr,"socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        InsertSocketObj(thread, sockobj);

        // bind the socket to a local address and port
        rc = bind(sockobj->s, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        if (gProtocol == IPPROTO_TCP)
        {
            rc = listen(sockobj->s, 200);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
                return -1;
            }

            // Register events on the socket
            rc = WSAEventSelect(
                    sockobj->s,
                    sockobj->event,
                    FD_ACCEPT|FD_CLOSE
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
                return -1;
            }
        }
        else
        {
            // Register events on the socket
            rc = WSAEventSelect(
                    sockobj->s,
                    sockobj->event,
                    FD_READ|FD_WRITE|FD_CLOSE
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
                return -1;
            }
        }

        ptr = ptr->ai_next;
    }

    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);

    while (1)
    {
        wait = WaitForMultipleObjects(
                thread->SocketCount + 1,
                thread->Handles,
                FALSE,
                5000
                );
        if (wait == WAIT_FAILED)
        {
            fprintf(stderr, "WaitForMultipleObjects failed: %d\n", GetLastError());
            break;
        }
        if (wait == WAIT_TIMEOUT)
        {
            // Nothing was signaled within the timeout; WAIT_TIMEOUT is not
            // a handle index, so just wait again
            continue;
        }

        index = (int)(wait - WAIT_OBJECT_0);

        if (index == 0)
        {
            // Slot 0 is the thread's own event, not a socket: it is signaled
            // when a socket was removed, so rebuild the wait array
            WSAResetEvent(thread->Handles[0]);
            RenumberThreadArray(thread);
            continue;
        }
        if (index < 0 || index > thread->SocketCount)
        {
            fprintf(stderr, "WaitForMultipleObjects returned unexpected index %d\n", index);
            continue;
        }

        // Socket events start at slot 1, list positions at 0
        sockobj = FindSocketObj(thread, index-1);
        if (sockobj == NULL)
        {
            printf("Unable to find socket object!\n");
            continue;
        }

        if (gProtocol == IPPROTO_TCP)
        {
            SOCKADDR_STORAGE sa;
            WSANETWORKEVENTS ne;
            SOCKET           sc;
            int              salen;

            rc = WSAEnumNetworkEvents(
                    sockobj->s,
                    thread->Handles[index],
                    &ne
                    );
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n", WSAGetLastError());
                break;
            }

            // Accept until the listening socket has no more pending connections
            while (1)
            {
                sc = INVALID_SOCKET;
                salen = sizeof(sa);

                //
                // For TCP, accept the connection and hand off the client socket
                // to a worker thread
                //
                sc = accept(
                        sockobj->s,
                        (SOCKADDR *)&sa,
                        &salen
                        );
                if ((sc == INVALID_SOCKET) && (WSAGetLastError() != WSAEWOULDBLOCK))
                {
                    fprintf(stderr, "accept failed: %d\n", WSAGetLastError());
                    break;
                }
                else if (sc != INVALID_SOCKET)
                {
                    newsock = GetSocketObj(INVALID_SOCKET, FALSE);

                    // Copy address information
                    memcpy(&newsock->addr, &sa, salen);
                    newsock->addrlen = salen;

                    newsock->s = sc;

                    // Register for read, write and close on the client socket
                    rc = WSAEventSelect(
                            newsock->s,
                            newsock->event,
                            FD_READ | FD_WRITE | FD_CLOSE
                            );
                    if (rc == SOCKET_ERROR)
                    {
                        fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
                        break;
                    }

                    AssignToFreeThread(newsock);
                }
                else
                {
                    // Failed with WSAEWOULDBLOCK -- just continue
                    break;
                }
            }
        }
        else
        {
            // For UDP all we have to do is handle events on the main
            // threads.
            if (HandleIo(thread, sockobj) == SOCKET_ERROR)
            {
                RenumberThreadArray(thread);
            }
        }
    }

    WSACleanup();
    return 0;
}
참조 사이트:
http://jjjryu.tistory.com/entry/WSAAsyncSelect-Win32
http://jjjryu.tistory.com/entry/select-IO-%EB%A9%80%ED%8B%B0%ED%94%8C%EB%A0%89%EC%8B%B1Win32
관련글 관련글 더보기