20 Haziran 2019 Perşembe

ThreadPoolExecutor Sınıfı

Giriş
ThreadPoolExecutor sınıfı karmaşık bir constructor'a sahip. Bu yüzden ExecutorService.xyz() şeklindeki factory metodlar ile yaratıp kullanması daha kolay.

Kalıtım hiyerarşisi şöyle

ExecutorService <--ThreadPoolExecutor

Bu sınıfa bir iş verilince şu adımlar izlenir.
When a task is submitted

1.If poolSize is less than corePoolSize, a new thread is created, even if there are idle threads.
2.If poolSize is equal to the corePoolSize then task is added to the queue. It won't create new threads until queue is exhausted.
3.if workQueue is exhausted then new thread is created till poolSize becomes maximumPoolSize.
4.If poolSize is equal to the maximumPoolSize throw RejectedExecutionException
constructor
Parametreler şöyle

1. corePoolSize
Bu parametre kaç tane thread ile çalışmamız gerektiğini belirtir

CPU-Intensive Tasks
Açıklaması şöyle
A common rule of thumb is to use the number of CPU cores available
Örnek
Şöyle yaparız
int numThreads = Runtime.getRuntime().availableProcessors(); // the number of CPU cores
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
Eğer başka işler için de işlemci zamanı bırakmak istersek mevcut çekirdek sayısından daha az bir değer kullanırız. Şöyle yaparız
int availableCores = Runtime.getRuntime().availableProcessors();
int numberOfThreads = Math.max(availableCores - 1, 1); // Adjust as needed

ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads);
IO-intensive tasks
Sayısı hesaplamak için yardımcı olabilecek bir açıklama şöyle. Burada 100% işlemci kullanımı düşünülüyor
In order to calculate the ideal number of threads for an IO-bound task, we can use the following formula provided by Brian Goetz. If the threads spend S units of service time (running and utilizing CPU) and W units of waiting time (blocked in IO operation) and there are N processor cores, then


number of threads = N * (1 + Wait time / Service time)
Açıklama şöyle. Burada 100% işlemci kullanımı değil bir başka yüzde düşünülüyor. O yüzden Target CPU utilization ile çarpım var.
Number of threads = Number of Available Cores * Target CPU utilization * (1 + Wait time / Service time)
Örnek 
S yani service time veya hesaplama süresi 1 saniye olsun. 
W yani wait time yani IO 2 saniye olsun. 
N yani 4 çekirdek olsun.
Bu durumda 4 * (1 + 2/1) = 12 thread lazım

Sayı çalışma esnasında şöyle değiştirilir ancak bu metod çok iyi çalışmayabiliyor.
ThreadPoolExecutor es = new ThreadPoolExecutor(1, 100, 30, TimeUnit.DAYS, 
new LinkedBlockingQueue<Runnable>());
es.setCorePoolSize(2);   
Örnek
Açıklaması şöyle
The I/O-intensive tasks have a blocking coefficient of 0.5, meaning that they spend 50% of their time waiting for I/O operations to complete.

Number of threads = 4 cores * 0.5 * (1 + 0.5) = 3 threads

2. maximumPoolSize
En fazla kaç thread ile çalışmamız gerektiğini belirtir. I/O kullanmayan işlerde en fazla sayı sanırım işlemci sayısı + 1 kadar olmalı.

3. keepAliveTime
Thread sayısı coroPoolSize'dan fazlaysa idle thread'in ne kadar yaşayacağınız gösterir.

4. BackingQueue
SynchronousQueue : Direct handoff
LinkedBlockingQueue : Unbounded
ArrayBlockingQueue : Bounded

gibi sınıflar olabilir.

Örnek
Thread sayısı 10-50 arasında olan, ve en fazla 20 tane işi kuyrukta bekleten bir şey yaratmak istersek şöyle yaparız. Bence en ideal kod bu!
new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size
Örnek
Bir başka örnek'te kuyruk sınırsız olduğu için thread sayısı 10-50 olsa bile yeni işleri kuyruğa ekleyerek kabul etmeye devam eder. FixedThreadPool'a çok benziyor. Tek farkı thread'lerin idle süresi biraz daha uzun tutulmuş.
BlockingQueue<Runnable>  queue = new LinkedBlockingQueue<>();
new ThreadPoolExecutor(10, 50, 10 * 60, TimeUnit.SECONDS, queue);
Örnek
Şöyle yaparız.
//core 5 max 10 with 60 second idle time
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,
  new LinkedBlockingQueue<Runnable>());
afterExecute metodu
ThreadPoolExecutor Kullanımı yazısına taşıdım

beforeExecute metodu
Şöyle yaparız.
Semaphore semaphore = new Semaphore(1000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(...){
  protected void beforeExecute(Runnable r, Throwable t) { 
    semaphore.release();
  }
}
getActiveCount metodu
İstatistik amacıyla o an çalışmakta olan thread sayısını döner.

getCompletedTaskCount metodu
İstatistik amacıyla o ana kadar bitmiş iş sayısını döner.

getQueue metodu
Kuyruğa erişime izin verir.
Örnek
Şöyle yaparız.
BlockingQueue<Runnable> coreQueue = es.getQueue();
getTaskCount metodu
İstatistik amacıyla o ana kadar eklenmiş toplam (bitmiş veya beklemekte olan) iş sayısını döner.
Örnek
Şöyle yaparız.
long submitted = executor.getTaskCount();
long completed = executor.getCompletedTaskCount();
long notCompleted = submitted - completed; // approximate
isTerminated metodu
Şöyle kullanırız.
ThreadPoolExecutor es = new ThreadPoolExecutor(...);
...
x.shutdown();
while (!x.isTerminated()) {...}
setCorePoolSize metodu
Sayı çalışma esnasında şöyle değiştirilir. Ancak bu metod çok iyi çalışmayabiliyor.
ThreadPoolExecutor x = new ThreadPoolExecutor(1, 100, 30, TimeUnit.DAYS, 
new LinkedBlockingQueue<Runnable>());
x.setCorePoolSize(2);   
setRejectedExecutionHandler metodu
Şöyle yaparız.
threadPool.setRejectedExecutionHandler(rejectedHandler);


Hiç yorum yok:

Yorum Gönder