精华内容
下载资源
问答
  • 随着大数据时代的来临,基于数据而提供的服务越来越多,这些数据几乎都是由爬虫获取并进行规范化提取后完成的。 本系列博客,讲解使用 Scrapy 搭建分布式爬虫并通过 Elasticsearch. 和 django 搭建搜索引擎网站,一...

    人工智能时代,数据先行。随着大数据时代的来临,基于数据而提供的服务越来越多,这些数据几乎都是由爬虫获取并进行规范化提取后完成的。

    本系列博客,讲解使用 Scrapy 搭建分布式爬虫并通过 Elasticsearch. 和 django 搭建搜索引擎网站,一方面可以让读者具有获取所需数据的能力,同时也可以让读者深入认知网络知识和编程知识。

    本系列博客的思路:

    1. 环境配置和基础知识铺垫
    2. 爬取真实数据
    3. scrapy 突破反爬虫技术
    4. scrapy 进阶
    5. scrapy redis 分布式爬虫
    6. elasticsearch & django 实现搜索引擎

    下面是详细的技术内容:

    一、环境配置和基础知识铺垫

    二、爬取真实数据

    三、scrapy 突破反爬虫技术

     

    四、scrapy 进阶

    五、scrapy redis 分布式爬虫

    六、elasticsearch & django 实现搜索引擎

    本系列博客给你的体验:

    • 开发爬虫所需要用到的技术以及网站分析技巧
    • 理解scrapy的原理和所有组件的使用以及分布式爬虫scrapy-redis的使用和原理
    • 理解分布式开源搜索引擎elasticsearch的使用以及搜索引擎的原理
    • 体验django如何快速搭建一个实现效果与百度相类似的网站。
    展开全文
  • 这里用到了requests库,和python操作mysql的知识(注意sql语句,对于字符串类型,要有单引号)。 我们之前直接对response做了selector,实际上,这是因为response本身包装了Scrapy的Selector,这里我们直接使用了...

    到目前为止,cnblogs仍然只能获取60条信息。

    家里的ip是动态分配的,重启路由器可能导致ip变化。阿里云是静态的,亚马逊是动态的。

    实际上,代理ip会减慢爬取速度,所以我们尽量限制我们本机的爬取速度。

    正常情况下:

    有了ip代理:

    ip代理有高匿与普通之别。高匿是说代理服务器完全不知道我们本机的ip,而普通是说代理服务器可能会将本机ip带过去。

    在scrapy中实现ip代理十分简单,之前写的RandomUserAgentMiddlware中的process_request会处理每一个request,我们只需要在该函数的最后加一句:

    request.meta["proxy"] = "http://60.167.159.236:808"

    后面的ip代理是哪里来的呢?是在西刺免费代理IP中找的。

    当然了,我们如果要实现一个ip代理池,是还有其他工作要做的。也就是说,既然要实现ip代理,自然不能只有一个ip,我们需要一个代理池,以此实现每次随机取出一个ip代理。如何实现?很简单,我们爬取西刺这个网站就好了。

    注释掉:

    # request.meta["proxy"] = "http://60.167.159.236:808"

    新建一个名为tools的package:

    新建一个脚本crawl_xici_ip.py,并安装requests:

    pip install -i https://pypi.douban.com/simple requests

    编辑crawl_xici_ip.py:

    import requests
    from scrapy.selector import Selector
    import MySQLdb
    
    conn = MySQLdb.connect(host="127.0.0.1", user="root", passwd="", db="Spider", charset="utf8")
    cursor = conn.cursor()
    
    
    def crawl_ips():
        """
        爬取西刺的免费ip代理
        """
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0"}
        for i in range(2073):
            re = requests.get("https://www.xicidaili.com/nn/{0}".format(i), headers=headers)
    
            selector = Selector(text=re.text)
            all_trs = selector.css("#ip_list tr")
    
            ip_list = []
            for tr in all_trs[1:]:
                speed_str = tr.css(".bar::attr(title)").extract_first()
                speed = float(speed_str.split("秒")[0])
                all_texts = tr.css("td::text").extract()
                ip = all_texts[0]
                port = all_texts[1]
                proxy_type = all_texts[5]
    
                ip_list.append((ip, port, proxy_type, speed))
    
            # 每获取一页,存入到数据库中
            for ip_info in ip_list:
                sql = "insert proxy_ip(ip, port, speed, proxy_type) VALUES('{0}', '{1}', {2}, '{3}')".format(
                    ip_info[0], ip_info[1], ip_info[3], ip_info[2]
                )
                try:
                    # 执行sql语句
                    cursor.execute(sql)
                    # 提交到数据库执行
                    conn.commit()
                except:
                    # Rollback in case there is any error
                    conn.rollback()
    
                conn.commit()
    
            print("---------------{0} done---------------".format(i))
    
    
    if __name__ == "__main__":
        crawl_ips()

    这里用到了requests库,和python操作mysql的知识(注意sql语句,对于字符串类型,要有单引号)。

    我们之前直接对response做了selector,实际上,这是因为response本身包装了Scrapy的Selector,这里我们直接使用了Selector。

    运行之前,新建一个表proxy_ip(可以不设主键,或者以ip为第一主键,port为第二主键):

    运行。

    上面完成了数据爬取,下面还要实现ip代理的获取。是从数据库中获取,如何获取呢?可以使用下面这条sql语句:

    SELECT ip, port FROM proxy_ip
    ORDER BY RAND()
    LIMIT 1

    可以在Navicat中进行测试:

    所以,再添加一个类:

    class GetIP(object):
        def delete_ip(self, ip):
            # 从数据库中删除无效的ip
            delete_sql = """
                delete from proxy_ip where ip='{0}'
            """.format(ip)
            cursor.execute(delete_sql)
            conn.commit()
            return True
    
        def judge_ip(self, ip, port, proxy_type):
            # 判断ip是否可用
            http_url = "http://www.baidu.com"
            proxy_url = "{0}://{1}:{2}".format(proxy_type, ip, port)
            # 配置代理
            try:
                proxy_dict = {
                    "http":proxy_url,
                }
                response = requests.get(http_url, proxies=proxy_dict)
            except Exception as e:
                print ("invalid ip and port")
                self.delete_ip(ip)
                return False
            else:
                code = response.status_code
                if code >= 200 and code < 300:
                    print("effective ip")
                    return True
                else:
                    print("invalid ip and port")
                    self.delete_ip(ip)
                    return False
    
        def get_random_ip(self):
            # 从数据库中随机获取一个可用的ip
            random_sql = """
                SELECT ip, port, proxy_type FROM proxy_ip
                ORDER BY RAND()
                LIMIT 1
                """
            result = cursor.execute(random_sql)
            for ip_info in cursor.fetchall():
                ip = ip_info[0]
                port = ip_info[1]
                proxy_type = ip_info[2]
    
                judge_re = self.judge_ip(ip, port, proxy_type)
                if judge_re:
                    return "{0}://{1}:{2}".format(proxy_type, ip, port)
                else:
                    return self.get_random_ip()

    将入口调用改为(要写在__main__之下,否则在import的时候,会执行这些逻辑):

    if __name__ == "__main__":
        get_ip = GetIP()
        get_ip.get_random_ip()
    

    调试一下,成功之后,回到middlewares.py,引入刚刚的类:

    from tools.crawl_xici_ip import GetIP

    添加一个新的middleware类:

    class RandomProxyMiddleware(object):
        # 动态设置ip代理
        def process_request(self, request, spider):
            get_ip = GetIP()
            request.meta["proxy"] = get_ip.get_random_ip()

    此外有一个开源的库——scrapy-proxies,这是一款scrapy的插件,比我们的功能强大得多,代码只有一个文件。它就是定义了一个middleware,但是是通过settings进行读取的,是读文件,这一点不如我们从数据库中操作优。可以拿着进行改造。

    此外,scrapy官方有一个scrapy-crawlera项目,这让我们动态IP的配置更加简单,但是需要收费。

    此外是tor,洋葱浏览器,洋葱网络实际上是对我们的网络进行了很多层的包装。当我们的请求经过这个洋葱网络,它就会做多次的转发,达到了匿名效果,黑客多用。但这个需要VPN,一个敏感话题。

    按需要,决定是否在settings.py中配置:

    'Spoder.middlewares.RandomProxyMiddleware':605

     

    展开全文
  • 官网上说,downloader middleware是在request与response处理中的钩子框架,就是说它可以hook住一些函数,只要实现了这个函数的类,都可以被执行。用以全局修改Request和Response。 官网上还说,如果我们自己实现...

    百度百科对User Agent的摘要:

    User Agent中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器能够识别客户使用的操作系统及版本、CPU 类型、浏览器及版本、浏览器渲染引擎、浏览器语言、浏览器插件等。

    我们访问一个网站获取它的返回内容,可以通过Chrome、Firefox和IE等,或者我们写一段代码获取。

    实际上我们的浏览器只是一个代理,服务器有时候需要知道是什么浏览器访问的,就需要写一段字符串来表明。也就是说User Agent是起标识作用的。

    User Agent是放在请求头中的。这是浏览器自动加的,同一个浏览器,内容其实是一样的。

    在进行修改之前,进行调试,在response中的request变量中的headers中,有一个User-Agent变量,它的值为:

    想要实现随机切换User Agent十分简单,可以在settings.py中增一个user_agent_list变量:

    user_agent_list = [
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; W…) Gecko/20100101 Firefox/68.0',
    ]

    然后在cnblogs.py中引入它:

    from Spider.settings import user_agent_list

    为了实现随机选择User Agent,引入:

    import random

    新建类变量:

        headers = {
            'HOST': "news.cnblogs.com",
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; W…) Gecko/20100101 Firefox/68.0',
            'referer': 'https://news.cnblogs.com/'
        }

    通过下面三句,就可以获得一个有随机User Agent的headers了:

    random_index = random.randint(0, len(user_agent_list)-1)
    random_agent = user_agent_list[random_index]
    self.headers["User-Agent"] = random_agent

    并在yield出去的Request对象中添加:

    headers=headers

    上面生成随机agent的的逻辑应该放在一个Request被yield出去之前,而headers变量应该放在类中。

    接下来,request中的headers下的user agent会在下面二者中随机出现:

    这样做有一个问题,需要我们在每一处Request被yield出去之前都加上那三条逻辑。

    删除或注释除了settings中配置user_agent_list以外的的代码。

    于是我们使用download middleware完成这项任务,先复习架构:

    我们首先将settings.py中关于DOWNLOADER_MIDDLEWARES的注释取消:

    DOWNLOADER_MIDDLEWARES = {
       'Spider.middlewares.SpiderDownloaderMiddleware': 543,
    }

    我们看看默认提供的一个UserAgentMiddleware:

    刚刚默认的user-agent就是在这里设置的。

    读代码,可以知道,如果settings.py中设置了USER_AGENT,就不会使用默认的Scrapy,所以可以在settings.py中加入:

    # 默认的User Agent
    USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; W…) Gecko/20100101 Firefox/68.0'

    看这个也行,其实生成项目时,自动生成的middleware.py中的示例代码也交代了如何写middleware,比较重要的方法是process_request,所有对Request的处理都在这儿。

    官网上说,downloader middleware是在request与response处理中的钩子框架,就是说它可以hook住一些函数,只要实现了这个函数的类,都可以被执行。用以全局修改Request和Response。

    官网上还说,如果我们自己实现了UserAgentMiddleware,就必须把默认提供的设置为None,或者你可以把自己配置的middleware的数字设的大一点,这样就可以最后处理。下面把它置为None:

    DOWNLOADER_MIDDLEWARES = {
        'Spider.middlewares.SpiderDownloaderMiddleware': 543,
        'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None
    }

    在middlewares.py中实现自己的middleware:

    import random
    from Spider.settings import user_agent_list
    class RandomUserAgentMiddlware(object):
        """
        随机更换user-agent
        """
        def __init__(self, crawler):
            super(RandomUserAgentMiddlware, self).__init__()
            self.user_agent_list = crawler.settings.get("user_agent_list", [])
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler)
    
        def process_request(self, request, spider):
            def rdm():
                # 获取一个随机的user agent
                random_index = random.randint(0, len(user_agent_list) - 1)
                random_agent = user_agent_list[random_index]
                return random_agent
    
            request.headers.setdefault('User-Agent', rdm())

    (函数中定义函数,是动态语言闭包的一种特性)

    配置它:

    DOWNLOADER_MIDDLEWARES = {
        'Spider.middlewares.SpiderDownloaderMiddleware': 543,
        'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
        'Spider.middlewares.RandomUserAgentMiddlware': 600,
    }

    调试,果然可以。

    但我们不这样做,因为这样做的话,user_agent_list是写死了的。把user_agent_list也注释掉。

    github上搜索:

    fake-useragent

    直达:https://github.com/hellysmile/fake-useragent

    我们先安装:

    pip install fake-useragent

    用法也很简单,我们可以在命令行中测试。

    >>> from fake_useragent import UserAgent
    >>> ua = UserAgent()
    >>> ua.ie
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 2.0.50727; SLCC2; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC
     6.0; Zune 4.0; Tablet PC 2.0; InfoPath.3; .NET4.0C; .NET4.0E)'
    >>> ua.ie
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0) chromeframe/10.0.648.205'
    >>> ua.chrome
    'Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1667.0 Safari/537.36'
    >>> ua.chrome
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
    >>> ua.random
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.60 Safari/537.17'
    >>> ua.random
    'Mozilla/5.0 (Windows NT 6.1; rv:27.3) Gecko/20130101 Firefox/27.3'
    

    这个包实际上是维护了一个url。这个url会更新,使得老版本的url不能使用,所以需要我们维护。

    更改之前的middleware:

    from fake_useragent import UserAgent
    class RandomUserAgentMiddlware(object):
        """
        随机更换user-agent
        """
        def __init__(self, crawler):
            super(RandomUserAgentMiddlware, self).__init__()
            self.ua = UserAgent()
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler)
    
        def process_request(self, request, spider):
            request.headers.setdefault('User-Agent', self.ua.random)

    为了让这个middleware能根据我们的配置进行随机选择,我们在settings中加入一个配置:

    # 随机user agent 类型
    RANDOM_UA_TYPE = 'random'

    并进一步更改middleware:

    class RandomUserAgentMiddlware(object):
        """
        随机更换user-agent
        """
        def __init__(self, crawler):
            super(RandomUserAgentMiddlware, self).__init__()
            self.ua = UserAgent()
            self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", "random")
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler)
    
        def process_request(self, request, spider):
            def get_ua():
                return getattr(self.ua, self.ua_type)
    
            request.headers.setdefault('User-Agent', get_ua())

    有其他配置,就按照这种模式来。

    展开全文
  • python分布式多进程框架 Ray

    万次阅读 2019-03-13 19:49:27
    并行和分布式计算是现代应用程序的主要内容。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。网络爬虫和搜索所使用的基础设施并不是在某人笔记本电脑上运行的单线程程序,而是相互通信和交互的服务...

    全栈工程师开发手册 (作者:栾鹏)
    python教程全解


    并行和分布式计算是现代应用程序的主要内容。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。网络爬虫和搜索所使用的基础设施并不是在某人笔记本电脑上运行的单线程程序,而是相互通信和交互的服务的集合。

    ray的api接口教程:https://ray.readthedocs.io/en/latest/api.html

    为什么要使用 Ray?

    很多教程解释了如何使用 Python 的多进程模块(https://docs.python.org/2/library/multiprocessing.html)。遗憾的是,多进程模块在处理现代应用程序的要求方面存在严重的短板。这些要求包括以下这些内容:

    在多台计算机上运行相同的代码。

    构建有状态且可以与之通信的微服务和 actor。

    优雅地处理机器故障。

    有效处理大对象和数值数据。

    Ray(https://github.com/ray-project/ray)解决了所有这些问题,在保持简单性的同时让复杂的行为成为可能。

    AI的开源框架

    与深度学习框架的关系: Ray与TensorFlow,PyTorch和MXNet等深度学习框架完全兼容,在许多应用中与Ray一起使用一个或多个深度学习框架是很自然的(例如,我们的强化学习库使用TensorFlow和PyTorch)。

    与其他分布式系统的关系:今天使用了许多流行的分布式系统,但是其中大多数并不是用AI应用程序构建的,并且缺乏支持所需的性能以及表示AI应用程序的API。从今天的分布式系统来看,它们缺少以下功能(以各种组合方式):

    支持毫秒级任务和每秒数百万个任务
    嵌套并行(在任务内并行化任务,例如超参数搜索内部的并行模拟)(见下图)
    在运行时动态确定任意任务依赖关系(例如,为了避免等待缓慢的工作人员)
    在共享可变状态下运行的任务(例如,神经网络权重或模拟器)
    支持异构资源(CPU,GPU等)

    在这里插入图片描述

    有两种使用Ray的主要方法:通过其较低级别的API和更高级别的库。较高级别的库建立在较低级别的API之上。目前这些包括Ray RLlib,一个可扩展的强化学习库和Ray.tune,一个高效的分布式超参数搜索库。

    开始使用 Ray

    ray.init() 命令将启动所有相关的 Ray 进程。在切换到集群时,这是唯一需要更改的行(我们需要传入redis的地址)。

    ray.init(redis_address="123.45.67.89:6379")
    

    这些过程包括:

    • 有很多 worker 进程并行执行 Python 函数(大概是每个 CPU 核心对应一个 worker)。

    • 用于将“任务”分配给 worker(以及其他计算机)的调度程序进程。任务是 Ray 调度的工作单元,对应于一个函数调用或方法调用。

    • 共享内存对象存储库,用于在 worker 之间有效地共享对象(无需创建副本)。

    • 内存数据库,用于存储在发生机器故障时重新运行任务所需的元数据。

    Ray worker 是独立的进程,而不是线程,因为在 Python 中存在全局解释器锁,所以对多线程的支持非常有限。

    Ray低级API

    Ray API的目标是自然地表达非常普遍的计算模式和应用程序,而不受像MapReduce这样的固定模式的限制。

    动态任务图

    Ray应用程序或作业中的基础基元是一个动态任务图。这与TensorFlow中的计算图非常不同。而在TensorFlow中,一个计算图代表一个神经网络,并且在单个应用程序中执行多次,在Ray中,任务图代表整个应用程序,并且只执行一次。任务图不是事先知道的。它是在应用程序运行时动态构建的,执行一个任务可能会触发创建更多任务。
    在这里插入图片描述
    任意的Python函数都可以作为任务执行,并且可以任意依赖其他任务的输出。下面的例子给出了说明。

    要将 Python 函数 f 转换为一个“远程函数”(可以远程和异步执行的函数),可以使用 @ray.remote 装饰器来声明这个函数。然后函数调用 f.remote() 将立即返回一个 future(future 是对最终输出的引用),实际的函数执行将在后台进行(我们将这个函数执行称为任务)。

    要将一个任务的输出作为输入提供给后续任务,只需将第一个任务返回的 future 作为参数传给第二个任务。Ray 的调度程序会自动考虑任务依赖关系。在第一个任务完成之前不会执行第二个任务,第一个任务的输出将自动被发送给执行第二个任务的机器。

    import ray
    import time,datetime
    
    # Start Ray.
    ray.init()
    
    import numpy as np
    
    # 定义两个远程函数。
    # 这些函数的调用创建了远程执行的任务
    
    @ray.remote
    def create_matrix(size):
        return np.random.normal(size=size)
    @ray.remote
    def multiply_matrices(x, y):
        return np.dot(x, y)
    
    result_ids = []
    for i in range(400):
        # 开始两个并行的任务,这些会立即返回futures并在后台执行
        x_id = create_matrix.remote([1000, 1000])
        print(datetime.datetime.now())
        y_id = create_matrix.remote([1000, 1000])
        print(datetime.datetime.now())
        # 开始第三个任务,但这并不会被提前计划,直到前两个任务都完成了.
        result_ids.append(multiply_matrices.remote(x_id, y_id))
        print(datetime.datetime.now())
    # 获取结果。这个结果直到第三个任务完成才能得到。只有get创建以后所有的任务才开始创建执行。
    z_id = ray.get(result_ids)
    print(z_id)
    
    

    可以看看机器的性能损耗
    在这里插入图片描述

    有效的使用聚合函数

    下图是两个聚合过程和相应的函数。以线性方式聚合值与以树形结构方式聚合值的对比
    在这里插入图片描述

    右图方式的聚合函数会比左图方式的聚合更高校,因为在一个任务

    import time
    @ray.remote
    def add(x, y):
        time.sleep(1)
        return x + y
    # Aggregate the values slowly. This approach takes O(n) where n is the
    # number of values being aggregated. In this case, 7 seconds.
    id1 = add.remote(1, 2)
    id2 = add.remote(id1, 3)
    id3 = add.remote(id2, 4)
    id4 = add.remote(id3, 5)
    id5 = add.remote(id4, 6)
    id6 = add.remote(id5, 7)
    id7 = add.remote(id6, 8)
    result = ray.get(id7)
    # Aggregate the values in a tree-structured pattern. This approach
    # takes O(log(n)). In this case, 3 seconds.
    id1 = add.remote(1, 2)
    id2 = add.remote(3, 4)
    id3 = add.remote(5, 6)
    id4 = add.remote(7, 8)
    id5 = add.remote(id1, id2)
    id6 = add.remote(id3, id4)
    id7 = add.remote(id5, id6)
    result = ray.get(id7)
    
    # Slow approach.
    values = [1, 2, 3, 4, 5, 6, 7, 8]
    while len(values) > 1:
        values = [add.remote(values[0], values[1])] + values[2:]
    result = ray.get(values[0])
    # Fast approach.
    values = [1, 2, 3, 4, 5, 6, 7, 8]
    while len(values) > 1:
        values = values[2:] + [add.remote(values[0], values[1])]
    result = ray.get(values[0])
    

    简单开始

    首先来看一下最简单的Ray程序是如何编写的。

    # 导入ray,并初始化执行环境
    import ray
    ray.init()
    
    # 定义ray remote函数
    @ray.remote
    def hello():
        return "Hello world !"
    
    # 异步执行remote函数,返回结果id
    object_id = hello.remote()
    
    # 同步获取计算结果
    hello = ray.get(object_id)
    
    # 输出计算结果
    print hello
    

    在Ray里,通过Python注解@ray.remote定义remote函数。使用此注解声明的函数都会自带一个默认的方法remote,通过此方法发起的函数调用都是以提交分布式任务的方式异步执行的,函数的返回值是一个对象id,使用ray.get内置操作可以同步获取该id对应的对象。熟悉Java里的Future机制的话对此应该并不陌生,或许会有人疑惑这和普通的异步函数调用没什么大的区别,但是这里最大的差异是,函数hello是分布式异步执行的。

    remote函数是Ray分布式计算抽象中的核心概念,通过它开发者拥有了动态定制计算依赖(任务DAG)的能力。比如:

    @ray.remote
    def A():
        return "A"
    
    @ray.remote
    def B():
        return "B"
    
    @ray.remote
    def C(a, b):
        return "C"
    
    a_id = A.remote()
    b_id = B.remote()
    c_id = C.remote(a_id, b_id)
    print ray.get(c_id)
    

    例子代码中,对函数A、B的调用是完全并行执行的,但是对函数C的调用依赖于A、B函数的返回结果。Ray可以保证函数C需要等待A、B函数的结果真正计算出来后才会执行。如果将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B计算结果的依赖,自由的函数调用方式允许Ray可以自由地定制DAG的结构和计算依赖关系。另外,提及一点的是Python的函数可以定义函数具有多个返回值,这也使得Python的函数更天然具备了DAG节点多入和多出的特点。

    这里写图片描述

    二、系统架构

    Ray是使用什么样的架构对分布式计算做出如上抽象的呢,一下给出了Ray的系统架构(来自Ray论文,参考文献1)。

    这里写图片描述

    作为分布式计算系统,Ray仍旧遵循了典型的Master-Slave的设计:Master负责全局协调和状态维护,Slave执行分布式计算任务。不过和传统的分布式计算系统不同的是,Ray使用了混合任务调度的思路。在集群部署模式下,Ray启动了以下关键组件:

    1. GlobalScheduler:Master上启动了一个全局调度器,用于接收本地调度器提交的任务,并将任务分发给合适的本地任务调度器执行。
    2. RedisServer:Master上启动了一到多个RedisServer用于保存分布式任务的状态信息(ControlState),包括对象机器的映射、任务描述、任务debug信息等。
    3. LocalScheduler:每个Slave上启动了一个本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的Worker进程。
    4. Worker:每个Slave上可以启动多个Worker进程执行分布式任务,并将计算结果存储到ObjectStore。
    5. ObjectStore:每个Slave上启动了一个ObjectStore存储只读数据对象,Worker可以通过共享内存的方式访问这些对象数据,这样可以有效地减少内存拷贝和对象序列化成本。ObjectStore底层由Apache Arrow实现。
    6. Plasma:每个Slave上的ObjectStore都由一个名为Plasma的对象管理器进行管理,它可以在Worker访问本地ObjectStore上不存在的远程数据对象时,主动拉取其它Slave上的对象数据到当前机器。

    需要说明的是,Ray的论文中提及,全局调度器可以启动一到多个,而目前Ray的实现文档里讨论的内容都是基于一个全局调度器的情况。我猜测可能是Ray尚在建设中,一些机制还未完善,后续读者可以留意此处的细节变化。

    Ray的任务也是通过类似Spark中Driver的概念的方式进行提交的,有所不同的是:

    1. Spark的Driver提交的是任务DAG,一旦提交则不可更改。
    2. 而Ray提交的是更细粒度的remote function,任务DAG依赖关系由函数依赖关系自由定制。

    论文给出的架构图里并未画出Driver的概念,因此我在其基础上做了一些修改和扩充。

    这里写图片描述

    Ray的Driver节点和和Slave节点启动的组件几乎相同,不过却有以下区别:

    1. Driver上的工作进程DriverProcess一般只有一个,即用户启动的PythonShell。Slave可以根据需要创建多个WorkerProcess。
    2. Driver只能提交任务,却不能接收来自全局调度器分配的任务。Slave可以提交任务,也可以接收全局调度器分配的任务。
    3. Driver可以主动绕过全局调度器给Slave发送Actor调用任务(此处设计是否合理尚不讨论)。Slave只能接收全局调度器分配的计算任务。

    三、核心操作

    基于以上架构,我们简单讨论一下Ray中关键的操作和流程。

    1. ray.init()

    在PythonShell中,使用ray.init()可以在本地启动ray,包括Driver、HeadNode(Master)和若干Slave。

    import ray
    ray.init()
    

    如果是直连已有的Ray集群,只需要指定RedisServer的地址即可。

    ray.init(redis_address="<redis-address>")
    

    本地启动Ray得到的输出如下:

    >>> ray.init()
    Waiting for redis server at 127.0.0.1:58807 to respond...
    Waiting for redis server at 127.0.0.1:23148 to respond...
    Allowing the Plasma store to use up to 13.7439GB of memory.
    Starting object store with directory /tmp and huge page support disabled
    Starting local scheduler with 8 CPUs, 0 GPUs
    
    ======================================================================
    View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
    ======================================================================
    
    {'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
    >>> 
    

    本地启动Ray时,可以看到Ray的WebUI的访问地址。

    2. ray.put()

    使用ray.put()可以将Python对象存入本地ObjectStore,并且异步返回一个唯一的ObjectID。通过该ID,Ray可以访问集群中任一个节点上的对象(远程对象通过查阅Master的对象表获得)。

    对象一旦存入ObjectStore便不可更改,Ray的remote函数可以将直接将该对象的ID作为参数传入。使用ObjectID作为remote函数参数,可以有效地减少函数参数的写ObjectStore的次数。

    @ray.remote
    def f(x):
        pass
    
    x = "hello"
    
    # 对象x往ObjectStore拷贝里10次
    [f.remote(x) for _ in range(10)]
    
    # 对象x仅往ObjectStore拷贝1次
    x_id = ray.put(x)
    [f.remote(x_id) for _ in range(10)]
    

    3. ray.get()

    使用ray.get()可以通过ObjectID获取ObjectStore内的对象并将之转换为Python对象。对于数组类型的对象,Ray使用共享内存机制减少数据的拷贝成本。而对于其它对象则需要将数据从ObjectStore拷贝到进程的堆内存中。

    如果调用ray.get()操作时,对象尚未创建好,则get操作会阻塞,直到对象创建完成后返回。get操作的关键流程如下:

    1. Driver或者Worker进程首先到ObjectStore内请求ObjectID对应的对象数据。
    2. 如果本地ObjectStore没有对应的对象数据,本地对象管理器Plasma会检查Master上的对象表查看对象是否存储其它节点的ObjectStore。
    3. 如果对象数据在其它节点的ObjectStore内,Plasma会发送网络请求将对象数据拉到本地ObjectStore。
    4. 如果对象数据还没有创建好,Master会在对象创建完成后通知请求的Plasma读取。
    5. 如果对象数据已经被所有的ObjectStore移除(被LRU策略删除),本地调度器会根据任务血缘关系执行对象的重新创建工作。
    6. 一旦对象数据在本地ObjectStore可用,Driver或者Worker进程会通过共享内存的方式直接将对象内存区域映射到自己的进程地址空间中,并反序列化为Python对象。

    另外,ray.get()可以一次性读取多个对象的数据:

    result_ids = [ray.put(i) for i in range(10)]
    ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    

    4. @ray.remote

    Ray中使用注解@ray.remote可以声明一个remote function。remote函数时Ray的基本任务调度单元,remote函数定义后会立即被序列化存储到RedisServer中,并且分配了一个唯一的ID,这样就保证了集群的所有节点都可以看到这个函数的定义。

    不过,这样对remote函数定义有了一个潜在的要求,即remote函数内如果调用了其它的用户函数,则必须提前定义,否则remote函数无法找到对应的函数定义内容。

    remote函数内也可以调用其它的remote函数,Driver和Slave每次调用remote函数时,其实都是向集群提交了一个计算任务,从这里也可以看到Ray的分布式计算的自由性。

    Ray中调用remote函数的关键流程如下:

    1. 调用remote函数时,首先会创建一个任务对象,它包含了函数的ID、参数的ID或者值(Python的基本对象直接传值,复杂对象会先通过ray.put()操作存入ObjectStore然后返回ObjectID)、函数返回值对象的ID。
    2. 任务对象被发送到本地调度器。
    3. 本地调度器决定任务对象是在本地调度还是发送给全局调度器。如果任务对象的依赖(参数)在本地的ObejctStore已经存在且本地的CPU和GPU计算资源充足,那么本地调度器将任务分配给本地的WorkerProcess执行。否则,任务对象被发送给全局调度器并存储到任务表(TaskTable)中,全局调度器根据当前的任务状态信息决定将任务发给集群中的某一个本地调度器。
    4. 本地调度器收到任务对象后(来自本地的任务或者全局调度分配的任务),会将其放入一个任务队列中,等待计算资源和本地依赖满足后分配给WorkerProcess执行。
    5. Worker收到任务对象后执行该任务,并将函数返回值存入ObjectStore,并更新Master的对象表(ObjectTable)信息。

    @ray.remote注解有一个参数num_return_vals用于声明remote函数的返回值个数,基于此实现remote函数的多返回值机制。

    @ray.remote(num_return_vals=2)
    def f():
        return 1, 2
    
    x_id, y_id = f.remote()
    ray.get(x_id)  # 1
    ray.get(y_id)  # 2
    

    @ray.remote注解的另一个参数num_gpus可以为任务指定GPU的资源。使用内置函数ray.get_gpu_ids()可以获取当前任务可以使用的GPU信息。

    @ray.remote(num_gpus=1)
    def gpu_method():
        return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())
    

    5. ray.wait()

    ray.wait()操作支持批量的任务等待,基于此可以实现一次性获取多个ObjectID对应的数据。

    # 启动5个remote函数调用任务
    results = [f.remote(i) for i in range(5)]
    # 阻塞等待4个任务完成,超时时间为2.5s
    ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
    

    上述例子中,results包含了5个ObjectID,使用ray.wait操作可以一直等待有4个任务完成后返回,并将完成的数据对象放在第一个list类型返回值内,未完成的ObjectID放在第二个list返回值内。如果设置了超时时间,那么在超时时间结束后仍未等到预期的返回值个数,则已超时完成时的返回值为准。

    6. ray.error_info()

    使用ray.error_info()可以获取任务执行时产生的错误信息。

    >>> import time
    >>> @ray.remote
    >>> def f():
    >>>     time.sleep(5)
    >>>     raise Exception("This task failed!!")
    >>> f.remote()
    Remote function __main__.f failed with:
    
    Traceback (most recent call last):
      File "<stdin>", line 4, in f
    Exception: This task failed!!
    
    
      You can inspect errors by running
    
          ray.error_info()
    
      If this driver is hanging, start a new one with
    
          ray.init(redis_address="127.0.0.1:65452")
    >>> ray.error_info()
    [{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]
    

    7. Actor

    Ray的remote函数只能处理无状态的计算需求,有状态的计算需求需要使用Ray的Actor实现。在Python的class定义前使用@ray.remote可以声明Actor。

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.value = 0
    
        def increment(self):
            self.value += 1
            return self.value
    

    使用如下方式创建Actor对象。

    a1 = Counter.remote()
    a2 = Counter.remote()
    

    Ray创建Actor的流程为:

    1. Master选取一个Slave,并将Actor创建任务分发给它的本地调度器。
    2. 创建Actor对象,并执行它的构造函数。

    从流程可以看出,Actor对象的创建时并行的。

    通过调用Actor对象的方法使用Actor。

    a1.increment.remote()  # ray.get returns 1
    a2.increment.remote()  # ray.get returns 1
    

    调用Actor对象的方法的流程为:

    1. 首先创建一个任务。
    2. 该任务被Driver直接分配到创建该Actor对应的本地执行器执行,这个操作绕开了全局调度器(Worker是否也可以使用Actor直接分配任务尚存疑问)。
    3. 返回Actor方法调用结果的ObjectID。

    为了保证Actor状态的一致性,对同一个Actor的方法调用是串行执行的。

    四、安装Ray

    如果只是使用Ray,可以使用如下命令直接安装。

    pip intall ray
    

    如果需要编译Ray的最新源码进行安装,按照如下步骤进行(MaxOS):

    # 更新编译依赖包
    brew update
    brew install cmake pkg-config automake autoconf libtool boost wget
    pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
    # 下载源码编译安装
    git clone https://github.com/ray-project/ray.git
    cd ray/python
    python setup.py install
    # 测试
    python test/runtest.py
    
    # 安装WebUI需要的库[可选]
    pip install jupyter ipywidgets bokeh
    
    # 编译Ray文档[可选]
    cd ray/doc
    pip install -r requirements-doc.txt
    make html
    open _build/html/index.html
    

    我在MacOS上安装jupyter时,遇到了Python的setuptools库无法升级的情况,原因是MacOS的安全性设置问题,可以使用如下方式解决:

    1. 重启电脑,启动时按住Command+R进入Mac保护模式。
    2. 打开命令行,输入命令csrutils disable关闭系统安全策略。
    3. 重启电脑,继续安装jupyter。
    4. 安装完成后,重复如上的方式执行csrutils enable,再次重启即可。

    进入PythonShell,输入代码本地启动Ray:

    import ray
    ray.init()
    

    浏览器内打开WebUI界面如下:

    这里写图片描述

    参考资料

    1. Ray论文:Real-Time Machine Learning: The Missing Pieces
    2. Ray开发手册:http://ray.readthedocs.io/en/latest/index.html
    3. Ray源代码:https://github.com/ray-project/ray
    展开全文
  • 为您提供Linkis 分布式服务框架下载,Linkis是一个打通了多个计算存储引擎如:Spark、TiSpark、Hive、Python和HBase等,对外提供统一REST/WebSocket/JDBC接口,提交执行SQL、Pyspark、HiveQL、Scala等脚本的计算...
  • Python 并行分布式框架之 Celery

    千次阅读 2017-06-30 09:33:17
    Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。 架构设计 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元...
  • 分布式服务框架

    2016-03-19 10:13:00
    2019独角兽企业重金招聘Python工程师标准>>> ...
  • python并发分布式框架Celery搭建

    万次阅读 2018-07-05 00:41:28
    最近项目用到一个好东西,分享给大家,简单,有效,这就是我的感受!...消息中间件Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimen...
  • 分布式服务框架设计

    2018-09-11 17:42:00
    2019独角兽企业重金招聘Python工程师标准>>> ...
  • 数据使用:数据分析服务、互联网金融、数据建模、医疗病例分析、自然语言处理、信息聚类 scrapy+elasticsearch+django 获取数据 深入认识网络知识和编程知识 1、环境配置基础知识 2、爬取数据 3、突破反爬虫 4、...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 安装rides服务 安装rides客户端 pip install celery 编写第一个任务 tasks.py # encoding:utf-8 from celery import Celery # 指定broker-----rides地址后面/5指定任务存放在rides第几个库 broker = '...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 先来上一张celery框架图解:Celery的架构由三部分组成,消息中间件(brokers),任务执行单元(worker),和任务执行结果存储(task result store)组成。消息中间件Celery本身不提供消息服务,但是可以方便和第三方...
  • 这是lumigo / python_tracer,这是Lumigo的Python代理,用于分布式跟踪和性能监视。 支持的Python运行时:3.6、3.7、3.8 用法 该软件包允许您通过Lambda层进行自动度量收集,通过无服务器框架进行自动度量收集和...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 介绍: Linkis是一个打通了多个计算存储引擎如:Spark、TiSpark、Hive、Python和HBase等,对外提供统一REST/WebSocket/JDBC接口,提交执行SQL、Pyspark、HiveQL、Scala等...1、统一作业执行服务:一个分布式的REST/Web
  • 如果没有开源的,我的思路是基于同一种协议(hession,thrift,protobuff,avro等)把各种语言支撑的框架集成到一起(例如 php python c++ 的)形成一个支持多语言互为provider,consumer的分布式服务框架,您看行吗...
  • OpenCensus-统计收集和分布式跟踪框架Python的 。 OpenCensus提供了一个框架来衡量服务器的资源使用情况并收集性能统计信息。 该存储库包含与Python相关的实用程序和OpenCensus所需的支持软件。 追踪 安装和基本...
  • 分布式服务框架XXL-RPC

    2016-08-26 14:55:00
    2019独角兽企业重金招聘Python工程师标准>>> ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • Go和Python中的异步高性能分布式OpenFlow 1.3 / 1.5框架。 目标 利用多个CPU内核的分布式OpenFlow框架。 在消息的序列化/反序列化方面比Ryu更快。 主要特点 分布式计算 可靠的排队和异步OpenFlow事件通知 应用程序...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 449
精华内容 179
关键字:

python分布式服务框架

python 订阅