• 感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正! 如果转载,请保留作者信息。 ... ...PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!...概述部分:

    感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

    如果转载,请保留作者信息。
    博客地址:http://blog.csdn.net/gaoxingnengjisuan
    邮箱地址:dong.liu@siat.ac.cn

    PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。


    概述:

    这篇博客关注的问题就是:

    请求信息如何通过swift-proxy服务实现转发;


    这里应用网上找到的一幅图片来看看swift-proxy服务在swift系统中的作用(多谢作者);


        简单说,请求过来之后,通过swift-proxy服务的服务入口点,调用swift-proxy服务上的相关方法,实现通过给定的请求来确定具体要获取的控制器(ObjectController、ContainerController、AccountController),并进一步调用控制器下的具体方法,实现到相应存储服务(Object-server、Container-server、Account-server)的连接,并具体调用相应的方法来实现请求;

    下面来看看代码的实现:

    def __call__(self, env, start_response):
        """
        调用WSGI服务的入口点;
        """
        try:
            # 获取缓存信息;
            # 在env中获取'swift.cache'的值,如果为none,则报错误提示;
            # 注:env:表示wsgi环境的字典;
                
            if self.memcache is None:
                self.memcache = cache_from_env(env)
                
            req = self.update_request(Request(env))
    
            return self.handle_request(req)(env, start_response)
        except UnicodeError:
            err = HTTPPreconditionFailed(request=req, body='Invalid UTF8 or contains NULL')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',[('Content-Type', 'text/plain')])
            return ['Internal server error.\n']
    方法handle_request实现了对HTTP传递过来的请求进行具体的处理和执行,来看方法handle_request的实现:


    def handle_request(self, req):
        """
        代理服务的入口点;
        对HTTP请求进行具体的处理和执行等操作;
        """
        try:
            self.logger.set_statsd_prefix('proxy-server')
            if req.content_length and req.content_length < 0:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req, body='Invalid Content-Length')
    
            try:
                if not check_utf8(req.path_info):
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(request=req, body='Invalid UTF8 or contains NULL')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Invalid UTF8 or contains NULL')
    
            try:
                # 根据给定的HTTP的请求路径path获取控制器来处理请求;
                # 如果account, container, object都存在,则获取ObjectController控制器;
                # 如果account, container存在,object不存在,则获取ContainerController控制器;
                # 如果account存在,container, object不存在,则获取AccountController控制器;
                # 返回值:
                # path_parts:version, account, container, object等值的字典;
                # controller:具体的控制器类实例化对象;
                controller, path_parts = self.get_controller(req.path)
                # req.path = /v1/AUTH_2a8cbfbb8ad7411c8465f57311527937/testcontainer2/ceph9
                # controller = <class 'swift.proxy.controllers.obj.ObjectController'>
                # path_parts = {'object_name': 'ceph9', 'version': 'v1', 
                #               'account_name': 'AUTH_2a8cbfbb8ad7411c8465f57311527937', 
                #               'container_name': 'testcontainer2'}
                    
                p = req.path_info
                if isinstance(p, unicode):
                    p = p.encode('utf-8')
            except ValueError:
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
                
            if not controller:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
                
            if self.deny_host_headers and req.host.split(':')[0] in self.deny_host_headers:
                return HTTPForbidden(request=req, body='Invalid host header')
    
                # server_type:不同的类定义了不同的server_type;
                # server_type = 'Object';
                # server_type = 'Account';
                # server_type = 'Base';
                # server_type = 'Container';          
                self.logger.set_statsd_prefix('proxy-server.' + controller.server_type.lower())
                
            # 获取控制器类的实例化对象;
            controller = controller(self, **path_parts)
                
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now
                trans_id = generate_trans_id(self.trans_id_suffix)
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            self.logger.client_ip = get_remote_client(req)
                
            try:
                # 执行具体控制器类中的指定方法;
                handler = getattr(controller, req.method)
                # req.method = DELETE
                # controller = <swift.proxy.controllers.obj.ObjectController object at 0x2acc110>
                # handler = <bound method ObjectController.DELETE of <swift.proxy.controllers.obj.ObjectController object at 0x2acc110>>             
                getattr(handler, 'publicly_accessible')
            except AttributeError:
                allowed_methods = getattr(controller, 'allowed_methods', set())
                return HTTPMethodNotAllowed(request=req, headers={'Allow': ', '.join(allowed_methods)})
                
            if 'swift.authorize' in req.environ:
                resp = req.environ['swift.authorize'](req)
                if not resp:
                    del req.environ['swift.authorize']
                else:
                    if not getattr(handler, 'delay_denial', None):
                        return resp
    
            req.environ['swift.orig_req_method'] = req.method
                
            # handler = <bound method ObjectController.DELETE of <swift.proxy.controllers.obj.ObjectController object at 0x2acc110>>
            return handler(req)
    
        except HTTPException as error_response:
            return error_response
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)
    1.controller, path_parts = self.get_controller(req.path)
       根据给定的HTTP的请求路径path获取控制器(ObjectController、ContainerController、AccountController)来处理请求;
    2.controller = controller(self, **path_parts)
       获取控制器类的实例化对象;
    3.handler = getattr(controller, req.method)
       执行具体控制器类中的指定方法;
    4.return handler(req)
       调用相应控制器中具体的方法来对req进行处理;

       接下来将要调用的就是/swift/proxy/controllers/account.py或/swift/proxy/controllers/container.py或/swift/proxy/controllers/obj.py下面的PUT,POST,DELETE,GET,HEAD等方法;然后再在具体的方法中实现到具体存储服务(Object-serverContainer-server、Account-server)的连接,继而调用其下具体的PUT,POST,DELETE,GET,HEAD等方法来进行请求req的实现;

    展开全文
  • Swift启用WSGI服务的事件循环队列pipeline: catch_errors, proxy-logging, cache, authtoken, keystone, (slo), proxy-server。通过直proxy-server的服务入口点进行,调用相关的方法实现请求响应。  proxy-server...

      Swift启用WSGI服务的事件循环队列pipeline: catch_errors, proxy-logging, cache, authtoken, keystone, (slo), proxy-server。通过proxy-server的服务入口点,实现请求的具体处理和响应。

      proxy-server服务入口点(/swift-kilo-eol/swift/proxy/server.py)

        def __call__(self, env, start_response):
            """
            WSGI 服务入口点
    
            :param env: WSGI 环境变量(字典类型)
            :param start_response: WSGI 可调用对象
            """
    
            # 获取环境变量env中的'swift.cache',若为None,直接报错返回
            # 若req.headers中只有'x-storage-token',则用req.headers的'x-auth-token'来更新
            # 进而调用handle_request进行具体的执行和转发
            try:
                if self.memcache is None:
                    self.memcache = cache_from_env(env, True)
                req = self.update_request(Request(env))
                return self.handle_request(req)(env, start_response)
            except UnicodeError:
                err = HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
                return err(env, start_response)
            except (Exception, Timeout):
                start_response('500 Server Error',
                               [('Content-Type', 'text/plain')])
                return ['Internal server error.\n']</span>


      事实上真正调用的是handle_request()来完成请求的具体处理:
    1.根据req.path的信息返回对应的控制器类;

    2.实例化具体的控制器类对象;

    3.获取控制器类中由req.method指定的方法;

    4.执行具体的方法并返回。

        def handle_request(self, req):
            """
            Entry point for proxy server.
            Should return a WSGI-style callable (such as swob.Response).
    
            :param req: swob.Request object
            """
            try:
                self.logger.set_statsd_prefix('proxy-server')
                if req.content_length and req.content_length < 0:
                    self.logger.increment('errors')
                    return HTTPBadRequest(request=req,
                                          body='Invalid Content-Length')
    
                try:
                    if not check_utf8(req.path_info):
                        self.logger.increment('errors')
                        return HTTPPreconditionFailed(
                            request=req, body='Invalid UTF8 or contains NULL')
                except UnicodeError:
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(
                        request=req, body='Invalid UTF8 or contains NULL')
    
                try:
                    # 根据req.path的信息返回对应的控制器类和字典(由版本、Account、Container和Object名组成)
                    # 若req.path是/info,则返回InfoController;
                    # 若req.path中account、container、object都存在,则返回ObjectController
                    # 若req.path中只有account、container,则返回ObjectController
                    # 若req.path中只有account,则返回AccountController
    
                    controller, path_parts = self.get_controller(req)
                    p = req.path_info
                    if isinstance(p, unicode):
                        p = p.encode('utf-8')
                except APIVersionError:
                    self.logger.increment('errors')
                    return HTTPBadRequest(request=req)
                except ValueError:
                    self.logger.increment('errors')
                    return HTTPNotFound(request=req)
                if not controller:
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(request=req, body='Bad URL')
                if self.deny_host_headers and \
                        req.host.split(':')[0] in self.deny_host_headers:
                    return HTTPForbidden(request=req, body='Invalid host header')
    
                self.logger.set_statsd_prefix('proxy-server.' +
                                              controller.server_type.lower())
                # 用get_controller返回的字典 实例化具体的控制器类对象
                controller = controller(self, **path_parts)
                if 'swift.trans_id' not in req.environ:
                    # if this wasn't set by an earlier middleware, set it now
                    trans_id_suffix = self.trans_id_suffix
                    trans_id_extra = req.headers.get('x-trans-id-extra')
                    if trans_id_extra:
                        trans_id_suffix += '-' + trans_id_extra[:32]
                    trans_id = generate_trans_id(trans_id_suffix)
                    req.environ['swift.trans_id'] = trans_id
                    self.logger.txn_id = trans_id
                req.headers['x-trans-id'] = req.environ['swift.trans_id']
                controller.trans_id = req.environ['swift.trans_id']
                self.logger.client_ip = get_remote_client(req)
                try:
                    # 获取具体控制器类中由request指定的方法
                    handler = getattr(controller, req.method)
                    getattr(handler, 'publicly_accessible')
                except AttributeError:
                    allowed_methods = getattr(controller, 'allowed_methods', set())
                    return HTTPMethodNotAllowed(
                        request=req, headers={'Allow': ', '.join(allowed_methods)})
                old_authorize = None
                if 'swift.authorize' in req.environ:
                    # We call authorize before the handler, always. If authorized,
                    # we remove the swift.authorize hook so isn't ever called
                    # again. If not authorized, we return the denial unless the
                    # controller's method indicates it'd like to gather more
                    # information and try again later.
                    resp = req.environ['swift.authorize'](req)
                    if not resp and not req.headers.get('X-Copy-From-Account') \
                            and not req.headers.get('Destination-Account'):
                        # No resp means authorized, no delayed recheck required.
                        old_authorize = req.environ['swift.authorize']
                    else:
                        # Response indicates denial, but we might delay the denial
                        # and recheck later. If not delayed, return the error now.
                        if not getattr(handler, 'delay_denial', None):
                            return resp
                # Save off original request method (GET, POST, etc.) in case it
                # gets mutated during handling.  This way logging can display the
                # method the client actually sent.
                req.environ['swift.orig_req_method'] = req.method
                try:
                    if old_authorize:
                        req.environ.pop('swift.authorize', None)
                    # 执行具体控制器类中由request指定的方法
                    return handler(req)
                finally:
                    if old_authorize:
                        req.environ['swift.authorize'] = old_authorize
            except HTTPException as error_response:
                return error_response
            except (Exception, Timeout):
                self.logger.exception(_('ERROR Unhandled exception in request'))
                return HTTPServerError(request=req)</span>

      具体调用过程:请求到达proxy-server的服务入口点之后,在handle_request()中获取具体的控制器类(AccountController), 接着调用/swift-kilo-eol/swift/controllers/account.py下面的GET/HEAD/PUT/POST/DELETE等方法实现与具体存储服务(AccountServer)的连接,进而调用具体的GET/HEAD/PUT/POST/DELETE等方法实现请求的处理和相应

      下述是GET/HEAD/PUT/POST/DELETE等方法在源码中的具体调用过程,详细过程已在源码中注释

    /swift-kilo-eol/swift/proxy/controller/account.py

        def GETorHEAD(self, req):
            """Handler for HTTP GET/HEAD requests."""
            # 处理HTTP GET/HEAD请求
            if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
                resp = HTTPBadRequest(request=req)
                resp.body = 'Account name length of %d longer than %d' % \
                            (len(self.account_name),
                             constraints.MAX_ACCOUNT_NAME_LENGTH)
                return resp
    
            # 调用get_part获取经过一致性Hash取值和移位之后的Object存放的分区号
            # 调用iter_nodes获取排除了HandOff以及Error的Object存放的节点号
            # 调用GETorHEAD_base处理HTTP GET/HEAD请求 返回swob.Response对象
            partition = self.app.account_ring.get_part(self.account_name)
            node_iter = self.app.iter_nodes(self.app.account_ring, partition)
            resp = self.GETorHEAD_base(
                req, _('Account'), node_iter, partition,
                req.swift_entity_path.rstrip('/'))
            if resp.status_int == HTTP_NOT_FOUND:
                if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
                    resp.status = HTTP_GONE
                elif self.app.account_autocreate:
                    resp = account_listing_response(self.account_name, req,
                                                    get_listing_content_type(req))
            if req.environ.get('swift_owner'):
                self.add_acls_from_sys_metadata(resp)
            else:
                for header in self.app.swift_owner_headers:
                    resp.headers.pop(header, None)
            return resp</span>

    /swift-kilo-eol/swift/account/server.py

        def HEAD(self, req):
            """Handle HTTP HEAD request."""
            # 处理HEAD请求,返回Account的基本信息,以KV的形式保存在HEAD中
    
            drive, part, account = split_and_validate_path(req, 3)
            out_content_type = get_listing_content_type(req)
    
            # 进行mount检查
            if self.mount_check and not check_mount(self.root, drive):
                return HTTPInsufficientStorage(drive=drive, request=req)
            # 返回一个AccountBroker实例,用于对sqlite数据的查询操作
            broker = self._get_account_broker(drive, part, account,
                                              pending_timeout=0.1,
                                              stale_reads_ok=True)
            if broker.is_deleted():
                return self._deleted_response(broker, req, HTTPNotFound)
            # get_response_headers内部调用get_info()获取Account的基本信息,并更新res的HEAD('X-Account-Container-Count','X-Account-Object-Count': info['object_count'],
            # 'X-Account-Bytes-Used', 'X-Timestamp', 'X-PUT-Timestamp')
            headers = get_response_headers(broker)
            headers['Content-Type'] = out_content_type
            return HTTPNoContent(request=req, headers=headers, charset='utf-8')</span>

    /swift-kilo-eol/swift/account/server.py

        def GET(self, req):
            """Handle HTTP GET request."""
            # 处理GET请求,返回Account的基本信息,以KV的形式保存在HEAD中,但与HEAD不一样的是GET方法中获取了
            # 指定Account下的Container列表,存储在Body中
            # 调用机制与HEAD类似
    
            drive, part, account = split_and_validate_path(req, 3)
            prefix = get_param(req, 'prefix')
            delimiter = get_param(req, 'delimiter')
            if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
                # delimiters can be made more flexible later
                return HTTPPreconditionFailed(body='Bad delimiter')
            limit = constraints.ACCOUNT_LISTING_LIMIT
            given_limit = get_param(req, 'limit')
            if given_limit and given_limit.isdigit():
                limit = int(given_limit)
                if limit > constraints.ACCOUNT_LISTING_LIMIT:
                    return HTTPPreconditionFailed(
                        request=req,
                        body='Maximum limit is %d' %
                        constraints.ACCOUNT_LISTING_LIMIT)
            marker = get_param(req, 'marker', '')
            end_marker = get_param(req, 'end_marker')
            out_content_type = get_listing_content_type(req)
    
            if self.mount_check and not check_mount(self.root, drive):
                return HTTPInsufficientStorage(drive=drive, request=req)
            broker = self._get_account_broker(drive, part, account,
                                              pending_timeout=0.1,
                                              stale_reads_ok=True)
            if broker.is_deleted():
                return self._deleted_response(broker, req, HTTPNotFound)
    
            # 获取Account对应的Container列表,每个Container信息包括(name, object_count, bytes_used, 0),以list形式返回
            # 以不同的形式返回Account对应的Container列表,最普通的是放在body中
            return account_listing_response(account, req, out_content_type, broker,
                                            limit, marker, end_marker, prefix,
                                            delimiter)</span>


    def account_listing_response(account, req, response_content_type, broker=None,
                                 limit='', marker='', end_marker='', prefix='',
                                 delimiter=''):
        if broker is None:
            broker = FakeAccountBroker()
    
        resp_headers = get_response_headers(broker)
    
        # 获取Account对应的Container列表,每个Container信息包括(name, object_count, bytes_used, 0),以list形式返回
        account_list = broker.list_containers_iter(limit, marker, end_marker,
                                                   prefix, delimiter)
    
        # 以不同的形式返回Account对应的Container列表,最普通的是放在body中
        if response_content_type == 'application/json':
            data = []
            for (name, object_count, bytes_used, is_subdir) in account_list:
                if is_subdir:
                    data.append({'subdir': name})
                else:
                    data.append({'name': name, 'count': object_count,
                                 'bytes': bytes_used})
            account_list = json.dumps(data)
        elif response_content_type.endswith('/xml'):
            output_list = ['<?xml version="1.0" encoding="UTF-8"?>',
                           '<account name=%s>' % saxutils.quoteattr(account)]
            for (name, object_count, bytes_used, is_subdir) in account_list:
                if is_subdir:
                    output_list.append(
                        '<subdir name=%s />' % saxutils.quoteattr(name))
                else:
                    item = '<container><name>%s</name><count>%s</count>' \
                           '<bytes>%s</bytes></container>' % \
                           (saxutils.escape(name), object_count, bytes_used)
                    output_list.append(item)
            output_list.append('</account>')
            account_list = '\n'.join(output_list)
        else:
            if not account_list:
                resp = HTTPNoContent(request=req, headers=resp_headers)
                resp.content_type = response_content_type
                resp.charset = 'utf-8'
                return resp
            account_list = '\n'.join(r[0] for r in account_list) + '\n'
        ret = HTTPOk(body=account_list, request=req, headers=resp_headers)
        ret.content_type = response_content_type
        ret.charset = 'utf-8'
        return ret</span>


    /swift-kilo-eol/swift/proxy/controller/account.py

        def PUT(self, req):
            """HTTP PUT request handler."""
            # 处理HTTP PUT请求
            if not self.app.allow_account_management:
                return HTTPMethodNotAllowed(
                    request=req,
                    headers={'Allow': ', '.join(self.allowed_methods)})
            error_response = check_metadata(req, 'account')
            if error_response:
                return error_response
            if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
                resp = HTTPBadRequest(request=req)
                resp.body = 'Account name length of %d longer than %d' % \
                            (len(self.account_name),
                             constraints.MAX_ACCOUNT_NAME_LENGTH)
                return resp
            # 获取Account对应的分区号,及其对应的节点号(节点以元祖形式返回)
            account_partition, accounts = \
                self.app.account_ring.get_nodes(self.account_name)
            # 根据原始请求头信息调用generate_request_headers生成新格式的请求头
            headers = self.generate_request_headers(req, transfer=True)
            clear_info_cache(self.app, req.environ, self.account_name)
            # 调用make_requests,迭代发送多个HTTP请求到多个节点,并汇聚所有返回的相应结果;根据所有的响应信息,通过投票机制返回最佳响应信息
            resp = self.make_requests(
                req, self.app.account_ring, account_partition, 'PUT',
                req.swift_entity_path, [headers] * len(accounts))
            self.add_acls_from_sys_metadata(resp)
            return resp</span>


        def make_requests(self, req, ring, part, method, path, headers,
                          query_string='', overrides=None):
            """
            Sends an HTTP request to multiple nodes and aggregates the results.
            It attempts the primary nodes concurrently, then iterates over the
            handoff nodes as needed.
    
            :param req: a request sent by the client
            :param ring: the ring used for finding backend servers
            :param part: the partition number
            :param method: the method to send to the backend
            :param path: the path to send to the backend
                         (full path ends up being  /<$device>/<$part>/<$path>)
            :param headers: a list of dicts, where each dict represents one
                            backend request that should be made.
            :param query_string: optional query string to send to the backend
            :param overrides: optional return status override map used to override
                              the returned status of a request.
            :returns: a swob.Response object
            """
            # 迭代发送多个HTTP请求到多个节点,并汇聚所有返回的相应结果;根据所有的响应信息,通过投票机制返回最佳响应信息
    
            # 调用get_part_nodes返回分区号对应的所有分区号
            start_nodes = ring.get_part_nodes(part)
            nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
            # 创建协程池
            pile = GreenAsyncPile(len(start_nodes))
            for head in headers:
                # 从协程池中获取一个协程发送请求到一个远程节点(根据选定的备份数)
                pile.spawn(self._make_request, nodes, part, method, path,
                           head, query_string, self.app.logger.thread_locals)
            response = []
            statuses = []
            for resp in pile:
                if not resp:
                    continue
                response.append(resp)
                statuses.append(resp[0])
                if self.have_quorum(statuses, len(start_nodes)):
                    break
            # give any pending requests *some* chance to finish
            # 等到所有请求都返回
            finished_quickly = pile.waitall(self.app.post_quorum_timeout)
            for resp in finished_quickly:
                if not resp:
                    continue
                response.append(resp)
                statuses.append(resp[0])
            while len(response) < len(start_nodes):
                response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
            statuses, reasons, resp_headers, bodies = zip(*response)
            # 通过投票机制返回最佳响应信息
            return self.best_response(req, statuses, reasons, bodies,
                                      '%s %s' % (self.server_type, req.method),
                                      overrides=overrides, headers=resp_headers)


        def best_response(self, req, statuses, reasons, bodies, server_type,
                          etag=None, headers=None, overrides=None,
                          quorum_size=None):
            """
            Given a list of responses from several servers, choose the best to
            return to the API.
    
            :param req: swob.Request object
            :param statuses: list of statuses returned
            :param reasons: list of reasons for each status
            :param bodies: bodies of each response
            :param server_type: type of server the responses came from
            :param etag: etag
            :param headers: headers of each response
            :param overrides: overrides to apply when lacking quorum
            :param quorum_size: quorum size to use
            :returns: swob.Response object with the correct status, body, etc. set
            """
            # 调用_compute_quorum_response,根据Response的状态等选出最佳响应
    
            if quorum_size is None:
                quorum_size = self._quorum_size(len(statuses))
    
            resp = self._compute_quorum_response(
                req, statuses, reasons, bodies, etag, headers,
                quorum_size=quorum_size)
            if overrides and not resp:
                faked_up_status_indices = set()
                transformed = []
                for (i, (status, reason, hdrs, body)) in enumerate(zip(
                        statuses, reasons, headers, bodies)):
                    if status in overrides:
                        faked_up_status_indices.add(i)
                        transformed.append((overrides[status], '', '', ''))
                    else:
                        transformed.append((status, reason, hdrs, body))
                statuses, reasons, headers, bodies = zip(*transformed)
                resp = self._compute_quorum_response(
                    req, statuses, reasons, bodies, etag, headers,
                    indices_to_avoid=faked_up_status_indices,
                    quorum_size=quorum_size)
    
            if not resp:
                resp = Response(request=req)
                self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
                                      {'type': server_type, 'statuses': statuses})
                resp.status = '503 Internal Server Error'
    
            return resp

    /swift-kilo-eol/swift/account/server.py

        def PUT(self, req):
            """Handle HTTP PUT request."""
            # 处理PUT请求
            # 若url中包括<Container>,新建数据库中的Container信息
            # 若url中不包括<Container>,更新数据库中的Account的metadata信息
            drive, part, account, container = split_and_validate_path(req, 3, 4)
            if self.mount_check and not check_mount(self.root, drive):
                return HTTPInsufficientStorage(drive=drive, request=req)
            if container:   # put account container
                if 'x-timestamp' not in req.headers:
                    timestamp = Timestamp(time.time())
                else:
                    timestamp = valid_timestamp(req)
                pending_timeout = None
                container_policy_index = \
                    req.headers.get('X-Backend-Storage-Policy-Index', 0)
                if 'x-trans-id' in req.headers:
                    pending_timeout = 3
                # 调用_get_account_broker返回AccountBroker实例
                broker = self._get_account_broker(drive, part, account,
                                                  pending_timeout=pending_timeout)
                # 检查account对应的数据库不存在,则初始化AccountBroker的数据库
                if account.startswith(self.auto_create_account_prefix) and \
                        not os.path.exists(broker.db_file):
                    try:
                        broker.initialize(timestamp.internal)
                    except DatabaseAlreadyExists:
                        pass
                if req.headers.get('x-account-override-deleted', 'no').lower() != \
                        'yes' and broker.is_deleted():
                    return HTTPNotFound(request=req)
                # 将Container的信息存入数据库
                broker.put_container(container, req.headers['x-put-timestamp'],
                                     req.headers['x-delete-timestamp'],
                                     req.headers['x-object-count'],
                                     req.headers['x-bytes-used'],
                                     container_policy_index)
                if req.headers['x-delete-timestamp'] > \
                        req.headers['x-put-timestamp']:
                    return HTTPNoContent(request=req)
                else:
                    return HTTPCreated(request=req)
            else:   # put account
                timestamp = valid_timestamp(req)
                broker = self._get_account_broker(drive, part, account)
                if not os.path.exists(broker.db_file):
                    try:
                        broker.initialize(timestamp.internal)
                        created = True
                    except DatabaseAlreadyExists:
                        created = False
                # 检查Account是否被标记为删除
                elif broker.is_status_deleted():
                    return self._deleted_response(broker, req, HTTPForbidden,
                                                  body='Recently deleted')
                # 检查Account是否已被删除
                else:
                    created = broker.is_deleted()
                    broker.update_put_timestamp(timestamp.internal)
                    if broker.is_deleted():
                        return HTTPConflict(request=req)
                # 更新数据库中的Account 的metadata信息
                metadata = {}
                metadata.update((key, (value, timestamp.internal))
                                for key, value in req.headers.iteritems()
                                if is_sys_or_user_meta('account', key))
                if metadata:
                    broker.update_metadata(metadata, validate_metadata=True)
                if created:
                    return HTTPCreated(request=req)
                else:
                    return HTTPAccepted(request=req)

    /swift-kilo-eol/swift/proxy/controller/account.py

        def POST(self, req):
            """HTTP POST request handler."""
            # 处理HTTP POST请求
            if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
                resp = HTTPBadRequest(request=req)
                resp.body = 'Account name length of %d longer than %d' % \
                            (len(self.account_name),
                             constraints.MAX_ACCOUNT_NAME_LENGTH)
                return resp
            error_response = check_metadata(req, 'account')
            if error_response:
                return error_response
            # 获取Account对应的分区号,及其对应的节点号
            account_partition, accounts = \
                self.app.account_ring.get_nodes(self.account_name)
            # 根据原始请求头信息调用generate_request_headers生成新格式的请求头
            headers = self.generate_request_headers(req, transfer=True)
            clear_info_cache(self.app, req.environ, self.account_name)
            # 调用make_requests,迭代发送多个HTTP请求到多个节点,并汇聚所有返回的相应结果;根据所有的响应信息,通过投票机制返回最佳响应信息
            resp = self.make_requests(
                req, self.app.account_ring, account_partition, 'POST',
                req.swift_entity_path, [headers] * len(accounts))
            # 如果指定的account_name不存在,则先新建Account,再调用make_requests
            if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
                self.autocreate_account(req, self.account_name)
                resp = self.make_requests(
                    req, self.app.account_ring, account_partition, 'POST',
                    req.swift_entity_path, [headers] * len(accounts))
            self.add_acls_from_sys_metadata(resp)
            return resp

    /swift-kilo-eol/swift/account/server.py

        def POST(self, req):
            """Handle HTTP POST request."""
            # 处理POST请求
            # 更新Account的matadata
            drive, part, account = split_and_validate_path(req, 3)
            req_timestamp = valid_timestamp(req)
            if self.mount_check and not check_mount(self.root, drive):
                return HTTPInsufficientStorage(drive=drive, request=req)
            # 调用_get_account_broker返回AccountBroker实例
            broker = self._get_account_broker(drive, part, account)
            if broker.is_deleted():
                return self._deleted_response(broker, req, HTTPNotFound)
            # 利用Head中的metadata更新数据库中Account的metadata
            metadata = {}
            metadata.update((key, (value, req_timestamp.internal))
                            for key, value in req.headers.iteritems()
                            if is_sys_or_user_meta('account', key))
            if metadata:
                broker.update_metadata(metadata, validate_metadata=True)
            return HTTPNoContent(request=req)


    /swift-kilo-eol/swift/proxy/controller/account.py

        def DELETE(self, req):
            """HTTP DELETE request handler."""
            # 处理HTTP DELETE请求
            # Extra safety in case someone typos a query string for an
            # account-level DELETE request that was really meant to be caught by
            # some middleware.
            if req.query_string:
                return HTTPBadRequest(request=req)
            if not self.app.allow_account_management:
                return HTTPMethodNotAllowed(
                    request=req,
                    headers={'Allow': ', '.join(self.allowed_methods)})
            # 获取Account对应的分区号,及其对应的节点号
            account_partition, accounts = \
                self.app.account_ring.get_nodes(self.account_name)
            # 根据原始请求头信息调用generate_request_headers生成新格式的请求头
            headers = self.generate_request_headers(req)
            clear_info_cache(self.app, req.environ, self.account_name)
            # 调用make_requests
            resp = self.make_requests(
                req, self.app.account_ring, account_partition, 'DELETE',
                req.swift_entity_path, [headers] * len(accounts))
            return resp

    /swift-kilo-eol/swift/account/server.py

        def DELETE(self, req):
            """Handle HTTP DELETE request."""
            # 处理DELETE请求
            drive, part, account = split_and_validate_path(req, 3)
            if self.mount_check and not check_mount(self.root, drive):
                return HTTPInsufficientStorage(drive=drive, request=req)
            req_timestamp = valid_timestamp(req)
            broker = self._get_account_broker(drive, part, account)
            # 检查当前Account是否已经被删除,如果已经被删除则直接返回
            if broker.is_deleted():
                return self._deleted_response(broker, req, HTTPNotFound)
            # 将数据库中当前Account标记为删除状态,由AccountReaper服务来完成真正的清理工作
            broker.delete_db(req_timestamp.internal)
            return self._deleted_response(broker, req, HTTPNoContent)



      对于ContainerController和ObjectController的调用过程类似,详情有空再分析。本文还有许多可改进的地方,比如有些具体方法调用过程分析地不够详细

    
    
    展开全文
  • 5台服务器 A B C D E ...nginx:0.6.33版本后不允许POST方法访问静态资源,swift 只有head get方法能用,post put等方法被阻挡在nginx,无法下发到proxyserver。错误表现为:405 Not Allowed 修改配置文件和改

    5台服务器 A B C D E

    A为负载均衡

    BCDE 既做proxy server 又做storage server


    负载均衡工具的选择:

    nginx:0.6.33版本后不允许POST方法访问静态资源,swift 只有head get方法能用,post put等方法被阻挡在nginx,无法下发到proxyserver。错误表现为:405 Not Allowed

    修改配置文件和改代码的方法都没有解决。


    pound  CPU负载过高(据说)


    HAProxy  :配置简单,均衡策略多样,负载小。还有一个简单的web监控页面,很方便。

    使用roundrobin 策略时,引发了多proxy server环境下 swift tempauth验证方式的问题,根本原因是多proxy之间无法共享各自产生的token。

    又在A上部署了keystone,将B C D E上swift的验证方式改为 keystone,成功!



    未完待续...


    性能待测...


    展开全文
  • 感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正! 如果转载,请保留作者信息。 ... ...PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!...概述:

    感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

    如果转载,请保留作者信息。
    博客地址:http://blog.csdn.net/gaoxingnengjisuan
    邮箱地址:dong.liu@siat.ac.cn

    PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。


    概述:

        上一篇博客中我们说过,请求信息到达swift-proxy服务之后,会确定获取具体的控制器(ObjectController、ContainerController、AccountController),接下来就是调用/swift/proxy/controllers/account.py或/swift/proxy/controllers/container.py或/swift/proxy/controllers/obj.py下面的PUT,POST,DELETE,GET,HEAD等方法;然后再在具体的方法中实现到具体存储服务(Object-server、Container-server、Account-server)的连接,继而调用其下具体的PUT,POST,DELETE,GET,HEAD等方法来进行请求req的实现;

    这篇博客主要关注swift-proxy与swift-account服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;


    源码解析部分(代码中较重要的部分已经进行了相关的注释):


    GETorHEAD

    /swift/proxy/controllers/account.py----class AccountController(Controller)----def GETorHEAD

    def GETorHEAD(self, req):
        """
        通过HTTP处理GET/HEAD请求;
        """
        if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
            resp = HTTPBadRequest(request=req)
            resp.body = 'Account name length of %d longer than %d' % (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
            return resp
    
        # get_nodes:为object获取分区号和所有副本节点信息;
        partition, nodes = self.app.account_ring.get_nodes(self.account_name)
            
        # GETorHEAD_base:HTTP GET或HEAD的基本处理;
        # 返回swob.Response对象;
        resp = self.GETorHEAD_base(
            req, _('Account'), self.app.account_ring, partition,
            req.swift_entity_path.rstrip('/'))
            
            if resp.status_int == HTTP_NOT_FOUND:
                if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
                    resp.status = HTTP_GONE
                elif self.app.account_autocreate:
                    resp = account_listing_response(self.account_name, req,
                                                    get_listing_content_type(req))
            if req.environ.get('swift_owner'):
                self.add_acls_from_sys_metadata(resp)
            else:
                for header in self.app.swift_owner_headers:
                    resp.headers.pop(header, None)
            return resp
    1.为object获取分区号和所有副本节点信息;
    2.HTTP GET或HEAD的基本处理,实现发送请求到远程对象服务上并调用具体方法来处理请求信息;
      注:这里的具体实现就不详细解析了;


    /swift/account/server.py----class AccountController(object)----def HEAD

    def HEAD(self, req):
        """
        处理HTTP协议的HEAD请求;     
        HEAD请求返回account的基本信息,并以key-value的形式保存在HTTPHEAD中返回;
        """
        drive, part, account = split_and_validate_path(req, 3)
        out_content_type = get_listing_content_type(req)
            
        # mount_check是是否进行mount检查;
        # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间;
        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
            
        # _get_account_broker是一个内部方法,功能是返回一个AccountBroker的实例,用于代理对sqlite数据库的操作;
        broker = self._get_account_broker(drive, part, account,
                                              pending_timeout=0.1,
                                              stale_reads_ok=True)
            
        if broker.is_deleted():
            return self._deleted_response(broker, req, HTTPNotFound)
            
        # get_info:为账户获取全局数据,获得account的基本信息;
        # 返回包括account, created_at, put_timestamp, delete_timestamp, container_count, object_count, bytes_used, hash, id等值的字典;
        info = broker.get_info()
        headers = {
                'X-Account-Container-Count': info['container_count'],
                'X-Account-Object-Count': info['object_count'],
                'X-Account-Bytes-Used': info['bytes_used'],
                'X-Timestamp': info['created_at'],
                'X-PUT-Timestamp': info['put_timestamp']}
        headers.update((key, value)
                       for key, (value, timestamp) in
                       broker.metadata.iteritems() if value != '')
        headers['Content-Type'] = out_content_type
            
            return HTTPNoContent(request=req, headers=headers, charset='utf-8')
    注:HEAD请求返回account的基本信息(元数据信息),并以key-value的形式保存在HTTPHEAD中返回;


    /swift/account/server.py----class AccountController(object)----def GET

    def GET(self, req):
        """
        处理HTTP协议的GET请求;
        GET同HEAD一样,都是请求返回account的基本信息,并以key-value的形式保存在HTTPHEAD当中;
        不同之处在于GET方法中获取了指定account下的container列表,存储在body中,同HTTPHEAD一同返回;
        """
        drive, part, account = split_and_validate_path(req, 3)
        prefix = get_param(req, 'prefix')
        delimiter = get_param(req, 'delimiter')
        if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
            # delimiters can be made more flexible later
            return HTTPPreconditionFailed(body='Bad delimiter')
        limit = ACCOUNT_LISTING_LIMIT
        given_limit = get_param(req, 'limit')
        if given_limit and given_limit.isdigit():
            limit = int(given_limit)
            if limit > ACCOUNT_LISTING_LIMIT:
                return HTTPPreconditionFailed(request=req, body='Maximum limit is %d' % ACCOUNT_LISTING_LIMIT)
        marker = get_param(req, 'marker', '')
        end_marker = get_param(req, 'end_marker')
        out_content_type = get_listing_content_type(req)
    
        # mount_check是是否进行mount检查;
        # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间;
        if self.mount_check and not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)
            
        # _get_account_broker是一个内部方法,功能是返回一个AccountBroker的实例,用于代理对sqlite数据库的操作;
        broker = self._get_account_broker(drive, part, account,
                                              pending_timeout=0.1,
                                              stale_reads_ok=True)
            
        if broker.is_deleted():
            return self._deleted_response(broker, req, HTTPNotFound)
            
        # 获取指定account下的容器列表(包括具体容器的相关信息);
        return account_listing_response(account, req, out_content_type, broker,
                                        limit, marker, end_marker, prefix,
                                        delimiter)

    来看方法account_listing_response的具体实现:

    def account_listing_response(account, req, response_content_type, broker=None,
                                 limit='', marker='', end_marker='', prefix='',
                                 delimiter=''):
        """
        获取指定account下的容器列表(包括具体容器的相关信息);
        """
        if broker is None:
            broker = FakeAccountBroker()
    
        info = broker.get_info()
        resp_headers = {
            'X-Account-Container-Count': info['container_count'],
            'X-Account-Object-Count': info['object_count'],
            'X-Account-Bytes-Used': info['bytes_used'],
            'X-Timestamp': info['created_at'],
            'X-PUT-Timestamp': info['put_timestamp']}
        resp_headers.update((key, value)
                            for key, (value, timestamp) in
                            broker.metadata.iteritems() if value != '')
    
        # 获取容器列表,每个容器信息包括(name, object_count, bytes_used, 0);
        account_list = broker.list_containers_iter(limit, marker, end_marker,
                                                   prefix, delimiter)
        if response_content_type == 'application/json':
            data = []
            for (name, object_count, bytes_used, is_subdir) in account_list:
                if is_subdir:
                    data.append({'subdir': name})
                else:
                    data.append({'name': name, 'count': object_count,
                                 'bytes': bytes_used})
            account_list = json.dumps(data)
        elif response_content_type.endswith('/xml'):
            output_list = ['<?xml version="1.0" encoding="UTF-8"?>',
                           '<account name=%s>' % saxutils.quoteattr(account)]
            for (name, object_count, bytes_used, is_subdir) in account_list:
                if is_subdir:
                    output_list.append(
                        '<subdir name=%s />' % saxutils.quoteattr(name))
                else:
                    item = '<container><name>%s</name><count>%s</count>' \
                           '<bytes>%s</bytes></container>' % \
                           (saxutils.escape(name), object_count, bytes_used)
                    output_list.append(item)
            output_list.append('</account>')
            account_list = '\n'.join(output_list)
        else:
            if not account_list:
                resp = HTTPNoContent(request=req, headers=resp_headers)
                resp.content_type = response_content_type
                resp.charset = 'utf-8'
                return resp
            account_list = '\n'.join(r[0] for r in account_list) + '\n'
        ret = HTTPOk(body=account_list, request=req, headers=resp_headers)
        ret.content_type = response_content_type
        ret.charset = 'utf-8'
        return ret

    下一篇博客将继续swift-proxy与swift-account的分析工作。
    展开全文
  • 感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正! 如果转载,请保留作者信息。 ... ...PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!...概述:

    感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

    如果转载,请保留作者信息。
    博客地址:http://blog.csdn.net/gaoxingnengjisuan
    邮箱地址:dong.liu@siat.ac.cn

    PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。


    概述:

    这篇博客主要关注swift-proxy与swift-object服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;


    源码解析部分(代码中较重要的部分已经进行了相关的注释):


    GETorHEAD

    /swift/proxy/controllers/obj.py----class ContainerController(Controller)----def GETorHEAD

    def GETorHEAD(self, req):
        """
        处理HTTP协议GET或者HEAD请求;
        """
        # 获取指定object所属的container的信息;
        container_info = self.container_info(self.account_name, self.container_name, req)
            
        req.acl = container_info['read_acl']
        if 'swift.authorize' in req.environ:
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                return aresp
    
        # 获取指定object所对应的分区号;
        partition = self.app.object_ring.get_part(self.account_name, self.container_name, self.object_name)
            
        resp = self.GETorHEAD_base(
                req, _('Object'), self.app.object_ring, partition,
                req.swift_entity_path)
    
        if ';' in resp.headers.get('content-type', ''):
            # strip off swift_bytes from content-type
            content_type, check_extra_meta = resp.headers['content-type'].rsplit(';', 1)
            if check_extra_meta.lstrip().startswith('swift_bytes='):
                resp.content_type = content_type
        return resp
    
    /swift/obj/server.py----class ContainerController(object)----def HEAD

    def HEAD(self, request):
        """
        检索返回一个object的metadata,同GET请求的处理方法几乎一致,唯一不同的是不在body中返回file;
        """
        device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
        try:
            disk_file = self.get_diskfile(device, partition, account, container, obj)
        except DiskFileDeviceUnavailable:
            return HTTPInsufficientStorage(drive=device, request=request)
        try:
            metadata = disk_file.read_metadata()
        except (DiskFileNotExist, DiskFileQuarantined):
            return HTTPNotFound(request=request, conditional_response=True)
        response = Response(request=request, conditional_response=True)
        response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
        for key, value in metadata.iteritems():
            if is_user_meta('object', key) or key.lower() in self.allowed_headers:
                response.headers[key] = value
        response.etag = metadata['ETag']
        ts = metadata['X-Timestamp']
        response.last_modified = math.ceil(float(ts))
        # Needed for container sync feature
        response.headers['X-Timestamp'] = ts
        response.content_length = int(metadata['Content-Length'])
        try:
            response.content_encoding = metadata['Content-Encoding']
        except KeyError:
            pass
        return response

    /swift/obj/server.py----class ContainerController(object)----def GET

    def GET(self, request):
        """
        检索一个object对象,在response.heads中返回metadata,在response.body中返回objectdata,流程如下:
        1.根据url中的信息新建DiskFile对象file,检查request.heads中的必要K-V,检查mount情况;
        2.如果file#is_deleted或者file.metadata中'X-Delete-At'小于当前时间(表示已标记为准备删除)
          或者通过file#get_data_file_size查看文件是否异常,如果已经删除或存在异常,返回404HTTPNotFound;
        3.检查request.heads里的'If-match'和'If-none-match',前者检查file.metadata中的'ETag'是否与其一致确定所检索的文件,后者确定如果没有匹配的是否返回file的etag信息;
        4.确定了需要操作的file,利用file的iterator,将其绑定response的构造函数参数app_iter,
          并且将file.metadata写入response.heads中,并返回response;
        """
        device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
            
        keep_cache = self.keep_cache_private or (
            'X-Auth-Token' not in request.headers and
            'X-Storage-Token' not in request.headers)
            
        try:
            disk_file = self.get_diskfile(device, partition, account, container, obj)
        except DiskFileDeviceUnavailable:
            return HTTPInsufficientStorage(drive=device, request=request)
            
        try:
            with disk_file.open():
                metadata = disk_file.get_metadata()
                obj_size = int(metadata['Content-Length'])
                file_x_ts = metadata['X-Timestamp']
                file_x_ts_flt = float(file_x_ts)
                file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)
    
                if_unmodified_since = request.if_unmodified_since
                    
                if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
                    return HTTPPreconditionFailed(request=request)
    
                if_modified_since = request.if_modified_since
                if if_modified_since and file_x_ts_utc <= if_modified_since:
                    return HTTPNotModified(request=request)
    
                keep_cache = (self.keep_cache_private or
                              ('X-Auth-Token' not in request.headers and
                               'X-Storage-Token' not in request.headers))
                response = Response(
                    app_iter=disk_file.reader(keep_cache=keep_cache),
                    request=request, conditional_response=True)
                response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
                for key, value in metadata.iteritems():
                    if is_user_meta('object', key) or key.lower() in self.allowed_headers:
                        response.headers[key] = value
                response.etag = metadata['ETag']
                response.last_modified = math.ceil(file_x_ts_flt)
                response.content_length = obj_size
                try:
                    response.content_encoding = metadata['Content-Encoding']
                except KeyError:
                    pass
                response.headers['X-Timestamp'] = file_x_ts
                resp = request.get_response(response)
                    
        except (DiskFileNotExist, DiskFileQuarantined):
            resp = HTTPNotFound(request=request, conditional_response=True)
        return resp

    POST

    /swift/proxy/controllers/obj.py----class ContainerController(Controller)----def POST

    def POST(self, req):
        """
        处理HTTP协议POST请求;
        """
        # 计算预计删除对象时间???
        if 'x-delete-after' in req.headers:
            try:
                x_delete_after = int(req.headers['x-delete-after'])
            except ValueError:
                return HTTPBadRequest(request=req,
                                      content_type='text/plain',
                                      body='Non-integer X-Delete-After')
            req.headers['x-delete-at'] = normalize_delete_at_timestamp(time.time() + x_delete_after)
            
    
        # 在object的实现方法中,系统默认以PUT方法来实现POST方法;
        if self.app.object_post_as_copy:
            req.method = 'PUT'
            req.path_info = '/v1/%s/%s/%s' % (self.account_name, self.container_name, self.object_name)
            req.headers['Content-Length'] = 0
            req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, self.object_name))
            req.headers['X-Fresh-Metadata'] = 'true'
            req.environ['swift_versioned_copy'] = True
            if req.environ.get('QUERY_STRING'):
                req.environ['QUERY_STRING'] += '&multipart-manifest=get'
            else:
                req.environ['QUERY_STRING'] = 'multipart-manifest=get'
            resp = self.PUT(req)
    
            if resp.status_int != HTTP_CREATED:
                return resp
            return HTTPAccepted(request=req)
            
        else:
            error_response = check_metadata(req, 'object')
            if error_response:
                return error_response
            container_info = self.container_info(
                self.account_name, self.container_name, req)
            container_partition = container_info['partition']
            containers = container_info['nodes']
            req.acl = container_info['write_acl']
            if 'swift.authorize' in req.environ:
                aresp = req.environ['swift.authorize'](req)
                if aresp:
                    return aresp
            if not containers:
                return HTTPNotFound(request=req)
            if 'x-delete-at' in req.headers:
                try:
                    x_delete_at = normalize_delete_at_timestamp(int(req.headers['x-delete-at']))
                    if int(x_delete_at) < time.time():
                        return HTTPBadRequest(
                            body='X-Delete-At in past', request=req,
                            content_type='text/plain')
                except ValueError:
                    return HTTPBadRequest(request=req,
                                          content_type='text/plain',
                                          body='Non-integer X-Delete-At')
                req.environ.setdefault('swift.log_info', []).append('x-delete-at:%s' % x_delete_at)
                delete_at_container = normalize_delete_at_timestamp(
                    int(x_delete_at) /self.app.expiring_objects_container_divisor *self.app.expiring_objects_container_divisor)
                delete_at_part, delete_at_nodes = \
                        self.app.container_ring.get_nodes(self.app.expiring_objects_account, delete_at_container)
            else:
                delete_at_container = delete_at_part = delete_at_nodes = None
            partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
            req.headers['X-Timestamp'] = normalize_timestamp(time.time())
    
            headers = self._backend_requests(
                req, len(nodes), container_partition, containers,
                delete_at_container, delete_at_part, delete_at_nodes)
    
            resp = self.make_requests(req, self.app.object_ring, partition, 'POST', req.swift_entity_path, headers)
                
            return resp

    /swift/obj/server.py----class ContainerController(object)----def POST

    def POST(self, request):
        """     
        更新object的元数据信息,流程如下:
        1.从requesturl中提取device,partition, account, container, obj;
          检查requestheads中的'x-timestamp'是否存在,检查mount情况;
        2.根据请求信息新建DiskFile对象file,检查是否存在;
          (包括检查metadata中的'X-Delete-At',调用file#is_deleted()和检查file.data_size)
        3.如果检查都通过,则根据request.heads中的元素更新metadata;
        4.从request.heads中提取'X-Delete-At'并与file.metadata中的相同字段比较;
          根据较新的值调用file#delete_at_update(),通知更新container的信息;
        5.调用file#put()方法将metadata写入到.meta文件和data_file的扩展属性中;
            
        实现更新object的元数据信息;
        并通知object的更新到container;
        """
        # 根据request.path获取device、partition、account、container、obj等参数;
        device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
    
        if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
            return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')
    
        new_delete_at = int(request.headers.get('X-Delete-At') or 0)
        if new_delete_at and new_delete_at < time.time():
            return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain')
            
        try:
            disk_file = self.get_diskfile(device, partition, account, container, obj)
        except DiskFileDeviceUnavailable:
            return HTTPInsufficientStorage(drive=device, request=request)
        try:
            orig_metadata = disk_file.read_metadata()
        except (DiskFileNotExist, DiskFileQuarantined):
            return HTTPNotFound(request=request)
        orig_timestamp = orig_metadata.get('X-Timestamp', '0')
        if orig_timestamp >= request.headers['x-timestamp']:
            return HTTPConflict(request=request)
            
        metadata = {'X-Timestamp': request.headers['x-timestamp']}
        metadata.update(val for val in request.headers.iteritems() if is_user_meta('object', val[0]))
        for header_key in self.allowed_headers:
           if header_key in request.headers:
                header_caps = header_key.title()
                metadata[header_caps] = request.headers[header_key]
            
        orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
            
        if orig_delete_at != new_delete_at:
            if new_delete_at:
                self.delete_at_update('PUT', new_delete_at, account, container, obj, request, device)
            if orig_delete_at:
                self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
        disk_file.write_metadata(metadata)
        return HTTPAccepted(request=request)

    下一篇博客将继续swift-proxy与swift-object的分析工作。

    展开全文
  • swift proxy-server 安装

    2013-03-05 17:43:59
    1 SwiftProxy+storage) 将swift 和keystone分离安装,4个zone用4块硬盘来替代 1.1 环境说明 机器名 eth0 说明 Cloudtest01 192.168.0.17/24 Keystone glance nova-api nova-volumes Cloudtest03 192....
  • OpenStack Swift作为开源的云存储工具,被越来越多的公司使用。为了记录和巩固学习swift的开源源码,所以进行一系列的源码开源学习笔记,供初学者快速学习和理解swift的内部功能。 proxy下面的server.py模块是所有...
  • 之前说过了,swift是怎样根据配置文件进行服务的启动的,现在来说一下proxy-server的启动过程,到现在为止,系统已经找到了swift-proxy-server文件,并进行执行,看下该文件的代码 import sys from swift.common....
  •  创建好了builder文件和ring文件之后,下一步的操作就是启动服务了,通常启动单独的服务会有单独的命令,例如swift-proxy-server start等,但是一般我们使用swift-init命令,因为大多数情况下所有的服务会安装在同...
  • 参考陈沙克博客:http://www.chenshake.com/swift-single-version/我是装系统时分了个lvm /dev/sda7安装软件apt-get -y install swift swift-proxy swift-account swift-container swift-object \xfsprogs curl ...
  • 感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正! 如果转载,请保留作者信息。 博客地址:... ...PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!...
  •  OpenStack Swift作为开源的云存储工具,被越来越多的公司使用。为了记录和巩固学习... proxy下面的server.py模块是所有对account,container,object等对象进行管理操作的在swiftproxy端的总入口。在swif...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 1 Swift概述Swift是Open Stack开源云计算项目的子项目之一,是一个多租户、高扩展性的和高可用性的对象存储系统,用于低成本地存储大量的非结构化数据;Swift通过在软件层面引入一致性哈希技术和数据冗余性,牺牲...
  • 感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正! 如果转载,请保留作者信息。 ... ...PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!...接续上一篇博客
  • 接上篇,swift-proxyswift-store的安装 先说一下服务器分配  swift-proxy和keystone部署在192.168.25.11  swift-store是两台 分别是192.168.25.12和192.168.25.13 (这是测试IP) 先安装代理节点swift-proxy...
  • 1.Swift介绍 1.1. Swift架构概述 官方给出的标准架构进行分析,如图,分别为存储节点(Storage node)、代理节点(Proxy node)和认证节点(Auth node)三部分。 1.1.1. 代理节点 代理节点可以说是Swift的核心...
1 2 3 4 5 ... 20
收藏数 3,030
精华内容 1,212