1. Solairs下的多线程
线程分成两种,一种是POSIX格式的(使用 pthread.h),一种是Solairs格式的(thread.h),建议使用 POSIX格式。
一般使用2个函数,更多函数内容可以参考:http://baike.baidu.com/view/974776.htm
int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void*), void *restrict arg);
用于建立线程
int pthread_detach(pthread_t thread);
用于设定线程在线程主程序终止以后,自动停止并释放相关资源。
2. 线程运行参数的范围问题
int thread_arg; int main( int argc, char **argv){ pthread_t thread;thread_arg = 1;result = pthread_create (&thread, (const pthread_attr_t *) NULL, sock_thread_fun, /** 线程主程序 */(void *) &thread_arg); /** 传送进入的主程序的参数,需要传送全局变量的指针 */if (result != 0){fprintf (stderr, "Pthread_create failed (no = %d).", result);elsepthread_detach (thread); }static void * sock_thread_fun (void * arg) { int conn_fd;conn_fd = * ((int *) arg);DO_SOMTHING......return (void *) NULL; }
上面是一个简单的线程创建示例。
这里需要注意:传送给线程的参数,他不像调用函数传送参数,可以使用上一级函数内声明参数。线程建立传送的运行参数,一定要使用全局参数。
3. 多线程运行参数传送
上面的程序在传送参数的时候,若只建立一个线程没有问题,若循环建立多个线程,就会出现问题。
线程的建立过程实际上是异步的过程,若多个线程要读取同一个需要变化的全局变量,就可能出现线程读取的数据是已经被变更的数据。
int thread_arg; int main( int argc, char **argv){ pthread_t thread; int i;for (i = 0; i < 10; i++) {thread_arg = i; result = pthread_create (&thread, (const pthread_attr_t *) NULL, sock_thread_fun, /** 线程主程序 */void *) &thread_arg); /** 传送进入的主程序的参数,需要传送全局变量的指针 */if (result != 0){fprintf (stderr, "Pthread_create failed (no = %d).", result);elsepthread_detach (thread);} }static void * sock_thread_fun (void * arg) { int conn_fd;conn_fd = * ((int *) arg); DO_SOMTHING......return (void *) NULL; }
按照上面的例子,就可能出现,conn_fd 被赋值错误的问题。
例如:第三次调用线程,可能 conn_fd 获得的数值是 1,而不是应该的 2。
利用POSIX线程中的 KEY 数据可以让每个线程获取独立的数据区域,但是操作起来过于繁琐,可以简单的利用双向链表和线程互斥锁的功能完成这个工作。
typedef struct np_thread_arg {struct np_thread_arg * prev;struct np_thread_arg * next;int conn_fd; } NP_THREAD_ARG;NP_THREAD_ARG first_thread_arg; pthread_mutex_t mutex_arg;int main( int argc, char **argv){ pthread_t thread; int i;pthread_mutex_init (&mutex_arg, NULL);for (i = 0; i < 10; i++) {add_thread_arg (i); result = pthread_create (&thread, (const pthread_attr_t *) NULL, sock_thread_fun, /** 线程主程序 */void *) &thread_arg); /** 传送进入的主程序的参数,需要传送全局变量的指针 */if (result != 0){fprintf (stderr, "Pthread_create failed (no = %d).", result);elsepthread_detach (thread);} }static void * sock_thread_fun (void * arg) { int conn_fd;conn_fd = * ((int *) arg);DO_SOMTHING......remove_thread_arg ((NP_THREAD_ARG *) arg); return (void *) NULL; }/** 添加一个线程的数据集 */ void add_thread_arg (int conn_fd) { NP_THREAD_ARG * tmp_node;tmp_node = (NP_THREAD_ARG *) malloc (sizeof (NP_THREAD_ARG));tmp_node->conn_fd = conn_fd;pthread_mutex_lock (&mutex_arg); /** 添加互斥锁 */if (!first_thread_arg) {first_thread_arg = tmp_node;first_thread_arg->next = tmp_node;first_thread_arg->prev = tmp_node;}else {tmp_node->next = first_thread_arg;tmp_node->prev = first_thread_arg->prev;tmp_node->next->prev = tmp_node;tmp_node->prev->next = tmp_node;}pthread_mutex_unlock (&mutex_arg); /** 解除互斥锁 */return; }/** 移除一个线程的数据集 */ void remove_thread_arg (NP_THREAD_ARG * del_node) { NP_THREAD_ARG * curNode;pthread_mutex_lock (&mutex_arg); /** 添加互斥锁 */curNode = del_node->next;curNode->prev = del_node->prev;curNode->prev->next = curNode;if (curNode == del_node) { /** 说明链表内只有一个节点 */free (del_node);curNode = NULL;}else {free (del_node);}first_thread_arg = curNode;pthread_mutex_unlock (&mutex_arg); /** 解除互斥锁 */return; }
按照上面的范例,我们通过一个简单的结构,可以完成在不同线程之间独立的传送数据进入。
若希望线程执行中有一些数据返回,可以使用 pthread_join () 把当前线程阻塞,然后将数据交给其他线程处理。
4. Mysql多线程通讯问题
我们的项目需要每个线程操作数据库。
有两个选择,进程采用一个独立的MYSQL链接,处理所有相关操作;每个线程采用单独的MYSQL链接。
考虑到数据库的压力和监控的方便性,我们选择了所有线程使用一个进程生成的MYSQL链接的方式。
这样,就在MYSQL中引起了线程安全的问题,若没有任何处理,就可能在你的程序运行过程中,数据库连接被中断掉。
Mysql error: Lost connection to MySQL server during query
这样的错误信息引起,是因为多个线程对于同一个MYSQL连接进行操作而造成的。
对于这样的问题,MYSQL的手册中有详细的相关说明:http://dev.mysql.com/doc/refman/5.1/zh/apis.html#threaded-clients
这里需要注意的2点内容:
1. 需要重新编译Mysql的客户端库,并且替换,不需要处理服务器端的库;
2. 编译程序需要使用 “-lmysqlclient_r” 的连接命令,替换常用的 “-lmysqlclient” 参数
5. 如何查看当前系统是否支持多线程安全?
一般在编译MYSQL的时候,我们都不会添加多线程安全选项。你的系统是否有,可以通过如下两点来判断。
1. 到源码编译的位置,打开头10行的 “config.status”文件,就可以看到自己的 configure 命令参数当初设定的内容,若存在 ”--enable-thread-safe-client“参数,说明有这个功能;
2. 到LIB库的位置查看相关链接库的内容;
libmysqlclient_r.so -> libmysqlclient_r.so.10.0.0
libmysqlclient_r.so.10.0.0
libmysqlclient.so -> libmysqlclient.so.10.0.0
libmysqlclient.so.10 -> libmysqlclient.so.10.0.0
libmysqlclient.so.10.0.0
若看到上面两行红色的,就说明是支持多线程客户端安全的,这里需要注意,若看到如下内容,说明是不支持的。
libmysqlclient_r.so -> libmysqlclient.so.10.0.0
libmysqlclient.so -> libmysqlclient.so.10.0.0
libmysqlclient.so.10 -> libmysqlclient.so.10.0.0
libmysqlclient.so.10.0.0
上面个两个部分的区别,请自己仔细比较
6. 使用MYSQL多线程安全
使用多线程安全中,主要是建立数据库连接,和断开数据库连接的位置需要改动。代码添加两个简单的小函数解决这个问题。
typedef struct np_thread_arg {struct np_thread_arg * prev;struct np_thread_arg * next;int conn_fd; } NP_THREAD_ARG;NP_THREAD_ARG first_thread_arg; pthread_mutex_t mutex_arg; MYSQL * db_handle;int main( int argc, char **argv){ pthread_t thread; int i;pthread_mutex_init (&mutex_arg, NULL);db_handle = db_init (&init_info);for (i = 0; i < 10; i++) {add_thread_arg (i);result = pthread_create (&thread, (const pthread_attr_t *) NULL, sock_thread_fun, /** 线程主程序 */void *) &thread_arg); /** 传送进入的主程序的参数,需要传送全局变量的指针 */if (result != 0){fprintf (stderr, "Pthread_create failed (no = %d).", result);elsepthread_detach (thread);}db_close (db_handle); }static void * sock_thread_fun (void * arg) { int conn_fd;conn_fd = * ((int *) arg);DO_SOMTHING......remove_thread_arg ((NP_THREAD_ARG *) arg);return (void *) NULL; }/** 添加一个线程的数据集 */ void add_thread_arg (int conn_fd) { NP_THREAD_ARG * tmp_node;tmp_node = (NP_THREAD_ARG *) malloc (sizeof (NP_THREAD_ARG));tmp_node->conn_fd = conn_fd;pthread_mutex_lock (&mutex_arg); /** 添加互斥锁 */if (!first_thread_arg) {first_thread_arg = tmp_node;first_thread_arg->next = tmp_node;first_thread_arg->prev = tmp_node;}else {tmp_node->next = first_thread_arg;tmp_node->prev = first_thread_arg->prev;tmp_node->next->prev = tmp_node;tmp_node->prev->next = tmp_node;}pthread_mutex_unlock (&mutex_arg); /** 解除互斥锁 */return; }/** 移除一个线程的数据集 */ void remove_thread_arg (NP_THREAD_ARG * del_node) { NP_THREAD_ARG * curNode;pthread_mutex_lock (&mutex_arg); /** 添加互斥锁 */curNode = del_node->next;curNode->prev = del_node->prev;curNode->prev->next = curNode;if (curNode == del_node) { /** 说明链表内只有一个节点 */free (del_node);curNode = NULL;}else {free (del_node);}first_thread_arg = curNode;pthread_mutex_unlock (&mutex_arg); /** 解除互斥锁 */return; }/** 建立数据库连接 */ MYSQL * db_init (NP_DB_INITINFO * init_info) { MYSQL * db_handle; my_init (); /** 为线程安全,在链接以前调用一次 */mysql_init (NULL); db_handle = mysql_init (NULL); mysql_thread_init (); /** 线程安全初始化 */ if (mysql_real_connect (db_handle, init_info->hostname, init_info->username, init_info->userpass, init_info->dbname, init_info->port, NULL, 0) == NULL) {db_close (db_handle);return NULL;}fprintf (stdout, "DB connectiong created (id = %u).", db_handle->thread_id);return db_handle; }/** 关闭数据库连接 */ void db_close (MYSQL * db_handle) {if(db_handle != NULL) {unsigned long db_thread_id = db_handle->thread_id;mysql_close (db_handle); mysql_thread_end(); fprintf (stdout, "DB connection closed (id = %u).", db_thread_id);}return; }
和一般的MYSQL调用之间,区别就在于红色的4行内容。有他们就可以正式使用客户端多线程安全库了。
7. 数据库操作需要注意的内容
在MYSQL官方文档中,特别提到了如下内容:
”如果使用了mysql_use_result,务必确保无其他线程正在使用相同的连接,直至关闭了结果集为止。然而,对于线程式客户端,最好是共享相同的连接以使用mysql_store_result()。“
这个内容非常重要,查询您自己的代码,替换 mysql_use_result 函数的使用。
”如果打算在相同的连接上使用多个线程,必须在mysql_query()和mysql_store_result()调用组合上拥有互斥锁。一旦mysql_store_result()准备就绪,可释放锁定,其他线程可在相同的连接上执行查询。“
一般其他操作不会去查询结果,所以主要针对 SELECT 语句,可以封装一个自己的小函数解决这个问题。
MYSQL_RES * np_select (MYSQL * db_handle, const char * sql, pthread_mutex_t * mutex) { MYSQL_RES * res_set;if (mutex) { /** 建立了互斥锁 */pthread_mutex_lock(mutex);}if (mysql_query(db_handle, sql)) {return NULL;}res_set = mysql_store_result (db_handle);if (mutex) { /** 解除互斥锁 */pthread_mutex_unlock(mutex);}return res_set; }
参考资料:
- http://dev.mysql.com/doc/refman/5.1/zh/apis.html#threaded-clients
- http://baike.baidu.com/view/974776.htm