多任务之协程
  VlRy1zDaWnkA 2024年03月27日 27 0

协程

协程我们是使用gevent模块实现的,而gevent 是对greenlet进行的封装,而greenlet 又是对yield进行封装。要理解gevent就要从yield开始。 要理解yield的作用我们就要先理解可迭代对象与迭代器

一、可迭代对象与迭代器

1> 可迭代对象

迭代是访问集合元素的一种方式。迭代器是一个可以记住遍历的位置的对象。迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。

  • 可迭代对象定义:

    我们把可以通过for...in...这类语句迭代读取一条数据供我们使用的对象称之为可迭代对象(Iterable)
    
  • 可迭代对象本质

    向我们提供一个这样的中间“人”即迭代器帮助我们对其进行迭代遍历使用。
    可迭代对象通过 __iter__ 方法向我们提供一个迭代器,我们在迭代一个可迭代对象的时候,实际上就是先获取该对象提供的一个迭代器,然后通过这个迭代器来依次获取对象中的每一个数据, 也就是说,一个具备了 __iter__ 方法的对象,就是一个可迭代对象。
    
  • 创建一个可迭代对象

    from collections.abc import Iterable
    
    class Mylist():
    
    	def __iter__(self):
            # 暂时忽略如何构造一个迭代器对象
    		pass
        
    
    print(isinstance(Mylist(), Iterable))
    

2> 迭代器

迭代器是用来帮助我们记录每次迭代访问到的位置,当我们对迭代器使用next()函数的时候,迭代器会向我们返回它所记录位置的下一个位置的数据。实际上,在使用next()函数的时候,调用的就是迭代器对象的__next__方法

  • 迭代器定义:

    一个实现了__iter__方法和__next__方法的对象,就是迭代器。
    
  • 迭代器本质

    for item in Iterable 循环的本质就是先通过iter()函数获取可迭代对象Iterable的迭代器,然后对获取到的迭代器不断调用next()方法来获取下一个值并将其赋值给item,当遇到StopIteration的异常后循环结束。
    
  • 迭代器实现

    from collections.abc import Iterable, Iterator
    
    
    # 自定义一个可迭代对象
    class MyList(object):
    
        def __init__(self):
            self.items = list()
    
        def add(self, value):
            self.items.append(value)
    
        def __iter__(self):
            return MyIterator(self)
    
    
    # 自定义一个迭代器
    class MyIterator(object):
    
        def __init__(self, obj):
            self.obj = obj
            self.current_index = 0
    
        # python要求迭代器本身也是可迭代的
        def __iter__(self):
            return self
    
        def __next__(self):
            if self.current_index < len(self.obj.items):
                item = self.obj.items[self.current_index]
                self.current_index += 1
                return item
            else:
                raise StopIteration
    
    
    m = MyList()
    print("MyList实例对象是否为可迭代对象: ", isinstance(m, Iterable))
    print("MyList实例对象是否为迭代器: ", isinstance(m, Iterator))
    print("MyIterator实例对象是否为可迭代对象: ", isinstance(MyIterator(m), Iterable))
    print("MyIterator实例对象是否为迭代器: ", isinstance(MyIterator(m), Iterator))
    
    m.add(1)
    m.add(22)
    m.add(333)
    m.add(444)
    
    for i in m:
        print(i)
    
    
  • 说明:并不是只有for循环能接收可迭代对象

    li = list(m)
    print(li)
    tp = tuple(m)
    print(tp)
    

二、生成器

  • 生成器的定义

    只要在def中有yield关键字的 就称为 生成器, 生成器是一类特殊的迭代器
    
  • 生成器定义方式

    # 方式一
    G = ( x*2 for x in range(5))
    >>>  <generator object <genexpr> at 0x7f626c132db0>
    
    # 方式二
    def fib(n):
        current = 0
        num1, num2 = 0, 1
        while current < n:
            num1, num2 = num2, num1+num2
            current += 1
            yield num
        return 'done'
    
  • 生成器总结

    • 使用了yield关键字的函数不再是函数,而是生成器。(使用了yield的函数就是生成器)
    • yield关键字有两点作用:
      1. 保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起
      2. 将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用
    • 可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)
    • Python3中的生成器可以使用return返回最终运行的返回值(通过捕获异常对象的value属性值获取),而Python2中的生成器不允许使用return返回一个返回值(即可以使用return从生成器中退出,但return后不能有任何表达式)。

