精华内容
下载资源
问答
  • Python 线程启动和关闭

    千次阅读 2020-06-24 17:58:23
    Python自带的线程函数,发现没有停止的功能,有些需要重载后使用,但是这里还是用了网上的一个脚本通用的方法,实测还是有效的,只是从stop thread开始到真正退出,时间有延迟;实测多次发现至少需要3s import ...

    线程启动和关闭的Demo

    • Python自带的线程函数,发现没有停止的功能,有些需要重载后使用,但是这里还是用了网上的一个脚本通用的方法,实测还是有效的,只是从stop thread开始到真正退出,时间有延迟;实测多次发现至少需要3s
    import threading
    import sys, os, time
    import inspect
    import ctypes
    import thread
    
    
    def TestThreadExit():
        print "Start Now"
        Thrd = threading.Thread(target=ThrdMointor)
        Thrd.start()
    
        print "Start Thrd now "
        time_counter = 10
        while time_counter > 0:
            print "timer = " + str(time_counter) + "\n"
            time_counter = time_counter - 1
            if time_counter <= 0:
                if Thrd.isAlive():
                    stop_thread(Thrd)
    
                print "time end..."
            time.sleep(1)
    
        while Thrd.isAlive():
            if Thrd.isAlive():
                stop_thread(Thrd)
                print "The Thrd is still alive "
            else:
                print "The Thrd is dead."
    
            time.sleep(0.2)
        Thrd.join()
    
    
    def ThrdMointor():
        timeCounter = 0
        while True:
            print "Test Monitor time counter = " + str(timeCounter) + '\n'
            time.sleep(1)
            timeCounter += 1
    
    
    def _async_raise(tid, exctype):
        """raises the exception, performs cleanup if needed"""
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # """if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"""
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
    
    
    def stop_thread(thread):
        _async_raise(thread.ident, SystemExit)
    
    
    
    if __name__=="__main__":
        TestThreadExit()
    
    
    

    测试结果显示,发现至少3s才能停止线程

    
    Start Now
    Test Monitor time counter = 0
    
    Start Thrd now 
    timer = 10
    
    Test Monitor time counter = 1
    
    timer = 9
    
    Test Monitor time counter = 2
    
    timer = 8
    
    Test Monitor time counter = 3
    
    timer = 7
    
    Test Monitor time counter = 4
    
    timer = 6
    
    Test Monitor time counter = 5
    
    timer = 5
    
    Test Monitor time counter = 6
    
    timer = 4
    
    Test Monitor time counter = 7
    
    timer = 3
    
    Test Monitor time counter = 8
    
    timer = 2
    
    Test Monitor time counter = 9
    
    timer = 1
    
    time end...
    Test Monitor time counter = 10
    
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    Test Monitor time counter = 11
    
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    Test Monitor time counter = 12
    
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    The Thrd is still alive 
    Test Monitor time counter = 13
    
    Process finished with exit code 0
    
    
    展开全文
  • Python 线程

    千次阅读 2019-06-06 17:54:45
    线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程不拥有私有的系统资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间...

    线程,有时被称为轻量进程,是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程不拥有私有的系统资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。

    线程是程序中一个单一的顺序控制流程。进程内有一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指令运行时的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。Python多线程用于I/O操作密集型的任务,如SocketServer网络并发,网络爬虫。

    现代处理器都是多核的,几核处理器只能同时处理几个线程,多线程执行程序看起来是同时进行,实际上是CPU在多个线程之间快速切换执行,这中间就涉及到上下问切换,所谓的上下文切换就是指一个线程Thread被分配的时间片用完了之后,线程的信息被保存起来,CPU执行另外的线程,再到CPU读取线程Thread的信息并继续执行Thread的过程。

    线程模块

    Python的标准库提供了两个模块:_thread和threading。_thread 提供了低级别的、原始的线程以及一个简单的互斥锁,它相比于 threading 模块的功能还是比较有限的。Threading模块是_thread模块的替代,在实际的开发中,绝大多数情况下还是使用高级模块threading,因此本书着重介绍threading高级模块的使用。

    Python创建Thread对象语法如下:

    import threading
    threading.Thread(target=None, name=None,  args=())

    主要参数说明:

    • target 是函数名字,需要调用的函数。

    • name 设置线程名字。

    • args 函数需要的参数,以元祖( tuple)的形式传入

    Thread 对象主要方法说明:

    • run(): 用以表示线程活动的方法。

    • start():启动线程活动。

    • join(): 等待至线程中止。

    • isAlive(): 返回线程是否活动的。

    • getName(): 返回线程名。

    • setName(): 设置线程名。

    Python中实现多线程有两种方式:函数式创建线程和创建线程类。

    第一种创建线程方式:函数式创建线程

    创建线程的时候,只需要传入一个执行函数和函数的参数即可完成threading.Thread实例的创建。下面的例子使用Thread类来产生2个子线程,然后启动2个子线程并等待其结束,

    import threading
    import time,random,math
    
    # idx 循环次数
    def printNum(idx):
        for num in range(idx ):
            #打印当前运行的线程名字
            print("{0}\tnum={1}".format(threading.current_thread().getName(), num) )
            delay = math.ceil(random.random() * 2)
            time.sleep(delay)
    
    
    if __name__ == '__main__':
        th1 = threading.Thread(target=printNum, args=(2,),name="thread1"  )
        th2 = threading.Thread(target=printNum, args=(3,),name="thread2" )
    
        #启动2个线程
        th1.start()
        th2.start()
    
        #等待至线程中止
        th1.join()
        th2.join()
        print("{0} 线程结束".format(threading.current_thread().getName()))

    运行脚本得到以下结果。

    thread1 num=0
    thread2 num=0
    thread1 num=1
    thread2 num=1
    thread2 num=2
    MainThread 线程结束

    运行脚本默认会启动一个线程,把该线程称为主线程,主线程有可以启动新的线程,Python的threading模块有个current_thread()函数,它将返回当前线程的示例。从当前线程的示例可以获得前运行线程名字,核心代码如下。

    threading.current_thread().getName()

    启动一个线程就是把一个函数和参数传入并创建Thread实例,然后调用start()开始执行

    th1 = threading.Thread(target=printNum, args=(2,),name="thread1" )
    th1.start()

    从返回结果可以看出主线程示例的名字叫MainThread,子线程的名字在创建时指定,本例创建了2个子线程,名字叫thread1和thread2。如果没有给线程起名字,Python就自动给线程命名为Thread-1,Thread-2…等等。在本例中定义了线程函数printNum(),打印idx次记录后退出,每次打印使用time.sleep()让程序休眠一段时间。

    第二种创建线程方式:创建线程类

    直接创建threading.Thread的子类来创建一个线程对象,实现多线程。通过继承Thread类,并重写Thread类的run()方法,在run()方法中定义具体要执行的任务。在Thread类中,提供了一个start()方法用于启动新进程,线程启动后会自动调用run()方法。

    import threading
    import time,random,math
    
    class MutliThread(threading.Thread):
        def __init__(self, threadName,num):
            threading.Thread.__init__(self)
            self.name = threadName
            self.num = num
    
        def run(self):
            for i in range(self.num):
                print("{0} i={1}".format(threading.current_thread().getName(), i))
                delay = math.ceil(random.random() * 2)
                time.sleep(delay)
    
    
    if __name__ == '__main__':
        thr1 = MutliThread("thread1",3)
        thr2 = MutliThread("thread2",2)
        
        # 启动线程
        thr1.start()
        thr2.start()
    
        # 等待至线程中止
        thr1.join()
        thr2.join()
        print("{0} 线程结束".format(threading.current_thread().getName()))

    运行脚本得到以下结果。

    thread1 i=0
    thread2 i=0
    thread1 i=1
    thread2 i=1
    thread1 i=2
    MainThread 线程结束

    从返回结果可以看出,通过创建Thread类来产生2个线程对象thr1和thr2,重写Thread类的run()函数,把业务逻辑放入其中,通过调用线程对象的start()方法启动线程。通过调用线程对象的join()函数,等待该线程完成,在继续下面的操作。

    在本例中,主线程MainThread等待子线程thread1和thread2线程运行结束后才输出” MainThread 线程结束”。如果子线程thread1和thread2不调用join()函数,那么主线程MainThread和2个子线程是并行执行任务的,2个子线程加上join()函数后,程序就变成顺序执行了。所以子线程用到join()的时候,通常都是主线程等到其他多个子线程执行完毕后再继续执行,其他的多个子线程并不需要互相等待。

    守护线程

    在线程模块中,使用子线程对象用到join()函数,主线程需要依赖子线程执行完毕后才继续执行代码。如果子线程不使用join()函数,主线程和子线程是并行运行的,没有依赖关系,主线程执行了,子线程也在执行。

    在多线程开发中,如果子线程设定为了守护线程,守护线程会等待主线程运行完毕后被销毁。一个主线程可以设置多个守护线程,守护线程运行的前提是,主线程必须存在,如果主线程不存在了,守护线程会被销毁。

    在本例中创建1个主线程3个子线程,让主线程和子线程并行执行。内容如下。

    import threading, time
    
    def run(taskName):
        print("任务:", taskName)
        time.sleep(2)
        print("{0} 任务执行完毕".format(taskName))  # 查看每个子线程
    
    if __name__ == '__main__':
        start_time = time.time()
        for i in range(3):
            thr = threading.Thread(target=run, args=("task-{0}".format(i),))
            thr.start()
    
        # 查看主线程和当前活动的所有线程数
        print("{0}线程结束,当线程数量={1}".format( threading.current_thread().getName(), threading.active_count()))
        print("消耗时间:", time.time() - start_time)

    运行脚本得到以下结果:

    任务: task-0
    任务: task-1
    任务: task-2
    MainThread线程结束,当线程数量=4
    消耗时间: 0.0009751319885253906
    task-2 任务执行完毕
    task-0 任务执行完毕
    task-1 任务执行完毕

    从返回结果可以看出,当前的线程个数是4,线程个数=主线程数 + 子线程数,在本例中有1个主线程和3个子线程。主线程执行完毕后,等待子线程执行完毕,程序才会退出。

    在本例的基础上,把所有的子线程都设置为守护线程。子线程变成守护线程后,只要主线程执行完毕,程序不管子线程有没有执行完毕,程序都会退出。使用线程对象的setDaemon(True)函数来设置守护线程。

    import threading, time
    
    def run(taskName):
        print("任务:", taskName)
        time.sleep(2)
        print("{0} 任务执行完毕".format(taskName))
    
    if __name__ == '__main__':
        start_time = time.time()
        for i in range(3):
            thr = threading.Thread(target=run, args=("task-{0}".format(i),))
            
            # 把子线程设置为守护线程,在启动线程前设置
            thr.setDaemon(True)
            thr.start()
    
        # 查看主线程和当前活动的所有线程数
        thrName = threading.current_thread().getName()
        thrCount = threading.active_count()
        print("{0}线程结束,当线程数量={1}".format(thrName, thrCount))
        print("消耗时间:", time.time() - start_time)

    运行脚本得到以下结果。

    任务: task-0
    任务: task-1
    任务: task-2
    MainThread线程结束,当线程数量=4
    消耗时间: 0.0010023117065429688

    从本例的返回结果可以看出,主线程执行完毕后,程序不会等待守护线程执行完毕后就退出了。设置线程对象为守护线程,一定要在线程对象调用start()函数前设置。

    多线程的锁机制

    多线程编程访问共享变量时会出现问题,但是多进程编程访问共享变量不会出现问题。因为多进程中,同一个变量各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享。

    多个进程之间对内存中的变量不会产生冲突,一个进程由多个线程组成,多线程对内存中的变量进行共享时会产生影响,所以就产生了死锁问题,怎么解决死锁问题是本节主要介绍的内容。

    1、变量的作用域

    一般在函数体外定义的变量称为全局变量,在函数内部定义的变量称为局部变量。全局变量所有作用域都可读,局部变量只能在本函数可读。函数在读取变量时,优先读取函数本身自有的局部变量,再去读全局变量。 内容如下。

    # 全局变量
    balance = 1
    
    def change():
        # 定义全局变量
        global balance
        balance = 100
    
        # 定义局部变量
        num = 20
        print("change() balance={0}".format(balance) )
    
    if __name__ == "__main__" :
        change()
        print("修改后的 balance={0}".format(balance) )

    运行脚本得到以下结果。

    change() balance=100
    修改后的 balance=100

    如果注释掉change()函数里的 global

    v1,那么得到的返回值是。
    change() balance=100
    修改后的 balance=1

    在本例中在change()函数外定义的变量balance是全局变量,在change()函数内定义的变量num是局部变量,全局变量默认是可读的,可以在任何函数中使用,如果需要改变全局变量的值,需要在函数内部使用global定义全局变量,本例中在change()函数内部使用global定义全局变量balance,在函数里就可以改变全局变量了。

    在函数里可以使用全局变量,但是在函数里不能改变全局变量。想实现多个线程共享变量,需要使用全局变量。在方法里加上全局关键字 global定义全局变量,多线程才可以修改全局变量来共享变量。

    2、多线程中的锁

    多线程同时修改全局变量时会出现数据安全问题,线程不安全就是不提供数据访问保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据。在本例中我们生成2个线程同时修改change()函数里的全局变量balance时,会出现数据不一致问题。

    本案例文件名为PythonFullStack\Chapter03\threadDemo03.py,内容如下。

    import threading
    
    balance = 100
    
    def change(num, counter):
        global balance
        for i in range(counter):
            balance += num
            balance -= num
            if balance != 100:
                # 如果输出这句话,说明线程不安全
                print("balance=%d" % balance)
            break
    
    if __name__ == "__main__":
        thr1 = threading.Thread(target=change,args=(100,500000),name='t1')
        thr2 = threading.Thread(target=change,args=(100,500000),name='t2')
        thr1.start()
        thr2.start()
        thr1.join()
        thr2.join()
        print("{0} 线程结束".format(threading.current_thread().getName()))

    运行以上脚本,当2个线程运行次数达到500000次时,会出现以下结果。

    balance=200
    MainThread 线程结束

    在本例中定义了一个全局变量balance,初始值为100,当启动2个线程后,先加后减,理论上balance应该为100。线程的调度是由操作系统决定的,当线程t1和t2交替执行时,只要循环次数足够多,balance结果就不一定是100了。从结果可以看出,在本例中线程t1和t2同时修改全局变量balance时,会出现数据不一致问题。

    注意

    在多线程情况下,所有的全局变量有所有线程共享。所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

    在多线程情况下,使用全局变量并不会共享数据,会出现线程安全问题。线程安全就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致

    在单线程运行时没有代码安全问题。写多线程程序时,生成一个线程并不代表多线程。在多线程情况下,才会出现安全问题。

    针对线程安全问题,需要使用”互斥锁”,就像数据库里操纵数据一样,也需要使用锁机制。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

    互斥锁的核心代码如下:

    # 创建锁
    mutex = threading.Lock()
    
    # 锁定
    mutex.acquire()
    
    # 释放
    mutex.release()

    如果要确保balance计算正确,使用threading.Lock()来创建锁对象lock,把 lock.acquire()和lock.release()加在同步代码块里,本例的同步代码块就是对全局变量balance进行先加后减操作。

    当某个线程执行change()函数时,通过lock.acquire()获取锁,那么其他线程就不能执行同步代码块了,只能等待知道锁被释放了,获得锁才能执行同步代码块。由于锁只有一个,无论多少线程,同一个时刻最多只有一个线程持有该锁,所以修改全局变量balance不会产生冲突。改良后的代码内容如下。

    import threading
    
    balance = 100
    
    lock = threading.Lock()
    
    def change(num, counter):
        global balance
        for i in range(counter):
            # 先要获取锁
            lock.acquire()
            balance += num
            balance -= num
    
            # 释放锁
            lock.release()
    
            if balance != 100:
                # 如果输出这句话,说明线程不安全
                print("balance=%d" % balance)
                break
    
    if __name__ == "__main__":
        thr1 = threading.Thread(target=change,args=(100,500000),name='t1')
        thr2 = threading.Thread(target=change,args=(100,500000),name='t2')
        thr1.start()
        thr2.start()
        thr1.join()
        thr2.join()
        print("{0} 线程结束".format(threading.current_thread().getName()))

    在本例中2个线程同时运行lock.acquire()时,只有一个线程能成功的获取锁,然后执行代码,其他线程就继续等待直到获得锁位置。获得锁的线程用完后一定要释放锁,否则其他线程就会一直等待下去,成为死线程。

    在运行上面脚本就不会产生输出信息,证明代码是安全的。把 lock.acquire()和lock.release()加在同步代码块里,还要注意锁的力度不要加的太大了。第一个线程只有运行完了,第二个线程才能运行,所以锁要在需要同步代码里加上。

    范例代码

    其实现了一个读写锁,并可以指定读优先还是写优先,且读状态相互不影响,写状态不允许读状态。

    class RWLock(object):
        def __init__(self):
            self.lock = threading.Lock()
            self.rcond = threading.Condition(self.lock)
            self.wcond = threading.Condition(self.lock)
            self.read_waiter = 0    # 等待获取读锁的线程数
            self.write_waiter = 0   # 等待获取写锁的线程数
            self.state = 0          # 正数:表示正在读操作的线程数   负数:表示正在写操作的线程数(最多-1)
            self.owners = []        # 正在操作的线程id集合
            self.write_first = True # 默认写优先,False表示读优先
    
        def write_acquire(self, blocking=True):
            # 获取写锁只有当
            me = threading.get_ident()
            with self.lock:
                while not self._write_acquire(me):
                    if not blocking:
                        return False
                    self.write_waiter += 1
                    self.wcond.wait()
                    self.write_waiter -= 1
            return True
    
        def _write_acquire(self, me):
            # 获取写锁只有当锁没人占用,或者当前线程已经占用
            if self.state == 0 or (self.state < 0 and me in self.owners):
                self.state -= 1
                self.owners.append(me)
                return True
            if self.state > 0 and me in self.owners:
                raise RuntimeError('cannot recursively wrlock a rdlocked lock')
            return False
    
        def read_acquire(self, blocking=True):
            me = threading.get_ident()
            with self.lock:
                while not self._read_acquire(me):
                    if not blocking:
                        return False
                    self.read_waiter += 1
                    self.rcond.wait()
                    self.read_waiter -= 1
            return True
    
        def _read_acquire(self, me):
            if self.state < 0:
                # 如果锁被写锁占用
                return False
    
            if not self.write_waiter:
                ok = True
            else:
                ok = me in self.owners
            if ok or not self.write_first:
                self.state += 1
                self.owners.append(me)
                return True
            return False
    
        def unlock(self):
            me = threading.get_ident()
            with self.lock:
                try:
                    self.owners.remove(me)
                except ValueError:
                    raise RuntimeError('cannot release un-acquired lock')
    
                if self.state > 0:
                    self.state -= 1
                else:
                    self.state += 1
                if not self.state:
                    if self.write_waiter and self.write_first:   # 如果有写操作在等待(默认写优先)
                        self.wcond.notify()
                    elif self.read_waiter:
                        self.rcond.notify_all()
                    elif self.write_waiter:
                        self.wcond.notify()
    
        read_release = unlock
        write_release = unlock

    线程间通信

    cookbook 介绍:https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p03_communicating_between_threads.html

    Queue 对象:从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象。

    from queue import Queue
    from threading import Thread
    
    # Object that signals shutdown
    _sentinel = object()
    
    # A thread that produces data
    def producer(out_q):
        while running:
            # Produce some data
            ...
            out_q.put(data)
    
        # Put the sentinel on the queue to indicate completion
        out_q.put(_sentinel)
    
    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()
    
            # Check for termination
            if data is _sentinel:
                in_q.put(_sentinel)
                break
    
            # Process the data
            ...

    Condition 变量: 尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition 变量来包装你的数据结构。

    import heapq
    import threading
    
    class PriorityQueue:
        def __init__(self):
            self._queue = []
            self._count = 0
            self._cv = threading.Condition()
        def put(self, item, priority):
            with self._cv:
                heapq.heappush(self._queue, (-priority, self._count, item))
                self._count += 1
                self._cv.notify()
    
        def get(self):
            with self._cv:
                while len(self._queue) == 0:
                    self._cv.wait()
                return heapq.heappop(self._queue)[-1]

    线程状态传递(是否可以结束):

    • Queue 的 task_done() 和 join() 函数
    from queue import Queue
    from threading import Thread
    
    # A thread that produces data
    def producer(out_q):
        while running:
            # Produce some data
            ...
            out_q.put(data)
    
    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()
    
            # Process the data
            ...
            # Indicate completion
            in_q.task_done()
    
    # Create the shared queue and launch both threads
    q = Queue()
    t1 = Thread(target=consumer, args=(q,))
    t2 = Thread(target=producer, args=(q,))
    t1.start()
    t2.start()
    
    # Wait for all produced items to be consumed
    q.join()
    • 添加 Event 对象
    from queue import Queue
    from threading import Thread, Event
    
    # A thread that produces data
    def producer(out_q):
        while running:
            # Produce some data
            ...
            # Make an (data, event) pair and hand it to the consumer
            evt = Event()
            out_q.put((data, evt))
            ...
            # Wait for the consumer to process the item
            evt.wait()
    
    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data, evt = in_q.get()
            # Process the data
            ...
            # Indicate completion
            evt.set()

    流量控制:对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者” “消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。get() 和 put() 方法都支持非阻塞方式和设定超时,例如:

    import queue
    q = queue.Queue()
    
    try:
        data = q.get(block=False)
    except queue.Empty:
        ...
    
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)
        ...
    
    try:
        data = q.get(timeout=5.0)
    except queue.Empty:
        ...

    最后,有 q.qsize() , q.full() , q.empty() 等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用 empty() 判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。

    展开全文
  • 如何杀死一个python线程

    万次阅读 2017-10-04 16:45:42
    不要试图用强制方法杀掉一个python线程,这从服务设计上就存在不合理性。 多线程本用来任务的协作并发,如果你使用强制手段干掉线程,那么很大几率出现意想不到的bug。 话虽然这样说,但是有时候就有这样的需求,...

    “不要试图用强制方法杀掉一个python线程,这从服务设计上就存在不合理性。 多线程本用来任务的协作并发,如果你使用强制手段干掉线程,那么很大几率出现意想不到的bug。”

    话虽然这样说,但是有时候就有这样的需求,可以python本身没有提供这样的API,所以没办法在网上找了一圈,发现了两种方法。如下:

    方法一:

    利用setDaemon(True)这个函数的特性,特性如下:主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出。

    但是如果要做到主线程不结束,但还是要强行结束子线程。所以我就突发奇想,如果我把要杀死的子线程看做是孙线程,给一个标志位给子线程,主线程改变标志位,子线程检查到就break自己,这样孙线程不就结束了,主线程仍在运行。然而,想法很美好。。。先上代码。

    import threading
    
    flag = 0
    # 为线程定义一个函数
    def print_time():
       def printOne():
          while 1:
             print(111111111111)
             print(222222222222)
             print(333333333333)
             print(444444444444)
             print(555555555555)
             print(666666666666)
       th1 = threading.Thread(target=printOne)
       th1.setDaemon(True)
       th1.start()
       while 1:
          if flag:
             print("正在停止这个程序!!!")
             break
    i=5
    if i == 5:
          th = threading.Thread(target=print_time)
          th.start()
          flag=1
          th.join()
          print("++++++++++++++++++++++++++++++++++++++++++++++++++")
    while 1:
       pass
    

    执行代码,会发现孙线程并没有结束。很简单,因为孙线程它会等主线程结束,它才结束。去掉最后两行代码,孙线程就会结束,但这也是等主线程结束的。所以方法一不满足需求。

    方法二:

    使用ctypes强行杀掉线程。

    import threading
    import time
    import inspect
    import ctypes
    
    def _async_raise(tid, exctype):
        """raises the exception, performs cleanup if needed"""
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # """if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"""
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
    
    def stop_thread(thread):
        _async_raise(thread.ident, SystemExit)
    
    def print_time():
        while 2:
             print(111111111111)
             print(222222222222)
             print(333333333333)
             print(444444444444)
             print(555555555555)
             print(666666666666)
    
    
    if __name__ == "__main__":
        t = threading.Thread(target=print_time)
        t.start()
    
        stop_thread(t)
        print("stoped")
        while 1:
            pass
    这个方法是在网上找的,推荐一下,非常干净利索的干掉了子线程。

    展开全文
  • 此文程序来自上述来源,对其做了一些注释、测试,谈谈自己的理解。 多线程,简单理解一下的话可以是多核/多机器/多副本同时运行,对于可并行处理的数据,(理想情况下)4线程可以把效率提高4倍。 实例说话 import ...

    学习来源

    原文地址

    分类目录——多线程

    此文程序来自上述来源,对其做了一些注释、测试,谈一谈自己的理解。

    多线程,简单从逻辑上理解一下的话可以是多核/多机器/多副本同时运行,对于可并行处理的数据,(理想情况下)4线程可以把效率提高4倍。实际上就线程上的多/并行而言,其物理实现上是每个线程分时获得要用到的资源,处理器切换很快,看起来就像是并行的,并不是真正意义上的并行。为什么不给每个线程分配足够的资源实现真正的并行呢,因为线程太小,量级不够,为如此小搓的任务调度物理资源得不偿失。那么谁够格来分到真正的资源呢——进程,多进程的话,会尽量基于每个进程足够的资源,来达到更高意义上的并行。但是遇到资源不足或者只运行单点操作(临界资源)的情况时,进程也只能等待。

    实例说话

    import threading
    
    def thread_job():	# 可以分配给线程的工作(函数)
        print('This is an added Thread, number is %s'%threading.current_thread())
    
    def main():
        new_thread = threading.Thread(target=thread_job)  
        # 添加线程,这个线程用来do thread_job
        # target属性用来指定该线程要做的工作(一个函数)
        new_thread.start()    # 这里才算开始,上一句相当于一个声明
    
    if __name__ == '__main__':
        main()
    
    展开全文
  • python3-启动个线程

    千次阅读 2018-10-26 22:45:42
    问题:在for循环里面想同时启动多个线程,但是程序运行过程中产生的是线程一个一个启动,即:启动第一个线程之后主线程等待子线程运行完成之后才启动第二个,代码如下: import threading import time def thread...
  • 一个进程可以有多个线程,但是只有一个主线程;线程切换分为两种:一种是I/O切换,一种是时间切换(I/O切换:一旦运行I/O任务时便进行线程切换,CPU开始执行其他线程;时间切换:一旦到了一定时间,线程也进行切换,CPU...
  • Python 线程线程 线程池

    千次阅读 2018-08-24 13:56:16
    现在有一个需求:如下图,我们需要把test.jpg 复制5000次到update文件夹下面。并且在复制过程中需要重命名,命名规则为 循环的当前次数.jpg 最后全部循环完成后,添加finish文件到update和result文件夹中。循环过程...
  • python线程详解(超详细)

    万次阅读 多人点赞 2019-09-28 08:33:31
    python中的多线程一个非常重要的知识点,今天为大家对多线程进行详细的说明,代码中的注释有多线程的知识点还有测试用的实例。 import threading from threading import Lock,Thread import time,os ''' python...
  • Python线程详解

    千次阅读 多人点赞 2018-08-10 19:56:57
    线程,有时被称为轻量进程(Lightweight Process,LWP),是程序执行流的最小单元。——百度百科 全文略长,可以没时间的可以直接看最后的总结 #导入线程模块 from threading import *  线程类Thread参数说明 ...
  • 今天小编就为大家分享篇解决python线程卡死的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • 线程1执行(cond),线程1执行一半等待(cond.wait()),线程2开始执行(cond),线程2执行完毕后(cond.notify()),线程1再接着执行。 import threading import time def go1(): with cond: #使用条件变量...
  • Python线程退出控制

    千次阅读 2019-01-04 17:57:49
    ctypes模块控制线程退出 Python中threading模块并没有...管理自己的线程,最好的处理方式是拥有一个请求退出标志,这样每个线程依据一定的时间间隔检查规则,看是不是需要退出。 例如下面的代码: import threading...
  • 主要介绍了Python线程编程():启动线程的两种方法,本文讲解了将函数传递进Thread对象、继承自threading.Thread类两种方法,需要的朋友可以参考下
  • Python 线程同步 线程优先级

    千次阅读 2017-08-01 11:52:34
    使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。如下: 多线程的优势在于...
  • Python线程间通信方式

    千次阅读 2019-06-04 20:16:30
    1、python线程 #! /usr/bin/evn python3 # --*-- coding: utf-8 --*-- #该实例反编译来说明函数执行流程 import dis ...# Python中一个线程对应于C语言中的一个线程(CPython而言)(Python并不一定...
  • Python线程面试题

    千次阅读 2018-08-16 15:38:06
    1.什么是GIL,怎么解决GIL? 在python的原始解释器CPython中存在着GIL(Global Interpreter Lock,全局解释器锁),...所以,虽然CPython的线程库直接封装了系统的原生线程,但CPython整体作为一个进程,同一时间只...
  • Python线程的线程名

    千次阅读 2015-09-04 23:02:37
    线程名在类的初始化中定义,也可以使用Thread对象的setName方法设置。使用Thread对象的getName方法获得对象名。 ''' import threading class MyThread(threading.Thread): def __init__(self, threadname): thread...
  • Python线程同步

    千次阅读 2020-02-01 14:56:53
    线程间共享全局变量,多个线程对该变量执行不同的操作时,该变量最终的结果可能是不确定的(每次线程执行后的结果不同),如:银行柜员问题 ,count的值是不确定的,要想count的值是一个确定的需对线程执行的代码段...
  • 请问python线程的rlock的实际用途有哪些,一个线程已经获得了锁,为什么要重复再加锁。
  • python线程

    千次阅读 2019-10-12 11:20:59
    python主要是通过thread和threading这两模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python由于GIL(global ...
  • 我们进行程序开发的时候,肯定避免不了要处理并发的情况。...本文基于 Python3 讲解,Python 实现多线程编程需要借助于 threading 模块。 所以,我们要在代码中引用它。 import threading thread...
  • python线程获取返回值

    千次阅读 2019-01-22 14:44:35
    python线程获取返回值实现功能前的准备代码改写threading.Thread类获取返回值使用局部变量获取返回值 实现功能前的准备代码 def say_hello(i): return {"hello": i} def threading_get_return(): ...
  • Python线程)线程安全互斥锁

    千次阅读 2018-04-16 22:52:56
    当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。 互斥锁为资源引入一个状态:锁定/非锁定。 某个线程要更改共享数据...
  • python3线程加锁的一个例子

    千次阅读 2019-06-08 16:47:35
    再这例子中,线程加上锁之后有并行变成了实际上的串行 [root@linux1 python_tidb]# cat mypy02.py #!/usr/bin/env python3 import pymysql,sys,os import _thread,time def counter(myid,count): mutex.acquire...
  • python线程进程、异步IO

    千人学习 2017-03-06 18:23:52
    具有多线程能力的计算机因有硬件支持而能够在同一时间 执行多于一个线程,进而提升整体处理性能。 该章节主要包括线程进程的概念,基于python线程进程实现,GIL锁的影响,消费者生产者模型,进程池的应用以及IO...
  • Python线程Threading的简单教程

    千次阅读 2018-05-20 08:43:48
    Python解释器使用了内部的GIL(全局解释器锁),在任意时刻只允许单个线程执行,无论有多少核,这限制了python只能在一个处理器上运行。当然使用多线程还是有好处的,不然也就没有存在的必要。当我们程序是I/O密集型...
  • Python 多进程、多线程启动

    万次阅读 2021-02-22 12:52:49
    Python 多进程启动 def main(self, num): """ 多进程启动 ValueError: Pool not running:这问题的根源在于:pool.close()提前生效,关闭了pool。所以提示pool没有运行。 解决:多层循环的情况下,将pool....
  • python线程卡死问题解决

    万次阅读 2017-06-15 14:41:53
    python代码忽然卡死,日志不输出,通过如下方式可以确定线程确实已经死掉了: # top 命令 top命令可以看到机器上所有线程的执行情况,%CPU和%MEM可以看出线程消耗的资源情况 由于机器上线程数量太多,可能要查看...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 276,952
精华内容 110,780
关键字:

如何启动一个python线程

python 订阅