A Simple Thread Barrier Implementation

Sometimes a group of concurrently running threads needs to rendezvous at a certain point before any of them can proceed further. This situation commonly arises in areas such as event simulation, where events are synchronized via a clock event (see the illustration below):

[Figure: Event Barrier]

In the example above, a number of threads start at different times after each clock event arrives, and all of them must finish running before the next clock event arrives.

Such a barrier can be implemented using various techniques of differing difficulty. One possible implementation can be found in Joseph Albahari’s Threading in C#, where a monitor construct is used to wait on a synchronization object in the main thread after the worker threads are queued:

    lock (workerLocker) 
    {
      while (runningWorkers > 0) 
        Monitor.Wait (workerLocker);
    }

In each worker thread, the count of running workers is decremented when the thread finishes:

    lock (workerLocker) 
    {
      runningWorkers--; 
      Monitor.Pulse (workerLocker);
    }

When the number of running workers reaches zero, the wait condition is satisfied, the monitor exits the wait loop, and the main thread continues.

The wait on the locker object forms a barrier in the example above. The method illustrated here is simple and elegant. It can be used to create the barriers needed to synchronize the clock event mentioned earlier.
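The two fragments above can be combined into a minimal self-contained sketch. The class and method names here (MonitorBarrierDemo, DoWork) are hypothetical, chosen just for illustration; the synchronization logic is exactly the Wait/Pulse pattern shown above.

```csharp
using System;
using System.Threading;

class MonitorBarrierDemo
{
    static readonly object workerLocker = new object();
    static int runningWorkers = 5; // must match the number of queued workers

    static void Main()
    {
        for (int i = 0; i < 5; i++)
        {
            ThreadPool.QueueUserWorkItem(DoWork, i);
        }

        // The barrier: the main thread waits until every worker has signaled.
        lock (workerLocker)
        {
            while (runningWorkers > 0)
                Monitor.Wait(workerLocker);
        }
        Console.WriteLine("All workers finished.");
    }

    static void DoWork(object state)
    {
        Console.WriteLine("Worker {0} running", state);
        // ... actual work here ...
        lock (workerLocker)
        {
            runningWorkers--;            // one fewer worker still running
            Monitor.Pulse(workerLocker); // wake the waiting main thread
        }
    }
}
```

Note that the decrement and the Pulse both happen inside the lock; releasing the lock between them could let the main thread miss the signal.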

Thread pooling might not always be the optimal solution, though. Although the number of threads in a thread pool is typically bounded at a few dozen, even that many threads can incur excessive context-switching penalties if the majority of them are CPU bound. In a computation-intensive situation, a fixed number of threads approximating the number of computational cores is more appropriate (for instance, four threads on a four-core CPU).
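One way to size a fixed set of threads to the hardware is Environment.ProcessorCount. The sketch below is illustrative only (the class name and partitioning scheme are made up); it simply creates one dedicated thread per logical core and joins them all at the end, which itself acts as a rendezvous.

```csharp
using System;
using System.Threading;

class FixedThreadDemo
{
    static void Main()
    {
        // Match the thread count to the number of logical cores for CPU-bound work.
        int threadCount = Environment.ProcessorCount;
        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++)
        {
            int id = i; // capture the loop variable for the closure
            threads[i] = new Thread(() =>
            {
                // ... CPU-bound computation for partition `id` goes here ...
                Console.WriteLine("Thread {0} working on partition {1}",
                    Thread.CurrentThread.ManagedThreadId, id);
            });
            threads[i].Start();
        }

        foreach (Thread t in threads)
            t.Join(); // rendezvous: wait for all fixed threads to finish
        Console.WriteLine("All {0} threads joined.", threadCount);
    }
}
```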

Here I will show another relatively simple way to construct a barrier that only a predetermined maximum number of threads can pass at a given time. To enforce such a ceiling, a global semaphore is used (in this example, up to three concurrent threads can enter the semaphore):

public class Global
{
    // Initial count and maximum count are both 3:
    // at most three threads may hold the semaphore at once.
    public static Semaphore sp = new Semaphore(3, 3);
}

To simplify coding and shield the programmer from the synchronization mechanism details, I created an IEvent interface and an abstract EventBase as follows:

interface IEvent
{
    void OnEvent(int arg);
}
public abstract class EventBase : IEvent
{
    #region IEvent Members
    public void OnEvent(int arg)
    {
        Global.sp.WaitOne();
        try
        {
            Run(arg);
        }
        finally
        {
            // Release even if Run() throws, so the semaphore slot is not leaked.
            Global.sp.Release();
        }
    }
    #endregion
    
    public virtual void Run(int arg)
    {
    }
}

Users can implement code that needs to block at the barrier simply by inheriting from EventBase and overriding the Run() method:

public class Event1 : EventBase
{
    public override void Run(int arg)
    {
        Console.WriteLine("Thread {0}, arg {1}", Thread.CurrentThread.ManagedThreadId, arg);
        Thread.Sleep(1000);
    }
}

To illustrate how this barrier implementation can be used, consider the following code:

class Program
{
    delegate void MyEvent(int arg);

    static void Main(string[] args)
    {
        IEvent e = new Event1();
        MyEvent m = new MyEvent(e.OnEvent);

        List<IAsyncResult> l = new List<IAsyncResult>();

        for (int i = 0 ; i < 6 ; i++) 
        {
          IAsyncResult r = m.BeginInvoke(i, null, null);
          l.Add(r);
        }

        foreach (IAsyncResult r in l)
        {
            m.EndInvoke(r);
        }
    }
}

In the code above, we simulate six concurrent calls that need to be synchronized; the multi-threading is achieved via asynchronous delegate invocation. Because the capacity of the semaphore is set to 3 in this example, if this code is run on a quad-core machine you will see the six asynchronous calls execute in two batches, three at a time, and all of them synchronize at the end. (The thread pool may reuse threads between batches, which is why thread 10 appears twice in the sample output below.)

Thread 10, arg 1
Thread 11, arg 2
Thread 6, arg 0
Thread 13, arg 4
Thread 10, arg 5
Thread 12, arg 3

Using this approach, the number of concurrently running threads can be precisely controlled by the maximum threads allowed in the semaphore.
