Concurrency#
Todo:#
[ ] lost update
[ ] happens before
Race Condition#
https://web.mit.edu/6.031/www/sp21/classes/20-concurrency/#race_condition
A race condition means that the correctness of the program depends on the relative timing of events in concurrent computations A and B. When this happens we say “A is in race with B.”
check-then-act: a potentially stale observation is used to decide what to do next.
read-modify-write: new state is derived from previous state (which may be stale), e.g., count++.
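A minimal sketch of the read-modify-write hazard above. The class `CounterDemo` and its method names are hypothetical, chosen only for illustration. It increments a plain `int` and an `AtomicInteger` from several threads: the plain counter may lose updates, while `incrementAndGet()` performs the whole read-modify-write as one atomic step.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative, not from the notes.
class CounterDemo {
    private int unsafeCount = 0;                                  // count++ is read, add, write
    private final AtomicInteger safeCount = new AtomicInteger();

    // Starts `threads` threads that each increment both counters `perThread` times.
    // Returns {plain total, atomic total}.
    static int[] run(int threads, int perThread) {
        CounterDemo demo = new CounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.unsafeCount++;                 // racy: two threads can read the same stale value
                    demo.safeCount.incrementAndGet();   // atomic read-modify-write
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { demo.unsafeCount, demo.safeCount.get() };
    }

    public static void main(String[] args) {
        int[] totals = run(4, 100_000);
        System.out.println("plain:  " + totals[0] + " (may be less than 400000)");
        System.out.println("atomic: " + totals[1]);
    }
}
```

The plain total varies from run to run, which is exactly the timing dependence that defines a race condition; the atomic total does not.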
Synchronizers#
CountDownLatch#
A synchronization aid that allows one or more threads (by calling await()) to wait until a set of operations being performed in other threads completes.
Initialized with a given count.
The await method blocks until the current count reaches zero due to invocations of countDown().
If the count is already zero, an invocation of await returns immediately.
The count cannot be reset.
Threads calling countDown need not wait for the count to reach zero.
Use cases:
A CountDownLatch with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown().
A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
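import java.util.concurrent.CountDownLatch;

class Driver {
    private static final int N = 10;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1); // gate that releases all workers at once
        CountDownLatch doneSignal = new CountDownLatch(N);  // reaches zero once every worker is done

        for (int i = 0; i < N; i++) {
            new Thread(new Worker(startSignal, doneSignal)).start();
        }

        doSomeWorkBeforeWorker(); // workers are still waiting on startSignal
        startSignal.countDown();  // open the gate
        doSomeWork();             // runs concurrently with the workers
        doneSignal.await();       // wait until all N workers have counted down
    }

    // Placeholder bodies: the notes do not define these two methods.
    static void doSomeWorkBeforeWorker() {}

    static void doSomeWork() {}
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    @Override
    public void run() {
        try {
            startSignal.await();    // wait for the driver to open the gate
            doWork();
            doneSignal.countDown(); // report completion
        } catch (InterruptedException e) {
            System.out.println("I got interrupted :-()");
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    void doWork() {
        System.out.println("Doing work...");
    }
}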
Semaphore#
A counting semaphore maintains a set of permits that can be acquired and released. If no permit is available, acquire blocks the calling thread until another thread releases one.
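As a rough illustration of the permit mechanism, the sketch below uses a `Semaphore` to cap how many threads are inside a guarded section at once. The class `SemaphoreDemo`, the method `maxConcurrent`, and the 10 ms sleep standing in for real work are all assumptions made for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: at most `permits` threads may be inside the guarded section at once.
class SemaphoreDemo {
    // Returns the highest number of threads observed inside the guarded section.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger(); // threads currently holding a permit
        AtomicInteger peak = new AtomicInteger();   // highest value of `inside` seen so far

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    semaphore.acquire();            // blocks while no permit is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(10);               // simulated work while holding the permit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();            // always hand the permit back
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency with 2 permits: " + maxConcurrent(2, 6));
    }
}
```

Releasing in a `finally` block matters here: a permit that is never returned permanently lowers the limit for every other thread.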
Liveness#
Ability to make progress.
Deadlock
Livelock
Starvation
Deadlock#
Deadlock is a situation where two or more threads are blocked forever, waiting for each other.
Broken
public class Deadlock {
    public static void main(String[] args) {
        Account a = new Account("A", 1000);
        Account b = new Account("B", 1000);

        // t1 locks a then b; t2 locks b then a. Opposite lock order can deadlock.
        Thread t1 = new Thread(() -> transfer(a, b, 200));
        Thread t2 = new Thread(() -> transfer(b, a, 100));

        t1.start();
        t2.start();
    }

    public static void transfer(Account from, Account to, int amount) {
        System.out.println(
            Thread.currentThread().getName() +
            ": trying to acquire lock of " +
            from.getId()
        );
        synchronized (from) {
            System.out.println(
                Thread.currentThread().getName() + ": lock acquired on " + from.getId()
            );
            System.out.println(
                Thread.currentThread().getName() +
                ": trying to acquire lock of " +
                to.getId()
            );
            // If the other thread already holds `to` and is waiting for `from`, both block forever.
            synchronized (to) {
                System.out.println(
                    Thread.currentThread().getName() + ": lock acquired on " + to.getId()
                );
                from.debit(amount);
                to.credit(amount);
            }
        }
        System.out.println("Transfer completed");
    }
}

class Account {
    private final String id;
    private int balance;

    public Account(String id, int balance) {
        this.id = id;
        this.balance = balance;
    }

    public void debit(int amount) {
        balance -= amount;
    }

    public void credit(int amount) {
        balance += amount;
    }

    public String getId() {
        return id;
    }

    public int getBalance() {
        return balance;
    }
}
Fixed
public class Deadlock {

    public static void main(String[] args) {
        Account a = new Account("A", 1000);
        Account b = new Account("B", 1000);

        Thread t1 = new Thread(() -> transfer(a, b, 200));
        Thread t2 = new Thread(() -> transfer(b, a, 100));

        t1.start();
        t2.start();
    }

    public static void transfer(Account from, Account to, int amount) {
        // Always lock the account with the smaller id first, so every thread
        // acquires the two locks in the same global order.
        Account first = from;
        Account second = to;

        if (from.getId().compareTo(to.getId()) >= 0) {
            first = to;
            second = from;
        }

        System.out.println(
            Thread.currentThread().getName() +
            ": trying to acquire lock of " +
            first.getId()
        );
        synchronized (first) {
            System.out.println(
                Thread.currentThread().getName() + ": lock acquired on " + first.getId()
            );
            System.out.println(
                Thread.currentThread().getName() +
                ": trying to acquire lock of " +
                second.getId()
            );
            synchronized (second) {
                System.out.println(
                    Thread.currentThread().getName() +
                    ": lock acquired on " +
                    second.getId()
                );
                from.debit(amount);
                to.credit(amount);
            }
        }
        System.out.println("Transfer completed");
    }
}

class Account {
    private final String id;
    private int balance;

    public Account(String id, int balance) {
        this.id = id;
        this.balance = balance;
    }

    public void debit(int amount) {
        balance -= amount;
    }

    public void credit(int amount) {
        balance += amount;
    }

    public String getId() {
        return id;
    }

    public int getBalance() {
        return balance;
    }
}
Livelock#
A thread often acts in response to the action of another thread. If the other thread’s action is also a response to the action of another thread, then livelock may result.
As with deadlock, livelocked threads are unable to make further progress. However, the threads are not blocked; they are simply too busy responding to each other to resume work.
https://docs.oracle.com/javase/tutorial/essential/concurrency/newlocks.html
BlockingQueue#
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
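| | Throws exception | Special value | Blocks | Times out |
|---|---|---|---|---|
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not applicable | not applicable |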
Does not accept null elements.
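To make the insert-side columns of the table above concrete, here is a small sketch that tries each non-blocking insert form against a full queue. The class `BlockingQueueForms` and its methods are hypothetical; the capacity of 1 and the 50 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
class BlockingQueueForms {
    // Tries each non-blocking insert form on a full queue and reports what happened.
    static String insertIntoFullQueue() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first");                          // capacity is 1, so the queue is now full
        StringBuilder report = new StringBuilder();
        try {
            q.add("second");                     // "throws exception" form
            report.append("add succeeded");
        } catch (IllegalStateException e) {
            report.append("add threw");
        }
        report.append("; offer=").append(q.offer("second")); // "special value" form
        try {
            // "times out" form: waits up to 50 ms for space, then gives up
            report.append("; timed offer=")
                  .append(q.offer("second", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            report.append("; timed offer interrupted");
        }
        // q.put("second") is the "blocks" form: it would wait until a consumer removed an element.
        return report.toString();
    }

    // Returns true if the queue rejects null with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        try {
            q.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());
        System.out.println("rejects null: " + rejectsNull());
    }
}
```

The remove-side methods mirror this on an empty queue: `remove()` throws, `poll()` returns null, and `take()` waits.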
Producer Consumer#
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class BlockingQueueUsage {

    public static void main(String[] args) {
        // Bounded queue: put() blocks when 5 elements are already waiting.
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(5);

        new Thread(new Producer(q)).start();
        new Thread(new Consumer(q)).start();
        new Thread(new Consumer(q)).start();
    }
}

class Producer implements Runnable {

    private final BlockingQueue<Integer> q;

    Producer(BlockingQueue<Integer> q) {
        this.q = q;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int value = ThreadLocalRandom.current().nextInt();
                System.out.println("Produced value: " + value);
                Thread.sleep(1000);
                q.put(value); // blocks while the queue is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status and stop
        }
    }
}

class Consumer implements Runnable {

    private final BlockingQueue<Integer> q;

    Consumer(BlockingQueue<Integer> q) {
        this.q = q;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int value = q.take(); // blocks while the queue is empty
                System.out.println(
                    Thread.currentThread().getName() + " Consumed value: " + value
                );
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status and stop
        }
    }
}
ThreadLocal#
This class provides thread-local variables, which are distinct from regular variables in that each thread accessing one via its get or set method has a separate, independently initialized copy.
ThreadLocal instances are commonly private static fields in classes that aim to associate state with a thread, such as a user ID or transaction ID.
Each thread maintains an implicit reference to its thread-local variable copy as long as the thread is alive and the ThreadLocal instance is accessible. Once the thread ceases, all its thread-local variable copies become eligible for garbage collection, unless there are other references to these copies.
Use cases:
Thread confinement
Per thread object for performance
Per thread context
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ThreadLocalUsage {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Nine tasks share four pooled threads, so ids repeat across tasks.
        IntStream
            .range(1, 10)
            .forEach(i ->
                executor.execute(() -> {
                    System.out.println(
                        Thread.currentThread().getName() + " Id: " + ThreadId.get()
                    );
                })
            );

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

class ThreadId {

    private static final AtomicInteger id = new AtomicInteger(0);
    // Each thread gets its own id, assigned lazily on its first call to get().
    private static final ThreadLocal<Integer> threadId = ThreadLocal.withInitial(() ->
        id.incrementAndGet()
    );

    public static int get() {
        return threadId.get();
    }
}
Tips:
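Clean up (call remove()) once the value is no longer required, particularly on pooled threads that outlive the task.
Delegate to frameworks where possible, e.g., Spring context holders.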
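The cleanup tip matters most with thread pools, where a worker thread outlives the task that set the value. The sketch below is an illustration under assumptions: the class `ThreadLocalCleanup`, the field `CURRENT_USER`, and the value "alice" are all hypothetical. It runs two tasks on the same pooled thread and reports what the second task sees.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names and values are illustrative.
class ThreadLocalCleanup {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Runs two tasks on one pooled thread and returns what the second task reads.
    static String valueSeenByNextTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable firstTask = () -> {
            CURRENT_USER.set("alice");
            try {
                // ... work on behalf of this user would happen here ...
            } finally {
                if (cleanUp) {
                    CURRENT_USER.remove(); // drop the value before the thread is reused
                }
            }
        };
        Callable<String> secondTask = CURRENT_USER::get;
        try {
            pool.submit(firstTask).get();          // wait for the first task to finish
            return pool.submit(secondTask).get();  // same worker thread, later task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + valueSeenByNextTask(false));
        System.out.println("with remove():    " + valueSeenByNextTask(true));
    }
}
```

Without the `remove()` call the second task inherits the first task's value, which is how per-request context can leak between unrelated requests.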