Overlapped IO와 IOCP

Non-Blocking 모드의 소켓 구성하기

앞서 epoll관련 포스팅에서 논블로킹 모드로 동작하는 소켓의 생성한 적이 있다.
이와 유사하게 윈도우에서는 다음의 함수호출을 통해서 논블로킹 모드로 소켓의 속성을 변경한다.

SOCKET hLisnSock,
int mode = 1;
. . . . .
hLisnSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
ioctlsocket(hLisnSock, FIONBIO, &mode); // for non-blocking socket
. . . . .

위의 코드에서 호출하는 ioctlsocket 함수는 소켓의 IO방식을 컨트롤하는 함수이다. 그리고 위와 같은 형태로의 함수호출이 의미하는 바는 다음과 같다.

“핸들 hLisnSock이 참조하는 소켓의 입출력 모드(FIONBIO)를 변수 mode에 저장된 값의 형태로 변경한다.”

즉, FIONBIO는 소켓의 입출력 모드를 변경하는 옵션이며, 이 함수의 세 번쨰 인자로 전달된 주소 값의 변수에 0이 저장되어 있으면 블로킹 모드로, 0이 아닌 값이 저장되어 있으면 논블로킹 모드로 소켓의 입출력 속성을 변경한다. 그리고 이렇게 속성이 논블로킹 모드로 변경되면, 논블로킹 모드로 입출력 되는 것 이외에 다음의 특징도 지니게 된다.

  • 클라이언트의 연결요청이 존재하지 않는 상태에서 accept 함수가 호출되면 INVALID_SOCKET이 곧바로 반환된다. 그리고 이어서 WSAGetLastError 함수를 호출하면 WSAEWOULDBLOCK가 반환된다.
  • accept 함수호출을 통해서 새로 생성되는 소켓 역시 논블로킹 속성을 지닌다.

따라서 논블로킹 입출력 소켓을 대상으로 accept 함수를 호출해서 INVALID_SOCKET이 반환되면, WSAGetLastError 함수의 호출을 통해서 INVALID_SOCKET 이 반환된 이유를 확인하고, 그에 적절한 처리를 해야만 한다.

Overlapped IO만 가지고 에코 서버 구현하기

위에서 논-블로킹 소켓의 생성방법을 설명한 이유는 이것이 Overlapped IO 기반의 서버구현에 필요하기 때문이다.

IOCP가 존재하기 때문에 Overlapped IO만을 가지고 에코 서버를 구현하는 일은 드물다. 그러나 IOCP를 이해하기 위해선 Overlapped IO만 가지고 구현을 꼭 해봐야 한다.

코드의 양이 제법 되는 관계로 세 부분으로 나누어 정리한다.

CmplRouEchoServ_win.c

main 함수 이전

#include <stdio.h>
#include <stdlib.h>
#include <winsock2.h>

#define BUF_SIZE 1024
void CALLBACK ReadCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
void CALLBACK WriteCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
void ErrorHandling(char *message);

typedef struct
{
    SOCKET hClntSock;
    char buf[BUF_SIZE];
    WSABUF wsaBuf;
} PER_IO_DATA, *LPPER_IO_DATA

위 구조체에는 소켓의 핸들과 버퍼, 그리고 버퍼 관련 정보를 담는 WSABUF형 변수가 하나로 묶여있다. 이처럼 이 구조체만 참고해도 데이터의 송수신이 가능하도록 하였다.


main 함수

