Java

JAVA 8 스트림 튜토리얼

출처: Java 8 Stream Tutorial

이 예제-주도 튜토리얼은 Java 8 스트림에 대한 상세한 개요를 제공한다. 맨 처음 Stream API에 대해서 읽을 때, 난 JAVA I/O의 InputStreamOutputStream과 비슷하게 들리는 이름 때문에 헷갈려했다. 하지만 Java 8 스트림은 완전 다른 것이다. 스트림은 Monads, 즉 Java에 함수형 프로그래밍을 가져오는 데 큰 부분을 맡는다.

함수형 프로그래밍에서, monad는 단계적으로 정의된 계산들을 나타내는 구조다. monad 구조로 된 형식은 이어서 실행되어야 하는 연산에서나 같이 입력한 중첩된 기능들에서 어떤 의미를 가지는 지를 정의한다.

이 가이드는 어떻게 Java 8 스트림으로 작업할 수 있는 지와 여러 가지 다른 사용 가능한 스트림 연산을 어떻게 사용하는 지 가르쳐 준다. 독자는 이제 실행 순서와 스트림의 순서가 어떻게 실행 시간 성능에 영향을 미치는 지에 대해 배울 수 있을 것이다. 더 강력한 스트림 연산인 reducecollect, flatMap은 글 전체에서 자세하게 다룰 것이다. 이 튜토리얼은 병행 스트림의 깊은 곳 까지 보고 끝낼 것이다.

만약 아직까지 Java 8 람다 표현식과 함수형 인터페이스와 메소드 참조에 익숙하지 않다면, 이 튜토리얼을 시작하기 전에 내 Java 8 Tutorial을 읽기 바란다.

역자 주: UPDATE – 이 튜토리얼 저자가 지금 Java 8  스트림 API를 JavaScript에 구현하고 있습니다. 관심 있다면  Stream.js 에 방문해서 피드백을 남겨주세요.

어떻게 스트림이 동작하는가

스트림은 원소들의 순열을 나타내고, 이런 원소들에 대한 계산을 수행하기 위한 여러 종류의 연산들을 지원한다.

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2

스트림 연산은 중간에 있을 수도 있고, 맨 끝에 있을 수도 있다. 중간 연산들은 여러 개의 중간 연산들을 세미콜론 없이 연결할 수 있게 스트림을 반환한다. 단말 연산들은 스트림이 아닌 결과를 반환하거나 아예 반환하지 않을 수도 있다. 위의 예제에서 filter 그리고 map, sorted는 중간 연산이지만 forEach 는 단말 연산이다. 모든 가능한 스트림 연산을 보려면 Stream Javadoc을 보기 바란다. 위의 예제와 같이 이러한 스트림 연산의 연결을 연산 파이프라인이라고 부른다.

대부분의 스트림 연산들은 연산이 정확히 해야할 동작을 정해놓은 함수형 인터페이스, 즉 람다 표현식을 매개변수로 받는다. 대부분의 연산들은 서로 간섭하지 말아야 하고, 상태가 없어야 한다. 이건 뭘 의미할까?

함수가 스트림의 원본 데이터를 수정하지 않는 것을 간섭하지 않는다고 말한다. 그 예로, 위의 예제에서는 어떤 람다 표현식도 myList에 원소를 추가하거나 제거하지 않는다.

연산의 실행이 결정적이라면 함수는 상태가 없다고 말한다. 그 예로, 위의 예제에서는 어떤 람다 표현식도 실행 중간에 바뀔 수 있는 외부 범위에 있는 변경 가능한 변수나 상태에 의존하지 않는다.

여러 가지 스트림

스트림은 여러 원본 데이터(특히 컬렉션)에서 만들어질 수 있다. 리스트나 집합은 stream()parallelStream()을 통해 순차적이거나 병행적인 스트림을 만들 수 있다. 병행 스트림은 여러 쓰레드를 사용해서 연산이 가능하고 이는 이 튜토리얼의 후반부에서 다룰 것이다. 지금은 순차 스트림에 집중하자.

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

