ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • select() - IO 멀티플렉싱(Win32)
    Platform/소켓 2009. 5. 11. 16:43

    //
    // Allocated for each socket handle
    //
    typedef struct _SOCKET_OBJ
    {
        SOCKET      s;              // Socket handle
        int         listening;      // Socket is a listening socket (TCP)
        int         closing;        // Indicates whether the connection is closing

        SOCKADDR_STORAGE addr;      // Used for client's remote address
        int              addrlen;   // Length of the address

        BUFFER_OBJ *pending,        // List of pending buffers to be sent
                   *pendingtail;    // Last entry in buffer list

        struct _SOCKET_OBJ *next,   // Used to link socket objects together
                           *prev;
    } SOCKET_OBJ;

    //
    // Function: GetSocketObj
    //
    // Description:
    //    Allocate a socket object and initialize its members. A socket object is
    //    allocated for each socket created (either by socket or accept). The
    //    socket objects maintain a list of all buffers received that need to
    //    be sent.
    //
    // Parameters:
    //    s         - Socket handle to store in the new object.
    //    listening - Non-zero if the socket is a listening socket.
    //
    // Returns:
    //    Pointer to the new SOCKET_OBJ. The process is terminated via
    //    ExitProcess if the allocation fails, so NULL is never returned.
    //
    SOCKET_OBJ *GetSocketObj(SOCKET s, int listening)
    {
        SOCKET_OBJ  *sockobj=NULL;

        // HEAP_ZERO_MEMORY leaves every member that is not set below at zero/NULL
        sockobj = (SOCKET_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*sockobj));
        if (sockobj == NULL)
        {
            // GetLastError returns a DWORD, so it is printed as an unsigned long
            fprintf(stderr, "GetSocketObj: HeapAlloc failed: %lu\n", (unsigned long)GetLastError());
            ExitProcess(-1);
        }

        // Initialize the members
        sockobj->s = s;
        sockobj->listening = listening;
        // addrlen starts out as the full capacity of the address storage
        sockobj->addrlen = (int)sizeof(sockobj->addr);

        return sockobj;
    }

    //
    // Function: FreeSocketObj
    //
    // Description:
    //    Frees a socket object along with any queued buffer objects.
    //
    // Parameters:
    //    obj - Socket object to release; it is dereferenced unconditionally.
    //
    // Note: the socket handle stored in the object is not closed here.
    //
    void FreeSocketObj(SOCKET_OBJ *obj)
    {
        BUFFER_OBJ *cur = obj->pending;

        // Walk the pending queue, saving each successor before its owner is freed
        while (cur != NULL)
        {
            BUFFER_OBJ *following = cur->next;

            FreeBufferObj(cur);
            cur = following;
        }

        // Finally give the socket object itself back to the process heap
        HeapFree(GetProcessHeap(), 0, obj);
    }

    SOCKET_OBJ *gSocketList = NULL;     // First socket object in the global list
    SOCKET_OBJ *gSocketListEnd = NULL;  // Last socket object in the global list
    int         gSocketCount = 0;       // How many socket objects the list holds

    //
    // Function: InsertSocketObj
    //
    // Description:
    //    Insert a socket object into the list of socket objects. Note that
    //    no synchronization is performed because this app is single threaded!
    //
    // Parameters:
    //    sock - Socket object to append; its next/prev links are overwritten.
    //
    void InsertSocketObj(SOCKET_OBJ *sock)
    {
        // The new object always becomes the tail, so it never has a successor
        sock->next = NULL;

        if (gSocketList != NULL)
        {
            // Non-empty list: hook the object in behind the current tail
            sock->prev = gSocketListEnd;
            gSocketListEnd->next = sock;
        }
        else
        {
            // Empty list: the object is also the head
            sock->prev = NULL;
            gSocketList = sock;
        }

        gSocketListEnd = sock;
        ++gSocketCount;
    }

    //
    // Function: RemoveSocketObj
    //
    // Description:
    //    Remove a socket object from the list of sockets. No synchronization
    //    is performed since this app is single threaded.
    //
    // Parameters:
    //    sock - Socket object to unlink. The object is not freed and its own
    //           next/prev members are left untouched.
    //
    void RemoveSocketObj(SOCKET_OBJ *sock)
    {
        SOCKET_OBJ *before = sock->prev;
        SOCKET_OBJ *after  = sock->next;

        // Bridge the neighbours over the object being removed
        if (before != NULL)
            before->next = after;
        if (after != NULL)
            after->prev = before;

        // Move the list head/tail along if they pointed at the object
        if (sock == gSocketList)
            gSocketList = after;
        if (sock == gSocketListEnd)
            gSocketListEnd = before;

        --gSocketCount;
    }

    //
    // Function: EnqueueBufferObj
    //
    // Description:
    //   Queue up a receive buffer for this connection.
    //
    // Parameters:
    //    sock   - Socket object whose pending queue receives the buffer.
    //    obj    - Buffer to queue. Its next pointer is only written when the
    //             buffer is placed at the head of a non-empty queue.
    //    AtHead - TRUE to insert at the front, FALSE to append at the back.
    //
    void EnqueueBufferObj(SOCKET_OBJ *sock, BUFFER_OBJ *obj, BOOL AtHead)
    {
        // Empty queue: the buffer is both head and tail, whatever AtHead says
        if (sock->pending == NULL)
        {
            sock->pendingtail = obj;
            sock->pending = obj;
            return;
        }

        if (AtHead)
        {
            // Front insertion: the current head follows the new buffer
            obj->next = sock->pending;
            sock->pending = obj;
            return;
        }

        // Back insertion: chain behind the current tail
        sock->pendingtail->next = obj;
        sock->pendingtail = obj;
    }

    //
    // Function: DequeueBufferObj
    //
    // Description:
    //    Remove a BUFFER_OBJ from the given connection's queue for sending.
    //
    // Parameters:
    //    sock - Socket object whose pending queue is popped.
    //
    // Returns:
    //    The buffer that was at the head of the queue, or NULL if the queue
    //    is empty. The returned buffer's next pointer is left as it was.
    //
    BUFFER_OBJ *DequeueBufferObj(SOCKET_OBJ *sock)
    {
        BUFFER_OBJ *head;

        // A NULL tail means there is nothing queued
        if (sock->pendingtail == NULL)
            return NULL;

        head = sock->pending;
        sock->pending = head->next;

        // Popping the only entry empties the queue, so reset the tail too
        if (head == sock->pendingtail)
            sock->pendingtail = NULL;

        return head;
    }

    //
    // Function: ReceivePendingData
    //
    // Description:
    //    Receive data pending on the socket into a SOCKET_OBJ buffer. Enqueue
    //    the buffer into the socket object for sending later. This routine returns
    //    -1 indicating that the socket is no longer valid and the calling function
    //    should clean up (remove) the socket object. Zero is returned for success.
    //
    // Parameters:
    //    sockobj - Socket object to read from.
    //
    // Returns:
    //    0 when the socket object should be kept, -1 when the socket has been
    //    closed by this routine and the caller should discard the object.
    //
    int ReceivePendingData(SOCKET_OBJ *sockobj)
    {
        BUFFER_OBJ *rxbuf;
        int         nbytes;
        int         lasterr;
        int         status = 0;

        // One fresh buffer per call; it ends up either queued or freed below
        rxbuf = GetBufferObj(gBufferSize);

        if (gProtocol != IPPROTO_TCP)
        {
            // Datagram path: also capture the sender's address in the buffer
            nbytes = recvfrom(sockobj->s, rxbuf->buf, rxbuf->buflen, 0,
                              (SOCKADDR *)&rxbuf->addr, &rxbuf->addrlen);
        }
        else
        {
            nbytes = recv(sockobj->s, rxbuf->buf, rxbuf->buflen, 0);
        }

        if (nbytes == SOCKET_ERROR)
        {
            lasterr = WSAGetLastError();
            if (lasterr != WSAEWOULDBLOCK)
            {
                // Anything other than "would block" is fatal for this socket
                fprintf(stderr, "recv(from) failed: %d\n", lasterr);

                closesocket(sockobj->s);

                status = -1;
            }
            FreeBufferObj(rxbuf);
            return status;
        }

        if (nbytes != 0)
        {
            // Data arrived: record how much and queue it for sending
            rxbuf->buflen = nbytes;
            EnqueueBufferObj(sockobj, rxbuf, FALSE);
            return 0;
        }

        // Zero bytes received
        if (gProtocol == IPPROTO_TCP)
        {
            FreeBufferObj(rxbuf);
        }
        else
        {
            // Non-TCP: queue the buffer with a zero length
            rxbuf->buflen = 0;
            EnqueueBufferObj(sockobj, rxbuf, FALSE);
        }

        sockobj->closing = TRUE;

        // Buffers still waiting to go out keep the socket alive for now
        if (sockobj->pending != NULL)
            return 0;

        closesocket(sockobj->s);
        return -1;
    }

    //
    // Function: SendPendingData
    //
    // Description:
    //    Send any data pending on the socket. This routine goes through the
    //    queued buffer objects within the socket object and attempts to
    //    send all of them. If the send fails with WSAEWOULDBLOCK, put the
    //    remaining buffer back in the queue (at the front) for sending
    //    later when select indicates sends can be made. This routine returns
    //    -1 to indicate that an error has occurred on the socket and the
    //    calling routine should remove the socket structure; otherwise, zero
    //    is returned.
    //
    // Parameters:
    //    sock - Socket object whose pending buffers are to be sent.
    //
    // Returns:
    //    0 when the socket object should be kept. -1 when the caller should
    //    remove the socket object; in that case the socket handle has already
    //    been closed by this routine.
    //
    int SendPendingData(SOCKET_OBJ *sock)
    {
        BUFFER_OBJ *bufobj=NULL;
        BOOL        stop;
        int         nleft,
                    idx,
                    ret,
                    rc;

        // Attempt to dequeue all the buffer objects on the socket
        ret = 0;
        stop = FALSE;
        while (!stop && (bufobj = DequeueBufferObj(sock)) != NULL)
        {
            if (gProtocol == IPPROTO_TCP)
            {
                nleft = bufobj->buflen;
                idx = 0;

                // A stream send may accept only part of the data, so keep
                // sending from the first unsent byte until nothing is left.
                while (nleft > 0)
                {
                    rc = send(
                            sock->s,
                           &bufobj->buf[idx],
                            nleft,
                            0
                            );
                    if (rc == SOCKET_ERROR)
                    {
                        if (WSAGetLastError() == WSAEWOULDBLOCK)
                        {
                            BUFFER_OBJ *newbuf=NULL;

                            // Copy the unsent portion of the buffer and put it back
                            // at the head of the send queue.
                            // NOTE(review): GetBufferObj is not visible here; it is
                            // assumed not to return NULL - confirm.
                            newbuf = GetBufferObj(nleft);
                            memcpy(newbuf->buf, &bufobj->buf[idx], nleft);
                            newbuf->buflen = nleft;

                            EnqueueBufferObj(sock, newbuf, TRUE);
                        }
                        else
                        {
                            // The connection was broken, indicate failure
                            ret = -1;
                        }
                        stop = TRUE;

                        break;
                    }

                    // Advance past the bytes that were accepted
                    nleft -= rc;
                    idx += rc;
                }

                // The original buffer is done with in every case: it was fully
                // sent, its remainder was copied, or the connection failed.
                FreeBufferObj(bufobj);
            }
            else
            {
                rc = sendto(
                        sock->s,
                        bufobj->buf,
                        bufobj->buflen,
                        0,
                        (SOCKADDR *)&bufobj->addr,
                        bufobj->addrlen
                        );
                if (rc == SOCKET_ERROR)
                {
                    if (WSAGetLastError() == WSAEWOULDBLOCK)
                    {
                        // If the send couldn't be made, put the buffer
                        // back at the head of the queue
                        EnqueueBufferObj(sock, bufobj, TRUE);
                    }
                    else
                    {
                        // Socket error occurred: release the dequeued buffer
                        // (it is no longer reachable from the queue) and
                        // indicate the error to the caller
                        FreeBufferObj(bufobj);
                        ret = -1;
                    }
                    stop = TRUE;
                }
                else
                {
                    FreeBufferObj(bufobj);
                }
            }
        }

        // If no more sends are pending and the socket was marked as closing (the
        // receiver got zero bytes) then close the socket and indicate to the caller
        // to remove the socket structure.
        if ((sock->pending == NULL) && (sock->closing))
        {
            closesocket(sock->s);
            ret = -1;

            printf("Closing connection\n");
        }
        else if (ret != 0)
        {
            // A send failed: close the handle here, because the caller only
            // unlinks and frees the socket object.
            closesocket(sock->s);
        }

        return ret;
    }

    //
    // Function: main
    //
    // Description:
    //    Loads Winsock, creates one socket per local address returned by
    //    getaddrinfo for szPort, binds it (and listens when gProtocol is TCP),
    //    then services all sockets from a single select() loop.
    //
    // Returns:
    //    -1 on any setup, select or accept failure. The service loop itself
    //    never terminates normally.
    //
    // NOTE(review): no call that switches the sockets to non-blocking mode is
    // visible here; the WSAEWOULDBLOCK handling in the send/receive helpers
    // only matters if that is done elsewhere - confirm.
    //
    int __cdecl main(int argc, char **argv)
    {
        WSADATA          wsd;
        SOCKET           s;
        SOCKET_OBJ      *sockobj=NULL,
                        *sptr=NULL,
                        *next=NULL;
        BOOL             drop;
        int              rc;
        struct fd_set    fdread,
                         fdwrite,
                         fdexcept;
        struct timeval   timeout;
        struct addrinfo  hints,
                        *res=NULL,
                        *ptr=NULL;

        if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
        {
            fprintf(stderr, "unable to load Winsock!\n");
            return -1;
        }

        // NOTE(review): the hints request TCP unconditionally while the rest of
        // the code consults gProtocol - confirm the two are meant to agree.
        memset(&hints, 0, sizeof(hints));
        hints.ai_flags    = AI_PASSIVE;
        hints.ai_family   = AF_UNSPEC;      // unspecified
        hints.ai_socktype = SOCK_STREAM;    // TCP socket type
        hints.ai_protocol = IPPROTO_TCP;    // TCP protocol

        rc = getaddrinfo(
                NULL,    // local interface to bind to
                szPort,
               &hints,
               &res
                );
        if (rc != 0)
        {
            fprintf(stderr, "getaddrinfo failed for port %s: %d\n", szPort, rc);
            return -1;
        }

        // For each local address returned, create a listening/receiving socket
        for (ptr = res; ptr != NULL; ptr = ptr->ai_next)
        {
            s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
            if (s == INVALID_SOCKET)
            {
                fprintf(stderr,"socket failed: %d\n", WSAGetLastError());
                return -1;
            }

            sockobj = GetSocketObj(s, (gProtocol == IPPROTO_TCP) ? TRUE : FALSE);

            InsertSocketObj(sockobj);

            // bind the socket to a local address and port
            rc = bind(sockobj->s, ptr->ai_addr, (int)ptr->ai_addrlen);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
                return -1;
            }

            if (gProtocol == IPPROTO_TCP)
            {
                rc = listen(sockobj->s, 200);
                if (rc == SOCKET_ERROR)
                {
                    fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
                    return -1;
                }
            }
        }
        // free the addrinfo structure for the 'bind' address
        freeaddrinfo(res);

        while (1)
        {
            FD_ZERO(&fdread);
            FD_ZERO(&fdwrite);
            FD_ZERO(&fdexcept);

            // Set each socket in the FD_SET structures
            for (sptr = gSocketList; sptr != NULL; sptr = sptr->next)
            {
                FD_SET(sptr->s, &fdread);
                FD_SET(sptr->s, &fdexcept);

                // Only ask for writability when something is queued; otherwise
                // select() would return at once on every pass and the loop
                // would spin instead of waiting.
                if (sptr->pending != NULL)
                    FD_SET(sptr->s, &fdwrite);
            }

            timeout.tv_sec = 5;
            timeout.tv_usec = 0;

            rc = select(0, &fdread, &fdwrite, &fdexcept, &timeout);
            if (rc == SOCKET_ERROR)
            {
                fprintf(stderr, "select failed: %d\n", WSAGetLastError());
                return -1;
            }
            if (rc == 0)
            {
                // timeout
                continue;
            }

            // Go through all the sockets and see if they're present in the
            // fd_set structures. Each socket is handled exactly once; 'drop'
            // records that it must be unlinked and freed afterwards.
            sptr = gSocketList;
            while (sptr != NULL)
            {
                drop = FALSE;

                if (FD_ISSET(sptr->s, &fdread))
                {
                    if (sptr->listening)
                    {
                        // Read is indicated on a listening socket, accept the connection
                        sockobj = GetSocketObj(INVALID_SOCKET, FALSE);

                        s = accept(sptr->s, (SOCKADDR *)&sockobj->addr, &sockobj->addrlen);
                        if (s == INVALID_SOCKET)
                        {
                            fprintf(stderr, "accept failed: %d\n", WSAGetLastError());
                            FreeSocketObj(sockobj);
                            return -1;
                        }

                        sockobj->s = s;

                        InsertSocketObj(sockobj);
                    }
                    else if (ReceivePendingData(sptr) != 0)
                    {
                        // The receive routine closed the socket
                        drop = TRUE;
                    }
                    else if (SendPendingData(sptr) != 0)
                    {
                        // Sending what was just received failed or finished
                        // a closing connection
                        drop = TRUE;
                    }
                }

                if (!drop && FD_ISSET(sptr->s, &fdwrite))
                {
                    // Write is indicated so attempt to send the pending data
                    if (SendPendingData(sptr) != 0)
                        drop = TRUE;
                }

                if (!drop && FD_ISSET(sptr->s, &fdexcept))
                {
                    // Not handling OOB data so just close the connection
                    closesocket(sptr->s);
                    drop = TRUE;
                }

                // Fetch the successor before the current object can be freed
                next = sptr->next;
                if (drop)
                {
                    RemoveSocketObj(sptr);
                    FreeSocketObj(sptr);
                }
                sptr = next;
            }
        }

        WSACleanup();
        return 0;
    }


    참조 사이트:
Designed by Tistory.