27 Mart 2018 Salı

RxJava Observable ve Threading

subscribeOn metodu
Verinin hangi thread kullanılarak üretileceğini belirtir. Bir kere çağrılabilir.

Örnek - Schedulers.newThread
Şöyle yaparız. Veri yeni bir  thread içinde üretilir. UI thread içinde tüketilir.
myService.fetchSomeIntegersFromServer()
  .subscribeOn(Schedulers.newThread())
  .filter(integer -> {
    System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
    return true;
  })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(integer1 -> { doSomething(integer1) });
Örnek
Bu örnek aslında Project Reactor örneği ancak RxJava ile aynı mantık olduğu için ekledim. Şöyle yaparız. Burada tüm veri tabanı işlemi arka planda yapılıyor ve sonuç ta responseObserver::onNext() çağrısının yine bu arka plan thread üzerinde yapıyor.
public void sayHello(HelloRequestDto request,
StreamObserver<HelloResponseDto> responseObserver) {
  sayHelloTo(request.getName())
    .map(message -> HelloResponseDto.newBuilder().setResponse(message).build())
    .subscribe(responseObserver::onNext, responseObserver::onError,
responseObserver::onCompleted);
}
 
private Mono<String> sayHelloTo(String name) {
  return Mono.fromCallable(() -> incrementAndGetHelloCount(name))
    .map(count -> String.format("Hello, %s! You have called %d times.", name, count))
    .subscribeOn(Schedulers.boundedElastic());
}
 
private Long incrementAndGetHelloCount(String name) {
  return ...;
}
Açıklaması şöyle 
Functionally, this means that the thread that executes sayHello will effectively return immediately and be able to service other requests while another thread interacts with the database and responds with the “hello” message.

Hiç yorum yok:

Yorum Gönder