Concurrency#

Todo:#

  • [ ] lost update

  • [ ] happens before

Race Condition#

https://web.mit.edu/6.031/www/sp21/classes/20-concurrency/#race_condition

A race condition means that the correctness of the program depends on the relative timing of events in concurrent computations A and B. When this happens we say “A is in race with B.”

  • check-then-act: a potentially stale observation is used to decide what to do next.

  • read-modify-write: new state is derived from previous state (may be stale). e.g., count++.
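The read-modify-write case can be made concrete with a small sketch. The class name CounterRace and the iteration count are illustrative assumptions, not part of the notes above. Two threads increment a plain int and an AtomicInteger the same number of times; the plain counter may lose updates, while the atomic one cannot.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CounterRace {
    // Unsafe: count++ is a read-modify-write made of separate read, add, and write steps.
    static int unsafeCount = 0;
    // Safe: incrementAndGet() performs the whole read-modify-write atomically.
    static final AtomicInteger safeCount = new AtomicInteger();

    // Runs the same body on two threads and waits for both to finish.
    static void runOnTwoThreads(Runnable body) {
        Thread a = new Thread(body);
        Thread b = new Thread(body);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int perThread = 100_000;
        runOnTwoThreads(() -> {
            for (int i = 0; i < perThread; i++) unsafeCount++;
        });
        runOnTwoThreads(() -> {
            for (int i = 0; i < perThread; i++) safeCount.incrementAndGet();
        });
        // May print less than 200000: an increment based on a stale read overwrites another.
        System.out.println("unsafe: " + unsafeCount);
        // Always prints 200000.
        System.out.println("safe:   " + safeCount.get());
    }
}
```

The unsafe total varies from run to run, which is the signature of a race: the result depends on how the two threads happen to interleave.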

Synchronizers#

CountDownLatch#

A synchronization aid that allows one or more threads (by calling await()) to wait until a set of operations being performed in other threads completes.

  • Initialized with a given count.

  • The await method blocks until the count reaches zero through invocations of countDown().

  • If the count is already zero, await returns immediately.

  • The count cannot be reset.

  • Threads calling countDown() do not block: they need not wait for the count to reach zero before proceeding.

Use cases:

  • A CountDownLatch with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown().

  • A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

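import java.util.concurrent.CountDownLatch;

class Driver {
    private static final int N = 10;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        // Create and start the workers; each one blocks on startSignal.
        for (int i = 0; i < N; i++) {
            new Thread(new Worker(startSignal, doneSignal)).start();
        }

        doSomeWorkBeforeWorker();  // workers are still waiting at the gate
        startSignal.countDown();   // open the gate: all workers proceed
        doSomeWork();
        doneSignal.await();        // wait until all N workers have finished
    }

    // Placeholder for driver-side setup work.
    static void doSomeWorkBeforeWorker() {
        System.out.println("Preparing...");
    }

    // Placeholder for work the driver does while the workers run.
    static void doSomeWork() {
        System.out.println("Driver working...");
    }
}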

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException e) {
            System.out.println("I got interrupted :-(");
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
    }

    void doWork() {
        System.out.println("Doing work...");
    }
}

Semaphore#

A counting semaphore maintains a set of permits that can be acquired and released. If no permit is available, acquire() blocks the calling thread until another thread releases one.
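A minimal sketch of a semaphore bounding concurrency, assuming a hypothetical SemaphoreSketch class and arbitrary task and permit counts chosen for illustration. Each thread must hold a permit while inside the guarded section, so the number of threads inside at once never exceeds the number of permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreSketch {
    // Runs `tasks` threads through a section guarded by `permitCount` permits and
    // returns the highest number of threads observed inside the section at once.
    static int maxConcurrent(int tasks, int permitCount) {
        Semaphore permits = new Semaphore(permitCount);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[tasks];

        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                permits.acquireUninterruptibly(); // blocks while no permit is available
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.yield(); // give other threads a chance to contend
                } finally {
                    inside.decrementAndGet();
                    permits.release(); // hand the permit back
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 2 permits the peak is never above 2, however many threads compete.
        System.out.println("peak concurrency: " + maxConcurrent(8, 2));
    }
}
```

Releasing in a finally block keeps a permit from leaking if the guarded work throws.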

Liveness#

Liveness is the ability of a concurrent program to make progress. Common liveness failures:

  • Deadlock

  • Livelock

  • Starvation

Deadlock#

Deadlock is a situation where two or more threads are blocked forever, waiting for each other.

Broken
public class Deadlock {
  public static void main(String[] args) {
    Account a = new Account("A", 1000);
    Account b = new Account("B", 1000);

    // t1 locks a then b; t2 locks b then a. The opposite lock order can deadlock.
    Thread t1 = new Thread(() -> transfer(a, b, 200));
    Thread t2 = new Thread(() -> transfer(b, a, 100));

    t1.start();
    t2.start();
  }

  public static void transfer(Account from, Account to, int amount) {
    System.out.println(
      Thread.currentThread().getName() +
      ": trying to acquire lock on " +
      from.getId()
    );
    synchronized (from) {
      System.out.println(
        Thread.currentThread().getName() + ": lock acquired on " + from.getId()
      );
      System.out.println(
        Thread.currentThread().getName() +
        ": trying to acquire lock on " +
        to.getId()
      );
      // If the other thread already holds `to` and waits for `from`, both block forever.
      synchronized (to) {
        System.out.println(
          Thread.currentThread().getName() + ": lock acquired on " + to.getId()
        );
        from.debit(amount);
        to.credit(amount);
      }
    }
    System.out.println("Transfer completed");
  }
}

class Account {
  private final String id;
  private int balance;

  public Account(String id, int balance) {
    this.id = id;
    this.balance = balance;
  }

  public void debit(int amount) {
    balance -= amount;
  }

  public void credit(int amount) {
    balance += amount;
  }

  public String getId() {
    return id;
  }

  public int getBalance() {
    return balance;
  }
}
Fixed
public class Deadlock {

  public static void main(String[] args) {
    Account a = new Account("A", 1000);
    Account b = new Account("B", 1000);

    Thread t1 = new Thread(() -> transfer(a, b, 200));
    Thread t2 = new Thread(() -> transfer(b, a, 100));

    t1.start();
    t2.start();
  }

  public static void transfer(Account from, Account to, int amount) {
    // Always lock the account with the smaller id first, so every thread
    // acquires the two locks in the same order. Assumes ids are unique.
    Account first = from;
    Account second = to;

    if (from.getId().compareTo(to.getId()) >= 0) {
      first = to;
      second = from;
    }

    System.out.println(
      Thread.currentThread().getName() +
      ": trying to acquire lock on " +
      first.getId()
    );
    synchronized (first) {
      System.out.println(
        Thread.currentThread().getName() + ": lock acquired on " + first.getId()
      );
      System.out.println(
        Thread.currentThread().getName() +
        ": trying to acquire lock on " +
        second.getId()
      );
      synchronized (second) {
        System.out.println(
          Thread.currentThread().getName() +
          ": lock acquired on " +
          second.getId()
        );
        from.debit(amount);
        to.credit(amount);
      }
    }
    System.out.println("Transfer completed");
  }
}

class Account {
  private final String id;
  private int balance;

  public Account(String id, int balance) {
    this.id = id;
    this.balance = balance;
  }

  public void debit(int amount) {
    balance -= amount;
  }

  public void credit(int amount) {
    balance += amount;
  }

  public String getId() {
    return id;
  }

  public int getBalance() {
    return balance;
  }
}

Livelock#

  • A thread often acts in response to the action of another thread. If the other thread’s action is also a response to the action of another thread, then livelock may result.

  • As with deadlock, livelocked threads are unable to make further progress. However, the threads are not blocked; they are simply too busy responding to each other to resume work.

https://docs.oracle.com/javase/tutorial/essential/concurrency/newlocks.html

BlockingQueue#

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

          Throws exception   Special value   Blocks           Times out
Insert    add(e)             offer(e)        put(e)           offer(e, time, unit)
Remove    remove()           poll()          take()           poll(time, unit)
Examine   element()          peek()          not applicable   not applicable

  • Does not accept null elements: add, put, and offer throw NullPointerException for a null argument.
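The four forms in the table can be observed side by side on a queue of capacity 1. This is a sketch under stated assumptions: the class name BlockingQueueForms, the observe() helper, and the 50 ms timeout are illustrative choices, not part of the notes above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueForms {
    // Exercises each column of the table on a queue of capacity 1
    // and records what every call did.
    static List<String> observe() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        try {
            // Empty queue: the "special value" forms return null.
            log.add("peek on empty -> " + q.peek());
            log.add("poll on empty -> " + q.poll());
            // Empty queue: the "throws exception" form fails fast.
            try {
                q.remove();
            } catch (NoSuchElementException e) {
                log.add("remove on empty -> NoSuchElementException");
            }

            q.put("a"); // would block if the queue were full; here it has room

            // Full queue: each insert form reports the failure differently.
            log.add("offer on full -> " + q.offer("b"));
            log.add("timed offer on full -> " + q.offer("b", 50, TimeUnit.MILLISECONDS));
            try {
                q.add("b");
            } catch (IllegalStateException e) {
                log.add("add on full -> IllegalStateException");
            }

            log.add("take -> " + q.take()); // would block if the queue were empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.add("interrupted");
        }
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

The blocking forms (put, take) are called only where they can proceed, so the sketch runs to completion on a single thread.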

Producer Consumer#

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class BlockingQueueUsage {

  public static void main(String[] args) {
    // Bounded queue: the producer blocks in put() once 5 values are waiting.
    BlockingQueue<Integer> q = new ArrayBlockingQueue<>(5);

    new Thread(new Producer(q)).start();
    new Thread(new Consumer(q)).start();
    new Thread(new Consumer(q)).start();
  }
}

class Producer implements Runnable {

  private final BlockingQueue<Integer> q;

  Producer(BlockingQueue<Integer> q) {
    this.q = q;
  }

  @Override
  public void run() {
    try {
      while (true) {
        int value = ThreadLocalRandom.current().nextInt();
        System.out.println("Produced value: " + value);
        Thread.sleep(1000);
        q.put(value); // blocks while the queue is full
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt status and stop
    }
  }
}

class Consumer implements Runnable {

  private final BlockingQueue<Integer> q;

  Consumer(BlockingQueue<Integer> q) {
    this.q = q;
  }

  @Override
  public void run() {
    try {
      while (true) {
        int value = q.take(); // blocks while the queue is empty
        System.out.println(
          Thread.currentThread().getName() + " Consumed value: " + value
        );
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt status and stop
    }
  }
}

ThreadLocal#

  • This class provides thread-local variables, which are distinct from regular variables in that each thread accessing one via its get or set method has a separate, independently initialized copy.

  • ThreadLocal instances are commonly private static fields in classes that aim to associate state with a thread, such as a user ID or transaction ID.

  • Each thread holds an implicit reference to its copy of a thread-local variable as long as the thread is alive and the ThreadLocal instance is accessible. After the thread terminates, all of its thread-local copies become eligible for garbage collection, unless other references to them exist.

Use cases:

  • Thread confinement

  • Per thread object for performance

  • Per thread context

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ThreadLocalUsage {

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // Nine tasks share four pooled threads, so each thread's id repeats across tasks.
    IntStream
      .range(1, 10)
      .forEach(i ->
        executor.execute(() -> {
          System.out.println(
            Thread.currentThread().getName() + " Id: " + ThreadId.get()
          );
        })
      );

    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}

class ThreadId {

  private static final AtomicInteger id = new AtomicInteger(0);
  // The initializer runs once per thread, on that thread's first call to get().
  private static final ThreadLocal<Integer> threadId = ThreadLocal.withInitial(() ->
    id.incrementAndGet()
  );

  public static int get() {
    return threadId.get();
  }
}

Tips:

  • Call remove() once the value is no longer needed, especially on pooled threads, which outlive the task that set it.

  • Delegate to frameworks where possible, e.g., Spring context holders.
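Why cleanup matters on pooled threads can be shown with a small sketch. The names ThreadLocalCleanup, currentUser, and valueSeenByNextTask are hypothetical, chosen for illustration. Two tasks run back to back on the same pooled thread; unless the first task calls remove(), the second task sees the value the first one left behind.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCleanup {
    private static final ThreadLocal<String> currentUser = new ThreadLocal<>();

    // Runs two tasks one after the other on a single pooled thread and
    // returns the thread-local value that the second task observes.
    static String valueSeenByNextTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable firstTask = () -> {
            currentUser.set("alice");
            try {
                // ... work that reads currentUser would go here ...
            } finally {
                if (cleanUp) {
                    currentUser.remove(); // clear the value before the thread is reused
                }
            }
        };
        Callable<String> secondTask = currentUser::get;
        try {
            pool.submit(firstTask).get();
            return pool.submit(secondTask).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + valueSeenByNextTask(false)); // alice
        System.out.println("with cleanup:    " + valueSeenByNextTask(true));  // null
    }
}
```

Placing remove() in a finally block ensures the value is cleared even when the task fails.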