Thinking in C++ - Volume 2

Publication date: 25/01/2007, Last updated: 25/01/2007


3.4. Concurrency
3.4.1. Motivation
3.4.2. Concurrency in C++
3.4.2.1. Installing ZThreads
3.4.3. Defining Tasks
3.4.4. Using Threads
3.4.4.1. Creating responsive user interfaces
3.4.4.2. Simplifying with Executors
3.4.4.3. Yielding
3.4.4.4. Sleeping
3.4.4.5. Priority
3.4.5. Sharing limited resources
3.4.5.1. Ensuring the existence of objects
3.4.5.2. Improperly accessing resources
3.4.5.3. Controlling access
3.4.5.4. Simplified coding with Guards
3.4.5.5. Thread local storage
3.4.6. Terminating tasks
3.4.6.1. Preventing iostream collision
3.4.6.2. The ornamental garden
3.4.6.3. Terminating when blocked
3.4.6.4. Interruption
3.4.7. Cooperation between threads
3.4.7.1. Wait and signal
3.4.7.2. Producer–consumer relationships
3.4.7.3. Solving threading problems with queues
3.4.7.4. Broadcast
3.4.8. Deadlock
3.4.9. Summary
3.4.10. Exercises


3.4. Concurrency

Objects provide a way to divide a program into independent sections. Often, you also need to partition a program into separate, independently running subtasks.

Using multithreading, each of these independent subtasks is driven by a thread of execution, and you program as if each thread has the CPU to itself. An underlying mechanism is dividing up the CPU time for you, but in general, you don't need to think about it, which helps to simplify programming with multiple threads.

A process is a self-contained program running within its own address space. A multitasking operating system can run more than one process (program) at a time, while making it look as if each one is chugging along on its own, by periodically switching the CPU from one task to another. A thread is a single sequential flow of control within a process. A single process can thus have multiple concurrently executing threads. Since the threads run within a single process, they share memory and other resources. The fundamental difficulty in writing multithreaded programs is coordinating the use of those resources between different threads.

There are many possible uses for multithreading, but you'll most often want to use it when you have some part of your program tied to a particular event or resource. To keep from holding up the rest of your program, you create a thread associated with that event or resource and let it run independently of the main program.

Concurrent programming is like stepping into an entirely new world and learning a new programming language, or at least a new set of language concepts. With the appearance of thread support in most microcomputer operating systems, extensions for threads have also been appearing in programming languages or libraries. In all cases, thread programming:

1.  Seems mysterious and requires a shift in the way you think about programming.

2.  Looks similar to thread support in other languages. When you understand threads, you understand a common tongue.

Understanding concurrent programming is on the same order of difficulty as understanding polymorphism. If you apply some effort, you can fathom the basic mechanism, but it generally takes deep study and understanding to develop a true grasp of the subject. The goal of this chapter is to give you a solid foundation in the basics of concurrency so that you can understand the concepts and write reasonable multithreaded programs. Be aware that you can easily become overconfident. If you are writing anything complex, you will need to study dedicated books on the topic.


3.4.1. Motivation

One of the most compelling reasons for using concurrency is to produce a responsive user interface. Consider a program that performs some CPU-intensive operation and thus ends up ignoring user input and being unresponsive. The program needs to continue performing its operations, and at the same time it needs to return control to the user interface so that the program can respond to the user. If you have a “quit” button, you don't want to be forced to poll it in every piece of code you write in your program. (This would couple your quit button across the program and be a maintenance headache.) Yet you want the quit button to be responsive, as if you were checking it regularly.

A conventional function cannot continue performing its operations and at the same time return control to the rest of the program. In fact, this sounds like an impossibility, as if the CPU must be in two places at once, but this is precisely the illusion that concurrency provides (in the case of multiprocessor systems, this may be more than an illusion).

You can also use concurrency to optimize throughput. For example, you might be able to do important work while you're stuck waiting for input to arrive on an I/O port. Without threading, the only reasonable solution is to poll the I/O port, which is awkward and can be difficult.

If you have a multiprocessor machine, multiple threads can be distributed across multiple processors, which can dramatically improve throughput. This is often the case with powerful multiprocessor web servers, which can distribute large numbers of user requests across CPUs in a program that allocates one thread per request.

A program that uses threads on a single-CPU machine is still just doing one thing at a time, so it must be theoretically possible to write the same program without using any threads. However, multithreading provides an important organizational benefit: The design of your program can be greatly simplified. Some types of problems, such as simulation—a video game, for example—are difficult to solve without support for concurrency.

The threading model is a programming convenience to simplify juggling several operations at the same time within a single program: The CPU will pop around and give each thread some of its time.(148) Each thread has the consciousness of constantly having the CPU to itself, but the CPU's time is actually sliced among all the threads. The exception is a program that is running on multiple CPUs. But one of the great things about threading is that you are abstracted away from this layer, so your code does not need to know whether it is running on a single CPU or many.(149) Thus, using threads is a way to create transparently scalable programs: if a program is running too slowly, you can easily speed it up by adding CPUs to your computer. Multitasking and multithreading tend to be the most reasonable ways to utilize multiprocessor systems.

Threading can reduce computing efficiency somewhat, but the net improvement in program design, resource balancing, and user convenience is often quite valuable. In general, threads enable you to create a more loosely coupled design; otherwise, parts of your code would be forced to pay explicit attention to tasks that would normally be handled by threads.


3.4.2. Concurrency in C++

When the C++ Standards Committee was creating the initial C++ Standard, a concurrency mechanism was explicitly excluded because C didn't have one and also because there were a number of competing approaches to implementing concurrency. It seemed too much of a constraint to force programmers to use only one of these.

The alternative turned out to be worse, however. To use concurrency, you had to find and learn a library and deal with its idiosyncrasies and the uncertainties of working with a particular vendor. In addition, there was no guarantee that such a library would work on different compilers or across different platforms. Also, since concurrency was not part of the standard language, it was more difficult to find C++ programmers who also understood concurrent programming.

Another influence may have been the Java language, which included concurrency in the core language. Although multithreading is still complicated, Java programmers tend to start learning and using it from the beginning.

The C++ Standards Committee is considering the addition of concurrency support to the next iteration of C++, but at the time of this writing it is unclear what the library will look like. We decided to use the ZThread library as the basis for this chapter. We preferred the design, and it is open-source and freely available at http://zthread.sourceforge.net. Eric Crahen of IBM, the author of the ZThread library, was instrumental in creating this chapter.(150)

This chapter uses only a subset of the ZThread library, in order to convey the fundamental ideas of threading. The ZThread library contains significantly more sophisticated thread support than is shown here, and you should study that library further in order to fully understand its capabilities.


3.4.2.1. Installing ZThreads

Please note that the ZThread library is an independent project and is not supported by the authors of this book; we are simply using the library in this chapter and cannot provide technical support for installation issues. See the ZThread web site for installation support and error reports.

The ZThread library is distributed as source code. After downloading it (version 2.3 or greater) from the ZThread web site, you must first compile the library, and then configure your project to use the library.

The preferred method for compiling ZThreads for most flavors of UNIX (Linux, SunOS, Cygwin, etc.) is to use the configure script. After unpacking the files (using tar), simply execute:


./configure && make install
from the main directory of the ZThreads archive to compile and install a copy of the library in the /usr/local directory. You can customize a number of options when using this script, including the locations of files. For details, use this command:


./configure --help
The ZThreads code is structured to simplify compilation for other platforms and compilers (such as Borland, Microsoft, and Metrowerks). To do this, create a new project and add all the .cxx files in the src directory of the ZThreads archive to the list of files to be compiled. Also, be sure to include the include directory of the archive in the header search path for your project. The exact details will vary from compiler to compiler so you'll need to be somewhat familiar with your toolset to be able to use this option.

Once the compilation has succeeded, the next step is to create a project that uses the newly compiled library. First, let the compiler know where the headers are located so that your #include statements will work properly. Typically, you will add an option such as the following to your project:


-I/path/to/installation/include
If you used the configure script, the installation path will be whatever you selected for the prefix (which defaults to /usr/local). If you used one of the project files in the build directory, the installation path would simply be the path to the main directory of the ZThreads archive.

Next, you'll need to add an option to your project that will let the linker know where to find the library. If you used the configure script, this will look like:


-L/path/to/installation/lib -lZThread
If you used one of the project files provided, this will look like:


-L/path/to/installation/Debug ZThread.lib
Again, if you used the configure script, the installation path will be whatever you selected for the prefix. If you used a provided project file, the path will be the path to the main directory of the ZThreads archive.

Note that if you're using Linux, or if you are using Cygwin (www.cygwin.com) under Windows, you may not need to modify your include or library path; the installation process and defaults will often take care of everything for you.

Under Linux, you will probably need to add the following to your .bashrc so that the runtime system can find the shared library file libZThread-x.x.so.O when it executes the programs in this chapter:

export LD_LIBRARY_PATH=/usr/local/lib:${LD_LIBRARY_PATH}

(This assumes you used the default installation process and the shared library ended up in /usr/local/lib; otherwise, change the path to your location.)


3.4.3. Defining Tasks

A thread carries out a task, so you need a way to describe that task. The Runnable class provides a common interface to execute any arbitrary task. Here is the core of the ZThread Runnable class, which you will find in Runnable.h in the include directory, after installing the ZThread library:


class Runnable {
public:
  virtual void run() = 0;
  virtual ~Runnable() {}
};
Because Runnable is an abstract base class, it is easily combined with another base class and other classes.

To define a task, simply inherit from the Runnable class and override run( ) to make the task do your bidding.

For example, the following LiftOff task displays the countdown before liftoff:


//: C11:LiftOff.h
// Demonstration of the Runnable interface.
#ifndef LIFTOFF_H
#define LIFTOFF_H
#include <iostream>
#include "zthread/Runnable.h"

class LiftOff : public ZThread::Runnable {
  int countDown;
  int id;
public:
  LiftOff(int count, int ident = 0) :
    countDown(count), id(ident) {}
  ~LiftOff() {
    std::cout << id << " completed" << std::endl;
  }
  void run() {
    while(countDown--)
      std::cout << id << ":" << countDown << std::endl;
    std::cout << "Liftoff!" << std::endl;
  }
};
#endif // LIFTOFF_H ///:~
The identifier id distinguishes between multiple instances of the task. If you only make a single instance, you can use the default value for ident. The destructor will allow you to see that a task is properly destroyed.

In the following example, the task's run( ) is not driven by a separate thread; it is simply called directly in main( ):


//: C11:NoThread.cpp
#include "LiftOff.h"

int main() {
  LiftOff launch(10);
  launch.run();
} ///:~
When a class is derived from Runnable, it must have a run( ) function, but that's nothing special—it doesn't produce any innate threading abilities.

To achieve threading behavior, you must use the Thread class.


3.4.4. Using Threads

To drive a Runnable object with a thread, you create a separate Thread object and hand a Runnable pointer to the Thread's constructor. This performs the thread initialization and then calls the Runnable's run( ) as an interruptible thread. By driving LiftOff with a Thread, the example below shows how any task can be run in the context of another thread:


//: C11:BasicThreads.cpp
// The most basic use of the Thread class.
//{L} ZThread
#include <iostream>
#include "LiftOff.h"
#include "zthread/Thread.h"
using namespace ZThread;
using namespace std;

int main() {
  try {
    Thread t(new LiftOff(10));
    cout << "Waiting for LiftOff" << endl;
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Synchronization_Exception is part of the ZThread library and is the base class for all ZThread exceptions. It will be thrown if there is an error starting or using a thread.

A Thread constructor only needs a pointer to a Runnable object. Creating a Thread object will perform the necessary initialization for the thread and then call that Runnable's run( ) member function to start the task. Even though the Thread constructor is, in effect, making a call to a long-running function, that constructor quickly returns. In effect, you have made a member function call to LiftOff::run( ), and that function has not yet finished, but because LiftOff::run( ) is being executed by a different thread, you can still perform other operations in the main( ) thread. (This ability is not restricted to the main( ) thread—any thread can start another thread.) You can see this by running the program. Even though LiftOff::run( ) has been called, the “Waiting for LiftOff” message will appear before the countdown has completed. Thus, the program is running two functions at once—LiftOff::run( ) and main( ).
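
If your compiler supports the later C++11 standard, you can sketch the same effect with std::thread instead of ZThread. This is an illustration under that assumption, not part of the ZThread library; the function launchAndContinue( ) and its log are hypothetical names invented here. A std::promise holds the task back so you can see that the constructor returns while the task still has work to do:

```cpp
#include <cassert>
#include <future>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: starts a countdown task on a second thread and
// records what the calling thread does while the task is still pending.
std::vector<std::string> launchAndContinue(int count) {
  std::vector<std::string> log;
  std::promise<void> release;           // Holds the task until we say go
  std::future<void> gate = release.get_future();
  int remaining = count;

  // The constructor returns at once; the lambda runs on the new thread.
  std::thread task([&gate, &remaining] {
    gate.wait();                        // Block until the caller releases us
    while (remaining > 0) --remaining;  // The "countdown"
  });

  // The calling thread is free to do other work here.
  log.push_back("Waiting for LiftOff");
  release.set_value();                  // Let the countdown proceed
  task.join();                          // Wait for the task to finish
  assert(remaining == 0);
  log.push_back("Liftoff!");
  return log;
}
```

Calling launchAndContinue(10) returns the two log entries in that order. Note one difference from ZThread: a std::thread object must be joined or detached before it is destroyed.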

You can easily add more threads to drive more tasks. Here, you can see how all the threads run in concert with one another:


//: C11:MoreBasicThreads.cpp
// Adding more threads.
//{L} ZThread
#include <iostream>
#include "LiftOff.h"
#include "zthread/Thread.h"
using namespace ZThread;
using namespace std;

int main() {
  const int SZ = 5;
  try {
    for(int i = 0; i < SZ; i++)
      Thread t(new LiftOff(10, i));
    cout << "Waiting for LiftOff" << endl;
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
The second argument for the LiftOff constructor identifies each task. When you run the program, you'll see that the execution of the different tasks is mixed together as the threads are swapped in and out. This swapping is automatically controlled by the thread scheduler. If you have multiple processors on your machine, the thread scheduler will quietly distribute the threads among the processors.

The for loop can seem a little strange at first because t is being created locally inside the for loop and then immediately goes out of scope and is destroyed. This makes it appear that the thread itself might be immediately lost, but you can see from the output that the threads are indeed running to conclusion. When you create a Thread object, the associated thread is registered with the threading system, which keeps it alive. Even though the stack-based Thread object is lost, the thread itself lives on until its associated task completes. Although this may be counterintuitive from a C++ standpoint, the concept of threads is a departure from the norm: a thread creates a separate thread of execution that persists after the function call ends. This departure is reflected in the persistence of the underlying thread after the object vanishes.
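
The standard std::thread class from C++11 makes this separation explicit: you must call detach( ) before the object is destroyed if you want the thread to live on. The following sketch assumes a C++11 compiler and is not ZThread code; runDetached( ) is a hypothetical name chosen for illustration. The handle t goes out of scope at once, yet the task still delivers its result through a std::promise:

```cpp
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: starts a task whose handle goes out of scope
// immediately, then waits for the result the task delivers.
int runDetached(int count) {
  std::promise<int> done;
  std::future<int> result = done.get_future();
  {
    // The promise is moved into the new thread, which owns it from now on.
    std::thread t([count](std::promise<int> p) {
      int steps = 0;
      for (int i = count; i > 0; --i) ++steps;
      p.set_value(steps);   // Report back to whoever holds the future
    }, std::move(done));
    t.detach();  // Unlike ZThread, std::thread requires this explicitly
  }              // t is destroyed here; the thread itself keeps running
  return result.get();      // Blocks until the detached task reports back
}
```

Here runDetached(10) returns 10, the number of steps the task counted after its handle had already been destroyed.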


3.4.4.1. Creating responsive user interfaces

As stated earlier, one of the motivations for using threading is to create a responsive user interface. Although we don't cover graphical user interfaces in this book, you can still see a simple example of a console-based user interface.

The following example reads lines from a file and prints them to the console, sleeping (suspending the current thread) for a second after each line is displayed. (You'll learn more about sleeping later in the chapter.) During this process, the program doesn't look for user input, so the UI is unresponsive:


//: C11:UnresponsiveUI.cpp {RunByHand}
// Lack of threading produces an unresponsive UI.
//{L} ZThread
#include <iostream>
#include <fstream>
#include <string>
#include "zthread/Thread.h"
using namespace std;
using namespace ZThread;

int main() {
  cout << "Press <Enter> to quit:" << endl;
  ifstream file("UnresponsiveUI.cpp");
  string line;
  while(getline(file, line)) {
    cout << line << endl;
    Thread::sleep(1000); // Time in milliseconds
  }
  // Read input from the console
  cin.get();
  cout << "Shutting down..." << endl;
} ///:~
To make the program responsive, you can execute a task that displays the file in a separate thread. The main thread can then watch for user input so the program becomes responsive:


//: C11:ResponsiveUI.cpp {RunByHand}
// Threading for a responsive user interface.
//{L} ZThread
#include <iostream>
#include <fstream>
#include <string>
#include "zthread/Thread.h"
using namespace ZThread;
using namespace std;

class DisplayTask : public Runnable {
  ifstream in;
  string line;
  bool quitFlag;
public:
  DisplayTask(const string& file) : quitFlag(false) {
    in.open(file.c_str());
  }
  ~DisplayTask() { in.close(); }
  void run() {
    while(getline(in, line) && !quitFlag) {
      cout << line << endl;
      Thread::sleep(1000);
    }
  }
  void quit() { quitFlag = true; }
};

int main() {
  try {
    cout << "Press <Enter> to quit:" << endl;
    DisplayTask* dt = new DisplayTask("ResponsiveUI.cpp");
    Thread t(dt);
    cin.get();
    dt->quit();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
  cout << "Shutting down..." << endl;
} ///:~
Now the main( ) thread can respond immediately when you press <Enter> and call quit( ) on the DisplayTask.

This example also shows the need for communication between tasks—the task in the main( ) thread needs to tell the DisplayTask to shut down. Since we have a pointer to the DisplayTask, you might think of just calling delete on that pointer to kill the task, but this produces unreliable programs. The problem is that the task could be in the middle of something important when you destroy it, and so you are likely to put the program in an unstable state. Here, the task itself decides when it's safe to shut down. The easiest way to do this is by simply notifying the task that you'd like it to stop by setting a Boolean flag. When the task gets to a stable point it can check that flag and do whatever is necessary to clean up before returning from run( ). When the task returns from run( ), the Thread knows that the task has completed.

Although this program is simple enough that it should not have any problems, there are some small flaws regarding inter-task communication. This is an important topic that will be covered later in this chapter.
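
As a rough sketch of the flag-based shutdown described above, here is a self-contained version that uses C++11's std::thread and std::atomic<bool> in place of ZThread and a plain bool. Those substitutions are assumptions made for illustration, as are the names CountingTask and runUntilAsked( ):

```cpp
#include <atomic>
#include <thread>

// Hypothetical task that counts until it is asked to stop.
class CountingTask {
  std::atomic<bool> quitFlag;
  std::atomic<long> ticks;
public:
  CountingTask() : quitFlag(false), ticks(0) {}
  void run() {
    while (!quitFlag.load()) {   // Check the flag at a stable point
      ++ticks;
      std::this_thread::yield();
    }
    // Any cleanup would go here, before returning from run().
  }
  void quit() { quitFlag.store(true); }
  long count() const { return ticks.load(); }
};

// Starts the task, lets it make some progress, then asks it to stop.
long runUntilAsked(long minimumTicks) {
  CountingTask task;
  std::thread t(&CountingTask::run, &task);
  while (task.count() < minimumTicks)
    std::this_thread::yield();
  task.quit();  // Request shutdown instead of destroying the task
  t.join();     // run() has returned once join() completes
  return task.count();
}
```

The task is never destroyed while it is running: the caller only sets the flag, and the task leaves run( ) on its own the next time it checks.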


3.4.4.2. Simplifying with Executors

You can simplify your coding overhead by using ZThread Executors. Executors provide a layer of indirection between a client and the execution of a task; instead of a client executing a task directly, an intermediate object executes the task.

We can show this by using an Executor instead of explicitly creating Thread objects in MoreBasicThreads.cpp. A LiftOff object knows how to run a specific task; like the Command Pattern, it exposes a single function to be executed. An Executor object knows how to build the appropriate context to execute Runnable objects. In the following example, the ThreadedExecutor creates one thread per task:


//: C11:ThreadedExecutor.cpp
//{L} ZThread
#include <iostream>
#include "zthread/ThreadedExecutor.h"
#include "LiftOff.h"
using namespace ZThread;
using namespace std;

int main() {
  try {
    ThreadedExecutor executor;
    for(int i = 0; i < 5; i++)
      executor.execute(new LiftOff(10, i));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Note that in some cases a single Executor can be used to create and manage all the threads in your system. You must still place the threading code inside a try block because an Executor's execute( ) function may throw a Synchronization_Exception if something goes wrong. This is true for any function that involves changing the state of a synchronization object (starting threads, acquiring mutexes, waiting on conditions, etc.), as you will learn later in this chapter.

The program will exit as soon as all the tasks in the Executor complete.

In the previous example, the ThreadedExecutor creates a thread for each task that you want to run, but you can easily change the way these tasks are executed by replacing the ThreadedExecutor with a different type of Executor. In this chapter, using a ThreadedExecutor is fine, but in production code it might result in excessive costs from the creation of too many threads. In that case, you can replace it with a PoolExecutor, which will use a limited set of threads to execute the submitted tasks in parallel:


//: C11:PoolExecutor.cpp
//{L} ZThread
#include <iostream>
#include "zthread/PoolExecutor.h"
#include "LiftOff.h"
using namespace ZThread;
using namespace std;

int main() {
  try {
    // Constructor argument is minimum number of threads:
    PoolExecutor executor(5);
    for(int i = 0; i < 5; i++)
      executor.execute(new LiftOff(10, i));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
With the PoolExecutor, you do expensive thread allocation once, up front, and the threads are reused when possible. This saves time because you aren't constantly paying for thread creation overhead for every single task. Also, in an event-driven system, events that require threads to handle them can be generated as quickly as you want by simply fetching them from the pool. You don't overrun the available resources because the PoolExecutor uses a bounded number of Thread objects. Thus, although this book will use ThreadedExecutors, consider using PoolExecutors in production code.
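
To make the idea of a bounded set of reusable threads concrete, here is a minimal sketch of a fixed-size pool written with C++11 standard threads. It is not how PoolExecutor is implemented (we make no claim about ZThread's internals); SimplePool and runOnPool( ) are hypothetical names, and the design is simply the smallest one that works: worker threads pull tasks from a shared queue until the pool is destroyed.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool; not the ZThread PoolExecutor.
class SimplePool {
  std::vector<std::thread> workers;
  std::queue<std::function<void()> > tasks;
  std::mutex lock;
  std::condition_variable ready;
  bool closing;

  void workerLoop() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> guard(lock);
        while (!closing && tasks.empty()) ready.wait(guard);
        if (tasks.empty()) return;  // Closing and nothing left to do
        task = std::move(tasks.front());
        tasks.pop();
      }
      task();                       // Run the task outside the lock
    }
  }
public:
  // Threads are created once, up front, and reused for every task.
  explicit SimplePool(std::size_t n) : closing(false) {
    for (std::size_t i = 0; i < n; ++i)
      workers.emplace_back(&SimplePool::workerLoop, this);
  }
  void execute(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> guard(lock);
      tasks.push(std::move(task));
    }
    ready.notify_one();
  }
  // Lets the workers drain the queue, then joins every one of them.
  ~SimplePool() {
    {
      std::lock_guard<std::mutex> guard(lock);
      closing = true;
    }
    ready.notify_all();
    for (std::size_t i = 0; i < workers.size(); ++i) workers[i].join();
  }
};

// Runs taskCount small tasks on a pool of poolSize threads and
// returns how many of them completed.
int runOnPool(std::size_t poolSize, int taskCount) {
  std::atomic<int> completed(0);
  {
    SimplePool pool(poolSize);
    for (int i = 0; i < taskCount; ++i)
      pool.execute([&completed] { ++completed; });
  }  // The destructor waits for all submitted tasks
  return completed.load();
}
```

With runOnPool(3, 20), twenty tasks are handled by only three threads, so the cost of thread creation is paid three times rather than twenty.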

A ConcurrentExecutor is like a PoolExecutor with a fixed size of one thread. This is useful for anything you want to run in another thread continually (a long-lived task), such as a task that listens to incoming socket connections. It is also handy for short tasks that you want to run in a thread, for example, small tasks that update a local or remote log, or for an event-dispatching thread.

If more than one task is submitted to a ConcurrentExecutor, each task will run to completion before the next task is begun, all using the same thread. In the following example, you'll see each task completed, in the order that it was submitted, before the next one is begun. Thus, a ConcurrentExecutor serializes the tasks that are submitted to it.


//: C11:ConcurrentExecutor.cpp
//{L} ZThread
#include <iostream>
#include "zthread/ConcurrentExecutor.h"
#include "LiftOff.h"
using namespace ZThread;
using namespace std;

int main() {
  try {
    ConcurrentExecutor executor;
    for(int i = 0; i < 5; i++)
      executor.execute(new LiftOff(10, i));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Like a ConcurrentExecutor, a SynchronousExecutor is used when you want only one task at a time to run, serially instead of concurrently. Unlike ConcurrentExecutor, a SynchronousExecutor doesn't create or manage threads on its own. It uses the thread that submits the task and thus only acts as a focal point for synchronization. If you have n threads submitting tasks to a SynchronousExecutor, no two tasks are ever run at once. Instead, each one is run to completion, then the next one in the queue is begun.

For example, suppose you have a number of threads running tasks that use the file system, but you are writing portable code so you don't want to use flock( ) or another OS-specific call to lock a file. You can run these tasks with a SynchronousExecutor to ensure that only one task at a time is running from any thread. This way, you don't need to deal with synchronizing on the shared resource (and you won't clobber the file system in the meantime). A better solution is to synchronize on the resource (which you'll learn about later in this chapter), but a SynchronousExecutor lets you skip the trouble of getting coordinated properly just to prototype something.


//: C11:SynchronousExecutor.cpp
//{L} ZThread
#include <iostream>
#include "zthread/SynchronousExecutor.h"
#include "LiftOff.h"
using namespace ZThread;
using namespace std;

int main() {
  try {
    SynchronousExecutor executor;
    for(int i = 0; i < 5; i++)
      executor.execute(new LiftOff(10, i));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
When you run the program, you'll see that the tasks are executed in the order they are submitted, and each task runs to completion before the next one starts. What you don't see is that no new threads are created—the main( ) thread is used for each task, since in this example, that's the thread that submits all the tasks. Because SynchronousExecutor is primarily for prototyping, you may not use it much in production code.
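
The idea of an executor that owns no threads and only acts as a focal point for synchronization can be sketched in a few lines of C++11. This is an illustration of the concept, not ZThread's SynchronousExecutor; SerialGate and tasksNeverOverlap( ) are hypothetical names, and a plain std::mutex stands in for whatever ZThread uses internally:

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the idea behind SynchronousExecutor: no threads
// of its own, only a lock that admits one task at a time.
class SerialGate {
  std::mutex lock;
public:
  void execute(const std::function<void()>& task) {
    std::lock_guard<std::mutex> guard(lock);
    task();  // Runs on the thread that called execute()
  }
};

// Several threads submit tasks; the gate keeps them from overlapping.
bool tasksNeverOverlap(int threadCount, int tasksPerThread) {
  SerialGate gate;
  int active = 0;  // Only touched inside the gate, so a plain int is enough
  int total = 0;
  bool overlapped = false;
  std::vector<std::thread> submitters;
  for (int t = 0; t < threadCount; ++t)
    submitters.emplace_back([&] {
      for (int i = 0; i < tasksPerThread; ++i)
        gate.execute([&] {
          if (++active != 1) overlapped = true;
          ++total;
          --active;
        });
    });
  for (std::size_t i = 0; i < submitters.size(); ++i) submitters[i].join();
  return !overlapped && total == threadCount * tasksPerThread;
}
```

Because every task runs while the gate's lock is held, the shared counters need no protection of their own, which is the convenience that makes this approach attractive for prototyping.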


3.4.4.3. Yielding

If you know that you've accomplished what you need to during one pass through a loop in your run( ) function (most run( ) functions involve a long-running loop), you can give a hint to the thread scheduling mechanism that you've done enough and that some other thread might as well have the CPU. This hint (and it is a hint—there's no guarantee your implementation will listen to it) takes the form of the yield( ) function.

We can make a modified version of the LiftOff examples by yielding after each loop:


//: C11:YieldingTask.cpp
// Suggesting when to switch threads with yield().
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

class YieldingTask : public Runnable {
  int countDown;
  int id;
public:
  YieldingTask(int ident = 0) : countDown(5), id(ident) {}
  ~YieldingTask() {
    cout << id << " completed" << endl;
  }
  friend ostream&
  operator<<(ostream& os, const YieldingTask& yt) {
    return os << "#" << yt.id << ": " << yt.countDown;
  }
  void run() {
    while(true) {
      cout << *this << endl;
      if(--countDown == 0) return;
      Thread::yield();
    }
  }
};

int main() {
  try {
    ThreadedExecutor executor;
    for(int i = 0; i < 5; i++)
      executor.execute(new YieldingTask(i));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
You can see that the task's run( ) member function consists entirely of an infinite loop. By using yield( ), the output is evened up quite a bit over that without yielding. Try commenting out the call to Thread::yield( ) to see the difference. In general, however, yield( ) is useful only in rare situations, and you can't rely on it to do any serious tuning of your application.


3.4.4.4. Sleeping

Another way you can control the behavior of your threads is by calling sleep( ) to cease execution of a thread for a given number of milliseconds. In the preceding example, if you replace the call to yield( ) with a call to sleep( ), you get the following:


//: C11:SleepingTask.cpp
// Calling sleep() to pause for a while.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

class SleepingTask : public Runnable {
  int countDown;
  int id;
public:
  SleepingTask(int ident = 0) : countDown(5), id(ident) {}
  ~SleepingTask() {
    cout << id << " completed" << endl;
  }
  friend ostream&
  operator<<(ostream& os, const SleepingTask& st) {
    return os << "#" << st.id << ": " << st.countDown;
  }
  void run() {
    while(true) {
      try {
        cout << *this << endl;
        if(--countDown == 0) return;
        Thread::sleep(100);
      } catch(Interrupted_Exception& e) {
        cerr << e.what() << endl;
      }
    }
  }
};

int main() {
  try {
    ThreadedExecutor executor;
    for(int i = 0; i < 5; i++)
      executor.execute(new SleepingTask(i));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Thread::sleep( ) can throw an Interrupted_Exception (you'll learn about interrupts later), and you can see that this is caught in run( ). But the task is created and executed inside a try block in main( ) that catches Synchronization_Exception (the base class for all ZThread exceptions), so wouldn't it be possible to just ignore the exception in run( ) and assume that it will propagate to the handler in main( )? This won't work because exceptions won't propagate across threads back to main( ). Thus, you must handle any exceptions locally that may arise within a task.
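
The same rule holds for C++11's std::thread, where an exception that escapes the thread function terminates the program instead of reaching a handler in the thread that started it. The sketch below assumes a C++11 compiler rather than ZThread, and runAndReport( ) is a hypothetical name; it shows the task catching its own exception and leaving a report for the caller:

```cpp
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical helper: runs a task that may throw and returns its report.
std::string runAndReport(bool shouldFail) {
  std::string report = "ok";
  std::thread t([&report, shouldFail] {
    try {
      if (shouldFail) throw std::runtime_error("task failed");
    } catch (const std::exception& e) {
      // Handle it here: a try block around the thread's creation
      // would never see this exception.
      report = e.what();
    }
  });
  t.join();  // After join(), it is safe to read what the task wrote
  return report;
}
```

The caller learns about the failure only because the task recorded it; nothing is thrown across the thread boundary.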

You'll notice that the threads tend to run in any order, which means that sleep( ) is also not a way for you to control the order of thread execution. It just stops the execution of the thread for a while. The only guarantee that you have is that the thread will sleep at least 100 milliseconds (in this example), but it may take longer before the thread resumes execution because the thread scheduler still has to get back to it after the sleep interval expires.
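
You can observe this lower bound by timing a sleep. The sketch below uses C++11's std::this_thread::sleep_for( ) and std::chrono::steady_clock as stand-ins for Thread::sleep( ) (an assumption; it does not measure ZThread itself), and measuredSleepMs( ) is a hypothetical name:

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: returns how long a requested sleep really took,
// in whole milliseconds, measured with a monotonic clock.
long long measuredSleepMs(int requestedMs) {
  using namespace std::chrono;
  steady_clock::time_point start = steady_clock::now();
  std::this_thread::sleep_for(milliseconds(requestedMs));
  steady_clock::time_point finish = steady_clock::now();
  return duration_cast<milliseconds>(finish - start).count();
}
```

The value returned is never less than the request, and how much it exceeds the request depends on how busy the scheduler is, so no particular upper bound should be assumed.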

If you must control the order of execution of threads, your best bet is to use synchronization controls (described later) or, in some cases, not to use threads at all, but instead to write your own cooperative routines that hand control to each other in a specified order.


3.4.4.5. Priority

The priority of a thread conveys the importance of a thread to the scheduler. Although the order that the CPU runs a set of threads is indeterminate, the scheduler will lean toward running the waiting thread with the highest priority first. However, this doesn't mean that threads with lower priority aren't run (that is, you can't get deadlocked because of priorities). Lower priority threads just tend to run less often.

Here's MoreBasicThreads.cpp modified so that the priority levels are demonstrated. The priorities are adjusted by using Thread's setPriority( ) function.


//: C11:SimplePriorities.cpp
// Shows the use of thread priorities.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
using namespace ZThread;
using namespace std;

const double pi = 3.14159265358979323846;
const double e = 2.7182818284590452354;

class SimplePriorities : public Runnable {
  int countDown;
  volatile double d; // No optimization
  int id;
public:
  SimplePriorities(int ident=0)
  : countDown(5), d(0), id(ident) {} // d must start with a defined value
  ~SimplePriorities() {
    cout << id << " completed" << endl;
  }
  friend ostream&
  operator<<(ostream& os, const SimplePriorities& sp) {
    return os << "#" << sp.id << " priority: "
      << Thread().getPriority()
      << " count: " << sp.countDown;
  }
  void run() {
    while(true) {
      // An expensive, interruptable operation:
      for(int i = 1; i < 100000; i++)
        d = d + (pi + e) / double(i);
      cout << *this << endl;
      if(--countDown == 0) return;
    }
  }
};

int main() {
  try {
    Thread high(new SimplePriorities);
    high.setPriority(High);
    for(int i = 0; i < 5; i++) {
      Thread low(new SimplePriorities(i));
      low.setPriority(Low);
    }
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Here, operator<<( ) is overloaded to display the identifier, priority, and countDown value of the task.

You can see that the priority level of thread high is at the highest level, and all the rest of the threads are at the lowest level. We are not using an Executor in this example because we need direct access to the threads in order to set their priorities.

Inside SimplePriorities::run( ), 100,000 repetitions of a rather expensive floating-point calculation are performed, involving double addition and division. The variable d is volatile to try to ensure that no compiler optimizations are performed on it. Without this calculation, you don't see the effect of setting the priority levels. (Try it: comment out the for loop containing the double calculations.) With the calculation, you see that thread high is given a higher preference by the thread scheduler. (At least, this was the behavior on a Windows machine.) The calculation takes long enough that the thread scheduling mechanism jumps in, changes threads, and pays attention to the priorities so that thread high gets preference.

You can also read the priority of an existing thread with getPriority( ) and change it at any time (not just before the thread is run, as in SimplePriorities.cpp) with setPriority( ).

Mapping priorities to operating systems is problematic. For example, Windows 2000 has seven priority levels, while Sun's Solaris has 2^31 levels. The only portable approach is to stick to very coarse priority levels, such as the Low, Medium, and High used in the ZThread library.


3.4.5. Sharing limited resources

You can think of a single-threaded program as one lonely entity moving around through your problem space and doing one thing at a time. Because there's only one entity, you never have to think about the problem of two entities trying to use the same resource at the same time: problems such as two people trying to park in the same space, walk through a door at the same time, or even talk at the same time.

With multithreading things aren't lonely anymore, but you now have the possibility of two or more threads trying to use the same resource at once. This can cause two different kinds of problems. The first is that the necessary resources may not exist. In C++, the programmer has complete control over the lifetime of objects, and it's easy to create threads that try to use objects that get destroyed before those threads complete.

The second problem is that two or more threads may collide when they try to access the same resource at the same time. If you don't prevent such a collision, you'll have two threads trying to access the same bank account at the same time, print to the same printer, adjust the same valve, and so on.

This section introduces the problem of objects that vanish while tasks are still using them and the problem of tasks colliding over shared resources. You'll learn about the tools that are used to solve these problems.


3.4.5.1. Ensuring the existence of objects

Memory and resource management are major concerns in C++. When you create any C++ program, you have the option of creating objects on the stack or on the heap (using new). In a single-threaded program, it's usually easy to keep track of object lifetimes so that you don't try to use objects that are already destroyed.

The examples shown in this chapter create Runnable objects on the heap using new, but you'll notice that these objects are never explicitly deleted. However, you can see from the output when you run the programs that the thread library keeps track of each task and eventually deletes it, because the destructors for the tasks are called. This happens when the Runnable::run( ) member function completes—returning from run( ) indicates that the task is finished.

Burdening the thread with deleting a task is a problem. That thread doesn't necessarily know if another thread still needs to make a reference to that Runnable, and so the Runnable may be prematurely destroyed. To deal with this problem, tasks in ZThreads are automatically reference-counted by the ZThread library mechanism. A task is maintained until the reference count for that task goes to zero, at which point the task is deleted. This means that tasks must always be deleted dynamically, and so they cannot be created on the stack. Instead, tasks must always be created using new, as you see in all the examples in this chapter.

Often you must also ensure that non-task objects stay alive as long as tasks need them. Otherwise, it's easy for objects that are used by tasks to go out of scope before those tasks are completed. If this happens, the tasks will try to access illegal storage and will cause program faults. Here's a simple example:


//: C11:Incrementer.cpp {RunByHand}
// Destroying objects while threads are still
// running will cause serious problems.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

class Count {
  enum { SZ = 100 };
  int n[SZ];
public:
  void increment() {
    for(int i = 0; i < SZ; i++)
      n[i]++;
  }
};

class Incrementer : public Runnable {
  Count* count;
public:
  Incrementer(Count* c) : count(c) {}
  void run() {
    for(int n = 100; n > 0; n--) {
      Thread::sleep(250);
      count->increment();
    }
  }
};

int main() {
  cout << "This will cause a segmentation fault!" << endl;
  Count count;
  try {
    Thread t0(new Incrementer(&count));
    Thread t1(new Incrementer(&count));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
The Count class may seem like overkill at first, but if n is only a single int (rather than an array), the compiler can put it into a register and that storage will still be available (albeit technically illegal) after the Count object goes out of scope. It's difficult to detect the memory violation in that case. Your results may vary depending on your compiler and operating system, but try making n a single int and see what happens. In any event, if Count contains an array of ints as above, the compiler is forced to put it on the stack and not in a register.

Incrementer is a simple task that uses a Count object. In main( ), you can see that the Incrementer tasks are running for long enough that the Count object will go out of scope, and so the tasks try to access an object that no longer exists. This produces a program fault.

To fix the problem, we must guarantee that any objects shared between tasks will be around as long as those tasks need them. (If the objects were not shared, they could be composed directly into the task's class and thus tie their lifetime to that task.) Since we don't want the static program scope to control the lifetime of the object, we put the object on the heap. And to make sure that the object is not destroyed until there are no other objects (tasks, in this case) using it, we use reference counting.

Reference counting was explained thoroughly in volume one of this book and further revisited in this volume. The ZThread library includes a template called CountedPtr that automatically performs reference counting and deletes an object when the reference count goes to zero. Here's the above program modified to use CountedPtr to prevent the fault:


//: C11:ReferenceCounting.cpp
// A CountedPtr prevents too-early destruction.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
#include "zthread/CountedPtr.h"
using namespace ZThread;
using namespace std;

class Count {
  enum { SZ = 100 };
  int n[SZ];
public:
  void increment() {
    for(int i = 0; i < SZ; i++)
      n[i]++;
  }
};

class Incrementer : public Runnable {
  CountedPtr<Count> count;
public:
  Incrementer(const CountedPtr<Count>& c) : count(c) {}
  void run() {
    for(int n = 100; n > 0; n--) {
      Thread::sleep(250);
      count->increment();
    }
  }
};

int main() {
  CountedPtr<Count> count(new Count);
  try {
    Thread t0(new Incrementer(count));
    Thread t1(new Incrementer(count));
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Incrementer now contains a CountedPtr object, which manages a Count. In main( ), the CountedPtr is passed to each Incrementer constructor, which copies it into its count member, so the copy-constructor is called, increasing the reference count. As long as the tasks are still running, the reference count will be nonzero, and so the Count object managed by the CountedPtr will not be destroyed. Only when all the tasks using the Count are completed will delete be called (automatically) on the Count object by the CountedPtr.

Whenever you have objects that are used by more than one task, you'll almost always need to manage those objects using the CountedPtr template in order to prevent problems arising from object lifetime issues.
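
The same lifetime rule can be sketched with the standard library. This example assumes std::shared_ptr and std::thread from C++11 as stand-ins for CountedPtr and ZThread's Thread. The global flag countDestroyed and the helper countOutlivesCreatorScope are hypothetical names introduced only for the illustration. A single worker is used so that the unsynchronized increment( ) is not called from two threads at once.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>

// Set by ~Count so that the sketch can observe when destruction happens.
std::atomic<bool> countDestroyed(false);

class Count {
  enum { SZ = 100 };
  int n[SZ];
public:
  Count() { for(int i = 0; i < SZ; i++) n[i] = 0; }
  ~Count() { countDestroyed = true; }
  void increment() { for(int i = 0; i < SZ; i++) n[i]++; }
  int first() const { return n[0]; }
};

// Hypothetical helper: starts a worker that shares a Count, drops the
// creator's own reference while the worker is still running, and
// returns true if the Count was still alive when the worker finished
// using it.
bool countOutlivesCreatorScope() {
  std::atomic<bool> aliveWhileRunning(false);
  std::thread worker;
  {
    std::shared_ptr<Count> count(new Count);
    // The lambda copies the shared_ptr, raising the reference count.
    worker = std::thread([count, &aliveWhileRunning] {
      for(int i = 0; i < 5; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        count->increment();
      }
      assert(count->first() == 5);
      aliveWhileRunning = !countDestroyed;
    });
  } // The creator's reference is released here; the worker's copy remains
  worker.join();
  return aliveWhileRunning;
}
```

If the worker held a raw Count* to a stack object instead, leaving the inner scope would destroy the Count while the worker was still sleeping, which is the fault shown in Incrementer.cpp.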


3.4.5.2. Improperly accessing resources

Consider the following example where one task generates even numbers and other tasks consume those numbers. Here, the only job of the consumer threads is to check the validity of the even numbers.

We'll first define EvenChecker, the consumer thread, since it will be reused in all the subsequent examples. To decouple EvenChecker from the various types of generators that we will experiment with, we'll create an interface called Generator, which contains the minimum necessary functions that EvenChecker must know about: that it has a nextValue( ) function and that it can be canceled.


//: C11:EvenChecker.h
#ifndef EVENCHECKER_H
#define EVENCHECKER_H
#include <iostream>
#include "zthread/CountedPtr.h"
#include "zthread/Thread.h"
#include "zthread/Cancelable.h"
#include "zthread/ThreadedExecutor.h"

class Generator : public ZThread::Cancelable {
  bool canceled;
public:
  Generator() : canceled(false) {}
  virtual int nextValue() = 0;
  void cancel() { canceled = true; }
  bool isCanceled() { return canceled; }
};

class EvenChecker : public ZThread::Runnable {
  ZThread::CountedPtr<Generator> generator;
  int id;
public:
  EvenChecker(ZThread::CountedPtr<Generator>& g, int ident)
  : generator(g), id(ident) {}
  ~EvenChecker() {
    std::cout << "~EvenChecker " << id << std::endl;
  }
  void run() {
    while(!generator->isCanceled()) {
      int val = generator->nextValue();
      if(val % 2 != 0) {
        std::cout << val << " not even!" << std::endl;
        generator->cancel(); // Cancels all EvenCheckers
      }
    }
  }
  // Test any type of generator:
  template<typename GenType> static void test(int n = 10) {
    std::cout << "Press Control-C to exit" << std::endl;
    try {
      ZThread::ThreadedExecutor executor;
      ZThread::CountedPtr<Generator> gp(new GenType);
      for(int i = 0; i < n; i++)
        executor.execute(new EvenChecker(gp, i));
    } catch(ZThread::Synchronization_Exception& e) {
      std::cerr << e.what() << std::endl;
    }
  }
};
#endif // EVENCHECKER_H ///:~
The Generator class introduces the abstract Cancelable class, which is part of the ZThread library. The goal of Cancelable is to provide a consistent interface to change the state of an object via the cancel( ) function and to see whether the object has been canceled with the isCanceled( ) function. Here, we use the simple approach of a bool canceled flag, similar to the quitFlag previously seen in ResponsiveUI.cpp. Note that in this example the class that is Cancelable is not Runnable. Instead, all the EvenChecker tasks that depend on the Cancelable object (the Generator) test it to see if it's been canceled, as you can see in run( ). This way, the tasks that share the common resource (the Cancelable Generator) watch that resource for the signal to terminate. This eliminates the so-called race condition, where two or more tasks race to respond to a condition and thus collide or otherwise produce inconsistent results. You must be careful to think about and protect against all the possible ways a concurrent system can fail. For example, a task cannot depend on another task because task shutdown order is not guaranteed. Here, by making tasks depend on non-task objects (which are reference counted using CountedPtr) we eliminate the potential race condition.

In later sections, you'll see that the ZThread library contains more general mechanisms for termination of threads.

Since multiple EvenChecker objects may end up sharing a Generator, the CountedPtr template is used to reference count the Generator objects.

The last member function in EvenChecker is a static member template that sets up and performs a test of any type of Generator by creating one inside a CountedPtr and then starting a number of EvenCheckers that use that Generator. If the Generator causes a failure, test( ) will report it and return; otherwise, you must press Control-C to terminate it.

EvenChecker tasks constantly read and test the values from their associated Generator. Note that if generator->isCanceled( ) is true, run( ) returns, which tells the Executor in EvenChecker::test( ) that the task is complete. Any EvenChecker task can call cancel( ) on its associated Generator, which will cause all other EvenCheckers using that Generator to gracefully shut down.
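
The pattern of several tasks watching one shared, non-task object for the signal to stop can be sketched in standard C++ as well. This example assumes std::thread and std::atomic from C++11 instead of ZThread's Cancelable and Executor. CancelFlag and runUntilCanceled are hypothetical names, and the atomic flag is a choice made for this sketch, not what the listing above does.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Shared, non-task object that the tasks watch for the signal to stop.
class CancelFlag {
  std::atomic<bool> canceled;
public:
  CancelFlag() : canceled(false) {}
  void cancel() { canceled = true; }
  bool isCanceled() const { return canceled; }
};

// Hypothetical helper: starts nWorkers threads that poll the flag.
// The first worker to reach limit iterations cancels all of them.
// Returns the number of workers that left their loop and returned.
int runUntilCanceled(int nWorkers, int limit) {
  assert(nWorkers > 0 && limit > 0);
  CancelFlag flag;
  std::atomic<int> finished(0);
  std::vector<std::thread> workers;
  for(int i = 0; i < nWorkers; i++)
    workers.push_back(std::thread([&flag, &finished, limit] {
      int iterations = 0;
      while(!flag.isCanceled())
        if(++iterations == limit)
          flag.cancel(); // Stops every worker that shares the flag
      ++finished;
    }));
  for(auto& w : workers) w.join();
  return finished;
}
```

No worker needs to know about any other worker. Each one depends only on the flag, so the order in which they shut down does not matter.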

The EvenGenerator is simple—nextValue( ) produces the next even value:


//: C11:EvenGenerator.cpp
// When threads collide.
//{L} ZThread
#include <iostream>
#include "EvenChecker.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

class EvenGenerator : public Generator {
  unsigned int currentEvenValue; // Unsigned can't overflow
public:
  EvenGenerator() { currentEvenValue = 0; }
  ~EvenGenerator() { cout << "~EvenGenerator" << endl; }
  int nextValue() {
    ++currentEvenValue; // Danger point here!
    ++currentEvenValue;
    return currentEvenValue;
  }
};

int main() {
  EvenChecker::test<EvenGenerator>();
} ///:~
It's possible for one thread to call nextValue( ) after the first increment of currentEvenValue and before the second (at the place in the code commented “Danger point here!”), which puts the value into an “incorrect” state. To prove that this can happen, EvenChecker::test( ) creates a group of EvenChecker objects to continually read the output of an EvenGenerator and test to see if each one is even. If not, the error is reported and the program is shut down.

This program may not detect the problem until the EvenGenerator has completed many cycles, depending on the particulars of your operating system and other implementation details. If you want to see it fail much faster, try putting a call to yield( ) between the first and second increments. In any event, it will eventually fail because the EvenChecker threads are able to access the information in EvenGenerator while it's in an “incorrect” state.


3.4.5.3. Controlling access

The previous example shows a fundamental problem when using threads: You never know when a thread might be run. Imagine sitting at a table with a fork, about to spear the last piece of food on a platter, and as your fork reaches for it, the food suddenly vanishes (because your thread was suspended and another diner came in and ate the food). That's the problem you're dealing with when writing concurrent programs.

Occasionally you don't care if a resource is being accessed at the same time you're trying to use it. But in most cases you do care, and for multithreading to work, you need some way to prevent two threads from accessing the same resource, at least during critical periods.

Preventing this kind of collision is simply a matter of putting a lock on a resource when one thread is using it. The first thread that accesses a resource locks it, and then the other threads cannot access that resource until it is unlocked, at which time another thread locks and uses it, and so on. If the front seat of the car is the limited resource, the child who shouts “Dibs!” acquires the lock.

Thus, we need to be able to prevent any other tasks from accessing the storage when that storage is not in a proper state. That is, we need to have a mechanism that excludes a second task from accessing the storage when a first task is already using it. This idea is fundamental to all multithreading systems and is called mutual exclusion; the mechanism used abbreviates this to mutex. The ZThread library contains a mutex mechanism declared in the header Mutex.h.

To solve the problem in the above program, we identify the critical sections where mutual exclusion must apply; then we acquire the mutex before entering the critical section and release it at the end of the critical section. Only one thread can acquire the mutex at any time, so mutual exclusion is achieved:


//: C11:MutexEvenGenerator.cpp {RunByHand}
// Preventing thread collisions with mutexes.
//{L} ZThread
#include <iostream>
#include "EvenChecker.h"
#include "zthread/ThreadedExecutor.h"
#include "zthread/Mutex.h"
using namespace ZThread;
using namespace std;

class MutexEvenGenerator : public Generator {
  unsigned int currentEvenValue;
  Mutex lock;
public:
  MutexEvenGenerator() { currentEvenValue = 0; }
  ~MutexEvenGenerator() {
    cout << "~MutexEvenGenerator" << endl;
  }
  int nextValue() {
    lock.acquire();
    ++currentEvenValue;
    Thread::yield(); // Cause failure faster
    ++currentEvenValue;
    int rval = currentEvenValue;
    lock.release();
    return rval;
  }
};

int main() {
  EvenChecker::test<MutexEvenGenerator>();
} ///:~
MutexEvenGenerator adds a Mutex called lock and uses acquire( ) and release( ) to create a critical section within nextValue( ). In addition, a call to Thread::yield( ) is inserted between the two increments, to raise the likelihood of a context switch while currentEvenValue is in an odd state. Because the mutex prevents more than one thread at a time in the critical section, this will not produce a failure, but calling yield( ) is a helpful way to promote a failure if it's going to happen.

Note that nextValue( ) must capture the return value inside the critical section because if you return from inside the critical section, you won't release the lock and will thus prevent it from being acquired again. (This usually leads to deadlock, which you'll learn about at the end of this chapter.)

The first thread that enters nextValue( ) acquires the lock, and any further threads that try to acquire the lock are blocked from doing so until the first thread releases the lock. At that point, the scheduling mechanism selects another thread that is waiting on the lock. This way, only one thread at a time can pass through the code that is guarded by the mutex.
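
For comparison, here is a minimal sketch of the same critical section written with std::mutex from the C++11 standard library, whose lock( ) and unlock( ) play the roles of acquire( ) and release( ). The names LockedEvenGenerator and countOddValues are hypothetical, and the checking threads are plain std::thread objects in place of EvenChecker tasks.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

class LockedEvenGenerator {
  unsigned int currentEvenValue;
  std::mutex lock;
public:
  LockedEvenGenerator() : currentEvenValue(0) {}
  unsigned int nextValue() {
    lock.lock();               // Enter the critical section
    ++currentEvenValue;
    std::this_thread::yield(); // Invite a context switch mid-update
    ++currentEvenValue;
    unsigned int rval = currentEvenValue; // Copy while still locked
    lock.unlock();             // Leave the critical section
    return rval;
  }
};

// Hypothetical helper: several threads pull values concurrently and
// the function returns how many odd values they saw in total.
int countOddValues(int nThreads, int callsPerThread) {
  assert(nThreads > 0 && callsPerThread >= 0);
  LockedEvenGenerator gen;
  std::vector<int> odd(nThreads, 0); // One slot per thread, no sharing
  std::vector<std::thread> checkers;
  for(int t = 0; t < nThreads; t++)
    checkers.push_back(std::thread([&gen, &odd, t, callsPerThread] {
      for(int i = 0; i < callsPerThread; i++)
        if(gen.nextValue() % 2 != 0)
          ++odd[t];
    }));
  for(auto& c : checkers) c.join();
  int total = 0;
  for(int n : odd) total += n;
  return total;
}
```

With the lock in place, no caller can observe the value between the two increments, so the count of odd values stays at zero.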


3.4.5.4. Simplified coding with Guards

The use of mutexes rapidly becomes complicated when exceptions are introduced. To make sure that the mutex is always released, you must ensure that each possible exception path includes a call to release( ). In addition, any function that has multiple return paths must carefully ensure that it calls release( ) at the appropriate points.

These problems can be easily solved by using the fact that a stack-based (auto) object has a destructor that is always called regardless of how you exit from a function scope. In the ZThread library, this is implemented as the Guard template. The Guard template creates objects that acquire( ) a Lockable object when constructed and release( ) that lock when destroyed. Guard objects created on the local stack will automatically be destroyed regardless of how the function exits and will always unlock the Lockable object. Here's the above example reimplemented using Guards:


//: C11:GuardedEvenGenerator.cpp {RunByHand}
// Simplifying mutexes with the Guard template.
//{L} ZThread
#include <iostream>
#include "EvenChecker.h"
#include "zthread/ThreadedExecutor.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
using namespace ZThread;
using namespace std;

class GuardedEvenGenerator : public Generator {
  unsigned int currentEvenValue;
  Mutex lock;
public:
  GuardedEvenGenerator() { currentEvenValue = 0; }
  ~GuardedEvenGenerator() {
    cout << "~GuardedEvenGenerator" << endl;
  }
  int nextValue() {
    Guard<Mutex> g(lock);
    ++currentEvenValue;
    Thread::yield();
    ++currentEvenValue;
    return currentEvenValue;
  }
};

int main() {
  EvenChecker::test<GuardedEvenGenerator>();
} ///:~
Note that the temporary return value is no longer necessary in nextValue( ). In general, there is less code to write, and the opportunity for user error is greatly reduced.
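
The exception-safety argument can be checked directly. This sketch assumes std::lock_guard from the C++11 standard library as the counterpart of ZThread's Guard. resourceLock, failWhileLocked, and lockReleasedAfterException are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

std::mutex resourceLock;

// Throws while holding the lock. The std::lock_guard destructor runs
// during stack unwinding and releases the mutex.
void failWhileLocked() {
  std::lock_guard<std::mutex> g(resourceLock);
  throw std::runtime_error("failure inside critical section");
}

// Hypothetical helper: returns true if the mutex can be acquired
// again after the exception has left failWhileLocked().
bool lockReleasedAfterException() {
  bool thrown = false;
  try {
    failWhileLocked();
  } catch(const std::runtime_error&) {
    thrown = true; // No unlock needed here; the guard already did it
  }
  assert(thrown); // failWhileLocked() always throws in this sketch
  bool acquired = resourceLock.try_lock();
  if(acquired) resourceLock.unlock();
  return acquired;
}
```

Had failWhileLocked( ) used explicit lock( ) and unlock( ) calls, the throw would have skipped the unlock( ) and the mutex would have stayed locked.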

An interesting feature of the Guard template is that it can be used to manipulate other guards safely. For example, a second Guard can be used to temporarily unlock a guard:


//: C11:TemporaryUnlocking.cpp
// Temporarily unlocking another guard.
//{L} ZThread
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
using namespace ZThread;

class TemporaryUnlocking {
  Mutex lock;
public:
  void f() {
    Guard<Mutex> g(lock);
    // lock is acquired
    // ...
    {
      Guard<Mutex, UnlockedScope> h(g);
      // lock is released
      // ...
      // lock is acquired
    }
    // ...
    // lock is released
  }
};

int main() {
  TemporaryUnlocking t;
  t.f();
} ///:~
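The standard library has no direct equivalent of UnlockedScope, but a rough sketch of the same idea is possible with std::unique_lock from C++11, which lets you release and reacquire the mutex explicitly. The class name TemporarilyUnlocked is hypothetical. Unlike the nested Guard above, the reacquisition here is a manual call and is not tied to a scope.

```cpp
#include <cassert>
#include <mutex>

class TemporarilyUnlocked {
  std::mutex lock;
public:
  // Returns true if the lock was held, then released, then held again
  // at the three points where the sketch checks ownership.
  bool f() {
    std::unique_lock<std::mutex> g(lock);
    assert(g.owns_lock());
    bool heldBefore = g.owns_lock();
    g.unlock(); // Other threads may acquire the mutex from here on
    bool heldDuring = g.owns_lock();
    g.lock();   // Reacquire before touching shared state again
    bool heldAfter = g.owns_lock();
    return heldBefore && !heldDuring && heldAfter;
  } // g releases the mutex when it goes out of scope
};
```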
A Guard can also be used to try to acquire a lock for a certain amount of time and then give up:


//: C11:TimedLocking.cpp
// Limited time locking.
//{L} ZThread
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
using namespace ZThread;

class TimedLocking {
  Mutex lock;
public:
  void f() {
    Guard<Mutex, TimedLockedScope<500> > g(lock);
    // ...
  }
};

int main() {
  TimedLocking t;
  t.f();
} ///:~
In this example, a Timeout_Exception will be thrown if the lock cannot be acquired within 500 milliseconds.
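
A comparable timed attempt can be sketched with std::timed_mutex from the C++11 standard library. Note the difference in reporting: try_lock_for( ) returns false on timeout, whereas the ZThread Guard above throws an exception. The helper name acquiredWithin and its parameters are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical helper: one thread holds the mutex for holdMs
// milliseconds while the caller waits at most waitMs for it.
// Returns true if the caller obtained the lock in time.
bool acquiredWithin(int holdMs, int waitMs) {
  assert(holdMs >= 0 && waitMs >= 0);
  std::timed_mutex lock;
  std::atomic<bool> holderReady(false);
  std::thread holder([&lock, &holderReady, holdMs] {
    std::lock_guard<std::timed_mutex> g(lock);
    holderReady = true;
    std::this_thread::sleep_for(std::chrono::milliseconds(holdMs));
  });
  while(!holderReady) // Wait until the holder owns the mutex
    std::this_thread::yield();
  bool acquired = lock.try_lock_for(std::chrono::milliseconds(waitMs));
  if(acquired) lock.unlock();
  holder.join();
  return acquired;
}
```

When the holder keeps the mutex much longer than the caller is willing to wait, the attempt gives up, and when the holder releases it early, the attempt succeeds.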

Synchronizing entire classes

The ZThread library also provides a GuardedClass template to automatically create a synchronized wrapper for an entire class. This means that every member function in the class will automatically be guarded:


//: C11:SynchronizedClass.cpp {-dmc}
//{L} ZThread
#include "zthread/GuardedClass.h"
using namespace ZThread;

class MyClass {
public:
  void func1() {}
  void func2() {}
};

int main() {
  MyClass a;
  a.func1(); // Not synchronized
  a.func2(); // Not synchronized
  GuardedClass<MyClass> b(new MyClass);
  // Synchronized calls, only one thread at a time allowed:
  b->func1();
  b->func2();
} ///:~
Object a is not synchronized, so func1( ) and func2( ) can be called at any time by any number of threads. Object b is protected by the GuardedClass wrapper, so each member function is automatically synchronized and only one function per object can be called at any time.

The wrapper locks at a class level of granularity, which may affect performance.(151) If a class contains some unrelated functions, it may be better to synchronize those functions internally with two different locks. However, if you find yourself doing this, it means that one class contains groups of data that may not be strongly associated. Consider breaking the class into two classes.

Guarding all member functions of a class with a mutex does not automatically make that class thread-safe. You must carefully consider all threading issues in order to guarantee thread safety.
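
To make the idea of a synchronizing wrapper concrete, here is a minimal sketch of one way such a wrapper could be written in standard C++11. It is not the GuardedClass implementation. It assumes the well-known technique of returning a locking proxy from operator->, and Synchronized, Counter, and sharedTotal are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Wrapper whose operator-> returns a proxy that holds the object's
// mutex for the duration of one member-function call.
template<class T> class Synchronized {
  T object;
  std::mutex lock;
  class Proxy {
    T* target;
    std::unique_lock<std::mutex> held;
  public:
    Proxy(T* t, std::mutex& m) : target(t), held(m) {}
    T* operator->() { return target; }
  }; // ~Proxy releases the mutex at the end of the full expression
public:
  Proxy operator->() { return Proxy(&object, lock); }
};

class Counter {
  int value;
public:
  Counter() : value(0) {}
  void add(int n) { value += n; }
  int get() const { return value; }
};

// Hypothetical helper: several threads call add() through the wrapper.
int sharedTotal(int nThreads, int addsPerThread) {
  assert(nThreads > 0 && addsPerThread >= 0);
  Synchronized<Counter> counter;
  std::vector<std::thread> workers;
  for(int t = 0; t < nThreads; t++)
    workers.push_back(std::thread([&counter, addsPerThread] {
      for(int i = 0; i < addsPerThread; i++)
        counter->add(1); // Locked for exactly this one call
    }));
  for(auto& w : workers) w.join();
  return counter->get();
}
```

The sketch also shows why guarding every member function is not enough on its own. A compound operation such as if(counter->get() < 10) counter->add(1); takes the lock twice, so another thread can change the counter between the test and the update.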


3.4.5.5. Thread local storage

A second way to eliminate the problem of tasks colliding over shared resources is to eliminate the sharing of variables, which can be done by creating different storage for the same variable, for each different thread that uses an object. Thus, if you have five threads using an object with a variable x, thread local storage automatically generates five different pieces of storage for x. Fortunately, the creation and management of thread local storage is taken care of automatically by ZThread's ThreadLocal template, as seen here:


//: C11:ThreadLocalVariables.cpp {RunByHand}
// Automatically giving each thread its own storage.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/ThreadedExecutor.h"
#include "zthread/Cancelable.h"
#include "zthread/ThreadLocal.h"
#include "zthread/CountedPtr.h"
using namespace ZThread;
using namespace std;

class ThreadLocalVariables : public Cancelable {
  ThreadLocal<int> value;
  bool canceled;
  Mutex lock;
public:
  ThreadLocalVariables() : canceled(false) {
    value.set(0);
  }
  void increment() { value.set(value.get() + 1); }
  int get() { return value.get(); }
  void cancel() {
    Guard<Mutex> g(lock);
    canceled = true;
  }
  bool isCanceled() {
    Guard<Mutex> g(lock);
    return canceled;
  }
};

class Accessor : public Runnable {
  int id;
  CountedPtr<ThreadLocalVariables> tlv;
public:
  Accessor(CountedPtr<ThreadLocalVariables>& tl, int idn)
  : id(idn), tlv(tl) {}
  void run() {
    while(!tlv->isCanceled()) {
      tlv->increment();
      cout << *this << endl;
    }
  }
  friend ostream&
  operator<<(ostream& os, Accessor& a) {
    return os << "#" << a.id << ": " << a.tlv->get();
  }
};

int main() {
  cout << "Press <Enter> to quit" << endl;
  try {
    CountedPtr<ThreadLocalVariables>
      tlv(new ThreadLocalVariables);
    const int SZ = 5;
    ThreadedExecutor executor;
    for(int i = 0; i < SZ; i++)
      executor.execute(new Accessor(tlv, i));
    cin.get();
    tlv->cancel(); // All Accessors will quit
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
When you create a ThreadLocal object by instantiating the template, you are only able to access the contents of the object using the get( ) and set( ) member functions. The get( ) function returns a copy of the object that is associated with that thread, and set( ) inserts its argument into the object stored for that thread, returning the old object that was in storage. You can see this in use in increment( ) and get( ) in ThreadLocalVariables.

Since tlv is shared by multiple Accessor objects, it is written as Cancelable so that the Accessors can be signaled when we want to shut the system down.

When you run this program, you'll see evidence that the individual threads are each allocated their own storage.
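
Standard C++11 offers the thread_local storage class for the same purpose. The following sketch assumes that keyword and std::thread in place of ZThread's ThreadLocal template. localValue and perThreadCounts are hypothetical names.

```cpp
#include <cassert>
#include <thread>
#include <vector>

thread_local int localValue = 0; // Each thread gets its own copy

// Hypothetical helper: thread t increments its own copy of localValue
// t + 1 times and records the result in its own slot of the vector.
std::vector<int> perThreadCounts(int nThreads) {
  assert(nThreads > 0);
  std::vector<int> results(nThreads, -1);
  std::vector<std::thread> workers;
  for(int t = 0; t < nThreads; t++)
    workers.push_back(std::thread([&results, t] {
      for(int i = 0; i <= t; i++)
        ++localValue;          // Touches only this thread's copy
      results[t] = localValue;
    }));
  for(auto& w : workers) w.join();
  return results;
}
```

Each worker ends with a different count even though all of them increment a variable with the same name, and the copy that belongs to the calling thread is never touched.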


3.4.6. Terminating tasks

In previous examples, we have seen the use of a “quit flag” or the Cancelable interface in order to terminate a task. This is a reasonable approach to the problem. However, in some situations the task must be terminated more abruptly. In this section, you'll learn about the issues and problems of such termination.

First, let's look at an example that not only demonstrates the termination problem but is also an additional example of resource sharing. To present this example, we'll first need to solve the problem of iostream collision.


3.4.6.1. Preventing iostream collision

You may have noticed in previous examples that the output is sometimes garbled. C++ iostreams were not created with threading in mind, so there's nothing to keep one thread's output from interfering with another thread's output. Thus, you must write your applications so that they synchronize the use of iostreams.

To solve the problem, we need to create the entire output packet first and then explicitly decide when to try to send it to the console. One simple solution is to write the information to an ostringstream and then use a single object with a mutex as the point of output among all threads, to prevent more than one thread from writing at the same time:


//: C11:Display.h
// Prevents ostream collisions.
#ifndef DISPLAY_H
#define DISPLAY_H
#include <iostream>
#include <sstream>
#include "zthread/Mutex.h"
#include "zthread/Guard.h"

class Display { // Share one of these among all threads
  ZThread::Mutex iolock;
public:
  void output(std::ostringstream& os) {
    ZThread::Guard<ZThread::Mutex> g(iolock);
    std::cout << os.str();
  }
};
#endif // DISPLAY_H ///:~
This way, the standard operator<<( ) functions are predefined for us and the object can be built in memory using familiar ostream operators. When a task wants to display output, it creates a temporary ostringstream object that it uses to build up the desired output message. When it calls output( ), the mutex prevents multiple threads from writing to this Display object. (You must use only one Display object in your program, as you'll see in the following examples.)

This just shows the basic idea, but if necessary, you can build a more elaborate framework. For example, you could enforce the requirement that there be only one Display object in a program by making it a Singleton. (The ZThread library has a Singleton template to support Singletons.)
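
To see that the approach keeps each message in one piece, here is a minimal sketch in standard C++11 that assumes std::mutex and std::lock_guard in place of the ZThread types and writes to an in-memory stream so that the result can be inspected. SyncDisplay and intactLines are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Serializes writes to one ostream shared by all threads.
class SyncDisplay {
  std::ostream& sink;
  std::mutex iolock;
public:
  explicit SyncDisplay(std::ostream& os) : sink(os) {}
  void output(const std::ostringstream& os) {
    std::lock_guard<std::mutex> g(iolock);
    sink << os.str();
  }
};

// Hypothetical helper: each thread builds complete lines privately and
// hands them to the display. Returns how many lines arrived intact.
int intactLines(int nThreads, int linesPerThread) {
  assert(nThreads > 0 && linesPerThread >= 0);
  std::ostringstream captured; // Stands in for std::cout
  SyncDisplay display(captured);
  std::vector<std::thread> writers;
  for(int t = 0; t < nThreads; t++)
    writers.push_back(std::thread([&display, t, linesPerThread] {
      for(int i = 0; i < linesPerThread; i++) {
        std::ostringstream os; // Private to this thread
        os << "thread " << t << " line " << i << "\n";
        display.output(os);
      }
    }));
  for(auto& w : writers) w.join();
  std::istringstream in(captured.str());
  std::string line;
  int intact = 0;
  while(std::getline(in, line)) {
    // An intact line has exactly the form "thread <t> line <i>"
    std::istringstream fields(line);
    std::string w1, w2, rest;
    int a, b;
    if(fields >> w1 >> a >> w2 >> b && w1 == "thread"
       && w2 == "line" && !(fields >> rest))
      ++intact;
  }
  return intact;
}
```

Passing std::cout to the SyncDisplay constructor instead of the in-memory stream gives the behavior of the Display class above.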


3.4.6.2. The ornamental garden

In this simulation, the garden committee would like to know how many people enter the garden each day through its multiple gates. Each gate has a turnstile or some other kind of counter, and after the turnstile count is incremented, a shared count is incremented that represents the total number of people in the garden.


//: C11:OrnamentalGarden.cpp {RunByHand}
//{L} ZThread
#include <vector>
#include <cstdlib>
#include <ctime>
#include "Display.h"
#include "zthread/Thread.h"
#include "zthread/FastMutex.h"
#include "zthread/Guard.h"
#include "zthread/ThreadedExecutor.h"
#include "zthread/CountedPtr.h"
#include "zthread/Cancelable.h"
using namespace ZThread;
using namespace std;

class Count : public Cancelable {
  FastMutex lock;
  int count;
  bool paused, canceled;
public:
  Count() : count(0), paused(false), canceled(false) {}
  int increment() {
    // Comment the following line to see counting fail:
    Guard<FastMutex> g(lock);
    int temp = count;
    if(rand() % 2 == 0) // Yield half the time
      Thread::yield();
    return (count = ++temp);
  }
  int value() {
    Guard<FastMutex> g(lock);
    return count;
  }
  void cancel() {
    Guard<FastMutex> g(lock);
    canceled = true;
  }
  bool isCanceled() {
    Guard<FastMutex> g(lock);
    return canceled;
  }
  void pause() {
    Guard<FastMutex> g(lock);
    paused = true;
  }
  bool isPaused() {
    Guard<FastMutex> g(lock);
    return paused;
  }
};

class Entrance : public Runnable {
  CountedPtr<Count> count;
  CountedPtr<Display> display;
  int number;
  int id;
  bool waitingForCancel;
public:
  Entrance(CountedPtr<Count>& cnt,
    CountedPtr<Display>& disp, int idn)
  : count(cnt), display(disp), number(0), id(idn),
    waitingForCancel(false) {}
  void run() {
    while(!count->isPaused()) {
      ++number;
      {
        ostringstream os;
        os << *this << " Total: "
           << count->increment() << endl;
        display->output(os);
      }
      Thread::sleep(100);
    }
    waitingForCancel = true;
    while(!count->isCanceled()) // Hold here...
      Thread::sleep(100);
    ostringstream os;
    os << "Terminating " << *this << endl;
    display->output(os);
  }
  int getValue() {
    while(count->isPaused() && !waitingForCancel)
      Thread::sleep(100);
    return number;
  }
  friend ostream&
  operator<<(ostream& os, const Entrance& e) {
    return os << "Entrance " << e.id << ": " << e.number;
  }
};

int main() {
  srand(time(0)); // Seed the random number generator
  cout << "Press <ENTER> to quit" << endl;
  CountedPtr<Count> count(new Count);
  vector<Entrance*> v;
  CountedPtr<Display> display(new Display);
  const int SZ = 5;
  try {
    ThreadedExecutor executor;
    for(int i = 0; i < SZ; i++) {
      Entrance* task = new Entrance(count, display, i);
      executor.execute(task);
      // Save the pointer to the task:
      v.push_back(task);
    }
    cin.get(); // Wait for user to press <Enter>
    count->pause(); // Causes tasks to stop counting
    int sum = 0;
    vector<Entrance*>::iterator it = v.begin();
    while(it != v.end()) {
      sum += (*it)->getValue();
      ++it;
    }
    ostringstream os;
    os << "Total: " << count->value() << endl
       << "Sum of Entrances: " << sum << endl;
    display->output(os);
    count->cancel(); // Causes threads to quit
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~

Count is the class that keeps the master count of garden visitors. The single Count object defined in main( ) as count is held as a CountedPtr in Entrance and thus is shared by all Entrance objects. A FastMutex called lock is used in this example instead of an ordinary Mutex because a FastMutex uses the native operating system mutex and will thus yield more interesting results.

A Guard is used with lock in increment( ) to synchronize access to count. This function uses rand( ) to cause a yield( ) roughly half the time, in between fetching count into temp and incrementing and storing temp back into count. Because of this, if you comment out the Guard object definition, you will rapidly see the program break because multiple threads will be accessing and modifying count simultaneously.
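The same read, yield, write-back pattern can be sketched with the C++11 standard library instead of ZThreads. This is only an illustration under that assumption: std::mutex and std::lock_guard stand in for FastMutex and Guard, and the names GuardedCount and countVisitors are hypothetical, not part of the book's code.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical standard-library analogue of Count.
class GuardedCount {
  std::mutex lock;
  int count;
public:
  GuardedCount() : count(0) {}
  int increment() {
    // Plays the role of Guard<FastMutex> g(lock):
    std::lock_guard<std::mutex> g(lock);
    int temp = count;
    std::this_thread::yield(); // Widen the gap between read and write
    return (count = ++temp);
  }
  int value() {
    std::lock_guard<std::mutex> g(lock);
    return count;
  }
};

// Runs `gates` threads, each calling increment() `visits` times,
// and returns the final shared total.
int countVisitors(int gates, int visits) {
  GuardedCount count;
  std::vector<std::thread> threads;
  for(int i = 0; i < gates; i++)
    threads.push_back(std::thread([&count, visits] {
      for(int j = 0; j < visits; j++)
        count.increment();
    }));
  for(auto& t : threads)
    t.join();
  return count.value();
}
```

With the lock_guard in place, countVisitors(5, 1000) returns 5000 on every run. If you remove the guard in increment( ), the total is no longer reliable, which mirrors what happens when you comment out the Guard in Count::increment( ).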

The Entrance class also keeps a local number with the number of visitors that have passed through this particular entrance. This provides a double-check against the count object to make sure that the proper number of visitors is being recorded. Entrance::run( ) simply increments number and the count object and sleeps for 100 milliseconds.

In main( ), a vector<Entrance*> is loaded with each Entrance that is created. After the user presses <Enter>, this vector is used to iterate over all the individual Entrance values and total them.

This program goes to quite a bit of extra trouble to shut everything down in a stable fashion. Part of the reason for this is to show just how careful you must be when terminating a multithreaded program, and part of the reason is to demonstrate the value of interrupt( ), which you will learn about shortly.

All the communication between the Entrance objects takes place through the single Count object. When the user presses <Enter>, main( ) sends the pause( ) message to count. Since each Entrance::run( ) is watching the count object to see whether it is paused, this causes each Entrance to move into the waitingForCancel state, where it is no longer counting, but it is still alive. This is essential because main( ) must still be able to safely iterate over the objects in the vector<Entrance*>. Note that because there is a slight possibility that the iteration might occur before an Entrance has finished counting and moved into the waitingForCancel state, the getValue( ) function cycles through calls to sleep( ) until the object moves into waitingForCancel. (This is one form of what is called a busy wait, which is undesirable. You'll see the preferred approach of using wait( ) later in the chapter.) Once main( ) completes its iteration through the vector<Entrance*>, the cancel( ) message is sent to the count object, and once again all the Entrance objects are watching for this state change. At this point, they print a termination message and exit from run( ), which causes each task to be destroyed by the threading mechanism.

As this program runs, you will see the total count and the count at each entrance displayed as people walk through a turnstile. If you comment out the Guard object in Count::increment( ), you'll notice that the total number of people is not what you expect it to be. The sum of the people counted by each turnstile will be different from the value in count. As long as the FastMutex is there to synchronize access to the Count object, things work correctly. Keep in mind that Count::increment( ) exaggerates the potential for failure by using temp and yield( ). In real threading problems, the possibility for failure may be statistically small, so you can easily fall into the trap of believing that things are working correctly. Just as in the example above, there are likely to be hidden problems that haven't occurred to you, so be exceptionally diligent when reviewing concurrent code.

Atomic operations

Note that Count::value( ) returns the value of count using a Guard object for synchronization. This brings up an interesting point because this code will probably work fine with most compilers and systems without synchronization. The reason is that, in general, a simple operation such as returning an int will be an atomic operation, which means that it will probably happen in a single microprocessor instruction that will not get interrupted. (The multithreading mechanism is unable to stop a thread in the middle of a microprocessor instruction.) That is, atomic operations are not interruptible by the threading mechanism and thus do not need to be guarded.(152) In fact, if we removed the fetch of count into temp and removed the yield( ), and instead simply incremented count directly, we probably wouldn't need a lock because the increment operation is usually atomic, as well.(153)

The problem is that the C++ Standard doesn't guarantee atomicity for any of these operations. Although operations such as returning an int and incrementing an int are almost certainly atomic on most machines, there's no guarantee. And because there's no guarantee, you have to assume the worst. Sometimes you might investigate the atomicity behavior on a particular machine (usually by looking at assembly language) and write code based on those assumptions. That's always dangerous and ill-advised. It's too easy for that information to be lost or hidden, and the next person that comes along may assume that this code can be ported to another machine and then go mad tracking down the occasional glitch caused by thread collisions.

So, while removing the guard on Count::value( ) seems to work, it's not airtight, and thus on some machines you may see aberrant behavior.
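If your compiler supports C++11 or later, the standard library offers a way to ask for atomicity explicitly rather than hoping for it. The sketch below is an assumption layered on top of this chapter (it does not use ZThreads): it relies on std::atomic<int>, whose increment is guaranteed to be atomic, and the function name countWithAtomic is hypothetical.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Each of `gates` threads increments a shared std::atomic<int>
// `visits` times; no mutex is needed because ++ on std::atomic<int>
// is an atomic read-modify-write operation.
int countWithAtomic(int gates, int visits) {
  std::atomic<int> count(0);
  std::vector<std::thread> threads;
  for(int i = 0; i < gates; i++)
    threads.push_back(std::thread([&count, visits] {
      for(int j = 0; j < visits; j++)
        ++count;
    }));
  for(auto& t : threads)
    t.join();
  return count.load();
}
```

Here countWithAtomic(4, 1000) returns 4000. The difference from a plain int is that the guarantee comes from the language rather than from the behavior of one particular machine.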


3.4.6.3. Terminating when blocked

Entrance::run( ) in the previous example includes a call to sleep( ) in the main loop. We know that sleep( ) will eventually wake up and the task will reach the top of the loop where it has an opportunity to break out of that loop by checking the isPaused( ) status. However, sleep( ) is just one situation where a thread is blocked from executing, and sometimes you must terminate a task that's blocked.

Thread states

A thread can be in any one of four states:

  1. New: A thread remains in this state only momentarily, as it is being created. It allocates any necessary system resources and performs initialization. At this point it becomes eligible to receive CPU time. The scheduler will then transition this thread to the runnable or blocked state.
  2. Runnable: This means that a thread can be run when the time-slicing mechanism has CPU cycles available for the thread. Thus, the thread might or might not be running at any moment, but there's nothing to prevent it from being run if the scheduler can arrange it; it's not dead or blocked.
  3. Blocked: The thread could be run, but something prevents it. (It might be waiting for I/O to complete, for example.) While a thread is in the blocked state, the scheduler will simply skip it and not give it any CPU time. Until a thread reenters the runnable state, it won't perform any operations.
  4. Dead: A thread in the dead state is no longer schedulable and will not receive any CPU time. Its task is completed, and it is no longer runnable. The normal way for a thread to die is by returning from its run( ) function.

Becoming blocked

A thread is blocked when it cannot continue running. A thread can become blocked for the following reasons:

  • You've put the thread to sleep by calling sleep(milliseconds), in which case it will not be run for the specified time.
  • You've suspended the execution of the thread with wait( ). It will not become runnable again until the thread gets the signal( ) or broadcast( ) message. We'll examine these in a later section.
  • The thread is waiting for some I/O to complete.
  • The thread is trying to enter a block of code that is guarded by a mutex, and that mutex has already been acquired by another thread.

The problem we need to look at now is this: sometimes you want to terminate a thread that is in a blocked state. If you can't wait for it to get to a point in the code where it can check a state value and decide to terminate on its own, you have to force the thread out of its blocked state.


3.4.6.4. Interruption

As you might imagine, it's much messier to break out of the middle of a Runnable::run( ) function than it is to wait for that function to get to a test of isCanceled( ) (or some other place where the programmer is ready to leave the function). When you break out of a blocked task, you might need to destroy objects and clean up resources. Because of this, breaking out of the middle of a task's run( ) is more like throwing an exception than anything else, so in ZThreads, exceptions are used for this kind of abort. (This walks the fine edge of being an inappropriate use of exceptions, because it means you are often using them for control flow.)(154) To return to a known good state when terminating a task this way, carefully consider the execution paths of your code and properly clean up everything inside the catch clause. We'll look at these issues in this section.

To terminate a blocked thread, the ZThread library provides the Thread::interrupt( ) function. This sets the interrupted status for that thread. A thread with its interrupted status set will throw an Interrupted_Exception if it is already blocked or it attempts a blocking operation. The interrupted status will be reset when the exception is thrown or if the task calls Thread::interrupted( ). As you'll see, Thread::interrupted( ) provides a second way to leave your run( ) loop, without throwing an exception.
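To make the interrupted-status rules concrete, here is a rough model of them built from the C++11 standard library. It is not how ZThreads is implemented; InterruptStatus, InterruptedError, and interruptSleepingTask are hypothetical names invented for this sketch, which only imitates the behavior described above: a blocking call throws if the status is set, and the status is cleared either by the throw or by a call to interrupted( ).

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>

// Hypothetical stand-in for Interrupted_Exception.
struct InterruptedError : std::runtime_error {
  InterruptedError() : std::runtime_error("interrupted") {}
};

// Hypothetical model of one task's interrupted status.
class InterruptStatus {
  std::mutex lock;
  std::condition_variable cond;
  bool flag;
public:
  InterruptStatus() : flag(false) {}
  void interrupt() { // Set the status and wake any sleeper
    {
      std::lock_guard<std::mutex> g(lock);
      flag = true;
    }
    cond.notify_all();
  }
  bool interrupted() { // Report the status and clear it
    std::lock_guard<std::mutex> g(lock);
    bool was = flag;
    flag = false;
    return was;
  }
  void sleep(int ms) { // A blocking call that can be interrupted
    std::unique_lock<std::mutex> g(lock);
    // wait_for returns true if the flag is set before or during the wait.
    if(cond.wait_for(g, std::chrono::milliseconds(ms),
                     [this] { return flag; })) {
      flag = false; // Status is reset when the exception is thrown
      throw InterruptedError();
    }
  }
};

// Starts a task that tries to sleep for a minute, interrupts it,
// and reports whether the task left via the exception.
bool interruptSleepingTask() {
  InterruptStatus status;
  bool caught = false;
  std::thread t([&] {
    try {
      status.sleep(60000);
    } catch(InterruptedError&) {
      caught = true;
    }
  });
  status.interrupt();
  t.join();
  return caught;
}
```

Whether interrupt( ) arrives before the task starts sleeping or while it is asleep, the task exits through the catch clause, so interruptSleepingTask( ) returns true without waiting out the full minute.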

Here's an example that shows the basics of interrupt( ):


//: C11:Interrupting.cpp
// Interrupting a blocked thread.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
using namespace ZThread;
using namespace std;

class Blocked : public Runnable {
public:
  void run() {
    try {
      Thread::sleep(1000);
      cout << "Waiting for get() in run():";
      cin.get();
    } catch(Interrupted_Exception&) {
      cout << "Caught Interrupted_Exception" << endl;
      // Exit the task
    }
  }
};

int main(int argc, char* argv[]) {
  try {
    Thread t(new Blocked);
    if(argc > 1)
      Thread::sleep(1100);
    t.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~

You can see that, in addition to the insertion into cout, run( ) contains two other points where blocking can occur: the call to Thread::sleep(1000) and the call to cin.get( ). By giving the program any command-line argument, you tell main( ) to sleep long enough that the task will finish its sleep( ) and call cin.get( ).(155) If you don't give the program an argument, the sleep( ) in main( ) is skipped. Here, the call to interrupt( ) will occur while the task is sleeping, and you'll see that this will cause Interrupted_Exception to be thrown. If you give the program a command-line argument, you'll discover that a task cannot be interrupted if it is blocked on I/O. That is, you can interrupt out of any blocking operation except I/O.(156)

This is a little disconcerting if you're creating a thread that performs I/O because it means that I/O has the potential of locking your multithreaded program. The problem is that, again, C++ was not designed with threading in mind; quite the opposite, it effectively pretends that threading doesn't exist. Thus, the iostream library is not thread-friendly. If the new C++ Standard decides to add thread support, the iostream library may need to be reconsidered in the process.

Blocked by a mutex

If you try to call a function whose mutex has already been acquired, the calling task will be suspended until the mutex becomes available. The following example tests whether this kind of blocking is interruptible:


//: C11:Interrupting2.cpp
// Interrupting a thread blocked
// with a synchronization guard.
//{L} ZThread
#include <iostream>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
using namespace ZThread;
using namespace std;

class BlockedMutex {
  Mutex lock;
public:
  BlockedMutex() {
    lock.acquire();
  }
  void f() {
    Guard<Mutex> g(lock);
    // This will never be available
  }
};

class Blocked2 : public Runnable {
  BlockedMutex blocked;
public:
  void run() {
    try {
      cout << "Waiting for f() in BlockedMutex" << endl;
      blocked.f();
    } catch(Interrupted_Exception& e) {
      cerr << e.what() << endl;
      // Exit the task
    }
  }
};

int main(int argc, char* argv[]) {
  try {
    Thread t(new Blocked2);
    t.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~

The class BlockedMutex has a constructor that acquires the object's own Mutex and never releases it. For that reason, if you try to call f( ), you will always be blocked because the Mutex cannot be acquired. In Blocked2, the run( ) function will be stopped at the call to blocked.f( ). When you run the program you'll see that, unlike the iostream call, interrupt( ) can break out of a call that's blocked by a mutex.(157)
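You can observe the "mutex already acquired" situation without actually hanging a thread. The following sketch assumes the C++11 standard library rather than ZThreads, and the function name secondAcquireFails is hypothetical. It holds a std::mutex in one thread and makes a non-blocking try_lock( ) attempt from another, which fails for the same reason that f( ) above would block.

```cpp
#include <mutex>
#include <thread>

// Returns true if a second thread is unable to acquire a mutex
// that the calling thread already holds.
bool secondAcquireFails() {
  std::mutex lock;
  lock.lock(); // Like BlockedMutex's constructor: acquire and hold
  bool acquired = false;
  std::thread other([&] {
    // Non-blocking attempt; a plain lock() here would wait
    // until the mutex is released.
    acquired = lock.try_lock();
    if(acquired)
      lock.unlock();
  });
  other.join();
  lock.unlock();
  return !acquired;
}
```

Note that a thread waiting in std::mutex::lock( ) has no counterpart to ZThreads' interrupt( ); the try_lock( ) call is used here only so that the sketch terminates.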

Checking for an interrupt

Note that when you call interrupt( ) on a thread, the only time that the interrupt occurs is when the task enters, or is already inside, a blocking operation (except, as you've seen, in the case of I/O, where you're just stuck). But what if you've written code that may or may not make such a blocking call, depending on the conditions in which it is run? If you can only exit by throwing an exception on a blocking call, you won't always be able to leave the run( ) loop. Thus, if you call interrupt( ) to stop a task, your task needs a second opportunity to exit in the event that your run( ) loop doesn't happen to be making any blocking calls.

This opportunity is presented by the interrupted status, which is set by the call to interrupt( ). You check for the interrupted status by calling interrupted( ). This not only tells you whether interrupt( ) has been called, it also clears the interrupted status. Clearing the interrupted status ensures that the framework will not notify you twice about a task being interrupted. You will be notified via either a single Interrupted_Exception, or a single successful Thread::interrupted( ) test. If you want to check again to see whether you were interrupted, you can store the result when you call Thread::interrupted( ).

The following example shows the typical idiom that you should use in your run( ) function to handle both blocked and non-blocked possibilities when the interrupted status is set:


//: C11:Interrupting3.cpp {RunByHand}
// General idiom for interrupting a task.
//{L} ZThread
#include <cstdlib>
#include <iostream>
#include "zthread/Thread.h"
using namespace ZThread;
using namespace std;

const double PI = 3.14159265358979323846;
const double E = 2.7182818284590452354;

class NeedsCleanup {
  int id;
public:
  NeedsCleanup(int ident) : id(ident) {
    cout << "NeedsCleanup " << id << endl;
  }
  ~NeedsCleanup() {
    cout << "~NeedsCleanup " << id << endl;
  }
};

class Blocked3 : public Runnable {
  volatile double d;
public:
  Blocked3() : d(0.0) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        point1:
        NeedsCleanup n1(1);
        cout << "Sleeping" << endl;
        Thread::sleep(1000);
        point2:
        NeedsCleanup n2(2);
        cout << "Calculating" << endl;
        // A time-consuming, non-blocking operation:
        for(int i = 1; i < 100000; i++)
          d = d + (PI + E) / (double)i;
      }
      cout << "Exiting via while() test" << endl;
    } catch(Interrupted_Exception&) {
      cout << "Exiting via Interrupted_Exception" << endl;
    }
  }
};

int main(int argc, char* argv[]) {
  if(argc != 2) {
    cerr << "usage: " << argv[0]
      << " delay-in-milliseconds" << endl;
    exit(1);
  }
  int delay = atoi(argv[1]);
  try {
    Thread t(new Blocked3);
    Thread::sleep(delay);
    t.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~

The NeedsCleanup class emphasizes the necessity of proper resource cleanup if you leave the loop via an exception. Note that no pointers are defined in Blocked3::run( ) because, for exception safety, all resources must be enclosed in stack-based objects so that stack unwinding automatically cleans them up by calling their destructors before the exception handler runs.

You must give the program a command-line argument which is the delay time in milliseconds before it calls interrupt( ). By using different delays, you can exit Blocked3::run( ) at different points in the loop: in the blocking sleep( ) call, and in the non-blocking mathematical calculation. You'll see that if interrupt( ) is called after the label point2 (during the non-blocking operation), first the loop is completed, then all the local objects are destructed, and finally the loop is exited at the top via the while statement. However, if interrupt( ) is called between point1 and point2 (after the while statement but before or during the blocking operation sleep( )), the task exits via the Interrupted_Exception. In that case, only the stack objects that have been created up to the point where the exception is thrown are cleaned up, and you have the opportunity to perform any other cleanup in the catch clause.

A class designed to respond to an interrupt( ) must establish a policy that ensures it will remain in a consistent state. This generally means that all resource acquisition should be wrapped inside stack-based objects so that the destructors will be called regardless of how the run( ) loop exits. Correctly done, code like this can be elegant. Components can be created that completely encapsulate their synchronization mechanisms but are still responsive to an external stimulus (via interrupt( )) without adding any special functions to an object's interface.
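The cleanup behavior on the two exit paths can be checked in isolation, without any threads. In this single-threaded sketch the names Tracked and runOnce are hypothetical, and a std::runtime_error stands in for Interrupted_Exception. Each object records its construction and destruction in a string so that you can see exactly which destructors ran.

```cpp
#include <stdexcept>
#include <string>

// Appends "+id" on construction and "-id" on destruction.
struct Tracked {
  std::string& log;
  int id;
  Tracked(std::string& l, int ident) : log(l), id(ident) {
    log += "+" + std::to_string(id);
  }
  ~Tracked() { log += "-" + std::to_string(id); }
};

// One pass through a loop body like the one in Blocked3::run().
// If interruptDuringSleep is true, an exception is thrown at the
// point where the blocking sleep() would be.
std::string runOnce(bool interruptDuringSleep) {
  std::string log;
  try {
    Tracked n1(log, 1);
    if(interruptDuringSleep)
      throw std::runtime_error("interrupted");
    Tracked n2(log, 2);
    // Non-blocking work would go here
  } catch(std::runtime_error&) {
    log += " caught"; // n1 has already been destroyed here
  }
  return log;
}
```

runOnce(true) returns "+1-1 caught": only the first object existed when the exception was thrown, and it was destroyed before the handler ran. runOnce(false) returns "+1+2-2-1": both objects are created and then destroyed in reverse order at the end of the block.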


3.4.7. Cooperation between threads

As you've seen, when you use threads to run more than one task at a time, you can keep one task from interfering with another task's resources by using a mutex to synchronize the behavior of the two tasks. That is, if two tasks are stepping on each other over a shared resource (usually memory), you use a mutex to allow only one task at a time to access that resource.

With that problem solved, you can move on to the issue of getting threads to cooperate, so that multiple threads can work together to solve a problem. Now the issue is not about interfering with one another, but rather about working in unison, since portions of such problems must be solved before other portions can be solved. It's much like project planning: the footings for the house must be dug first, but the steel can be laid and the concrete forms can be built in parallel, and both of those tasks must be finished before the concrete foundation can be poured. The plumbing must be in place before the concrete slab can be poured, the concrete slab must be in place before you start framing, and so on. Some of these tasks can be done in parallel, but certain steps require all tasks to be completed before you can move ahead.

The key issue when tasks are cooperating is handshaking between those tasks. To accomplish this handshaking, we use the same foundation: the mutex, which in this case guarantees that only one task can respond to a signal. This eliminates any possible race conditions. On top of the mutex, we add a way for a task to suspend itself until some external state changes (“the plumbing is now in place”), indicating that it's time for that task to move forward. In this section, we'll look at the issues of handshaking between tasks, the problems that can arise, and their solutions.


3.4.7.1. Wait and signal

In ZThreads, the basic class that uses a mutex and allows task suspension is the Condition, and you can suspend a task by calling wait( ) on a Condition. When external state changes take place that might mean that a task should continue processing, you notify the task by calling signal( ), to wake up one task, or broadcast( ), to wake up all tasks that have suspended themselves on that Condition object.

There are two forms of wait( ). The first form takes an argument in milliseconds that has the same meaning as in sleep( ): “pause for this period of time.” The second form takes no arguments; this version is more commonly used. Both forms of wait( ) release the Mutex that is controlled by the Condition object and suspend the thread until that Condition object receives a signal( ) or broadcast( ). The first form may also terminate if it times out before a signal( ) or broadcast( ) is received.

Because wait( ) releases the Mutex, it means that the Mutex can be acquired by another thread. Thus, when you call wait( ) you're saying “I've done all I can right now so I'm going to wait right here, but I want to allow other synchronized operations to take place if they can.”

Typically, you use wait( ) when you're waiting for some condition to change that is under the control of forces outside the current function. (Often, this condition will be changed by another thread.) You don't want to idly loop while testing the condition inside your thread; this is called a “busy wait,” and it's usually a bad use of CPU cycles. Thus, wait( ) suspends the thread while waiting for the world to change, and only when a signal( ) or broadcast( ) occurs (suggesting that something of interest may have happened), does the thread wake up and check for changes. So wait( ) provides a way to synchronize activities between threads.

Let's look at a simple example. WaxOMatic.cpp has two processes: one to apply wax to a Car and one to polish it. The polishing process cannot do its job until the application process is finished, and the application process must wait until the polishing process is finished before it can put on another coat of wax. Both WaxOn and WaxOff use the Car object, which contains a Condition that it uses to suspend a thread inside waitForWaxing( ) or waitForBuffing( ):


//: C11:WaxOMatic.cpp {RunByHand}
// Basic thread cooperation.
//{L} ZThread
#include <iostream>
#include <string>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

class Car {
  Mutex lock;
  Condition condition;
  bool waxOn;
public:
  Car() : condition(lock), waxOn(false) {}
  void waxed() {
    Guard<Mutex> g(lock);
    waxOn = true; // Ready to buff
    condition.signal();
  }
  void buffed() {
    Guard<Mutex> g(lock);
    waxOn = false; // Ready for another coat of wax
    condition.signal();
  }
  void waitForWaxing() {
    Guard<Mutex> g(lock);
    while(waxOn == false)
      condition.wait();
  }
  void waitForBuffing() {
    Guard<Mutex> g(lock);
    while(waxOn == true)
      condition.wait();
  }
};

class WaxOn : public Runnable {
  CountedPtr<Car> car;
public:
  WaxOn(CountedPtr<Car>& c) : car(c) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        cout << "Wax On!" << endl;
        Thread::sleep(200);
        car->waxed();
        car->waitForBuffing();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Ending Wax On process" << endl;
  }
};

class WaxOff : public Runnable {
  CountedPtr<Car> car;
public:
  WaxOff(CountedPtr<Car>& c) : car(c) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        car->waitForWaxing();
        cout << "Wax Off!" << endl;
        Thread::sleep(200);
        car->buffed();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Ending Wax Off process" << endl;
  }
};

int main() {
  cout << "Press <Enter> to quit" << endl;
  try {
    CountedPtr<Car> car(new Car);
    ThreadedExecutor executor;
    executor.execute(new WaxOff(car));
    executor.execute(new WaxOn(car));
    cin.get();
    executor.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
In Car's constructor, a single Mutex is wrapped in a Condition object so that it can be used to manage inter-task communication. However, the Condition object contains no information about the state of your process, so you need to manage additional information to indicate process state. Here, Car has a single bool waxOn, which indicates the state of the waxing-polishing process.

In waitForWaxing( ), the waxOn flag is checked, and if it is false, the calling thread is suspended by calling wait( ) on the Condition object. It's important that this occur inside a guarded clause, where the thread has acquired the lock (here, by creating a Guard object). When you call wait( ), the thread is suspended and the lock is released. It is essential that the lock be released because, to safely change the state of the object (for example, to change waxOn to true, which must happen if the suspended thread is to ever continue), that lock must be available to be acquired by some other task. In this example, when another thread calls waxed( ) to tell it that it's time to do something, the mutex must be acquired in order to change waxOn to true. Afterward, waxed( ) sends a signal( ) to the Condition object, which wakes up the thread suspended in the call to wait( ). Although signal( ) may be called inside a guarded clause—as it is here—you are not required to do this.(158)

In order for a thread to wake up from a wait( ), it must first reacquire the mutex that it released when it entered the wait( ). The thread will not wake up until that mutex becomes available.

The call to wait( ) is placed inside a while loop that checks the condition of interest. This is important for two reasons:(159)

  • It is possible that when the thread gets a signal( ), some other condition has changed that is not associated with the reason that we called wait( ) here. If that is the case, this thread should be suspended again until its condition of interest changes.
  • By the time this thread awakens from its wait( ), it's possible that some other task has changed things such that this thread is unable to perform its operation, or is no longer interested in performing it, at this time. Again, it should be re-suspended by calling wait( ) again.

Because these two reasons are always present when you are calling wait( ), always write your call to wait( ) inside a while loop that tests for your condition(s) of interest.
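The same wait-in-a-while-loop idiom carries over to the C++11 standard library, where std::condition_variable is additionally allowed to wake up spuriously, which is one more reason to retest the condition. The sketch below is an assumption-laden translation of Car, not ZThreads code: StdCar and waxAndBuff are hypothetical names, and notify_one( ) plays the role of signal( ).

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical standard-library version of Car.
class StdCar {
  std::mutex lock;
  std::condition_variable condition;
  bool waxOn;
public:
  StdCar() : waxOn(false) {}
  void waxed() {
    std::lock_guard<std::mutex> g(lock);
    waxOn = true; // Ready to buff
    condition.notify_one();
  }
  void buffed() {
    std::lock_guard<std::mutex> g(lock);
    waxOn = false; // Ready for another coat of wax
    condition.notify_one();
  }
  void waitForWaxing() {
    std::unique_lock<std::mutex> g(lock);
    while(!waxOn)        // Retest after every wake-up
      condition.wait(g); // Releases the lock while suspended
  }
  void waitForBuffing() {
    std::unique_lock<std::mutex> g(lock);
    while(waxOn)
      condition.wait(g);
  }
};

// Applies `coats` coats of wax in the calling thread while a second
// thread buffs each one; returns the number of coats buffed.
int waxAndBuff(int coats) {
  StdCar car;
  int buffedCoats = 0;
  std::thread waxOff([&car, &buffedCoats, coats] {
    for(int i = 0; i < coats; i++) {
      car.waitForWaxing();
      ++buffedCoats;
      car.buffed();
    }
  });
  for(int i = 0; i < coats; i++) {
    car.waxed();
    car.waitForBuffing();
  }
  waxOff.join();
  return buffedCoats;
}
```

Because each side waits for the other's state change before continuing, waxAndBuff(3) returns 3: every coat that is applied is buffed exactly once.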

WaxOn::run( ) represents the first step in the process of waxing the car, so it performs its operation (a call to sleep( ) to simulate the time necessary for waxing). It then tells the car that waxing is complete, and calls waitForBuffing( ), which suspends this thread with a wait( ) until the WaxOff process calls buffed( ) for the car, changing the state and calling signal( ). WaxOff::run( ), on the other hand, immediately moves into waitForWaxing( ) and is thus suspended until the wax has been applied by WaxOn and waxed( ) is called. When you run this program, you can watch this two-step process repeat itself as control is handed back and forth between the two threads. When you press the <Enter> key, interrupt( ) halts both threads: when you call interrupt( ) for an Executor, it calls interrupt( ) for all the threads it is controlling.


3.4.7.2. Producer–consumer relationships

A common situation in threading problems is the producer-consumer relationship, where one task is creating objects and other tasks are consuming them. In such a situation, make sure that (among other things) the consuming tasks do not accidentally skip any of the produced objects.

To show this problem, consider a machine that has three tasks: one to make toast, one to butter the toast, and one to put jam on the buttered toast.


//: C11:ToastOMatic.cpp {RunByHand}
// Problems with thread cooperation.
//{L} ZThread
#include <iostream>
#include <cstdlib>
#include <ctime>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

// Apply jam to buttered toast:
class Jammer : public Runnable {
  Mutex lock;
  Condition butteredToastReady;
  bool gotButteredToast;
  int jammed;
public:
  Jammer() : butteredToastReady(lock) {
    gotButteredToast = false;
    jammed = 0;
  }
  void moreButteredToastReady() {
    Guard<Mutex> g(lock);
    gotButteredToast = true;
    butteredToastReady.signal();
  }
  void run() {
    try {
      while(!Thread::interrupted()) {
        {
          Guard<Mutex> g(lock);
          while(!gotButteredToast)
            butteredToastReady.wait();
          ++jammed;
        }
        cout << "Putting jam on toast " << jammed << endl;
        {
          Guard<Mutex> g(lock);
          gotButteredToast = false;
        }
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Jammer off" << endl;
  }
};

// Apply butter to toast:
class Butterer : public Runnable {
  Mutex lock;
  Condition toastReady;
  CountedPtr<Jammer> jammer;
  bool gotToast;
  int buttered;
public:
  Butterer(CountedPtr<Jammer>& j)
  : toastReady(lock), jammer(j) {
    gotToast = false;
    buttered = 0;
  }
  void moreToastReady() {
    Guard<Mutex> g(lock);
    gotToast = true;
    toastReady.signal();
  }
  void run() {
    try {
      while(!Thread::interrupted()) {
        {
          Guard<Mutex> g(lock);
          while(!gotToast)
            toastReady.wait();
          ++buttered;
        }
        cout << "Buttering toast " << buttered << endl;
        jammer->moreButteredToastReady();
        {
          Guard<Mutex> g(lock);
          gotToast = false;
        }
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Butterer off" << endl;
  }
};

class Toaster : public Runnable {
  CountedPtr<Butterer> butterer;
  int toasted;
public:
  Toaster(CountedPtr<Butterer>& b) : butterer(b) {
    toasted = 0;
  }
  void run() {
    try {
      while(!Thread::interrupted()) {
        Thread::sleep(rand()/(RAND_MAX/5)*100);
        // ...
        // Create new toast
        // ...
        cout << "New toast " << ++toasted << endl;
        butterer->moreToastReady();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Toaster off" << endl;
  }
};

int main() {
  srand(time(0)); // Seed the random number generator
  try {
    cout << "Press <Return> to quit" << endl;
    CountedPtr<Jammer> jammer(new Jammer);
    CountedPtr<Butterer> butterer(new Butterer(jammer));
    ThreadedExecutor executor;
    executor.execute(new Toaster(butterer));
    executor.execute(butterer);
    executor.execute(jammer);
    cin.get();
    executor.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
The classes are defined in the reverse of the order in which they operate, which avoids forward-referencing issues.

Jammer and Butterer both contain a Mutex, a Condition, and some kind of internal state information that changes to indicate that the task should suspend or resume. (Toaster doesn't need these since it is the producer and doesn't have to wait on anything.) Each of the two run( ) functions calls wait( ) to suspend the task until its state flag is set, performs its operation, and then resets the flag. The moreToastReady( ) and moreButteredToastReady( ) functions set their respective state flags to indicate that something has changed and the task should consider resuming, and then call signal( ) to wake up the thread.

The difference between this example and the previous one is that, at least conceptually, something is being produced here: toast. The rate of toast production is randomized a bit, to add some uncertainty. And you'll see that when you run the program, things aren't going right because many pieces of toast appear to be getting dropped on the floor—not buttered, not jammed.


3.4.7.3. Solving threading problems with queues

Often, threading problems are based on the need for tasks to be serialized—that is, to take care of things in order. ToastOMatic.cpp must not only take care of things in order, it must be able to work on one piece of toast without worrying that toast is falling on the floor in the meantime. You can solve many threading problems by using a queue that synchronizes access to the elements within:


//: C11:TQueue.h
#ifndef TQUEUE_H
#define TQUEUE_H
#include <deque>
#include "zthread/Thread.h"
#include "zthread/Condition.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"

template<class T> class TQueue {
  ZThread::Mutex lock;
  ZThread::Condition cond;
  std::deque<T> data;
public:
  TQueue() : cond(lock) {}
  void put(T item) {
    ZThread::Guard<ZThread::Mutex> g(lock);
    data.push_back(item);
    cond.signal();
  }
  T get() {
    ZThread::Guard<ZThread::Mutex> g(lock);
    while(data.empty())
      cond.wait();
    T returnVal = data.front();
    data.pop_front();
    return returnVal;
  }
};
#endif // TQUEUE_H ///:~
This builds on the Standard C++ Library deque by adding:

  1. Synchronization to ensure that no two threads add objects at the same time.
  2. wait( ) and signal( ) so that a consumer thread will automatically suspend if the queue is empty, and resume when more elements become available.
This relatively small amount of code can solve a remarkable number of problems.(160)

Here's a simple test that serializes the execution of LiftOff objects. The consumer is LiftOffRunner, which pulls each LiftOff object off the TQueue and runs it directly. (That is, it uses its own thread by calling run( ) explicitly rather than starting up a new thread for each task.)


//: C11:TestTQueue.cpp {RunByHand}
//{L} ZThread
#include <string>
#include <iostream>
#include "TQueue.h"
#include "zthread/Thread.h"
#include "LiftOff.h"
using namespace ZThread;
using namespace std;

class LiftOffRunner : public Runnable {
  TQueue<LiftOff*> rockets;
public:
  void add(LiftOff* lo) { rockets.put(lo); }
  void run() {
    try {
      while(!Thread::interrupted()) {
        LiftOff* rocket = rockets.get();
        rocket->run();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Exiting LiftOffRunner" << endl;
  }
};

int main() {
  try {
    LiftOffRunner* lor = new LiftOffRunner;
    Thread t(lor);
    for(int i = 0; i < 5; i++)
      lor->add(new LiftOff(10, i));
    cin.get();
    lor->add(new LiftOff(10, 99));
    cin.get();
    t.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
The tasks are placed on the TQueue by main( ) and are taken off the TQueue by the LiftOffRunner. Notice that LiftOffRunner can ignore the synchronization issues because they are solved by the TQueue.

Proper toasting

To solve the ToastOMatic.cpp problem, we can run the toast through TQueues between the tasks. To do this, we will need actual toast objects, which maintain and display their state:


//: C11:ToastOMaticMarkII.cpp {RunByHand}
// Solving the problems using TQueues.
//{L} ZThread
#include <iostream>
#include <string>
#include <cassert>
#include <cstdlib>
#include <ctime>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
#include "TQueue.h"
using namespace ZThread;
using namespace std;

class Toast {
  enum Status { DRY, BUTTERED, JAMMED };
  Status status;
  int id;
public:
  Toast(int idn) : status(DRY), id(idn) {}
  #ifdef __DMC__ // Incorrectly requires default
  Toast() { assert(0); } // Should never be called
  #endif
  void butter() { status = BUTTERED; }
  void jam() { status = JAMMED; }
  string getStatus() const {
    switch(status) {
      case DRY: return "dry";
      case BUTTERED: return "buttered";
      case JAMMED: return "jammed";
      default: return "error";
    }
  }
  int getId() { return id; }
  friend ostream& operator<<(ostream& os, const Toast& t) {
    return os << "Toast " << t.id << ": " << t.getStatus();
  }
};

typedef CountedPtr< TQueue<Toast> > ToastQueue;

class Toaster : public Runnable {
  ToastQueue toastQueue;
  int count;
public:
  Toaster(ToastQueue& tq) : toastQueue(tq), count(0) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        int delay = rand()/(RAND_MAX/5)*100;
        Thread::sleep(delay);
        // Make toast
        Toast t(count++);
        cout << t << endl;
        // Insert into queue
        toastQueue->put(t);
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Toaster off" << endl;
  }
};

// Apply butter to toast:
class Butterer : public Runnable {
  ToastQueue dryQueue, butteredQueue;
public:
  Butterer(ToastQueue& dry, ToastQueue& buttered)
  : dryQueue(dry), butteredQueue(buttered) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = dryQueue->get();
        t.butter();
        cout << t << endl;
        butteredQueue->put(t);
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Butterer off" << endl;
  }
};

// Apply jam to buttered toast:
class Jammer : public Runnable {
  ToastQueue butteredQueue, finishedQueue;
public:
  Jammer(ToastQueue& buttered, ToastQueue& finished)
  : butteredQueue(buttered), finishedQueue(finished) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = butteredQueue->get();
        t.jam();
        cout << t << endl;
        finishedQueue->put(t);
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Jammer off" << endl;
  }
};

// Consume the toast:
class Eater : public Runnable {
  ToastQueue finishedQueue;
  int counter;
public:
  Eater(ToastQueue& finished)
  : finishedQueue(finished), counter(0) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = finishedQueue->get();
        // Verify that the toast is coming in order,
        // and that all pieces are getting jammed:
        if(t.getId() != counter++ ||
           t.getStatus() != "jammed") {
          cout << ">>>> Error: " << t << endl;
          exit(1);
        } else
          cout << "Chomp! " << t << endl;
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Eater off" << endl;
  }
};

int main() {
  srand(time(0)); // Seed the random number generator
  try {
    ToastQueue dryQueue(new TQueue<Toast>),
               butteredQueue(new TQueue<Toast>),
               finishedQueue(new TQueue<Toast>);
    cout << "Press <Return> to quit" << endl;
    ThreadedExecutor executor;
    executor.execute(new Toaster(dryQueue));
    executor.execute(new Butterer(dryQueue, butteredQueue));
    executor.execute(
      new Jammer(butteredQueue, finishedQueue));
    executor.execute(new Eater(finishedQueue));
    cin.get();
    executor.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Two things are immediately apparent in this solution: first, the amount and complexity of code within each Runnable class is dramatically reduced by the use of the TQueue because the guarding, communication, and wait( )/signal( ) operations are now taken care of by the TQueue. The Runnable classes don't have Mutexes or Condition objects anymore. Second, the coupling between the classes is eliminated because each class communicates only with its TQueues. Notice that the definition order of the classes is now independent. Less code and less coupling are always good things, which suggests that the use of the TQueue has a positive effect here, as it does on most problems.


3.4.7.4. Broadcast

The signal( ) function wakes up one thread that is waiting on a Condition object. However, multiple threads may be waiting on the same condition object, and in that case you might want to wake them all up using broadcast( ) instead of signal( ).

As an example that brings together many of the concepts in this chapter, consider a hypothetical robotic assembly line for automobiles. Each Car will be built in several stages, and in this example we'll look at a single stage: after the chassis has been created, at the time when the engine, drive train, and wheels are attached. The Cars are transported from one place to another via a CarQueue, which is a type of TQueue. A Director takes each Car (as a raw chassis) from the incoming CarQueue and places it in a Cradle, which is where all the work is done. At this point, the Director tells all the waiting robots (using broadcast( )) that the Car is in the Cradle ready for the robots to work on it. The three types of robots go to work, sending a message to the Cradle when they finish their tasks. The Director waits until all the tasks are complete and then puts the Car onto the outgoing CarQueue to be transported to the next operation. Here, the consumer of the outgoing CarQueue is a Reporter object, which just prints the Car to show that the tasks have been properly completed.


//: C11:CarBuilder.cpp {RunByHand}
// How broadcast() works.
//{L} ZThread
#include <iostream>
#include <string>
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
#include "TQueue.h"
using namespace ZThread;
using namespace std;

class Car {
  int id;
  bool engine, driveTrain, wheels;
public:
  Car(int idn) : id(idn), engine(false),
  driveTrain(false), wheels(false) {}
  // Empty Car object:
  Car() : id(-1), engine(false),
  driveTrain(false), wheels(false) {}
  // Unsynchronized -- assumes atomic bool operations:
  int getId() { return id; }
  void addEngine() { engine = true; }
  bool engineInstalled() { return engine; }
  void addDriveTrain() { driveTrain = true; }
  bool driveTrainInstalled() { return driveTrain; }
  void addWheels() { wheels = true; }
  bool wheelsInstalled() { return wheels; }
  friend ostream& operator<<(ostream& os, const Car& c) {
    return os << "Car " << c.id << " ["
      << " engine: " << c.engine
      << " driveTrain: " << c.driveTrain
      << " wheels: " << c.wheels << " ]";
  }
};

typedef CountedPtr< TQueue<Car> > CarQueue;

class ChassisBuilder : public Runnable {
  CarQueue carQueue;
  int counter;
public:
  ChassisBuilder(CarQueue& cq) : carQueue(cq), counter(0) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        Thread::sleep(1000);
        // Make chassis:
        Car c(counter++);
        cout << c << endl;
        // Insert into queue
        carQueue->put(c);
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "ChassisBuilder off" << endl;
  }
};

class Cradle {
  Car c; // Holds current car being worked on
  bool occupied;
  Mutex workLock, readyLock;
  Condition workCondition, readyCondition;
  bool engineBotHired, wheelBotHired, driveTrainBotHired;
public:
  Cradle()
  : workCondition(workLock), readyCondition(readyLock) {
    occupied = false;
    engineBotHired = true;
    wheelBotHired = true;
    driveTrainBotHired = true;
  }
  void insertCar(Car chassis) {
    c = chassis;
    occupied = true;
  }
  Car getCar() { // Can only extract car once
    if(!occupied) {
      cerr << "No Car in Cradle for getCar()" << endl;
      return Car(); // "Null" Car object
    }
    occupied = false;
    return c;
  }
  // Access car while in cradle:
  Car* operator->() { return &c; }
  // Allow robots to offer services to this cradle:
  void offerEngineBotServices() {
    Guard<Mutex> g(workLock);
    while(engineBotHired)
      workCondition.wait();
    engineBotHired = true; // Accept the job
  }
  void offerWheelBotServices() {
    Guard<Mutex> g(workLock);
    while(wheelBotHired)
      workCondition.wait();
    wheelBotHired = true; // Accept the job
  }
  void offerDriveTrainBotServices() {
    Guard<Mutex> g(workLock);
    while(driveTrainBotHired)
      workCondition.wait();
    driveTrainBotHired = true; // Accept the job
  }
  // Tell waiting robots that work is ready:
  void startWork() {
    Guard<Mutex> g(workLock);
    engineBotHired = false;
    wheelBotHired = false;
    driveTrainBotHired = false;
    workCondition.broadcast();
  }
  // Each robot reports when their job is done:
  void taskFinished() {
    Guard<Mutex> g(readyLock);
    readyCondition.signal();
  }
  // Director waits until all jobs are done:
  void waitUntilWorkFinished() {
    Guard<Mutex> g(readyLock);
    while(!(c.engineInstalled() && c.driveTrainInstalled()
            && c.wheelsInstalled()))
      readyCondition.wait();
  }
};

typedef CountedPtr<Cradle> CradlePtr;

class Director : public Runnable {
  CarQueue chassisQueue, finishingQueue;
  CradlePtr cradle;
public:
  Director(CarQueue& cq, CarQueue& fq, CradlePtr cr)
  : chassisQueue(cq), finishingQueue(fq), cradle(cr) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until chassis is available:
        cradle->insertCar(chassisQueue->get());
        // Notify robots car is ready for work
        cradle->startWork();
        // Wait until work completes
        cradle->waitUntilWorkFinished();
        // Put car into queue for further work
        finishingQueue->put(cradle->getCar());
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Director off" << endl;
  }
};

class EngineRobot : public Runnable {
  CradlePtr cradle;
public:
  EngineRobot(CradlePtr cr) : cradle(cr) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until job is offered/accepted:
        cradle->offerEngineBotServices();
        cout << "Installing engine" << endl;
        (*cradle)->addEngine();
        cradle->taskFinished();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "EngineRobot off" << endl;
  }
};

class DriveTrainRobot : public Runnable {
  CradlePtr cradle;
public:
  DriveTrainRobot(CradlePtr cr) : cradle(cr) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until job is offered/accepted:
        cradle->offerDriveTrainBotServices();
        cout << "Installing DriveTrain" << endl;
        (*cradle)->addDriveTrain();
        cradle->taskFinished();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "DriveTrainRobot off" << endl;
  }
};

class WheelRobot : public Runnable {
  CradlePtr cradle;
public:
  WheelRobot(CradlePtr cr) : cradle(cr) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        // Blocks until job is offered/accepted:
        cradle->offerWheelBotServices();
        cout << "Installing Wheels" << endl;
        (*cradle)->addWheels();
        cradle->taskFinished();
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "WheelRobot off" << endl;
  }
};

class Reporter : public Runnable {
  CarQueue carQueue;
public:
  Reporter(CarQueue& cq) : carQueue(cq) {}
  void run() {
    try {
      while(!Thread::interrupted()) {
        cout << carQueue->get() << endl;
      }
    } catch(Interrupted_Exception&) { /* Exit */ }
    cout << "Reporter off" << endl;
  }
};

int main() {
  cout << "Press <Enter> to quit" << endl;
  try {
    CarQueue chassisQueue(new TQueue<Car>),
             finishingQueue(new TQueue<Car>);
    CradlePtr cradle(new Cradle);
    ThreadedExecutor assemblyLine;
    assemblyLine.execute(new EngineRobot(cradle));
    assemblyLine.execute(new DriveTrainRobot(cradle));
    assemblyLine.execute(new WheelRobot(cradle));
    assemblyLine.execute(
      new Director(chassisQueue, finishingQueue, cradle));
    assemblyLine.execute(new Reporter(finishingQueue));
    // Start everything running by producing chassis:
    assemblyLine.execute(new ChassisBuilder(chassisQueue));
    cin.get();
    assemblyLine.interrupt();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
You'll notice that Car takes a shortcut: it assumes that bool operations are atomic, which, as previously discussed, is sometimes a safe assumption but requires careful thought.(161) Each Car begins as an unadorned chassis, and different robots will attach different parts to it, calling the appropriate “add” function when they do.

A ChassisBuilder simply creates a new Car every second and places it into the chassisQueue. A Director manages the build process by taking the next Car off the chassisQueue, putting it into the Cradle, telling all the robots to startWork( ), and suspending itself by calling waitUntilWorkFinished( ). When the work is done, the Director takes the Car out of the Cradle and puts it into the finishingQueue.

The Cradle is the crux of the signaling operations. One Mutex and Condition pair controls the working of the robots, and a second pair indicates whether all the operations are finished. A particular type of robot can offer its services to the Cradle by calling the “offer” function appropriate to its type. At this point, that robot thread is suspended until the Director calls startWork( ), which changes the hiring flags and calls broadcast( ) to tell all the robots to show up for work. Although this system allows any number of robots to offer their services, each one of those robots has its thread suspended by doing so. You could imagine a more sophisticated system where the robots register themselves with many different Cradles without being suspended by that registration process and then reside in a pool waiting for the first Cradle that needs a task completed.

After each robot finishes its task (changing the state of the Car in the process), it calls taskFinished( ), which sends a signal( ) to the readyCondition, which is what the Director is waiting on in waitUntilWorkFinished( ). Each time the director thread awakens, the state of the Car is checked, and if it still isn't finished, that thread is suspended again.

When the Director inserts a Car into the Cradle, you can perform operations on that Car via the operator->( ). To prevent multiple extractions of the same car, a flag causes an error report to be generated. (Exceptions don't propagate across threads in the ZThread library.)

In main( ), all the necessary objects are created and the tasks are initialized, with the ChassisBuilder begun last to start the process. (However, because of the behavior of the TQueue, it wouldn't matter if it were started first.) Note that this program follows all the guidelines regarding object and task lifetime presented in this chapter, and so the shutdown process is safe.


3.4.8. Deadlock

Because threads can become blocked and because objects can have mutexes that prevent threads from accessing that object until the mutex is released, it's possible for one thread to get stuck waiting for another thread, which in turn waits for another thread, and so on, until the chain leads back to a thread waiting on the first one. You get a continuous loop of threads waiting on each other, and no one can move. This is called deadlock.

If you try running a program and it deadlocks right away, you immediately know you have a problem, and you can track it down. The real problem is when your program seems to be working fine but has the hidden potential to deadlock. In this case, you may get no indication that deadlocking is a possibility, so it will be latent in your program until it unexpectedly happens to a customer. (And you probably won't be able to easily reproduce it.) Thus, preventing deadlock through careful program design is a critical part of developing concurrent programs.

Let's look at the classic demonstration of deadlock, invented by Edsger Dijkstra: the dining philosophers problem. The basic description specifies five philosophers (but the example shown here will allow any number). These philosophers spend part of their time thinking and part of their time eating. While they are thinking, they don't need any shared resources, but they eat using a limited number of utensils. In the original problem description, the utensils are forks, and two forks are required to get spaghetti from a bowl in the middle of the table, but it seems to make more sense to say that the utensils are chopsticks. Clearly, each philosopher will require two chopsticks in order to eat.

A difficulty is introduced into the problem: as philosophers, they have very little money, so they can only afford five chopsticks. These are spaced around the table between them. When a philosopher wants to eat, they must pick up the chopstick to the left and the one to the right. If the philosopher on either side is using a desired chopstick, our philosopher must wait until the necessary chopsticks become available.


//: C11:DiningPhilosophers.h
// Classes for Dining Philosophers.
#ifndef DININGPHILOSOPHERS_H
#define DININGPHILOSOPHERS_H
#include <string>
#include <iostream>
#include <sstream>
#include <cstdlib>
#include "zthread/Condition.h"
#include "zthread/Guard.h"
#include "zthread/Mutex.h"
#include "zthread/Thread.h"
#include "Display.h"

class Chopstick {
  ZThread::Mutex lock;
  ZThread::Condition notTaken;
  bool taken;
public:
  Chopstick() : notTaken(lock), taken(false) {}
  void take() {
    ZThread::Guard<ZThread::Mutex> g(lock);
    while(taken)
      notTaken.wait();
    taken = true;
  }
  void drop() {
    ZThread::Guard<ZThread::Mutex> g(lock);
    taken = false;
    notTaken.signal();
  }
};

class Philosopher : public ZThread::Runnable {
  Chopstick& left;
  Chopstick& right;
  int id;
  int ponderFactor;
  ZThread::CountedPtr<Display> display;
  int randSleepTime() {
    if(ponderFactor == 0) return 0;
    return rand()/(RAND_MAX/ponderFactor) * 250;
  }
  void output(std::string s) {
    std::ostringstream os;
    os << *this << " " << s << std::endl;
    display->output(os);
  }
public:
  Philosopher(Chopstick& l, Chopstick& r,
  ZThread::CountedPtr<Display>& disp, int ident, int ponder)
  : left(l), right(r), id(ident), ponderFactor(ponder),
    display(disp) {}
  virtual void run() {
    try {
      while(!ZThread::Thread::interrupted()) {
        output("thinking");
        ZThread::Thread::sleep(randSleepTime());
        // Hungry
        output("grabbing right");
        right.take();
        output("grabbing left");
        left.take();
        output("eating");
        ZThread::Thread::sleep(randSleepTime());
        right.drop();
        left.drop();
      }
    } catch(ZThread::Synchronization_Exception& e) {
      output(e.what());
    }
  }
  friend std::ostream&
  operator<<(std::ostream& os, const Philosopher& p) {
    return os << "Philosopher " << p.id;
  }
};
#endif // DININGPHILOSOPHERS_H
///:~
No two Philosophers can take( ) a Chopstick at the same time, since take( ) is synchronized with a Mutex. In addition, if the chopstick has already been taken by one Philosopher, another can wait( ) on the notTaken Condition until the Chopstick becomes available when the current holder calls drop( ) (which must also be synchronized to prevent race conditions and ensure memory visibility in multiprocessor systems).

Each Philosopher holds references to their left and right Chopstick so they can attempt to pick those up. The goal of the Philosopher is to think part of the time and eat part of the time, and this is expressed in run( ). However, you will observe that if the Philosophers spend very little time thinking, they will all be competing for the Chopsticks while they try to eat, and deadlock will happen much more quickly. So that you can experiment with this, the ponderFactor weights the length of time that a Philosopher tends to spend thinking and eating. A smaller ponderFactor will increase the probability of deadlock.

In Philosopher::run( ), each Philosopher just thinks and eats continuously. You see the Philosopher thinking for a randomized amount of time, then trying to take( ) the right and then the left Chopstick, eating for a randomized amount of time, and then doing it again. Output to the console is synchronized as seen earlier in this chapter.

This problem is interesting because it demonstrates that a program can appear to run correctly but actually be deadlock prone. To show this, the command-line argument adjusts a factor to affect the amount of time each philosopher spends thinking. If you have lots of philosophers or they spend a lot of time thinking, you may never see the deadlock even though it remains a possibility. A command-line argument of zero tends to make it deadlock fairly quickly:(162)


//: C11:DeadlockingDiningPhilosophers.cpp {RunByHand}
// Dining Philosophers with Deadlock.
//{L} ZThread
#include <ctime>
#include "DiningPhilosophers.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

int main(int argc, char* argv[]) {
  srand(time(0)); // Seed the random number generator
  int ponder = argc > 1 ? atoi(argv[1]) : 5;
  cout << "Press <ENTER> to quit" << endl;
  enum { SZ = 5 };
  try {
    CountedPtr<Display> d(new Display);
    ThreadedExecutor executor;
    Chopstick c[SZ];
    for(int i = 0; i < SZ; i++) {
      executor.execute(
        new Philosopher(c[i], c[(i+1) % SZ], d, i, ponder));
    }
    cin.get();
    executor.interrupt();
    executor.wait();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
Note that the Chopstick objects do not need internal identifiers; they are identified by their position in the array c. Each Philosopher is given a reference to a left and right Chopstick object when constructed; these are the utensils that must be picked up before that Philosopher can eat. Every Philosopher except the last one is initialized by situating that Philosopher between the next pair of Chopstick objects. The last Philosopher is given the zeroth Chopstick for its right Chopstick, so the round table is completed. That's because the last Philosopher is sitting right next to the first one, and they both share that zeroth chopstick. With this arrangement, it's possible at some point for all the philosophers to be trying to eat and waiting on the philosopher next to them to put down their chopstick, and the program will deadlock.

If your threads (philosophers) are spending more time on other tasks (thinking) than eating, then they have a much lower probability of requiring the shared resources (chopsticks), and thus you can convince yourself that the program is deadlock free (using a nonzero ponder value), even though it isn't.

To repair the problem, you must understand that deadlock can occur if four conditions are simultaneously met:

1.  Mutual exclusion. At least one resource used by the threads must not be shareable. In this case, a chopstick can be used by only one philosopher at a time.

2.  At least one process must be holding a resource and waiting to acquire a resource currently held by another process. That is, for deadlock to occur, a philosopher must be holding one chopstick and waiting for another one.

3.  A resource cannot be preemptively taken away from a process. Processes only release resources as a normal event. Our philosophers are polite and they don't grab chopsticks from other philosophers.

4.  A circular wait can happen, whereby a process waits on a resource held by another process, which in turn is waiting on a resource held by another process, and so on, until one of the processes is waiting on a resource held by the first process, thus gridlocking everything. In DeadlockingDiningPhilosophers.cpp, the circular wait happens because each philosopher tries to get the right chopstick first and then the left.

Because all these conditions must be met to cause deadlock, you need to stop only one of them from occurring to prevent deadlock. In this program, the easiest way to prevent deadlock is to break condition four. This condition happens because each philosopher is trying to pick up their chopsticks in a particular sequence: first right, then left. Because of that, it's possible to get into a situation where each of them is holding their right chopstick and waiting to get the left, causing the circular wait condition. However, if the last philosopher is initialized to try to get the left chopstick first and then the right, that philosopher will never prevent the philosopher on the immediate right from picking up their left chopstick. In this case, the circular wait is prevented. This is only one solution to the problem; you could also solve it by preventing one of the other conditions (see advanced threading books for more details):


//: C11:FixedDiningPhilosophers.cpp {RunByHand}
// Dining Philosophers without Deadlock.
//{L} ZThread
#include <ctime>
#include "DiningPhilosophers.h"
#include "zthread/ThreadedExecutor.h"
using namespace ZThread;
using namespace std;

int main(int argc, char* argv[]) {
  srand(time(0)); // Seed the random number generator
  int ponder = argc > 1 ? atoi(argv[1]) : 5;
  cout << "Press <ENTER> to quit" << endl;
  enum { SZ = 5 };
  try {
    CountedPtr<Display> d(new Display);
    ThreadedExecutor executor;
    Chopstick c[SZ];
    for(int i = 0; i < SZ; i++) {
      if(i < (SZ-1))
        executor.execute(
          new Philosopher(c[i], c[i + 1], d, i, ponder));
      else
        executor.execute(
          new Philosopher(c[0], c[i], d, i, ponder));
    }
    cin.get();
    executor.interrupt();
    executor.wait();
  } catch(Synchronization_Exception& e) {
    cerr << e.what() << endl;
  }
} ///:~
By ensuring that the last philosopher picks up and puts down their left chopstick before their right, the deadlock is removed, and the program will run smoothly.

There is no language support to help prevent deadlock; it's up to you to avoid it by careful design. These are not comforting words to the person who's trying to debug a deadlocking program.


3.4.9. Summary

The goal of this chapter was to give you the foundations of concurrent programming with threads:

1.  You can run multiple independent tasks.

2.  You must consider all the possible problems when these tasks shut down. Objects or other tasks may disappear before tasks are finished with them.

3.  Tasks can collide with each other over shared resources. The mutex is the basic tool used to prevent these collisions.

4.  Tasks can deadlock if they are not carefully designed.

However, there are multiple additional facets of threading and tools to help you solve threading problems. The ZThreads library contains a number of these tools, such as semaphores and special types of queues, similar to the one you saw in this chapter. Explore that library as well as other resources on threading to gain more in-depth knowledge.

It is vital to learn when to use concurrency and when to avoid it. The main reasons to use it are:

  • To manage a number of tasks whose intermingling uses the computer more efficiently (including the ability to transparently distribute the tasks across multiple CPUs).
  • To allow better code organization.
  • To be more convenient for the user.
The classic example of resource balancing is to use the CPU during I/O waits. The classic example of user convenience is to monitor a “stop” button during long downloads.

An additional advantage to threads is that they provide “light” execution context switches (on the order of 100 instructions) rather than “heavy” process context switches (thousands of instructions). Since all threads in a given process share the same memory space, a light context switch changes only program execution and local variables. A process change—the heavy context switch—must exchange the full memory space.

The main drawbacks to multithreading are:

  • Slowdown occurs while waiting for shared resources.
  • Additional CPU overhead is required to manage threads.
  • Unrewarded complexity arises from poor design decisions.
  • Opportunities are created for pathologies such as starving, racing, deadlock, and livelock.
  • Inconsistencies occur across platforms. When developing the original material (in Java) for this chapter, we discovered race conditions that quickly appeared on some computers but wouldn't appear on others. The C++ examples in this chapter behaved differently (but usually acceptably) under different operating systems. If you develop a program on a computer and things seem to work right, you might get an unwelcome surprise when you distribute it.
One of the biggest difficulties with threads occurs because more than one thread might be sharing a resource—such as the memory in an object—and you must make sure that multiple threads don't try to read and change that resource at the same time. This requires judicious use of synchronization tools, which must be thoroughly understood because they can quietly introduce deadlock situations.

In addition, there's a certain art to the application of threads. C++ is designed to allow you to create as many objects as you need to solve your problem—at least in theory. (Creating millions of objects for an engineering finite-element analysis, for example, might not be practical.) However, there is usually an upper bound to the number of threads you'll want to create, because at some number, threads may become balky. This critical point can be difficult to detect and will often depend on the OS and thread library; it could be fewer than a hundred or in the thousands. As you often create only a handful of threads to solve a problem, this is typically not much of a limit; but in a more general design it becomes a constraint.

Regardless of how simple threading can seem using a particular language or library, consider it a black art. There's always something you haven't considered that can bite you when you least expect it. (For example, note that because the dining philosophers problem can be adjusted so that deadlock rarely happens, you can get the impression that everything is OK.) An appropriate quote comes from Guido van Rossum, creator of the Python programming language:

In any project that is multithreaded, most bugs will come from threading issues. This is regardless of programming language—it's a deep, as yet un-understood property of threads.

For more advanced discussions of threading, see Parallel and Distributed Programming Using C++, by Cameron Hughes and Tracey Hughes, Addison Wesley 2004.


3.4.10. Exercises

Solutions to selected exercises can be found in the electronic document The Thinking in C++ Volume 2 Annotated Solution Guide, available for a small fee from www.MindView.net.

  1. Inherit a class from Runnable and override the run( ) function. Inside run( ), print a message, and then call sleep( ). Repeat this three times, and then return from run( ). Put a start-up message in the constructor and a shut-down message when the task terminates. Make several thread objects of this type, and run them to see what happens.
  2. Modify BasicThreads.cpp to make LiftOff threads start other LiftOff threads.
  3. Modify ResponsiveUI.cpp to eliminate any possible race conditions. (Assume bool operations are not atomic.)
  4. In Incrementer.cpp, modify the Count class to use a single int instead of an array of int. Explain the resulting behavior.
  5. In EvenChecker.h, correct the potential problem in the Generator class. (Assume bool operations are not atomic.)
  6. Modify EvenGenerator.cpp to use interrupt( ) instead of quit flags.
  7. In MutexEvenGenerator.cpp, change the code in MutexEvenGenerator::nextValue( ) so that the return expression precedes the release( ) statement and explain what happens.
  8. Modify ResponsiveUI.cpp to use interrupt( ) instead of the quitFlag approach.
  9. Look up the Singleton documentation in the ZThreads library. Modify OrnamentalGarden.cpp so that the Display object is controlled by a Singleton to prevent more than one Display from being accidentally created.
  10. In OrnamentalGarden.cpp, change the Count::increment( ) function so that it does a direct increment of count (that is, it just does a count++). Now remove the guard and see if that causes a failure. Is this safe and reliable?
  11. Modify OrnamentalGarden.cpp so that it uses interrupt( ) instead of the pause( ) mechanism. Make sure that your solution doesn't prematurely destroy objects.
  12. Modify WaxOMatic.cpp by adding more instances of the Process class so that it applies and polishes three coats of wax instead of just one.
  13. Create two Runnable subclasses, one with a run( ) that starts and calls wait( ). The other class's run( ) should capture the reference of the first Runnable object. Its run( ) should call signal( ) for the first thread after some number of seconds have passed so that first thread can print a message.
  14. Create an example of a “busy wait.” One thread sleeps for a while and then sets a flag to true. The second thread watches that flag inside a while loop (this is the “busy wait”) and, when the flag becomes true, sets it back to false and reports the change to the console. Note how much wasted time the program spends inside the “busy wait,” and create a second version of the program that uses wait( ) instead of the “busy wait.” Extra: run a profiler to show the time used by the CPU in each case.
  15. Modify TQueue.h to add a maximum allowable element count. If the count is reached, further writes should be blocked until the count drops below the maximum. Write code to test this behavior.
  16. Modify ToastOMaticMarkII.cpp to create peanut-butter and jelly on toast sandwiches using two separate assembly lines and an output TQueue for the finished sandwiches. Use a Reporter object as in CarBuilder.cpp to display the results.
  17. Rewrite C07:BankTeller.cpp to use real threading instead of simulated threading.
  18. Modify CarBuilder.cpp to give identifiers to the robots, and add more instances of the different kinds of robots. Note whether all robots get utilized.
  19. Modify CarBuilder.cpp to add another stage to the car-building process, whereby you add the exhaust system, body, and fenders. As with the first stage, assume these processes can be performed simultaneously by robots.
  20. Modify CarBuilder.cpp so that Car has synchronized access to all the bool variables. Because Mutexes cannot be copied, this will require significant changes throughout the program.
  21. Using the approach in CarBuilder.cpp, model the house-building story that was given in this chapter.
  22. Create a Timer class with two options: (1) a one-shot timer that goes off only once, and (2) a timer that goes off at regular intervals. Use this class with C10:MulticastCommand.cpp to move the calls to TaskRunner::run( ) from the procedures into the timer.
  23. Change both of the dining philosophers examples so that the number of Philosophers is controlled on the command line, in addition to the ponder time. Try different values and explain the results.
  24. Change DiningPhilosophers.cpp so that the Philosophers just pick the next available chopstick. (When a Philosopher is done with their chopsticks, they drop them into a bin. When a Philosopher wants to eat, they take the next two available chopsticks from the bin.) Does this eliminate the possibility of deadlock? Can you reintroduce deadlock by simply reducing the number of available chopsticks?
 
(148) This is true when the system uses time slicing (Windows, for example). Solaris uses a FIFO concurrency model: unless a higher-priority thread is awakened, the current thread runs until it blocks or terminates. That means that other threads with the same priority don't run until the current one gives up the processor.
(149) Assuming you've designed it for multiple CPUs. Otherwise, code that seems to work fine on a time-sliced single-processor system can fail when moved to a multiple-CPU system, since the additional CPUs can reveal problems that a one-CPU system does not.
(150) Much of this chapter began as a translation from the Concurrency chapter in Thinking in Java, 3rd edition, Prentice Hall 2003, although it has changed very significantly in the process.
(151) This can be significant. Usually only a small part of a function needs to be guarded. Putting the guard at the function entry point can often make the critical section longer than it needs to be.
(152) This is an oversimplification. Sometimes even when it seems like an atomic operation should be safe, it may not be, so you must be very careful when deciding that you can get away without synchronization. Removing synchronization is often a sign of premature optimization—things that can cause you a lot of trouble without gaining much. Or anything.
(153) Atomicity isn't the only issue. On multiprocessor systems visibility is much more of an issue than on single processor systems. Changes made by one thread, even if they're atomic in the sense of not being interruptible, might not be visible to other threads (the changes might be temporarily stored in a local processor cache, for example), so different threads will have a different view of the application's state. The synchronization mechanism forces changes by one thread on a multiprocessor system to be visible across the application, whereas without synchronization it's indeterminate when changes become visible.
(154) However, exceptions are never delivered asynchronously in ZThreads. Thus, there is no danger of something aborting in the middle of an instruction or function call. And as long as you use the Guard template to acquire mutexes, the mutexes will be automatically released if an exception is thrown.
(155) Actually, sleep( ) only provides a minimum delay, not a guaranteed delay, so it's possible (although improbable) that the sleep(1100) will wake up before the sleep(1000).
(156) There is nothing in the C++ Standard that says that interruptions can't occur during IO operations. However, most implementations don't support it.
(157) Note that, although it's unlikely, the call to t.interrupt( ) could actually happen before the call to blocked.f( ).
(158) This is in contrast to Java, where you must hold the lock in order to call notify( ) (Java's version of signal( )). Although Posix threads, on which the ZThread library is loosely based, do not require that you hold the lock in order to call signal( ) or broadcast( ), it is often recommended.
(159) On some platforms there's a third way to come out of a wait( ), the so-called spurious wakeup. A spurious wakeup essentially means that a thread may prematurely stop blocking (while waiting on a condition variable or semaphore) without being prompted by a signal( ) or broadcast( ). The thread just wakes up, seemingly by itself. Spurious wakeups exist because implementing POSIX threads, or the equivalent, isn't always as straightforward as it should be on some platforms. By allowing spurious wakeups the job of building a library like pthreads is easier for those platforms. Spurious wakeups do not occur in ZThreads, because the library compensates for and hides these issues from the user.
(160) Note that if the readers stop for some reason, the writers will keep on writing until the system runs out of memory. If this is an issue with your program you can add a maximum allowable element count, and writers should then block if the queue is full.
(161) In particular, refer to the earlier footnote in this chapter on multiprocessors and visibility.
(162) At the time of this writing, Cygwin (www.cygwin.com) was undergoing changes and improvements to its threading support, but we were still unable to observe deadlocking behavior with this program under the available version of Cygwin. The program deadlocked quickly under, for example, Linux.
This document comes from http://www.developpez.com and remains the exclusive property of its author. Copying, modification, and/or distribution by any means whatsoever requires the author's prior authorization.