Showing posts with label Thread. Show all posts
Showing posts with label Thread. Show all posts

Thursday, 14 April 2016

Differences between CyclicBarrier and CountDownLatch

Here are the main differences between the CyclicBarrier and CountDownLatch concurrency utilities in Java:

1) CyclicBarrier is reusable, CountDownLatch is not.

2) Both CyclicBarrier and CountDownLatch wait for fixed number of threads.

3) CountDownLatch is advanceable but CyclicBarrier is not.

Thursday, 23 January 2014

How do I implement task prioritization using an ExecutorService in Java 5?


package example;

import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A {@link RunnableFuture} decorator that tags another future with an integer
 * priority. Every {@code Future}/{@code Runnable} operation is forwarded to
 * the wrapped future; the priority is only consulted by {@link #COMP}.
 *
 * @param <T> the result type produced by the wrapped future
 */
class PriorityFuture<T> implements RunnableFuture<T> {

    /** The future that does the real work. */
    private final RunnableFuture<T> src;
    /** Priority used for ordering; smaller values sort first in {@link #COMP}. */
    private final int priority;

    /**
     * Wraps an existing future.
     *
     * @param other    the future to delegate to
     * @param priority the ordering key for this task
     */
    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    /** @return the priority given at construction time */
    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    /**
     * Waits at most the given time for the wrapped future to complete.
     *
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        // Forward the deadline: calling the untimed get() here would ignore
        // the caller's timeout and could block indefinitely.
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    /**
     * Orders tasks by ascending priority value; {@code null} sorts before any
     * non-null task. Both arguments must be {@code PriorityFuture} instances,
     * otherwise the casts below throw {@link ClassCastException}.
     */
    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                // Explicit comparison rather than subtraction, so extreme
                // priority values cannot overflow.
                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

/**
 * A {@link Callable} that additionally exposes an integer priority.
 *
 * @param <T> the result type returned by {@code call()}
 */
interface PriorityCallable<T> extends Callable<T> {
    /** @return the integer priority associated with this task */
    int getPriority();
}

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L,
                TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(10,
                        PriorityFuture.COMP)) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                System.out.println("######## newTaskFor : -------");
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
               
                return new PriorityFuture<T>(newTaskFor,
                        ((PriorityCallable<T>) callable).getPriority());
            }
        };
    }

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Friday, 10 January 2014

Thread output in certain order

package threadseq;

import java.util.*;

/**
 * Launches one thread per colour; each thread reports its result to a shared
 * {@code Results} collector under its position in the list.
 */
public class OrderThreads {
    public static void main(String... args) {
        Results results = new Results();
        // The array index doubles as the order number handed to each Task.
        String[] colours = {
            "red", "orange", "yellow", "green", "blue", "indigo", "violet"
        };
        for (int order = 0; order < colours.length; order++) {
            Task task = new Task(order, colours[order], results);
            new Thread(task).start();
        }
    }
}

/**
 * Collects results that may arrive in any order and prints them strictly in
 * index order, as soon as every earlier index has been filled.
 */
class Results {
    /** Slot per order number; {@code null} marks a result not yet received. */
    private List<String> slots = new ArrayList<String>();
    /** Index of the next result that has not been printed yet. */
    private int nextToDeliver = 0;

    /**
     * Stores {@code result} at position {@code order}, then prints every
     * consecutive result that is now available.
     *
     * @param order  zero-based position of this result in the output sequence
     * @param result the value to deliver
     */
    public synchronized void submit(int order, String result) {
        // Grow the list with placeholders until the target slot exists.
        for (int size = slots.size(); size <= order; size++) {
            slots.add(null);
        }
        slots.set(order, result);

        // Flush the contiguous run of filled slots starting at the cursor.
        for (; nextToDeliver < slots.size(); nextToDeliver++) {
            String ready = slots.get(nextToDeliver);
            if (ready == null) {
                break;
            }
            System.out.println("result delivered: " + nextToDeliver + " " + ready);
        }
    }
}

/**
 * Simulated unit of work: sleeps for a duration derived from the result
 * string, then hands the result to the shared {@code Results} collector.
 */
class Task implements Runnable {
    private final int order;
    private final String result;
    private final Results results;

    /**
     * @param order   position of this task's result in the output sequence
     * @param result  the value this task delivers
     * @param results collector that receives the value
     */
    public Task(int order, String result, Results results) {
        this.order = order;
        this.result = result;
        this.results = results;
    }

