1.主线程对子线程的监管
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: threading_test.py
@time: 2018/9/3 21:32
@desc:测试threading的使用 主线程对子线程的监管
"""
import threading
import time, randomclass ThreadDemo(threading.Thread):def __init__(self, index, create_time):threading.Thread.__init__(self)self.index = indexself.create_time = create_timeself.local = threading.local()#local数据对象def run(self):time.sleep(0.1)# print (time.time() - self.create_time),'\t', self.indexprint "time_consuming:%d \t index:%d \n" % ((time.time() - self.create_time),self.index)print ("Thread %d exit.....\n" % (self.index))while 1:#timeout时,每个子线程仍然存活,直到主线程结束,子线程会一同结束;i = 0i += 1if __name__ == "__main__":threads = []for index in range(5):thread = ThreadDemo(index, time.time())thread.setDaemon(True)#设置后台运行 默认Falsethread.start()thread.setName(str(index))#命名print"Threading name is %s" % thread.getName()threads.append(thread)# thread.join(timeout=0.5)for thread in threads:thread.join(2)#Wait until the thread terminates,阻塞,可设置timeout;#join()返回值None,需用isAlive()判断是否超时time.sleep(1)print("thread_name",thread.getName())if not thread.isAlive():# Return whether the thread is alive.print "Thread %s timeout" % thread.getName()continueprint "Main thread exit......""""总结如下:1.join()中的timeout参数设置时间,只有在setDaemon(True)时有效,若setDaemon(False),join(timeout=2)在2s之后任然会阻塞,等待子线程的结束。2.join(timeout=2)函数在这里只相当于一个“采样”,它不会在超时的时候终止相应thread的执行------超时后,"Thread %s timeout" % thread.getName()不会被打印,说明线程并没有结束超时了的话,isAlive()方法肯定返回的True(因为join()方法不会结束线程,所以线程仍然是活动的)3.如果某个子线程的daemon属性为False,主线程结束时会检测该子线程是否结束,如果该子线程还在运行,则主线程会等待它完成后再退出;如果某个子线程的daemon属性为True,主线程运行结束时不对这个子线程进行检查而直接退出
,同时所有daemon值为True的子线程将随主线程一起结束,而不论是否运行完成。"""
2.threading.local() test ---线程全局变量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: threading_test2.py
@time: 2018/9/4 11:20
@desc:在函数调用的时候,参数传递起来很麻烦,对此,可以使用threading.local()数据对象创建参数变量,这样
一个线程的所有调用到的处理函数都可以 非常方便地访问这些参数变量而且 local数据对象,其成员变量在每个线程中是相互独立的
"""
'''
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,
这样一个线程的所有调用到的处理函数都可以 非常方便地访问这些资源
'''
import threading
import time, random#----------------------------------------------test1
# 创建全局ThreadLocal对象:
local_school = threading.local()def process_student():print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name)def process_thread(name):local_school.student = name#绑定threading.local()数据对象的student,一个线程中的所有process_student()'''
如果不使用threading.local()数据对象,以上两个函数就要如下定义:
def process_student(name):print 'Hello, %s (in %s)' % (name, threading.current_thread().name)def process_thread(name):process_student(name)参数变量 name 传递起来很麻烦,特别是函数调用层次更多的时候
'''print 'Test1>>>>>>>>>>>>>>>>>>>>>>>>>>start'threads = []
for i in range(20):thread = threading.Thread(target= process_thread, args=('Bob-%s' % i,), name='Thread-%s' % i)thread.start()threads.append(thread)for thread in threads:thread.join()
print 'Test1>>>>>>>>>>>>>>>>>>>>>>>>>>end''''
全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。
你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,
可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
'''#----------------------------------------------test2
class ThreadLocal(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.local = threading.local() # local数据对象,其成员变量在每个线程中是相互独立的'''经测试,self.number,self.local.number,number产生效果一致,每个线程中的这些变量相互独立,并不会相互影响,对于local()类的作用还需进一步探索----------见test1'''self.number = []def run(self):time.sleep(random.random()) # 0-1随机self.local.number = []number = []for i in range(5):self.local.number.append(random.choice(range(10)))self.number.append(random.choice(range(10)))time.sleep(2.0)# print(threading.currentThread(), self.local.number)print(threading.currentThread(), self.number)if __name__ == "__main__":print 'Test2>>>>>>>>>>>>>>>>>>>>>>>>>>start'print(threading.currentThread())#Return the current Thread object, corresponding to the caller's thread of control.threads = []for i in range(10):thread = ThreadLocal()thread.start()threads.append(thread)for i in range(10):threads[i].join()print('Test2>>>>>>>>>>>>>>>>>>>>>>>>>>end')
3.线程之间的同步-----锁机制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: threading_同步test1.py
@time: 2018/9/5 10:13
@desc:测试线程之间的同步-----锁机制
"""
'''结果表明:若不加 锁机制,100个线程运行到最后,value值<100'''import threading
import time, randomclass Counter():"""计数器"""def __init__(self):self.value = 0self.lock = threading.Lock()#生成 锁def increment(self):self.lock.acquire()#获取锁,进入临界区self.value += 1value = self.valueself.lock.release()#释放锁,离开临界区return value# return self.valueclass ThreadCounter(threading.Thread):"""调用Counter实例,生成线程进行计数"""def __init__(self,create_time, counter):self.create_time = create_timethreading.Thread.__init__(self)self.counter = counterdef run(self):time.sleep(1.0)value = self.counter.increment()# print ((time.time() - self.create_time), '\tvalue: ', value)#打印散乱print "%f \t value:%d \n" % ((time.time() - self.create_time), value)if __name__ == "__main__":counter = Counter()create_time = time.time()for i in range(100):thread = ThreadCounter(create_time, counter)thread.start()
4.线程之间的同步-----条件变量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: threading_同步test2.py
@time: 2018/9/5 14:23
@desc:测试线程之间的同步-----条件变量
------生产者-消费者问题模型
"""import threading
import timeclass Goods():'''产品类'''def __init__(self):self.count = 0def produce(self, num = 1):#生产1个self.count += numdef consume(self):#消耗1个self.count -= 1def isEmpty(self):#判断产品是否为空return not self.countclass Producer(threading.Thread):'''生产线程'''def __init__(self, condition, goods, sleeptime = 1):threading.Thread.__init__(self)self.condition = conditionself.goods = goodsself.sleeptime = sleeptimedef run(self):while True:self.condition.acquire()self.goods.produce()print "Goods count: %d. Producre thread produced 1 item." % self.goods.countself.condition.notifyAll()#唤醒等待此条件的线程self.condition.release()time.sleep(self.sleeptime)class Consumer(threading.Thread):'''消耗线程'''def __init__(self, index, condition, goods, sleeptime=4):threading.Thread.__init__(self, name=str(index))self.condition = conditionself.goods = goodsself.sleeptime = sleeptimedef run(self):while True:time.sleep(self.sleeptime)self.condition.acquire()while self.goods.isEmpty():#如果为空,则等待 被唤醒self.condition.wait()#timeout=Noneself.goods.consume()print "Goods count: %d. Consumer thread consumed 1 item." % self.goods.countself.condition.release()if __name__ == "__main__":goods = Goods()condition = threading.Condition()producrer = Producer(goods=goods, condition=condition )producrer.start()#启动生产线程for i in range(5):consumer = Consumer(index=i, goods=goods, condition=condition)consumer.start()#启动消费线程
5.线程之间的同步-----同步队列 queue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: threading_同步test3.py
@time: 2018/9/5 15:15
@desc:测试线程之间的同步-----同步队列 queue
"""
'''
此包中的常用方法(q = Queue.Queue()):q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)非阻塞
q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
'''import threading, queue
import time, randomclass Worker(threading.Thread):'''取出队列数据'''def __init__(self, index, queue):threading.Thread.__init__(self)self.index = indexself.queue = queuedef run(self):while True:time.sleep(random.random())item = self.queue.get()#同步队列中获取对象if item == None:#循环终止条件breakprint "task: %d,iterm:%d finished" %(self.index, item)self.queue.task_done()#在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号。每一条get()语句后需要一条task_done# self.queue.join()#等待队列中任务完成操作if __name__ == "__main__":queue = queue.Queue(0)#不限长度for i in range(2):Worker(index=i, queue=queue).start()#生成两个线程for i in range(20):queue.put(i)#同步队列中加入对象for i in range(2):queue.put(None)queue.join() # 等待队列中任务完成操作-----实际上意味着等到队列为空,再执行别的操作
6.信号量 实现同步 等同于 锁机制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: threading_同步test4.py
@time: 2018/9/5 17:53
@desc:信号量 实现同步 等同于 锁机制
"""
import threading
import timesemaphore = threading.Semaphore(5)def func():if semaphore.acquire():print (threading.currentThread().getName() + ' get semaphore')time.sleep(2)semaphore.release()for i in range(20):t1 = threading.Thread(target=func)t1.start()
7.工作者类+调用类+caller的模型
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: ZhongyuanYang
@file: 工作者类+调用类+caller的模型.py
@time: 2018/9/4 15:53
@desc:
"""import os, sys, re, math
import threading
import time
import random########################################################################
class Soldier(threading.Thread):"""The class of a soldier."""# -------------------------------------------------------------------def __init__(self, name):"""Constructor"""threading.Thread.__init__(self, name=name)self.name = name # the name of this soldierself.setDaemon(True) # this is a daemon thread.# the time of this soldier need to finish the jobself.playTime = random.randint(1, 10)##Return random integer in range [a, b], including both end points.# is the soldier stop shotting, if timeout or the target has been killed,# he may stop.self.isStopped = Falseself.isSuccess = False # did he kill the target?# --------------------------------------------------------------------def assassinate(self):"""The task, to kill the target.暗杀任务"""for i in range(self.playTime):print '%s play(%d)' % (self.name, i + 1)time.sleep(1)# ----------------------------------------------------------------def run(self):"""Start to move ..."""print '%s has moved out, need time: %d ...' % (self.name, self.playTime)self.assassinate()print '%s stopped ...' % self.nameself.isStopped = True # the target has been killed, then he stopped.self.isSuccess = True########################################################################
class Commander(threading.Thread):"""The class of commander, a commander will command only one soldier."""# ----------------------------------------------------------------------def __init__(self, soldier):"""Constructor"""threading.Thread.__init__(self, name='Commander')self.soldier = soldier# ----------------------------------------------------------------------def run(self):"""Authorize the soldier to start killing."""self.soldier.start()try:# Boss said: give the soldier 8 seconds to finish his jobself.soldier.join(8)except:pass# Use the class's own attribute to judge whether it is timeout.# if self.soldier.isAlive():if not self.soldier.isStopped:print '%s is timeout!' % self.soldier.name# the soldier run out his time, then he stopped.self.soldier.isStopped = True# ----------------------------------------------------------------------
def killing():"""Let's pull the trigger, start killing !"""t1 = time.time()# Get ready for the commandersl_commander = []for i in range(10): # 10 soldiers# get ready for the soldiersoldier = Soldier('soldier-%d' % (i + 1))if i == 5 or i == 9:soldier.playTime = 10000l_commander.append(Commander(soldier))# Soldiers move out one by one.for cmd in l_commander:cmd.start()# Judge whether the helicopter should go. If all the soldiers are stop,# that is, all finished job or timeout, then it should go!isBreak = Falsewhile not isBreak:isBreak = Truefor cmd in l_commander:if cmd.soldier.isStopped == False:isBreak = False# Check the results of the battle at the schedule time.for cmd in l_commander:print '%s, is success: %s' % (cmd.soldier.name, cmd.soldier.isSuccess)# Go back to base.time.sleep(20)# Check the results at the final time.for cmd in l_commander:print '%s, is success: %s' % (cmd.soldier.name, cmd.soldier.isSuccess)t2 = time.time()print 'Total time: %.2f' % (float(t2 - t1))if __name__ == "__main__":killing()