객체들의 리스트에서 stream() 메서드를 부르면 일반적인 객체 스트림을 반환한다. 하지만 우리는 다음 예제의 코드와 같이 스트림으로 작업하기 위해서 컬렉션을 생성할 필요는 없다.

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

단지 Stream.of()를 사용해서 객체 레퍼런스 묶음에서 스트림을 생성할 수 있다.

일반적인 객체 스트림 말고도, Java 8는 원시 자료형인 intlong, double를 사용해서 작업하기 위해 특수한 종류의 스트림들을 들고 있다. 어렴풋이 추측할 수 있듯이, 바로 IntStreamLongStream, DoubleStream이다.

IntStreamIntStream.range()를 사용해서 기존의 for문을 대체할 수 있다.

IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3

이 모든 원시 스트림들은 다음과 같은 차이점을 제외하면 일반적인 객체 스트림과 똑같이 작동한다. 원시 스트림은 특별한 람다 표현식을 사용하는데, 예로 들어, Function 대신 IntFunction을 사용하거나 Predicate 대신 IntPredicate를 사용한다. 그리고 원시 스트림은 sum()average()라는 추가적인 단말 집계 연산들을 지원한다.

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

때로는 일반적인 객체 스트림을 원시 스트림으로 바꾸거나 그 반대로 하는 것이 유용하다. 이런 목적을 위해서, 객체 스트림은 mapToInt()mapToLong(), 그리고 mapToDouble()이라는 특수한 사상 연산을 지원한다.

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3

원시 스트림은 mapToObj()를 통해서 객체 스트림으로 바뀔 수 있다.

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

다음은 이들을 결합한 예제이다. double 형의 스트림을 int 형 스트림으로 사상한 후, 문자열로 된 객체 스트림에 사상한다.

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

가공 순서

이제 우리는 어떻게 여러 가지 스트림을 만들고 작업하는지 배웠다. 이제 스트림 연산이 어떻게 진행되는지 깊게 들여다 보자.

중간 연산의 중요한 특징은 lazy하다는 점이다. 단말 연산이 빠져있는 이 샘플을 보자.

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

이 코드 토막을 실행하면, 콘솔에는 아무것도 출력되지 않는다. 왜냐하면 중간 연산은 단말 연산이 존재해야 실행되기 때문이다.

위의 예제에 단말 연산인 forEach를 추가해보자.

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

이 코드 토막을 하면 우리가 원하던 결과가 콘솔에 나타난다.

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

결과의 순서를 보면 놀라울 것이다. 단순한 접근이라면 분명 연산들이 스트림의 원소들을 따라서 수평적으로 실행되었을 것이다. 하지만 그 대신 각각의 원소들은 수직으로 연결된 채로 순회하고 있다. 첫 문자열인 “d2″가 filter()forEach를 지나고, 그 다음,  두번째 문자열인 “a2″가 진행된다.

다음 예제에서 볼 수 있듯이, 이 동작은 각각의 원소에 대해 실제로 실행되는 연산의 수를 줄여줄 수 있다.

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

anyMatch는 주어진 입력에 대해 Predicate가 만족되면 true를 반환한다. 두번째 원소가 “A2″를 전달했으므로 위의 anyMatch는 참이다. 스트림 연결은 수직적으로 수행하기 때문에, map은 이 경우 두 번만 실행된다. 따라서 스트림의 모든 원소를 사상하는 것 대신, map은 최대한 적게 실행된다.

왜 순서를 고려해야 하는가

다음 예제는 중간 연산인 mapfilter, 그리고 단말 연산인 forEach로 이루어져 있다. 이 연산들이 어떻게 실행되는지 다시 한번 살펴보자.

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

여러분들이 예상했듯이 mapfilter는 컬렉션의 모든 문자열에 대해 각각 다섯 번 불려졌고, forEach는 한 번만 불렸다.

