9 Ekim 2019 Çarşamba

Stream Paralelleştirme Metodları

Giriş
Paralelleştirme için iki yöntem var. 

1. stream'in parallel() metodunun çağrılarak paralel bir stream haline getirilmesi veya 
2. mevcut collection nesnesinin parallelStream() metodunun çağrılarak bir paralel stream haline getirilmesi. 

Bu iki yöntem arasındaki fark burada anlatılıyor. Stream'ler bu paralelleştirme metodlarını çağırmadan da kendi içinde yapabilir deniyor. Açıklaması şöyle.
The internal iteration in the Streams library can automatically choose a data representation and implementation of parallelism to match your hardware. By contrast, once you’ve chosen external iteration by writing for-each, then you’ve essentially committed to self-manage any parallelism. (Self-managing in practice means either “one fine day we’ll parallelize this” or “starting the long and arduous battle involving tasks and synchronized”.)
Java 8 needed an interface like Collection but without iterators, ergo Stream!

1. Stream.parallel() metodu
Stream'in büyüklüğü 100 bin veya daha fazla ise parallel() metodunu kullanmakta fayda var. Daha küçük sayılarda paralel yöntemin performansı sırayla dolaşmaya göre daha kötü.

paralelleştirirken identity değelere dikkat etmek gerekir.

Örnek - hatalı sonuç
Elimizde şöyle bir kod olsun. Çıktı olarak 2,622,450,600 alırız
long res = LongStream.of(2, 53, 17, 21, 11, 42, 30)
                     .reduce(5, (acc, next) -> acc * next);
System.out.println(res);
Ancak bu kodu paralel hale getirirsek yanlış sonuç alırız. Ben 40,975,790,625,000 alıyorum. Sebebi ise işin 2 kısma bölünürken veya paralel hale gelirken her kısmın 5 ile çarpılması. Yani identity değer olması gerekenden çok daha fazla kullanılıyor.
long res = LongStream.of(2, 53, 17, 21, 11, 42, 30)
                     .parallel()
                     .reduce(5, (acc, next) -> acc * next);
System.out.println(res);
Aslında küçük bir println ile şunu görürüz.
5 x 53
5 x 42
5 x 30
5 x 21
5 x 11
210 x 150
5 x 2
105 x 55
5 x 17
5775 x 31500
265 x 85
10 x 22525
225250 x 181912500
40975790625000
Yani iş 14 parçaya bölünüyor. Çözüm basit. Şöyle yaparız
long res = LongStream.of(2, 53, 17, 21, 11, 42, 30)
                     .parallel()
                     .reduce(1, (acc, next) -> acc * next) * 5;
System.out.println(res);
paralel Çalışan forEach() ve forEachOrdered() Farkı
stream paralel çalışsa bile forEachOrdered() stream'in sıralamasını korur. forEach() ise korumaz. forEachOrdered() açıklaması şöyle
Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order. This is a terminal operation. 
forEach açıklaması şöyle
Performs an action for each element of this stream. This is also a terminal operation. The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. 
Örnek - paralel Olarak forEach
Şöyle yaparız.
Stream.of(runnable1, runnable2, runnable3).parallel().forEach(r -> r.run());
Örnek - paralel Olarak forEachOrdered
Elimizde şöyle bir kod olsun
List<Human> humans = List.of(new Human(10), new Human(15), new Human(11), new Human(20),
new Human(22));

humans.stream().parallel()
  .filter(human -> human.age>10)
  .map(human->human+",")
  .forEachOrdered(System.out::println);

humans.stream().parallel()
  .filter(human -> human.age>10)
  .map(human->human+",")
  .forEach(System.out::println);
Çıktı olarak şunu alırız.
15,11,20,22 //forEachOrdered sırayı korur 11,15,20,22 //forEach sırayı korumaz
Örnek - paralel Olarak map()
Şöyle yaparız
String id = Stream.<Supplier<String>>of(
        () -> getIdFromResponse(response), 
        () -> getIdFromRequest(request)
    )
    .parallel()
    .map(Supplier::get)
    .filter(Objects::nonNull)
    .findFirst()
    .orElseThrow():
Örnek - paralel Olarak reduce()
Şöyle yaparız. Burada Foo nesnesi son iki kısmı birleştirirken yeniden yaratılır ve en son iki paralel iş birleştirilir. Dolayısıyla sorun çıkmıyor.
final long SIZE = 1000;         // max stream size

final StringBuilder builder = Stream
        .generate(() -> "a")    // generate an infinite stream of "a"
        .limit(SIZE)            // make it finite
        .parallel()             // make it parallel
        .reduce(new Foo(), Foo::append, (b1, b2) -> b1);
                                // put each element in the builder

Örnek  - paralel Olarak count()
Şöyle yaparız.
Collection<Person> persons = ...;
long count = persons.parallelStream().
  .filter(...)
  .filter(...)
  .filter(...)
  .count();
Örnek 
Stream'in paralel olması Collector'ı serial ise işe yaramaz. Elimizde şöyle bir kod olsun.
IntStream.range(0,10).parallel().boxed().collect(Collectors.toList());
Çıktı olarak şunu alırız.
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2. Collections.parallelStream metodu
Örnek
Şöyle yaparız.
boolean result = list.parallelStream()
                             .anyMatch(p -> ...);
Örnek
Şöyle yaparız.
List<Type> data = ...;
List<Other> out = data.parallelStream()
    .map(t -> doSomeWork(t))
    .collect(Collectors.toList());
Ya da şöyle yaparız.
List<String> urls = ....

Map<String, String> map = urls.parallelStream()
                          .collect(Collectors.toMap(u -> u, u -> download(u)));
Görsel olarak şuna karşılık gelir.
   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|
Diğer Konular
Stream.limit metodu
Önemli Not : Limit belirtilen sayıya ulaşılıncaya kadar çalışır. Aşağıdaki kod 5'ten küçük sadece 5 sayı üretiyor. limit ise 10 verildiği için takılıyor.
List<Integer> integer = Stream.generate(new Supplier<Integer>() {
    int i = 0 ;

    @Override
    public Integer get() {
        return ++i;
    }
}).filter(j -> j < 5).limit(10).collect(Collectors.toList());
Şimdi esas konuya gelelim. paralel yapılan işlemlerde Stream'den elde edilen veriyi sayı olarak sınırlamak isteriz. İlk yöntem limit() metodunu kullanıyor.
Stream<Long> stream = 
  Stream.generate(() -> ThreadLocalRandom.current().nextLong()) ;

List<Long> list1 = 
  stream.parallel().limit(10_000_000).collect(Collectors.toList()) ;
Bu yöntemde thread'ler sayıyı sınırlamak için kendi aralarında sürekli konuşurlar. Bu yüzden çok verimli olmayabilir.

İkinci yöntemde stream sınırlı tutuluyor. Bu yöntem daha hızlı. Yani parallel() den sonra limit() iyi değil.
Stream<Long> stream = 
  ThreadLocalRandom.current().longs(10_000_000).mapToObj(Long::new) ;

List<Long> list = 
  stream.parallel().collect(Collectors.toList()) ;





Hiç yorum yok:

Yorum Gönder