응용: executor 구현하기
러스트 Future
는 지연계산됩니다. 완성시키기 위해 실제로 구동하기 전까지
future는 아무것도 하지 않을 것입니다. future를 완성까지 구동하는 한 가지 방법은
async
함수 안에서 future를 .await
하는 것입니다. 다만, 그렇게 하면 문제가
하나 생깁니다: 누가 최상위 async
함수로부터 반환된 future를 실행할 것인가라는
문제입니다. 그리고 그 해답은 Future
executor입니다.
Future
executor는 최상위 Future
의 집합을 받아 Future
가 진행할 수 있을
때마다 poll
을 호출해서 완성될 때까지 실행합니다. 일반적으로, executor는
시작하면서 future를 한 번 poll
합니다. Future
가 wake()
를 호출하여 진행할
준비가 되었음을 알릴 때, future는 큐 뒤에 넣어지고, poll
이 다시 호출됩니다.
이는 Future
가 완성될 때까지 반복됩니다.
이 장에서 우리는 수많은 최상위 future를 완성될 때까지 동시에 실행할 수 있는 간단한 executor를 만들 것입니다.
이 예제에서는 Waker
를 쉽게 만들 수 있게 도와주는 ArcWake
트레잇 때문에
futures
크레잇이 필요합니다. Cargo.toml
을 수정하여 새 의존성을 추가하세요.
[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures = "0.3"
다음은, src/main.rs
의 맨 위에 아래와 같이 import합니다.
use {
futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
},
std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
},
// 이전 장에서 작성한 타이머
timer_future::TimerFuture,
};
실행할 태스크를 채널을 통해 보내면 우리의 executor가 작동할 겁니다. executor는 채널에서 이벤트를 당겨와서 실행합니다. 만약, 어떤 태스크가 조금 더 일 할 준비가 됐다면(즉, 깨워진다면), 그 태스크는 자기가 다시 poll될 수 있게 채널에 자기 스스로를 넣습니다.
이러한 설계 덕분에, executor는 그저 태스크 채널의 수신 단말만 있으면 됩니다. 유저에게는 송신 단말이 주어지므로, 새로은 future를 만들 수 있습니다. 태스크라는 것은 결국 스스로를 다시 스케쥴링할 수 있는 future일 뿐입니다. 따라서, 우리는 태스크들을 송신자(sender)와 짝지운 future의 형태로 저장할 것입니다. 송신자는 태스크가 자기자신을 큐에 넣는데 사용됩니다.
/// 채널에서 태스크를 받아서 실행하는 태스크 executor
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// 새 future를 태스크 채널에 생성해 넣는 `Spawner`
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// `Executor`에게 poll될 수 있게 스스로를 재스케줄링하는 future
struct Task {
/// 완성되기 위해서 큐에 넣어져야 하는, 진행중인 future
///
/// 정확히 하자면, `Mutex`가 꼭 필요한 것은 아니다. 우리는 한 시점에
/// (future들을 실행하는) 오직 하나의 스레드만 가지고 있기 때문이다. 하지만,
/// 러스트는 우리의 `future`가 한 개의 스레드 안에서만 변경된다는 사실을 알
/// 수 없기 때문에, 우리는 스레드 안전성을 위해 `Mutex`를 사용해야만 한다.
/// 현업에서는 `Mutex` 대신 `UnsafeCell`을 사용할 수도 있다.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// 태스크가 자기자신을 태스크 큐의 마지막에 넣는데 사용되는 핸들
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// 채널(큐)이 일시점에 가질 수 있는 태스크의 최대 갯수.
// 그냥 `sync_channel`을 만드는데 필요한 것이고, 실제 executor에 적용될 일은
// 없을 것이다.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
새 future를 만들기 쉽게 메소드 한 개를 더 spawner에 추가합시다. 이 메소드는
future 타입을 받아서, box로 감싸고, 새 Arc<Task>
로 만들 것입니다.
새로 만든 Arch<Task>
는 excutor에게 enqueue될 것입니다.
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("큐에 태스크가 너무 많습니다.");
}
}
future를 poll하기 위해서는, Waker
를 생성해야 합니다. 태스크 깨우기 section에서
설명했듯이, Waker
는 wake
가 호출되면 태스크가 다시 poll될 수 있도록
스케쥴링합니다. Waker
들은 executor에게 정확히 어떤 태스크가 준비되었는지
알려주기 때문에, executor는 진행할 준비가 된 future들만 poll한다는 점을
기억하십시오. 새로운 Waker
를 만드는 가장 쉬운 방법은 ArcWake
트레잇을
구현하고, waker_ref
나 .into_waker()
함수를 이용하여 Arc<impl ArcWake>
를
Waker
로 변경하는 것입니다. 우리의 태스크를 위한 ArcWake
를 구현하여 Waker
로
변경하고 깨워봅시다.
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// `wake`를 이 태스크를 다시 태스크 채널에 보내는 방식으로 구현한다. 그래서
// executor가 이 태스크를 다시 poll할 것이다.
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("큐에 태스크가 너무 많습니다.");
}
}
Arc<Task>
로부터 만들어진 Waker
의 wake()
를 호출하면 Arc
의 복사본이 태스크
채널로 송신될 것이다. 그러면 우리의 executor는 그 태스크를 집어 poll해야 한다.
구현해 봅시다.
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// future를 취하고 나서, 아직 future가 완성되지 않았으면(아직 Some이면),
// future를 완성하기 위해 poll한다.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// task 자기자신으로부터 `LocalWaker`를 생성한다.
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`는 `Pin<Box<dyn Future<Output = T> + Send +
// 'static>>`의 type alias이다.
// `Pin::as_mut` 메소드를 호출하여 `BoxFuture<T>`로부터
// `Pin<&mut dyn Future + Send + 'static>`을 얻을 수 있다.
if future.as_mut().poll(context).is_pending() {
// future의 처리가 끝나지 않았으므로, 그것의 task에 도로
// 넣어서 미래에 다시 실행될 수 있게 한다.
*future_slot = Some(future);
}
}
}
}
}
축하합니다! future executor를 완성하였습니다. 여러분이 만든 executor를
asycn/.await
코드나 우리가 아까 만든 TimeFuture
같은 커스텀 future를
실행하는데도 사용할 수 있습니다.
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 타이머 전후로 문자열을 출력할 태스크를 생성한다.
spawner.spawn(async {
println!("howdy!");
// 우리의 타이머 future가 2초 후에 완성될 때까지 기다린다.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// 여러분의 executor가 spawner가 끝났음을 확인하고 더 이상 실행할 task를
// 받지 않도록 spawner를 drop한다.
drop(spawner);
// excutor를 task 큐가 빌 때까지 실행한다. "howdy!" 출력, 일시중지,
// "done!"출력 순으로 동작할 것이다.
executor.run();
}