Executor와 시스템 입출력
우리는 이전 Future
트레잇 장에서 소켓을 비동기적으로 읽는 아래의 future
예제에 대해 살펴보았습니다..
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// 소켓에 데이터가 준비됨 -- 버퍼에 읽어 들이고 그 버퍼를 반환
Poll::Ready(self.socket.read_buf())
} else {
// 소켓에 아직 데이터가 준비되지 않음
//
// 데이터가 준비되면 `wake`가 호출될 수 있도록 조치함.
// 데이터가 준비되면, `wake`가 호출되고, 이 `Future`의 사용자는
// `poll`을 다시 호출하여 데이터를 읽을 수 있음을 알게 된다.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
이 future는 소켓에 준비된 데이터를 읽어들이지만, 준비된 데이터가 없다면
executor에게 "소켓에 데이터가 준비되면 future의 태스크를 다시 깨워주세요"라고
요청하면서 콘텍스트를 양보할 것입니다. 하지만, 이 예제에서는 Socket
타입이
어떻게 구현되는지 확실하지 않고, 특별히 set_readable_callback
함수가
작동하는지가 분명하지 않습니다. 어떻게 하면 소켓에 데이터가 준비되었을 때,
wake()
가 호출될 수 있게 할까요? 끊임없이 socket
이 준비되었는지 확인하여
맞으면 wake()
를 호출하는 스레드를 하나 만드는 것도 선택지가 될 것입니다.
그러나 이 방법은 블록된 입출력 future 각각마다 별개의 스레드가 필요하기 때문에
매우 비효율적이어서 우리의 비동기 코드의 효율을 크게 저하시킬 것입니다.
사실 이 문제는 입출력과 연계된 시스템 블로킹 기본기능과 통합으로 해결합니다.
예를 들면 리눅스의 epoll
, FreeBSD와 Mac OS의 kqueue
, 윈도우즈의 IOCP,
퓨시아의 port
같은 것들이 있습니다(그리고 이것들 모두는 크로스 플랫폼 려스트
크레잇인 mio
를 이용하여 사용할 수 있습니다). 이러한 기본기능들은 모두 한
스레드가 여러개의 비동기 입출력 이벤트에 따라 블록하였다가 이벤트가 완성되면
반환할 수 있는 기능을 제공합니다. 이러한 API들은 실제로 보통 아래와 같은
형태입니다.
struct IoBlocker {
/* ... */
}
struct Event {
// 생성되어 전파되는 이벤트를 고유하게 식별하는 ID_
id: usize,
// 기다리고 있거나, 발생된 시그널 집합
signals: Signals,
}
impl IoBlocker {
/// 발생하면 블록할 비동기 입출력 이벤트의 새로운 컬렉션을 만든다.
fn new() -> Self { /* ... */ }
/// 특정 입출력 이벤트에 interest를 표시한다.
fn add_io_event_interest(
&self,
/// 이벤트가 발생할 오브젝트
io_object: &IoObject,
/// 이벤트가 발동되어야 하는 `io_object`에 발생할 시그널들의 집합으로,
/// 이 interest로부터 비롯된 이벤트에 부여될 ID와 짝지워진다.
event: Event,
) { /* ... */ }
/// 이벤트 중에 하나가 발생할 때까지 블록한다.
fn block(&self) -> Event { /* ... */ }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
// 소켓에 데이터가 준비되면 "Socket 1 is now READABLE" 같은 걸 출력한다.
println!("Socket {:?} is now {:?}", event.id, event.signals);
future의 executor들은 아래의 기본기능들을 비동기 입출력 객체를 제공하기 위해
사용할 수 있습니다. 예를 들어, 소켓은 특별한 입출력 이벤트가 발생하였을 때, 콜백이
실행되도록 설정할 수 있습니다. 위의 SocketRead
예제의 경우에
Socket::set_readable_callback
함수는 아래와 같은 의사코드와 비슷한 형태일
것입니다.
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
// `local_executor`는 로컬 executor에 대한 레퍼런스이다.
// `local_executor`는 소켓 생성시에 제공될 수도 있었겠지만, 실제로는
// 편의상 많은 executor 구현에서 스레드의 로컬 스토리지에
// 두고 사용한다.
let local_executor = self.local_executor;
// 이 입출력 객체의 고유 ID
let id = self.id;
// executor의 맵에 로컬 waker를 저장하여 입출력 이벤트가
// 도착하면 로컬 wake가 호출될 수 있다.
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}
이로써 우리는 어떤 입출력 이벤트라도 받아서 딱맞는 Waker
에게 보내줄 수 있는
executor 스레드를 만들었습니다. 그리고 Waker
는 이벤트에 대응하는 태스크를 깨울
것입니다. 또, executor는 입출력 이벤트를 더 확인하기 전에 보다 많은 태스크들을
완성될까지 구동할 수 있을 것입니다(그리고 사이클은 반복됩니다...).