互斥体
概览
- 互斥体是"相互排斥“("mutual exclusion“)的缩写。互斥变量是多个写操作发生时执行线程同步和保护共享数据的主要手段之一.
- 一个互斥变量就像保护对共享的数据资源访问的“锁”。作为 Pthreads 互斥锁的基本概念是,在任何给定的时间只有一个线程可以锁定 (或自己的) 互斥变量。因此,即使多个线程试图锁定互斥对象也只有一个线程会成功。直到拥有线程解锁该互斥体前,没有其他线程可以占有该互斥体。线程必须"轮流"访问受保护的数据.
- 互斥体用来防止条件“竞争”。Mutexes can be used to prevent “race” conditions. 涉及银行交易记录的条件竞争示例如下:
Thread 1 Thread 2 Balance Read balance: $1000 $1000 Read balance: $1000 $1000 Deposit $200 $1000 Deposit $200 $1000 Update balance $1000+$200 $1200 Update balance $1000+$200 $1200 - 在上述示例中,当一个线程使用"Balance" 这个共享数据资源时应该使用互斥体锁住它.
- 很多时候由拥有互斥体的线程执行更新全局变量的操作。这是确保多个线程更新相同的变量的安全方式,最终的值就跟只有一个线程执行更新的结果一只。要更新的变量属于某个“临界区”( “critical section” ).
- 使用互斥体的典型流程如下:
- 创建和初始化一个互斥变量
- 多个线程试图锁定这个互斥体
- 只有一个线程能成功锁定并拥有此互斥体
- 所有者线程执行一组操作
- 所有者线程解锁互斥体
- 另一个线程获取互斥体,重复上述过程
- 最终,销毁互斥体
- 当多个线程竞争一个互斥体时,失败的线程将会在调用中阻塞(可以使用“trylock”来避免阻塞).
- 当保护共享的数据时,程序员责任是确保每个需要使用互斥体的线程都这样做。例如,如果 4 个线程正在更新相同的数据,但只有一个使用互斥锁,数据仍然可能会损坏.
创建和销毁互斥体
例程:-
pthread_mutex_init (mutex,attr) pthread_mutex_destroy (mutex)
pthread_mutexattr_init (attr)
pthread_mutexattr_destroy (attr)
使用:
- 使用前,必须以pthread_mutex_t类型声明并初始化mutex变量. 有两种方式来初始化mutex变量:
- 静态的, 在其声明时完成. 例如:
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER; - 动态地,使用 pthread_mutex_init() 完成. 这种方法允许设置mutex对象属性.
mutex初始化时为未加锁状态
- attr 对象用于建立一个互斥对象的属性,如果使用,类型必须为 pthread_mutexattr_t(可以指定为 NULL,以接受默认值)。Pthreads 标准定义了三个可选的互斥锁属性:
- Protocol: 指定用来防止一个互斥锁的优先级反转的协议.
- Prioceiling: 指定互斥锁的优先级上限.
- Process-shared: 互斥体可以进程间共享.
注意:并非所有实现都提供了上述三种可选属性.
- pthread_mutexattr_init() 和 pthread_mutexattr_destroy() 接口分别用于互斥体属性的初始化和销毁.
- 当mutex对象不再使用时,必须使用pthread_mutex_destroy() 来销毁.
锁定和解锁互斥体
例程:-
pthread_mutex_lock (mutex) pthread_mutex_trylock (mutex)
pthread_mutex_unlock (mutex)
使用:
- 线程使用pthread_mutex_lock()在指定mutex变量上获得锁。如果mutex已经被其他线程锁定,调用会阻塞等待该互斥体解锁.
- pthread_mutex_trylock() 会试图锁定互斥体。但是,如果互斥体已经被锁定,调用会以“busy”错误码马上返回. 该方法有助于防止死锁,比如优先级反转情况
- pthread_mutex_unlock() 被所有者线程调用用于解锁mutex. 当某个线程已经用mutex完成了对保护数据的访问,而其他线程也需要对保护数据访问时,要调用此接口来释放mutex使用权. 下述情况会返回错误:
- 如果mutex已经被解锁
- 如果mutex被其他线程占有
- 关于互斥体没有什么“神奇的”事,事实上,他们就像参与线程间的“君子协定”. 代码的编写者要确保需要mutex的线程都要正确调用锁定与解锁. 下面的场景说明了一种逻辑错误:
Thread 1 Thread 2 Thread 3 Lock Lock A = 2 A = A+1 A = A*B Unlock Unlock
Question: 当多个线程都在等待一个锁定的互斥体时,哪个线程在互斥体被释放后第一个获得它? 如果没有使用pthreads的优先级调度,剩下的工作就留给了操作系统,其表现多少有些随机性 |
Example: Using Mutexes
- 示例程序揭示了在一个打点线程中使用互斥体变量.
- 将主数据设置为全局作用于的结构体,所有线程都可以访问.
- 每个线程在数据的不同部分工作.
- 主线程等待所有的线程完成计算,随后打印结果.
#include <stdio.h>
#include <stdlib.h>/*
The following structure contains the necessary information
to allow the function "dotprod" to access its input data and
place its output so that it can be accessed later.
*/typedef struct
{double *a;double *b;double sum;int veclen;
} DOTDATA;#define VECLEN 100000
DOTDATA dotstr;/*
We will use a function (dotprod) to perform the scalar product.
All input to this routine is obtained through a structure of
type DOTDATA and all output from this function is written into
this same structure. While this is unnecessarily restrictive
for a sequential program, it will turn out to be useful when
we modify the program to compute in parallel.
*/void dotprod()
{/* Define and use local variables for convenience */int start, end, i;double mysum, *x, *y;start = 0;end = dotstr.veclen;x = dotstr.a;y = dotstr.b;/*
Perform the dot product and assign result
to the appropriate variable in the structure.
*/mysum = 0;for (i = start; i < end; i++){mysum += (x[i] * y[i]);}dotstr.sum = mysum;}/*
The main program initializes data and calls the dotprd() function.
Finally, it prints the result.
*/int main(int argc, char *argv[])
{int i, len;double *a, *b;/* Assign storage and initialize values */len = VECLEN;a = (double *)malloc(len * sizeof(double));b = (double *)malloc(len * sizeof(double));for (i = 0; i < len; i++){a[i] = 1;b[i] = a[i];}dotstr.veclen = len;dotstr.a = a;dotstr.b = b;dotstr.sum = 0;/* Perform the dotproduct */dotprod();/* Print result and release storage */printf("Sum = %f \n", dotstr.sum);free(a);free(b);
}
mutex版本:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>/*
The following structure contains the necessary information
to allow the function "dotprod" to access its input data and
place its output into the structure. This structure is
unchanged from the sequential version.
*/typedef struct
{double *a;double *b;double sum;int veclen;
} DOTDATA;/* Define globally accessible variables and a mutex */#define NUMTHRDS 4
#define VECLEN 100000
DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;/*
The function dotprod is activated when the thread is created.
As before, all input to this routine is obtained from a structure
of type DOTDATA and all output from this function is written into
this structure. The benefit of this approach is apparent for the
multi-threaded program: when a thread is created we pass a single
argument to the activated function - typically this argument
is a thread number. All the other information required by the
function is accessed from the globally accessible structure.
*/void* dotprod(void *arg)
{/* Define and use local variables for convenience */int i, start, end, len;long offset;double mysum, *x, *y;offset = (long)arg;len = dotstr.veclen;start = offset * len;end = start + len;x = dotstr.a;y = dotstr.b;/*
Perform the dot product and assign result
to the appropriate variable in the structure.
*/mysum = 0;for (i = start; i < end; i++){mysum += (x[i] * y[i]);}/*
Lock a mutex prior to updating the value in the shared
structure, and unlock it upon updating.
*/pthread_mutex_lock(&mutexsum);dotstr.sum += mysum;printf("Thread %ld did %d to %d: mysum=%f global sum=%f\n", offset, start, end, mysum, dotstr.sum);pthread_mutex_unlock(&mutexsum);pthread_exit((void *)0);
}/*
The main program creates threads which do all the work and then
print out result upon completion. Before creating the threads,
The input data is created. Since all threads update a shared structure, we
need a mutex for mutual exclusion. The main thread needs to wait for
all threads to complete, it waits for each one of the threads. We specify
a thread attribute value that allow the main thread to join with the
threads it creates. Note also that we free up handles when they are
no longer needed.
*/int main(int argc, char *argv[])
{long i;double *a, *b;void *status;pthread_attr_t attr;/* Assign storage and initialize values */a = (double *)malloc(NUMTHRDS * VECLEN * sizeof(double));b = (double *)malloc(NUMTHRDS * VECLEN * sizeof(double));for (i = 0; i < VECLEN * NUMTHRDS; i++){a[i] = 1;b[i] = a[i];}dotstr.veclen = VECLEN;dotstr.a = a;dotstr.b = b;dotstr.sum = 0;pthread_mutex_init(&mutexsum, NULL);/* Create threads to perform the dotproduct */pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);for (i = 0; i < NUMTHRDS; i++){/* Each thread works on a different set of data.* The offset is specified by 'i'. The size of* the data for each thread is indicated by VECLEN.*/pthread_create(&callThd[i], &attr, dotprod, (void *)i);}pthread_attr_destroy(&attr);
/* Wait on the other threads */for (i = 0; i < NUMTHRDS; i++){pthread_join(callThd[i], &status);}
/* After joining, print out the results and cleanup */printf("Sum = %f \n", dotstr.sum);free(a);free(b);pthread_mutex_destroy(&mutexsum);pthread_exit(NULL);
}
条件变量
概览
- 条件变量提供线程同步的另一途径。互斥锁通过控制线程对数据的访问实现同步,条件变量允许线程根据数据的实际值同步.
- 没有条件变量,程序员需要让线程不断地轮询 (可能在一个临界区),以检查是否满足条件。这非常消耗资源,因为线程在这种活动中会一直处于繁忙状态。条件变量是不使用轮询而实现同一目标的方式.
- 条件变量始终和互斥锁一起使用.
- 如下展示了使用条件变量的步骤.
Main Thread - 声明和初始化需要同步的全局数据/变量 (如 "count")
- 声明和初始化条件变量对象
- 声明和初始化关联的mutex
- 创建工作线程 A 和 B
Thread A - 运行到需要特定条件发生的点(比如“count” 必须到某个值)
- 锁定互斥锁,检查全局变量的值
- 调用 pthread_cond_wait() 执行阻塞等待 从Thread-B发出的信号. 注意,调用 pthread_cond_wait() 会自动原子地对互斥体解锁,这样互斥体也能被Thread-B使用.
- 接收到信号后醒来. Mutex会被自动原子锁定
- 显式解锁互斥体
- 继续
Thread B - 处理(Do work)
- 锁定关联的mutex
- 改变Thread-A等待的全局变量的值.
- 检查Thread-A等待的变量的值,如果满足条件,就通知Thread-A.
- 解锁mutex.
- 继续
Main Thread - 合并 / 继续
创建和销毁条件变量
Routines:-
pthread_cond_init (condition,attr) pthread_cond_destroy (condition)
pthread_condattr_init (attr)
pthread_condattr_destroy (attr)
Usage:
- 条件变量必须以pthread_cond_t类型声明, 在使用前必须初始化.有两种方式来初始化条件变量:
- 静态的(在它声明时). 例如:
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER; - 动态的, 使用 pthread_cond_init() 接口. 创建的条件变量的ID通过条件参数传递给调用线程. 这种方法允许设置条件变量对象的属性.
- 可选的 attr 对象用来设置条件变量属性。对于条件变量,只定义了一个属性︰ 进程共享的,它允许条件变量会被其它进程中线程访问。属性对象,如果使用,必须是类型 pthread_condattr_t (可被指定为为 NULL,以接受默认值)。.
注意,并非所有的实现都提供了进程共享属性.
- pthread_condattr_init() 和 pthread_condattr_destroy() 接口用来创建和销毁条件变量属性对象.
- 当条件变量不再使用时,必须调用pthread_cond_destroy() 来销毁它.
等待以及发信号给条件变量
Routines:-
pthread_cond_wait (condition,mutex) pthread_cond_signal (condition)
pthread_cond_broadcast (condition)
Usage:
- pthread_cond_wait() 阻塞调用线程直到触发特定条件。调用时要锁定mutex,在等待时,它会自动解锁mutex。接收到信号并唤醒线程后,会自动为调用线程锁定mutex 。程序员应该在线程不在使用mutex时解锁它.
Recommendation: 使用WHILE 循环而不是IF语句来检查等待的条件有助于处理一些潜在的问题, 比如:
- 如果多个线程等待同一个唤醒信号,他们会依次请求mutex,并且他们中得一个可能会修改大家都在等待的条件.
- 如果线程由于程序BUG而错误接收到信号
- Pthreads库允许向等待的线程发出虚假唤醒而不违反此标准.
- pthread_cond_signal() 常用来通知(唤醒)另一个正在等待此条件变量的线程。调用它之前要锁定mutex,过后要解锁,以便 能进行pthread_cond_wait()..
- 如果有多个线程处于阻塞等待状态,应该使用pthread_cond_broadcast() 而不是pthread_cond_signal()..
- 调用pthread_cond_wait()之前调用pthread_cond_signal() 是逻辑错误.
正确地锁定以及解锁关联的互斥变量至关重要. 例如:
|
Example: Using Condition Variables
- 下面简单的示例代码展示了如何使用条件变量.
- 主函数创建了三个线程.
- 其中两个线程执行任务并更新 "count" 变量.
- 第三个线程等待,直到 count 变量达到了特定的值.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12int count = 0;
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;void* inc_count(void *t)
{int i;long my_id = (long)t;for (i = 0; i < TCOUNT; i++){pthread_mutex_lock(&count_mutex);count++;/* Check the value of count and signal waiting thread when condition isreached. Note that this occurs while mutex is locked. */if (count == COUNT_LIMIT){printf("inc_count(): thread %ld, count = %d Threshold reached. ",my_id, count);pthread_cond_signal(&count_threshold_cv);printf("Just sent signal.\n");}printf("inc_count(): thread %ld, count = %d, unlocking mutex\n",my_id, count);pthread_mutex_unlock(&count_mutex);/* Do some work so threads can alternate on mutex lock */sleep(1);}pthread_exit(NULL);
}void* watch_count(void *t)
{long my_id = (long)t;printf("Starting watch_count(): thread %ld\n", my_id);/*Lock mutex and wait for signal. Note that the pthread_cond_wait routinewill automatically and atomically unlock mutex while it waits. Also, note that if COUNT_LIMIT is reached before this routine is run bythe waiting thread, the loop will be skipped to prevent pthread_cond_waitfrom never returning.*/pthread_mutex_lock(&count_mutex);while (count < COUNT_LIMIT){printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id, count);pthread_cond_wait(&count_threshold_cv, &count_mutex);printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id, count);printf("watch_count(): thread %ld Updating the value of count...\n", my_id, count);count += 125;printf("watch_count(): thread %ld count now = %d.\n", my_id, count);}printf("watch_count(): thread %ld Unlocking mutex.\n", my_id);pthread_mutex_unlock(&count_mutex);pthread_exit(NULL);
}int main(int argc, char *argv[])
{int i, rc;long t1 = 1, t2 = 2, t3 = 3;pthread_t threads[3];pthread_attr_t attr;/* Initialize mutex and condition variable objects */pthread_mutex_init(&count_mutex, NULL);pthread_cond_init(&count_threshold_cv, NULL);/* For portability, explicitly create threads in a joinable state */pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);pthread_create(&threads[0], &attr, watch_count, (void *)t1);pthread_create(&threads[1], &attr, inc_count, (void *)t2);pthread_create(&threads[2], &attr, inc_count, (void *)t3);/* Wait for all threads to complete */for (i = 0; i < NUM_THREADS; i++){pthread_join(threads[i], NULL);}printf("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n",NUM_THREADS, count);/* Clean up and exit */pthread_attr_destroy(&attr);pthread_mutex_destroy(&count_mutex);pthread_cond_destroy(&count_threshold_cv);pthread_exit(NULL);}
输出:
Starting watch_count(): thread 1
inc_count(): thread 2, count = 1, unlocking mutex
inc_count(): thread 3, count = 2, unlocking mutex
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count = 3, unlocking mutex
inc_count(): thread 2, count = 4, unlocking mutex
inc_count(): thread 3, count = 5, unlocking mutex
inc_count(): thread 2, count = 6, unlocking mutex
inc_count(): thread 3, count = 7, unlocking mutex
inc_count(): thread 2, count = 8, unlocking mutex
inc_count(): thread 3, count = 9, unlocking mutex
inc_count(): thread 2, count = 10, unlocking mutex
inc_count(): thread 3, count = 11, unlocking mutex
inc_count(): thread 2, count = 12 Threshold reached. Just sent signal.
inc_count(): thread 2, count = 12, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 137.
inc_count(): thread 3, count = 138, unlocking mutex
inc_count(): thread 2, count = 139, unlocking mutex
inc_count(): thread 3, count = 140, unlocking mutex
inc_count(): thread 2, count = 141, unlocking mutex
inc_count(): thread 3, count = 142, unlocking mutex
inc_count(): thread 2, count = 143, unlocking mutex
inc_count(): thread 3, count = 144, unlocking mutex
inc_count(): thread 2, count = 145, unlocking mutex
Main(): Waited on 3 threads. Final value of count = 145. Done.