-
2020-11-30 17:42:38
ReentrantLock实现原理
ReentrantLock lock=new ReentrantLock(); lock.lock(); System.out.println("lock"); lock.unlock();
1 ReentrantLock底层是基于AQS(AbstractQueuedSynchronizer)的。
AbstractQueuedSynchronizer包含的属性包括int成员变量state表示同步状态、一个node对象(pre,next,thread,waitStatus线程状态)、两个节点(头节点,尾节点)、当前持有锁的线程。先从公平锁开始
1 当第一个线程t1执行lock.lock()方法时:
1.1会先调用acquire方法public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1.2 tryAcquire方法中尝试加锁
protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取同步状态 int c = getState(); //判断同步状态是否为0,此时是第一个线程没有其他线程持有锁,所以c==0 if (c == 0) { //hasQueuedPredecessors()会判断是否需要排队,此时第一个线程是不需要排队的 //compareAndSetState时cas操作改变state的状态, //若改变成功把当前线程设置为持有锁的线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
1.3上一步中的hasQueuedPredecessors方法
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //这里队列还没有被初始化,所以h==t返回false return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
!hasQueuedPredecessors()则返回true,表示不需要排队,直接cas操作改变state的状态
cas成功后tryAcquire返回true,acquire中的!tryAcquire(arg)为false,此时直接退出acquire
2 当第二个线程t2进来时,
2.1此时调用tryAcquire方法会判断state==1,protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //第二个线程进来判断state==1 //然后判断当前线程和持有锁的线程是不是同一个(可重入) //这里时第二个线程会直接返回false else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
tryAcquire返回false,acquire中的!tryAcquire(arg)为true,
2.2此时直接进入到acquire中acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),先调用
addWaiter(Node.EXCLUSIVE)private Node addWaiter(Node mode) { //根据当前线程来实例化一个node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //如果tail为空代表队列还没初始化 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //此时队列还没初始化,直接进入enq方法 enq(node); return node; }
2.3 上一步中的enq方法
/** 这个方法执行的操作就是当队列第一次初始化时 会new一个空节点,然后把头结点指向new出来的这个空节点,尾节点指向上一步传过来的 根据当前线程来实例化的node(这里就表示为t2)。 至于为什么要new一个空节点出来,是为了表示正在持有锁的那个线程,当t2持有锁后会将t2设置为头节点, t2的thread设置为null */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //第一次循环进来尾节点为null,cas将头指针指向new的空节点 if (compareAndSetHead(new Node())) //然后尾指针也指向头指针 tail = head; } else { //第二次循环进来尾节点不为null,下面就是插入t2,将尾指针指向t2 //此时t2已经入队,不代表立即排队等待 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
2.4 addWaiter返回t2,传给acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //当t2入队后并不会立即park,而是自旋一次,看能不能拿到锁 //会先判断自己上一个是不是head //(若自己前面还有线程在等待,就没必要尝试拿锁了,轮不到自己) //tryAcquire尝试获取锁,和前面一样 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //依然拿不到锁,t2阻塞--park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
3 当第三个线程t3来时
操作和t2一样,不同的是:
–addWaiter方法,根据当前线程来实例化一个node后,此时判断tail不为空,将node的pre指向t2,队尾指向t3,t2的next指向t3,就是把t3入队
–t3入队后就不会自旋了,因为他的上一个不是head,直接park睡眠非公平锁情况
非公平锁会有两次抢锁过程
1 当线程进来直接cas尝试获取锁,
2 若失败会调用tryAcquire方法再次尝试加锁(判断锁的状态,若state为0直接cas获取锁,不会判断是否需要排队)/** 和公平锁的区别在于,直接cas尝试获取锁, */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
若第一次抢锁失败tryAcquire调用的是以下方法
/** 和公平锁的区别在于,若state为0直接cas获取锁,不会判断是否需要排队 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
更多相关内容 -
微课讲稿_Java多线程原理.pptx
2020-07-27 03:12:19Java多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多线程原理;Java 多... -
python多线程原理及其实现
2019-02-17 16:02:48Python多线程原理与实现 目的: (1)了解python线程执行原理 (2)掌握多线程编程与线程同步 (3)了解线程池的使用 1 线程基本概念 1.1 线程是什么? 线程是指进程内的一个执行单元,也是进程内的可调度实体...文章目录
1 线程基本概念
1.1 线程是什么?
线程是指进程内的一个执行单元,也是进程内的可调度实体.
与进程的区别:
(1) 地址空间:进程内的一个执行单元;进程至少有一个线程;它们共享进程的地址空间;而进程有自己独立的地址空间;
(2) 资源拥有:进程是资源分配和拥有的单位,同一个进程内的线程共享进程的资源
(3) 线程是处理器调度的基本单位,但进程不是.
(4) 二者均可并发执行.简而言之,一个程序至少有一个进程,一个进程至少有一个线程.
线程的划分尺度小于进程,使得多线程程序的并发性高。
另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。1.2 线程和进程关系?
进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,进程包含多个线程在运行。
多线程可以共享全局变量,多进程不能。多线程中,所有子线程的进程号相同;多进程中,不同的子进程进程号不同。
进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动, 进程是系统进行资源分配和调度的一个独立单位.
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.
2 Python线程模块
python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python(cpython)由于GIL的存在无法使用threading充分利用CPU资源,如果想充分发挥多核CPU的计算能力需要使用multiprocessing模块(Windows下使用会有诸多问题)。
python3.x中已经摒弃了Python2.x中采用函数式thread模块中的start_new_thread()函数来产生新线程方式。
python3.x中通过threading模块创建新的线程有两种方法:一种是通过threading.Thread(Target=executable Method)-即传递给Thread对象一个可执行方法(或对象);第二种是继承threading.Thread定义子类并重写run()方法。第二种方法中,唯一必须重写的方法是run()
(1)通过threading.Thread进行创建多线程
import threading import time def target(): print("the current threading %s is runing" %(threading.current_thread().name)) time.sleep(1) print("the current threading %s is ended"%(threading.current_thread().name)) print("the current threading %s is runing"%(threading.current_thread().name)) ## 属于线程t的部分 t = threading.Thread(target=target) t.start() ## 属于线程t的部分 t.join() # join是阻塞当前线程(此处的当前线程时主线程) 主线程直到Thread-1结束之后才结束 print("the current threading %s is ended"%(threading.current_thread().name))
(2)通过继承threading.Thread定义子类创建多线程
使用Threading模块创建线程,直接从threading.Thread继承,然后重写__init__方法和run方法:
import threading import time class myThread(threading.Thread): # 继承父类threading.Thread def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 print("Starting " + self.name) print_time(self.name, self.counter, 5) print("Exiting " + self.name) def print_time(threadName, delay, counter): while counter: time.sleep(delay) print("%s process at: %s" % (threadName, time.ctime(time.time()))) counter -= 1 # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启线程 thread1.start() thread2.start() # 等待线程结束 thread1.join() thread2.join() print("Exiting Main Thread")
通过以上案例可以知道,thread1和thread2执行顺序是乱序的。要使之有序,需要进行线程同步
3 线程间同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。
需要注意的是,Python有一个GIL(Global Interpreter Lock)机制,任何线程在运行之前必须获取这个全局锁才能执行,每当执行完100条字节码,全局锁才会释放,切换到其他线程执行。
多线程实现同步有四种方式:
锁机制,信号量,条件判断和同步队列。
下面我主要关注两种同步机制:锁机制和同步队列。
(1)锁机制
threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁
import threading import time class myThread(threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print("Starting " + self.name) # 获得锁,成功获得锁定后返回True # 可选的timeout参数不填时将一直阻塞直到获得锁定 # 否则超时后将返回False threadLock.acquire() print_time(self.name, self.counter, 5) # 释放锁 threadLock.release() def print_time(threadName, delay, counter): while counter: time.sleep(delay) print("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 threadLock = threading.Lock() threads = [] # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启新线程 thread1.start() thread2.start() # 添加线程到线程列表 threads.append(thread1) threads.append(thread2) # 等待所有线程完成 for t in threads: t.join() print("Exiting Main Thread")
(2) 线程同步队列queue
python2.x中提供的Queue, Python3.x中提供的是queue
见import queue.
Python的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
queue模块中的常用方法:
- queue.qsize() 返回队列的大小
- queue.empty() 如果队列为空,返回True,反之False
- queue.full() 如果队列满了,返回True,反之False
- queue.full 与 maxsize 大小对应
- queue.get([block[, timeout]])获取队列,timeout等待时间
- queue.get_nowait() 相当Queue.get(False)
- queue.put(item) 写入队列,timeout等待时间
- queue.put_nowait(item) 相当Queue.put(item, False)
- queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
- queue.join() 实际上意味着等到队列为空,再执行别的操作
案例1:
import queue import threading import time exitFlag = 0 class myThread(threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print("Starting " + self.name) process_data(self.name, self.q) print("Exiting " + self.name) def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print("%s processing %s" % (threadName, data)) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = queue.Queue(10) threads = [] threadID = 1 # 创建新线程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 填充队列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待队列清空 while not workQueue.empty(): pass # 通知线程是时候退出 exitFlag = 1 # 等待所有线程完成 for t in threads: t.join() print("Exiting Main Thread")
案例2:
import time import threading import queue class Worker(threading.Thread): def __init__(self, name, queue): threading.Thread.__init__(self) self.queue = queue self.start() #执行run() def run(self): #循环,保证接着跑下一个任务 while True: # 队列为空则退出线程 if self.queue.empty(): break # 获取一个队列数据 foo = self.queue.get() # 延时1S模拟你要做的事情 time.sleep(1) # 打印 print(self.getName() + " process " + str(foo)) # 任务完成 self.queue.task_done() # 队列 queue = queue.Queue() # 加入100个任务队列 for i in range(100): queue.put(i) # 开10个线程 for i in range(10): threadName = 'Thread' + str(i) Worker(threadName, queue) # 所有线程执行完毕后关闭 queue.join()
4 线程池
4.1 传统多线程问题?
传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。
有没有一种高效的解决方案呢? —— 线程池
4.2 线程池基本原理:
我们把任务放进队列中去,然后开N个线程,每个线程都去队列中取一个任务,执行完了之后告诉系统说我执行完了,然后接着去队列中取下一个任务,直至队列中所有任务取空,退出线程。
使用线程池:
由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。线程池要设置为多少?
服务器CPU核数有限,能够同时并发的线程数有限,并不是开得越多越好,以及线程切换是有开销的,如果线程切换过于频繁,反而会使性能降低
线程执行过程中,计算时间分为两部分:
- CPU计算,占用CPU
- 不需要CPU计算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具体操作就是比如
访问cache、RPC调用下游service、访问DB,等需要网络调用的操作
那么如果计算时间占50%, 等待时间50%,那么为了利用率达到最高,可以开2个线程:
假如工作时间是2秒, CPU计算完1秒后,线程等待IO的时候需要1秒,此时CPU空闲了,这时就可以切换到另外一个线程,让CPU工作1秒后,线程等待IO需要1秒,此时CPU又可以切回去,第一个线程这时刚好完成了1秒的IO等待,可以让CPU继续工作,就这样循环的在两个线程之前切换操作。那么如果计算时间占20%, 等待时间80%,那么为了利用率达到最高,可以开5个线程:
可以想象成完成任务需要5秒,CPU占用1秒,等待时间4秒,CPU在线程等待时,可以同时再激活4个线程,这样就把CPU和IO等待时间,最大化的重叠起来抽象一下,计算线程数设置的公式就是:
N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。
由于有GIL的影响,python只能使用到1个核,所以这里设置N=1import queue import threading import time # 声明线程池管理类 class WorkManager(object): def __init__(self, work_num=1000, thread_num=2): self.work_queue = queue.Queue() # 任务队列 self.threads = [] # 线程池 self.__init_work_queue(work_num) # 初始化任务队列,添加任务 self.__init_thread_pool(thread_num) # 初始化线程池,创建线程 """ 初始化线程池 """ def __init_thread_pool(self, thread_num): for i in range(thread_num): # 创建工作线程(线程池中的对象) self.threads.append(Work(self.work_queue)) """ 初始化工作队列 """ def __init_work_queue(self, jobs_num): for i in range(jobs_num): self.add_job(do_job, i) """ 添加一项工作入队 """ def add_job(self, func, *args): self.work_queue.put((func, list(args))) # 任务入队,Queue内部实现了同步机制 """ 等待所有线程运行完毕 """ def wait_allcomplete(self): for item in self.threads: if item.isAlive(): item.join() class Work(threading.Thread): def __init__(self, work_queue): threading.Thread.__init__(self) self.work_queue = work_queue self.start() def run(self): # 死循环,从而让创建的线程在一定条件下关闭退出 while True: try: do, args = self.work_queue.get(block=False) # 任务异步出队,Queue内部实现了同步机制 do(args) self.work_queue.task_done() # 通知系统任务完成 except: break # 具体要做的任务 def do_job(args): time.sleep(0.1) # 模拟处理时间 print(threading.current_thread()) print(list(args)) if __name__ == '__main__': start = time.time() work_manager = WorkManager(100, 10) # 或者work_manager = WorkManager(10000, 20) work_manager.wait_allcomplete() end = time.time() print("cost all time: %s" % (end - start))
5 协程
在python GIL之下,同一时刻只能有一个线程在运行,那么对于CPU计算密集的程序来说,线程之间的切换开销就成了拖累,而以I/O为瓶颈的程序正是协程所擅长的:
Python中的协程经历了很长的一段发展历程。其大概经历了如下三个阶段:
- 最初的生成器变形yield/send
- 引入@asyncio.coroutine和yield from
- 在最近的Python3.5版本中引入async/await关键字
5.1 从yield说起
先看一段普通的计算斐波那契续列的代码
newlist =[1] def newfib(n): a=0 b=1 while n-1: a,b=b,a+b n =n-1 newlist.append(b) return newlist print(newfib(10)) # [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
如果我们仅仅是需要拿到斐波那契序列的第n位,或者仅仅是希望依此产生斐波那契序列,那么上面这种传统方式就会比较耗费内存。
这时,yield就派上用场了。
def fib(n): a = 0 b = 1 while n: yield b a, b = b, a + b n-=1 for fib_res in fib(20): print(fib_res)
**当一个函数中包含yield语句时,python会自动将其识别为一个生成器。**这时fib(20)并不会真正调用函数体,而是以函数体生成了一个生成器对象实例。
yield在这里可以保留fib函数的计算现场,暂停fib的计算并将b返回。而将fib放入for…in循环中时,每次循环都会调用next(fib(20)),唤醒生成器,执行到下一个yield语句处,直到抛出StopIteration异常。此异常会被for循环捕获,导致跳出循环。
5.2 Send来了
从上面的程序中可以看到,目前只有数据从fib(20)中通过yield流向外面的for循环;如果可以向fib(20)发送数据,那不是就可以在Python中实现协程了嘛。
于是,Python中的生成器有了send函数,yield表达式也拥有了返回值。
我们用这个特性,模拟一个慢速斐波那契数列的计算:
import time,random def stupid_fib(n): a = 0 b = 1 while n: sleep_cnt = yield b print('let me think {0} secs'.format(sleep_cnt)) time.sleep(sleep_cnt) a, b = b, a + b n-= 1 print('-' * 10 + 'test yield send' + '-' * 10) N = 20 sfib = stupid_fib(N) fib_res = next(sfib) while True: print(fib_res) try: fib_res = sfib.send(random.uniform(0, 0.5)) except StopIteration: break
6. python 进行并发编程
在Python 2的时代,高性能的网络编程主要是使用Twisted、Tornado和Gevent这三个库,但是它们的异步代码相互之间既不兼容也不能移植。
asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。
asyncio
的编程模型就是一个消息循环。我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。 Python的在3.4中引入了协程的概念,可是这个还是以生成器对象为基础。
Python 3.5添加了async和await这两个关键字,分别用来替换
asyncio.coroutine
和yield from
。 python3.5则确定了协程的语法。下面将简单介绍asyncio的使用。实现协程的不仅仅是asyncio,tornado和gevent, vloop都实现了类似的功能。
6.1 使用asyncio
用
asyncio
实现Hello world
代码如下:import asyncio @asyncio.coroutine def hello(): print("Hello world!") # 异步调用asyncio.sleep(1): r = yield from asyncio.sleep(1) print("Hello again!") # 获取EventLoop: loop = asyncio.get_event_loop() # 执行coroutine loop.run_until_complete(hello()) loop.close()
@asyncio.coroutine
把一个generator标记为coroutine类型,然后,我们就把这个
coroutine扔到
EventLoop中执行。 hello()
会首先打印出Hello world!
,然后,yield from
语法可以让我们方便地调用另一个generator
。由于asyncio.sleep()
也是一个coroutine
,所以线程不会等待asyncio.sleep()
,而是直接中断并执行下一个消息循环。当asyncio.sleep()
返回时,线程就可以从yield from
拿到返回值(此处是None
),然后接着执行下一行语句。 把
asyncio.sleep(1)
看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop
中其他可以执行的coroutine
了,因此可以实现并发执行。我们用Task封装两个
coroutine
试试:import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
观察执行过程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>) Hello world! (<_MainThread(MainThread, started 140735195337472)>) (暂停约1秒) Hello again! (<_MainThread(MainThread, started 140735195337472)>) Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由打印的当前线程名称可以看出,两个
coroutine
是由同一个线程并发执行的。如果把
asyncio.sleep()
换成真正的IO操作,则多个coroutine
就可以由一个线程并发执行。asyncio案例实战
我们用
asyncio
的异步网络连接来获取sina、sohu和163的网站首页:async_wget.py
import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
结果信息如下:
wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段时间) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 ...
可见3个连接由一个线程通过
coroutine
并发完成。6.2 使用async/await
import asyncio import re async def browser(host, port=80): # 连接host reader, writer = await asyncio.open_connection(host, port) print(host, port, '连接成功!') # 发起 / 主页请求(HTTP协议) # 发送请求头必须是两个空行 index_get = 'GET {} HTTP/1.1\r\nHost:{}\r\n\r\n'.format('/', host) writer.write(index_get.encode()) await writer.drain() # 等待向连接写完数据(请求发送完成) # 开始读取响应的数据报头 while True: line = await reader.readline() # 等待读取响应数据 if line == b'\r\n': break print(host, '<header>', line) # 读取响应的数据body body = await reader.read() print(encoding) print(host, '<content>', body) if __name__ == '__main__': loop = asyncio.get_event_loop() tasks = [browser(host) for host in ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() print('---over---')
7 小结
asyncio
提供了完善的异步IO支持;异步操作需要在
coroutine
中通过yield from
完成;多个
coroutine
可以封装成一组Task然后并发执行。 -
深入了解多线程的原理
2018-05-25 15:35:48即便不考虑多核心,在单核下,多线程也是有意义的,因为在一些操作,比如IO操作阻塞的时候,是不需要CPU参与的,这时候CPU就可以另开一个线程去做别的事情,等待IO操作完成再回到之前的线程继续执行即可 为什么...说在前面的话
使用多线程的目的
- 在多个CPU核心下,多线程的好处是显而易见的,不然多个CPU核心只跑一个线程其他的核心就都浪费了
- 即便不考虑多核心,在单核下,多线程也是有意义的,因为在一些操作,比如IO操作阻塞的时候,是不需要CPU参与的,这时候CPU就可以另开一个线程去做别的事情,等待IO操作完成再回到之前的线程继续执行即可
为什么要使用线程池,能为我们带来什么好处?
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高相应速度:当任务到达时,任务可以不需要等到线程创建就能立刻执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
使用线程池的风险
- 死锁
- 资源不足
- 并发错误
- 线程泄漏
- 请求过载
线程池的使用
我们可以通过ThreadPoolExecutor来创建一个线程池。
new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
创建一个线程池需要输入几个参数:
- corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即便其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择一下几种阻塞队列:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.NewFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LikedBlockingQueue,静态工厂方法Executor.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
- ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意思的名字,DEBUG和定位问题时非常有帮助。
RejectedExecutionHandler(饱和策略):当队列和线程都满了,说明线程池处于饱和状态,那么必须采用一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下为JDK1.5提供的四种策略。
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用哲所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- 当前也可以根据应用场景需求来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
KeepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
- TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS,千分之一毫秒)和毫微秒(NANOSECONDS,千分之一微秒)。
向线程池提交任务
我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务知否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。
threadsPool.execute(new Runnable(){ @Override public void run(){ //TODO Auto-generated method stub } });
我们也可以使用submit方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住知道任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞一段时间后立马返回,这时有可能任务没有执行完。
try{ Object s=future.get(); }catch(InterruptedException e){ //处理中断异常 }catch(ExecutionException e){ //处理无法执行任务异常 }finally{ //关闭线程池 executor.shutdown(); } }
线程池的关闭
我们可以通过调用线程池的shutdown或者shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown的原理是只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行或者暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。线程池的实现原理
流程分析:线程池的主要工作流程如下图:
合理的配置线程池
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务
2. 任务的优先级:高,中和低
3. 任务的执行时间:长,中和短
4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没有必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级搞得任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需求设大一点,比如说几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,二我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他的任务。线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- taskCount:线程池需要执行的任务数量
- completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
- largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
- getActiveCount:获取活动的线程数。通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated 方法,我们可以在任务执行前,执行后和线程关闭前干一些事。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。
-
多线程锁原理和实现案例
2019-01-07 19:06:20设计多线程锁操作系统实现原理、有哪些多线程锁,如何使用这些锁 -
Python多线程的原理与实现
2018-08-21 17:55:21Python多线程原理与实战 目的: (1)了解python线程执行原理 (2)掌握多线程编程与线程同步 (3)了解线程池的使用 1 线程基本概念 1.1 线程是什么? 线程是指进程内的一个执行单元,也是进程内的可调度...Python多线程原理与实战
目的:
(1)了解python线程执行原理
(2)掌握多线程编程与线程同步
(3)了解线程池的使用
1 线程基本概念
1.1 线程是什么?
线程是指进程内的一个执行单元,也是进程内的可调度实体.
与进程的区别:
(1) 地址空间:进程内的一个执行单元;进程至少有一个线程;它们共享进程的地址空间;而进程有自己独立的地址空间;
(2) 资源拥有:进程是资源分配和拥有的单位,同一个进程内的线程共享进程的资源
(3) 线程是处理器调度的基本单位,但进程不是.
(4) 二者均可并发执行.简而言之,一个程序至少有一个进程,一个进程至少有一个线程.
线程的划分尺度小于进程,使得多线程程序的并发性高。
另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。1.2 线程和进程关系?
进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,进程包含多个线程在运行。
多线程可以共享全局变量,多进程不能。多线程中,所有子线程的进程号相同;多进程中,不同的子进程进程号不同。
进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.
2 Python线程模块
python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python(cpython)由于GIL的存在无法使用threading充分利用CPU资源,如果想充分发挥多核CPU的计算能力需要使用multiprocessing模块(Windows下使用会有诸多问题)。
2.1 如何创建线程
python3.x中已经摒弃了Python2.x中采用函数式thread模块中的start_new_thread()函数来产生新线程方式。
python3.x中通过threading模块创建新的线程有两种方法:一种是通过threading.Thread(Target=executable Method)-即传递给Thread对象一个可执行方法(或对象);第二种是继承threading.Thread定义子类并重写run()方法。第二种方法中,唯一必须重写的方法是run()
(1)通过threading.Thread进行创建多线程
import threading import time def target(): print("the current threading %s is runing" %(threading.current_thread().name)) time.sleep(1) print("the current threading %s is ended"%(threading.current_thread().name)) print("the current threading %s is runing"%(threading.current_thread().name)) ## 属于线程t的部分 t = threading.Thread(target=target) t.start() ## 属于线程t的部分 t.join() # join是阻塞当前线程(此处的当前线程时主线程) 主线程直到Thread-1结束之后才结束 print("the current threading %s is ended"%(threading.current_thread().name))
(2)通过继承threading.Thread定义子类创建多线程
使用Threading模块创建线程,直接从threading.Thread继承,然后重写init方法和run方法:
import threading import time class myThread(threading.Thread): # 继承父类threading.Thread def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 print("Starting " + self.name) print_time(self.name, self.counter, 5) print("Exiting " + self.name) def print_time(threadName, delay, counter): while counter: time.sleep(delay) print("%s process at: %s" % (threadName, time.ctime(time.time()))) counter -= 1 # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启线程 thread1.start() thread2.start() # 等待线程结束 thread1.join() thread2.join() print("Exiting Main Thread")
通过以上案例可以知道,thread1和thread2执行顺序是乱序的。要使之有序,需要进行线程同步
3 线程间同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。
需要注意的是,Python有一个GIL(Global Interpreter Lock)机制,任何线程在运行之前必须获取这个全局锁才能执行,每当执行完100条字节码,全局锁才会释放,切换到其他线程执行。
3.1 线程同步问题
多线程实现同步有四种方式:
锁机制,信号量,条件判断和同步队列。
下面我主要关注两种同步机制:锁机制和同步队列。
(1)锁机制
threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁
import threading import time class myThread(threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print("Starting " + self.name) # 获得锁,成功获得锁定后返回True # 可选的timeout参数不填时将一直阻塞直到获得锁定 # 否则超时后将返回False threadLock.acquire() print_time(self.name, self.counter, 5) # 释放锁 threadLock.release() def print_time(threadName, delay, counter): while counter: time.sleep(delay) print("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 threadLock = threading.Lock() threads = [] # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启新线程 thread1.start() thread2.start() # 添加线程到线程列表 threads.append(thread1) threads.append(thread2) # 等待所有线程完成 for t in threads: t.join() print("Exiting Main Thread")
(2) 线程同步队列queue
python2.x中提供的Queue, Python3.x中提供的是queue
见import queue.
Python的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
queue模块中的常用方法:
- queue.qsize() 返回队列的大小
- queue.empty() 如果队列为空,返回True,反之False
- queue.full() 如果队列满了,返回True,反之False
- queue.full 与 maxsize 大小对应
- queue.get([block[, timeout]])获取队列,timeout等待时间
- queue.get_nowait() 相当Queue.get(False)
- queue.put(item) 写入队列,timeout等待时间
- queue.put_nowait(item) 相当Queue.put(item, False)
- queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
- queue.join() 实际上意味着等到队列为空,再执行别的操作
案例1:
import queue import threading import time exitFlag = 0 class myThread(threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print("Starting " + self.name) process_data(self.name, self.q) print("Exiting " + self.name) def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print("%s processing %s" % (threadName, data)) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = queue.Queue(10) threads = [] threadID = 1 # 创建新线程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 填充队列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待队列清空 while not workQueue.empty(): pass # 通知线程是时候退出 exitFlag = 1 # 等待所有线程完成 for t in threads: t.join() print("Exiting Main Thread")
案例2:
import time import threading import queue class Worker(threading.Thread): def __init__(self, name, queue): threading.Thread.__init__(self) self.queue = queue self.start() #执行run() def run(self): #循环,保证接着跑下一个任务 while True: # 队列为空则退出线程 if self.queue.empty(): break # 获取一个队列数据 foo = self.queue.get() # 延时1S模拟你要做的事情 time.sleep(1) # 打印 print(self.getName() + " process " + str(foo)) # 任务完成 self.queue.task_done() # 队列 queue = queue.Queue() # 加入100个任务队列 for i in range(100): queue.put(i) # 开10个线程 for i in range(10): threadName = 'Thread' + str(i) Worker(threadName, queue) # 所有线程执行完毕后关闭 queue.join()
4 线程池
传统多线程问题?
传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。
有没有一种高效的解决方案呢? —— 线程池
线程池基本原理:
我们把任务放进队列中去,然后开N个线程,每个线程都去队列中取一个任务,执行完了之后告诉系统说我执行完了,然后接着去队列中取下一个任务,直至队列中所有任务取空,退出线程。
使用线程池:
由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。线程池要设置为多少?
服务器CPU核数有限,能够同时并发的线程数有限,并不是开得越多越好,以及线程切换是有开销的,如果线程切换过于频繁,反而会使性能降低
线程执行过程中,计算时间分为两部分:
- CPU计算,占用CPU
- 不需要CPU计算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具体操作就是比如
访问cache、RPC调用下游service、访问DB,等需要网络调用的操作
那么如果计算时间占50%, 等待时间50%,那么为了利用率达到最高,可以开2个线程:
假如工作时间是2秒, CPU计算完1秒后,线程等待IO的时候需要1秒,此时CPU空闲了,这时就可以切换到另外一个线程,让CPU工作1秒后,线程等待IO需要1秒,此时CPU又可以切回去,第一个线程这时刚好完成了1秒的IO等待,可以让CPU继续工作,就这样循环的在两个线程之前切换操作。那么如果计算时间占20%, 等待时间80%,那么为了利用率达到最高,可以开5个线程:
可以想象成完成任务需要5秒,CPU占用1秒,等待时间4秒,CPU在线程等待时,可以同时再激活4个线程,这样就把CPU和IO等待时间,最大化的重叠起来抽象一下,计算线程数设置的公式就是:
N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。
由于有GIL的影响,python只能使用到1个核,所以这里设置N=1import queue import threading import time # 声明线程池管理类 class WorkManager(object): def __init__(self, work_num=1000, thread_num=2): self.work_queue = queue.Queue() # 任务队列 self.threads = [] # 线程池 self.__init_work_queue(work_num) # 初始化任务队列,添加任务 self.__init_thread_pool(thread_num) # 初始化线程池,创建线程 """ 初始化线程池 """ def __init_thread_pool(self, thread_num): for i in range(thread_num): # 创建工作线程(线程池中的对象) self.threads.append(Work(self.work_queue)) """ 初始化工作队列 """ def __init_work_queue(self, jobs_num): for i in range(jobs_num): self.add_job(do_job, i) """ 添加一项工作入队 """ def add_job(self, func, *args): self.work_queue.put((func, list(args))) # 任务入队,Queue内部实现了同步机制 """ 等待所有线程运行完毕 """ def wait_allcomplete(self): for item in self.threads: if item.isAlive(): item.join() class Work(threading.Thread): def __init__(self, work_queue): threading.Thread.__init__(self) self.work_queue = work_queue self.start() def run(self): # 死循环,从而让创建的线程在一定条件下关闭退出 while True: try: do, args = self.work_queue.get(block=False) # 任务异步出队,Queue内部实现了同步机制 do(args) self.work_queue.task_done() # 通知系统任务完成 except: break # 具体要做的任务 def do_job(args): time.sleep(0.1) # 模拟处理时间 print(threading.current_thread()) print(list(args)) if __name__ == '__main__': start = time.time() work_manager = WorkManager(100, 10) # 或者work_manager = WorkManager(10000, 20) work_manager.wait_allcomplete() end = time.time() print("cost all time: %s" % (end - start))
进程石油系统分配资源、线程是由CPU调度、协程由用户控制
5 协程
在python GIL之下,同一时刻只能有一个线程在运行,那么对于CPU计算密集的程序来说,线程之间的切换开销就成了拖累,而以I/O为瓶颈的程序正是协程所擅长的:
Python中的协程经历了很长的一段发展历程。其大概经历了如下三个阶段:
- 最初的生成器变形yield/send
- 引入@asyncio.coroutine和yield from
- 在最近的Python3.5版本中引入async/await关键字
(1)从yield说起
先看一段普通的计算斐波那契续列的代码
def fibs(n): res = [0] * n index = 0 a = 0 b = 1 while index < n: res[index] = b a, b = b, a + b index += 1 return res for fib_res in fibs(20): print(fib_res)
如果我们仅仅是需要拿到斐波那契序列的第n位,或者仅仅是希望依此产生斐波那契序列,那么上面这种传统方式就会比较耗费内存。
这时,yield就派上用场了。
def fib(n): index = 0 a = 0 b = 1 while index < n: yield b a, b = b, a + b index += 1 for fib_res in fib(20): print(fib_res)
当一个函数中包含yield语句时,python会自动将其识别为一个生成器。这时fib(20)并不会真正调用函数体,而是以函数体生成了一个生成器对象实例。
yield在这里可以保留fib函数的计算现场,暂停fib的计算并将b返回。而将fib放入for…in循环中时,每次循环都会调用next(fib(20)),唤醒生成器,执行到下一个yield语句处,直到抛出StopIteration异常。此异常会被for循环捕获,导致跳出循环。
(2) Send来了
从上面的程序中可以看到,目前只有数据从fib(20)中通过yield流向外面的for循环;如果可以向fib(20)发送数据,那不是就可以在Python中实现协程了嘛。
于是,Python中的生成器有了send函数,yield表达式也拥有了返回值。
我们用这个特性,模拟一个慢速斐波那契数列的计算:
import time import random def stupid_fib(n): index = 0 a = 0 b = 1 while index < n: sleep_cnt = yield b print('let me think {0} secs'.format(sleep_cnt)) time.sleep(sleep_cnt) a, b = b, a + b index += 1 print('-' * 10 + 'test yield send' + '-' * 10) N = 20 sfib = stupid_fib(N) fib_res = next(sfib) #第一次必须要执行next()函数,让程序控制到yield b 位置 while True: print(fib_res) try: fib_res = sfib.send(random.uniform(0, 0.5)) except StopIteration: break
python 进行并发编程
在Python 2的时代,高性能的网络编程主要是使用Twisted、Tornado和Gevent这三个库,但是它们的异步代码相互之间既不兼容也不能移植。
asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。
asyncio
的编程模型就是一个消息循环。我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。 Python的在3.4中引入了协程的概念,可是这个还是以生成器对象为基础。
Python 3.5添加了async和await这两个关键字,分别用来替换
asyncio.coroutine
和yield from
。 python3.5则确定了协程的语法。下面将简单介绍asyncio的使用。实现协程的不仅仅是asyncio,tornado和gevent, vloop都实现了类似的功能。
(1)协程定义
用
asyncio
实现Hello world
代码如下:import asyncio @asyncio.coroutine def hello(): print("Hello world!") # 异步调用asyncio.sleep(1)-->协程函数: r = yield from asyncio.sleep(1) #此处为另外一个协程,不是休眠 print("Hello again!") # 获取EventLoop(事件循环器): loop = asyncio.get_event_loop() # 执行coroutine loop.run_until_complete(hello()) loop.close()
@asyncio.coroutine
把一个generator标记为coroutine类型,然后,我们就把这个
coroutine扔到
EventLoop中执行。 hello()
会首先打印出Hello world!
,然后,yield from
语法可以让我们方便地调用另一个generator
。由于asyncio.sleep()
也是一个coroutine
,所以线程不会等待asyncio.sleep()
,而是直接中断并执行下一个消息循环。当asyncio.sleep()
返回时,线程就可以从yield from
拿到返回值(此处是None
),然后接着执行下一行语句。 把
asyncio.sleep(1)
看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop
中其他可以执行的coroutine
了,因此可以实现并发执行。我们用Task封装两个
coroutine
试试:import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
观察执行过程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>) Hello world! (<_MainThread(MainThread, started 140735195337472)>) (暂停约1秒) Hello again! (<_MainThread(MainThread, started 140735195337472)>) Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由打印的当前线程名称可以看出,两个
coroutine
是由同一个线程并发执行的。如果把
asyncio.sleep()
换成真正的IO操作,则多个coroutine
就可以由一个线程并发执行。asyncio案例实战
我们用
asyncio
的异步网络连接来获取sina、sohu和163的网站首页:async_wget.py
import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) #等待打开host:80端口 reader, writer = yield from connect #开始链接。如果连接成功,则返回Reader和写writer的操作对象 header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
结果信息如下:
wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段时间) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 ...
可见3个连接由一个线程通过
coroutine
并发完成。(3) 使用async/await
import asyncio import re async def browser(host, port=80): # 连接host reader, writer = await asyncio.open_connection(host, port) print(host, port, '连接成功!') # 发起 / 主页请求(HTTP协议) # 发送请求头必须是两个空行 index_get = 'GET {} HTTP/1.1\r\nHost:{}\r\n\r\n'.format('/', host) writer.write(index_get.encode()) await writer.drain() # 等待向连接写完数据(请求发送完成) # 开始读取响应的数据报头 while True: line = await reader.readline() # 等待读取响应数据 if line == b'\r\n': break print(host, '<header>', line) # 读取响应的数据body body = await reader.read() print(encoding) print(host, '<content>', body) if __name__ == '__main__': loop = asyncio.get_event_loop() tasks = [browser(host) for host in ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() print('---over---')
小结
asyncio
提供了完善的异步IO支持;异步操作需要在
coroutine
中通过yield from
完成;多个
coroutine
可以封装成一组Task然后并发执行。 -
Windows环境下的多线程编程原理与应用 [完整 高清]
2018-09-07 11:18:44Windows环境下的多线程编程原理与应用 Windows环境下的多线程编程原理与应用 -
多线程、cpu本质原理
2019-04-07 11:48:441、操作系统如何实现多进程、多线程。 2、cpu基本结构,cpu如何和其他设备(网卡、显卡、声卡、磁盘、usb)通信。 3、java虚拟机原理。 4、多线程的安全问题的本质原因。 下面一一记录,本人文学水平有限,... -
Scrapy多线程爬虫原理
2019-10-16 09:39:46一:多线程爬虫原理 二:Scrapy框架 定义:Scrapy是基于Python实现,方便爬取网站数据、提取结构性数据的应用框架。 底层:使用Twisted异步网络框架来处理网络通讯,加快下载速度。 不用手动实现异步框架,包含了... -
windows系统多线程同步机制原理总结
2018-12-24 21:24:33windows系统多线程同步机制原理总结 同步问题是开发过程中遇到的重要问题之一。同步是要保证在并发执行的环境中各个控制流可以有序地执行,包括对于资源的共享或互斥访问,以及代码功能的逻辑顺序。 为了保证多线程... -
java多线程总结:原理结合源码详细讲解 - 简单实用
2018-08-09 11:14:57执行策略:线程执行的方式 串行执行 比如:医院给病人看病的时候,可以让所有的病人都拍成一个队形,让一个医生统一的看病。医生:线程。病人看病:任务 这种一个医生给一群站好队形的病人看病--映射到java就相当... -
JDK自带多线程工具包详解
2021-06-30 20:50:40由浅入深,通过图解和手写代码,讲解Java版的多线程,主要讲解如下内容: ...JDK多线程工具包中,若干种工具的原理和手写实现: ReentrantLock、CountDownLanuh、CyclicBarrier、Semaphore -
多线程下载的原理和基本用法
2017-05-15 23:12:22线程可以通俗的理解为下载的通道,一个线程就是文件下载的一个通道,多线程就是同时打开了多个通道对文件进行下载。当服务器提供下载服务时,用户之间共享带宽。本文介绍了多线程下载的原理和基本用法。 -
Nginx多线程原理
2015-11-11 09:12:01一、问题 一般情况下,nginx 是一个事件处理器,一个从内核获取连接事件并告诉系统如何处理的控制器。实际上,在操作系统做读写数据调度的时候,nginx是协同...但是所有处理过程都是在一个简单的线程循环中完成的 -
Java多线程之线程池原理剖析,锁的深度化
2018-07-27 02:55:30目录 1 线程池 1.1 好处 1.2 作用 1.3 线程池创建方式 1.4 原理分析 1.5 线程池合理配置 2 锁的深度化 ...1)降低资源消耗,重复利用已经创建好的线程,降低线程创建和销毁造成的消耗 2)提高... -
java多线程视频教程(共七套)
2019-05-07 18:18:3001、【中级原理】java多线程并发编程入门原理精通视频教程 02、【中级原理专题】java并发编程多线程高级专题课程 03. 【中级原理】高并发编程原理和线程池精通教程 04、【高级原理】Java并发多线程编程基础原理与... -
单核多线程与多核多线程
2018-09-29 17:20:33单核多线程与多核多线程 或许有些同学对于单核多线程和多核多线程有点误区,因为会听到一些同学问为什么单核能处理多线程,总结了一些干货,下面会通俗说明下。 线程和进程是什么 线程是CPU调度和分配的基本单位... -
多线程原理、线程安全函数和多线程程序需要注意的问题
2015-03-17 08:13:40一、多线程使用情景: 1.用户需要同时得到多个反馈,例如下载过程中进度条改变,读取文件的时候显示结果。 2.提高程序执行性能,提高CPU使用效率,。 多线程的主要是需要处理大量的IO操作或者处理的情况需要花... -
并发编程(一)多线程基础和原理
2020-07-19 01:44:02多线程基础 最近,准备回顾下多线程相关的知识体系,顺便在这里做个记录。 并发的发展历史 最早的计算机只能解决简单的数学运算问题,比如正弦、 余弦等。运行方式:程序员首先把程序写到纸上,然后穿 孔成卡片,... -
多线程之间实现通讯
2019-02-14 12:05:191.多线程之间通讯 多线程之间通讯,其实就是多个线程在操作同一个资源,但是操作的动作不同。 //共享资源实体类 class Person { private String name; private String sex; public String getName() { ... -
linux 多线程的实现的基本原理
2016-03-28 11:10:331. linux 多线程的基本概念 linux 是多用户、多任务的并发执行;所谓的并发是通过多进程、多线程来实现的; 1). 其中多进程有3种方式: 单机多实例(机器复用,一台机器启动多个进程,每个进程干自己的事情)多... -
c中的多线程详解
2017-05-30 19:56:11什么是线程? 线程是进程里面的一个单独的序列流,因为线程包含一些进程的属性,所以线程也叫轻量级进程。...为什么需要多线程? 线程是通过并行性改进应用程序的流行方式,例如在浏览器中多个选项卡可以对 -
StringBuffer 多线程字符串操作,StringBuffer原理深度解析
2018-09-12 19:53:57public class StringBufferDemo { /** * 多线程字符串操作 */ public static void main(String[] args) { String a="a"; String b="b"; String c=a+b+1;//... -
.NET使用多线程编程
2019-04-20 13:34:50本文介绍了如何使用System.Threading命名空间编写多线程应用程序。应用程序中的多线程可能会导致并发问题,例如竞争条件和死锁。 最后,本文讨论了各种同步技术,如Locks,Mutexes和Semaphores,以处理并发问题并... -
多线程下载的原理
2014-03-23 17:42:15假如我们把一个服务器上的文件看作是一个水缸里的水的话,那么多线程下载就相当于从水缸上打了多个小孔,然后塞进去小管道进行抽水。呵呵,也许这个比喻不够准确。多线程下载大致可分为以下几个步骤: 一、首先在... -
多线程锁的升级原理是什么?
2019-05-20 11:04:06多线程锁的升级原理是什么? 锁的级别从低到高: 无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁 锁分级别原因: 没有优化以前,sychronized是重量级锁(悲观锁),使用 wait 和 notify、notifyAll 来切换... -
一篇文章总结了JVM线程基本原理
2018-07-10 13:49:07用线程池初始化3条线程做自定义文件生成操作,用一条线程去处理第一步完成的结果Future,利用几天的时间结合项目,站在JVM的角度回顾JAVA线程的相关知识,接下去再整一篇线程安全的,之前写的多线程还是在一年以前,...