-
python多线程详解(超详细)
2019-09-28 08:33:31python中的多线程是一个非常重要的知识点,今天为大家对多线程进行详细的说明,代码中的注释有多线程的知识点还有测试用的实例。 import threading from threading import Lock... python多线程详解 什么是线程? ...python中的多线程是一个非常重要的知识点,今天为大家对多线程进行详细的说明,代码中的注释有多线程的知识点还有测试用的实例。
码字不易,阅读或复制完了,点个赞!import threading from threading import Lock,Thread import time,os ''' python多线程详解 什么是线程? 线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。 线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所 拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行 ''' ''' 为什么要使用多线程? 线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄 和其他进程应有的状态。 因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程之中拥有独立的内存单元,而多个线程共享 内存,从而极大的提升了程序的运行效率。 线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性,多个线程共享一个进程的虚拟空间。线程的共享环境 包括进程代码段、进程的共有数据等,利用这些共享的数据,线程之间很容易实现通信。 操作系统在创建进程时,必须为改进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程 来实现并发比使用多进程的性能高得要多。 ''' ''' 总结起来,使用多线程编程具有如下几个优点: 进程之间不能共享内存,但线程之间共享内存非常容易。 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此使用多线程来实现多任务并发执行比使用多进程的效率高 python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了python的多线程编程。 ''' ''' 普通创建方式 ''' # def run(n): # print('task',n) # time.sleep(1) # print('2s') # time.sleep(1) # print('1s') # time.sleep(1) # print('0s') # time.sleep(1) # # if __name__ == '__main__': # t1 = threading.Thread(target=run,args=('t1',)) # target是要执行的函数名(不是函数),args是函数对应的参数,以元组的形式存在 # t2 = threading.Thread(target=run,args=('t2',)) # t1.start() # t2.start() ''' 自定义线程:继承threading.Thread来定义线程类,其本质是重构Thread类中的run方法 ''' # class MyThread(threading.Thread): # def __init__(self,n): # super(MyThread,self).__init__() #重构run函数必须写 # self.n = n # # def run(self): # print('task',self.n) # time.sleep(1) # print('2s') # time.sleep(1) # print('1s') # time.sleep(1) # print('0s') # time.sleep(1) # # if __name__ == '__main__': # t1 = MyThread('t1') # t2 = MyThread('t2') # t1.start() # t2.start() ''' 守护线程 下面这个例子,这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程, 因此当主线程结束后,子线程也会随之结束,所以当主线程结束后,整个程序就退出了。 所谓’线程守护’,就是主线程不管该线程的执行情况,只要是其他子线程结束且主线程执行完毕,主线程都会关闭。也就是说:主线程不等待该守护线程的执行完再去关闭。 ''' # def run(n): # print('task',n) # time.sleep(1) # print('3s') # time.sleep(1) # print('2s') # time.sleep(1) # print('1s') # # if __name__ == '__main__': # t=threading.Thread(target=run,args=('t1',)) # t.setDaemon(True) # t.start() # print('end') ''' 通过执行结果可以看出,设置守护线程之后,当主线程结束时,子线程也将立即结束,不再执行 ''' ''' 主线程等待子线程结束 为了让守护线程执行结束之后,主线程再结束,我们可以使用join方法,让主线程等待子线程执行 ''' # def run(n): # print('task',n) # time.sleep(2) # print('5s') # time.sleep(2) # print('3s') # time.sleep(2) # print('1s') # if __name__ == '__main__': # t=threading.Thread(target=run,args=('t1',)) # t.setDaemon(True) #把子线程设置为守护线程,必须在start()之前设置 # t.start() # t.join() #设置主线程等待子线程结束 # print('end') ''' 多线程共享全局变量 线程时进程的执行单元,进程时系统分配资源的最小执行单位,所以在同一个进程中的多线程是共享资源的 ''' # g_num = 100 # def work1(): # global g_num # for i in range(3): # g_num+=1 # print('in work1 g_num is : %d' % g_num) # # def work2(): # global g_num # print('in work2 g_num is : %d' % g_num) # # if __name__ == '__main__': # t1 = threading.Thread(target=work1) # t1.start() # time.sleep(1) # t2=threading.Thread(target=work2) # t2.start() ''' 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据, 所以出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,像下面的代码,当需要独占 某一个资源时,任何一个锁都可以锁定这个资源,就好比你用不同的锁都可以把这个相同的门锁住一样。 由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期, 我们因此也称为“线程不安全”。 为了防止上面情况的发生,就出现了互斥锁(Lock) ''' # def work(): # global n # lock.acquire() # temp = n # time.sleep(0.1) # n = temp-1 # lock.release() # # # if __name__ == '__main__': # lock = Lock() # n = 100 # l = [] # for i in range(100): # p = Thread(target=work) # l.append(p) # p.start() # for p in l: # p.join() ''' 递归锁:RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用RLock类 ''' # def func(lock): # global gl_num # lock.acquire() # gl_num += 1 # time.sleep(1) # print(gl_num) # lock.release() # # # if __name__ == '__main__': # gl_num = 0 # lock = threading.RLock() # for i in range(10): # t = threading.Thread(target=func,args=(lock,)) # t.start() ''' 信号量(BoundedSemaphore类) 互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如厕所有3个坑, 那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去 ''' # def run(n,semaphore): # semaphore.acquire() #加锁 # time.sleep(3) # print('run the thread:%s\n' % n) # semaphore.release() #释放 # # # if __name__== '__main__': # num=0 # semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 # for i in range(22): # t = threading.Thread(target=run,args=('t-%s' % i,semaphore)) # t.start() # while threading.active_count() !=1: # pass # else: # print('----------all threads done-----------') ''' python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下的几个方法: clear将flag设置为 False set将flag设置为 True is_set判断是否设置了flag wait会一直监听flag,如果没有检测到flag就一直处于阻塞状态 事件处理的机制:全局定义了一个Flag,当Flag的值为False,那么event.wait()就会阻塞,当flag值为True, 那么event.wait()便不再阻塞 ''' event = threading.Event() def lighter(): count = 0 event.set() #初始者为绿灯 while True: if 5 < count <=10: event.clear() #红灯,清除标志位 print("\33[41;lmred light is on...\033[0m]") elif count > 10: event.set() #绿灯,设置标志位 count = 0 else: print('\33[42;lmgreen light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #判断是否设置了标志位 print('[%s] running.....'%name) time.sleep(1) else: print('[%s] sees red light,waiting...'%name) event.wait() print('[%s] green light is on,start going...'%name) # startTime = time.time() light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=('MINT',)) car.start() endTime = time.time() # print('用时:',endTime-startTime) ''' GIL 全局解释器 在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少个核 同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。 GIL的全程是全局解释器,来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以 把GIL看做是“通行证”,并且在一个python进程之中,GIL只有一个。拿不到线程的通行证,并且在一个python进程中,GIL只有一个, 拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操 作cpu,而只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的 python在使用多线程的时候,调用的是c语言的原生过程。 ''' ''' python针对不同类型的代码执行效率也是不同的 1、CPU密集型代码(各种循环处理、计算等),在这种情况下,由于计算工作多,ticks技术很快就会达到阀值,然后出发GIL的 释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。 2、IO密集型代码(文件处理、网络爬虫等设计文件读写操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待, 造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序的执行 效率)。所以python的多线程对IO密集型代码比较友好。 ''' ''' 主要要看任务的类型,我们把任务分为I/O密集型和计算密集型,而多线程在切换中又分为I/O切换和时间切换。如果任务属于是I/O密集型, 若不采用多线程,我们在进行I/O操作时,势必要等待前面一个I/O任务完成后面的I/O任务才能进行,在这个等待的过程中,CPU处于等待 状态,这时如果采用多线程的话,刚好可以切换到进行另一个I/O任务。这样就刚好可以充分利用CPU避免CPU处于闲置状态,提高效率。但是 如果多线程任务都是计算型,CPU会一直在进行工作,直到一定的时间后采取多线程时间切换的方式进行切换线程,此时CPU一直处于工作状态, 此种情况下并不能提高性能,相反在切换多线程任务时,可能还会造成时间和资源的浪费,导致效能下降。这就是造成上面两种多线程结果不能的解释。 结论:I/O密集型任务,建议采取多线程,还可以采用多进程+协程的方式(例如:爬虫多采用多线程处理爬取的数据);对于计算密集型任务,python此时就不适用了。 '''
-
python多线程
2019-10-12 11:20:59python多线程及notify和wait的是使用python多线程多线程创建方式多线程管理 python多线程 python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading...python多线程
python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python由于GIL(global interpreter lock 全局解释锁)的存在无法使用threading充分利用CPU资源,GIL使得python同一个时刻只有一个线程在一个cpu上执行字节码,并且无法将多个线程映射到多个cpu上,即不能发挥多个cpu的优势。即多线程只能在一个CPU上执行,CPU是按照时间片轮询的方式来执行子线程的。
多线程创建方式
- 使用Thread类进行创建
from threading import Thread t = Thread(target=function_name, args=(parameter1, parameterN)) t.start()
function_name为启动线程的名字,parameter1, parameterN为对应的参数
- 直接继承Thread类进行创建
from threading import Thread # 创建一个类,必须要继承Thread class MyThread(Thread): # 继承Thread的类,需要实现run方法,线程就是从这个方法开始的 def run(self): # 具体的逻辑 function_name(self.parameter1) def __init__(self, parameter1): # 需要执行父类的初始化方法 Thread.__init__(self) # 如果有参数,可以封装在类里面 self.parameter1 = parameter1 # 如果有参数,实例化的时候需要把参数传递过去 t = MyThread(parameter1) # 同样使用start()来启动线程 t.start()
- 使用Thread中Timer创建循环定时线程
import threading def heartBeat(self): heartThread = thrreading.Timer(5,heartBeat)
多线程管理
- 锁机制
由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
实现方法:
acquire(): 尝试获得锁定。使线程进入同步阻塞状态。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
以更新订单簿和删除订单簿为例,更新和删除相同orderBookId下的订单簿操作,需要在更新和删除进行加锁处理,保证在同一时刻只能进行更新或者删除操作。
但不同的orderBookId下的订单簿更新和删除是可以同步进行的,所以加锁的维度是给每一个orderBookId分配锁,而不是所有的orderBookId共享一个锁;
import threading class OrderBookManger(object): def __init__(self): self.__lockManger = {} def addOrderBook(self,orderBookId): self.__lockManger[orderBookId] = threading.LOCK() def update(self,orderBookId): self.__lockManger[orderBookId].acquire() ##to do somthing self.self.__lockManger[orderBookId].release() def delete(self,orderBookId): self.__lockManger[orderBookId].acquire() ##to do somthing self.self.__lockManger[orderBookId].release()
- condition类及wait和notify方法
当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为;另外一个小伙伴b在吃掉鱼丸就是消费者行为。当火锅里面鱼丸达到一定数量加满后b才能吃,这就是一种条件判断了,需要用到condition类。
除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
condition类包含的操作:
acquire(): 线程锁
release(): 释放锁
wait(timeout): 线程挂起,并且释放锁,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
正常唤醒场景:当a同学往火锅里面添加鱼丸加满后(最多5个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)
##生产者消费者举例 # coding=utf-8 import threading import time con = threading.Condition() num = 0 # 生产者 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): # 锁定线程 global num con.acquire() ##锁定线程,消费者代码无法同步运行 while True: print "开始添加!!!" #<2> num += 1 print "火锅里面鱼丸个数:%s" % str(num) time.sleep(1) if num >= 5: print "火锅里面里面鱼丸数量已经到达5个,无法添加了!" # 唤醒处于等待的线程,消费者线程会被唤醒<1>代码处,但notify不释放锁,所以此时消费者还未拿到锁 con.notify() # 等待通知,并释放锁,则消费者拿到锁,开始运行<1>处代码 con.wait() # 释放锁 con.release() # 消费者 class Consumers(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): con.acquire() global num while True: print "开始吃啦!!!" ##<1> num -= 1 print "火锅里面剩余鱼丸数量:%s" %str(num) time.sleep(2) if num <= 0: print "锅底没货了,赶紧加鱼丸吧!" con.notify() # 唤醒生产者线程<2> # 等待通知,释放锁,开始运行生产者<2>处代码 con.wait() con.release() p = Producer() c = Consumers() p.start() c.start()
上述情况,生产者和消费者在同一个锁定池中,当生产者获取锁之后,消费者代码不能执行,直到生产者释放锁之后才能进行;需要注意的是notify不会释放锁,一定要在notify之后增加释放锁的操作。
异常情况:唤醒和等待动作处于不同的锁定池中,会出现无法唤醒wait状态线程。下述场景fetch操作从队列中取数据当队列为空时进行fetch操作进行等待挂起;put操作往队列中添加数据
import threading,queue class BaseStrategy(): def __init__(self): self.quote_td = threading.Thread(target=self.fetch) self.cond = threading.condition() elf.quote_td.start() self.msg_queue = queue.Queue() def fetch(self): while(true): self.cond.acquire() if self.msg_Queue.size() == 0: ##(2) self.cond.wait() ##(4) one = self.msg_queue.get() ##(3) obj = BaseStrategy() def putone(): while(True): if obj.msg_queue.size()==0: obj.msg_queue.put("33) obj.cond.acquire() obj.cond.notify() obj.cond.release() else: ##(3) obj.msg_queue.put("34) putone()
上述代码中由于putone和fetch不在一个锁定池中,条件变量只是针对线程msg_queue,所以putone 和 fetch 会有交替执行情况,当putone函数执行到(3)时,fetch拿到执行权,执行了(3)之后,循环继续执行(2),通过语句(4)把自己挂起,这时putone拿到执行权,继续从(3)开始执行,循环后会一直走(3),就会导致fetch线程一直不会被notify。
解决方案:目的是当队列为空时进行等待挂起,所以直接使用了队列的get(timeout=5)方法,python队列是线程安全的,每次从队列中获取数据时如果队列有数据则直接获取,否则等待5s钟,仍拿不到数据则报异常。修改后的代码如下import threading,queue,Queue class BaseStrategy(): def __init__(self): self.quote_td = threading.Thread(target=self.fetch) self.cond = threading.condition() elf.quote_td.start() self.msg_queue = queue.Queue() def fetch(self): while(true): try: one = self.msg_queue.get(timeout=5) except Queue.Empty: print("empty queue") continue obj = BaseStrategy() def putone(): while(True): obj.msg_queue.put("34) putone()
-
Python多线程
2019-11-28 23:08:07本课程主要讲解使用Python中的_thread和threading模块实现多线程编程,以及队列Queue的使用。课程中使用队列实现了生产者和消费者的程序编写。 重点:_thread和threading、Queue -
Python 多线程
2016-08-31 13:17:54Python 多线程 多线程类似于同时执行多个不同程序,多线程运行有如下优点: 使用线程可以把占据长时间的程序中的任务放到后台去处理。 用户界面可以更加吸引人,这样比如用户点击了一个按钮去触发某些事件...Python 多线程
多线程类似于同时执行多个不同程序,多线程运行有如下优点:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理。
- 用户界面可以更加吸引人,这样比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度
- 程序的运行速度可能加快
- 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。
- 线程可以被抢占(中断)。
- 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) -- 这就是线程的退让。
开始学习Python线程
Python中使用线程有两种方式:函数或者用类来包装线程对象。
函数式:调用thread模块中的start_new_thread()函数来产生新线程。语法如下:
thread.start_new_thread ( function, args[, kwargs] )
参数说明:
- function - 线程函数。
- args - 传递给线程函数的参数,他必须是个tuple类型。
- kwargs - 可选参数。
实例:
#!/usr/bin/python # -*- coding: UTF-8 -*- import thread import time # 为线程定义一个函数 def print_time( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 print "%s: %s" % ( threadName, time.ctime(time.time()) ) # 创建两个线程 try: thread.start_new_thread( print_time, ("Thread-1", 2, ) ) thread.start_new_thread( print_time, ("Thread-2", 4, ) ) except: print "Error: unable to start thread" while 1: pass
执行以上程序输出结果如下:
Thread-1: Thu Jan 22 15:42:17 2009 Thread-1: Thu Jan 22 15:42:19 2009 Thread-2: Thu Jan 22 15:42:19 2009 Thread-1: Thu Jan 22 15:42:21 2009 Thread-2: Thu Jan 22 15:42:23 2009 Thread-1: Thu Jan 22 15:42:23 2009 Thread-1: Thu Jan 22 15:42:25 2009 Thread-2: Thu Jan 22 15:42:27 2009 Thread-2: Thu Jan 22 15:42:31 2009 Thread-2: Thu Jan 22 15:42:35 2009
线程的结束一般依靠线程函数的自然结束;也可以在线程函数中调用thread.exit(),他抛出SystemExit exception,达到退出线程的目的。
线程模块
Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。
thread 模块提供的其他方法:
- threading.currentThread(): 返回当前的线程变量。
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
- run(): 用以表示线程活动的方法。
-
start():启动线程活动。
- join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
- isAlive(): 返回线程是否活动的。
- getName(): 返回线程名。
- setName(): 设置线程名。
使用Threading模块创建线程
使用Threading模块创建线程,直接从threading.Thread继承,然后重写__init__方法和run方法:
#!/usr/bin/python # -*- coding: UTF-8 -*- import threading import time exitFlag = 0 class myThread (threading.Thread): #继承父类threading.Thread def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 print "Starting " + self.name print_time(self.name, self.counter, 5) print "Exiting " + self.name def print_time(threadName, delay, counter): while counter: if exitFlag: thread.exit() time.sleep(delay) print "%s: %s" % (threadName, time.ctime(time.time())) counter -= 1 # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启线程 thread1.start() thread2.start() print "Exiting Main Thread"
以上程序执行结果如下;
Starting Thread-1 Starting Thread-2 Exiting Main Thread Thread-1: Thu Mar 21 09:10:03 2013 Thread-1: Thu Mar 21 09:10:04 2013 Thread-2: Thu Mar 21 09:10:04 2013 Thread-1: Thu Mar 21 09:10:05 2013 Thread-1: Thu Mar 21 09:10:06 2013 Thread-2: Thu Mar 21 09:10:06 2013 Thread-1: Thu Mar 21 09:10:07 2013 Exiting Thread-1 Thread-2: Thu Mar 21 09:10:08 2013 Thread-2: Thu Mar 21 09:10:10 2013 Thread-2: Thu Mar 21 09:10:12 2013 Exiting Thread-2
线程同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。如下:
多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。
那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。
锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。
经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。
实例:
#!/usr/bin/python # -*- coding: UTF-8 -*- import threading import time class myThread (threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print "Starting " + self.name # 获得锁,成功获得锁定后返回True # 可选的timeout参数不填时将一直阻塞直到获得锁定 # 否则超时后将返回False threadLock.acquire() print_time(self.name, self.counter, 3) # 释放锁 threadLock.release() def print_time(threadName, delay, counter): while counter: time.sleep(delay) print "%s: %s" % (threadName, time.ctime(time.time())) counter -= 1 threadLock = threading.Lock() threads = [] # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启新线程 thread1.start() thread2.start() # 添加线程到线程列表 threads.append(thread1) threads.append(thread2) # 等待所有线程完成 for t in threads: t.join() print "Exiting Main Thread"
线程优先级队列( Queue)
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
Queue模块中的常用方法:
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False
- Queue.full 与 maxsize 大小对应
- Queue.get([block[, timeout]])获取队列,timeout等待时间
- Queue.get_nowait() 相当Queue.get(False)
- Queue.put(item) 写入队列,timeout等待时间
- Queue.put_nowait(item) 相当Queue.put(item, False)
- Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
- Queue.join() 实际上意味着等到队列为空,再执行别的操作
实例:
#!/usr/bin/python # -*- coding: UTF-8 -*- import Queue import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print "Starting " + self.name process_data(self.name, self.q) print "Exiting " + self.name def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print "%s processing %s" % (threadName, data) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = Queue.Queue(10) threads = [] threadID = 1 # 创建新线程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 填充队列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待队列清空 while not workQueue.empty(): pass # 通知线程是时候退出 exitFlag = 1 # 等待所有线程完成 for t in threads: t.join() print "Exiting Main Thread"
以上程序执行结果:
Starting Thread-1 Starting Thread-2 Starting Thread-3 Thread-1 processing One Thread-2 processing Two Thread-3 processing Three Thread-1 processing Four Thread-2 processing Five Exiting Thread-3 Exiting Thread-1 Exiting Thread-2 Exiting Main Thread
-
python多线程爬取多个网址_python多线程爬取某网站全部H漫画
2020-12-29 16:00:21python多线程爬取某网站全部H漫画首发于个人博客(客官大人来这里看啊!):www.gunnerx.vip前言最近学习python多线程与爬虫相关知识,想试着练练手。正好想到常逛的一个正(瑟)经(琴)漫画的网站,决定想办法把上面全部...python多线程爬取某网站全部H漫画
首发于个人博客(客官大人来这里看啊!):www.gunnerx.vip
前言
最近学习python多线程与爬虫相关知识,想试着练练手。正好想到常逛的一个正(瑟)经(琴)漫画的网站,决定想办法把上面全部漫画都爬下来,以便✋ 。
声明
事先声明,我只是个刚学爬虫不久的菜鸡,所以代码应该有很多有问题,和可以改进的地方,希望大家轻喷但是多多评论帮我指出问题
分析
要写爬虫首先当然是要分析网页喽。谷歌chrome和火狐firefox都能打开开发者工具分析,不过因为火狐的是中文界面,所以就选择fx。
打开网站,https://www.xxxxx.com (网址打码!)
由于过于瑟琴,这里就不放图了。
分析一番,发现比如一本h漫画叫a,那它的url就是https://www.xxxxx.com/中文h漫/a/1,它每一页的url,比如第3页,就是https://www.xxxxx.com/中文h漫/a/1/page/3。对每一页所在的页面分析,发现漫画的每一页的图片的真实url就在页面html的一个img标签中,如图
如图的img标签中的src属性便是图片的真实url,只要请求这个url便可卸载此张图片。
因为每一本漫画的所有页url都是按规律的,所以只要得到每本漫画的总页数便可构造他全部页的url,进而得到每一个图片的真实url,进行下载。
分析发现,漫画的每一个页面中都有一个快速选择页数的小控件,如图
分析html页面找到其对应的标签:
因此,只要找到这个标签便可解析得到总页数。
以上是针对如何爬取一本漫画的所有图片,若要爬取所有页面,只需先写一个爬虫把所有网站上所有h漫画的名字爬下保存下来,即可构建url。
综上,大致思路如下:
1.首先请求页面 https://www.xxxxx.com想办法爬取所有的漫画名字列表names(一共32页382本漫画)
2.对每一个漫画名字name的某一页p可构建出url:https://www.xxxxx.com/中字h漫/name/1/page/p
3.请求此页面并解析此页面,找到此页漫画图片的真实url地址jpg_url
3.请求jpg_url,以二进制形式下载图片并保存至本地
技术选型
技术选型方面,考虑使用requests库来构建http请求;引入concurrent.future库,维护一个线程池来实现多线程爬虫;html页面解析选用beautifulsoup库来处理;另外因为爬取时间较长所以可以引入smptlib,email来实现爬取完毕后自动发送邮件。
源码
一共三个脚本,第一个名字爬虫实现爬取网站上所有漫画的名字存入一个txt文档中以供图片爬虫调用,第二个图片爬虫脚本,第三个为实现发送邮件的脚本
get_names.py
# get_name.py
# 爬取主页所有漫画名字存入names.txt
import os
import time
from concurrent import futures
import requests
from bs4 import BeautifulSoup
# 请求一个url的函数,若请求失败,过2s重试,最多重试10次
def req_url(url):
attempts = 0
success = False
while attempts < 10 and not success:
try:
r = requests.get(url=url, headers=headers)
r.keep_alive = False
success = True
r.raise_for_status()
return r
except requests.exceptions.HTTPError:
print('状态码非200!')
return None
except Exception: # 若请求失败,过2s重试,最多重试10次
time.sleep(2)
attempts += 1
print('****第{}次重连{}****'.format(attempts, url))
if attempts == 10:
print('连接失败! {}'.format(url))
return None
# 取得一个页面内的所有漫画名并写入文件保存
def get_name(url):
html = BeautifulSoup(req_url(url).text, 'lxml')
for h5 in html.find_all('h5')[1:]:
name = h5.a.get_text()
print(name)
with open('names.txt', 'a') as f:
f.write(name)
f.write('\n')
if __name__ == '__main__':
# 设置最大重新连接次数
requests.adapters.DEFAULT_RETRIES = 5
# 域名
top_url = 'https://www.xxxxx.com'# 打码
# 请求url列表
urls = ['{}/page/{}'.format(top_url, i) for i in range(1, 33)]
# 请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:68.0) Gecko/20100101 Firefox/68.0'
}
# 8线程同时爬取
with futures.ThreadPoolExecutor(max_workers=8) as pool:
tasks = [pool.submit(get_name, url) for url in urls]
get_jpgs.py
# get_jpgs.py
# 爬取names.txt中的漫画
import os
import time
from concurrent import futures
import requests
from bs4 import BeautifulSoup
from send_email import send
# 请求一个url的函数,若请求失败,过2s重试,最多重试10次
def req_url(url):
attempts = 0
success = False
while attempts < 10 and not success:
try:
r = requests.get(url=url, headers=headers)
r.keep_alive = False
success = True
r.raise_for_status()
return r
except requests.exceptions.HTTPError:
print('状态码非200!')
urls_not200.append(url)
return None
except Exception: # 若请求失败,过2s重试,最多重试10次
time.sleep(2)
attempts += 1
print('****第{}次重连{}****'.format(attempts, url))
if attempts == 10:
print('连接失败! {}'.format(url))
urls_fails.append(url)
return None
# 取得当前漫画总页数
def get_page(name):
name_url = '{}/中字h漫/{}/1'.format(url, name)
r = req_url(name_url)
if r is not None:
html = BeautifulSoup(r.text, 'lxml') # 解析html页面以找到页数
select = html.find('select', id='single-pager')
pages = (len(select.contents) - 1) // 2
return pages
return None
# 请求图片真实url并保存在本地
def get_jpg(name):
stime = time.time()
os.mkdir('imgs/{}'.format(name))
pages = get_page(name) # 取得当前漫画总页数
if pages is not None:
print('{} 总页数: {}'.format(name[0:6], pages))
for page in range(1,pages+1): # 遍历全部页码
page_url = '{}/中字h漫/{}/1/p/{}'.format(url, name, page)
r = req_url(page_url)
if r is not None:
html = BeautifulSoup(r.text, 'lxml')
img = html.find('img',id='image-{}'.format(page-1)) # 取得图片所在标签
jpg_url = img.attrs['data-src'] # 取得图片真实地址
r = req_url(jpg_url)
if r is not None:
jpg = r.content # 下载图片
with open('imgs/{}/{}.jpg'.format(name,page), "wb")as f: # 存入本地
f.write(jpg)
print('{} {}.jpg 保存完成!'.format(name[0:6],page))
etime = time.time()
print('**** ****')
print('{} 全部保存完成,耗时 {:.2f}s'.format(name, etime-stime))
print('**** ****')
return name
return None
if __name__ == '__main__':
start_time = time.time()
# 设置最大连接数
requests.adapters.DEFAULT_RETRIES = 5
# 域名,请求头
url = 'https://www.xxxxx.com'# 打码
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:68.0) Gecko/20100101 Firefox/68.0'
}
urls_not200 = [] # 保存状态码非200的url
urls_fails = [] # 保存请求失败的url
# 打开爬好的漫画名字文件并构建列表names
with open('names.txt', 'r') as f:
names = [line.rstrip('\n') for line in f]
os.mkdir('imgs')
# 线程池
with futures.ThreadPoolExecutor(max_workers=25) as pool: # 线程池,最大25个线程
tasks = [pool.submit(get_jpg, name) for name in names]
for task in futures.as_completed(tasks):
print('----线程结束!----')
time_sec = time.time() - start_time
time_min = time_sec / 60
time_hou = time_min / 60
print('全部完成, 耗时 {:.2f}s! 即{:.2f}分钟,即{:.2f}小时'.format(time_sec, time_min, time_hou))
send()
print('urls_not200共{}个'.format(len(urls_not200)))
print('urls_not200: ',urls_not200)
print('urls_fails共{}个'.format(len(urls_fails)))
print('urls_fails: ',urls_fails)
send_email.py
此脚本大部分参考 https://blog.csdn.net/LeoPhilo/article/details/89074232
以QQ邮箱为例,因为QQ 邮箱一般默认关闭SMTP服务,所以我们得先去开启它
# smtplib 用于邮件的发信动作
import smtplib
from email.mime.text import MIMEText
# email 用于构建邮件内容
from email.header import Header
# 用于构建邮件头
def send():
# 发信方的信息:发信邮箱,QQ 邮箱授权码
from_addr = 'xxx@qq.com'
password = 'xxx'
# 收信方邮箱
to_addr = 'xxx@qq.com'
# 发信服务器
smtp_server = 'smtp.qq.com'
# 邮箱正文内容,第一个参数为内容,第二个参数为格式(plain 为纯文本),第三个参数为编码
str = '爬取完毕,或者出错了!'
msg = MIMEText(str, 'plain', 'utf-8')
# 邮件头信息
msg['From'] = Header(from_addr)
msg['To'] = Header(to_addr)
msg['Subject'] = Header('python test')
# 开启发信服务,这里使用的是加密传输
server = smtplib.SMTP_SSL(smtp_server)
server.connect(smtp_server, 465)
server = smtplib.SMTP_SSL(smtp_server)
# 登录发信邮箱
server.login(from_addr, password)
# 发送邮件
server.sendmail(from_addr, to_addr, msg.as_string())
# 关闭服务器
server.quit()
成果
名字太过se qing ,打码。
总结
8说了,开冲!
-
python多线程教程:python多线程详解
2020-02-03 11:49:20文章目录一、线程介绍二...python多线程详解 一、线程介绍 什么是线程 线程(Thread)也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源... -
python 多线程
2015-07-24 15:41:47threading提供了一个比thread模块更高层的API来提供线程的并发性。这些线程并发运行并共享内存。 下面来看threading模块的具体用法: ... 一、Thread的使用 目标... 这里对使用多线程并发,和不适用多线程 -
python多线程爬取某网站全部H漫画
2020-02-09 05:27:13python多线程爬取某网站全部h漫画 首发于个人博客:https://gunnerx.github.io/ 前言 最近学习python多线程与爬虫相关知识,想试着练练手。正好想到常逛的一个正(se)经(qing)漫画的网站,决定想办法把上面全部漫画都... -
Python之多线程:python多线程设计之同时执行多个函数命令详细攻略
2018-12-23 22:05:17Python之多线程:python多线程设计之同时执行多个函数命令详细攻略 目录 实现功能 采取方法 应用场景 实现功能 同时执行多个函数命令 采取方法 T1、单个实现 import threading threading.... -
python多线程下载图片
2017-08-30 21:09:57python 多线程
-
RAD Studio 10.4.2 KeyPatch
-
commons-fileupload-1.4-bin.zip
-
数据库连接池
-
SSH Shell_16.09_xclient.info.dmg
-
Unity ILRuntime框架设计
-
Java - 设计模式之工厂模式
-
Oracle的时间溯回语句
-
项目经理成长之路
-
龟兔赛跑预测
-
Oreo支付系统.rar
-
QQHelper1.3.2.apk
-
eWeather_HD_Pro_v8.1.0.zip
-
Liunx 优化思路与实操步骤
-
基于距离测度的成本和精度意识科学工作流检索
-
JMETER 性能测试基础课程
-
工程制图 AutoCAD 2012 从二维到三维
-
蓝桥杯快速幂
-
MaxScale 实现 MySQL 读写分离与负载均衡
-
C++代码规范和Doxygen根据注释自动生成手册
-
天然富硒科技成果转化-李喜贵:成立联合体谋定农业大健康