SE205: Lab on Design Patterns for Concurrent Systems in POSIX

Laurent Pautet (pautet@telecom-paristech.fr)


1 Lab on Design Patterns for Concurrent Systems in POSIX

Objectives

This lab explores the asynchronous execution model offered by Java and implements it in C. The model consists in defining objects of type Runnable or Callable and having them executed by a bounded set of threads managed by an ExecutorService scheduler. This offers different execution semantics and different resource configurations. You can find its specification under ExecutorService.

This lab does not cover all the services provided by ExecutorService; it illustrates how some of them work. Course material on tasks can be found here. You can view the full documentation of POSIX functions related to threads by following this link.

Sources

You will find all the sources in this compressed archive. Several scenario files are provided to verify your solutions. In addition, an implementation in Java using ExecutorService lets you compare the output of your C / POSIX implementation with the expected result.

You must reuse the protected circular buffer implemented in a previous lab. You must have completed at least the first 4 questions of that lab, that is, the implementation with condition variables for the blocking, non-blocking and timed semantics.

To decompress, use GNU tar:

tar zxf src.tar.gz

How to submit your work

To send your work to your teacher, build an archive of a directory named after you (<Firstname.Lastname>) that contains only the .c and .h files.

How To Debug

To find your errors, we strongly recommend that you use gdb. It is critical to debug your C programs with gdb rather than filling them with printf calls. If you have a memory problem (SIGSEGV, ...), do:

gdb ./main_executor
(gdb) run test-20.txt

In case of a problem, the program will stop on the incorrect memory access. To understand the issue, use gdb commands:

MacOS

MacOS users first need to make gdb operational, which MacOS does not make easy. You will find the procedure by following this link. If this link is not sufficient, there are many guides to solve this problem.


1.1 Overview of the system architecture

The main program is located in main_executor.c.

The main code consists, after initialization of internal structures of the program, in reading a scenario file passed on the command line and creating as many jobs (job_t) as indicated in the scenario using the job_table_size variable.

Each job is described by a job_t structure, described in scenario.h, containing an index id on its position in the jobs table and an exec_time execution time (or job computation time).

// scenario.h

typedef struct {
  int    id;
  long   exec_time;
} job_t;

Executing a job consists in running the main procedure main_job, defined at the beginning of main_executor.c. It signals its start (see message "initiate"), simulates a job whose duration is passed as a parameter (the exec_time attribute of the job_t structure), and finally signals its termination (see message "complete").

If you progress through main_executor.c, an executor is created by specifying configuration parameters.

// executor.h

typedef struct _executor_t {
  thread_pool_t      * thread_pool;
  long                 keep_alive_time;
  protected_buffer_t * futures;
} executor_t;

The executor is responsible for processing the work submitted to it. After creating the executor, the program creates as many callables as there are jobs and submits them to the executor using the submit_callable procedure. For each callable submitted, submit_callable returns a future. Subsequently, the program collects the results from the futures by calling get_callable_result, possibly blocking if the results are not yet available.

After creating the executor, the program populates the table of callables and futures using the job table. The callable_t structure, described in executor.h, has a pointer to the code of the main procedure to execute (main attribute) and the job characteristics (params attribute). The other attributes of callable_t will be explained later.

// executor.h

typedef void * (*main_func_t)(void *);

typedef struct {
  void       * params;
  main_func_t  main;
  long         period;
} callable_t;
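As an illustration of how the table of callables can be populated from the job table, here is a minimal sketch. The structures are copied from scenario.h and executor.h; the helper fill_callables and the stand-in procedure echo_job are hypothetical names that do not appear in the lab sources, where main_job plays the role of the procedure to execute.

```c
#include <assert.h>
#include <stddef.h>

// Types copied from scenario.h and executor.h.
typedef struct {
  int    id;
  long   exec_time;
} job_t;

typedef void * (*main_func_t)(void *);

typedef struct {
  void       * params;
  main_func_t  main;
  long         period;
} callable_t;

// Hypothetical helper: bind each job to one callable.
// job_main is the procedure every callable runs (main_job in the lab).
void fill_callables(callable_t *callables, job_t *jobs, int count,
                    main_func_t job_main, long period) {
  assert(callables != NULL && jobs != NULL && count >= 0);
  for (int i = 0; i < count; i++) {
    callables[i].params = &jobs[i];   // job characteristics
    callables[i].main   = job_main;   // code to execute
    callables[i].period = period;     // 0 for a non-periodic callable
  }
}

// Stand-in for main_job, used only to exercise the helper.
void *echo_job(void *params) {
  return params;
}
```

A pool thread then executes a callable with callable->main(callable->params).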

Each callable is associated with a future. Thus, the future_t structure refers to the callable with which it is associated (callable attribute) as well as the result (result attribute) that it produces once it is in the completed state (completed attribute).

// executor.h

typedef struct {
  int             completed;
  callable_t    * callable;
  void          * result;
} future_t;

1.2 Implementation of a basic threads manager

To return to the system overview, note that in the executor.c file the submit_callable function asks the thread manager thread_pool to create a thread by calling pool_thread_create.

We will implement a first version of the thread manager in the files thread_pool.c and thread_pool.h. We briefly recall the principle of the thread manager. A thread manager (thread pool) maintains several threads waiting for execution requests. As shown in the structure below, we do not maintain the list of allocated threads, but only their number and the main parameters core_pool_size and max_pool_size.

// thread_pool.h

typedef struct {
  int             core_pool_size;
  int             max_pool_size;
  int             size;
  int             shutdown;
} thread_pool_t;

Since the thread_pool structure is concurrently accessible, it must be protected against concurrent access by adding a synchronization object as an attribute. This synchronization object must also be initialized in thread_pool_init.

For now, we are only interested in the case where the number of threads created is less than core_pool_size. You will need to complete the code of pool_thread_create so that it creates a thread using the parameters passed to it, then updates the attributes of the thread_pool_t structure.
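The two steps above (protecting the structure, then creating core threads) could look like the following sketch. It is not the lab's reference solution: the mutex field, the signature of thread_pool_init, the name create_core_thread and the choice to detach the thread are all assumptions made for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef void * (*main_func_t)(void *);

// thread_pool_t from thread_pool.h plus one assumed field: mutex.
typedef struct {
  int             core_pool_size;
  int             max_pool_size;
  int             size;
  int             shutdown;
  pthread_mutex_t mutex;          // assumption: protects size and shutdown
} thread_pool_t;

// Hypothetical signature; the lab's thread_pool_init may take other arguments.
void thread_pool_init(thread_pool_t *pool, int core_pool_size, int max_pool_size) {
  assert(pool != NULL);
  pool->core_pool_size = core_pool_size;
  pool->max_pool_size  = max_pool_size;
  pool->size           = 0;
  pool->shutdown       = 0;
  pthread_mutex_init(&pool->mutex, NULL);
}

// Core-only variant: returns 1 if a thread was created, 0 otherwise.
int create_core_thread(thread_pool_t *pool, main_func_t thread_main, void *arg) {
  pthread_t thread;
  int created = 0;

  pthread_mutex_lock(&pool->mutex);
  if (pool->size < pool->core_pool_size &&
      pthread_create(&thread, NULL, thread_main, arg) == 0) {
    pthread_detach(thread);   // assumption: the pool stores a count, not thread ids
    pool->size++;
    created = 1;
  }
  pthread_mutex_unlock(&pool->mutex);
  return created;
}

// Trivial thread body used only to exercise the sketch.
void *idle_main(void *arg) {
  return arg;
}
```

The size test and the increment happen under the same lock, so two concurrent callers cannot both take the last core slot.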

Having partially completed the code of pool_thread_create, we can use the test-20.txt scenario to check the implementation. An example of the expected output is given below. Messages about callables are not important, since at the moment their results are not correctly processed. On the other hand, the jobs must complete in ascending order of execution time (1000, 3000, 4000, 7000) even though they are started in a different order (1000, 7000, 3000, 4000). Indeed, as core_pool_size is equal to 4, 4 threads must be created and each of them runs a job in parallel with the others. Thus, all jobs start at time 0 and end at their scheduled response time. Note that 4 core threads are created and that these 4 threads terminate.

core_pool_size = 4
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = -1
000 [main 00] thread created
000 [main 00] submit callable of id 0
000 [core 01] initiate execution=1000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 1
000 [core 02] initiate execution=7000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 2
000 [core 03] initiate execution=3000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 3
000 [main 00] get callable result of id 0
000 [main 00] get callable result of id 1
000 [main 00] get callable result of id 2
000 [main 00] get callable result of id 3
000 [core 04] initiate execution=4000 period=0
001 [core 01] complete execution=1000 period=0
001 [core 01] terminated
003 [core 03] complete execution=3000 period=0
003 [core 03] terminated
004 [core 04] complete execution=4000 period=0
004 [core 04] terminated
007 [core 02] complete execution=7000 period=0
007 [core 02] terminated
010 [main 00] executor shutdown activated
010 [main 00] pool not empty, exit process
010 [main 00] executor shutdown terminated

1.3 Storing execution results

We will now make sure to block until the result of the execution of a callable is available. To do this, we must modify the executor.c file. The result of the execution of a callable is stored in the future_t structure returned by submit_callable. The get_callable_result function must block until the callable terminates, i.e. until the completed attribute of the future_t structure is true.

To do this, we use a mutex and a condition variable: the first protects the structure against concurrent accesses, the second is used to wait until the result of the computation associated with the callable is available. It is therefore necessary to complete get_callable_result so that it blocks until the completed attribute is true.

Symmetrically, each callable is executed by the main procedure main_pool_thread of a pool thread. You must complete the code of main_pool_thread so that, once the callable has been executed, the pool thread signals that the result is available by updating the completed attribute of the future and notifying through the synchronization objects.
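The wait and signal sides described above follow the usual mutex plus condition variable pattern. The sketch below uses a simplified, hypothetical sync_future_t (the callable pointer is omitted, and the mutex and cond fields are assumptions about what has to be added to future_t); the function names are illustrative, not those of executor.c.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

// Simplified future: the mutex and condition variable are assumed additions.
typedef struct {
  int             completed;
  void          * result;
  pthread_mutex_t mutex;
  pthread_cond_t  cond;
} sync_future_t;

void sync_future_init(sync_future_t *future) {
  assert(future != NULL);
  future->completed = 0;
  future->result    = NULL;
  pthread_mutex_init(&future->mutex, NULL);
  pthread_cond_init(&future->cond, NULL);
}

// Pool thread side: publish the result and wake up any waiter.
void sync_future_complete(sync_future_t *future, void *result) {
  pthread_mutex_lock(&future->mutex);
  future->result    = result;
  future->completed = 1;
  pthread_cond_broadcast(&future->cond);
  pthread_mutex_unlock(&future->mutex);
}

// Client side: block until the result is available.
void *sync_future_get(sync_future_t *future) {
  void *result;
  pthread_mutex_lock(&future->mutex);
  while (!future->completed)   // the loop guards against spurious wakeups
    pthread_cond_wait(&future->cond, &future->mutex);
  result = future->result;
  pthread_mutex_unlock(&future->mutex);
  return result;
}

// Stand-in for a pool thread: completes the future with an arbitrary result.
void *demo_pool_thread(void *arg) {
  sync_future_t *future = arg;
  sync_future_complete(future, future);
  return NULL;
}
```

Because completed is tested under the mutex, the getter returns at once when the pool thread has already finished, and it cannot miss the notification otherwise.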

You can use the test-20.txt scenario to verify your implementation. In this scenario, the results should appear in the correct order and at the expected time. Note that callable results are retrieved in the same order in which they were submitted to the executor. Thus, in the output shown below, the result of callable 0, submitted first, is retrieved at time 1000 since its execution time is 1000 milliseconds. Then, the result of callable 1, submitted second, is retrieved at time 7000. Consequently, it is only at time 7000 that the result of callable 2 can be retrieved. This callable terminated at time 3000, but the main program retrieves the results in submission order, so it has to wait for the result of callable 1 at time 7000 before retrieving that of callable 2. The same situation occurs for callable 3.

core_pool_size = 4
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = -1
000 [main 00] thread created
000 [main 00] submit callable of id 0
000 [core 01] initiate execution=1000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 1
000 [core 02] initiate execution=7000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 2
000 [core 03] initiate execution=3000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 3
000 [core 04] initiate execution=4000 period=0
001 [core 01] complete execution=1000 period=0
001 [core 01] terminated
001 [main 00] get callable result of id 0
003 [core 03] complete execution=3000 period=0
003 [core 03] terminated
004 [core 04] complete execution=4000 period=0
004 [core 04] terminated
007 [core 02] complete execution=7000 period=0
007 [core 02] terminated
007 [main 00] get callable result of id 1
007 [main 00] get callable result of id 2
007 [main 00] get callable result of id 3
017 [main 00] executor shutdown activated
017 [main 00] pool not empty, exit process
017 [main 00] executor shutdown terminated

1.4 Storing callables in a queue

For now, once a number of threads equal to core_pool_size has been created, the executor stops creating threads. While the already created threads are busy, new callables are stored in a queue. In submit_callable, pool_thread_create is called with its last parameter force set to 0 (false), which indicates that we do not want to exceed the limit of core_pool_size. In that case, pool_thread_create returns 0 (false) to indicate that the limit has been reached.

// If the current thread pool size is less than core_pool_size,
// create a new thread. If it is not and force is true, create a new
// thread as well. If a thread is created, increment the current
// thread pool size. Use main as a thread main procedure.

int pool_thread_create(thread_pool_t * thread_pool,
                       main_func_t     main,
                       void          * executor,
                       int             force);

The code of submit_callable must be completed so that new callables are stored in the executor’s future queue. No other thread is created until the queue is full.

In addition, the already created threads, after having executed their current work, must check the queue to extract a future object from it and execute the callable object it contains. Therefore, we must complete the code of main_pool_thread so that, once the callable has been executed, the pool thread extracts another future from the executor’s queue. We first consider the case without keep_alive_time (FOREVER): the pool thread blocks as long as no callable is present in the queue.
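The lab reuses the protected circular buffer of the previous lab, whose interface is not reproduced here. As a stand-in, the sketch below shows the two operations this section relies on: a non-blocking insertion for submit_callable and a blocking extraction for a pool thread. The type future_queue_t, the fixed QUEUE_CAPACITY and the function names are assumptions for illustration only.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define QUEUE_CAPACITY 4

// Stand-in for the protected buffer of the previous lab.
typedef struct {
  void           *slots[QUEUE_CAPACITY];
  int             first;       // index of the oldest element
  int             count;       // number of stored elements
  pthread_mutex_t mutex;
  pthread_cond_t  not_empty;
} future_queue_t;

void future_queue_init(future_queue_t *queue) {
  assert(queue != NULL);
  queue->first = 0;
  queue->count = 0;
  pthread_mutex_init(&queue->mutex, NULL);
  pthread_cond_init(&queue->not_empty, NULL);
}

// Non-blocking insertion, as submit_callable needs: returns 0 when full.
int future_queue_offer(future_queue_t *queue, void *future) {
  int stored = 0;
  pthread_mutex_lock(&queue->mutex);
  if (queue->count < QUEUE_CAPACITY) {
    queue->slots[(queue->first + queue->count) % QUEUE_CAPACITY] = future;
    queue->count++;
    stored = 1;
    pthread_cond_signal(&queue->not_empty);
  }
  pthread_mutex_unlock(&queue->mutex);
  return stored;
}

// Blocking extraction, as a pool thread needs when keep_alive_time is FOREVER.
void *future_queue_take(future_queue_t *queue) {
  void *future;
  pthread_mutex_lock(&queue->mutex);
  while (queue->count == 0)
    pthread_cond_wait(&queue->not_empty, &queue->mutex);
  future = queue->slots[queue->first];
  queue->first = (queue->first + 1) % QUEUE_CAPACITY;
  queue->count--;
  pthread_mutex_unlock(&queue->mutex);
  return future;
}
```

A pool thread would then loop on future_queue_take, run the callable of the returned future, complete the future, and go back to waiting.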

You can use the test-21.txt scenario to verify your implementation. In this example, there are only 2 pool threads and a queue of size 4 for 4 jobs to perform with computation time 1000ms, 7000ms, 3000ms, and 4000ms. The first thread performs the first 1000 ms job and the second the 7000 ms job. Unlike the previous cases, at time 1000 ms the first thread extracts from the queue the third job, which lasts 3000 ms and therefore ends at time 4000 ms. Since the 7000 ms job running on the second thread is still not completed, the first thread then pulls the last 4000 ms job from the queue and completes it at time 8000 ms.

core_pool_size = 2
max_pool_size = 4
blocking_queue_size = 4
keep_alive_time = -1
000 [main 00] thread created
000 [main 00] submit callable of id 0
000 [main 00] thread created
000 [main 00] submit callable of id 1
000 [core 02] initiate execution=7000 period=0
000 [main 00] submit callable of id 2
000 [main 00] submit callable of id 3
000 [core 01] initiate execution=1000 period=0
001 [core 01] complete execution=1000 period=0
001 [core 01] initiate execution=3000 period=0
001 [main 00] get callable result of id 0
004 [core 01] complete execution=3000 period=0
004 [core 01] initiate execution=4000 period=0
007 [core 02] complete execution=7000 period=0
007 [main 00] get callable result of id 1
007 [main 00] get callable result of id 2
008 [core 01] complete execution=4000 period=0
008 [main 00] get callable result of id 3
018 [main 00] executor shutdown activated
018 [main 00] pool not empty, exit process
018 [main 00] executor shutdown terminated
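The completion times in the test-21 trace above can be cross-checked with a small calculation. The sketch below assumes that all jobs are submitted at time 0, that the queue is large enough to hold every pending job, and that no temporary thread is created; completion_times and MAX_WORKERS are hypothetical names.

```c
#include <assert.h>

#define MAX_WORKERS 16

// Jobs are served in submission order; each job goes to the worker that
// becomes free first. done[i] receives the completion time of job i, in the
// same unit as exec_time.
void completion_times(const long *exec_time, int jobs, int workers, long *done) {
  long free_at[MAX_WORKERS] = {0};
  assert(workers > 0 && workers <= MAX_WORKERS);
  for (int i = 0; i < jobs; i++) {
    int earliest = 0;
    for (int w = 1; w < workers; w++)
      if (free_at[w] < free_at[earliest])
        earliest = w;
    free_at[earliest] += exec_time[i];
    done[i] = free_at[earliest];
  }
}
```

With execution times 1000, 7000, 3000, 4000 and 2 workers, the function yields 1000, 7000, 4000 and 8000, which matches the "complete" lines of the trace.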

1.5 Implementation of an advanced threads manager

We now complete the thread creation code of the thread manager. The goal is to create new threads when the callable queue is full, without exceeding a maximum of max_pool_size threads.

As indicated previously, the pool_thread_create function has a force parameter that allows you to force the creation of a thread if the number of threads created is greater than or equal to core_pool_size.

You must first complete the code of pool_thread_create so that a thread is created when the number of threads created is greater than or equal to core_pool_size, this number is lower than max_pool_size, and the force parameter is true.
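Taken together, the rules of sections 1.2, 1.4 and 1.5 amount to a three-step decision for each submitted callable. The sketch below expresses that decision as a pure function; the enumeration, the function name and the NO_CAPACITY outcome (a case the lab text does not describe) are assumptions.

```c
#include <assert.h>

typedef enum {
  RUN_ON_CORE_THREAD,   // create a core thread (force = 0)
  STORE_IN_QUEUE,       // keep the callable in the future queue
  RUN_ON_TEMP_THREAD,   // create a temporary thread (force = 1)
  NO_CAPACITY           // assumption: neither a queue slot nor a thread is available
} submit_action_t;

// size: threads currently in the pool; queued: futures currently in the queue.
submit_action_t choose_submit_action(int size, int core_pool_size, int max_pool_size,
                                     int queued, int queue_capacity) {
  assert(size >= 0 && queued >= 0 && queued <= queue_capacity);
  if (size < core_pool_size)
    return RUN_ON_CORE_THREAD;
  if (queued < queue_capacity)
    return STORE_IN_QUEUE;
  if (size < max_pool_size)
    return RUN_ON_TEMP_THREAD;
  return NO_CAPACITY;
}
```

For instance, with core_pool_size = 2, max_pool_size = 4 and a queue of size 1, the first two callables get core threads, the third is queued, and the fourth triggers a temporary thread.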

To implement this functionality, you must also complete the code of submit_callable so that once the queue is full, a pool thread is created by a call to pool_thread_create under the circumstances described above. However, care must be taken to preserve the order of the callables. If the current callable cannot be inserted because the queue is full, extract the first callable from the queue, insert the current callable and assign the first callable to the newly created thread.
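The order-preserving exchange can be sketched on a plain FIFO. The fifo_t type below is an unsynchronized stand-in used only to show the ordering (the lab uses the protected buffer instead), and swap_with_oldest is a hypothetical helper name. The empty-queue branch covers a queue of capacity 0, an assumption about how that configuration is handled.

```c
#include <assert.h>
#include <stddef.h>

#define FIFO_CAPACITY 8

// Unsynchronized FIFO, for illustration only.
typedef struct {
  void *slots[FIFO_CAPACITY];
  int   first;      // index of the oldest element
  int   count;      // number of stored elements
  int   capacity;   // configured size, at most FIFO_CAPACITY
} fifo_t;

// Returns 1 if item was stored, 0 if the FIFO is full.
int fifo_put(fifo_t *fifo, void *item) {
  assert(fifo->capacity <= FIFO_CAPACITY);
  if (fifo->count >= fifo->capacity)
    return 0;
  fifo->slots[(fifo->first + fifo->count) % FIFO_CAPACITY] = item;
  fifo->count++;
  return 1;
}

// Returns the oldest element, or NULL if the FIFO is empty.
void *fifo_get(fifo_t *fifo) {
  void *item;
  if (fifo->count == 0)
    return NULL;
  item = fifo->slots[fifo->first];
  fifo->first = (fifo->first + 1) % FIFO_CAPACITY;
  fifo->count--;
  return item;
}

// Called when the queue is full and a temporary thread has been created:
// returns the callable that the new thread must run.
void *swap_with_oldest(fifo_t *fifo, void *current) {
  void *oldest = fifo_get(fifo);
  if (oldest == NULL)        // queue of capacity 0: run current directly
    return current;
  fifo_put(fifo, current);   // current takes the freed slot, at the tail
  return oldest;
}
```

In real code the extraction, the insertion and the thread creation must be coordinated so that no other thread empties or refills the queue in between.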

You can use the test-22.txt scenario to verify your implementation. In this example, 4 jobs of computation time 1000ms, 7000ms, 3000ms, and 4000ms are submitted to the executor. The manager allows the immediate creation of 2 (core) pool threads since core_pool_size is equal to 2. So 2 callables (1000 ms and 7000 ms) will be processed at time 0. The third callable will be stored in the queue. However, since the queue has size 1 and is already full with the 3000 ms callable, the fourth callable of 4000 ms causes the creation of a third thread, which is allowed because max_pool_size is equal to 4. To preserve the order, this thread runs the 3000 ms callable taken from the queue, and the 4000 ms callable takes its place in the queue. This thread is a temporary pool thread (temp) and not a core pool thread. Check in the output that the 4000 ms job is started last and that only 3 threads are created (2 core threads and one temp thread).

core_pool_size = 2
max_pool_size = 4
blocking_queue_size = 1
keep_alive_time = -1
000 [main 00] thread created
000 [main 00] submit callable of id 0
000 [core 01] initiate execution=1000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 1
000 [main 00] submit callable of id 2
000 [core 02] initiate execution=7000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 3
000 [temp 03] initiate execution=3000 period=0
001 [core 01] complete execution=1000 period=0
001 [core 01] initiate execution=4000 period=0
001 [main 00] get callable result of id 0
003 [temp 03] complete execution=3000 period=0
005 [core 01] complete execution=4000 period=0
007 [core 02] complete execution=7000 period=0
007 [main 00] get callable result of id 1
007 [main 00] get callable result of id 2
007 [main 00] get callable result of id 3
017 [main 00] executor shutdown activated
017 [main 00] pool not empty, exit process
017 [main 00] executor shutdown terminated

1.6 Expiration of a thread after a period of inactivity

We want a thread to expire when it remains inactive for a time specified by keep_alive_time, when the number of created threads is greater than core_pool_size and when the callable queue is empty.

You must complete the code of main_pool_thread so that, once the thread has finished its current work, it waits for a relative delay of keep_alive_time for a callable to extract from the queue. If no future (hence no callable) is returned, the thread should call pool_thread_terminate to check whether it should stop because the number of threads created is greater than core_pool_size. Remember that POSIX timed-wait functions such as pthread_cond_timedwait take absolute times (dates) as time parameters, and not durations (relative delays).
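Converting the relative keep_alive_time into the absolute deadline expected by pthread_cond_timedwait can be done as in the sketch below. The helper names add_millis and cond_wait_for_millis are hypothetical, and the code assumes a condition variable created with default attributes, which measures time against CLOCK_REALTIME.

```c
#define _POSIX_C_SOURCE 200809L   // for clock_gettime under strict ISO C modes
#include <assert.h>
#include <pthread.h>
#include <time.h>

// Adds a relative delay in milliseconds to an absolute time.
struct timespec add_millis(struct timespec start, long delay_ms) {
  struct timespec deadline = start;
  assert(delay_ms >= 0);
  deadline.tv_sec  += delay_ms / 1000;
  deadline.tv_nsec += (delay_ms % 1000) * 1000000L;
  if (deadline.tv_nsec >= 1000000000L) {   // carry nanoseconds into seconds
    deadline.tv_sec  += 1;
    deadline.tv_nsec -= 1000000000L;
  }
  return deadline;
}

// Waits on cond for at most delay_ms milliseconds; the caller holds mutex.
// Returns the value of pthread_cond_timedwait (ETIMEDOUT once the delay expired).
int cond_wait_for_millis(pthread_cond_t *cond, pthread_mutex_t *mutex, long delay_ms) {
  struct timespec now;
  clock_gettime(CLOCK_REALTIME, &now);
  struct timespec deadline = add_millis(now, delay_ms);
  return pthread_cond_timedwait(cond, mutex, &deadline);
}
```

When the wait sits inside a predicate loop, compute the deadline once before the loop so that a spurious wakeup does not extend the total waiting time.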

It is therefore necessary to complete the code of pool_thread_terminate so that the function reports whether the thread can be terminated, i.e. whether the number of threads created is greater than core_pool_size. Since the thread_pool structure, and in particular the number of threads created, is accessed concurrently, care must be taken to protect thread_pool against concurrent access.

You can use the test-23.txt scenario to verify your implementation. In this example, 4 jobs of computation time 1000ms, 7000ms, 3000ms, and 4000ms are submitted to the executor. The manager does not allow the creation of any core pool thread since core_pool_size is 0. Since the queue is of size 1, the first callable is stored in this queue. Then 3 pool threads are created to execute the callables of 1000 ms, 7000 ms and 3000 ms. At time 1000, the thread in charge of the 1000 ms callable takes over the 4000 ms callable and completes it at time 5000. As keep_alive_time is 1000ms and core_pool_size is 0, the 3 pool threads should terminate at 4000ms (3000ms + 1000ms), 6000ms (1000ms + 4000ms + 1000ms) and 8000ms (7000ms + 1000ms).

core_pool_size = 0
max_pool_size = 4
blocking_queue_size = 1
keep_alive_time = 1000
000 [main 00] submit callable of id 0
000 [main 00] thread created
000 [main 00] submit callable of id 1
000 [temp 01] initiate execution=1000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 2
000 [main 00] thread created
000 [main 00] submit callable of id 3
000 [temp 02] initiate execution=7000 period=0
000 [temp 03] initiate execution=3000 period=0
001 [temp 01] complete execution=1000 period=0
001 [temp 01] initiate execution=4000 period=0
001 [main 00] get callable result of id 0
001 [main 00] get callable result of id 1
003 [temp 03] complete execution=3000 period=0
004 [temp 03] terminated
005 [temp 01] complete execution=4000 period=0
006 [temp 01] terminated
007 [temp 02] complete execution=7000 period=0
007 [main 00] get callable result of id 2
007 [main 00] get callable result of id 3
008 [temp 02] terminated
017 [main 00] executor shutdown activated
017 [main 00] executor shutdown terminated

1.7 Consistent termination of the threads manager

The executor_shutdown function should initiate an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted. Invoking it again has no additional effect if the executor is already shut down. It should unblock the threads that are waiting for new callables to be submitted so that they stop. It should also wait for previously submitted tasks to terminate (some of these features are specific to our lab).

Up to now, executor_shutdown in the executor.c file actually does nothing. But when the main thread terminates, the implicit call to exit terminates the process and all its threads. This gives the impression that the function is doing its job, but the program ends without shutting down active threads (core threads remain blocked on the future queue, for instance) and without waiting for all the tasks to terminate (the shutdown output gives "pool not empty, exit process"). We will now deal with this problem.

In executor.c, executor_shutdown calls thread_pool_shutdown to signal the thread pool to terminate as soon as possible and to stop creating threads.

This is not sufficient because some threads can be blocked indefinitely waiting for callables or futures (core threads or temp threads using an infinite keep_alive_time). To unblock them, we must complete the code of executor_shutdown and add a shutdown future in the future queue.

In the code of main_pool_thread, when such a future is dequeued, the thread will be aware of the shutdown and will propagate the shutdown process by also adding a shutdown future to unblock another thread.
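The propagation of the shutdown future resembles the classic "poison pill" technique. The sketch below shows it with a one-slot mailbox standing in for the future queue; mailbox_t, SHUTDOWN_FUTURE, worker_main and shutdown_demo are hypothetical names, and how the lab actually marks a future as a shutdown future is not specified here.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_WORKERS 8

// One-slot blocking mailbox standing in for the future queue.
typedef struct {
  void           *slot;        // NULL when empty
  pthread_mutex_t mutex;
  pthread_cond_t  changed;
} mailbox_t;

static int shutdown_marker;    // its address identifies the shutdown future
#define SHUTDOWN_FUTURE ((void *) &shutdown_marker)

void mailbox_put(mailbox_t *box, void *future) {
  pthread_mutex_lock(&box->mutex);
  while (box->slot != NULL)
    pthread_cond_wait(&box->changed, &box->mutex);
  box->slot = future;
  pthread_cond_broadcast(&box->changed);
  pthread_mutex_unlock(&box->mutex);
}

void *mailbox_take(mailbox_t *box) {
  void *future;
  pthread_mutex_lock(&box->mutex);
  while (box->slot == NULL)
    pthread_cond_wait(&box->changed, &box->mutex);
  future = box->slot;
  box->slot = NULL;
  pthread_cond_broadcast(&box->changed);
  pthread_mutex_unlock(&box->mutex);
  return future;
}

void *worker_main(void *arg) {
  mailbox_t *box = arg;
  for (;;) {
    void *future = mailbox_take(box);
    if (future == SHUTDOWN_FUTURE) {
      mailbox_put(box, SHUTDOWN_FUTURE);   // pass it on to unblock another worker
      return NULL;
    }
    // A real pool thread would execute the callable of the future here.
  }
}

// Starts the workers, sends one shutdown future, returns how many workers ended.
int shutdown_demo(int workers) {
  mailbox_t box;
  pthread_t threads[MAX_WORKERS];
  int joined = 0;
  assert(workers > 0 && workers <= MAX_WORKERS);
  box.slot = NULL;
  pthread_mutex_init(&box.mutex, NULL);
  pthread_cond_init(&box.changed, NULL);
  for (int i = 0; i < workers; i++)
    pthread_create(&threads[i], NULL, worker_main, &box);
  mailbox_put(&box, SHUTDOWN_FUTURE);
  for (int i = 0; i < workers; i++)
    if (pthread_join(threads[i], NULL) == 0)
      joined++;
  return joined;
}
```

A single shutdown future is enough to stop every blocked worker, because each worker re-inserts it before leaving.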

executor_shutdown should also terminate when the thread pool is empty (see wait_thread_pool_empty). In thread_pool.c, functions pool_thread_terminate and wait_thread_pool_empty should be completed. We must add to the thread_pool_t structure of thread_pool.h a synchronization object that will block any thread waiting for the termination of all active pool threads. Then we shall have to use this object in wait_thread_pool_empty and pool_thread_terminate.
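One possible shape for these two functions is sketched below, assuming a mutex and a condition variable named empty have been added to thread_pool_t (both field names are assumptions, as is the exact termination test). The thread body leaving_thread exists only to exercise the sketch.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

// thread_pool_t from thread_pool.h plus two assumed fields: mutex and empty.
typedef struct {
  int             core_pool_size;
  int             max_pool_size;
  int             size;
  int             shutdown;
  pthread_mutex_t mutex;   // assumption: protects size and shutdown
  pthread_cond_t  empty;   // assumption: notified when size drops to 0
} thread_pool_t;

// Returns 1 if the calling pool thread may terminate, 0 if it must stay.
int pool_thread_terminate(thread_pool_t *pool) {
  int done = 0;
  assert(pool != NULL);
  pthread_mutex_lock(&pool->mutex);
  if (pool->shutdown || pool->size > pool->core_pool_size) {
    pool->size--;
    done = 1;
    if (pool->size == 0)
      pthread_cond_broadcast(&pool->empty);   // wake up wait_thread_pool_empty
  }
  pthread_mutex_unlock(&pool->mutex);
  return done;
}

// Blocks the caller until every pool thread has terminated.
void wait_thread_pool_empty(thread_pool_t *pool) {
  pthread_mutex_lock(&pool->mutex);
  while (pool->size > 0)
    pthread_cond_wait(&pool->empty, &pool->mutex);
  pthread_mutex_unlock(&pool->mutex);
}

// Stand-in for a pool thread that has noticed the shutdown.
void *leaving_thread(void *arg) {
  pool_thread_terminate(arg);
  return NULL;
}
```

The decrement of size and the notification happen under the same mutex as the test in wait_thread_pool_empty, so the waiter cannot miss the last thread leaving.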

You can use the test-24.txt scenario to verify your implementation. In this example, 4 jobs of computation time 1000ms, 7000ms, 3000ms, and 4000ms are submitted to the executor. In the output shown below, one core thread and two temp threads process these jobs and complete them by time 7000, and executor_shutdown starts at time 17000. Each thread must therefore output a message like [core 01] terminated and the message pool not empty, exit process should no longer appear.


core_pool_size = 1
max_pool_size = 4
blocking_queue_size = 1
keep_alive_time = 20000
000 [main 00] thread created
000 [main 00] submit callable of id 0
000 [main 00] submit callable of id 1
000 [core 01] initiate execution=1000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 2
000 [temp 02] initiate execution=7000 period=0
000 [main 00] thread created
000 [main 00] submit callable of id 3
000 [temp 03] initiate execution=3000 period=0
001 [core 01] complete execution=1000 period=0
001 [core 01] initiate execution=4000 period=0
001 [main 00] get callable result of id 0
003 [temp 03] complete execution=3000 period=0
005 [core 01] complete execution=4000 period=0
007 [temp 02] complete execution=7000 period=0
007 [main 00] get callable result of id 1
007 [main 00] get callable result of id 2
007 [main 00] get callable result of id 3
017 [main 00] executor shutdown activated
017 [temp 03] terminated
017 [core 01] terminated
017 [temp 02] terminated
017 [main 00] executor shutdown terminated

1.8 Implementation of periodic threads

We want to implement periodic threads according to the so-called fixed-rate policy (withFixedRate). When the period configuration variable is set to a non-zero value in milliseconds, the callable created is considered periodic with period period. A periodic callable no longer provides a result (since its execution is now infinite); the thread keeps executing the same callable.

We have to complete the code of main_pool_thread so that the thread has periodic behavior if the period of the callable is not zero.
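With a fixed-rate policy, the k-th activation is released at first release + k × period, whatever the execution time of the previous activations. The sketch below illustrates this with hypothetical helpers (release_time, sleep_until, run_periodic) and a bounded number of activations; it uses CLOCK_MONOTONIC and nanosleep, and leaves out the shutdown check that a real pool thread needs.

```c
#define _POSIX_C_SOURCE 200809L   // for clock_gettime and nanosleep
#include <assert.h>
#include <errno.h>
#include <time.h>

// Absolute release time of activation k (k = 0 is the first one).
struct timespec release_time(struct timespec first, long period_ms, long k) {
  long long total_ns = (long long) first.tv_nsec
                     + (long long) k * period_ms * 1000000LL;
  struct timespec release;
  assert(period_ms > 0 && k >= 0);
  release.tv_sec  = first.tv_sec + (time_t) (total_ns / 1000000000LL);
  release.tv_nsec = (long) (total_ns % 1000000000LL);
  return release;
}

// Sleeps until the given CLOCK_MONOTONIC time; returns at once if it is past.
void sleep_until(struct timespec until) {
  struct timespec now, remaining;
  clock_gettime(CLOCK_MONOTONIC, &now);
  remaining.tv_sec  = until.tv_sec - now.tv_sec;
  remaining.tv_nsec = until.tv_nsec - now.tv_nsec;
  if (remaining.tv_nsec < 0) {
    remaining.tv_sec  -= 1;
    remaining.tv_nsec += 1000000000L;
  }
  if (remaining.tv_sec < 0)
    return;
  while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR)
    ;   // resume the sleep after a signal interruption
}

// Runs job count times, one activation per period, starting now.
void run_periodic(void (*job)(void *), void *arg, long period_ms, int count) {
  struct timespec first;
  clock_gettime(CLOCK_MONOTONIC, &first);
  for (int k = 0; k < count; k++) {
    sleep_until(release_time(first, period_ms, k));
    job(arg);
  }
}

// Example job: counts its activations.
void count_activation(void *arg) {
  (*(int *) arg)++;
}
```

Computing each release from the first one, rather than sleeping for period after each execution, avoids accumulating drift: sleeping after each execution would correspond to a fixed-delay policy instead.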

You can use the test-25.txt scenario to verify your implementation. We create 4 callables whose execution is repeated every 8000 ms. As shutdown occurs after 10,000 ms, callables will only end after their second execution, at 16,000 ms.

core_pool_size = 2
max_pool_size = 4
blocking_queue_size = 0
keep_alive_time = 5000
000 [main 00] thread created
000 [main 00] submit callable of id 0
000 [core 01] initiate execution=1000 period=8000
000 [main 00] thread created
000 [main 00] submit callable of id 1
000 [core 02] initiate execution=7000 period=8000
000 [main 00] thread created
000 [main 00] submit callable of id 2
000 [temp 03] initiate execution=3000 period=8000
000 [main 00] thread created
000 [main 00] submit callable of id 3
000 [temp 04] initiate execution=4000 period=8000
001 [core 01] complete execution=1000 period=8000
003 [temp 03] complete execution=3000 period=8000
004 [temp 04] complete execution=4000 period=8000
007 [core 02] complete execution=7000 period=8000
008 [temp 03] initiate execution=3000 period=8000
008 [temp 04] initiate execution=4000 period=8000
008 [core 01] initiate execution=1000 period=8000
008 [core 02] initiate execution=7000 period=8000
009 [core 01] complete execution=1000 period=8000
010 [main 00] executor shutdown activated
011 [temp 03] complete execution=3000 period=8000
012 [temp 04] complete execution=4000 period=8000
015 [core 02] complete execution=7000 period=8000
016 [core 02] terminated
016 [core 01] terminated
016 [temp 03] terminated
016 [temp 04] terminated
016 [main 00] executor shutdown terminated