Condition Variable

Condition Variable은 Event의 변종

앞선 Event 코드에서…

mutex m;
queue<int32> q;

void Producer()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(m);
            q.push(100);
        }
        
        ::SetEvent(handle); //Event의 상태를 Signal로 변경
        
        //만약 sleep하지 않고 Producer가 생산을 불안정 주기로 계속 한다면?
        //this_thread::sleep_for(10000ms);
    }
    
}

void Consumer()
{
    while(true)
    {
        ::WaitForSingleObject(handle, INFINITE); //무한 대기 -> 초록불일때까지 기다림.
        //auto 옵션이므로 여기서 다시 Signal이 빨간불로 바뀜.
        //만약 manual 옵션이라면 ::ResetEvent(handle); 추가
        
        unique_lock<mutex> lock(m);
        if (q.empty() == false)
        {
            int32 data = q.front();
            q.pop();
            cout << q.size() << endl; // 데이터가 아닌 큐의 크기를 출력하도록 변경
        }
    }
}

HANDLE handle; //생산자, 소비자에서 사용 가능하게끔 전역으로 선언

int main()
{
    //커널 오브젝트
    handle = ::CreateEvent(NULL/*보안속성*/, FALSE/*auto reset*/, FALSE/*초기 상태*/, NULL/*이름*/);
    
    thread t1(Producer);
    thread t2(Consumer);
    
    t1.join();
    t2.join();
    
    ::CloseHandle(handle); //사용이 끝난 Handle은 수거를 하는 것이 바람직함.
    return 0;
}
  • 생산자가 데이터를 넣고 파란불을 켜고, 소비자는 파란불을 확인하고 데이터를 꺼내고 빨간불로 바꾸고… 를 반복하면 q.size()는 0만 출력되지 않을까?
  • 실제로 실행해보면 데이터가 무한정 계속 늘어나는 것을 확인할 수 있다.

왜 데이터가 계속 늘어나는가?

void Producer()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(m);
            q.push(100);
        }
        
        ::SetEvent(handle);
    }
    
}

void Consumer()
{
    while(true)
    {
        ::WaitForSingleObject(handle, INFINITE); 
        
        //----- The Section -----
        
        unique_lock<mutex> lock(m);
            
        if (q.empty() == false)
        {
            int32 data = q.front();
            q.pop();
            cout << q.size() << endl;
        }
    }
}
  1. 생산자가 데이터를 삽입하고 초록불로 바꿈
  2. 소비자는 초록불을 확인하고 빨간불로 바꿈, The Section 진입
  3. !!여기서 생산자가 또 다시 데이터를 삽입하려고 함
  4. 소비자는 락을 걸고 큐 사이즈를 읽어야 하는데, 이미 생산자가 락을 건 상태 ㅠㅠ
  5. 결과적으로 다음에 운 좋게 락이 풀렸을떄 큐에서 pop하고 사이즈를 읽는데, 이때는 이미 큐에 여러개의 데이터가 삽입된 상태임.

Consumer

unique_lock<mutex> lock(m);
while (q.empty() == false) //if를 while로
{
    int32 data = q.front();
    q.pop();
    cout << q.size() << endl;
}
  • 따라서 이런경우 if 대신 while을 사용하여 큐를 전부 비워주는 등의 조치를 취해야 함.

Condition Variable

mutex m;
//condition_variable 은 표준 mutex와 함께 동작한다.
condition_variable cv;

//일반적인 상황으로 확장시, _any 사용. 이 경우 따로 라이브러리를 불러와야 한다.
#include <condition_variable> 
condition_variable_any;

💡 참고 : CV는 User-Level Object이다. (Kernel Object가 아님)

  • Contidion Variable은 항상 Lock과 짝지어 사용한다.

코드

void Producer()
{
    while(true)
    {
        // --- CV 사용법 ---
        // 1) Lock을 잡고
        // 2) 공유 변수 값을 수정
        // 3) Lock을 풀고
        // 4) 조건변수 통해 다른 쓰레드에게 통지
        {
            unique_lock<mutex> lock(m); // 1)
            q.push(100);                // 2)
                                        // 3) 소멸자로 인한 Lock 자동 해제        
        }
        
        // ::SetEvent(handle); 대신
        cv.notify_one();                // 4) wait 중인 쓰레드가 있으면 딱 1개를 깨운다.
    }
    
}

void Consumer()
{
    while(true)
    {
        // ::WaitForSingleObject(handle, INFINITE); 대신
        unique_lock<mutex> lock(m);         // lock선언
        // 큐가 비어있지 않을때까지 기다리고 싶다. (탈출 조건) 람다식 사용
        cv.wait(lock, []() { return q.empty() == false; });
        // 1) Lock을 잡으려고 시도 (이미 잡혔다면 자동으로 통과)
        // 2) 조건 확인
        // - 만족O => 빠져 나와서 이어서 코드를 진행
        // - 만족X => Lock을 풀어주고 대기 상태, 다음 신호를 받으면 cv.wait() 반복
        
        
        // while (q.empty() == false) 어차피 위에서 확인하므로 의미 없음.
        {
            int32 data = q.front();
            q.pop();
            cout << q.size() << endl;
        }
    }
}
  • notify_one() 대신 모든 쓰레드에게 통지할 경우는 notify_all()사용
  • cv.wait()의 첫번째 인자는 lock_guard 가 아닌 unique_lock이 와야한다. 이유는 조건부로 풀어줘야 하는 경우가 생기기 때문.