Thursday, 23 January 2014

How do I implement task prioritization using an ExecutorService in Java 5?


package example;

import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

interface PriorityCallable<T> extends Callable<T> {
    int getPriority();
}

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L,
                TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(10,
                        PriorityFuture.COMP)) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                System.out.println("######## newTaskFor : -------");
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
               
                return new PriorityFuture<T>(newTaskFor,
                        ((PriorityCallable<T>) callable).getPriority());
            }
        };
    }

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

No comments: