쓰레드 풀에 대한 이해
쓰레드의 생성과 소멸은 시스템에 많은 부담을 준다. 따라서 빈번한 쓰레드의 생성과 소멸을 피하기 위해선 쓰레드 풀을 유지하는 것은 성능 향상에 도움이 된다.
쓰레드 풀의 기본 원리는 쓰레드의 재활용이다. 할당된 일을 마친 쓰레드를 소멸시키지 않고, 쓰레드 풀에 저장해 뒀다가 필요할 때 다시 꺼내 쓰는 개념이다. 즉, 쓰레드의 생성과 소멸에 필요한 비용을 지불하지 않겠다는 것이다.
쓰레드 풀 동작 원리
- 쓰레드 풀은 처리해야 할 일(work)이 등록되기 전에 생성되는데, 풀이 생성됨과 동시에 쓰레드들도 생성되어 풀에서 대기하게 된다.
- 쓰레드 풀에 존재하는 쓰레드 하나를 임의로 할당해서 일의 처리를 도모한다.
- 만약 풀에 존재하는 쓰레드 수보다 처리해야 할 일의 수가 많다면, 일이 순서대로 처리되도록 디자인할 수 있고, 빠른 일 처리를 위해 추가적인 쓰레드가 생성되도록 풀을 디자인할 수도 있다.
쓰레드 풀의 함수 관계
- 1. WORK GetWorkFromPool (void);
- 쓰레드 풀에서 Work를 가져올 때 호출하는 함수이다.
- 2. DWORD AddWorkToPool (WORK work);
- 새로운 Work를 등록할 때 호출하는 함수이다.
- 3. DWORD MakeThreadToPool (DWORD numOfThread);
- 쓰레드 풀이 생성된 이후에 풀에 쓰레드를 생성(등록)하는 함수이다.
- 인자로 전달되는 수 만큼 쓰레드가 생성된다.
- 4. void WorkerThreadFunction (LPVOID pParam);
- 쓰레드가 생성되자마자 호출하는 쓰레드의 main함수이다.
- 이 함수의 구성을 봐야만 어떻게 Work를 할당받아서 처리하는지, 그리고 Work가 없을 때의 쓰레드 상태들을 알 수 있다.
쓰레드 풀 메커니즘
- 단계 1
- 전역으로 선언된 쓰레드 풀에 MakeThreadToPool 함수의 호출을 통해서 쓰레드를 생성해 등록시킨다.
- 이렇게 생성된 쓰레드는 이벤트 오브젝트가 Signaled 상태가 되기를 기다리며 Blocked 상태가 된다.
- 단계 2
- AddWorkToPool 함수 호출을 통해서 Work를 등록한다.
- 단계 3
- Work가 등록되면, 쓰레드 풀에서 Blocked 상태에 있는 모든 이벤트 오브젝트를 Signaled 상태로 변경한다.
- 단계 4
- 모든 이벤트 오브젝트가 Signaled 상태가 되므로, 모든 쓰레드가 Running 상태가 된다. 그러나 Work를 할당받은 하나의 쓰레드를 제외하고 나머지는 다시 Blocked 상태가 된다. (다소 비효율적인 부분이다.)
- -(보통 스레드별로 각자 이벤트 오브젝트를 갖는게 아니라 하나의 이벤트 오브젝트를 두고, 그게 Signaled 상태가 되면 스레드 풀에서 스레드를 하나만 깨우거나(auto-reset), 전체를 깨운다(manual-reset).)
- 단계 5
- Running 상태로 남아 있게 될 하나의 쓰레드는 GetWorkFromPool 함수 호출을 통해서 Work를 할당받아서 실행하게 된다.
- 전역으로 선언된 쓰레드 풀 접근 동기화
- 쓰레드 풀에 해당하는 gThreadPool은 전역으로 선언되어 있고, 둘 이상의 쓰레드에 의해서 참조되는 메모리 영역이다.
- 따라서 gThreadPool의 접근에 동기화가 필요하다.
- 이를 위해 다음 함수들을 정의하고 있다.
- 뮤텍스 기반 동기화 함수들을 래핑(Rapping)한 것이다.
- void InitMutex();
- void DeInitMutex();
- void AcquireMutex();
- void ReleaseMutex();
소스 코드
ThreadPool.h
#pragma once
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool
{
public:
ThreadPool(size_t capacity) : m_IsStop(false)
{
// the constructor just launches some amount of workers
for(size_t i = 0; i < capacity; ++i)
{
m_Workers.push_back(std::thread([&]()
{
while(true)
{
std::unique_lock<std::mutex> lock(m_QueMutex);
while(!m_IsStop && m_Tasks.empty())
{
m_Condition.wait(lock);
}
if (m_IsStop && m_Tasks.empty()) return;
std::function<void()> task(m_Tasks.front());
m_Tasks.pop();
lock.unlock();
task();
}
}));
}
}
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(m_QueMutex);
m_IsStop = true;
}
m_Condition.notify_all();
for(size_t i = 0; i < m_Workers.size(); ++i)
{
m_Workers[i].join();
}
}
template <class _F, class ... _Args>
auto Enqueue(_F&& f, _Args&& ... args)
->std::future<decltype(std::forward<_F>(f)(
std::forward<_Args>(args)...))>
{
typedef decltype(std::forward<_F>(f)(
std::forward<_Args>(args)...)) return_type;
// don't allow enqueueing after stopping the pool
if (m_IsStop)
{
throw std::runtime_error("enqueue on stopped ThreadPool");
}
auto task = std::make_shared<std::packaged_task<return_type()>>
(
std::bind(std::forward<_F>(f), std::forward<_Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::unique_lock<std::mutex> lock(m_QueMutex);
m_Tasks.push([task](){ (*task)(); });
}
m_Condition.notify_one();
return result;
}
private:
// need to keep track of threads so we can join them
std::vector<std::thread> m_Workers;
// the task queue
std::queue<std::function<void()>> m_Tasks;
// synchronization
std::mutex m_QueMutex;
std::condition_variable m_Condition;
bool m_IsStop;
};
main.cpp
#include <iostream>
#include <chrono>
#include "ThreadPool.h"
int main(int argc, const char * argv[])
{
ThreadPool pool(4);
std::vector<std::future<int>> results;
for(int i = 0; i < 4; ++i)
{
results.push_back(pool.Enqueue([i](){
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "world " << i << std::endl;
return i * i;
}));
}
for(size_t i = 0; i < results.size(); ++i)
{
int value = results[i].get();
std::cout << value << ' ';
}
std::cout << std::endl;
return 0;
}
클래스 내에서 멤버함수를 사용하는 방법 #1
int sumfunc_static(int i)
{
int sum = 0;
for (int k = i * 100; k < i * 100 + 100; k++)
sum += k;
return sum;
}
int CMFCApplication1Dlg::sumfunc_memfunc(int i)
{
int sum = 0;
for (int k = i * 100; k < i * 100 + 100; k++)
sum += k;
return sum;
}
void CMFCApplication1Dlg::Test()
{
ThreadPool pool(4);
std::vector<std::future<int>> results;
for (int i = 0; i < 20; ++i)
{
/*auto lamda = &CMFCApplication1Dlg::sumfunc_memfunc;
(this->*lamda)(i);*/
/*auto lamda = &sumfunc_static;
lamda(i);*/
results.push_back(pool.Enqueue([&,i](){ return sumfunc_memfunc(i); }));
}
for (size_t i = 0; i < results.size(); ++i)
{
int value = results[i].get();
std::cout << value << std::endl;
}
std::cout << std::endl;
return ;
}
'CS > 네트워크' 카테고리의 다른 글
Reliable Data Transfer RDT란? (0) | 2023.07.09 |
---|---|
데드 레커닝 (Dead Reckoning) 개념 (0) | 2023.07.02 |
Overlapped (비동기) I/O, epoll, iocp 정의 및 코드 (0) | 2022.11.04 |
TCP와 UDP의 특징과 차이 (0) | 2022.11.04 |
C# 원자적 연산 (Interlocked 클래스) (0) | 2022.07.26 |