int main(int argc, char* argv[])
{
    WSADATA wsaData;
    SOCKET hLisnSock, hRecvSock;
    SOCKADDR_IN lisnAdr, recvAdr;
    LPWSAOVERLAPPED lpOvlp;
    DWORD recvBytes;
    LPPER_IO_DATA hbInfo;
    int mode = 1, recvAdrSz, flagInfo = 0;
    
    if(argc != 2)
    {
        printf("Usage : %s <port> \n", argv[0]);
        exit(1);
    }
    
    if(WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
    {
        ErrorHandling("WSAStartup() Error");
    }
    
    hLisnSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    ioctlsocket(hLisnSock, FIONBIO, &mode);     // for non-blocking mode socket
    
    memset(&lisnAdr, 0, sizeof(lisnAdr));
    lisnAdr.sin_family = AF_INET;
    lisnAdr.sin_addr.s_addr = htonl(INADDR_ANY);
    lisnAdr.sin_port = htons(atoi(argv[1]));
    
    if(bind(hLisnSock, (SOCKADDR*)&lisnAdr, sizeof(lisnAdr)) == SOCKET_ERROR)
        ErrorHandling("bind() Error");
    if(listen(hLisnSock, 5) == SOCKET_ERROR)
        ErrorHandling("listen() Error");
    
    recvAdrSz = sizeof(recvAdr);
    
    // loop
    while(1)
    {
        SleepEx(100, TRUE);     // for alertable wait state
        hRecvSock = accept(hLisnSock, (SOCKADDR*)&recvAdr, &recvAdrSz);
        if(hRecvSock == INVALID_SOCKET)
        {
            if(WSAGetLasrError() == WSAWOULDBLOCK)
                continue;   // accept 요청 없는 경우: while 루프 다시 시작
            else
                ErrorHandling("accept() Error");
        }
        puts("Client connected...");
        
        lpOvlp = (LPWSAOVERLAPPED)malloc(sizeof(WSAOVERLAPPED));
        memset(lpOvlp, 0, sizeof(WSAOVERLAPPED));
        
        hbInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
        hbInfo -> hClntSock = (DWORD)hRecvSock;
        (hbInfo -> wsaBuf).buf = hbInfo -> buf;
        (hbInfo -> wsaBuf).len = BUF_SIZE;
        
        lpOvlp -> hEvent = (HANDLE)hbInfo;
        WSARecv(hRecvSock, &(hbInfo -> wsaBuf), 1, &recvBytes, &flagInfo, lpOvlp, ReadCompRoutine);
    }   // end loop
    closesocket(hRecvSock);
    closesocket(hLisnSock);
    WSACleanUp();
    return 0;
}
  • Completion Routine 기반의 Overlapped IO모델에서는 WSAOVERLAPPED 구조체의 hEvent가 불필요하기 때문에 LPPER_IO_DATA와 같이 다른 데이터를 전달해도 무방하다.
  • 이렇게 LPPER_IO_DATA의 전달을 통해 Completion Routine이 호출하는 CALLBACK 함수는 WSAOVERLAPPED 구조체를 전달받아 그 안의 LPPER_IO_DATA를 확인할 수 있다.
  • 즉, 입출력이 완료된 소켓의 핸들과 버퍼에 접근 가능하다.

두 개의 Completion Routine 함수와 에러처리 함수

void CALLBACK ReadCompRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)
{
    LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped -> hEvent);
    SOCKET hSock = hbInfo -> hClntSock;
    LPWSABUF bufInfo = &(hbInfo -> wsaBuf);
    DWORD sentBytes;
    
    if(szRecvBytes == 0)
    {
        closesocket(hSock);
        free(lpOverlapped -> hEvent); free(lpOverlapped);
        puts("Client disconnected...");
    }
    else    // echo
    {
        bufInfo -> len = szRecvBytes;
        WSASend(hSock, bufInfo, 1, &sendBytes, 0, lpOverlapped, WriteCompRoutine);
    }
}

void CALLBACK WriteCompRoutine(DWORD dwError, DWORD szSendBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)
{
    LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped -> hEvent);
    SOCKET hSock = hbInfo -> hClntSock;
    LPWSABUF bufInfo = &(hbInfo -> wsaBuf);
    DWORD recvBytes;
    int flagInfo = 0;
    WSARecv(hSock, bufInfo, 1, &recvBytes, &flagInfo, lpOverlapped, ReadCompRoutine);
}

void ErrorHandling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

IOCP의 단계적 구현

Completion Port의 생성

IOCP에서는 완료된 IO의 정보가 Completion Port 커널 오브젝트(이하 CP)에 등록된다. 그런데 그냥 등록되는 것이 아니라 다음과 같은 과정이 필요하다.

