Thursday, September 27, 2012

Multi-Threading with ThreadPool and a Thread Manager

I have spent the past few weeks making some tasks in a WinForms app multi-threaded. Threaded applications can get tricky really fast. I needed a thread manager that helped me queue and create new threads via the ThreadPool. Here is the solution I came up with. I by no means think it is the best or only solution; in fact, it has one constraint that I will call out at the end. It is, though, one approach to threading in .NET applications.

I needed a class that would do the following:

- Calculate how many threads to spin up

- Calculate how much work to give to each thread

- Queue new threads with the ThreadPool

- Determine if WaitHandles needed to be created for the new threads

- Pass the WaitHandles to the new threads so they can signal when complete

- Return a collection of the created WaitHandles so that later processes can wait on all threads if needed

First I created a ThreadState object to hold the information I needed to pass to each thread.

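    using System.Threading;

    // Per-thread work description handed to ThreadPool.QueueUserWorkItem.
    public class ThreadState
    {
        public int OrgUnitId { get; set; }   // named to match its use in QueueThreads and ConvertApplications
        public int PageSize { get; set; }
        public int PageNumber { get; set; }
        public WaitHandle WaitHandle { get; set; }   // null when no completion signal is needed
    }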

Now the QueueThreads method of the ThreadManager class:

   1:  public WaitHandle[] QueueThreads(double totalCount, ThreadTypes threadType, int orgUnitID, bool createWaitHandles, WaitHandle[] threadsToWaitfor)
   2:          {
   3:              int MaxThreadCount = int.Parse(System.Configuration.ConfigurationManager.AppSettings["MAX_THREADS"]);
   4:              int pageSize = (int)Math.Round(totalCount / MaxThreadCount, 0);
   5:              List<WaitHandle> createdHandles = new List<WaitHandle>();
   7:              // Set the max number of threads the pool should prepare for. All additional requests after this number
   8:              // will be queued and wait for previous threads to finish.
   9:              ThreadPool.SetMaxThreads(MaxThreadCount, MaxThreadCount);
  11:              // This sets the number of idle threads the process should keep ready for additional requests.
  12:              ThreadPool.SetMinThreads(5,5);
  14:              if (threadsToWaitfor != null && threadsToWaitfor.Length > 0)
  15:                  WaitHandle.WaitAll(threadsToWaitfor);
  18:              for (int i = 1; i <= MaxThreadCount; i++)
  19:              {
  20:                  // add additional buffer records for final page
  21:                  if (i == MaxThreadCount)
  22:                      pageSize += 10;
  24:                  ThreadState state = new ThreadState()
  25:                  {
  26:                      OrgUnitId = orgUnitID,
  27:                      PageNumber = i,
  28:                      PageSize = pageSize
  29:                  };
  31:                  if (createWaitHandles)
  32:                  {
  33:                      // Each thread gets exactly one handle, which it is responsible for signaling (calling Set on) when it is done.
  34:                      var threadHandle = new AutoResetEvent(false);
  35:                      state.WaitHandle = threadHandle;
  36:                      createdHandles.Add(threadHandle);
  37:                  }
  39:                  switch (threadType)
  40:                  {
  41:                      case ThreadTypes.ConvertApplications: // hypothetical member name; the original listing omits it
  42:                          ThreadPool.QueueUserWorkItem(new ApplicationConverter().ConvertApplications, state);
  43:                          break;
  45:                      default:
  46:                          break;
  47:                  }
  50:              }
  52:              // Return an array of all handles created. This allows other threads to be made dependent on these.
  53:              return createdHandles.ToArray();
  54:          }

Here is the ConvertApplications method:

    public void ConvertApplications(Object state)
    {
        ThreadState tState = (ThreadState)state;
        _pageSize = tState.PageSize;
        _pageNumber = tState.PageNumber;
        _orgUnitId = tState.OrgUnitId;
        try
        {
            DoTheWork();
        }
        finally
        {
            // If we have a WaitHandle, signal it when this work item completes,
            // even if DoTheWork throws, so that waiters are not left blocked.
            if (tState.WaitHandle != null)
            {
                ((System.Threading.AutoResetEvent)tState.WaitHandle).Set();
            }
        }
    }

The idea behind the ThreadManager class is that you pass in some basic information about the work you want done and it creates the threads it needs. My work was pulling data from a database with an ORM tool that could handle paging, so the ThreadManager divides the total count of records to process by the max thread count the app is configured for to calculate how much work each thread needs to do. At line 14 of QueueThreads it can stop and wait for other outstanding threads to signal they are done before creating and queuing additional threads.
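The page-size arithmetic is worth a closer look. Rounding can round down, and the fixed 10-record buffer on the final page only covers that when the thread count is small: assuming pages are laid out back to back, 1,034 records over 30 threads round to 34 per page, which reaches only 1,030 records even with the buffer. Below is a minimal sketch of an alternative that uses ceiling division instead. `PagingSketch` and `BuildPages` are hypothetical names for illustration and are not part of the ThreadManager above.

```csharp
using System;
using System.Collections.Generic;

public static class PagingSketch
{
    // Hypothetical helper: splits totalCount records into at most maxThreads pages.
    // Returns (page number, page size) pairs; page numbers start at 1, as in QueueThreads.
    public static List<KeyValuePair<int, int>> BuildPages(int totalCount, int maxThreads)
    {
        if (maxThreads < 1)
            throw new ArgumentOutOfRangeException("maxThreads");

        var pages = new List<KeyValuePair<int, int>>();
        if (totalCount <= 0)
            return pages;

        // Ceiling division: the smallest page size that covers every record.
        int pageSize = (totalCount + maxThreads - 1) / maxThreads;

        int remaining = totalCount;
        for (int pageNumber = 1; remaining > 0; pageNumber++)
        {
            // The final page takes whatever is left, so no buffer is needed.
            int size = Math.Min(pageSize, remaining);
            pages.Add(new KeyValuePair<int, int>(pageNumber, size));
            remaining -= size;
        }
        return pages;
    }
}
```

For 103 records and 4 threads this yields pages of 26, 26, 26, and 25 records. Whether a short final page suits your ORM's paging calls is something to check against the tool you use.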

Since creating AutoResetEvents is heavy on the system, there is a flag you can pass in saying whether or not you need threads to signal when they are done. The AutoResetEvents that get created are stored in a collection and returned, which in turn allows your main thread to wait on them if needed.
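To make the optional-handle pattern concrete, here is a small self-contained sketch, separate from the ThreadManager itself. `WaitHandleSketch` and `RunAll` are hypothetical names, and the "work" is just incrementing a counter as a stand-in for something like DoTheWork.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class WaitHandleSketch
{
    // Queues workerCount work items and, when createWaitHandles is true,
    // blocks until every one of them has signaled. Returns the number of
    // work items observed as finished at the time of the return.
    public static int RunAll(int workerCount, bool createWaitHandles)
    {
        int completed = 0;
        var handles = new List<WaitHandle>();

        for (int i = 0; i < workerCount; i++)
        {
            // One handle per work item, created only when the caller asked for it.
            AutoResetEvent handle = createWaitHandles ? new AutoResetEvent(false) : null;
            if (handle != null)
                handles.Add(handle);

            ThreadPool.QueueUserWorkItem(state =>
            {
                try
                {
                    Interlocked.Increment(ref completed); // stand-in for the real work
                }
                finally
                {
                    // Signal even if the work throws, so the waiter is not left blocked.
                    if (handle != null)
                        handle.Set();
                }
            });
        }

        if (handles.Count > 0)
        {
            // Note: WaitAll accepts at most 64 handles, and it is not supported
            // for multiple handles on an STA thread such as a WinForms UI thread.
            WaitHandle.WaitAll(handles.ToArray());
            foreach (WaitHandle h in handles)
                h.Dispose();
        }

        // Atomic read of the counter.
        return Interlocked.CompareExchange(ref completed, 0, 0);
    }
}
```

With the flag set to false the method returns immediately after queuing, so the returned count may be lower than workerCount; that is the trade-off for skipping the handles.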

This approach works really well, but there is one flaw in it. If you don't need to spin up and keep track of more than 64 threads, it will work fine. If you call WaitHandle.WaitAll() and pass in an array of more than 64 WaitHandles, it throws a NotSupportedException. 64 is the upper cap on what WaitHandle.WaitAll can monitor, a limit inherited from the underlying Windows wait API. For some situations this may be just the manager you need (or at least a good start on it). For other situations, not so much.
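One way around the 64-handle cap, if you are on .NET 4 or later, is to swap the per-thread AutoResetEvents for a single CountdownEvent. This is a different technique from the ThreadManager above, not a drop-in change to it, and `CountdownSketch` and `RunAll` are hypothetical names used only for this sketch.

```csharp
using System;
using System.Threading;

public static class CountdownSketch
{
    // Queues workerCount work items and waits for all of them using one
    // CountdownEvent, so the 64-handle limit of WaitHandle.WaitAll does not apply.
    public static int RunAll(int workerCount)
    {
        if (workerCount < 1)
            return 0;

        int completed = 0;
        using (var countdown = new CountdownEvent(workerCount))
        {
            for (int i = 0; i < workerCount; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    try
                    {
                        Interlocked.Increment(ref completed); // stand-in for the real work
                    }
                    finally
                    {
                        // Each work item signals exactly once, even on failure.
                        countdown.Signal();
                    }
                });
            }

            // Blocks until Signal has been called workerCount times.
            countdown.Wait();
        }

        return Interlocked.CompareExchange(ref completed, 0, 0);
    }
}
```

The trade-off is that you get one "everything is done" signal rather than an array of handles, so chaining batches the way threadsToWaitfor does would mean passing the CountdownEvent along instead.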
