Executor와 시스템 입출력

우리는 이전 Future 트레잇 장에서 소켓을 비동기적으로 읽는 아래의 future 예제에 대해 살펴보았습니다..

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // 소켓에 데이터가 준비됨 -- 버퍼에 읽어 들이고 그 버퍼를 반환
            Poll::Ready(self.socket.read_buf())
        } else {
            // 소켓에 아직 데이터가 준비되지 않음
            //
            // 데이터가 준비되면 `wake`가 호출될 수 있도록 조치함.
            // 데이터가 준비되면, `wake`가 호출되고, 이 `Future`의 사용자는
            // `poll`을 다시 호출하여 데이터를 읽을 수 있음을 알게 된다.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

이 future는 소켓에 준비된 데이터를 읽어들이지만, 준비된 데이터가 없다면 executor에게 "소켓에 데이터가 준비되면 future의 태스크를 다시 깨워주세요"라고 요청하면서 콘텍스트를 양보할 것입니다. 하지만, 이 예제에서는 Socket 타입이 어떻게 구현되는지 확실하지 않고, 특별히 set_readable_callback 함수가 작동하는지가 분명하지 않습니다. 어떻게 하면 소켓에 데이터가 준비되었을 때, wake()가 호출될 수 있게 할까요? 끊임없이 socket이 준비되었는지 확인하여 맞으면 wake()를 호출하는 스레드를 하나 만드는 것도 선택지가 될 것입니다. 그러나 이 방법은 블록된 입출력 future 각각마다 별개의 스레드가 필요하기 때문에 매우 비효율적이어서 우리의 비동기 코드의 효율을 크게 저하시킬 것입니다.

사실 이 문제는 입출력과 연계된 시스템 블로킹 기본기능과 통합으로 해결합니다. 예를 들면 리눅스의 epoll, FreeBSD와 Mac OS의 kqueue, 윈도우즈의 IOCP, 퓨시아의 port같은 것들이 있습니다(그리고 이것들 모두는 크로스 플랫폼 려스트 크레잇인 mio를 이용하여 사용할 수 있습니다). 이러한 기본기능들은 모두 한 스레드가 여러개의 비동기 입출력 이벤트에 따라 블록하였다가 이벤트가 완성되면 반환할 수 있는 기능을 제공합니다. 이러한 API들은 실제로 보통 아래와 같은 형태입니다.

struct IoBlocker {
    /* ... */
}

struct Event {
    // 생성되어 전파되는 이벤트를 고유하게 식별하는 ID_
    id: usize,

    // 기다리고 있거나, 발생된 시그널 집합
    signals: Signals,
}

impl IoBlocker {
    /// 발생하면 블록할 비동기 입출력 이벤트의 새로운 컬렉션을 만든다.
    fn new() -> Self { /* ... */ }

    /// 특정 입출력 이벤트에 interest를 표시한다.
    fn add_io_event_interest(
        &self,

        /// 이벤트가 발생할 오브젝트
        io_object: &IoObject,

        /// 이벤트가 발동되어야 하는 `io_object`에 발생할 시그널들의 집합으로, 
        /// 이 interest로부터 비롯된 이벤트에 부여될 ID와 짝지워진다.
        event: Event,
    ) { /* ... */ }

    /// 이벤트 중에 하나가 발생할 때까지 블록한다.
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// 소켓에 데이터가 준비되면 "Socket 1 is now READABLE" 같은 걸 출력한다.
println!("Socket {:?} is now {:?}", event.id, event.signals);

future의 executor들은 아래의 기본기능들을 비동기 입출력 객체를 제공하기 위해 사용할 수 있습니다. 예를 들어, 소켓은 특별한 입출력 이벤트가 발생하였을 때, 콜백이 실행되도록 설정할 수 있습니다. 위의 SocketRead 예제의 경우에 Socket::set_readable_callback 함수는 아래와 같은 의사코드와 비슷한 형태일 것입니다.

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor`는 로컬 executor에 대한 레퍼런스이다.
        // `local_executor`는 소켓 생성시에 제공될 수도 있었겠지만, 실제로는 
        // 편의상 많은 executor 구현에서 스레드의 로컬 스토리지에
        // 두고 사용한다.
        let local_executor = self.local_executor;

        // 이 입출력 객체의 고유 ID
        let id = self.id;

        // executor의 맵에 로컬 waker를 저장하여 입출력 이벤트가
        // 도착하면 로컬 wake가 호출될 수 있다.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

이로써 우리는 어떤 입출력 이벤트라도 받아서 딱맞는 Waker에게 보내줄 수 있는 executor 스레드를 만들었습니다. 그리고 Waker는 이벤트에 대응하는 태스크를 깨울 것입니다. 또, executor는 입출력 이벤트를 더 확인하기 전에 보다 많은 태스크들을 완성될까지 구동할 수 있을 것입니다(그리고 사이클은 반복됩니다...).