“이 소켓을 기반으로 진행되는 IO의 완료 상황은 저 CP오브젝트에 등록해 주세요”

이를 가리켜 ‘소켓과 CP오브젝트의 연결 요청’이라 한다. 때문에 IOCP 모델의 서버 구현을 위해서는 다음곽 같은 과정이 선행되어야 한다.

  • Completion Port 오브젝트의 생성
  • Completion Port 오브젝트와 소켓의 연결

이때 소켓은 반드시 Overlapped 속성이 부여된 소켓이어야 하며, 위의 두가지 일은 다음 하나의 함수를 통하여 이루어진다.

#include <windows.h>

HANDLE CreateIoCompletionPort(
    HANDLE FileHandle,
    HANDLE ExistingCompletionPort,
    ULONG_PTR CompletionKey,
    DWORD NumberOfConcurrentThreads
);
// 성공 시 CP오브젝트의 핸들, 실패 시 NULL 반환
  • FileHandle: CP오브젝트 생성시에는 INVALID_HANDLE_VALUE 전달
  • ExistingCompletionPort: CP오브젝트 생성시에는 NULL 전달
  • CompletionKey: CP오브젝트 생성시에는 0 전달
  • NumberOfConcurrentThreads: CP오브젝트가 할당되어 완료된 IO를 처리할 쓰레드의 수 전달. 예를 들어 2가 전달되면 CP오브젝트에 할당되어 동시 실행 가능한 쓰레드의 수는 최대 2개로 제한된다. 그리고 이 인자에 0이 전달되면 시스템의 CPU 개수가 동시실행 가능한 쓰레드의 최대 수로 지정된다.

Completion Port 오브젝트와 소켓의 연결

CP오브젝트가 생성되었다면, 이제 이를 소켓과 연결시켜야 한다. 그래야 완료된 소켓의 IO정보가 CP오브젝트에 등록된다. 그럼 이번에는 이를 목적으로 CreateIOCompletionPort 함수를 다시 보자

#include <windows.h>

HANDLE CreateIoCompletionPort(
    HANDLE FileHandle,
    HANDLE ExistingCompletionPort,
    ULONG_PTR CompletionKey,
    DWORD NumberOfConcurrentThreads
);
// 성공 시 CP오브젝트의 핸들, 실패 시 NULL 반환
  • FileHandle: CP오브젝트에 연결할 소켓의 핸들 전달.
  • ExistingCompletionPort: 소켓과 연결할 CP오브젝트의 핸들 전달.
  • CompletionKey: 완료된 IO 관련 정보의 전달을 위한 매개변수, 이는 잠시 후에 소개하는 GetQueuedCompletionStatus함수와 함께 이해해야 한다.
  • NumberOfConcurrentThreads: 어떠한 값을 전달하건, 이 함수의 두 번째 매개변수가 NULL이 아니면 무시된다.

즉 매개변수 FileHandle에 전달된 핸들의 소켓을 매개변수 ExistingCompletionPort에 전달된 핸들의 CP오브젝트에 연결시키는 것이 위 함수의 두 번째 기능이다. 호출의 형태는 다음과 같다.

HANDLE hCpObject;
SOCKET hSock;
. . . . .
CreateIoCompletionPort((HANDLE)hSock, hCpObject, (DWORD)ioInfo, 0);

이렇게 CreateIoCompletionPort가 호출된 이후부터는 hSock을 대상으로 진행된 IO가 완료되면, 이에 대한 정보가 핸들 hCpObject에 해당하는 CP오브젝트에 등록된다.

CompletionPort의 완료된 IO확인과 쓰레드의 IO처리

#include <windows.h>

BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort,
    LPDWORD lpNumberOfBytes,
    PULONG_PTR lpCompletionKey,
    LPOVERLAPPED* lpOverlapped,
    DWORD dwMilliseconds
);
// 성공 시 TRUE, 실패 시 FALSE 반환 
  • CompletionPort: 완료된 IO 정보가 수록되어 있는 CP오브젝트의 핸들 전달
  • lpNumberOfBytes: 입출력 과정에서 송수신 된 데이터의 크기정보를 저장할 변수의 주소 값 전달
  • lpCompletionKey: CreateIoCompletionPort 함수의 세 번째 인자로 전달된 값의 저장을 위한 변수의 주소 값 전달.
  • lpOverlapped: WSASend, WSARecv 함수호출 시 전달하는 OVERLAPPED 구조체 변수의 주소값이 전달될, 변수의 주소값 전달.
  • dwMilliseconds: 타임아웃 정보 전달, 여기서 지정한 시간이 만료되면 FALSE를 반환하면서 함수를 빠져나가며, INFINITE를 전달하면 완료된 IO가 CP오브젝트에 등록될 때까지 블로킹 상태에 있는다.

위 함수는 IOCP의 완료된 IO의 처리를 담당하는 쓰레드가 호출한다.

IOCP에서는 IO를 전담하는 쓰레드를 별도로 생성해야 한다.

IOCP 흐름

IOCP.png

흐름은 다음과 같이 요약된다.

  • 커널에 CP오브젝트 등록 : CreateIoCompletionPort 함수를 통해 IO작업에 할당할 쓰레드 수를 지정한다. CompletionPort(이하 CP)의 핸들이 반환된다.
  • I/O 디바이스(소켓, FD) IOCP 에 등록 : CreateIoCompletionPort 함수를 통해 디바이스(소켓)와 CP를 바인딩한다. 이 함수는 CP 의 핸들을 반환하는데, 앞으로의 작업에서 이 핸들을 이용할 것이다.
  • 비동기 I/O 시작 : 동기 함수(Connect, Close, Accept, Send) 와 같은 역할을 하는 비동기 함수들(AcceptEX, WSASend, WSARecv…) 을 실행해 윈도우 I/O 를 넘긴다. 이 동작을 거친 후 프로그램의 흐름이 다시 호출한 스레드로 바로 돌아온다(비동기).
  • 비동기 입출력 완료 : 비동기 입출력이 윈도우 I/O 가 종료되면 IOCP 라는 항구(Port) 에 쌓이게 되는데, 이 자료구조는 Queue 이다.
  • GetQueuedCompletionStatus : 스레드 중 작업이 끝난(할일이 없는 스레드) 에서 GetQueuedCompletionStatus 를 호출하면 IOCP 에서 완료된 내용을 꺼내서 받을 수 있다.

IOCP 기반의 에코 서버 구현

IOCPEchoServ_win.c

main 함수 이전

#include <stdio.h>
#include <stdlib.h>
#include <process.h>
#include <winsock2.h>
#include <windows.h>

#define BUF_SIZE 100
#define READ    3
#define WRTIE   5

typedef struct  // socket info
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAdr;
} PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

typedef struct  // buffer info
{
    OVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buffer[BUF_SIZE];
    int rwMode; // READ or WRTIE
} PER_IO_DATA, *LPPER_IO_DATA;

DWORD WINAPI EchoThreadMain(LPVOID CompletionPortIO);
void ErrorHandling(char *message);

위 두 구조체가 어떻게 활용되는지 잘 관찰해야한다.

그리고 다음의 사실을 명심하고 있어야 한다.

“구조체 변수의 주소 값은 구조체 첫 번째 멤버의 주소 값과 일치한다.”

이는 다음 문장의 실행 결과로 EQUAL이 출력됨을 의미한다.

PER_IO_DATA ioData;
if(&ioData == &(ioData.overlapped))
  puts("EQUAL");
else
  puts("NOT EQUAL");

main 함수