三、协程

  • 协程的定义

    协程(微线程、纤程)是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源);协程是运行在某个单一线程下的,即先有线程才有协程。
    
  • 协程与线程区别

    在实现多任务时, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。
    
  • 协程模块gevent

    • 通过yield实现协程的工作过程

      import time
      
      def work1():
          while True:
              print("----work1---")
              yield
              time.sleep(0.5)
      
      def work2():
          while True:
              print("----work2---")
              yield
              time.sleep(0.5)
      
      def main():
          w1 = work1()
          w2 = work2()
          while True:
              next(w1)
              next(w2)
      
      if __name__ == "__main__":
          main()
      
    • 通过greenlet实现协程

      为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单
      
      # pip install greenlet
      
      from greenlet import greenlet
      import time
      
      
      def test1():
          while True:
              print("---A--")
              gr2.switch()
              time.sleep(0.5)
      
      
      def test2():
          while True:
              print("---B--")
              gr1.switch()
              time.sleep(0.5)
      
      
      gr1 = greenlet(test1)
      gr2 = greenlet(test2)
      # 切换到gr1中运行
      gr1.switch()
      
      
    • 通过gevent实现协程

        greenlet已经实现了协程,但是这个还的人工切换,那么python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent.
        其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
        由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就
      保证总有greenlet在运行,而不是等待IO
      
      • gevent的使用

        # pip3 install gevent
        
        import gevent
        
        def f(n):
            for i in range(n):
                print(gevent.getcurrent(), i)
        
        g1 = gevent.spawn(f, 5)
        g2 = gevent.spawn(f, 5)
        g3 = gevent.spawn(f, 5)
        g1.join()
        g2.join()
        g3.join()
        
        # 运行结果
        # 可以看到,3个greenlet是依次运行而不是交替运行
        <Greenlet at 0x10e49f550: f(5)> 0
        <Greenlet at 0x10e49f550: f(5)> 1
        <Greenlet at 0x10e49f550: f(5)> 2
        <Greenlet at 0x10e49f550: f(5)> 3
        <Greenlet at 0x10e49f550: f(5)> 4
        <Greenlet at 0x10e49f910: f(5)> 0
        <Greenlet at 0x10e49f910: f(5)> 1
        <Greenlet at 0x10e49f910: f(5)> 2
        <Greenlet at 0x10e49f910: f(5)> 3
        <Greenlet at 0x10e49f910: f(5)> 4
        <Greenlet at 0x10e49f4b0: f(5)> 0
        <Greenlet at 0x10e49f4b0: f(5)> 1
        <Greenlet at 0x10e49f4b0: f(5)> 2
        <Greenlet at 0x10e49f4b0: f(5)> 3
        <Greenlet at 0x10e49f4b0: f(5)> 4
          
        
      • gevent切换执行

        import gevent
        
        # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
        monkey.patch_all()
        
        def f(n):
            for i in range(n):
                print(gevent.getcurrent(), i)
                #用来模拟一个耗时操作,注意不是time模块中的sleep
                # gevent.sleep(1)
                time.sleep(1)
        
        g1 = gevent.spawn(f, 5)
        g2 = gevent.spawn(f, 5)
        g3 = gevent.spawn(f, 5)
        g1.join()
        g2.join()
        g3.join()
        
        # 运行结果
        <Greenlet at 0x7fa70ffa1c30: f(5)> 0
        <Greenlet at 0x7fa70ffa1870: f(5)> 0
        <Greenlet at 0x7fa70ffa1eb0: f(5)> 0
        <Greenlet at 0x7fa70ffa1c30: f(5)> 1
        <Greenlet at 0x7fa70ffa1870: f(5)> 1
        <Greenlet at 0x7fa70ffa1eb0: f(5)> 1
        <Greenlet at 0x7fa70ffa1c30: f(5)> 2
        <Greenlet at 0x7fa70ffa1870: f(5)> 2
        <Greenlet at 0x7fa70ffa1eb0: f(5)> 2
        <Greenlet at 0x7fa70ffa1c30: f(5)> 3
        <Greenlet at 0x7fa70ffa1870: f(5)> 3
        <Greenlet at 0x7fa70ffa1eb0: f(5)> 3
        <Greenlet at 0x7fa70ffa1c30: f(5)> 4
        <Greenlet at 0x7fa70ffa1870: f(5)> 4
        <Greenlet at 0x7fa70ffa1eb0: f(5)> 4
        
      • 使用协程实现TCP通信 为多个用户服务

        # 【本机环境运行】
        
        import socket
        import gevent
        from gevent import monkey
        
        monkey.patch_all()
        
        
        def handle_request(client_socket):
            while True:
                data = client_socket.recv(1024).decode("gbk")
                print(data)
        
        
        def main():
            tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            tcp_socket.bind(("127.0.0.1", 9000))
            tcp_socket.listen(128)
        
            while True:
                # 接收新的请求,未收到连接请求会导致程序堵塞
                tcp_server_socket, client_addr = tcp_socket.accept()
                # 将处理请求的任务交给协程处理
                gevent.spawn(handle_request, tcp_server_socket)
          
        if __name__ == "__main__":
            main()
        
      • 使用队列和协程实现生产者消费者模型

        from gevent import queue, monkey
        import gevent
        import time
        import random
        
        monkey.patch_all()
        
        
        def producer(q):
            for value in ['A', 'B', 'C', 'D', 'E', 'F']:
                print(f'【Producer】 put {value} to queue...')
                q.put(value)
                time.sleep(random.random())
            # 发送结束信号
            q.put(None)
        
        
        def consumer(q):
            while True:
                # 如果队列中无数据,consumer会一直执行(协程只有在耗时操作时才切换任务),
                # 可能会导致producer无法往队列中添加数据,我们可以手动添加一个延时
                if not q.empty():
                    data = q.get()
                    # 接收到结束信号退出程序
                    if data is None:
                        return
                    print(f"【Consumer】 get {data} from queue")
                    time.sleep(random.random())
                else:
                    time.sleep(0.5)
        
        
        q = queue.Queue()
          if __name__ == "__main__":
            gevent.joinall([
                gevent.spawn(producer, q),
                gevent.spawn(consumer, q)
          ])
        

