C++에선 비동기로 함수를 실행하기 위해선 아래의 std 라이브러리들이 쓰여진다.
- std::future, std::promise
- std::shared_future
- std::packaged_task
- std::async
future / promise
비동기적 실행을 통해서 하고 싶은 일은, 어떠한 데이터를 다른 쓰레드를 통해 처리하고, 그 처리된 데이터를 받아내는 것이라고 볼 수 있다. 즉, 처리해야되는 데이터를 미래에(future) 다시 돌려받겠다라는 약속(promise)라고 볼 수 있다.
#include <iostream>
#include <string>
#include <future>
#include <thread>
void worker(std::promise<std::string>* p) {
// 약속을 이행함. 해당 결과는 future에 들어가게 됨
p->set_value("some data");
}
int main(void) {
std::promise<std::string> p;
// 미래에 string 데이터를 받는다는 약속
std::future<std::string> data = p.get_future();
std::thread t(worker, &p);
// 약속된 데이터를 받을 때까지 기다림
data.wait();
// wait가 리턴됬다는 것은 future에 데이터가 준비되었다는 의미
// 참고로, wait없이 get만 사용해도 wait 한 것과 동일
std::cout << "받은 데이터 : " << data.get() << "\n";
t.join();
return 0;
}
promise에 대응되는 future 객체는 promise가 전달한 데이터를 get 함수를 통해서 얻을 수 있다. 참고로 get 함수를 호출하게 되면 future 내에 전달받은 객체가 이동된다. 따라서 get을 다시 호출하면 안된다. 정리하자면, promise는 생산자-소비자 패턴에서 생산자의 역할을 수행하고, future는 소비자의 역할을 수행한다고 볼 수 있다 그렇기 때문에 이러한 코드는 아래처럼 condition_variable을 사용해도 구현할 수 있다.
#include <iostream>
#include <string>
#include <thread>
#include <condition_variable>
#include <mutex>
std::condition_variable cv;
std::mutex m;
bool done = false;
std::string info;
void worker() {
{
std::unique_lock<std::mutex> lk(m);
info = "some data";
done = true;
}
cv.notify_all();
}
int main(void) {
std::thread t(worker);
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []() { return done; });
lk.unlock();
std::cout << "받은 데이터 : " << info << std::endl;
return 0;
}
단, future이 더 유용한 이유는 예외도 전달할 수 있다는 것이다.
include <iostream>
#include <string>
#include <future>
#include <thread>
void worker(std::promise<std::string>* p) {
try {
throw std::runtime_error("Some Error");
}
catch (std::exception e) {
// 예외 세팅
p->set_exception(std::current_exception());
//// 또는
// p->set_exception(std::make_exception_ptr(e));
}
}
int main(void) {
std::promise<std::string> p;
// 미래에 string 데이터를 받는다는 약속
std::future<std::string> data = p.get_future();
std::thread t(worker, &p);
// 약속된 데이터를 받을 때까지 기다림
data.wait();
try {
std::cout << "받은 데이터 : " << data.get() << "\n";
}
catch (const std::exception& e) {
std::cout << "예외 : " << e.what() << "\n";
}
t.join();
return 0;
}
참고로 set_exception 함수는 예외 객체가 아닌 exception_ptr을 전달해야하는데, 이것은 catch로 받은 예외 객체의 포인터가 아닌 현재 catch된 예외에 관한 정보를 반환하는 current_exception 함수가 리턴하는 객체이다. catch로 전달받은 예외 객체를 make_exception_ptr 함수를 통해서도 가능하다.
void worker(std::promise<std::string>* p) {
try {
throw std::runtime_error("Some Error");
}
catch (std::exception e) {
// 예외 세팅
p->set_exception(std::make_exception_ptr(e));
}
}
wait_for
wait_for을 사용하면, 정해진 시간 동안만 기다리고 그냥 진행할 수 있다.
#include <iostream>
#include <string>
#include <future>
#include <thread>
#include <chrono>
void worker(std::promise<void>* p) {
std::this_thread::sleep_for(std::chrono::seconds(10));
p->set_value();
}
int main(void) {
std::promise<void> p;
std::future<void> data = p.get_future();
std::thread t(worker, &p);
while (true) {
std::future_status status = data.wait_for(std::chrono::seconds(1));
if (status == std::future_status::timeout) {
// 아직 데이터가 준비되지 않음
std::cerr << ">";
}
else if (status == std::future_status::ready) {
// promise에서 future로 데이터 설정
break;
}
}
t.join();
return 0;
}
wait_for 함수는 promise가 설정될 때까지 기다리는 대신 인자로 전달된 시간만큼 기다렸다가 바로 리턴한다. 이 때, 리턴하는 값은 현재 future 상태를 나타내는 future_status 객체이다.
future_status 객체는 총 3 가지 상태를 가질 수 있다
- future에 값이 설정됬을 때는 future_status::ready
- wait_for에 지정한 시간이 지났지만 값이 설정되지 않아서 리턴됬을 때는 future_status::timeout
- 결과값을 계산하는 함수가 실행되지 않았다는 의미인 future_status::deferred가 있다
그리고 여기서 promise와 future에 void 객체를 템플릿 인자로 전달했는데, void인 경우에 아무런 객체도 전달하지는 않지만, future가 set이 되었는지의 유무로 판단하는 플래그의 역할을 수행할 수 있다.
shared_future
위에서 future의 경우 한 번만 get을 할 수 있다. 이는 get을 호출하게 되면 future 내부의 객체가 이동되기 때문이다. 하지만, 종종 다른 쓰레드에서 future를 get할 필요성이 있을 때
shared_future를 사용하면 가능하다.
#include <iostream>
#include <string>
#include <future>
#include <thread>
void runner(std::shared_future<void>* start) {
start->get();
std::cout << "출발!\n";
}
int main(void) {
std::promise<void> p;
std::shared_future<void> start = p.get_future();
std::thread t1(runner, &start);
std::thread t2(runner, &start);
std::thread t3(runner, &start);
std::thread t4(runner, &start);
std::cerr << "준비...";
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cerr << "땅!\n";
p.set_value();
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
위와 같이 4개의 쓰레드에서 하나의 future값이 공통으로 사용하여, 위의 출력 결과를 확인할 수 있다.
packaged_task
C++에서 위에서 설명한 promise-future 패턴을 비동기적 함수(정확히는 Callable 함수; 람다 함수, 함수 객체)의 리턴값에 간단히 적용할 수 있는 packaged_task라는 것을 지원하고 있다.
packaged_task에 전달된 함수(task2)가 리턴할 때, 그 리턴값을 promise에 set_value 하고, 만약 예외를 던졌다면 set_exception이다. promise에 설정된 future는 packaged_task가 리턴하는 future에 접근할 수 있다.
#include <iostream>
#include <future>
#include <thread>
int some_task(int x) {
return 10 + x;
}
int main(void) {
// int(int) -> return type(arguments) int를 리턴 / int를 인자로 받는 함수
// std::function 참조
std::packaged_task<int(int)> task(some_task);
std::future<int> start = task.get_future();
std::thread t(std::move(task), 5);
std::cout << "결과값 : " << start.get() << std::endl;
t.join();
return 0;
}
packaged_task는 비동기로 실행할 함수 자체를 생성자의 인자로 받는다. 또한, 템플릿 인자로 해당 함수의 리턴 타입과 파라미터 타입도 명시해야 한다. packaged_task는 전달된 함수를 실행하고, 그 함수의 리턴값을 promise에 설정한다.
std::packaged_task<int(int)> task(some_task);
std::future<int> start = task.get_future();
그리고 해당 promise에 설정된 future는 위와 같이 get_future 함수를 통해서 설정할 수 있다. 생성된 packaged_task는 쓰레드에 전달하면 되는데, packaged_task는 복사 생성이 불가능하다. 때문에 명시적으로 move를 해주어야만 한다.(promise도 동일)
std::thread t(std::move(task), 5);
이렇게 비동기로 실행된 함수의 결과값은 추후에 future의 get 함수로 받을 수 있다. 이처럼 packaged_task를 사용하게 된다면 쓰레드에 굳이 promise를 전달하지 않아도 알아서 packaged_task가 함수의 리턴값을 처리해주어서 매우 편리하게 사용할 수 있다.
async
위에서 promise나 packaged_task는 비동기 실행을 위해서, 쓰레드를 명시적으로 생성하여 실행해야 했다. 하지만 std::async에 어떤 함수를 전달한다면, 아예 쓰레드를 알아서 만들어서 해당 함수를 비동기적으로 실행하고, 그 결과값을 future에 전달한다.
#include <iostream>
#include <future>
#include <thread>
#include <vector>
int sum(const std::vector<int>& v, int start, int end) {
int total = 0;
for (int i = start; i < end; i++) {
total += v[i];
}
return total;
}
int parallel_sum(const std::vector<int>& v) {
// 1 ~ 500 까지의 합
std::future<int> lower_half_future =
std::async(std::launch::async, sum, cref(v), 0, v.size() / 2);
// 501부터 1000까지의 합
int upper_half = sum(v, v.size() / 2, v.size());
return lower_half_future.get() + upper_half;
}
int main(void) {
std::vector<int> v;
v.reserve(1000);
for (int i = 0; i < 1000; i++) {
v.push_back(i + 1);
}
std::cout << "1부터 1000까지의 합 : " << parallel_sum(v) << "\n";
return 0;
}
async 함수는 인자로 받은 함수를 비동기로 실행하고, 해당 결과값을 보관할 future를 리턴한다.
std::future<int> lower_half_future =
std::async(std::launch::async, sum, cref(v), 0, v.size() / 2);
async의 첫번째 인자는 어떠한 형태로 실행할지를 전달하는데 아래 두 가지 값으로 설정 가능하다. 두 번째인자부터는 차례로 함수, 그리고 함수의 인자를 전달하면 된다.
- std::launch::async : 바로 쓰레드를 생성하여 인자로 전달된 함수를 실행
- std::launch::deferred : future의 get 함수가 호출되었을 때 실행(새로운 쓰레드 생성하지 않음, 동기적 실행)
이렇게 생성된 async 함수는 실행하는 함수의 결과값을 포함하는 future를 리턴하게 되고, 그 결과값은 get 함수를 통해서 얻을 수 있다.
return lower_half_future.get() + upper_half;
위의 parrallel 함수는 1부터 1000까지의 덧셈을 2개의 쓰레드에서 실행하는데, 1부터 500까지의 합은 async를 통해 생성된 새로운 쓰레드에서 처리하고, 나머지는 원래의 쓰레드에서 처리하게 된다.
위의 경우 1~1000까지의 덧셈이기 때문에 비동기나 동기 실행의 차이가 크지 않지만, 다음의 예제를 살펴보면 큰 차이가 난다는 것을 알 수 있다. 3초가 걸리는 실행 함수를 비동기 실행과 동기 실행으로 수행 시간을 출력하는 예제이다.
#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <chrono>
int do_work(int x) {
// do something
std::this_thread::sleep_for(std::chrono::seconds(3));
return x;
}
void do_work_parallel() {
auto f1 = std::async([]() { do_work(3); });
auto f2 = std::async([]() { do_work(3); });
do_work(3);
f1.get();
f2.get();
}
void do_work_sequential() {
do_work(3);
do_work(3);
do_work(3);
}
int main(void) {
std::cout << "----- Sequential Work -----\n";
auto t0 = std::chrono::steady_clock::now();
do_work_sequential();
auto t1 = std::chrono::steady_clock::now();
std::cout << "걸린 시간 : " << std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count() << " ms\n";
std::cout << "----- Parallel Work -----\n";
t0 = std::chrono::steady_clock::now();
do_work_parallel();
t1 = std::chrono::steady_clock::now();
std::cout << "걸린 시간 : " << std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count() << " ms\n";
return 0;
}
'프로그래밍 언어 > C++' 카테고리의 다른 글
C++ 정적 바인딩과 동적 바인딩의 차이점 (0) | 2022.08.21 |
---|---|
C++ 중괄호 초기화 (0) | 2022.08.19 |
C++ 동기(synchronous)와 비동기(asynchronous) / 블로킹(blocking)과 논블로킹(non-blocking) (0) | 2022.08.16 |
C++ Copy and Swap idiom (0) | 2022.08.08 |
C++ 위임 생성자 (delegating constructor) (0) | 2022.08.05 |