select!

futures::select 매크로를 사용하면 여러 future를 동시에 실행하면서, 어떤 future라도 완성되면 사용자가 바로 반응할 수 있습니다.


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // `.fuse()`에 필요
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

위의 함수는 t1t2 둘 다 동시에 실행할 것입니다. 둘 중에 하나가 끝나면, 대응하는 핸들러가 println!을 호출하고, 위 함수는 나머지 태스크를 완성하지 않고 바로 종료됩니다.

select의 기본 문법은 <pattern> = <expression> => <code>,이고, select에 넣을 future 개수만큼 반복하면 됩니다.

default => ...complete => ...

또한 selectdefaultcomplete 분기를 지원합니다.

default 분기는 select에 넣어진 future들 중 아무것도 완성되지 않았으면 실행됩니다. 따라서 default 분기가 있는 select는 항상 즉시 반환합니다. 다른 어떤 future도 준비되지 않았으면 defualt가 실행되기 때문입니다.

complete 분기는 select에 넣어진 모든 future가 모두 완성되어 더 이상 진행할 일이 없는 경우를 다루기 위해 사용됩니다. complete 분기는 select!를 반복문 안에 넣을 때 유용합니다.


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // 실행되지 않음(future들은 준비되자마자 완성됨)
        };
    }
    assert_eq!(total, 10);
}
}

UnpinFusedFuture로 상호작용하기

위 첫 번째 예제에서, 여러분은 두 async fn가 반환한 future에 대해 pin_mut으로 고정하고, .fuse()를 호출해야 한다는 점을 인지했을 겁니다. select안에서 사용된 future들은 Unpin, FusedFuture 트레잇 둘 다 구현해야 하기 때문에, 이 호출들이 필요합니다.

select가 사용하는 future는 값으로 전달되지 않고 가변 참조로 전달되기 때문에, Unpin이 필요합니다. future의 소유권을 취하지 않기 때문에, 미완성된 future는 select를 호출한 다음에도 재사용 할 수 있습니다.

비슷하게, select는 이미 완성된 future를 poll하면 안되기 때문에, FusedFuture 트레잇이 필요합니다. FusedFuture는 future에 의해 구현되며, 자신이 완성되었는지 여부를 추적합니다. FusedFuture는 아직 완성되지 않은 future만 골라서 폴링할 수 있게 해주기 때문에 select를 반복문 안에서 사용할 수 있게 됩니다. 이는 위 예제에서 a_fut이나 b_fut가 반복문 2회차 때에 완성되는 것을 보면 알 수 있습니다. future::ready가 반환한 future가 FusedFuture를 구현하기 때문에, select가 그 future를 다시 poll하지 못하게 할 수 있습니다.

스트림은 같은 기능을 하는 FusedStream 트레잇을 가지고 있음을 알아두세요. FusedStream 트레잇을 구현하거나, .fuse()를 사용하여 래핑한 스트림은 .next() / .try_next()을 통해 FusedFuture를 뱉을 것입니다.


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

FuseFuturesUnordered를 이용한 select 루프 내부에서의 동시적 태스크

Fuse:terminated() 함수는 눈에 잘 띄지는 않지만 유용한 함수입니다. 이 함수는 이미 종료되어 비어있지만, 나중에 필요할 때, 실행할 future를 넣어서 실행할 수 있는 future를 만들어 줍니다.

이 함수는 select 루프가 유효한 동안에 실행될 필요가 있지만 select 루프 자체 안에서 만들어지는 태스크가 있을 경우 유용합니다.

.select_next_some() 함수의 용도에 유의하세요. 이 함수는 스트림이 반환한 Some(_) 값에 대응하는 분기를 실행할 때만 select와 함께 사용될 수 있고. None은 무시할 겁니다.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // 타이머가 경과되었음. 아직 실행되지 않고 있는 future가 있다면,
                // 새 `get_new_num_fut`를 시작
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // 새 숫자가 도착함 -- 새 `run_on_new_num_fut`를 시작하고 예전
                // 것을 드랍함.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // `run_on_new_num_fut`를 실행
            () = run_on_new_num_fut => {},
            // 모든 future가 완성되었다면 패닉. 왜냐하면 `indefinitely`는 값들을
            // 무기한으로 내야(yield) 함
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

같은 future의 여러 복사본을 동시에 실행할 필요가 있을 때에는 FuturesUnordered 타입을 사용하세요. 아래 예제는 위 예제랑 비슷하지만, run_on_new_num_fut의 복사본이 생겨도 중단하지 않고 각 복사본을 완성될때까지 실행한다는 점이 다릅니다. 또한, 아래 예제는 run_on_new_num_fut가 반환한 값을 출력할 것입니다.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// `get_new_num`로부터 나온 마지막 숫자를 가지고 `run_on_new_num`를 실행
//
// `get_new_num`은 타이머가 경과될 때마다 즉시 현재 실행중인 `run_on_new_num`을
// 취소하고 새 반환값으로 대체하면서 재시작됨.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // 타이머 경과됨. 실행중인 `get_new_num_fut`이 없다면 새로
                // 시작함.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // 새 숫자가 도착함 -- 새 `run_on_new_num_fut`를 시작함.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // `run_on_new_num_futs`를 실행하고 완성된 `run_on_new_num_futs`가
            // 있는 지 확인함
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // 모든 것이 완성되었다면 패닉. 왜냐하면 `interval_timer`는 값을 무기한으로 내야 함
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}