当前位置: 代码迷 >> 综合 >> day9 线程与进程、队列
  详细解决方案

day9 线程与进程、队列

热度:75   发布时间:2023-12-25 19:34:33.0

文章目录

  • 1. 进程与线程
  • 2. 队列 queue

1. 进程与线程

??进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。而负责执行任务则是cpu。
??线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

进程与线程的区别:

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

Python GIL(Global Interpreter Lock)


??In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)


??上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候在同一时刻只允许一个线程运行。

??首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf


创建进程的类:Process

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进
程中的任务(尚未启动)强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

1 group参数未使用,值始终为None2 target表示调用对象,即子进程要执行的任务3 args表示调用对象的位置参数元组,args=(1,2,'egon',)4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}5 name为子进程的名称

方法介绍:

 1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁4 p.is_alive():如果p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  

线程有两种调用方式:
1)直接调用

import threading
import timedef sayhi(num): #定义每个线程要运行的函数print("running on number:%s" %num)time.sleep(3)if __name__ == '__main__':t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例t1.start() #启动线程t2.start() #启动另一个线程print(t1.getName()) #获取线程名print(t2.getName())

2)继承式调用

import threading
import timeclass MyThread(threading.Thread):def __init__(self,num):threading.Thread.__init__(self)self.num = numdef run(self):#定义每个线程要运行的函数print("running on number:%s" %self.num)time.sleep(3)if __name__ == '__main__':t1 = MyThread(1)t2 = MyThread(2)t1.start()t2.start()

例:

import threading,timedef run(n):print('task',n)time.sleep(2)print('work done...',n,threading.current_thread())start_time = time.time()t_obj = []
for i in range(20):t = threading.Thread(target=run,args=('t-%s'%i,))t.setDaemon(True) #把当前线程设为守护线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会#退出,由m启动的其它子线程会同时退出,不管是否执行完任务t.start()t_obj.append(t) #为了不阻塞后面线程的启动,先将每个进程对象放到一个列表里,再进行 join# for t in t_obj: #循环线程示例列表,等待所有线程结束
# t.join() #相当于 wait() 等待线程结束print('all thread has done.....')print('cost:',time.time() - start_time)''' t1 = threading.Thread(target=run,args=('t1',)) t2 = threading.Thread(target=run,args=('t2',))t1.start() t2.start() '''

线程锁(互斥锁Mutex)

import time
import threadingdef addNum():global num #在每个线程中都获取这个全局变量print('--get num:',num )time.sleep(1)lock.acquire() #修改数据前加锁num  -=1 #对此公共变量进行-1操作lock.release() #修改后释放num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):t = threading.Thread(target=addNum)t.start()thread_list.append(t)for t in thread_list: #等待所有线程执行完毕t.join()print('final num:', num )

RLock(递归锁)
说白了就是在一个大锁中还要再包含子锁

import threading,timedef run1():print("grab the first part data")lock.acquire()global numnum +=1lock.release()return num
def run2():print("grab the second part data")lock.acquire()global  num2num2+=1lock.release()return num2
def run3():lock.acquire()res = run1()print('--------between run1 and run2-----')res2 = run2()lock.release()print(res,res2)if __name__ == '__main__':num,num2 = 0,0lock = threading.RLock()for i in range(10):t = threading.Thread(target=run3)t.start()while threading.active_count() != 1:print(threading.active_count())
else:print('----all threads done---')print(num,num2)

Semaphore(信号量)
??互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading,timedef run(n):semaphore.acquire()time.sleep(1)print("run the thread: %s\n" %n)semaphore.release()if __name__ == '__main__':num= 0semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行for i in range(20):t = threading.Thread(target=run,args=(i,))t.start()while threading.active_count() != 1:pass #print threading.active_count()
else:print('----all threads done---')print(num)

Timer
??This class represents an action that should be run only after a certain amount of time has passed

??Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling thecancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

def hello():print("hello, world")t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed

Events


An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

#a client thread can wait for the flag to be set
event.wait()

#a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.


??通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

import threading,timeevent = threading.Event()def lighter():count = 0event.set() #先设为绿灯while True:if count > 5 and count < 10: #改成红灯event.clear() #把标志位清空print('\033[41;1mred light is on...\033[0m')elif count > 10:event.set() #变绿灯count = 0else:print('\033[42;1mgreen light is on...\033[0m')time.sleep(1)count += 1def car(name):while True:if event.is_set(): #代表绿灯print('[%s] running...'%name)time.sleep(1)else:print('[%s] sees red light,waiting...'%name)event.wait()print('\033[34;1m[%s] green light is on,start going...\033[0m'%name)light = threading.Thread(target=lighter,)
light.start()car1 = threading.Thread(target=car,args=('Tesla',))
car1.start()

运行结果:

2. 队列 queue

??queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

一个生产者消费者的例子:

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):count = 0while count <20:time.sleep(random.randrange(3))q.put(count)print('Producer %s has produced %s baozi..' %(name, count))count +=1
def Consumer(name):count = 0while count <20:time.sleep(random.randrange(4))if not q.empty():data = q.get()print(data)print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))else:print("-----no baozi anymore----")count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()