만약 우리가 filter를 맨 앞으로 당기는 것으로 연산의 순서를 바꾼다면, 실제로 실행되는 연산의 수를 줄일 수 있다.

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1
// filter:  b3
// filter:  c

이제, map은 한 번만 불리기 때문에 연산 파이프라인은 많은 입력에 대해 더 빨라졌다. 복잡한 메서드 연결을 구성할 때는 반드시 위의 연산 순서를 명심해두자.

sorted를 추가해서 위의 예제를 확장해보자.

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

정렬은 중간 연산의 특별한 종류다. 정렬은 상태를 가지는 연산이라 불리는데 이는 원소들의 컬렉션을 정렬하기 위해서는 순서를 정하는 도중에 상태를 저장해야 하기 때문이다.

이 예제를 실행하면 다음과 같은 콘솔 출력이 나온다.

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

첫번째로, 정렬 연산은 입력된 컬렉션 전체에서 실행된다. 다른 말로 sorted는 수평적으로 실행된다. 그래서 이 경우 sorted는 입력된 컬렉션의 모든 원소들에 대한 조합을 8번 요구한다.

다시 한번 우리는 이 연결을 재배열해서 성능을 최적화 할 수 있다.

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

이 예제에서 sorted는 한번도 호출되지 않았는데 이는 filter가 입력된 컬렉션을 단 하나의 원소로 줄였기 때문이다. 따라서 큰 컬렉션이 입력되었을 때의 성능은 크게 증가하였다.

스트림들을 재사용하기

Java 8 스트림은 재사용이 불가능하다. 어떤 단말 연산이든 호출하는 즉시 스트림은 닫힌다.

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

같은 스트림 내에서 noneMatchanyMatch 뒤에서 호출하면 다음과 같은 예외가 발생한다.

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

이런 한계를 극복하기 위해 우리는 실행하고 싶은 모든 단말 연산에다가 새로운 스트림 연결을 생성해야 했다. 예로 들어 우리는 이미 모든 중간 연산들이 제공되어 있는 새로운 스트림을 생성하기 위해 스트림 공급자를 만들 수도 있다.

추가 내용: 스트림 공급자에 대한 예시는 다음과 같다.

https://stackoverflow.com/a/36263725

Supplier<T> 인터페이스를 사용해서 위의 filter 스트림을 생성하는 익명 클래스를 만들면, 비록 실제로는 다른 스트림 인스턴스가 생성되지만 원하는 스트림 연산 과정을 재사용할 수 있다.

고급 연산들

스트림은 여러가지 다른 연산들을 지원한다. 우리는 이미 filtermap 같은 정말 중요한 연산들을 배웠다. 다른 모든 연산들은 여러분이 찾아보도록 남겨놓겠다 (스트림 Javadoc을 봐라). 대신에 collectflatMap, reduce 같은 조금 더 복잡한 연산으로 깊숙히 들어가보자.

이 섹션에서의 대부분의 코드 샘플은 다음과 같은 인물 목록을 데모 용도로 사용한다.

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect

collect는 원소들의 스트림을 ListSet, Map 같은 다른 종류의 결과로 변환하는데 정말 유용한 단말 연산이다. Collect는 supplier(공급자), accumulator, combiner, finisher라는 연산들로 구성된 Collector를 받는다. 이는 처음에는 매우 복잡하게 들리겠지만, 좋은 점은 Java 8이 다양한 빌트-인 collector들을 Collectors 라는 클래스를 통해 제공하고 있다는 것이다. 그래서 대부분의 보편적인 연산들은 직접 collector를 구현할 필요가 없다.

매우 보편적인 사용 예시부터 시작해보자.

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

여기서 볼 수 있듯이 원소들의 스트림에서 리스트를 생성하는 건 매우 단순하다. 리스트 대신 집합이 필요하다면 단지 Collectors.toSet()을 사용하면 된다.

다음 예제는 모든 사람들을 나이대로 묶는 것이다.

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collector는 매우 다재다능하다. 모든 사람의 평균 연령을 내는 것과 같은 원소들의 스트림의 집계를 만들 수도 있다.

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

