Swift proxy source code analysis notes

2014-07-25 00:03:19 gaoxingnengjisuan 2591 reads

Thanks to everyone supporting this blog; discussion and exchange are welcome. Given my limited time and ability, mistakes are inevitable, so corrections are appreciated!

If you repost, please keep the author information.
Blog: http://blog.csdn.net/gaoxingnengjisuan
Email: dong.liu@siat.ac.cn

PS: I haven't logged into the blog recently and missed many comments; my apologies! I am also rarely on QQ, so feel free to reach me by email.


Overview:

    As described in the previous post, once a request reaches the swift-proxy service, the proxy determines which controller to use (ObjectController, ContainerController, or AccountController), and then calls the PUT, POST, DELETE, GET, or HEAD method defined in /swift/proxy/controllers/account.py, /swift/proxy/controllers/container.py, or /swift/proxy/controllers/obj.py. That method in turn opens connections to the corresponding storage service (Object-server, Container-server, Account-server) and invokes its PUT, POST, DELETE, GET, or HEAD method to carry out the request req.

This post focuses on the matching PUT, POST, DELETE, GET, and HEAD implementations in the swift-proxy and swift-account services.


Source code walkthrough (the more important parts of the code are annotated inline):


GETorHEAD

/swift/proxy/controllers/account.py----class AccountController(Controller)----def GETorHEAD

def GETorHEAD(self, req):
    """
    Handle HTTP GET/HEAD requests.
    """
    if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        resp.body = 'Account name length of %d longer than %d' % \
                    (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
        return resp

    # get_nodes: look up the partition number and all replica nodes
    # for this account;
    partition, nodes = self.app.account_ring.get_nodes(self.account_name)

    # GETorHEAD_base: common handling for HTTP GET or HEAD;
    # returns a swob.Response object;
    resp = self.GETorHEAD_base(
        req, _('Account'), self.app.account_ring, partition,
        req.swift_entity_path.rstrip('/'))

    if resp.status_int == HTTP_NOT_FOUND:
        if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
            resp.status = HTTP_GONE
        elif self.app.account_autocreate:
            resp = account_listing_response(self.account_name, req,
                                            get_listing_content_type(req))
    if req.environ.get('swift_owner'):
        self.add_acls_from_sys_metadata(resp)
    else:
        for header in self.app.swift_owner_headers:
            resp.headers.pop(header, None)
    return resp
1. Look up the partition number and all replica nodes for the account;
2. GETorHEAD_base performs the common GET/HEAD handling: it sends the request to the remote storage servers and invokes the corresponding method there to process req;
   Note: the details of that implementation are not covered here.
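The ring lookup in step 1 can be pictured with a toy model. This is purely illustrative: the real swift.common.ring.Ring applies a hash-path prefix/suffix and uses a precomputed replica-to-device table with zone-aware placement, none of which is shown here.

```python
import hashlib

# Simplified illustration of how a ring maps an account name to a
# partition and replica nodes (hypothetical node table and parameters).
PART_POWER = 8                      # 2**8 = 256 partitions
NODES = [{'id': i, 'ip': '10.0.0.%d' % i} for i in range(4)]
REPLICAS = 3

def get_nodes(account):
    digest = hashlib.md5(('/' + account).encode('utf-8')).digest()
    # take the top PART_POWER bits of the 128-bit hash as the partition
    part = int.from_bytes(digest, 'big') >> (128 - PART_POWER)
    # assign replicas round-robin from the partition; a real ring
    # consults a replica2part2dev_id table instead
    nodes = [NODES[(part + r) % len(NODES)] for r in range(REPLICAS)]
    return part, nodes

part, nodes = get_nodes('AUTH_test')
print(part, [n['id'] for n in nodes])
```

The key property, which the real ring shares, is that the same account name always maps to the same partition and node set.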


/swift/account/server.py----class AccountController(object)----def HEAD

def HEAD(self, req):
    """
    Handle HTTP HEAD requests;
    a HEAD request returns the account's basic information as
    key-value pairs in the response headers.
    """
    drive, part, account = split_and_validate_path(req, 3)
    out_content_type = get_listing_content_type(req)

    # mount_check controls whether to verify that the drive is mounted;
    # if the check is enabled and the drive is not mounted, return
    # HTTP 507 to signal insufficient storage;
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # _get_account_broker is an internal helper that returns an
    # AccountBroker instance, which proxies access to the sqlite database;
    broker = self._get_account_broker(drive, part, account,
                                      pending_timeout=0.1,
                                      stale_reads_ok=True)

    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # get_info: fetch the account's global data, i.e. its basic info;
    # returns a dict with account, created_at, put_timestamp,
    # delete_timestamp, container_count, object_count, bytes_used,
    # hash, id, and so on;
    info = broker.get_info()
    headers = {
        'X-Account-Container-Count': info['container_count'],
        'X-Account-Object-Count': info['object_count'],
        'X-Account-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp']}
    headers.update((key, value)
                   for key, (value, timestamp) in
                   broker.metadata.iteritems() if value != '')
    headers['Content-Type'] = out_content_type

    return HTTPNoContent(request=req, headers=headers, charset='utf-8')
Note: a HEAD request returns the account's basic (metadata) information as key-value pairs in the response headers.
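The header-building part of HEAD is easy to exercise in isolation. Below is a minimal sketch; account_head_headers and the sample info/metadata dicts are hypothetical, but they mirror the shapes used by the handler above (note that account metadata is stored as (value, timestamp) pairs, with an empty value marking a deleted key).

```python
def account_head_headers(info, metadata):
    """Build the X-Account-* response headers from an account info dict,
    mirroring the HEAD handler above (simplified sketch)."""
    headers = {
        'X-Account-Container-Count': info['container_count'],
        'X-Account-Object-Count': info['object_count'],
        'X-Account-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp'],
    }
    # user metadata values are (value, timestamp) pairs; empty values
    # mark deleted keys and are skipped
    headers.update((key, value)
                   for key, (value, timestamp) in metadata.items()
                   if value != '')
    return headers

info = {'container_count': 2, 'object_count': 7, 'bytes_used': 1024,
        'created_at': '0000001400.00000', 'put_timestamp': '0000001401.00000'}
meta = {'X-Account-Meta-Color': ('blue', '0000001402.00000'),
        'X-Account-Meta-Gone': ('', '0000001403.00000')}
print(account_head_headers(info, meta))
```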


/swift/account/server.py----class AccountController(object)----def GET

def GET(self, req):
    """
    Handle HTTP GET requests;
    like HEAD, GET returns the account's basic information as key-value
    pairs in the response headers; the difference is that GET also
    retrieves the list of containers under the account and returns it
    in the response body.
    """
    drive, part, account = split_and_validate_path(req, 3)
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')
    if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
        # delimiters can be made more flexible later
        return HTTPPreconditionFailed(body='Bad delimiter')
    limit = ACCOUNT_LISTING_LIMIT
    given_limit = get_param(req, 'limit')
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > ACCOUNT_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' % ACCOUNT_LISTING_LIMIT)
    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    out_content_type = get_listing_content_type(req)

    # mount_check controls whether to verify that the drive is mounted;
    # if the check is enabled and the drive is not mounted, return
    # HTTP 507 to signal insufficient storage;
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # _get_account_broker is an internal helper that returns an
    # AccountBroker instance, which proxies access to the sqlite database;
    broker = self._get_account_broker(drive, part, account,
                                      pending_timeout=0.1,
                                      stale_reads_ok=True)

    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # build the listing of containers under this account
    # (including per-container details);
    return account_listing_response(account, req, out_content_type, broker,
                                    limit, marker, end_marker, prefix,
                                    delimiter)

Now let's look at the implementation of account_listing_response:

def account_listing_response(account, req, response_content_type, broker=None,
                             limit='', marker='', end_marker='', prefix='',
                             delimiter=''):
    """
    Build the listing of containers under the given account
    (including per-container details).
    """
    if broker is None:
        broker = FakeAccountBroker()

    info = broker.get_info()
    resp_headers = {
        'X-Account-Container-Count': info['container_count'],
        'X-Account-Object-Count': info['object_count'],
        'X-Account-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp']}
    resp_headers.update((key, value)
                        for key, (value, timestamp) in
                        broker.metadata.iteritems() if value != '')

    # fetch the container listing; each entry is a tuple of
    # (name, object_count, bytes_used, is_subdir);
    account_list = broker.list_containers_iter(limit, marker, end_marker,
                                               prefix, delimiter)
    if response_content_type == 'application/json':
        data = []
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                data.append({'subdir': name})
            else:
                data.append({'name': name, 'count': object_count,
                             'bytes': bytes_used})
        account_list = json.dumps(data)
    elif response_content_type.endswith('/xml'):
        output_list = ['<?xml version="1.0" encoding="UTF-8"?>',
                       '<account name=%s>' % saxutils.quoteattr(account)]
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                output_list.append(
                    '<subdir name=%s />' % saxutils.quoteattr(name))
            else:
                item = '<container><name>%s</name><count>%s</count>' \
                       '<bytes>%s</bytes></container>' % \
                       (saxutils.escape(name), object_count, bytes_used)
                output_list.append(item)
        output_list.append('</account>')
        account_list = '\n'.join(output_list)
    else:
        if not account_list:
            resp = HTTPNoContent(request=req, headers=resp_headers)
            resp.content_type = response_content_type
            resp.charset = 'utf-8'
            return resp
        account_list = '\n'.join(r[0] for r in account_list) + '\n'
    ret = HTTPOk(body=account_list, request=req, headers=resp_headers)
    ret.content_type = response_content_type
    ret.charset = 'utf-8'
    return ret
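The body-formatting branches can be tried standalone. A minimal sketch follows; listing_body is a hypothetical helper mirroring the JSON and plain-text branches above (the XML branch is omitted for brevity), and the sample container tuples match the (name, object_count, bytes_used, is_subdir) shape.

```python
import json

def listing_body(containers, content_type):
    """Render a container listing the way account_listing_response does
    (sketch): JSON objects for application/json, bare names for text/plain."""
    if content_type == 'application/json':
        return json.dumps([
            {'subdir': name} if is_subdir else
            {'name': name, 'count': count, 'bytes': nbytes}
            for name, count, nbytes, is_subdir in containers])
    # plain-text fallback: one container name per line
    return '\n'.join(c[0] for c in containers) + '\n'

rows = [('photos', 3, 2048, 0), ('docs', 1, 512, 0)]
print(listing_body(rows, 'text/plain'))
print(listing_body(rows, 'application/json'))
```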

The next post will continue the analysis of swift-proxy and swift-account.

2016-06-07 14:04:11 weixin_34331102 28 reads

          OpenStack Swift, an open-source cloud storage tool, is used by more and more companies. To record and consolidate what I learn from the Swift source code, I am writing a series of study notes to help beginners quickly understand Swift's internals.

          The server.py module under proxy is the overall entry point, on the proxy side, for all management operations on accounts, containers, and objects. After the Swift system receives a URL request, the request first passes through the middleware chain for validation, then enters the server module under proxy; the __call__ method dispatches the request to the appropriate controller, the controller calls its node storage service, the resp result is returned to the server module, and finally the response travels back through the middleware chain to the client.

 

     1. First, the overall structure of server.py consists of four parts: a set of imports, a required_filters list, a class named Application(object), and an app_factory(global_conf, **local_conf) function.

     2. The rest of this post walks through the Application(object) class, which contains all the main functionality.

 

 2.1 The __init__ method initializes the Application class: the conf configuration parameters, the logger, the memcache object, and the account_ring, container_ring, and object_ring objects, among others.

 

2.2 check_config(self) verifies that the read_affinity and sorting_method values configured in proxy-server.conf are valid; it is called from app_factory(global_conf, **local_conf).

   

2.3 get_controller(self, path) parses the given URL path and returns the matching controller class together with a dict whose contents depend on the URL format.

 

def get_controller(self, path):
    """
    Get the controller to handle a request.

    :param path: path from request
    :returns: tuple of (controller class, path dictionary)

    :raises: ValueError (thrown by split_path) if given invalid path
    """
    if path == '/info':
        # for /info, return InfoController and a dict carrying version,
        # expose_info, disallowed_sections and admin_key
        d = dict(version=None,
                 expose_info=self.expose_info,
                 disallowed_sections=self.disallowed_sections,
                 admin_key=self.admin_key)
        return InfoController, d

    # split the url on '/' and bind segments 1 to 4 to these variables
    version, account, container, obj = split_path(path, 1, 4, True)
    d = dict(version=version,
             account_name=account,
             container_name=container,
             object_name=obj)
    # choose the controller based on which of account, container and
    # object are present in the parsed path
    if obj and container and account:
        return ObjectController, d
    elif container and account:
        return ContainerController, d
    elif account and not container and not obj:
        return AccountController, d
    return None, d

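The dispatch rules can be illustrated with a standalone sketch. Here split_path is a simplified stand-in for swift.common.utils.split_path (the real one also validates segments), and controller names are returned as strings for brevity.

```python
def split_path(path, minsegs, maxsegs, rest_with_last=False):
    """Minimal stand-in: split '/v1/acct/cont/obj' into up to maxsegs
    segments, padding missing trailing segments with None."""
    segs = path.lstrip('/').split('/', maxsegs - 1)
    if len(segs) < minsegs:
        raise ValueError('Invalid path: %s' % path)
    return segs + [None] * (maxsegs - len(segs))

def pick_controller(path):
    """Mirror get_controller's dispatch rules (names only, as a sketch)."""
    version, account, container, obj = split_path(path, 1, 4, True)
    if obj and container and account:
        return 'ObjectController'
    elif container and account:
        return 'ContainerController'
    elif account and not container and not obj:
        return 'AccountController'
    return None

print(pick_controller('/v1/AUTH_test/photos/cat.jpg'))
print(pick_controller('/v1/AUTH_test/photos'))
print(pick_controller('/v1/AUTH_test'))
```

Note how the object name absorbs any remaining slashes because the split is capped at four segments, which is what rest_with_last achieves in the real helper.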

 

 

2.4 __call__(self, env, start_response) is the actual entry point in the server module for handling requests against account, container, and object resources.

 

def __call__(self, env, start_response):
    """
    WSGI entry point.
    Wraps env in swob.Request object and passes it down.

    :param env: WSGI environment dictionary
    :param start_response: WSGI callable
    """
    try:
        if self.memcache is None:
            # if no memcache client yet, try to fetch one from the env
            self.memcache = cache_from_env(env)
        # check for x-storage-token / x-auth-token in the headers
        req = self.update_request(Request(env))
        # call handle_request and return the resulting resp object
        return self.handle_request(req)(env, start_response)
    except UnicodeError:
        err = HTTPPreconditionFailed(
            request=req, body='Invalid UTF8 or contains NULL')
        return err(env, start_response)
    except (Exception, Timeout):
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']


2.5 update_request(self, req): if the request headers contain x-storage-token but not x-auth-token, copy the value of x-storage-token into x-auth-token.
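That fallback amounts to a few lines; below is a sketch with a hypothetical update_request_headers helper operating on a plain dict instead of a swob.Request.

```python
def update_request_headers(headers):
    """Sketch of update_request's token fallback: clients may send the
    legacy x-storage-token header; copy it to x-auth-token if the
    latter is absent."""
    if 'x-storage-token' in headers and 'x-auth-token' not in headers:
        headers['x-auth-token'] = headers['x-storage-token']
    return headers

print(update_request_headers({'x-storage-token': 'tk123'}))
```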

 


2.6 handle_request(self, req) is the method in the server module that actually processes the request; readers familiar with servlets can think of it as playing the servlet's role.

 

def handle_request(self, req):
    """
    Entry point for proxy server.
    Should return a WSGI-style callable (such as swob.Response).

    :param req: swob.Request object
    """
    try:
        # prefix log lines with 'proxy-server' for easier tracing
        self.logger.set_statsd_prefix('proxy-server')
        # if the Content-Length header is present but negative, log the
        # error and return a bad request
        if req.content_length and req.content_length < 0:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        try:
            # if the path is not valid utf8, log the error and return
            # a precondition-failed response
            if not check_utf8(req.path_info):
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')

        try:
            # get_controller returns the matching controller class and
            # the path dict
            controller, path_parts = self.get_controller(req.path)
            p = req.path_info
            if isinstance(p, unicode):
                p = p.encode('utf-8')  # re-encode the path from unicode to utf-8
        except ValueError:
            # invalid path: log the error and return 404
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:
            # no controller matched: log the error and return a bad URL
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        # if deny_host_headers is set in proxy-server.conf and the
        # request's host appears in it, return 403
        if self.deny_host_headers and \
                req.host.split(':')[0] in self.deny_host_headers:
            return HTTPForbidden(request=req, body='Invalid host header')

        # extend the log prefix with the controller's server type
        # (account/container/object) for easier tracing
        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())
        # instantiate the actual controller (one of AccountController,
        # ContainerController, ObjectController, InfoController)
        controller = controller(self, **path_parts)
        if 'swift.trans_id' not in req.environ:
            # if this wasn't set by an earlier middleware, set it now;
            # the transaction id is a UUID, somewhat like an http
            # session id
            trans_id = generate_trans_id(self.trans_id_suffix)
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        # record the client's IP in the logger for later analysis
        self.logger.client_ip = get_remote_client(req)
        try:
            # look up the controller method matching req.method, then
            # require the 'publicly_accessible' attribute set by the
            # @public (and @delay_denial) decorators
            handler = getattr(controller, req.method)
            getattr(handler, 'publicly_accessible')
        except AttributeError:
            allowed_methods = getattr(controller, 'allowed_methods', set())
            return HTTPMethodNotAllowed(
                request=req, headers={'Allow': ', '.join(allowed_methods)})
        if 'swift.authorize' in req.environ:
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp:
                # No resp means authorized, no delayed recheck required.
                del req.environ['swift.authorize']
            else:
                # Response indicates denial, but we might delay the denial
                # and recheck later. If not delayed, return the error now.
                if not getattr(handler, 'delay_denial', None):
                    return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling.  This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        # invoke the handler and return its resp result
        return handler(req)
    except HTTPException as error_response:
        return error_response
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)

 

2.7 sort_nodes(self, nodes) sorts the given nodes; it is called from iter_nodes(self, ring, partition, node_iter=None).

 

def sort_nodes(self, nodes):
    '''
    Sorts nodes in-place (and returns the sorted list) according to
    the configured strategy. The default "sorting" is to randomly
    shuffle the nodes. If the "timing" strategy is chosen, the nodes
    are sorted according to the stored timing data.
    '''
    # In the case of timing sorting, shuffling ensures that close timings
    # (ie within the rounding resolution) won't prefer one over another.
    # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
    shuffle(nodes)
    if self.sorting_method == 'timing':
        # sorting_method 'timing' in the config: sort by timing data
        now = time()

        def key_func(node):
            timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
            return timing if expires > now else -1.0
        nodes.sort(key=key_func)
    elif self.sorting_method == 'affinity':
        # sorting_method 'affinity': sort by the configured read
        # affinity rules
        nodes.sort(key=self.read_affinity_sort_key)
    return nodes
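The 'timing' strategy can be exercised without an Application instance. In this sketch, sort_nodes_by_timing is a hypothetical standalone version of the method above, taking node_timings and the current time as parameters.

```python
from random import shuffle
from time import time

def sort_nodes_by_timing(nodes, node_timings, now=None):
    """Sketch of the 'timing' strategy: shuffle first so equal timings
    don't always favor the same node (Python's sort is stable), then
    sort by stored timing, treating expired or missing entries as -1.0
    so untried nodes are preferred."""
    now = time() if now is None else now
    shuffle(nodes)

    def key_func(node):
        timing, expires = node_timings.get(node['ip'], (-1.0, 0))
        return timing if expires > now else -1.0

    nodes.sort(key=key_func)
    return nodes

nodes = [{'ip': '10.0.0.1'}, {'ip': '10.0.0.2'}, {'ip': '10.0.0.3'}]
timings = {'10.0.0.1': (0.5, 100), '10.0.0.2': (0.1, 100)}  # ip -> (timing, expiry)
print(sort_nodes_by_timing(nodes, timings, now=50))
```

Since 10.0.0.3 has no timing entry, it sorts first as -1.0, ahead of the measured nodes.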

 

2.8 set_node_timing(self, node, timing) is provided for external callers to record timing data for a node.

 

2.9 error_limited(self, node) is called from iter_nodes(self, ring, partition, node_iter=None).

 

def error_limited(self, node):
    """
    Check if the node is currently error limited.

    :param node: dictionary of node to check
    :returns: True if error limited, False otherwise
    """
    now = time()
    if 'errors' not in node:
        # no error count on the node: not limited
        return False
    if 'last_error' in node and node['last_error'] < \
            now - self.error_suppression_interval:
        # the last error is older than the suppression interval:
        # forget last_error and errors and report not limited
        del node['last_error']
        if 'errors' in node:
            del node['errors']
        return False
    # the node is limited if its error count exceeds the configured
    # limit; log it if so
    limited = node['errors'] > self.error_suppression_limit
    if limited:
        self.logger.debug(
            _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
    return limited

 

 

2.10 error_limit(self, node, msg) is provided for external callers; it pushes the node's error count straight past the allowed limit (limit + 1), records the last_error time, and logs the event.

 

2.11 error_occurred(self, node, msg) is provided for external callers; it increments the node's error count, records the last_error time, and logs the event.
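These error-limiting helpers all operate on a per-node dict. Below is a self-contained sketch of the trio; the constants and the explicit now parameter are illustrative (the real methods read self.error_suppression_limit / self.error_suppression_interval and call time() themselves).

```python
ERROR_SUPPRESSION_LIMIT = 10     # illustrative defaults
ERROR_SUPPRESSION_INTERVAL = 60

def error_occurred(node, now):
    # increment the error counter and remember when it happened
    node['errors'] = node.get('errors', 0) + 1
    node['last_error'] = now

def error_limit(node, now):
    # jump straight past the limit so the node is skipped immediately
    node['errors'] = ERROR_SUPPRESSION_LIMIT + 1
    node['last_error'] = now

def error_limited(node, now):
    # mirror of the method shown earlier: forget stale errors, then
    # compare the count against the limit
    if 'errors' not in node:
        return False
    if node.get('last_error', 0) < now - ERROR_SUPPRESSION_INTERVAL:
        node.pop('last_error', None)
        node.pop('errors', None)
        return False
    return node['errors'] > ERROR_SUPPRESSION_LIMIT

node = {'ip': '10.0.0.1'}
error_limit(node, now=100)
print(error_limited(node, now=110))   # within the interval: limited
print(error_limited(node, now=200))   # stale: errors are forgotten
```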

 

2.12 iter_nodes(self, ring, partition, node_iter=None) is provided for external callers; it yields nodes from an iterator built over the sorted node list.

 

2.13 exception_occurred(self, node, typ, additional_info) is provided for external callers to log an exception that occurred on a node.

2.14 modify_wsgi_pipeline(self, pipe) is provided for external callers; it is used at system startup to set up the pipeline and log the result.

 

def modify_wsgi_pipeline(self, pipe):
    """
    Called during WSGI pipeline creation. Modifies the WSGI pipeline
    context to ensure that mandatory middleware is present in the pipeline.

    :param pipe: A PipelineWrapper object
    """
    pipeline_was_modified = False
    # for each filter listed in required_filters, insert it into the
    # pipeline if it is missing
    for filter_spec in reversed(required_filters):
        filter_name = filter_spec['name']
        if filter_name not in pipe:
            afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
            insert_at = 0
            for after in afters:
                try:
                    insert_at = max(insert_at, pipe.index(after) + 1)
                except ValueError:  # not in pipeline; ignore it
                    pass
            self.logger.info(
                'Adding required filter %s to pipeline at position %d' %
                (filter_name, insert_at))
            ctx = pipe.create_filter(filter_name)
            pipe.insert_filter(ctx, index=insert_at)
            pipeline_was_modified = True

    if pipeline_was_modified:
        self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                         pipe)
    else:
        self.logger.debug("Pipeline is \"%s\"", pipe)
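The position computation inside that loop can be tested on plain lists. insert_position below is a hypothetical helper extracted from the method above; it stands in for the PipelineWrapper, which supports index() in the same way.

```python
def insert_position(pipeline, afters):
    """Compute where a required filter should be inserted: directly after
    the right-most of its 'after' middlewares that are actually present
    (sketch of the inner loop of modify_wsgi_pipeline)."""
    insert_at = 0
    for after in afters:
        try:
            insert_at = max(insert_at, pipeline.index(after) + 1)
        except ValueError:  # not in pipeline; ignore it
            pass
    return insert_at

pipe = ['catch_errors', 'proxy_logging', 'proxy_server']
print(insert_position(pipe, ['catch_errors', 'gatekeeper', 'proxy_logging']))
```

Missing 'after' middlewares are simply skipped, so a required filter with no present predecessors lands at the front of the pipeline.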


 

The following is the latest proxy server.py source as of 2014-03-12, with only some comments added:

 

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mimetypes
import os
import socket
from swift import gettext_ as _
from random import shuffle
from time import time
import itertools

from eventlet import Timeout

from swift import __canonical_version__ as swift_version
from swift.common import constraints
from swift.common.ring import Ring
from swift.common.utils import cache_from_env, get_logger, \
    get_remote_client, split_path, config_true_value, generate_trans_id, \
    affinity_key_function, affinity_locality_predicate, list_from_csv, \
    register_swift_info
from swift.common.constraints import check_utf8
from swift.proxy.controllers import AccountController, ObjectController, \
    ContainerController, InfoController
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
    HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
    HTTPServerError, HTTPException, Request


# List of entry points for mandatory middlewares.
#
# Fields:
#
# "name" (required) is the entry point name from setup.py.
#
# "after_fn" (optional) a function that takes a PipelineWrapper object as its
# single argument and returns a list of middlewares that this middleware
# should come after. Any middlewares in the returned list that are not present
# in the pipeline will be ignored, so you can safely name optional middlewares
# to come after. For example, ["catch_errors", "bulk"] would install this
# middleware after catch_errors and bulk if both were present, but if bulk
# were absent, would just install it after catch_errors.

required_filters = [
    {'name': 'catch_errors'},
    {'name': 'gatekeeper',
     'after_fn': lambda pipe: (['catch_errors']
                               if pipe.startswith("catch_errors")
                               else [])},
    {'name': 'dlo', 'after_fn': lambda _junk: ['catch_errors', 'gatekeeper',
                                               'proxy_logging']}]


class Application(object):
    """WSGI application for the proxy server."""

    def __init__(self, conf, memcache=None, logger=None, account_ring=None,
                 container_ring=None, object_ring=None):
        if conf is None:
            conf = {}
        if logger is None:
            self.logger = get_logger(conf, log_route='proxy-server')
        else:
            self.logger = logger

        swift_dir = conf.get('swift_dir', '/etc/swift')
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.recoverable_node_timeout = int(
            conf.get('recoverable_node_timeout', self.node_timeout))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        self.client_timeout = int(conf.get('client_timeout', 60))
        self.put_queue_depth = int(conf.get('put_queue_depth', 10))
        self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
        self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
        self.trans_id_suffix = conf.get('trans_id_suffix', '')
        self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
        self.error_suppression_interval = \
            int(conf.get('error_suppression_interval', 60))
        self.error_suppression_limit = \
            int(conf.get('error_suppression_limit', 10))
        self.recheck_container_existence = \
            int(conf.get('recheck_container_existence', 60))
        self.recheck_account_existence = \
            int(conf.get('recheck_account_existence', 60))
        self.allow_account_management = \
            config_true_value(conf.get('allow_account_management', 'no'))
        self.object_post_as_copy = \
            config_true_value(conf.get('object_post_as_copy', 'true'))
        self.object_ring = object_ring or Ring(swift_dir, ring_name='object')
        self.container_ring = container_ring or Ring(swift_dir,
                                                     ring_name='container')
        self.account_ring = account_ring or Ring(swift_dir,
                                                 ring_name='account')
        self.memcache = memcache
        mimetypes.init(mimetypes.knownfiles +
                       [os.path.join(swift_dir, 'mime.types')])
        self.account_autocreate = \
            config_true_value(conf.get('account_autocreate', 'no'))
        self.expiring_objects_account = \
            (conf.get('auto_create_account_prefix') or '.') + \
            (conf.get('expiring_objects_account_name') or 'expiring_objects')
        self.expiring_objects_container_divisor = \
            int(conf.get('expiring_objects_container_divisor') or 86400)
        self.max_containers_per_account = \
            int(conf.get('max_containers_per_account') or 0)
        self.max_containers_whitelist = [
            a.strip()
            for a in conf.get('max_containers_whitelist', '').split(',')
            if a.strip()]
        self.deny_host_headers = [
            host.strip() for host in
            conf.get('deny_host_headers', '').split(',') if host.strip()]
        self.rate_limit_after_segment = \
            int(conf.get('rate_limit_after_segment', 10))
        self.rate_limit_segments_per_sec = \
            int(conf.get('rate_limit_segments_per_sec', 1))
        self.log_handoffs = config_true_value(conf.get('log_handoffs', 'true'))
        self.cors_allow_origin = [
            a.strip()
            for a in conf.get('cors_allow_origin', '').split(',')
            if a.strip()]
        self.strict_cors_mode = config_true_value(
            conf.get('strict_cors_mode', 't'))
        self.node_timings = {}
        self.timing_expiry = int(conf.get('timing_expiry', 300))
        self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
        self.max_large_object_get_time = float(
            conf.get('max_large_object_get_time', '86400'))
        value = conf.get('request_node_count', '2 * replicas').lower().split()
        if len(value) == 1:
            value = int(value[0])
            self.request_node_count = lambda replicas: value
        elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
            value = int(value[0])
            self.request_node_count = lambda replicas: value * replicas
        else:
            raise ValueError(
                'Invalid request_node_count value: %r' % ''.join(value))
        try:
            self._read_affinity = read_affinity = conf.get('read_affinity', '')
            self.read_affinity_sort_key = affinity_key_function(read_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid read_affinity value: %r (%s)" %
                             (read_affinity, err.message))
        try:
            write_affinity = conf.get('write_affinity', '')
            self.write_affinity_is_local_fn \
                = affinity_locality_predicate(write_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid write_affinity value: %r (%s)" %
                             (write_affinity, err.message))
        value = conf.get('write_affinity_node_count',
                         '2 * replicas').lower().split()
        if len(value) == 1:
            value = int(value[0])
            self.write_affinity_node_count = lambda replicas: value
        elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
            value = int(value[0])
            self.write_affinity_node_count = lambda replicas: value * replicas
        else:
            raise ValueError(
                'Invalid write_affinity_node_count value: %r' % ''.join(value))
        # swift_owner_headers are stripped by the account and container
        # controllers; we should extend header stripping to object controller
        # when a privileged object header is implemented.
        swift_owner_headers = conf.get(
            'swift_owner_headers',
            'x-container-read, x-container-write, '
            'x-container-sync-key, x-container-sync-to, '
            'x-account-meta-temp-url-key, x-account-meta-temp-url-key-2, '
            'x-account-access-control')
        self.swift_owner_headers = [
            name.strip().title()
            for name in swift_owner_headers.split(',') if name.strip()]
        # Initialization was successful, so now apply the client chunk size
        # parameter as the default read / write buffer size for the network
        # sockets.
        #
        # NOTE WELL: This is a class setting, so until we get set this on a
        # per-connection basis, this affects reading and writing on ALL
        # sockets, those between the proxy servers and external clients, and
        # those between the proxy servers and the other internal servers.
        #
        # ** Because it affects the client as well, currently, we use the
        # client chunk size as the govenor and not the object chunk size.
        socket._fileobject.default_bufsize = self.client_chunk_size
        self.expose_info = config_true_value(
            conf.get('expose_info', 'yes'))
        self.disallowed_sections = list_from_csv(
            conf.get('disallowed_sections'))
        self.admin_key = conf.get('admin_key', None)
  205.         register_swift_info(  
  206.             version=swift_version,  
  207.             strict_cors_mode=self.strict_cors_mode,  
  208.             **constraints.EFFECTIVE_CONSTRAINTS)  
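The `request_node_count` and `write_affinity_node_count` options parsed above share one scheme: the value is either a bare integer or the form `<N> * replicas`, and either way the result is stored as a function of the replica count. A standalone sketch of that parsing (the `parse_node_count` name is ours, not Swift's):

```python
def parse_node_count(value):
    """Parse a count option that is either an integer ('7') or a
    multiplier of the replica count ('2 * replicas'); return a
    function of the replica count."""
    parts = value.lower().split()
    if len(parts) == 1:
        count = int(parts[0])
        return lambda replicas: count
    if len(parts) == 3 and parts[1] == '*' and parts[2] == 'replicas':
        factor = int(parts[0])
        return lambda replicas: factor * replicas
    raise ValueError('Invalid node count value: %r' % value)

# With 3 replicas, '2 * replicas' allows contacting up to 6 nodes:
print(parse_node_count('2 * replicas')(3))  # 6
print(parse_node_count('7')(3))             # 7
```

Storing a callable instead of a number lets the same configuration work for rings with different replica counts.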
  209.   
    def check_config(self):
        """
        Check the configuration for possible errors
        """
        if self._read_affinity and self.sorting_method != 'affinity':
            self.logger.warn("sorting_method is set to '%s', not 'affinity'; "
                             "read_affinity setting will have no effect." %
                             self.sorting_method)

    def get_controller(self, path):
        """
        Get the controller to handle a request.

        :param path: path from request
        :returns: tuple of (controller class, path dictionary)

        :raises: ValueError (thrown by split_path) if given invalid path
        """
        if path == '/info':  # for /info, return InfoController plus a dict of version, expose_info, disallowed_sections and admin_key
            d = dict(version=None,
                     expose_info=self.expose_info,
                     disallowed_sections=self.disallowed_sections,
                     admin_key=self.admin_key)
            return InfoController, d

        version, account, container, obj = split_path(path, 1, 4, True)  # split the path on '/' and bind segments 1-4 to these variables
        d = dict(version=version,
                 account_name=account,
                 container_name=container,
                 object_name=obj)
        if obj and container and account:  # which of account/container/object are present decides the controller
            return ObjectController, d
        elif container and account:
            return ContainerController, d
        elif account and not container and not obj:
            return AccountController, d
        return None, d

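The dispatch rule in `get_controller` reduces to a small table: whichever path components are present determines the controller class. A self-contained sketch with the controller classes replaced by plain strings (the `pick_controller` name is ours):

```python
def pick_controller(path):
    # Mirror split_path(path, 1, 4, True): up to four segments,
    # version first, the rest of the path kept in the object segment.
    segments = path.lstrip('/').split('/', 3)
    segments += [None] * (4 - len(segments))
    version, account, container, obj = segments
    if obj and container and account:
        return 'ObjectController'
    if container and account:
        return 'ContainerController'
    if account:
        return 'AccountController'
    return None

print(pick_controller('/v1/AUTH_test/photos/cat.jpg'))  # ObjectController
print(pick_controller('/v1/AUTH_test/photos'))          # ContainerController
print(pick_controller('/v1/AUTH_test'))                 # AccountController
```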
    def __call__(self, env, start_response):
        """
        WSGI entry point.
        Wraps env in swob.Request object and passes it down.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        """
        try:
            if self.memcache is None:  # no memcache handle yet: try to fetch one from the environment
                self.memcache = cache_from_env(env)
            req = self.update_request(Request(env))  # copy x-storage-token to x-auth-token if only the former is set
            return self.handle_request(req)(env, start_response)  # dispatch via handle_request and return its response
        except UnicodeError:
            err = HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',
                           [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']

    def update_request(self, req):
        if 'x-storage-token' in req.headers and \
                'x-auth-token' not in req.headers:
            req.headers['x-auth-token'] = req.headers['x-storage-token']
        return req

    def handle_request(self, req):
        """
        Entry point for proxy server.
        Should return a WSGI-style callable (such as swob.Response).

        :param req: swob.Request object
        """
        try:
            self.logger.set_statsd_prefix('proxy-server')  # prefix log entries with 'proxy-server' to ease tracing
            if req.content_length and req.content_length < 0:  # a negative Content-Length header is a bad request: log and reject
                self.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      body='Invalid Content-Length')

            try:
                if not check_utf8(req.path_info):  # reject and log paths that are not valid UTF-8
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(
                        request=req, body='Invalid UTF8 or contains NULL')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')

            try:
                controller, path_parts = self.get_controller(req.path)  # get_controller returns the controller class and a path dict
                p = req.path_info
                if isinstance(p, unicode):
                    p = p.encode('utf-8')  # re-encode the path from unicode to UTF-8
            except ValueError:  # invalid path: log and return 404
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
            if not controller:  # no matching controller: log and reject
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            if self.deny_host_headers and \
                    req.host.split(':')[0] in self.deny_host_headers:
                # reject the request if deny_host_headers is set in
                # proxy-server.conf and the request's host appears there
                return HTTPForbidden(request=req, body='Invalid host header')

            self.logger.set_statsd_prefix('proxy-server.' +
                                          controller.server_type.lower())  # append the controller's server type to the log prefix
            controller = controller(self, **path_parts)  # instantiate the concrete controller (Account-, Container-, Object- or InfoController)
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now;
                # the transaction id is a UUID-like token, similar in spirit
                # to an HTTP session id
                trans_id = generate_trans_id(self.trans_id_suffix)
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            self.logger.client_ip = get_remote_client(req)  # record the client IP in the logger for later analysis
            try:
                handler = getattr(controller, req.method)  # look up the controller method matching req.method
                getattr(handler, 'publicly_accessible')  # require the attribute set by the @public decorator (see @public / @delay_denial)
            except AttributeError:
                allowed_methods = getattr(controller, 'allowed_methods', set())
                return HTTPMethodNotAllowed(
                    request=req, headers={'Allow': ', '.join(allowed_methods)})
            if 'swift.authorize' in req.environ:  # perform authorization
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)
                if not resp:
                    # No resp means authorized, no delayed recheck required.
                    del req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            # Save off original request method (GET, POST, etc.) in case it
            # gets mutated during handling.  This way logging can display the
            # method the client actually sent.
            req.environ['swift.orig_req_method'] = req.method
            return handler(req)  # call the handler and return its response
        except HTTPException as error_response:
            return error_response
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)

    def sort_nodes(self, nodes):
        """
        Sorts nodes in-place (and returns the sorted list) according to
        the configured strategy. The default "sorting" is to randomly
        shuffle the nodes. If the "timing" strategy is chosen, the nodes
        are sorted according to the stored timing data.
        """
        # In the case of timing sorting, shuffling ensures that close timings
        # (ie within the rounding resolution) won't prefer one over another.
        # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
        shuffle(nodes)
        if self.sorting_method == 'timing':  # sorting_method 'timing' in the config: sort by recorded response time
            now = time()

            def key_func(node):
                timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
                return timing if expires > now else -1.0
            nodes.sort(key=key_func)
        elif self.sorting_method == 'affinity':  # sorting_method 'affinity': sort by the configured affinity rules
            nodes.sort(key=self.read_affinity_sort_key)
        return nodes

    def set_node_timing(self, node, timing):
        if self.sorting_method != 'timing':
            return
        now = time()
        timing = round(timing, 3)  # sort timings to the millisecond
        self.node_timings[node['ip']] = (timing, now + self.timing_expiry)

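The timing strategy keeps, per node IP, a `(timing, expires)` pair, shuffles first (Python's sort is stable, so equal keys keep their shuffled order), and then sorts ascending by timing, treating expired or unknown entries as -1.0 so they sort first. A standalone sketch with made-up timing data:

```python
import random
import time

node_timings = {                            # ip -> (seconds, expiry timestamp)
    '10.0.0.1': (0.120, time.time() + 300),
    '10.0.0.2': (0.015, time.time() + 300),
    '10.0.0.3': (0.300, time.time() - 1),   # expired -> treated as -1.0
}

def sort_nodes_by_timing(nodes):
    # Shuffle first so nodes with equal timings don't always win
    # in the same order.
    random.shuffle(nodes)
    now = time.time()

    def key_func(node):
        timing, expires = node_timings.get(node['ip'], (-1.0, 0))
        return timing if expires > now else -1.0
    nodes.sort(key=key_func)
    return nodes

nodes = [{'ip': '10.0.0.1'}, {'ip': '10.0.0.2'}, {'ip': '10.0.0.3'}]
print([n['ip'] for n in sort_nodes_by_timing(nodes)])
# ['10.0.0.3', '10.0.0.2', '10.0.0.1'] -- expired entry first, then fastest
```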
    def error_limited(self, node):
        """
        Check if the node is currently error limited.

        :param node: dictionary of node to check
        :returns: True if error limited, False otherwise
        """
        now = time()
        if 'errors' not in node:  # no errors recorded for this node: not limited
            return False
        if 'last_error' in node and node['last_error'] < \
                now - self.error_suppression_interval:
            # the last error is older than the suppression interval:
            # clear the error state and report not limited
            del node['last_error']
            if 'errors' in node:
                del node['errors']
            return False
        limited = node['errors'] > self.error_suppression_limit  # limited if the error count exceeds the configured limit; log it
        if limited:
            self.logger.debug(
                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
        return limited

    def error_limit(self, node, msg):
        """
        Mark a node as error limited. This immediately pretends the
        node received enough errors to trigger error suppression. Use
        this for errors like Insufficient Storage. For other errors
        use :func:`error_occurred`.

        :param node: dictionary of node to error limit
        :param msg: error message
        """
        node['errors'] = self.error_suppression_limit + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def error_occurred(self, node, msg):
        """
        Handle logging, and handling of errors.

        :param node: dictionary of node to handle errors for
        :param msg: error message
        """
        node['errors'] = node.get('errors', 0) + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def iter_nodes(self, ring, partition, node_iter=None):
        """
        Yields nodes for a ring partition, skipping over error
        limited nodes and stopping at the configurable number of
        nodes. If a node yielded subsequently gets error limited, an
        extra node will be yielded to take its place.

        Note that if you're going to iterate over this concurrently from
        multiple greenthreads, you'll want to use a
        swift.common.utils.GreenthreadSafeIterator to serialize access.
        Otherwise, you may get ValueErrors from concurrent access. (You also
        may not, depending on how logging is configured, the vagaries of
        socket IO and eventlet, and the phase of the moon.)

        :param ring: ring to get yield nodes from
        :param partition: ring partition to yield nodes for
        :param node_iter: optional iterable of nodes to try. Useful if you
            want to filter or reorder the nodes.
        """
        part_nodes = ring.get_part_nodes(partition)
        if node_iter is None:
            node_iter = itertools.chain(part_nodes,
                                        ring.get_more_nodes(partition))
        num_primary_nodes = len(part_nodes)

        # Use of list() here forcibly yanks the first N nodes (the primary
        # nodes) from node_iter, so the rest of its values are handoffs.
        primary_nodes = self.sort_nodes(
            list(itertools.islice(node_iter, num_primary_nodes)))
        handoff_nodes = node_iter
        nodes_left = self.request_node_count(len(primary_nodes))

        for node in primary_nodes:
            if not self.error_limited(node):
                yield node
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return
        handoffs = 0
        for node in handoff_nodes:
            if not self.error_limited(node):
                handoffs += 1
                if self.log_handoffs:
                    self.logger.increment('handoff_count')
                    self.logger.warning(
                        'Handoff requested (%d)' % handoffs)
                    if handoffs == len(primary_nodes):
                        self.logger.increment('handoff_all_count')
                yield node
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return

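The iteration order above, primaries first and then handoffs as substitutes, skipping error-limited nodes and stopping after the configured count, can be sketched without the ring machinery. This is a simplified sketch (it drops the re-check after yield and the handoff logging):

```python
import itertools

def iter_usable_nodes(primary_nodes, handoff_nodes, request_node_count,
                      is_error_limited):
    """Yield usable nodes: primaries first, then handoffs take the
    place of any skipped, error-limited node."""
    nodes_left = request_node_count
    for node in itertools.chain(primary_nodes, handoff_nodes):
        if is_error_limited(node):
            continue  # a later handoff will be yielded in its place
        yield node
        nodes_left -= 1
        if nodes_left <= 0:
            return

primaries = ['p1', 'p2', 'p3']
handoffs = ['h1', 'h2', 'h3']
bad = {'p2'}  # pretend p2 is currently error limited
print(list(iter_usable_nodes(primaries, handoffs, 3, lambda n: n in bad)))
# ['p1', 'p3', 'h1']
```

With `request_node_count = 2 * replicas`, the generator can reach twice as many nodes as there are primaries before giving up.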
    def exception_occurred(self, node, typ, additional_info):
        """
        Handle logging of generic exceptions.

        :param node: dictionary of node to log the error for
        :param typ: server type
        :param additional_info: additional information to log
        """
        self.logger.exception(
            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
              '%(info)s'),
            {'type': typ, 'ip': node['ip'], 'port': node['port'],
             'device': node['device'], 'info': additional_info})

    def modify_wsgi_pipeline(self, pipe):
        """
        Called during WSGI pipeline creation. Modifies the WSGI pipeline
        context to ensure that mandatory middleware is present in the pipeline.

        :param pipe: A PipelineWrapper object
        """
        pipeline_was_modified = False
        for filter_spec in reversed(required_filters):  # insert and re-order apps listed in required_filters into the pipeline
            filter_name = filter_spec['name']
            if filter_name not in pipe:
                afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
                insert_at = 0
                for after in afters:
                    try:
                        insert_at = max(insert_at, pipe.index(after) + 1)
                    except ValueError:  # not in pipeline; ignore it
                        pass
                self.logger.info(
                    'Adding required filter %s to pipeline at position %d' %
                    (filter_name, insert_at))
                ctx = pipe.create_filter(filter_name)
                pipe.insert_filter(ctx, index=insert_at)
                pipeline_was_modified = True

        if pipeline_was_modified:
            self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                             pipe)
        else:
            self.logger.debug("Pipeline is \"%s\"", pipe)


def app_factory(global_conf, **local_conf):
    """paste.deploy app factory for creating WSGI proxy apps."""
    conf = global_conf.copy()
    conf.update(local_conf)
    app = Application(conf)
    app.check_config()
    return app
2016-07-19 15:38:03 sinat_27186785 1458 views

  Swift's WSGI service runs requests through the pipeline: catch_errors, proxy-logging, cache, authtoken, keystone, (slo), proxy-server. The proxy-server entry point is where requests are actually processed and answered.

  proxy-server service entry point (/swift-kilo-eol/swift/proxy/server.py)

    def __call__(self, env, start_response):
        """
        WSGI entry point.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        """

        # Fetch 'swift.cache' from the WSGI env; with the second argument
        # True, cache_from_env errors out immediately if it is missing.
        # If req.headers only has 'x-storage-token', copy it to
        # 'x-auth-token', then call handle_request to do the real work.
        try:
            if self.memcache is None:
                self.memcache = cache_from_env(env, True)
            req = self.update_request(Request(env))
            return self.handle_request(req)(env, start_response)
        except UnicodeError:
            err = HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',
                           [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']


  The real work, though, is done by handle_request():

1. map req.path to the matching controller class;

2. instantiate that controller class;

3. look up the controller method named by req.method;

4. call that method and return its result.
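Steps 3 and 4 rely on two `getattr` calls: one to find the method named by `req.method`, one to require the `publicly_accessible` attribute that Swift's `@public` decorator sets on dispatchable methods. A minimal sketch of that pattern (controller and handler bodies are invented):

```python
def public(func):
    # mirrors Swift's @public decorator: mark a controller method
    # as dispatchable from handle_request
    func.publicly_accessible = True
    return func

class AccountController(object):
    @public
    def GET(self, req):
        return '200 OK'

    def _internal(self, req):  # no @public -> never dispatched
        return 'nope'

def dispatch(controller, method, req):
    try:
        handler = getattr(controller, method)
        getattr(handler, 'publicly_accessible')
    except AttributeError:
        return '405 Method Not Allowed'
    return handler(req)

ctrl = AccountController()
print(dispatch(ctrl, 'GET', None))        # 200 OK
print(dispatch(ctrl, '_internal', None))  # 405 Method Not Allowed
```

Looking up an arbitrary `req.method` string is safe precisely because the second `getattr` rejects anything the decorator has not marked.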

    def handle_request(self, req):
        """
        Entry point for proxy server.
        Should return a WSGI-style callable (such as swob.Response).

        :param req: swob.Request object
        """
        try:
            self.logger.set_statsd_prefix('proxy-server')
            if req.content_length and req.content_length < 0:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      body='Invalid Content-Length')

            try:
                if not check_utf8(req.path_info):
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(
                        request=req, body='Invalid UTF8 or contains NULL')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')

            try:
                # Map req.path to the controller class plus a dict of
                # version, account, container and object names:
                #   /info                        -> InfoController
                #   account + container + object -> ObjectController
                #   account + container          -> ContainerController
                #   account only                 -> AccountController

                controller, path_parts = self.get_controller(req)
                p = req.path_info
                if isinstance(p, unicode):
                    p = p.encode('utf-8')
            except APIVersionError:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req)
            except ValueError:
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
            if not controller:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            if self.deny_host_headers and \
                    req.host.split(':')[0] in self.deny_host_headers:
                return HTTPForbidden(request=req, body='Invalid host header')

            self.logger.set_statsd_prefix('proxy-server.' +
                                          controller.server_type.lower())
            # instantiate the controller class with the dict returned by get_controller
            controller = controller(self, **path_parts)
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now
                trans_id_suffix = self.trans_id_suffix
                trans_id_extra = req.headers.get('x-trans-id-extra')
                if trans_id_extra:
                    trans_id_suffix += '-' + trans_id_extra[:32]
                trans_id = generate_trans_id(trans_id_suffix)
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            self.logger.client_ip = get_remote_client(req)
            try:
                # look up the controller method named by the request
                handler = getattr(controller, req.method)
                getattr(handler, 'publicly_accessible')
            except AttributeError:
                allowed_methods = getattr(controller, 'allowed_methods', set())
                return HTTPMethodNotAllowed(
                    request=req, headers={'Allow': ', '.join(allowed_methods)})
            old_authorize = None
            if 'swift.authorize' in req.environ:
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)
                if not resp and not req.headers.get('X-Copy-From-Account') \
                        and not req.headers.get('Destination-Account'):
                    # No resp means authorized, no delayed recheck required.
                    old_authorize = req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            # Save off original request method (GET, POST, etc.) in case it
            # gets mutated during handling.  This way logging can display the
            # method the client actually sent.
            req.environ['swift.orig_req_method'] = req.method
            try:
                if old_authorize:
                    req.environ.pop('swift.authorize', None)
                # call the controller method named by the request
                return handler(req)
            finally:
                if old_authorize:
                    req.environ['swift.authorize'] = old_authorize
        except HTTPException as error_response:
            return error_response
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)

  The call flow: once a request reaches the proxy-server entry point, handle_request() resolves the concrete controller class (AccountController here) and invokes the GET/HEAD/PUT/POST/DELETE methods in /swift-kilo-eol/swift/proxy/controllers/account.py; these connect to the storage service (account-server) and call its GET/HEAD/PUT/POST/DELETE methods to process the request and build the response.

  Below is how the GET/HEAD/PUT/POST/DELETE methods are invoked in the source; the details are annotated inline.

/swift-kilo-eol/swift/proxy/controllers/account.py

    def GETorHEAD(self, req):
        """Handler for HTTP GET/HEAD requests."""
        # handle HTTP GET/HEAD requests
        if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
            resp = HTTPBadRequest(request=req)
            resp.body = 'Account name length of %d longer than %d' % \
                        (len(self.account_name),
                         constraints.MAX_ACCOUNT_NAME_LENGTH)
            return resp

        # get_part runs the consistent hash (and bit shift) to find the
        # partition the account is stored in;
        # iter_nodes yields the nodes for that partition, skipping handoff
        # and error-limited nodes;
        # GETorHEAD_base then performs the GET/HEAD and returns a
        # swob.Response object
        partition = self.app.account_ring.get_part(self.account_name)
        node_iter = self.app.iter_nodes(self.app.account_ring, partition)
        resp = self.GETorHEAD_base(
            req, _('Account'), node_iter, partition,
            req.swift_entity_path.rstrip('/'))
        if resp.status_int == HTTP_NOT_FOUND:
            if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
                resp.status = HTTP_GONE
            elif self.app.account_autocreate:
                resp = account_listing_response(self.account_name, req,
                                                get_listing_content_type(req))
        if req.environ.get('swift_owner'):
            self.add_acls_from_sys_metadata(resp)
        else:
            for header in self.app.swift_owner_headers:
                resp.headers.pop(header, None)
        return resp
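The `get_part` call above maps the account name to a partition by hashing the path and keeping only the top `part_power` bits. A sketch of that computation; the part power and hash prefix/suffix here are made-up values (a real cluster reads them from the ring file and swift.conf):

```python
import hashlib
import struct

PART_POWER = 10                 # 2**10 = 1024 partitions (assumed)
PART_SHIFT = 32 - PART_POWER
HASH_PREFIX = b''               # normally swift_hash_path_prefix
HASH_SUFFIX = b'changeme'       # normally swift_hash_path_suffix

def get_part(account, container=None, obj=None):
    # md5 the full path plus the cluster's secret prefix/suffix, then
    # keep only the top part_power bits of the first four digest bytes
    path = '/' + '/'.join(p for p in (account, container, obj) if p)
    digest = hashlib.md5(
        HASH_PREFIX + path.encode('utf-8') + HASH_SUFFIX).digest()
    return struct.unpack('>I', digest[:4])[0] >> PART_SHIFT

part = get_part('AUTH_test')
print(0 <= part < 2 ** PART_POWER)  # True
```

The secret suffix keeps outsiders from predicting which partition (and so which disks) a given name lands on.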

/swift-kilo-eol/swift/account/server.py

    def HEAD(self, req):
        """Handle HTTP HEAD request."""
        # handle a HEAD request: return the account's basic statistics as
        # key/value pairs in the response headers

        drive, part, account = split_and_validate_path(req, 3)
        out_content_type = get_listing_content_type(req)

        # mount check
        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
        # get an AccountBroker instance for querying the account's SQLite database
        broker = self._get_account_broker(drive, part, account,
                                          pending_timeout=0.1,
                                          stale_reads_ok=True)
        if broker.is_deleted():
            return self._deleted_response(broker, req, HTTPNotFound)
        # get_response_headers internally calls get_info() to fetch the
        # account's statistics and sets them on the response headers
        # ('X-Account-Container-Count', 'X-Account-Object-Count',
        # 'X-Account-Bytes-Used', 'X-Timestamp', 'X-PUT-Timestamp')
        headers = get_response_headers(broker)
        headers['Content-Type'] = out_content_type
        return HTTPNoContent(request=req, headers=headers, charset='utf-8')

/swift-kilo-eol/swift/account/server.py

    def GET(self, req):
        """Handle HTTP GET request."""
        # handle a GET request: like HEAD it returns the account's statistics
        # in the response headers, but unlike HEAD it also fetches the list of
        # containers under the account and returns it in the body;
        # the call sequence otherwise mirrors HEAD

        drive, part, account = split_and_validate_path(req, 3)
        prefix = get_param(req, 'prefix')
        delimiter = get_param(req, 'delimiter')
        if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
            # delimiters can be made more flexible later
            return HTTPPreconditionFailed(body='Bad delimiter')
        limit = constraints.ACCOUNT_LISTING_LIMIT
        given_limit = get_param(req, 'limit')
        if given_limit and given_limit.isdigit():
            limit = int(given_limit)
            if limit > constraints.ACCOUNT_LISTING_LIMIT:
                return HTTPPreconditionFailed(
                    request=req,
                    body='Maximum limit is %d' %
                    constraints.ACCOUNT_LISTING_LIMIT)
        marker = get_param(req, 'marker', '')
        end_marker = get_param(req, 'end_marker')
        out_content_type = get_listing_content_type(req)

        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
        broker = self._get_account_broker(drive, part, account,
                                          pending_timeout=0.1,
                                          stale_reads_ok=True)
        if broker.is_deleted():
            return self._deleted_response(broker, req, HTTPNotFound)

        # account_listing_response fetches the account's container list, where
        # each entry is (name, object_count, bytes_used, is_subdir), and
        # renders it in the requested content type, plain text by default
        return account_listing_response(account, req, out_content_type, broker,
                                        limit, marker, end_marker, prefix,
                                        delimiter)
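Before walking through account_listing_response, it helps to see how the query parameters interact. The following is a small standalone sketch with illustrative names; in real Swift this filtering is done by AccountBroker.list_containers_iter against the account database, not by Python code like this:

```python
def list_containers(names, limit=10000, marker='', end_marker=None,
                    prefix='', delimiter=''):
    """Filter a list of container names the way the account listing
    parameters do: skip up to marker (exclusive), stop at end_marker
    (exclusive), keep only prefix matches, and roll delimiter groups
    up into single "subdir" entries."""
    results = []
    seen_subdirs = set()
    for name in sorted(names):
        if name <= marker:
            continue
        if end_marker is not None and name >= end_marker:
            break
        if prefix and not name.startswith(prefix):
            continue
        if delimiter:
            rest = name[len(prefix):]
            if delimiter in rest:
                subdir = prefix + rest.split(delimiter, 1)[0] + delimiter
                if subdir in seen_subdirs:
                    continue
                seen_subdirs.add(subdir)
                results.append(subdir)  # rolled-up pseudo-directory entry
                if len(results) >= limit:
                    break
                continue
        results.append(name)
        if len(results) >= limit:
            break
    return results
```

For example, listing with prefix='photos/' and delimiter='/' rolls containers up into pseudo-directories, which is exactly the behavior the delimiter check at the top of GET enables.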


def account_listing_response(account, req, response_content_type, broker=None,
                             limit='', marker='', end_marker='', prefix='',
                             delimiter=''):
    if broker is None:
        broker = FakeAccountBroker()

    resp_headers = get_response_headers(broker)

    # Fetch the account's container list; each entry is a tuple of
    # (name, object_count, bytes_used, is_subdir)
    account_list = broker.list_containers_iter(limit, marker, end_marker,
                                               prefix, delimiter)

    # Render the listing in the requested content type; plain text in the body is the default
    if response_content_type == 'application/json':
        data = []
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                data.append({'subdir': name})
            else:
                data.append({'name': name, 'count': object_count,
                             'bytes': bytes_used})
        account_list = json.dumps(data)
    elif response_content_type.endswith('/xml'):
        output_list = ['<?xml version="1.0" encoding="UTF-8"?>',
                       '<account name=%s>' % saxutils.quoteattr(account)]
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                output_list.append(
                    '<subdir name=%s />' % saxutils.quoteattr(name))
            else:
                item = '<container><name>%s</name><count>%s</count>' \
                       '<bytes>%s</bytes></container>' % \
                       (saxutils.escape(name), object_count, bytes_used)
                output_list.append(item)
        output_list.append('</account>')
        account_list = '\n'.join(output_list)
    else:
        if not account_list:
            resp = HTTPNoContent(request=req, headers=resp_headers)
            resp.content_type = response_content_type
            resp.charset = 'utf-8'
            return resp
        account_list = '\n'.join(r[0] for r in account_list) + '\n'
    ret = HTTPOk(body=account_list, request=req, headers=resp_headers)
    ret.content_type = response_content_type
    ret.charset = 'utf-8'
    return ret
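The three output branches above can be exercised in isolation. This is an illustrative sketch, not Swift's exact code, reproducing the formats from the same (name, object_count, bytes_used, is_subdir) tuples:

```python
import json

def format_listing(containers, content_type):
    """Render container tuples as JSON, XML, or plain text,
    mirroring the branches of account_listing_response."""
    if content_type == 'application/json':
        data = []
        for name, count, nbytes, is_subdir in containers:
            if is_subdir:
                data.append({'subdir': name})
            else:
                data.append({'name': name, 'count': count, 'bytes': nbytes})
        return json.dumps(data)
    if content_type.endswith('/xml'):
        rows = ['<?xml version="1.0" encoding="UTF-8"?>', '<account>']
        for name, count, nbytes, _ in containers:
            rows.append('<container><name>%s</name><count>%d</count>'
                        '<bytes>%d</bytes></container>' % (name, count, nbytes))
        rows.append('</account>')
        return '\n'.join(rows)
    # default: one container name per line in the body
    return '\n'.join(c[0] for c in containers) + '\n'
```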


/swift-kilo-eol/swift/proxy/controllers/account.py

    def PUT(self, req):
        """HTTP PUT request handler."""
        # Handle an HTTP PUT request
        if not self.app.allow_account_management:
            return HTTPMethodNotAllowed(
                request=req,
                headers={'Allow': ', '.join(self.allowed_methods)})
        error_response = check_metadata(req, 'account')
        if error_response:
            return error_response
        if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
            resp = HTTPBadRequest(request=req)
            resp.body = 'Account name length of %d longer than %d' % \
                        (len(self.account_name),
                         constraints.MAX_ACCOUNT_NAME_LENGTH)
            return resp
        # Look up the account's partition number and the nodes holding its replicas
        account_partition, accounts = \
            self.app.account_ring.get_nodes(self.account_name)
        # Build backend-format request headers from the original request via generate_request_headers
        headers = self.generate_request_headers(req, transfer=True)
        clear_info_cache(self.app, req.environ, self.account_name)
        # make_requests fans the request out to the replica nodes, gathers all responses, and picks the best one by quorum voting
        resp = self.make_requests(
            req, self.app.account_ring, account_partition, 'PUT',
            req.swift_entity_path, [headers] * len(accounts))
        self.add_acls_from_sys_metadata(resp)
        return resp


    def make_requests(self, req, ring, part, method, path, headers,
                      query_string='', overrides=None):
        """
        Sends an HTTP request to multiple nodes and aggregates the results.
        It attempts the primary nodes concurrently, then iterates over the
        handoff nodes as needed.

        :param req: a request sent by the client
        :param ring: the ring used for finding backend servers
        :param part: the partition number
        :param method: the method to send to the backend
        :param path: the path to send to the backend
                     (full path ends up being  /<$device>/<$part>/<$path>)
        :param headers: a list of dicts, where each dict represents one
                        backend request that should be made.
        :param query_string: optional query string to send to the backend
        :param overrides: optional return status override map used to override
                          the returned status of a request.
        :returns: a swob.Response object
        """
        # Fan the request out to multiple nodes, gather all the responses, and pick the best one by quorum voting

        # get_part_nodes returns all primary nodes for this partition
        start_nodes = ring.get_part_nodes(part)
        nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
        # create a pool of green threads, one per primary node
        pile = GreenAsyncPile(len(start_nodes))
        for head in headers:
            # spawn a greenthread to send the request to one backend node (one spawn per replica)
            pile.spawn(self._make_request, nodes, part, method, path,
                       head, query_string, self.app.logger.thread_locals)
        response = []
        statuses = []
        for resp in pile:
            if not resp:
                continue
            response.append(resp)
            statuses.append(resp[0])
            if self.have_quorum(statuses, len(start_nodes)):
                break
        # give any pending requests *some* chance to finish
        # then wait up to post_quorum_timeout for the stragglers to return
        finished_quickly = pile.waitall(self.app.post_quorum_timeout)
        for resp in finished_quickly:
            if not resp:
                continue
            response.append(resp)
            statuses.append(resp[0])
        while len(response) < len(start_nodes):
            response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
        statuses, reasons, resp_headers, bodies = zip(*response)
        # vote on the collected responses and return the best one
        return self.best_response(req, statuses, reasons, bodies,
                                  '%s %s' % (self.server_type, req.method),
                                  overrides=overrides, headers=resp_headers)


    def best_response(self, req, statuses, reasons, bodies, server_type,
                      etag=None, headers=None, overrides=None,
                      quorum_size=None):
        """
        Given a list of responses from several servers, choose the best to
        return to the API.

        :param req: swob.Request object
        :param statuses: list of statuses returned
        :param reasons: list of reasons for each status
        :param bodies: bodies of each response
        :param server_type: type of server the responses came from
        :param etag: etag
        :param headers: headers of each response
        :param overrides: overrides to apply when lacking quorum
        :param quorum_size: quorum size to use
        :returns: swob.Response object with the correct status, body, etc. set
        """
        # _compute_quorum_response picks the best response based on the response statuses

        if quorum_size is None:
            quorum_size = self._quorum_size(len(statuses))

        resp = self._compute_quorum_response(
            req, statuses, reasons, bodies, etag, headers,
            quorum_size=quorum_size)
        if overrides and not resp:
            faked_up_status_indices = set()
            transformed = []
            for (i, (status, reason, hdrs, body)) in enumerate(zip(
                    statuses, reasons, headers, bodies)):
                if status in overrides:
                    faked_up_status_indices.add(i)
                    transformed.append((overrides[status], '', '', ''))
                else:
                    transformed.append((status, reason, hdrs, body))
            statuses, reasons, headers, bodies = zip(*transformed)
            resp = self._compute_quorum_response(
                req, statuses, reasons, bodies, etag, headers,
                indices_to_avoid=faked_up_status_indices,
                quorum_size=quorum_size)

        if not resp:
            resp = Response(request=req)
            self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
                                  {'type': server_type, 'statuses': statuses})
            resp.status = '503 Internal Server Error'

        return resp
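The quorum idea behind make_requests and best_response can be condensed into a few lines. This is a simplified illustration, not Swift's actual _compute_quorum_response; with three replicas the quorum size is two:

```python
from collections import Counter

def quorum_size(n):
    # a simple majority of n backend responses (e.g. 2 of 3 replicas)
    return n // 2 + 1

def pick_quorum_status(statuses):
    """Return the first status class (2xx, then 3xx, 4xx, 5xx) that
    reaches quorum, and within it the most common concrete status;
    fall back to 503 when no class reaches quorum."""
    needed = quorum_size(len(statuses))
    for base in (200, 300, 400, 500):
        in_class = [s for s in statuses if base <= s < base + 100]
        if len(in_class) >= needed:
            return Counter(in_class).most_common(1)[0][0]
    return 503
```

So two 201s out of three backends win even if one node returned 503, while a three-way split falls through to the 503 error path shown above.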

/swift-kilo-eol/swift/account/server.py

    def PUT(self, req):
        """Handle HTTP PUT request."""
        # Handle a PUT request.
        # If the URL contains a <container> segment, record that container's
        # info in the account database; otherwise create the account or
        # update its metadata.
        drive, part, account, container = split_and_validate_path(req, 3, 4)
        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
        if container:   # put account container
            if 'x-timestamp' not in req.headers:
                timestamp = Timestamp(time.time())
            else:
                timestamp = valid_timestamp(req)
            pending_timeout = None
            container_policy_index = \
                req.headers.get('X-Backend-Storage-Policy-Index', 0)
            if 'x-trans-id' in req.headers:
                pending_timeout = 3
            # _get_account_broker returns an AccountBroker instance
            broker = self._get_account_broker(drive, part, account,
                                              pending_timeout=pending_timeout)
            # if the account's database does not exist yet (auto-create prefix), initialize it
            if account.startswith(self.auto_create_account_prefix) and \
                    not os.path.exists(broker.db_file):
                try:
                    broker.initialize(timestamp.internal)
                except DatabaseAlreadyExists:
                    pass
            if req.headers.get('x-account-override-deleted', 'no').lower() != \
                    'yes' and broker.is_deleted():
                return HTTPNotFound(request=req)
            # persist the container's record into the account database
            broker.put_container(container, req.headers['x-put-timestamp'],
                                 req.headers['x-delete-timestamp'],
                                 req.headers['x-object-count'],
                                 req.headers['x-bytes-used'],
                                 container_policy_index)
            if req.headers['x-delete-timestamp'] > \
                    req.headers['x-put-timestamp']:
                return HTTPNoContent(request=req)
            else:
                return HTTPCreated(request=req)
        else:   # put account
            timestamp = valid_timestamp(req)
            broker = self._get_account_broker(drive, part, account)
            if not os.path.exists(broker.db_file):
                try:
                    broker.initialize(timestamp.internal)
                    created = True
                except DatabaseAlreadyExists:
                    created = False
            # check whether the account is marked as deleted
            elif broker.is_status_deleted():
                return self._deleted_response(broker, req, HTTPForbidden,
                                              body='Recently deleted')
            # check whether the account had been deleted (recreate it if so)
            else:
                created = broker.is_deleted()
                broker.update_put_timestamp(timestamp.internal)
                if broker.is_deleted():
                    return HTTPConflict(request=req)
            # update the account's metadata in the database
            metadata = {}
            metadata.update((key, (value, timestamp.internal))
                            for key, value in req.headers.iteritems()
                            if is_sys_or_user_meta('account', key))
            if metadata:
                broker.update_metadata(metadata, validate_metadata=True)
            if created:
                return HTTPCreated(request=req)
            else:
                return HTTPAccepted(request=req)
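The timestamp rule driving the container branch of PUT (x-put-timestamp vs x-delete-timestamp, newer record wins) can be illustrated with a tiny in-memory stand-in for the broker. All names here are illustrative; the real AccountBroker persists container rows in SQLite:

```python
class ToyAccountBroker(object):
    """In-memory sketch of put_container's last-writer-wins rule."""

    def __init__(self):
        self.containers = {}

    def put_container(self, name, put_ts, delete_ts, obj_count, bytes_used):
        # a record is replaced only by one with a strictly newer timestamp
        rec = self.containers.get(name)
        if rec is None or max(put_ts, delete_ts) > max(rec['put'],
                                                       rec['delete']):
            self.containers[name] = {'put': put_ts, 'delete': delete_ts,
                                     'object_count': obj_count,
                                     'bytes_used': bytes_used}

    def is_container_deleted(self, name):
        # deleted when the delete timestamp is newer than the put timestamp,
        # matching the 204-vs-201 check in the PUT handler above
        rec = self.containers[name]
        return rec['delete'] > rec['put']
```

This last-writer-wins comparison is what lets out-of-order replicated updates converge to the same state on every account replica.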

/swift-kilo-eol/swift/proxy/controllers/account.py

    def POST(self, req):
        """HTTP POST request handler."""
        # Handle an HTTP POST request
        if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
            resp = HTTPBadRequest(request=req)
            resp.body = 'Account name length of %d longer than %d' % \
                        (len(self.account_name),
                         constraints.MAX_ACCOUNT_NAME_LENGTH)
            return resp
        error_response = check_metadata(req, 'account')
        if error_response:
            return error_response
        # Look up the account's partition number and the nodes holding its replicas
        account_partition, accounts = \
            self.app.account_ring.get_nodes(self.account_name)
        # Build backend-format request headers from the original request via generate_request_headers
        headers = self.generate_request_headers(req, transfer=True)
        clear_info_cache(self.app, req.environ, self.account_name)
        # make_requests fans the request out to the replica nodes, gathers all responses, and picks the best one by quorum voting
        resp = self.make_requests(
            req, self.app.account_ring, account_partition, 'POST',
            req.swift_entity_path, [headers] * len(accounts))
        # if the account does not exist and autocreate is enabled, create it first, then retry make_requests
        if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
            self.autocreate_account(req, self.account_name)
            resp = self.make_requests(
                req, self.app.account_ring, account_partition, 'POST',
                req.swift_entity_path, [headers] * len(accounts))
        self.add_acls_from_sys_metadata(resp)
        return resp

/swift-kilo-eol/swift/account/server.py

    def POST(self, req):
        """Handle HTTP POST request."""
        # Handle a POST request: update the account's metadata
        drive, part, account = split_and_validate_path(req, 3)
        req_timestamp = valid_timestamp(req)
        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
        # _get_account_broker returns an AccountBroker instance
        broker = self._get_account_broker(drive, part, account)
        if broker.is_deleted():
            return self._deleted_response(broker, req, HTTPNotFound)
        # merge the metadata from the request headers into the account's metadata in the database
        metadata = {}
        metadata.update((key, (value, req_timestamp.internal))
                        for key, value in req.headers.iteritems()
                        if is_sys_or_user_meta('account', key))
        if metadata:
            broker.update_metadata(metadata, validate_metadata=True)
        return HTTPNoContent(request=req)
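Note that each metadata value is stored together with the request timestamp. The following sketch shows why: on conflict, the newer timestamp wins, which makes out-of-order replicated updates converge. It is a simplified illustration; the real broker also handles deletion via empty values:

```python
def merge_metadata(existing, updates, timestamp):
    """Timestamped metadata merge: existing maps key -> (value, ts);
    an update replaces an entry only if its timestamp is newer."""
    merged = dict(existing)
    for key, value in updates.items():
        old = merged.get(key)
        if old is None or old[1] < timestamp:
            merged[key] = (value, timestamp)
    return merged
```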


/swift-kilo-eol/swift/proxy/controllers/account.py

    def DELETE(self, req):
        """HTTP DELETE request handler."""
        # Handle an HTTP DELETE request
        # Extra safety in case someone typos a query string for an
        # account-level DELETE request that was really meant to be caught by
        # some middleware.
        if req.query_string:
            return HTTPBadRequest(request=req)
        if not self.app.allow_account_management:
            return HTTPMethodNotAllowed(
                request=req,
                headers={'Allow': ', '.join(self.allowed_methods)})
        # Look up the account's partition number and the nodes holding its replicas
        account_partition, accounts = \
            self.app.account_ring.get_nodes(self.account_name)
        # Build backend-format request headers from the original request via generate_request_headers
        headers = self.generate_request_headers(req)
        clear_info_cache(self.app, req.environ, self.account_name)
        # fan the DELETE out to the replica nodes via make_requests
        resp = self.make_requests(
            req, self.app.account_ring, account_partition, 'DELETE',
            req.swift_entity_path, [headers] * len(accounts))
        return resp

/swift-kilo-eol/swift/account/server.py

    def DELETE(self, req):
        """Handle HTTP DELETE request."""
        # Handle a DELETE request
        drive, part, account = split_and_validate_path(req, 3)
        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
        req_timestamp = valid_timestamp(req)
        broker = self._get_account_broker(drive, part, account)
        # if the account has already been deleted, return immediately
        if broker.is_deleted():
            return self._deleted_response(broker, req, HTTPNotFound)
        # only mark the account as deleted in the database; the AccountReaper service performs the real cleanup later
        broker.delete_db(req_timestamp.internal)
        return self._deleted_response(broker, req, HTTPNoContent)
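The mark-then-reap pattern above is worth spelling out: the DELETE handler only stamps the row, and the separate account-reaper daemon purges data later. A toy sketch with a dict standing in for the broker (illustrative only):

```python
def delete_account(broker, req_timestamp):
    """Mark an account deleted; return an HTTP-like status code."""
    if broker.get('delete_timestamp', 0) > broker.get('put_timestamp', 0):
        return 404  # already marked deleted
    broker['delete_timestamp'] = req_timestamp  # mark only; reaper purges later
    return 204
```

Deferring the expensive cleanup keeps the DELETE request fast and idempotent: repeating it simply returns the "already deleted" status.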



  The call flows for ContainerController and ObjectController are similar; I will analyze them in detail when time permits. There is still plenty of room for improvement in this article; for example, some of the method call chains are not analyzed in enough detail.


2014-04-14 13:26:21 u014419512 78 views

          OpenStack Swift, an open-source cloud storage system, is being adopted by more and more companies. To record and consolidate what I learn from reading the Swift source code, I am writing a series of source-reading notes to help beginners quickly learn and understand Swift's internals.

          The server.py module under proxy/ is the overall proxy-side entry point for every management operation on accounts, containers, and objects. After Swift receives a URL request, the request first passes through the middleware chain for validation, then enters the proxy server module. Its __call__ method dispatches the request to the appropriate controller; the controller calls the corresponding storage-node service, returns the resulting resp object to the server module, and the final response travels back to the client through the middleware chain in the reverse direction.

 

     1. The overall structure of server.py (shown as a figure in the original post) has four parts: a block of imports, a required_filters dict, a class named "Application(object)", and an app_factory(global_conf, **local_conf) function.

     2. The walkthrough below focuses on the "Application(object)" class, which contains all of the main functionality.

 

 2.1 __init__, the initializer of the Application class, mainly sets up its member objects: the conf configuration parameters, the logger, the memcache client, and the account_ring, container_ring, and object_ring objects.

 

2.2 check_config(self) checks that the "read_affinity" and "sorting_method" values configured in proxy-server.conf are valid; it is called from app_factory(global_conf, **local_conf).

2.3 get_controller(self, path) parses the request URL path and returns the matching controller class plus a dict whose contents depend on the shape of the URL.

 

def get_controller(self, path):
    """
    Get the controller to handle a request.

    :param path: path from request
    :returns: tuple of (controller class, path dictionary)

    :raises: ValueError (thrown by split_path) if given invalid path
    """
    if path == '/info':  # for /info, return InfoController and a dict holding version, expose_info, disallowed_sections and admin_key
        d = dict(version=None,
                 expose_info=self.expose_info,
                 disallowed_sections=self.disallowed_sections,
                 admin_key=self.admin_key)
        return InfoController, d

    version, account, container, obj = split_path(path, 1, 4, True)  # split the URL on '/' and map segments 1 to 4 onto these variables
    d = dict(version=version,
             account_name=account,
             container_name=container,
             object_name=obj)
    if obj and container and account:  # which of account/container/object are present determines the controller
        return ObjectController, d
    elif container and account:
        return ContainerController, d
    elif account and not container and not obj:
        return AccountController, d
    return None, d
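The dispatch rule above can be demonstrated standalone. Controller names are returned as strings here purely for illustration:

```python
def controller_for(path):
    """Mimic get_controller's dispatch on /<version>/<account>/<container>/<object>."""
    parts = path.strip('/').split('/', 3)
    parts += [None] * (4 - len(parts))  # pad missing segments with None
    version, account, container, obj = parts
    if obj and container and account:
        return 'ObjectController'
    if container and account:
        return 'ContainerController'
    if account:
        return 'AccountController'
    return None
```

Note that the object segment may itself contain slashes, which is why the split is capped at four parts.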


 

 

2.4 __call__(self, env, start_response) is the entry point through which the server module actually dispatches requests for accounts, containers, and objects.

 

def __call__(self, env, start_response):
    """
    WSGI entry point.
    Wraps env in swob.Request object and passes it down.

    :param env: WSGI environment dictionary
    :param start_response: WSGI callable
    """
    try:
        if self.memcache is None:  # if there is no memcache client yet, try to fetch one from the environment
            self.memcache = cache_from_env(env)
        req = self.update_request(Request(env))  # copy x-storage-token into x-auth-token if needed
        return self.handle_request(req)(env, start_response)  # delegate to handle_request and return the resulting resp object
    except UnicodeError:
        err = HTTPPreconditionFailed(
            request=req, body='Invalid UTF8 or contains NULL')
        return err(env, start_response)
    except (Exception, Timeout):
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']


2.5 update_request(self, req): when the request header carries x-storage-token but not x-auth-token, the x-storage-token value is copied into x-auth-token.

 


2.6 handle_request(self, req) is where the server module actually processes the request; readers familiar with servlets can think of it as playing the servlet's role.

 

def handle_request(self, req):
    """
    Entry point for proxy server.
    Should return a WSGI-style callable (such as swob.Response).

    :param req: swob.Request object
    """
    try:
        self.logger.set_statsd_prefix('proxy-server')  # prefix log entries with 'proxy-server' for easier tracing
        if req.content_length and req.content_length < 0:  # a negative Content-Length is a bad request; log it and reject
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        try:
            if not check_utf8(req.path_info):  # reject paths that are not valid UTF-8; log it and reject
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')

        try:
            controller, path_parts = self.get_controller(req.path)  # resolve the controller class and the path dict
            p = req.path_info
            if isinstance(p, unicode):
                p = p.encode('utf-8')  # encode the path from unicode to utf-8
        except ValueError:  # invalid path: log it and return 404
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:  # no controller matched: log it and reject
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        # when deny_host_headers is set in proxy-server.conf and the request's
        # host appears in it, reject the request
        if self.deny_host_headers and \
                req.host.split(':')[0] in self.deny_host_headers:
            return HTTPForbidden(request=req, body='Invalid host header')

        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())  # refine the log prefix with the controller's server type for easier tracing
        controller = controller(self, **path_parts)  # instantiate the concrete controller (AccountController, ContainerController, ObjectController or InfoController)
        if 'swift.trans_id' not in req.environ:  # generate a transaction id (a UUID, similar in spirit to an HTTP session id) if no middleware did
            # if this wasn't set by an earlier middleware, set it now
            trans_id = generate_trans_id(self.trans_id_suffix)
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        self.logger.client_ip = get_remote_client(req)  # record the client IP on the logger for later analysis
        try:
            handler = getattr(controller, req.method)  # look up the controller method matching the HTTP verb (there may be several; not all are public)
            getattr(handler, 'publicly_accessible')  # only methods marked with the @public decorator (optionally @delay_denial) may be called
        except AttributeError:
            allowed_methods = getattr(controller, 'allowed_methods', set())
            return HTTPMethodNotAllowed(
                request=req, headers={'Allow': ', '.join(allowed_methods)})
        if 'swift.authorize' in req.environ:  # perform authorization
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp:
                # No resp means authorized, no delayed recheck required.
                del req.environ['swift.authorize']
            else:
                # Response indicates denial, but we might delay the denial
                # and recheck later. If not delayed, return the error now.
                if not getattr(handler, 'delay_denial', None):
                    return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling.  This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        return handler(req)  # invoke the handler and return its resp result
    except HTTPException as error_response:
        return error_response
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)

 

2.7 sort_nodes(self, nodes) sorts a node list in place; it is called from iter_nodes(self, ring, partition, node_iter=None).

 

def sort_nodes(self, nodes):
    '''
    Sorts nodes in-place (and returns the sorted list) according to
    the configured strategy. The default "sorting" is to randomly
    shuffle the nodes. If the "timing" strategy is chosen, the nodes
    are sorted according to the stored timing data.
    '''
    # In the case of timing sorting, shuffling ensures that close timings
    # (ie within the rounding resolution) won't prefer one over another.
    # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
    shuffle(nodes)
    if self.sorting_method == 'timing':  # sorting_method 'timing' in the config: order nodes by recorded response time
        now = time()

        def key_func(node):
            timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
            return timing if expires > now else -1.0
        nodes.sort(key=key_func)
    elif self.sorting_method == 'affinity':  # sorting_method 'affinity': order nodes by the configured read-affinity rules
        nodes.sort(key=self.read_affinity_sort_key)
    return nodes
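The "timing" strategy can be tried standalone. This sketch (illustrative; the real method lives on the Application object and reads self.node_timings) shows the shuffle-then-stable-sort trick: shuffling first means near-equal timings don't always prefer the same node, and expired timing entries sort first as -1.0:

```python
from random import shuffle
from time import time

def sort_by_timing(nodes, node_timings):
    """Sort node dicts by recorded response time.
    node_timings maps ip -> (timing, expires)."""
    now = time()
    shuffle(nodes)  # break ties randomly before the stable sort

    def key_func(node):
        timing, expires = node_timings.get(node['ip'], (-1.0, 0))
        return timing if expires > now else -1.0

    nodes.sort(key=key_func)
    return nodes
```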

 

2.8 set_node_timing(self, node, timing) is exposed to external callers for recording a node's response timing.

 

2.9 error_limited(self, node) is called from iter_nodes(self, ring, partition, node_iter=None).

 

def error_limited(self, node):
    """
    Check if the node is currently error limited.

    :param node: dictionary of node to check
    :returns: True if error limited, False otherwise
    """
    now = time()
    if 'errors' not in node:  # no recorded errors: not limited
        return False
    if 'last_error' in node and node['last_error'] < \
            now - self.error_suppression_interval:  # the last error is older than the allowed suppression interval
        del node['last_error']  # drop last_error from the node
        if 'errors' in node:  # also drop the error count, then report not limited
            del node['errors']
        return False
    limited = node['errors'] > self.error_suppression_limit  # limited when the error count exceeds the configured threshold; log it
    if limited:
        self.logger.debug(
            _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
    return limited
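The error-limiting rule can be isolated into a pure function. This is a hedged sketch with illustrative parameter names, standing in for the config options error_suppression_interval and error_suppression_limit:

```python
def is_error_limited(node, now, suppression_interval=60, suppression_limit=10):
    """Errors older than suppression_interval are forgotten; otherwise a
    node whose error count exceeds suppression_limit is skipped."""
    if 'errors' not in node:
        return False
    if node.get('last_error', now) < now - suppression_interval:
        node.pop('last_error', None)  # error state has aged out; reset it
        node.pop('errors', None)
        return False
    return node['errors'] > suppression_limit
```

The side effect of clearing stale state on read is deliberate: a node that has been quiet for the whole interval gets a clean slate the next time it is considered.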

 

 

2.10 error_limit(self, node, msg) is exposed to external callers; it pushes a node's error count straight past the allowed threshold (limit + 1), records last_error, and writes a log entry.

 

2.11 error_occurred(self, node, msg) is exposed to external callers; it increments a node's error count, records last_error, and writes a log entry.

 

2.12 iter_nodes(self, ring, partition, node_iter=None) is exposed to external callers; it returns an iterator over the nodes after sorting them.

 

2.13 exception_occurred(self, node, typ, additional_info) is exposed to external callers for logging when an exception occurs on a node.

2.14 modify_wsgi_pipeline(self, pipe) is called at system startup to initialize the pipeline, ensuring mandatory middleware is present and logging any changes.

 

def modify_wsgi_pipeline(self, pipe):
    """
    Called during WSGI pipeline creation. Modifies the WSGI pipeline
    context to ensure that mandatory middleware is present in the pipeline.

    :param pipe: A PipelineWrapper object
    """
    pipeline_was_modified = False
    for filter_spec in reversed(required_filters):  # for each mandatory filter listed in required_filters, insert it into the pipeline if missing
        filter_name = filter_spec['name']
        if filter_name not in pipe:
            afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
            insert_at = 0
            for after in afters:
                try:
                    insert_at = max(insert_at, pipe.index(after) + 1)
                except ValueError:  # not in pipeline; ignore it
                    pass
            self.logger.info(
                'Adding required filter %s to pipeline at position %d' %
                (filter_name, insert_at))
            ctx = pipe.create_filter(filter_name)
            pipe.insert_filter(ctx, index=insert_at)
            pipeline_was_modified = True

    if pipeline_was_modified:
        self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                         pipe)
    else:
        self.logger.debug("Pipeline is \"%s\"", pipe)
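The pipeline-repair idea reduces to one placement rule: when a mandatory filter is missing, insert it just after the last of its "after" dependencies actually present, or at the front. This minimal sketch uses a plain list of names where Swift uses a PipelineWrapper object:

```python
def insert_required_filter(pipeline, name, after=()):
    """Return a new pipeline with `name` inserted after the last present
    dependency in `after` (or at position 0 if none are present)."""
    if name in pipeline:
        return pipeline
    insert_at = 0
    for dep in after:
        if dep in pipeline:
            insert_at = max(insert_at, pipeline.index(dep) + 1)
    return pipeline[:insert_at] + [name] + pipeline[insert_at:]
```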


 

The following is the proxy server.py source as of 2014-03-12, with only some comments added:

 

[python] view plaincopy
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mimetypes
import os
import socket
from swift import gettext_ as _
from random import shuffle
from time import time
import itertools

from eventlet import Timeout

from swift import __canonical_version__ as swift_version
from swift.common import constraints
from swift.common.ring import Ring
from swift.common.utils import cache_from_env, get_logger, \
    get_remote_client, split_path, config_true_value, generate_trans_id, \
    affinity_key_function, affinity_locality_predicate, list_from_csv, \
    register_swift_info
from swift.common.constraints import check_utf8
from swift.proxy.controllers import AccountController, ObjectController, \
    ContainerController, InfoController
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
    HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
    HTTPServerError, HTTPException, Request


# List of entry points for mandatory middlewares.
#
# Fields:
#
# "name" (required) is the entry point name from setup.py.
#
# "after_fn" (optional) a function that takes a PipelineWrapper object as its
# single argument and returns a list of middlewares that this middleware
# should come after. Any middlewares in the returned list that are not present
# in the pipeline will be ignored, so you can safely name optional middlewares
# to come after. For example, ["catch_errors", "bulk"] would install this
# middleware after catch_errors and bulk if both were present, but if bulk
# were absent, would just install it after catch_errors.

required_filters = [
    {'name': 'catch_errors'},
    {'name': 'gatekeeper',
     'after_fn': lambda pipe: (['catch_errors']
                               if pipe.startswith("catch_errors")
                               else [])},
    {'name': 'dlo', 'after_fn': lambda _junk: ['catch_errors', 'gatekeeper',
                                               'proxy_logging']}]


class Application(object):
    """WSGI application for the proxy server."""

    def __init__(self, conf, memcache=None, logger=None, account_ring=None,
                 container_ring=None, object_ring=None):
        if conf is None:
            conf = {}
        if logger is None:
            self.logger = get_logger(conf, log_route='proxy-server')
        else:
            self.logger = logger

        swift_dir = conf.get('swift_dir', '/etc/swift')
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.recoverable_node_timeout = int(
            conf.get('recoverable_node_timeout', self.node_timeout))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        self.client_timeout = int(conf.get('client_timeout', 60))
        self.put_queue_depth = int(conf.get('put_queue_depth', 10))
        self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
        self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
        self.trans_id_suffix = conf.get('trans_id_suffix', '')
        self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
        self.error_suppression_interval = \
            int(conf.get('error_suppression_interval', 60))
        self.error_suppression_limit = \
            int(conf.get('error_suppression_limit', 10))
        self.recheck_container_existence = \
            int(conf.get('recheck_container_existence', 60))
        self.recheck_account_existence = \
            int(conf.get('recheck_account_existence', 60))
        self.allow_account_management = \
            config_true_value(conf.get('allow_account_management', 'no'))
        self.object_post_as_copy = \
            config_true_value(conf.get('object_post_as_copy', 'true'))
        self.object_ring = object_ring or Ring(swift_dir, ring_name='object')
        self.container_ring = container_ring or Ring(swift_dir,
                                                     ring_name='container')
        self.account_ring = account_ring or Ring(swift_dir,
                                                 ring_name='account')
        self.memcache = memcache
        mimetypes.init(mimetypes.knownfiles +
                       [os.path.join(swift_dir, 'mime.types')])
        self.account_autocreate = \
            config_true_value(conf.get('account_autocreate', 'no'))
        self.expiring_objects_account = \
            (conf.get('auto_create_account_prefix') or '.') + \
            (conf.get('expiring_objects_account_name') or 'expiring_objects')
        self.expiring_objects_container_divisor = \
            int(conf.get('expiring_objects_container_divisor') or 86400)
        self.max_containers_per_account = \
            int(conf.get('max_containers_per_account') or 0)
        self.max_containers_whitelist = [
            a.strip()
            for a in conf.get('max_containers_whitelist', '').split(',')
            if a.strip()]
        self.deny_host_headers = [
            host.strip() for host in
            conf.get('deny_host_headers', '').split(',') if host.strip()]
        self.rate_limit_after_segment = \
            int(conf.get('rate_limit_after_segment', 10))
        self.rate_limit_segments_per_sec = \
            int(conf.get('rate_limit_segments_per_sec', 1))
        self.log_handoffs = config_true_value(conf.get('log_handoffs', 'true'))
        self.cors_allow_origin = [
            a.strip()
            for a in conf.get('cors_allow_origin', '').split(',')
            if a.strip()]
        self.strict_cors_mode = config_true_value(
            conf.get('strict_cors_mode', 't'))
        self.node_timings = {}
        self.timing_expiry = int(conf.get('timing_expiry', 300))
        self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
        self.max_large_object_get_time = float(
            conf.get('max_large_object_get_time', '86400'))
        value = conf.get('request_node_count', '2 * replicas').lower().split()
        if len(value) == 1:
            value = int(value[0])
            self.request_node_count = lambda replicas: value
        elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
            value = int(value[0])
            self.request_node_count = lambda replicas: value * replicas
        else:
            raise ValueError(
                'Invalid request_node_count value: %r' % ''.join(value))
        try:
            self._read_affinity = read_affinity = conf.get('read_affinity', '')
            self.read_affinity_sort_key = affinity_key_function(read_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid read_affinity value: %r (%s)" %
                             (read_affinity, err.message))
        try:
            write_affinity = conf.get('write_affinity', '')
            self.write_affinity_is_local_fn \
                = affinity_locality_predicate(write_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid write_affinity value: %r (%s)" %
                             (write_affinity, err.message))
        value = conf.get('write_affinity_node_count',
                         '2 * replicas').lower().split()
        if len(value) == 1:
            value = int(value[0])
            self.write_affinity_node_count = lambda replicas: value
        elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
            value = int(value[0])
            self.write_affinity_node_count = lambda replicas: value * replicas
        else:
            raise ValueError(
                'Invalid write_affinity_node_count value: %r' % ''.join(value))
        # swift_owner_headers are stripped by the account and container
        # controllers; we should extend header stripping to object controller
        # when a privileged object header is implemented.
        swift_owner_headers = conf.get(
            'swift_owner_headers',
            'x-container-read, x-container-write, '
            'x-container-sync-key, x-container-sync-to, '
            'x-account-meta-temp-url-key, x-account-meta-temp-url-key-2, '
            'x-account-access-control')
        self.swift_owner_headers = [
            name.strip().title()
            for name in swift_owner_headers.split(',') if name.strip()]
        # Initialization was successful, so now apply the client chunk size
        # parameter as the default read / write buffer size for the network
        # sockets.
        #
        # NOTE WELL: This is a class setting, so until we get set this on a
        # per-connection basis, this affects reading and writing on ALL
        # sockets, those between the proxy servers and external clients, and
        # those between the proxy servers and the other internal servers.
        #
        # ** Because it affects the client as well, currently, we use the
        # client chunk size as the governor and not the object chunk size.
        socket._fileobject.default_bufsize = self.client_chunk_size
        self.expose_info = config_true_value(
            conf.get('expose_info', 'yes'))
        self.disallowed_sections = list_from_csv(
            conf.get('disallowed_sections'))
        self.admin_key = conf.get('admin_key', None)
        register_swift_info(
            version=swift_version,
            strict_cors_mode=self.strict_cors_mode,
            **constraints.EFFECTIVE_CONSTRAINTS)

    def check_config(self):
        """
        Check the configuration for possible errors
        """
        if self._read_affinity and self.sorting_method != 'affinity':
            self.logger.warn("sorting_method is set to '%s', not 'affinity'; "
                             "read_affinity setting will have no effect." %
                             self.sorting_method)

    def get_controller(self, path):
        """
        Get the controller to handle a request.

        :param path: path from request
        :returns: tuple of (controller class, path dictionary)

        :raises: ValueError (thrown by split_path) if given invalid path
        """
        if path == '/info':
            # a /info URL returns the InfoController plus a dict holding
            # version, expose_info, disallowed_sections and admin_key
            d = dict(version=None,
                     expose_info=self.expose_info,
                     disallowed_sections=self.disallowed_sections,
                     admin_key=self.admin_key)
            return InfoController, d

        # split the URL on '/' and map segments 1 through 4 to the variables
        version, account, container, obj = split_path(path, 1, 4, True)
        d = dict(version=version,
                 account_name=account,
                 container_name=container,
                 object_name=obj)
        # choose the controller class based on which of the account,
        # container and object segments are present
        if obj and container and account:
            return ObjectController, d
        elif container and account:
            return ContainerController, d
        elif account and not container and not obj:
            return AccountController, d
        return None, d

    def __call__(self, env, start_response):
        """
        WSGI entry point.
        Wraps env in swob.Request object and passes it down.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        """
        try:
            if self.memcache is None:
                # no memcache handle yet; try to fetch one from the env
                self.memcache = cache_from_env(env)
            # alias x-storage-token to x-auth-token if only the former is set
            req = self.update_request(Request(env))
            # dispatch the request and return the resulting resp object
            return self.handle_request(req)(env, start_response)
        except UnicodeError:
            err = HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',
                           [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']

    def update_request(self, req):
        if 'x-storage-token' in req.headers and \
                'x-auth-token' not in req.headers:
            req.headers['x-auth-token'] = req.headers['x-storage-token']
        return req

    def handle_request(self, req):
        """
        Entry point for proxy server.
        Should return a WSGI-style callable (such as swob.Response).

        :param req: swob.Request object
        """
        try:
            # prefix log/statsd entries with 'proxy-server' to ease tracing
            self.logger.set_statsd_prefix('proxy-server')
            # reject requests carrying a negative Content-Length, and log it
            if req.content_length and req.content_length < 0:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      body='Invalid Content-Length')

            try:
                # reject paths that are not valid UTF-8, and log it
                if not check_utf8(req.path_info):
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(
                        request=req, body='Invalid UTF8 or contains NULL')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')

            try:
                # resolve the proper controller class and the path dict
                controller, path_parts = self.get_controller(req.path)
                p = req.path_info
                if isinstance(p, unicode):
                    p = p.encode('utf-8')  # re-encode a unicode path as UTF-8
            except ValueError:
                # invalid path; log the error and return 404
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
            if not controller:
                # no controller matched the path; log the error and reject
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            # when deny_host_headers is set in proxy-server.conf and the
            # request's Host header is listed there, reject the request
            if self.deny_host_headers and \
                    req.host.split(':')[0] in self.deny_host_headers:
                return HTTPForbidden(request=req, body='Invalid host header')

            # refine the statsd prefix with the controller's server type
            # (account/container/object) to ease tracing and analysis
            self.logger.set_statsd_prefix('proxy-server.' +
                                          controller.server_type.lower())
            # instantiate the concrete controller (one of AccountController,
            # ContainerController, ObjectController or InfoController)
            controller = controller(self, **path_parts)
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now;
                # the trans_id is a UUID-like request identifier, playing a
                # role similar to a session id in an HTTP application
                trans_id = generate_trans_id(self.trans_id_suffix)
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            # record the client IP on the logger for later log analysis
            self.logger.client_ip = get_remote_client(req)
            try:
                # look up the controller method matching the HTTP verb, then
                # require the @public marker before it may be invoked
                # (methods may also carry @delay_denial)
                handler = getattr(controller, req.method)
                getattr(handler, 'publicly_accessible')
            except AttributeError:
                allowed_methods = getattr(controller, 'allowed_methods', set())
                return HTTPMethodNotAllowed(
                    request=req, headers={'Allow': ', '.join(allowed_methods)})
            if 'swift.authorize' in req.environ:
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)
                if not resp:
                    # No resp means authorized, no delayed recheck required.
                    del req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            # Save off original request method (GET, POST, etc.) in case it
            # gets mutated during handling.  This way logging can display the
            # method the client actually sent.
            req.environ['swift.orig_req_method'] = req.method
            # invoke the resolved method and return its response
            return handler(req)
        except HTTPException as error_response:
            return error_response
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)

    def sort_nodes(self, nodes):
        '''
        Sorts nodes in-place (and returns the sorted list) according to
        the configured strategy. The default "sorting" is to randomly
        shuffle the nodes. If the "timing" strategy is chosen, the nodes
        are sorted according to the stored timing data.
        '''
        # In the case of timing sorting, shuffling ensures that close timings
        # (ie within the rounding resolution) won't prefer one over another.
        # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
        shuffle(nodes)
        if self.sorting_method == 'timing':
            # sorting_method 'timing' in the config: sort by response time
            now = time()

            def key_func(node):
                timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
                return timing if expires > now else -1.0
            nodes.sort(key=key_func)
        elif self.sorting_method == 'affinity':
            # sorting_method 'affinity': sort by the configured affinity rule
            nodes.sort(key=self.read_affinity_sort_key)
        return nodes

    def set_node_timing(self, node, timing):
        if self.sorting_method != 'timing':
            return
        now = time()
        timing = round(timing, 3)  # sort timings to the millisecond
        self.node_timings[node['ip']] = (timing, now + self.timing_expiry)

    def error_limited(self, node):
        """
        Check if the node is currently error limited.

        :param node: dictionary of node to check
        :returns: True if error limited, False otherwise
        """
        now = time()
        if 'errors' not in node:
            # no recorded errors: the node is not limited
            return False
        if 'last_error' in node and node['last_error'] < \
                now - self.error_suppression_interval:
            # the last error is older than the suppression interval, so
            # clear the node's error state and report it as not limited
            del node['last_error']
            if 'errors' in node:
                del node['errors']
            return False
        # the node is limited when its error count exceeds the configured
        # limit; log that fact for debugging
        limited = node['errors'] > self.error_suppression_limit
        if limited:
            self.logger.debug(
                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
        return limited

    def error_limit(self, node, msg):
        """
        Mark a node as error limited. This immediately pretends the
        node received enough errors to trigger error suppression. Use
        this for errors like Insufficient Storage. For other errors
        use :func:`error_occurred`.

        :param node: dictionary of node to error limit
        :param msg: error message
        """
        node['errors'] = self.error_suppression_limit + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def error_occurred(self, node, msg):
        """
        Handle logging, and handling of errors.

        :param node: dictionary of node to handle errors for
        :param msg: error message
        """
        node['errors'] = node.get('errors', 0) + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def iter_nodes(self, ring, partition, node_iter=None):
        """
        Yields nodes for a ring partition, skipping over error
        limited nodes and stopping at the configurable number of
        nodes. If a node yielded subsequently gets error limited, an
        extra node will be yielded to take its place.

        Note that if you're going to iterate over this concurrently from
        multiple greenthreads, you'll want to use a
        swift.common.utils.GreenthreadSafeIterator to serialize access.
        Otherwise, you may get ValueErrors from concurrent access. (You also
        may not, depending on how logging is configured, the vagaries of
        socket IO and eventlet, and the phase of the moon.)

        :param ring: ring to get yield nodes from
        :param partition: ring partition to yield nodes for
        :param node_iter: optional iterable of nodes to try. Useful if you
            want to filter or reorder the nodes.
        """
        part_nodes = ring.get_part_nodes(partition)
        if node_iter is None:
            node_iter = itertools.chain(part_nodes,
                                        ring.get_more_nodes(partition))
        num_primary_nodes = len(part_nodes)

        # Use of list() here forcibly yanks the first N nodes (the primary
        # nodes) from node_iter, so the rest of its values are handoffs.
        primary_nodes = self.sort_nodes(
            list(itertools.islice(node_iter, num_primary_nodes)))
        handoff_nodes = node_iter
        nodes_left = self.request_node_count(len(primary_nodes))

        for node in primary_nodes:
            if not self.error_limited(node):
                yield node
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return
        handoffs = 0
        for node in handoff_nodes:
            if not self.error_limited(node):
                handoffs += 1
                if self.log_handoffs:
                    self.logger.increment('handoff_count')
                    self.logger.warning(
                        'Handoff requested (%d)' % handoffs)
                    if handoffs == len(primary_nodes):
                        self.logger.increment('handoff_all_count')
                yield node
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return

    def exception_occurred(self, node, typ, additional_info):
        """
        Handle logging of generic exceptions.

        :param node: dictionary of node to log the error for
        :param typ: server type
        :param additional_info: additional information to log
        """
        self.logger.exception(
            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
              '%(info)s'),
            {'type': typ, 'ip': node['ip'], 'port': node['port'],
             'device': node['device'], 'info': additional_info})

    def modify_wsgi_pipeline(self, pipe):
        """
        Called during WSGI pipeline creation. Modifies the WSGI pipeline
        context to ensure that mandatory middleware is present in the pipeline.

        :param pipe: A PipelineWrapper object
        """
        pipeline_was_modified = False
        # insert any mandatory filters listed in required_filters that are
        # missing from the pipeline, at the proper positions
        for filter_spec in reversed(required_filters):
            filter_name = filter_spec['name']
            if filter_name not in pipe:
                afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
                insert_at = 0
                for after in afters:
                    try:
                        insert_at = max(insert_at, pipe.index(after) + 1)
                    except ValueError:  # not in pipeline; ignore it
                        pass
                self.logger.info(
                    'Adding required filter %s to pipeline at position %d' %
                    (filter_name, insert_at))
                ctx = pipe.create_filter(filter_name)
                pipe.insert_filter(ctx, index=insert_at)
                pipeline_was_modified = True

        if pipeline_was_modified:
            self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                             pipe)
        else:
            self.logger.debug("Pipeline is \"%s\"", pipe)


def app_factory(global_conf, **local_conf):
    """paste.deploy app factory for creating WSGI proxy apps."""
    conf = global_conf.copy()
    conf.update(local_conf)
    app = Application(conf)
    app.check_config()
    return app
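The dispatch in get_controller above can be mimicked standalone. This sketch uses a simplified stand-in for swift.common.utils.split_path (the real function also validates each segment and raises ValueError on bad paths), and returns controller names instead of classes:

```python
def get_controller_name(path):
    """Map /version/account[/container[/object]] to a controller name.

    Simplified sketch of Application.get_controller; the real split_path
    additionally validates segments and raises ValueError on bad input.
    """
    if path == '/info':
        return 'InfoController'
    # split into at most 4 segments; an object name may itself contain '/'
    segs = path.lstrip('/').split('/', 3)
    segs += [None] * (4 - len(segs))
    version, account, container, obj = segs
    if obj and container and account:
        return 'ObjectController'
    elif container and account:
        return 'ContainerController'
    elif account and not container and not obj:
        return 'AccountController'
    return None


print(get_controller_name('/v1/AUTH_test/photos/cats/cat.jpg'))  # ObjectController
print(get_controller_name('/v1/AUTH_test/photos'))               # ContainerController
print(get_controller_name('/v1/AUTH_test'))                      # AccountController
```

Note that the object segment keeps any embedded slashes, which is why the real split_path is called with `rest_with_last=True`.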
2014-07-25 00:44:42 gaoxingnengjisuan 阅读数 3134



Overview:

This post focuses on the corresponding implementations of the PUT, POST, DELETE, GET, HEAD and similar methods in the swift-proxy and swift-object services.


Source code analysis (the more important parts of the code have been annotated):


GETorHEAD

/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def GETorHEAD

def GETorHEAD(self, req):
    """
    Handle HTTP GET or HEAD requests;
    """
    # fetch info for the container that the object belongs to;
    container_info = self.container_info(self.account_name, self.container_name, req)

    req.acl = container_info['read_acl']
    if 'swift.authorize' in req.environ:
        aresp = req.environ['swift.authorize'](req)
        if aresp:
            return aresp

    # get the partition number for the specified object;
    partition = self.app.object_ring.get_part(self.account_name, self.container_name, self.object_name)

    resp = self.GETorHEAD_base(
            req, _('Object'), self.app.object_ring, partition,
            req.swift_entity_path)

    if ';' in resp.headers.get('content-type', ''):
        # strip off swift_bytes from content-type
        content_type, check_extra_meta = resp.headers['content-type'].rsplit(';', 1)
        if check_extra_meta.lstrip().startswith('swift_bytes='):
            resp.content_type = content_type
    return resp
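The content-type cleanup at the end of GETorHEAD can be illustrated in isolation: large-object manifests store the real byte count as a trailing '; swift_bytes=N' parameter on the content-type, which the proxy strips before replying to the client. A minimal sketch of that logic:

```python
def strip_swift_bytes(content_type):
    """Drop a trailing '; swift_bytes=N' parameter, as the proxy does."""
    if ';' in content_type:
        base, extra = content_type.rsplit(';', 1)
        if extra.lstrip().startswith('swift_bytes='):
            return base
    return content_type


print(strip_swift_bytes('application/zip; swift_bytes=1048576'))  # application/zip
print(strip_swift_bytes('text/plain; charset=utf-8'))  # left unchanged
```

Only the last parameter is examined, so ordinary content-type parameters such as charset pass through untouched.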
/swift/obj/server.py----class ObjectController(object)----def HEAD

def HEAD(self, request):
    """
    Retrieve and return an object's metadata; almost identical to the GET
    handler, except that the file is not returned in the body;
    """
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request, conditional_response=True)
    response = Response(request=request, conditional_response=True)
    response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
    for key, value in metadata.iteritems():
        if is_user_meta('object', key) or key.lower() in self.allowed_headers:
            response.headers[key] = value
    response.etag = metadata['ETag']
    ts = metadata['X-Timestamp']
    response.last_modified = math.ceil(float(ts))
    # Needed for container sync feature
    response.headers['X-Timestamp'] = ts
    response.content_length = int(metadata['Content-Length'])
    try:
        response.content_encoding = metadata['Content-Encoding']
    except KeyError:
        pass
    return response
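The header filtering above copies only user metadata and whitelisted system headers into the response. A simplified sketch, with a small stand-in for `swift.common.utils.is_user_meta` and a hypothetical allowed set (the real `allowed_headers` comes from the object-server config):

```python
# Simplified stand-ins for illustration only: the real is_user_meta lives in
# swift.common.utils, and allowed_headers is configured on the object server.
ALLOWED_HEADERS = {'content-disposition', 'content-encoding', 'x-delete-at',
                   'x-object-manifest', 'x-static-large-object'}

def is_user_meta(server_type, key):
    # User metadata headers look like X-Object-Meta-<Name>.
    return key.lower().startswith('x-%s-meta-' % server_type)

def filter_headers(metadata, allowed=ALLOWED_HEADERS):
    return {k: v for k, v in metadata.items()
            if is_user_meta('object', k) or k.lower() in allowed}

meta = {'X-Object-Meta-Color': 'blue', 'X-Internal-Flag': 'secret',
        'Content-Encoding': 'gzip'}
print(filter_headers(meta))  # the internal flag is dropped
```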

/swift/obj/server.py----class ObjectController(object)----def GET

def GET(self, request):
    """
    Retrieve an object: return its metadata in the response headers and the
    object data in the response body. The flow is:
    1. Build a DiskFile from the URL components, check the required headers
       in the request, and check that the device is mounted;
    2. Return 404 Not Found if the file is deleted, if its 'X-Delete-At'
       metadata is earlier than now (i.e. it is marked for deletion), or if
       the data file size check reports the file as corrupt;
    3. Honor 'If-Match' / 'If-None-Match' in the request headers: the former
       matches against the file's 'ETag' metadata to identify the requested
       file, the latter decides whether to return the file's etag when
       nothing matches;
    4. Once the file is determined, pass its iterator as the app_iter
       argument of the Response constructor, copy the file's metadata into
       the response headers, and return the response.
    """
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
        
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
        
    try:
        with disk_file.open():
            metadata = disk_file.get_metadata()
            obj_size = int(metadata['Content-Length'])
            file_x_ts = metadata['X-Timestamp']
            file_x_ts_flt = float(file_x_ts)
            file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)

            if_unmodified_since = request.if_unmodified_since
                
            if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
                return HTTPPreconditionFailed(request=request)

            if_modified_since = request.if_modified_since
            if if_modified_since and file_x_ts_utc <= if_modified_since:
                return HTTPNotModified(request=request)

            keep_cache = (self.keep_cache_private or
                          ('X-Auth-Token' not in request.headers and
                           'X-Storage-Token' not in request.headers))
            response = Response(
                app_iter=disk_file.reader(keep_cache=keep_cache),
                request=request, conditional_response=True)
            response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
            for key, value in metadata.iteritems():
                if is_user_meta('object', key) or key.lower() in self.allowed_headers:
                    response.headers[key] = value
            response.etag = metadata['ETag']
            response.last_modified = math.ceil(file_x_ts_flt)
            response.content_length = obj_size
            try:
                response.content_encoding = metadata['Content-Encoding']
            except KeyError:
                pass
            response.headers['X-Timestamp'] = file_x_ts
            resp = request.get_response(response)
                
    except (DiskFileNotExist, DiskFileQuarantined):
        resp = HTTPNotFound(request=request, conditional_response=True)
    return resp
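The conditional-GET checks above reduce to comparing the object's X-Timestamp against the request's date headers. An illustrative sketch (the function name and bare status codes are ours; Swift returns full swob responses):

```python
from datetime import datetime, timezone

# Illustrative sketch of the conditional checks in GET above.
def conditional_status(x_timestamp, if_modified_since=None,
                       if_unmodified_since=None):
    # X-Timestamp is a string of seconds since the epoch, as in the metadata.
    file_ts = datetime.fromtimestamp(float(x_timestamp), timezone.utc)
    if if_unmodified_since and file_ts > if_unmodified_since:
        return 412  # HTTPPreconditionFailed
    if if_modified_since and file_ts <= if_modified_since:
        return 304  # HTTPNotModified
    return 200

later = datetime.fromtimestamp(1500000000, timezone.utc)
print(conditional_status('1400000000.00000', if_modified_since=later))    # 304
print(conditional_status('1400000000.00000', if_unmodified_since=later))  # 200
```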

POST

/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def POST

def POST(self, req):
    """
    Handle HTTP POST requests.
    """
    # Convert a relative x-delete-after into an absolute x-delete-at timestamp.
    if 'x-delete-after' in req.headers:
        try:
            x_delete_after = int(req.headers['x-delete-after'])
        except ValueError:
            return HTTPBadRequest(request=req,
                                  content_type='text/plain',
                                  body='Non-integer X-Delete-After')
        req.headers['x-delete-at'] = normalize_delete_at_timestamp(time.time() + x_delete_after)
        

    # By default (object_post_as_copy), POST is implemented as a copying PUT
    # of the object onto itself.
    if self.app.object_post_as_copy:
        req.method = 'PUT'
        req.path_info = '/v1/%s/%s/%s' % (self.account_name, self.container_name, self.object_name)
        req.headers['Content-Length'] = 0
        req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, self.object_name))
        req.headers['X-Fresh-Metadata'] = 'true'
        req.environ['swift_versioned_copy'] = True
        if req.environ.get('QUERY_STRING'):
            req.environ['QUERY_STRING'] += '&multipart-manifest=get'
        else:
            req.environ['QUERY_STRING'] = 'multipart-manifest=get'
        resp = self.PUT(req)

        if resp.status_int != HTTP_CREATED:
            return resp
        return HTTPAccepted(request=req)
        
    else:
        error_response = check_metadata(req, 'object')
        if error_response:
            return error_response
        container_info = self.container_info(
            self.account_name, self.container_name, req)
        container_partition = container_info['partition']
        containers = container_info['nodes']
        req.acl = container_info['write_acl']
        if 'swift.authorize' in req.environ:
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                return aresp
        if not containers:
            return HTTPNotFound(request=req)
        if 'x-delete-at' in req.headers:
            try:
                x_delete_at = normalize_delete_at_timestamp(int(req.headers['x-delete-at']))
                if int(x_delete_at) < time.time():
                    return HTTPBadRequest(
                        body='X-Delete-At in past', request=req,
                        content_type='text/plain')
            except ValueError:
                return HTTPBadRequest(request=req,
                                      content_type='text/plain',
                                      body='Non-integer X-Delete-At')
            req.environ.setdefault('swift.log_info', []).append('x-delete-at:%s' % x_delete_at)
            delete_at_container = normalize_delete_at_timestamp(
                int(x_delete_at) / self.app.expiring_objects_container_divisor *
                self.app.expiring_objects_container_divisor)
            delete_at_part, delete_at_nodes = \
                self.app.container_ring.get_nodes(
                    self.app.expiring_objects_account, delete_at_container)
        else:
            delete_at_container = delete_at_part = delete_at_nodes = None
        partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
        req.headers['X-Timestamp'] = normalize_timestamp(time.time())

        headers = self._backend_requests(
            req, len(nodes), container_partition, containers,
            delete_at_container, delete_at_part, delete_at_nodes)

        resp = self.make_requests(req, self.app.object_ring, partition, 'POST', req.swift_entity_path, headers)
            
        return resp
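The delete_at_container computation above buckets expiration timestamps so that all objects expiring in the same window share one container under the expiring-objects account. A sketch, assuming the default divisor of 86400 seconds (one day) and the 10-digit zero padding that normalize_delete_at_timestamp applies:

```python
# Sketch of the expiration bucketing above, assuming the default
# expiring_objects_container_divisor of 86400 and 10-digit zero padding.
def delete_at_container(x_delete_at, divisor=86400):
    # Round down to the nearest bucket boundary so every object expiring
    # in the same window lands in the same container.
    return '%010d' % (int(x_delete_at) // divisor * divisor)

print(delete_at_container(1406246599))  # '1406246400'
```

Bucketing this way keeps the number of expiration-tracking containers bounded: the object-expirer daemon only has to list one container per day rather than one per timestamp.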

/swift/obj/server.py----class ObjectController(object)----def POST

def POST(self, request):
    """     
    更新object的元数据信息,流程如下:
    1.从requesturl中提取device,partition, account, container, obj;
      检查requestheads中的'x-timestamp'是否存在,检查mount情况;
    2.根据请求信息新建DiskFile对象file,检查是否存在;
      (包括检查metadata中的'X-Delete-At',调用file#is_deleted()和检查file.data_size)
    3.如果检查都通过,则根据request.heads中的元素更新metadata;
    4.从request.heads中提取'X-Delete-At'并与file.metadata中的相同字段比较;
      根据较新的值调用file#delete_at_update(),通知更新container的信息;
    5.调用file#put()方法将metadata写入到.meta文件和data_file的扩展属性中;
        
    实现更新object的元数据信息;
    并通知object的更新到container;
    """
    # Parse device, partition, account, container and obj from request.path.
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)

    if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')

    new_delete_at = int(request.headers.get('X-Delete-At') or 0)
    if new_delete_at and new_delete_at < time.time():
        return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain')
        
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        orig_metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request)
    orig_timestamp = orig_metadata.get('X-Timestamp', '0')
    if orig_timestamp >= request.headers['x-timestamp']:
        return HTTPConflict(request=request)
        
    metadata = {'X-Timestamp': request.headers['x-timestamp']}
    metadata.update(val for val in request.headers.iteritems() if is_user_meta('object', val[0]))
    for header_key in self.allowed_headers:
        if header_key in request.headers:
            header_caps = header_key.title()
            metadata[header_caps] = request.headers[header_key]
        
    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
        
    if orig_delete_at != new_delete_at:
        if new_delete_at:
            self.delete_at_update('PUT', new_delete_at, account, container, obj, request, device)
        if orig_delete_at:
            self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
    disk_file.write_metadata(metadata)
    return HTTPAccepted(request=request)
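The conflict check above relies on Swift's fixed-width timestamp format: because normalize_timestamp pads to '%016.05f', plain string comparison agrees with numeric comparison, so a stale POST (older or equal X-Timestamp) can be rejected with 409 Conflict without parsing floats. A sketch (`post_allowed` is our name, not Swift's):

```python
# Sketch of the stale-POST rejection above. Swift timestamps are normalized
# to a fixed-width '%016.05f' string, so lexicographic comparison agrees
# with numeric comparison.
def normalize_timestamp(ts):
    return '%016.5f' % float(ts)

def post_allowed(orig_ts, req_ts):
    # A POST only wins if its X-Timestamp is strictly newer than the
    # on-disk one; otherwise the server answers 409 Conflict.
    return normalize_timestamp(orig_ts) < normalize_timestamp(req_ts)

print(post_allowed('1406246599.00000', '1406246600.00000'))  # True
print(post_allowed('1406246600.00000', '1406246599.00000'))  # False
```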

The next post will continue this analysis with swift-proxy and swift-object.