    public void run() {
        try {
            // Simulate a long-running computation (0-999 ms, fixed per string).
            Thread.sleep(Math.abs(result.hashCode() % 1000));
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag so code further
            // up the stack can see it. The result is still delivered below,
            // as in the original best-effort behaviour.
            Thread.currentThread().interrupt();
        }
        System.out.println("task finished: " + order + " " + result);
        results.submit(order, result);
    }
}

Friday, 4 October 2013

Using Callable to Return Results From Runnables

The Runnable interface has been around since the beginning of time for the Java platform. It allows you to define a task to be completed by a thread. As most people probably know already, it offers a single method run() that accepts no arguments and returns no values, nor can it throw any checked exceptions. If you need to get a value back from the now-completed task, you must use a method outside the interface and wait for some kind of notification message that the task completed. For example, the following demonstrates what you might do for just such a scenario:
  Runnable runnable = ...;
  Thread t = new Thread(runnable);
  t.start();
  t.join();
  String value = someMethodtoGetSavedValue()
Nothing is inherently wrong with this code, but it can be done differently now, thanks to the Callable interface introduced in J2SE 5.0. Instead of having a run() method, the Callable interface offers a call() method, which can return an Object or, more specifically, any type that is introduced in the genericized form:
  public interface Callable<V> {
     V call() throws Exception;
  }
Because you cannot pass a Callable into a Thread to execute, you instead use the ExecutorService to execute the Callable object. The service accepts Callable objects to run by way of the submit() method:
  <T> Future<T> submit(Callable<T> task)
As the method definition shows, submitting a Callable object to the ExecutorService returns a Future object. The get() method of Future will then block until the task is completed. This is the equivalent of the join() call in the first example. Actually, it is the equivalent of both the join() call and the get value call as get() returns the value calculated by the Callable instance.
To demonstrate, the following example creates separate Callable instances for each word passed in on the command line and sums up their lengths. Each Callable will just calculate the length of its individual word. The set of Future objects is saved to acquire the calculated value from each. If the order of the returned values needed to be preserved, a List could be used instead.
import java.util.*;
import java.util.concurrent.*;

/**
 * Sums the lengths of the command-line words, computing each length in its
 * own {@link Callable} on a small thread pool.
 */
public class CallableExample {

  /** Task whose result is the length of one word. */
  public static class WordLengthCallable
        implements Callable<Integer> {
    private final String word;
    public WordLengthCallable(String word) {
      this.word = word;
    }
    public Integer call() {
      return Integer.valueOf(word.length());
    }
  }

  /**
   * Submits one task per argument, adds up the results, prints the total and
   * exits with the total as the process exit status.
   */
  public static void main(String args[]) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    int sum = 0;
    try {
      Set<Future<Integer>> set = new HashSet<Future<Integer>>();
      for (String word: args) {
        Callable<Integer> callable = new WordLengthCallable(word);
        Future<Integer> future = pool.submit(callable);
        set.add(future);
      }
      for (Future<Integer> future : set) {
        // Blocks until this particular task has produced its value.
        sum += future.get();
      }
    } finally {
      // Release the worker threads even if a get() call throws; otherwise
      // the non-daemon pool threads would keep the JVM from exiting.
      pool.shutdown();
    }
    System.out.printf("The sum of lengths is %s%n", sum);
    System.exit(sum);
  }
}
The WordLengthCallable saves each word and uses the word's length as the value returned by the call() method. This value could take some time to generate but in this case is known immediately. The only requirement of call() is that the value is returned at the end of the call. When the get() method of Future is later called, the Future will either have the value immediately if the task runs quickly, as in this case, or will wait until the value is done generating. Multiple calls to get() will not cause the task to be rerun in the thread.
Because the goal of the program is to calculate the sum of all word lengths, it doesn't matter in which order the Callable tasks finish. It is perfectly OK if the last task completes before the first three. The first get() call to Future will just wait for the first task in the Set to complete. This does not block other tasks from running to completion separately. It is just waiting for that one thread or task to complete.

Daemon Threads

A “daemon” thread is one that is supposed to provide a general service in the background as long as the program is running, but is not part of the essence of the program. Thus, when all of the non-daemon threads complete, the program is terminated. Conversely, if there are any non-daemon threads still running, the program doesn’t terminate. There is, for instance, a non-daemon thread that runs main( ).
//: c13:SimpleDaemons.java
// Daemon threads don't prevent the program from ending.