만약 조금 더 포괄적인 통계에 관심이 있다면, 요약 컬렉터는 특별한 빌트-인 요약 통계 객체를 반환한다. 그래서 우리는 단순히 min, max 그리고 사람들의 나이의 average, 게다가 sumcount 까지 알 수 있다.

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

다음 예제는 모든 사람들을 한 문자열에 넣는 것이다.

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

join 컬렉터는 구획 문자, 추가적으로 접두사와 접미사를 받아들인다.

스트림 원소들을 map으로 변환하기 위해 우리는 키와 값이 어떻게 사상되어야 할 지 정해줘야 한다. 반드시 사상된 키는 유일해야 하고 그렇지 않으면 IllegalStateException이 던져진다. 예외를 우회하기 위해 병합 함수를 추가적인 매개변수로 넘겨줄 수 있다.

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

이제 우리는 가장 강력한 빌트-인 컬렉터들을 몇몇 알게되었다. 이번엔 우리만의 특별한 컬렉터를 만들어보자. 우리는 스트림에 있는 모든 사람들을 이름을 전부 대문자로 변환해서 이를 잇고 | 파이프 문자로 구분하면서 한 문자열로 변환하고 싶어한다. 이를 하기 위해선 우리는 Collector.of()를 사용해서 새로운 컬렉터를 만들어야 한다. 우리는 반드시 supplier, accumulator, combiner, finisher 이 컬렉션의 네 가지 재료를 넘겨줘야 한다.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

Java의 문자열은 변하지 않기 때문에 우리는 컬렉터가 우리가 원하는 문자열을 만들게 하기 위해 StringJoiner 와 같은 헬퍼 클래스가 필요하다. supplier는 먼저 StringJoiner를 적합한 구획 문자와 함께 생성하고, accumulator는 StringJoiner에 각 사람의 대문자로 된 이름을 추가한다. combiner는 두 개의 StringJoiner를 하나로 합병하는 법을 알고 있고, 마지막으로 finisher는 StringJoiner로부터 원하는 문자열을 생성한다.

FlatMap

우리는 이미 map 연산을 활용해서 어떻게 객체들의 스트림에서 다른 종류의 객체로 바꾸는지 배웠다. map은 꽤나 제한적인데 이는 모든 객체가 오직 하나의 객체로만 사상될 수 있기 때문이다. 하지만 만약 우리가 한 객체를 여러개의 다른 객체들로 변환하거나 아예 변환시키지 않고 싶다면? flatMap이 해결해 줄 것이다.

flatMap은 스트림의 각각의 원소들을 다른 객체들의 스트림으로 변환한다. 그래서 각각의 객체들은 아무 것도 없거나 한 개 또는 여러개의 객체의 스트림으로 변환된다. 이런 스트림의 내용물은 flatMap 연산으로 반환된 스트림 안에 들어가게 된다.

flatMap이 작동하는 것을 보기 전에 우리는 적합한 타입 계층이 필요하다.

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

다음으로 우리는 여러 개의 객체들을 초기화하기 위해 스트림에 대해 알고 있는 지식을 활용한다.

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

이제 우리는 각각 3 개의 bar로 구성된 3 개의 foo의 리스트를 가지고 있다.

FlatMap은 객체들의 스트림을 반환해야 하는 함수를 받아들인다. 그래서 각각의 foo에서 bar 객체들을 풀어내기 위해서 우리는 단지 적합한 함수를 넘겨주면 된다.

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 
// Bar2 
// Bar3 
// Bar1 
// Bar2 
// Bar3 
// Bar1 
// Bar2 
// Bar3 

여기서 볼 수 있듯이, 우리는 성공적으로 3 개의 foo 객체의 스트림을 9 개의 bar 객체의 스트림으로 변환시켰다.

마지막으로 위의 코드 예제는 스트림 연산의 파이프라인 하나로 줄일 수 있다.

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap은 또한 Java 8에서 소개된 Optional 클래스에 대해서도 가능하다. 선택적인 flatMap 연산은 다른 타입의 Optional객체를 반환한다. 그래서 끔직한 null 확인을 막기 위해 활용할 수 있다.

이런 꽤나 계층적인 구조를 생각해보자.

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

외부의 인스턴스의 내부 스트링 foo를 빼내기 위해서는 가능한 NullPointerException을 막기 위해 여러번의 null 확인을 추가해야 할 것이다.

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

선택적인 flatMap 연산을 활용해서 같은 동작을 얻어낼 수 있다.

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

flatMap을 각각 호출할 때마다 만약 원하는 객체가 존재하면 그 객체를 감싼, 아니면 null을 감싸고 있는 Optional 을 반환한다.

Reduce

감소 연산은 스트림의 모든 원소들을 하나의 결과로 합친다. Java 8 는 세 가지 다른 reduce 방법을 제공한다. 첫번째는 원소들의 스트림을 스트림 내의 하나의 원소로 줄이는 것이다. 가장 나이 많은 사람을 정하기 위해서 어떻게 이 방법을 사용할 수 있는지 보자.

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

reduce 메서드는 BinaryOperator라는 accumulator 함수를 받는다. 실제로 두 피연산자가 같은 타입을 공유하는 BiFunction이고 여기선 Person을 공유한다. BiFunctionFunction과 같지만 두 개의 인자를 받는다. 예제의 함수는 가장 높은 나이값을 가진 사람을 반환하기 위해 두 사람의 나이를 비교하는 함수다.

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

두 번째 reduce 메서드는 항등값과 BinaryOperator accumulator를 받는다. 이 메서드는 스트림에 있는 모든 사람들의 이름을 이어붙인 문자열과 나이를 전부 더한 값을 가지는 Person을 생성하는데 활용할 수 있다.

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

세 번째 reduce 메서드는 세 개의 매개변수를 받는다. 항등값, BiFunction accumulator 그리고 BinaryOperator 타입의 combiner 함수. 항등값의 타입이 Person 으로 제한되지 않기 때문에 우리는 모든 사람들의 나이의 합을 알아내는데 이 감소연산을 활용할 수 있다.

여기서 볼 수 있듯이 결과는 76이다. 하지만 이 결과 밑에서 정확히 무슨 일이 일어났는가? 위의 코드에 약간의 디버그 출력을 추가해서 확장해보자.

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

여기서 볼 수 있듯이 accumulator 함수가 전부 다 한 것을 볼 수 있다. 처음에 초기 항등값인 0과 첫 번째 사람인 Max를 불렀다. 다음 세 단계는 sum이 남은 단계의 사람들의 나이만큼 계속 증가했고 전체 나이인 76이 되었다.

잠깐만 뭐라고? combiner가 한번도 호출되지 않았다고? 같은 스트림을 병렬로 실행하는 걸로 이 비밀을 끌어낼 것이다.

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

이 스트림을 병렬로 실행하니 완전히 다른 동작을 했다. 이제 combiner는 실제로 호출된다. accumulator가 병렬로 호출되기 때문에 combiner는 분리된 합계들을 압축하는데 필요하다.

이제 다음 챕터부터 병렬 스트림에 대해 깊게 들어가보자.

병렬 스트림

스트림은 아주 많은 양의 입력된 원소들에 대한 런타임 성능을 높히기 위해 병렬로 실행할 수 있다. 병렬 스트림은 정적 메서드인 ForkJoinPool.commonPool() 을 통해 사용가능한 공통의 ForkJoinPool을 사용할 수 있다. 기저에 깔려있는 쓰레드 풀의 크기는 최대 다섯 개다. – 사용가능한 물리적인 CPU 코어의 수에 따라 달려있다.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

내 장치에서는 공통 풀은 기본값인 3개로 초기화 된다. 이 값은 다음과 같은 JVM 매개변수를 설정함에 따라 감소할 수도 있고 증가할 수도 있다.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