四、进程、线程、协程区别

  • 进程是资源分配的单位
  • 线程是操作系统调度的单位
  • 进程切换需要的资源很最大,效率很低
  • 线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
  • 协程切换任务资源很小,效率高
  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发

五、同步和异步

1> 基本概念

  • 同步

    	同步调用即当我们提交一个任务后,就在原地等待,等拿到第一个任务的结果之后再继续下一行代码,效率比较低;同步调用解决方案:多线程/多进程
    
  • 异步

    	异步即当我们提交一个任务之后,不必等待上一个任务的结果,直接进行下一个任务,监听这些任务的完成状态,当有一个任务完成之后,再继续完成这个任务的后续操作
    
  • 同步异步代码实现与分析

    from gevent import monkey
    import gevent
    import time
    
    monkey.patch_all()
    
    
    def task(task_no, delay_time):
        print(f"【任务-{task_no}】 开始!")
        time.sleep(delay_time)
        print(f"【任务-{task_no}】 结束!")
    
    
    # 发布3个任务, 3个任务分别耗时10, 6, 12
    if __name__ == '__main__':
        # 同步执行方式
        start_time = time.time()
        task("同步1", 5)
        task("同步2", 3)
        task("同步3", 6)
        end_time = time.time()
        print(f"同步执行方式耗时 {end_time - start_time}")
    
        # 异步执行方式
        start_time = time.time()
        gevent.joinall([
            gevent.spawn(task, "异步1", 5),
            gevent.spawn(task, "异步2", 3),
            gevent.spawn(task, "异步3", 6),
        ])
        end_time = time.time()
        print(f"异步执行方式耗时 {end_time - start_time}")
    
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2024年03月27日 0

暂无评论

推荐阅读
  YqbaJkf98QJO   12小时前   9   0   0 Python
  KmYlqcgEuC3l   5天前   13   0   0 Python
VlRy1zDaWnkA
作者其他文章 更多