Lessons in Multithreaded Programming: Don’t Assume Anything Will Run in a Timely Manner

I recently ran into an issue with a loop that had to run concurrently in multiple threads. It looked something like this:

loop:
    main_stuff()
    only the last thread to get here in every wakeup cycle should perform this:
        additional_stuff()
    wait()

In other words, the goal was for every thread to run main_stuff(), and for additional_stuff() to run once per wakeup cycle, after every thread had had a chance to run main_stuff().
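For comparison, Java's standard library already has a primitive for "the last thread to arrive does the extra work": java.util.concurrent.Phaser calls its onAdvance() hook once per phase, from the last party to arrive. This is a swapped-in technique, not the approach this post describes, and it differs in one important way: threads wait for each other in arriveAndAwaitAdvance() rather than for an external notification. The class name PhaserSketch, the counter standing in for additional_stuff(), and the thread and cycle counts are illustrative assumptions.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical alternative using Phaser; not the post's own solution.
class PhaserSketch {
    /** Runs `threads` workers for `cycles` cycles; returns how often the per-cycle action ran. */
    static int run(int threads, int cycles) throws InterruptedException {
        AtomicInteger additionalRuns = new AtomicInteger();
        // All workers are registered up front, so no phase can complete early.
        Phaser phaser = new Phaser(threads) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Invoked once per phase, by the last party to arrive.
                if (registeredParties > 0) {
                    additionalRuns.incrementAndGet(); // stands in for additional_stuff()
                }
                // Terminate once every worker has deregistered (the final advance is not counted).
                return registeredParties == 0;
            }
        };
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int c = 0; c < cycles; c++) {
                        // main_stuff() would run here
                        phaser.arriveAndAwaitAdvance();
                    }
                } finally {
                    phaser.arriveAndDeregister(); // plays the role of totalThreads.decrement()
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return additionalRuns.get();
    }
}
```

PhaserSketch.run(4, 5) returns 5, one additional_stuff() per cycle however the four workers are scheduled. Phaser.register() also lets threads join after construction, which is the dynamic membership the post tracks with totalThreads.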

The first solution I came up with was this (pseudocode):

AtomicInt totalThreads;
AtomicInt waitingThreads;
Object notifier;
try:
    totalThreads.increment()
    loop:
        main_stuff()
        if waitingThreads.incrementAndGet() == totalThreads.get()
            additional_stuff()
        synchronized (notifier) {
            notifier.wait()
        }
        waitingThreads.decrement()
finally:
    totalThreads.decrement()

At first glance, this seems to work, and some testing indicated that it did. However, if the OS schedules threads unfavorably, or the machine only has one CPU core to work with, the following can happen:

  1. Two threads are blocked in wait().
  2. Both are notified and wake up.
  3. One thread runs main_stuff() and gets all the way back to the check before additional_stuff() while the other thread has not yet executed its decrement().
  4. Because the other thread's stale increment is still counted, the first thread's incrementAndGet() returns a value equal to the total number of threads in the loop, so it runs additional_stuff() before the other thread has run main_stuff() again.
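The interleaving above can be replayed deterministically in a single thread, since only the two counters matter. This is a hypothetical Java sketch of the first version's bookkeeping: the threads are labeled A and B, the calls to main_stuff() are left as comments, and it counts how often the check fires in one wakeup cycle.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Single-threaded replay of steps 1-4 for the first version; names are illustrative.
class FirstVersionReplay {
    static int additionalRunsInOneCycle() {
        AtomicInteger totalThreads = new AtomicInteger(2);   // A and B are both in the loop
        AtomicInteger waitingThreads = new AtomicInteger(2); // step 1: both incremented, both in wait()
        int additionalRuns = 0;

        // Steps 2-3: both are notified; A runs ahead while B is still descheduled.
        waitingThreads.decrementAndGet();                              // A's decrement: 1
        // A runs main_stuff() here, then reaches the check.
        if (waitingThreads.incrementAndGet() == totalThreads.get()) {  // 2 == 2
            additionalRuns++;                                          // step 4: A fires early
        }

        // A goes back to wait(). Only now does B run its decrement, main_stuff() and check.
        waitingThreads.decrementAndGet();                              // B's decrement: 1
        if (waitingThreads.incrementAndGet() == totalThreads.get()) {  // 2 == 2
            additionalRuns++;                                          // B fires as well
        }
        return additionalRuns;
    }
}
```

It returns 2: A fires early in step 4, and B fires again when it finally catches up, so additional_stuff() runs twice in a cycle where it should run once.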

In this particular case, the extra run of additional_stuff() did not break the program; it was merely a performance issue. But the bug did not even show up until I tested on several different platforms.

The fully working solution I came up with after seeing the problem was this:

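AtomicInt totalThreads;
AtomicInt waitingThreads;
AtomicInt counter;        // cycle number, bumped by every notification
Object notifier;
try:
    int lastCounter = counter.get()
    totalThreads.increment()
    loop:
        main_stuff()
        if waitingThreads.incrementAndGet() == totalThreads.get()
            additional_stuff()
        synchronized (notifier) {
            // only sleep if no notification has arrived since this thread last woke
            while counter.get() == lastCounter:
                notifier.wait()
            lastCounter = counter.get()
        }
finally:
    totalThreads.decrement()

void notify():
    synchronized (notifier) {
        waitingThreads.set(0)    // start counting arrivals for the new cycle
        counter.increment()
        notifier.notifyAll()
    }

Notice that this version handles the race correctly: the notification resets the waiting-thread count before any thread wakes up, and the count then climbs again as every thread catches back up, so a fast thread can no longer see the previous cycle's count. Because each thread compares counter with the value it last saw, it also accounts for multiple notifications happening while one or more threads are still processing main_stuff(): such a thread skips the wait and goes straight into the next cycle.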
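A runnable Java sketch of the counter-based approach follows, under stated assumptions: "reset the variable" is read as notify() resetting waitingThreads to zero and incrementing counter, and counter is kept as a plain int guarded by the notifier lock rather than an atomic. The names CycleCoordinator, register(), deregister(), arrive(), currentCycle(), awaitNextCycle(), signalCycle() and workerLoop() are hypothetical; notify() is renamed signalCycle() because Java's Object.notify() is final and cannot be redeclared.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the counter-based scheme; class and method names are hypothetical.
class CycleCoordinator {
    private final AtomicInteger totalThreads = new AtomicInteger();
    private final AtomicInteger waitingThreads = new AtomicInteger();
    private final Object notifier = new Object();
    private int counter = 0; // cycle number; only accessed while holding notifier

    void register()   { totalThreads.incrementAndGet(); }
    void deregister() { totalThreads.decrementAndGet(); }

    /** Call after main_stuff(); returns true for the thread that should run additional_stuff(). */
    boolean arrive() {
        return waitingThreads.incrementAndGet() == totalThreads.get();
    }

    /** The cycle number a thread should remember before its first wait. */
    int currentCycle() {
        synchronized (notifier) {
            return counter;
        }
    }

    /** Blocks until a notification newer than lastCounter has happened; returns the new cycle. */
    int awaitNextCycle(int lastCounter) throws InterruptedException {
        synchronized (notifier) {
            while (counter == lastCounter) { // the loop also absorbs spurious wakeups
                notifier.wait();
            }
            return counter;
        }
    }

    /** The post's notify(): reset the arrival count before waking anyone, then bump the cycle. */
    void signalCycle() {
        synchronized (notifier) {
            waitingThreads.set(0);
            counter++;
            notifier.notifyAll();
        }
    }

    /** Per-thread loop; mainStuff and additionalStuff are placeholders. Runs until interrupted. */
    void workerLoop(Runnable mainStuff, Runnable additionalStuff) {
        int lastCounter = currentCycle();
        register();
        try {
            while (true) {
                mainStuff.run();
                if (arrive()) {
                    additionalStuff.run();
                }
                lastCounter = awaitNextCycle(lastCounter);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and leave the loop
        } finally {
            deregister();
        }
    }
}
```

Each worker thread runs workerLoop() and is stopped by interrupting it; whatever produces the wakeups calls signalCycle(). Because signalCycle() clears the count before notifyAll(), a fast thread that arrives first in the new cycle gets false from arrive(), which is the case the first version got wrong.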
