반복과 동시성

동기적 Iterator와 같이 Stream에서 값을 반복하고 처리하는 여러 가지 방법이 있습니다. map, filter, fold와 같은 콤비네이터 방식과 try_map, try_filter, try_fold와 같이 오류가 생기면 바로 종료하는 방식의 메소드가 있습니다.

슬프게도 for 반복문은 Stream과 같이 사용할 수 없지만, 명령형 코드를 위해 while letnext/try_next 함수를 쓸 수 있습니다.

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // `next`를 쓰기 위함
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // `try_next`를 쓰기 위함
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

하지만 우리가 단지 한 시점에 요소 한 개만 처리하고 있다면 잠재적으로 동시성의 기회를 내다버린 것과 다를 바 없습니다. 이 점이 바로 처음부터 비동기 코드를 써야 하는 이유입니다. 한 스트림 안에서 여러 아이템을 동시에 다루기 위해 for_each_concurrenttry_for_each_concurrent 메소드를 사용하시기 바랍니다.

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // `try_for_each_concurrent`를 쓰기 위함
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}