응용: executor 구현하기

러스트 Future는 지연계산됩니다. 완성시키기 위해 실제로 구동하기 전까지 future는 아무것도 하지 않을 것입니다. future를 완성까지 구동하는 한 가지 방법은 async 함수 안에서 future를 .await하는 것입니다. 다만, 그렇게 하면 문제가 하나 생깁니다: 누가 최상위 async 함수로부터 반환된 future를 실행할 것인가라는 문제입니다. 그리고 그 해답은 Future executor입니다.

Future executor는 최상위 Future의 집합을 받아 Future가 진행할 수 있을 때마다 poll을 호출해서 완성될 때까지 실행합니다. 일반적으로, executor는 시작하면서 future를 한 번 poll합니다. Futurewake()를 호출하여 진행할 준비가 되었음을 알릴 때, future는 큐 뒤에 넣어지고, poll이 다시 호출됩니다. 이는 Future가 완성될 때까지 반복됩니다.

이 장에서 우리는 수많은 최상위 future를 완성될 때까지 동시에 실행할 수 있는 간단한 executor를 만들 것입니다.

이 예제에서는 Waker를 쉽게 만들 수 있게 도와주는 ArcWake 트레잇 때문에 futures 크레잇이 필요합니다. Cargo.toml을 수정하여 새 의존성을 추가하세요.

[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures = "0.3"

다음은, src/main.rs의 맨 위에 아래와 같이 import합니다.

use {
    futures::{
        future::{BoxFuture, FutureExt},
        task::{waker_ref, ArcWake},
    },
    std::{
        future::Future,
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::Context,
        time::Duration,
    },
    // 이전 장에서 작성한 타이머
    timer_future::TimerFuture,
};

실행할 태스크를 채널을 통해 보내면 우리의 executor가 작동할 겁니다. executor는 채널에서 이벤트를 당겨와서 실행합니다. 만약, 어떤 태스크가 조금 더 일 할 준비가 됐다면(즉, 깨워진다면), 그 태스크는 자기가 다시 poll될 수 있게 채널에 자기 스스로를 넣습니다.

이러한 설계 덕분에, executor는 그저 태스크 채널의 수신 단말만 있으면 됩니다. 유저에게는 송신 단말이 주어지므로, 새로은 future를 만들 수 있습니다. 태스크라는 것은 결국 스스로를 다시 스케쥴링할 수 있는 future일 뿐입니다. 따라서, 우리는 태스크들을 송신자(sender)와 짝지운 future의 형태로 저장할 것입니다. 송신자는 태스크가 자기자신을 큐에 넣는데 사용됩니다.

/// 채널에서 태스크를 받아서 실행하는 태스크 executor
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// 새 future를 태스크 채널에 생성해 넣는 `Spawner`
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// `Executor`에게 poll될 수 있게 스스로를 재스케줄링하는 future
struct Task {
    /// 완성되기 위해서 큐에 넣어져야 하는, 진행중인 future
    ///
    /// 정확히 하자면, `Mutex`가 꼭 필요한 것은 아니다. 우리는 한 시점에
    /// (future들을 실행하는) 오직 하나의 스레드만 가지고 있기 때문이다. 하지만,
    /// 러스트는 우리의 `future`가 한 개의 스레드 안에서만 변경된다는 사실을 알
    /// 수 없기 때문에, 우리는 스레드 안전성을 위해 `Mutex`를 사용해야만 한다.
    /// 현업에서는 `Mutex` 대신 `UnsafeCell`을 사용할 수도 있다.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 태스크가 자기자신을 태스크 큐의 마지막에 넣는데 사용되는 핸들
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 채널(큐)이 일시점에 가질 수 있는 태스크의 최대 갯수.
    // 그냥 `sync_channel`을 만드는데 필요한 것이고, 실제 executor에 적용될 일은
    // 없을 것이다.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

새 future를 만들기 쉽게 메소드 한 개를 더 spawner에 추가합시다. 이 메소드는 future 타입을 받아서, box로 감싸고, 새 Arc<Task>로 만들 것입니다. 새로 만든 Arch<Task>는 excutor에게 enqueue될 것입니다.

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("큐에 태스크가 너무 많습니다.");
    }
}

future를 poll하기 위해서는, Waker를 생성해야 합니다. 태스크 깨우기 section에서 설명했듯이, Wakerwake가 호출되면 태스크가 다시 poll될 수 있도록 스케쥴링합니다. Waker들은 executor에게 정확히 어떤 태스크가 준비되었는지 알려주기 때문에, executor는 진행할 준비가 된 future들만 poll한다는 점을 기억하십시오. 새로운 Waker를 만드는 가장 쉬운 방법은 ArcWake 트레잇을 구현하고, waker_ref.into_waker() 함수를 이용하여 Arc<impl ArcWake>Waker로 변경하는 것입니다. 우리의 태스크를 위한 ArcWake를 구현하여 Waker로 변경하고 깨워봅시다.

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // `wake`를 이 태스크를 다시 태스크 채널에 보내는 방식으로 구현한다. 그래서
        // executor가 이 태스크를 다시 poll할 것이다.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("큐에 태스크가 너무 많습니다.");
    }
}

Arc<Task>로부터 만들어진 Wakerwake()를 호출하면 Arc의 복사본이 태스크 채널로 송신될 것이다. 그러면 우리의 executor는 그 태스크를 집어 poll해야 한다. 구현해 봅시다.

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // future를 취하고 나서, 아직 future가 완성되지 않았으면(아직 Some이면),
            // future를 완성하기 위해 poll한다.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // task 자기자신으로부터 `LocalWaker`를 생성한다.
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>`는 `Pin<Box<dyn Future<Output = T> + Send +
                // 'static>>`의 type alias이다.
                // `Pin::as_mut` 메소드를 호출하여 `BoxFuture<T>`로부터
                // `Pin<&mut dyn Future + Send + 'static>`을 얻을 수 있다.
                if future.as_mut().poll(context).is_pending() {
                    // future의 처리가 끝나지 않았으므로, 그것의 task에 도로
                    // 넣어서 미래에 다시 실행될 수 있게 한다.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

축하합니다! future executor를 완성하였습니다. 여러분이 만든 executor를 asycn/.await 코드나 우리가 아까 만든 TimeFuture같은 커스텀 future를 실행하는데도 사용할 수 있습니다.

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // 타이머 전후로 문자열을 출력할 태스크를 생성한다.
    spawner.spawn(async {
        println!("howdy!");
        // 우리의 타이머 future가 2초 후에 완성될 때까지 기다린다.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // 여러분의 executor가 spawner가 끝났음을 확인하고 더 이상 실행할 task를
    // 받지 않도록 spawner를 drop한다.
    drop(spawner);

    // excutor를 task 큐가 빌 때까지 실행한다. "howdy!" 출력, 일시중지,
    // "done!"출력 순으로 동작할 것이다.
    executor.run();
}