Wednesday, 6 March 2019

The Spliterator Interface - Required for parallelStream() to Work

Introduction
The explanation is as follows.
At the lowest level, all streams are driven by a spliterator.
The explanation is as follows. In other words, it splits the list into smaller parts so that parallelStream() can work.
An Iterator is a simple representation of a series of elements that can be iterated over.
A Spliterator can be used to split given element set into multiple sets so that we can perform some kind of operations/calculations on each set in different threads independently, possibly taking advantage of parallelism. It is designed as a parallel analogue of Iterator. Other than collections, the source of elements covered by a Spliterator could be, for example, an array, an IO channel, or a generator function.

Main methods in the Spliterator interface are:

tryAdvance()
With tryAdvance(), we can traverse underlying elements one by one (just like Iterator.next()). If a remaining element exists, this method performs the consumer action on it, returning true; else returns false.

forEachRemaining()
For sequential bulk traversal we can use forEachRemaining().

trySplit()
Splits this spliterator into two and returns the new one. An ideal trySplit method should divide its elements exactly in half, allowing balanced parallel computation.
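The three methods above can be tried out together. This is a minimal sketch, assuming an ArrayList as the source; the class name SpliteratorBasics and the labels added to each element are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo class, not part of the original post.
class SpliteratorBasics {

    // Takes one element with tryAdvance(), splits off a prefix with trySplit(),
    // then drains both parts with forEachRemaining(). Returns what was seen, in order.
    static List<String> traverse(List<String> source) {
        List<String> seen = new ArrayList<>();
        Spliterator<String> rest = source.spliterator();

        // tryAdvance() handles at most one element and reports whether one existed.
        rest.tryAdvance(s -> seen.add("first=" + s));

        // trySplit() hands off part of the remaining elements, or returns null.
        Spliterator<String> prefix = rest.trySplit();
        if (prefix != null) {
            prefix.forEachRemaining(s -> seen.add("prefix=" + s));
        }

        // forEachRemaining() consumes everything this spliterator still covers.
        rest.forEachRemaining(s -> seen.add("rest=" + s));
        return seen;
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(traverse(source));
    }
}
```

Because the spliterator of a List is ORDERED, the part returned by trySplit() is a prefix of the remaining elements, so draining the prefix first and the rest afterwards preserves the original order.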
Our Own Class
Example
We do it as follows.
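import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import lombok.NonNull; //Assumption: @NonNull comes from Lombok; the original does not say.

//A {@link #ORDERED} implementation of {@link Spliterator} for {@link List}.
//It accepts {@link List} that can be divided in N splits.
//These splits can be concurrently iterated by {@link #tryAdvance(Consumer)}
class CappedListSpliterator<T> implements Spliterator<T> {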

  private List<T> items;
  private int begin;
  private final int end;
  private int maxConcurrency;


  //maxConcurrency - how many splits the original List should be divided into.
  static <T> CappedListSpliterator<T> of(@NonNull List<T> items,
                                         int maxConcurrency) {
    return new CappedListSpliterator<T>(items, maxConcurrency);
  }

  private CappedListSpliterator(@NonNull List<T> items,
                                int maxConcurrency) {
    this.items = items;
    this.maxConcurrency = maxConcurrency;
    this.begin = 0;
    this.end = items.size();
  }

  private CappedListSpliterator(@NonNull List<T> items,
                                int begin,
                                int end) {
    this.items = items;
    this.begin = begin;
    this.end = end;
    this.maxConcurrency = 1;
  }

  //As splits are {@link #ORDERED}, they have defined beginning and end.
  //This increments begin by one with each call and returns false when end and
  //begin become equal.
  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    if (this.end <= this.begin) {
      return false;
    }
    action.accept(items.get(begin));
    this.begin += 1;
    return true;
  }

  //Copied from the Javadoc:
  //If this spliterator can be partitioned, returns a Spliterator
  //covering elements, that will, upon return from this method, not
  //be covered by this Spliterator.
  @Override
  public Spliterator<T> trySplit() {
    if (maxConcurrency <= 1) {
      return null;
    }

    //Hand off the first 1/maxConcurrency of the remaining range.
    int newBegin = begin + (end - begin) / maxConcurrency;
    CappedListSpliterator<T> newSplit = new CappedListSpliterator<>(items,
              begin,
              newBegin);
    this.begin = newBegin;
    this.maxConcurrency -= 1;
    return newSplit;
  }

  @Override
  public long estimateSize() {
    return end - begin;
  }

  @Override
  public int characteristics() {
    return SIZED | NONNULL | SUBSIZED | ORDERED;
  }
}
To use it, we do the following.
public class ParallelForkJoinWithCustomSpliterator {
  private static final int POOL_SIZE = 10;
  private static final int MAX_CONCURRENCY = 2;
  private static final int SLEEP_TIME_IN_MILLIS = 5000;
  private static final ForkJoinPool IO_FORK_JOIN_POOL = new ForkJoinPool(POOL_SIZE);
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    IO_FORK_JOIN_POOL.submit(() -> StreamSupport
      //Use the custom Spliterator
      .stream(CappedListSpliterator.of(Arrays.asList(...), MAX_CONCURRENCY),
        true)
      .forEach(ParallelForkJoinWithCustomSpliterator::sleep))
      .get();
    System.out.println("End Of Program");
  }

  public static void sleep(String word) {
    try {
      //Simulate blocking by sleep.
      Thread.sleep(SLEEP_TIME_IN_MILLIS);
      System.out.println(word);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
constructor
To create an instance of this class, one of the following is used:
1. stream.spliterator() is called.
2. Iterable.spliterator() is called.
3. A regular Iterator is passed to Spliterators.spliteratorUnknownSize(), which creates a Spliterator from it.

Example
To create one from a Stream, we do the following.
Spliterator<String> sp=list.stream().filter(s -> ...).spliterator();
Example
To create one from a Stream, we do the following.
Spliterator<String> source = new Random()
            .ints(11, 0, 7) // size, origin, bound
            .filter(nr -> nr % 2 != 0)
            .mapToObj(Integer::toString)
            .spliterator();
Example
To create one from an Iterator, we do the following.
Stream<E> stream = StreamSupport.stream(
  Spliterators.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED), false);
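The snippet above leaves sourceIterator and E undefined. A self-contained sketch might look like the following, assuming a hypothetical helper named toStream and a small Integer list as the iterator's source.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class, not part of the original post.
class IteratorToStream {

    // Wraps a plain Iterator in a sequential Stream; the size is unknown up front.
    static <E> Stream<E> toStream(Iterator<E> sourceIterator) {
        Spliterator<E> sp =
            Spliterators.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED);
        return StreamSupport.stream(sp, false); // false = sequential stream
    }

    public static void main(String[] args) {
        Iterator<Integer> it = Arrays.asList(1, 2, 3).iterator();
        List<Integer> doubled = toStream(it).map(x -> x * 2).collect(Collectors.toList());
        System.out.println(doubled);
    }
}
```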
Example
With Iterable.spliterator(), we do the following.
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();
Example
We do it as follows.
List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 5000);

List<List<Integer>> list = new ArrayList<>();
it.forEach(list::add);

StreamSupport.stream(list.spliterator(), true)
  .map(x -> {
    System.out.println(
      "Thread : " + Thread.currentThread().getName() +
      " processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
    );
    return x;
  })
  .flatMap(List::stream)
  .collect(Collectors.toList());
We get the following output.
Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
Thread : main processed elements in the range : 5000 , 9999
characteristics method
The explanation is as follows.
returns an int encoding the set of characteristics of the Spliterator itself. The Spliterator clients can use these characteristics to better control and optimize its usage.
It can return ORDERED, DISTINCT, SORTED, SIZED, NONNULL, IMMUTABLE, CONCURRENT and SUBSIZED.
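To see which of these flags a given source reports, each one can be checked with hasCharacteristics(). This is a small sketch; the class name CharacteristicsDemo and the describe helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

// Hypothetical demo class, not part of the original post.
class CharacteristicsDemo {

    private static final String[] NAMES = {
        "ORDERED", "DISTINCT", "SORTED", "SIZED",
        "NONNULL", "IMMUTABLE", "CONCURRENT", "SUBSIZED"
    };
    private static final int[] FLAGS = {
        Spliterator.ORDERED, Spliterator.DISTINCT, Spliterator.SORTED, Spliterator.SIZED,
        Spliterator.NONNULL, Spliterator.IMMUTABLE, Spliterator.CONCURRENT, Spliterator.SUBSIZED
    };

    // Returns the names of all characteristics the given spliterator reports.
    static List<String> describe(Spliterator<?> sp) {
        List<String> present = new ArrayList<>();
        for (int i = 0; i < FLAGS.length; i++) {
            if (sp.hasCharacteristics(FLAGS[i])) {
                present.add(NAMES[i]);
            }
        }
        return present;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, 1, 2);
        System.out.println("ArrayList: " + describe(new ArrayList<>(values).spliterator()));
        System.out.println("HashSet:   " + describe(new HashSet<>(values).spliterator()));
        System.out.println("TreeSet:   " + describe(new TreeSet<>(values).spliterator()));
    }
}
```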

CONCURRENT Characteristics
The explanation is as follows.
A Spliterator that does not report IMMUTABLE or CONCURRENT is expected to have a documented policy (for example throwing ConcurrentModificationException) concerning structural interference detected during traversal.
The explanation is as follows.
The source of this Spliterator may be safely concurrently modified by other threads without any synchronization.
It is used to traverse a Stream in parallel.
The Spliterator is another new interface added to Java 8; its name stands for “splitable iterator.”
Like Iterators, Spliterators are used to traverse the elements of a source, but they’re also
designed to do this in parallel.
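The difference between a source that reports CONCURRENT and one that does not can be observed by modifying the source in the middle of a traversal. This is a single-threaded sketch under the assumption that ConcurrentLinkedQueue stands in for a concurrent source and ArrayList for a fail-fast one; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the original post.
class ConcurrentCharacteristicDemo {

    // Starts a traversal, modifies the source, then finishes the traversal.
    // Returns true if the spliterator detected the interference and threw.
    static boolean failsOnInterference(Collection<String> source) {
        Spliterator<String> sp = source.spliterator();
        sp.tryAdvance(s -> { });      // start traversing (binds late-binding spliterators)
        source.add("added-later");    // structural modification during traversal
        try {
            sp.forEachRemaining(s -> { });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Collection<String> list = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Collection<String> queue = new ConcurrentLinkedQueue<>(Arrays.asList("a", "b", "c"));

        System.out.println("ArrayList reports CONCURRENT: "
            + list.spliterator().hasCharacteristics(Spliterator.CONCURRENT));
        System.out.println("ArrayList fails on interference: " + failsOnInterference(list));

        System.out.println("ConcurrentLinkedQueue reports CONCURRENT: "
            + queue.spliterator().hasCharacteristics(Spliterator.CONCURRENT));
        System.out.println("ConcurrentLinkedQueue fails on interference: "
            + failsOnInterference(queue));
    }
}
```

The ArrayList behaviour here is its documented fail-fast policy, which is detected on a best-effort basis, so it should not be relied on for program correctness.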
IMMUTABLE Characteristics
The explanation is as follows.
Characteristic value signifying that the element source cannot be structurally modified; that is, elements cannot be added, replaced, or removed, so such changes cannot occur during traversal.
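One JDK source whose spliterator reports IMMUTABLE is CopyOnWriteArrayList, which traverses a snapshot of the list taken when the spliterator was created. The sketch below assumes that collection as the example; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the original post.
class ImmutableCharacteristicDemo {

    // Creates a spliterator over three elements, adds a fourth to the list,
    // and returns how many elements the spliterator actually traverses.
    static int snapshotCount() {
        CopyOnWriteArrayList<String> list =
            new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        Spliterator<String> sp = list.spliterator(); // snapshot taken here
        list.add("d");                               // not visible to the snapshot
        int[] count = {0};
        sp.forEachRemaining(s -> count[0]++);
        return count[0];
    }

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a"));
        System.out.println("Reports IMMUTABLE: "
            + list.spliterator().hasCharacteristics(Spliterator.IMMUTABLE));
        System.out.println("Elements traversed: " + snapshotCount());
    }
}
```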
SORTED Characteristics
Example
We do it as follows. It returns true.
System.out.println(
    IntStream.range(0, 4)
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);
estimateSize method
The explanation is as follows.
A Spliterator may also provide an estimation of the number of the elements remaining to be traversed via its estimateSize method, because even an inaccurate but quick-to-compute value can be useful to split the structure more or less evenly.
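The effect of a split on the estimate can be observed directly. This is a minimal sketch, assuming an ArrayList of eight elements, whose spliterator is SIZED and therefore reports exact values; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original post.
class EstimateSizeDemo {

    // Returns { size before split, size of the split-off part, size of the rest }.
    static long[] sizesAroundSplit(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        long before = rest.estimateSize();
        Spliterator<Integer> prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { before, prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 8).boxed()
            .collect(Collectors.toCollection(ArrayList::new));
        long[] sizes = sizesAroundSplit(source);
        System.out.println("before=" + sizes[0]
            + " prefix=" + sizes[1] + " rest=" + sizes[2]);
    }
}
```

For a SIZED and SUBSIZED source such as this one, the two parts add up to the size reported before the split; for other sources the value is only an estimate.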
forEachRemaining method - For Bulk Operations
Suppose we have a BufferedReader. To apply one operation to the first line and a different one to the remaining lines, we do the following.
Spliterator<String> sp = reader.lines().spliterator();

sp.tryAdvance(YourConsumer);
sp.forEachRemaining(DifferentConsumer);
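A runnable version of the same idea is sketched below, assuming the reader wraps an in-memory string and that the first line is a header; the class name, the process method and the "HEADER:" / "ROW:" labels are hypothetical.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo class, not part of the original post.
class HeaderThenRows {

    // Treats the first line as a header and every following line as a data row.
    static List<String> process(BufferedReader reader) {
        List<String> out = new ArrayList<>();
        Spliterator<String> sp = reader.lines().spliterator();
        sp.tryAdvance(line -> out.add("HEADER:" + line));    // first line only
        sp.forEachRemaining(line -> out.add("ROW:" + line)); // everything else
        return out;
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader("id,name\n1,a\n2,b"));
        System.out.println(process(reader));
    }
}
```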
tryAdvance method - Requests the Next Element. Returns True or False
The explanation is as follows.
The tryAdvance method behaves in a way similar to a normal Iterator in the sense that it’s used to sequentially consume the elements of the Spliterator one by one, returning true if there are still other elements to be traversed.
Example
We do it as follows.
Spliterator<String> source = ...

while (source.tryAdvance(s -> {...})) {
  ...
}
Example
We do it as follows.
Spliterator<String> sp= ...
if(sp.tryAdvance(token -> System.out.println("this is first non-empty token: "+token))) {
  ...
}
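Since the snippets above elide their bodies, here is a small complete sketch of the while loop form, assuming the goal is to count non-empty strings; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Spliterator;

// Hypothetical demo class, not part of the original post.
class TryAdvanceLoop {

    // Consumes the spliterator one element at a time and counts non-empty strings.
    static int countNonEmpty(Spliterator<String> source) {
        int[] count = {0};
        // The loop body is empty: all the work happens inside the consumer.
        while (source.tryAdvance(s -> {
            if (!s.isEmpty()) {
                count[0]++;
            }
        })) {
            // keep advancing until tryAdvance() returns false
        }
        return count[0];
    }

    public static void main(String[] args) {
        Spliterator<String> sp = Arrays.asList("a", "", "b").spliterator();
        System.out.println(countNonEmpty(sp));
        // Once exhausted, tryAdvance() keeps returning false without calling the consumer.
        System.out.println(sp.tryAdvance(s -> System.out.println("never printed")));
    }
}
```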
trySplit method - The Spliterator Is Split in Two Again
Example
We do it as follows.
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();

Spliterator<List<Integer>> one = sp.trySplit();
System.out.println(one.getExactSizeIfKnown());

Spliterator<List<Integer>> two = sp.trySplit();
System.out.println(two.getExactSizeIfKnown());

Spliterator<List<Integer>> three = sp.trySplit();
System.out.println(three.getExactSizeIfKnown());

Spliterator<List<Integer>> four = sp.trySplit();
System.out.println(four.getExactSizeIfKnown());
We get the following output. Here, each new Spliterator created by trySplit() is 1024 elements larger than the previous one.
1024
2048
3072
4096
The explanation is as follows.
... Spliterator comes from an Iterable, that does not have a known size. So the implementation internally will buffer the elements into a buffer of size 1024 and continue to increase the buffer on next iterations.
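The same batching can be reproduced without Guava. The sketch below assumes a lambda-based Iterable over 10,000 integers as the unknown-size source and counts the elements of each split by traversing it, rather than relying on getExactSizeIfKnown(), whose result for such splits may differ between JDK versions. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original post.
class UnknownSizeBatches {

    // Splits an Iterable-backed spliterator up to 'splits' times and
    // returns the number of elements found in each split-off batch.
    static List<Integer> batchSizes(int splits) {
        // A plain Iterable has no known size, so its default spliterator buffers batches.
        Iterable<Integer> source = () -> IntStream.range(0, 10_000).boxed().iterator();
        Spliterator<Integer> sp = source.spliterator();

        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < splits; i++) {
            Spliterator<Integer> batch = sp.trySplit();
            if (batch == null) {
                break; // nothing left to split
            }
            int[] count = {0};
            batch.forEachRemaining(x -> count[0]++);
            sizes.add(count[0]);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(3));
    }
}
```

Splits that grow arithmetically like this are unbalanced, which is one reason sources of unknown size tend to parallelize less well than SIZED ones.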