int main(int argc, char* argv[])
{
    WSADATA wsaData;
    HANDLE hComPort;
    SYSTEM_INFO sysInfo;
    LPPER_IO_DATA ioInfo;
    LPPER_HANDLE_DATA handleInfo;
    
    SOCKET hServSocket;
    SOCKADDR_IN servAdr;
    int recvBytes, i, flags = 0;
    if(WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() Error");
    
    hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    GetSystemInfo(&sysInfo);
    for(i = 0 ; i < sysInfo.dwNumberOfProcessors ; i++)
        _beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);
        
    hServSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAdr.sin_port = htons(atoi(argv[1]));
    
    bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr));
    listen(hServSock, 5);
    
    while(1)
    {
        SOCKET hClntSock;
        SOCKADDR_IN clntAdr;
        int addrLen = sizeof(clntAdr);
        
        hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &addrLen);
        handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA);
        handleInfo -> hClntSock = hClntSock;
        memcpy(&(handleInfo -> clntAdr), &clntAdr, addrLen);
        
        CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (DWORD)handleInfo, 0);
        
        ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
        ioInfo -> wsaBuf.len = BUF_SIZE;
        ioInfo -> wsaBuf.buf = ioInfo -> buffer;
        ioInfo -> rwMode = READ;
        WSARecv(handleInfo -> hClntSock, &(ioInfo -> wsaBuf), 1, &recvBytes, &flags, &(ioInfo -> overlapped), NULL);
    }
    return 0;
}

IOCP는 입력의 완료와 출력의 완료를 구분해주지 않는다. 따라서 입력을 진행한 것인지, 출력을 진행한 것인지에 대한 정보가 필요하다. 이는 PER_IO_DATA 구조체 변수의 멤버 rwMode가 담당한다.

WSARecv 함수를 호출하면서 일곱 번째 인자로 OVERLAPPED 구조체 변수의 주소 값을 전달하였다. 이 값은 이후에 GetQueued... 함수가 반환하면서 얻을 수 있다. 그런데 구조체 변수의 주소 값은 첫 번째 멤버의 주소 값과 동일하므로 PER_IO_DATA 구조체 변수의 주소 값을 전달한 것과 같다.

쓰레드의 main 함수와 에러처리 함수

DWORD WINAPI EchoThreadMain(LPVOID pComPort)
{
    HANDLE hComPort = (HANDLE)pComPort;
    SOCKET sock;
    DWORD bytesTrans;
    LPPER_HANDLE_DATA handleInfo;
    LPPER_IO_DATA ioInfo;
    DWORD flags = 0;
    
    while(1)
    {
        GetQueuedCompletionStatus(hComPort, &bytesTrans, (LPDWORD)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);
        sock = handleInfo -> hClntSock;
        
        if(ioInfo -> rwMode == READ)
        {
            puts("message received!");
            if(bytesTrans == 0) // EOF 전송 시
            {
                closesocket(sock);
                free(handleInfo); free(ioInfo);
                continue;
            }
            
            memset(&(ioInfo -> overlapped), 0, sizeof(OVERLAPPED));
            ioInfo -> wsaBuf.len = bytesTrans;
            ioInfo -> rwMode = WRITE;
            WSASend(sock, &(ioInfo -> wsaBuf), 1, NULL, 0, &(ioInfo -> overlapped), NULL);
            
            ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
            memset(&(ioInfo -> overlapped), 0, sizeof(OVERLAPPED));
            ioInfo -> wsaBuf.len = BUF_SIZE;
            ioInfo -> wsaBuf.buf = ioInfo -> buffer;
            ioInfo -> rwMode = READ;
            WSARecv(sock, &(ioInfo -> wsaBuf), 1, NULL, &flags, &(ioInfo -> overlapped), NULL);
        }
        else
        {
            puts("message sent!");
            free(ioInfo);
        }
    }
    return 0;
}

void ErrorHandling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

위 코드가 잘 이해가 가지 않았는데, malloc 전까지는 쓰레드 내에서 ioInfo의 멤버들을 모두 저장 및 컨트롤 할 수 있지만, WSARecv 이후에는 해당 IO 작업이 완료될때까지 어느 쓰레드도 해당 작업을 맡지 않는다. 따라서 쓰레드 A에서 WSARecv를 호출하더라도 쓰레드 A가 그 이후 곧바로 다른 IO 작업을 맡았다면 스택 내용이 전부 손상될 것이다. 혹은 쓰레드 A가 WSARecv를 통해 IO 요청한 작업을 쓰레드 B에서 핸들링할 수도 있을 것이다. 그래서 따로 힙영역을 할당해 준 것이 아닌가 싶다.