컬렉션은 원소들의 병렬 스트림을 만들기 위해 parallelStream() 메서드를 지원한다. 부가적으로 순차 스트림을 대응되는 병렬 스트림으로 바꾸기 위해 주어진 스트림에 중간 메서드인 parallel()을 호출할 수 있다.

병렬 스트림의 병렬 실행 동작을 이해하기 위해서 다음 예제는 표준 출력으로 현재 쓰레드의 정보를 출력한다.

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

디버그 출력을 조사해봄으로써 우리는 어느 쓰레드가 실제로 스트림 연산을 자주 실행하는지 더 알 수 있다.

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

여기서 볼 수 있듯이 병렬 스트림은 스트림 연산을 실행하기 위해 공통의 ForkJoinPool에서 모든 가능한 쓰레드를 활용한다. 특정한 쓰레드가 실제로 작동하는 동작이 비결정적이기 때문에 출력은 계속 실행하면서 달라질 수 있다.

스트림 연산 sorted 를 추가해서 예제를 확장해보자.

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

결과가 처음에는 이상하게 보일 것이다.

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

sort는 메인 스트림에서만 순차적으로 실행되는 것으로 보인다. 실제로 병렬 스트림에서의 sort는 새로운 Java 8 메서드인 Arrays.parallelSort()를 밑에서 사용한다. Javadoc에 명시되어 있듯이 이 메서드는 배열의 길이에 따라 정렬을 순차적으로 할 지 병렬적으로 할 지 결정한다.

만약 특정 배열의 길이가 프로세스에 할당할 수 있는 최소 단위보다 작으면 적합한 Arrays.sort 메서드로 정렬한다.

지난 섹션의 reduce 예제로 다시 돌아오자. 우리는 이미 combiner 함수가 순차 스트림에서가 아닌 병렬 스트림에서만 호출된다는 것을 알아냈다. 어떤 쓰레드들이 실제로 관여하고 있는지 보자.

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

콘솔 출력이 accumulator와 combiner 함수가 모든 가능한 쓰레드들에서 병렬로 실행되는 것을 보여준다.

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

요약하자면, 병렬 스트림은 많은 양의 입력 원소들의 스트림에서 좋은 성능 향상을 가져온다는 것을 시사한다. 하지만 reduce나 collect같은 몇몇 병렬 스트림 연산은 순차적으로 실행했을 때는 필요하지 않았던 추가적인 연산들(combine 연산)이 필요하다.

게다가 우리는 모든 병렬 스트림 연산들은 같은 JVM-범위의 공통 ForkJoinPool을 공유한다는 것을 배웠다. 그래서 응용 프로그램 내에서 병렬 스트림에 크게 의존하고 있는 다른 부분들이 잠재적으로 느려질 수 있기 때문에 이런 느린 블록킹 스트림 연산을 피하고 싶을 것이다.

여기까지

내 Java 8 스트림으로의 프로그래밍 가이드는 여기가 끝이다. 만약 네가 Java 8 스트림에 대해 더 배우는데 관심이 있다면,  Stream Javadoc 패키지 문서를 추천한다. 만약 네가 기저에 있는 동작원리에 대해 더 배우고 싶다면, Martin Fowlers의 Collection Pipelines 에 대한 기사를 읽어보길 원할 것이다.

만약 네가 JavaScript에도 관심이 있다면, JavaScript로 구현한 Java 8 스트림 API인 Stream.js 을 한번 보고 싶을 것이다. 또한 내가 쓴 Java 8 Tutorial 과 Java 8 Nashorn Tutorial 을 읽기 원할 것이다.

이 튜토리얼이 네게 도움이 되었고 네가 즐겁게 읽었길 바란다. 이 튜토리얼 샘플의 모든 소스 코드는 깃허브에 올라가 있다. 편한대로 이 리포지터리를 포크하거나 트위터를 통해 너의 피드백을 보내줘도 좋다.

좋은 코딩되길.

JAVA 8 스트림 튜토리얼”에 대한 3개의 생각

댓글 남기기