/**
 * A thread that marks itself as a daemon and starts itself on construction,
 * then prints its own description roughly every 100 ms.
 */
public class SimpleDaemons extends Thread {
  public SimpleDaemons() {
    setDaemon(true); // Must be called before start()
    start();
  }

  /** Sleeps 100 ms; an interrupt is rethrown wrapped in a RuntimeException. */
  private static void pause() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public void run() {
    for (;;) {
      pause();
      System.out.println(this);
    }
  }

  /** Creates ten self-starting daemon threads and returns immediately. */
  public static void main(String[] args) {
    int remaining = 10;
    while (remaining-- > 0) {
      new SimpleDaemons();
    }
  }
} ///:~

You must set the thread to be a daemon by calling setDaemon( ) before it is started. In run( ), the thread is put to sleep for a little bit. Once the threads are all started, the program terminates immediately, before any threads can print themselves, because there are no non-daemon threads (other than main( )) holding the program open. Thus, the program terminates without printing any output.
You can find out if a thread is a daemon by calling isDaemon( ). If a thread is a daemon, then any threads it creates will automatically be daemons, as the following example demonstrates:
//: c13:Daemons.java
// Daemon threads spawn other daemon threads.
import java.io.*;
import com.bruceeckel.simpletest.*;

class Daemon extends Thread {
  private Thread[] t = new Thread[10];
  public Daemon() {
    setDaemon(true);
    start();
  }
  public void run() {
    for(int i = 0; i < t.length; i++)
      t[i] = new DaemonSpawn(i);
    for(int i = 0; i < t.length; i++)
      System.out.println("t[" + i + "].isDaemon() = "
        + t[i].isDaemon());
    while(true)
      yield();
  }
}

/**
 * A self-starting thread that does nothing but yield forever. It never sets
 * its own daemon flag, so it takes the daemon status of the thread that
 * creates it.
 */
class DaemonSpawn extends Thread {
  /** @param i index used only in the start-up message */
  public DaemonSpawn(int i) {
    start();
    System.out.println("DaemonSpawn " + i + " started");
  }
  public void run() {
    // Qualified call: since Java 14 "yield" is a restricted identifier and
    // an unqualified yield() no longer compiles.
    while(true)
      Thread.yield();
  }
}

/**
 * Starts a {@code Daemon}, gives it a second to spawn its children, then
 * hands the expected console lines to the test monitor.
 */
public class Daemons {
  private static Test monitor = new Test();
  public static void main(String[] args) throws Exception {
    Thread daemon = new Daemon();
    System.out.println("d.isDaemon() = " + daemon.isDaemon());
    // Give the daemon threads time to complete their start-up
    // before the output is checked:
    Thread.sleep(1000);
    String[] expected = {
      "d.isDaemon() = true",
      "DaemonSpawn 0 started",
      "DaemonSpawn 1 started",
      "DaemonSpawn 2 started",
      "DaemonSpawn 3 started",
      "DaemonSpawn 4 started",
      "DaemonSpawn 5 started",
      "DaemonSpawn 6 started",
      "DaemonSpawn 7 started",
      "DaemonSpawn 8 started",
      "DaemonSpawn 9 started",
      "t[0].isDaemon() = true",
      "t[1].isDaemon() = true",
      "t[2].isDaemon() = true",
      "t[3].isDaemon() = true",
      "t[4].isDaemon() = true",
      "t[5].isDaemon() = true",
      "t[6].isDaemon() = true",
      "t[7].isDaemon() = true",
      "t[8].isDaemon() = true",
      "t[9].isDaemon() = true"
    };
    monitor.expect(expected, Test.IGNORE_ORDER + Test.WAIT);
  }
} ///:~

The Daemon thread sets its daemon flag to “true” and then spawns a bunch of other threads—which do not set themselves to daemon mode—to show that they are daemons anyway. Then it goes into an infinite loop that calls yield( ) to give up control to the other processes.
There’s nothing to keep the program from terminating once main( ) finishes its job, since there are nothing but daemon threads running. So that you can see the results of starting all the daemon threads, the main( ) thread is put to sleep for a second. Without this, you see only some of the results from the creation of the daemon threads. (Try sleep( ) calls of various lengths to see this behavior.)