16 Şubat 2018 Cuma

RxJava Observable Sınıfı

Giriş
RxJava, ReactiveX kavramının Java gerçekleştirimi. Şu satırı dahil ederiz.
<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>1.3.2</version>
</dependency>
Gradle ile şöyle yaparız.
dependencies {
  ...
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.2.1'
}
Olayı basit tutmak için RxJava'nın amacını asenkron bir işlemi gerçekleştirip bunun sonucunu UI'da göstermek diye düşünelim. Klasik programlama dillerinde asenkron işlem tüm cevabı hazırlayıp gönderir.

RxJava ile cevap peyderpey gönderilebilir. Böylece UI ara ara güncellenerek en son durum görüntülebilir.


Back Pressure
Back Pressure ile ilgili detaylı örnekler burada.

Non-Blocking Back Pressure
ReactiveX ise Reactive Manifesto ile ilgili. Reactive Manifesto'nun bir amacı şunu sunmak.
"a standard for asynchronous stream processing with non-blocking back pressure"
Burada back-pressure kavramını haberleşme protokollerindeki "flow control" kavramı ile aynı şeymiş gibi düşünmek başlamayı kolaylaştırıyor.

Producer'ın hızına yetişemeyen Consumer , bir şekilde Producer'a yavaşlaması gerektiğini bildirir.

"Blocking back-pressure" yönteminde Producer bekler. Asenkron çalışmada Producer'ı bekletme şansımız yok. Dolayısıyla "Non-blocking back-pressure" yöntemi kullanılıyor.

"Non-blocking back-pressure" yönteminde Consumer hazır olduğunda Producer'a kaç tane nesne istediğini söyler. Yani push yöntemi yerine pull yöntemi kullanılır. Producer da elindekileri gönderir.

Java 8 Stream İle Farkı Nedir?
Açıklaması şöyle.
Java 8 Streams are pull based. You iterate over a java 8 stream consuming each item. And it could be an endless stream.

RXJava Observable is by default push based. You subscribe to an Observable and you will get notified when the next item arrives (onNext), or when the stream is completed (onCompleted), or when an error occured (onError).
Observable Benzeri Sınıflar
Single,Completable, Maybe gibi benzer metodları sunan sınıflar da var.

buffer metodu
Satırları 4'er okumak için şöyle yaparız.
Stream<String> stream = Files.lines(Paths.get("file.txt"))

Observable.fromIterable(stream::iterator)
          .buffer(4)                      // Observable<List<String>>
          .map(x -> String.join(", ", x)) // Observable<String>
          .forEach(System.out::println);
create metodu
Şöyle yaparız.
Observable.create(new ObservableOnSubscribe<Integer>() {
  @Override
  public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
    if (!observableEmitter.isDisposed())
      observableEmitter.onComplete();
  }
}).subscribe (...);
fromIterable metodu
Şöyle yaparız.
List<Form> form = ...;
Observable.fromIterable(form)
.concatMapIterable(Form::getFormVersions) // get form version list from single form object
.doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed"))
.filter(this::checkFormVersionToDownloadOrNot) //get those versions to be downloaded
.doOnNext(formVersion -> { // get formVersion object

  AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
  AppLogger.i(tag, formVersion.getFormUrl());
  String constructURL = formVersion.getFormUrl();
  /* download form gzip process starts from here */
  ...
})
.doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
.doOnError(Throwable::printStackTrace)
.subscribe(
  formVersion -> AppLogger.i(tag, "Form Versions download completed"),
  throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
 );
just metodu
Örnek
Tek bir nesne dönmek için şöyle yaparız.
Observable.just("item").subscribe(
  new Observer<String>() {
  @Override
  public void onSubscribe(Disposable d) {
   ...
  }

  @Override
  public void onNext(String s) {
    ...
  }

  @Override
  public void onError(Throwable e) {
    ...
  }

  @Override
  public void onComplete() {
    ...
  }
})
Örnek
Liste kullanmak için şöyle yaparız.
List<Foo> list = ...;
Observable.just(list).flatMapIterable(i -> i).groupBy(f -> f.Id)
Örnek
Dizi kullanmak için şöyle yaparız.
String[] strings = {"Hello", "World"};
Observable<String[]> stringsObservable = Observable.just(strings);
range metodu
Elimizde şöyle bir sınıf olsun.
public class MyClass {
  private int number;

  public MyClass(int number) {
    super();
    this.number = number;
  }

  public int getNumber() {
    return number;
  }

}
Bu sınıftan 50 tane farklı sayı ile ilklendirmek için şöyle yaparız.
 Observable.range(1, 10).map(number -> new MyClass(number)).toList()
  .subscribe(myClasses -> printData(myClasses));
Nesne listesi oluşturulduktan sonra şu metod çağrılır.
static void printData(List<MyClass> mydata) {
  mydata.stream().forEach(myClass -> System.out.println(myClass.getNumber()));
}
subscribe metodu
Örnek
Şöyle yaparız.
Observable.just(1).subscribe(new Observer<Integer>() {

  Disposable disposable;

  @Override
  public void onSubscribe(Disposable disposable) {
    System.out.println("Subscribed");
    this.disposable = disposable;
  }

  @Override
  public void onNext(Integer integer) {
    System.out.println(integer);
    System.out.println(disposable.isDisposed());
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Error");
    System.out.println(disposable.isDisposed());
  }

  @Override
  public void onComplete() {
    System.out.println("Complete");
    System.out.println(disposable.isDisposed());
  }
})
Örnek
Bu örnekte Observer yerine Subscriber kullanılıyor. Subscriber da Observer arayüzünden kalıtır.
public abstract class Subscriber<T> implements Observer<T>, Subscription
Şöyle yaparız.
Subscription subscription =  Observable.subscribe(new Subscriber<Type>() {
  @Override
  public void onCompleted() {
  }

  @Override
  public void onError(Throwable e) {
  }

  @Override
  public void onNext(String responseString) {
  }
});