Waker
로 태스크 깨우기
future들이 첫 번째 poll
에서는 완성되지 못하는 것이 일반적입니다. 완성되지
못했을 경우, 더 진행이 가능할 준비가 되면 future가 poll될 수 있게 확실히
조치해둘 필요가 있습니다. Waker
타입으로 이 조치를 취할 수 있습니다.
future가 poll될 때마다 한 "태스크"의 일부분으로서 poll됩니다. 태스크란 한 executor에게 제공된 최상위 future들입니다.
Waker
는 wake()
메소드를 제공하는데, 이 메소드는 연관된 태스크가 깨워져야
한다고 executor에게 알리는데 사용됩니다. wake()
가 호출되었을 때, executor는
Waker
와 연관된 태스크가 진행될 준비가 되었으며, 태스크의 future가 다시
poll되어야 한다는 것을 알게 됩니다.
Waker
는 clone()
도 구현하기 때문에, 필요한 곳에 복사되고 저장될 수 있습니다.
Waker
를 사용하여 간단한 타이머를 구현해 봅시다.
응용: 타이머 만들기
이 예제의 목적에 따라, 우리는 타이머가 만들어졌을 때 그냥 새 스레드를 하나 생성할 것이고, 필요한 만큼 sleep할 것입니다. 그리고 time window가 지나면, 타이머 future에 시그널을 보낼 것입니다.
먼저, cargo new --lib timer_future
를 실행하여 새 프로젝트를 만들고, imports를
추가합시다. 그리고 src/lib.rs
파일에 코딩하면서 시작합니다.
#![allow(unused)] fn main() { use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; }
먼저 future 타입 자체를 정의합시다. 우리의 future에게는 타이머가 경과되었는지,
그래서 future가 완성되어야 하는지 여부를 스레드와 통신할 방법이 필요합니다.
그래서 공유된 Arc<Mutex<..>>
값을 사용해서 스레드와 future 사이에 통신할
것입니다.
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// future와 대기중인 스레드 사이에 공유되어 상태를 나타내는 구조체
struct SharedState {
/// 타이머가 경과되었는 지 여부
completed: bool,
/// `TimerFuture`가 실행될 태스크 용 waker.
/// 스레드는 `completed = true`라고 설정한 후에 `TimerFuture`의 태스크에게
/// "일어나서, `completed = true`인지 확인하고, 진행하라"고 전하는 데 이
/// waker를 사용할 수 있다.
waker: Option<Waker>,
}
자, 진짜로 Future
구현을 작성해 봅시다.
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 타이머가 이미 완성되었는지 알기 위해 공유된 상태를 확인.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// waker를 설정해서 타이머가 완성되었을 때 스레드가 현재의 태스크를
// 깨울 수 있게 한다. 이렇게 함으로써 future는 다시 poll 될 수 있으며,
// 또, `completed = true`가 맞는지 확인 할 수 있다.
//
// "waker를 매번 반복적으로 클론하지 않고 한 번만 해도 되지
// 않을까" 생각할 수도 있다. 하지만, `TimerFuture`는 executor의 여러
// 태스크로 이동할 수 있기 때문에, 클론을 한 번만 하면 잘못된
// 태스크를 가리키는 정체된 waker가 만들어져 `TimerFuture`가 제대로
// 못 깨워질 것이다.
//
// 주의: `Waker::will_wake` 함수를 이용하여 `TimerFuture`가
// 제대로 못 깨워지는 문제를 체크할 수 있으나, 예제를 간단하게
// 하기 위해 생략하였다.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
꽤 간단하죠? 스레드가 shared_state.completed = true
로 설정하였다면 future
가
완성된 것입니다. 아니라면, 우리는 스레드가 태스크를 다시 깨울 수 있도록,
Waker
를 현재의 태스크용으로 클론하여 shared_state.waker
에 전달합니다.
중요한 점은, future가 poll될 때마다 Waker
를 갱신해야 한다는 점입니다.
왜냐하면, 그 future가 다른 Waker
와 함께 다른 태스크로 이동했을 수 있기
때문입니다.(TODO: 의역필요) 이런 상황은 future가 poll되고 나서 태스크 사이에서
여기저기 전달될 때 발생합니다.
마지막으로, 실제로 타이머를 만들고 스레드를 시작할 API가 필요합니다.
impl TimerFuture {
/// 주어진 시간이 경과하면 완성되는 새로운 `TimerFuture`를 만든다.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 새로운 스레드 생성
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// 타이머가 완성되어서 future가 poll된 마지막 태스크를 (존재한다면)
// 깨우는 시그널
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
짠! 이게 간단한 타이머 future를 만드는데 필요한 전부입니다. 이제 future가 실행될 executor만 있으면 되는데요...