Waker로 태스크 깨우기

future들이 첫 번째 poll에서는 완성되지 못하는 것이 일반적입니다. 완성되지 못했을 경우, 더 진행이 가능할 준비가 되면 future가 poll될 수 있게 확실히 조치해둘 필요가 있습니다. Waker 타입으로 이 조치를 취할 수 있습니다.

future가 poll될 때마다 한 "태스크"의 일부분으로서 poll됩니다. 태스크란 한 executor에게 제공된 최상위 future들입니다.

Wakerwake() 메소드를 제공하는데, 이 메소드는 연관된 태스크가 깨워져야 한다고 executor에게 알리는데 사용됩니다. wake()가 호출되었을 때, executor는 Waker와 연관된 태스크가 진행될 준비가 되었으며, 태스크의 future가 다시 poll되어야 한다는 것을 알게 됩니다.

Wakerclone()도 구현하기 때문에, 필요한 곳에 복사되고 저장될 수 있습니다.

Waker를 사용하여 간단한 타이머를 구현해 봅시다.

응용: 타이머 만들기

이 예제의 목적에 따라, 우리는 타이머가 만들어졌을 때 그냥 새 스레드를 하나 생성할 것이고, 필요한 만큼 sleep할 것입니다. 그리고 time window가 지나면, 타이머 future에 시그널을 보낼 것입니다.

먼저, cargo new --lib timer_future를 실행하여 새 프로젝트를 만들고, imports를 추가합시다. 그리고 src/lib.rs파일에 코딩하면서 시작합니다.


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

먼저 future 타입 자체를 정의합시다. 우리의 future에게는 타이머가 경과되었는지, 그래서 future가 완성되어야 하는지 여부를 스레드와 통신할 방법이 필요합니다. 그래서 공유된 Arc<Mutex<..>> 값을 사용해서 스레드와 future 사이에 통신할 것입니다.

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// future와 대기중인 스레드 사이에 공유되어 상태를 나타내는 구조체
struct SharedState {
    /// 타이머가 경과되었는 지 여부
    completed: bool,

    /// `TimerFuture`가 실행될 태스크 용 waker.
    /// 스레드는 `completed = true`라고 설정한 후에 `TimerFuture`의 태스크에게
    /// "일어나서, `completed = true`인지 확인하고, 진행하라"고 전하는 데 이
    /// waker를 사용할 수 있다.
    waker: Option<Waker>,
}

자, 진짜로 Future 구현을 작성해 봅시다.

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 타이머가 이미 완성되었는지 알기 위해 공유된 상태를 확인.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // waker를 설정해서 타이머가 완성되었을 때 스레드가 현재의 태스크를
            // 깨울 수 있게 한다. 이렇게 함으로써 future는 다시 poll 될 수 있으며,
            // 또, `completed = true`가 맞는지 확인 할 수 있다.
            //
            // "waker를 매번 반복적으로 클론하지 않고 한 번만 해도 되지
            // 않을까" 생각할 수도 있다. 하지만, `TimerFuture`는 executor의 여러
            // 태스크로 이동할 수 있기 때문에, 클론을 한 번만 하면 잘못된
            // 태스크를 가리키는 정체된 waker가 만들어져 `TimerFuture`가 제대로
            // 못 깨워질 것이다.
            //
            // 주의: `Waker::will_wake` 함수를 이용하여 `TimerFuture`가
            // 제대로 못 깨워지는 문제를 체크할 수 있으나, 예제를 간단하게 
            // 하기 위해 생략하였다.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

꽤 간단하죠? 스레드가 shared_state.completed = true로 설정하였다면 future가 완성된 것입니다. 아니라면, 우리는 스레드가 태스크를 다시 깨울 수 있도록, Waker를 현재의 태스크용으로 클론하여 shared_state.waker에 전달합니다.

중요한 점은, future가 poll될 때마다 Waker를 갱신해야 한다는 점입니다. 왜냐하면, 그 future가 다른 Waker와 함께 다른 태스크로 이동했을 수 있기 때문입니다.(TODO: 의역필요) 이런 상황은 future가 poll되고 나서 태스크 사이에서 여기저기 전달될 때 발생합니다.

마지막으로, 실제로 타이머를 만들고 스레드를 시작할 API가 필요합니다.

impl TimerFuture {
    /// 주어진 시간이 경과하면 완성되는 새로운 `TimerFuture`를 만든다.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 새로운 스레드 생성
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 타이머가 완성되어서 future가 poll된 마지막 태스크를 (존재한다면)
            // 깨우는 시그널
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

짠! 이게 간단한 타이머 future를 만드는데 필요한 전부입니다. 이제 future가 실행될 executor만 있으면 되는데요...