2018-03-30 18:47:18 chengqiuming 阅读数 209
  • lucky带你玩转Python基础进阶

    基础进阶内容包含了很多非常重要且实用性Python知识 也是在工作中必不可缺少的知识点 其中包含了正则,正则实战匹配数据, 面向对象,面向对象实战,TCP、 UDP底层通信协议,测试模块、高阶函数、发送邮件短信、虚拟环境搭建的使用等,让你从基础到进阶,从懵懂到熟练,逐渐进阶大神之路  本课程就让lucky老师就带你打通Python基础进阶

    156 人正在学习 去看看 夏利刚
执行swift-ring-build的rebalance命令,会首先调用下面函数
   
 def rebalance():
        def get_seed(index):
            try:
                return argv[index]
            except IndexError:
                pass
        devs_changed = builder.devs_changed
        try:
            last_balance = builder.get_balance()
            #调用RingBuilder类的rebalance()函数
            parts, balance = builder.rebalance(seed=get_seed(3))
        except exceptions.RingBuilderError as e:
            print '-' * 79
            print("An error has occurred during ring validation. Common\n"
                  "causes of failure are rings that are empty or do not\n"
                  "have enough devices to accommodate the replica count.\n"
                  "Original exception message:\n %s" % e.message
                  )
            print '-' * 79
            exit(EXIT_ERROR)
        if not parts:
            #没有partiton需要移动的情况有两种,一种是经过rebalance的计算,的确没有partition需要移动。
            #另外一种是由于min_part_hours参数的限制,在min_part_hours时间内只允许移动一个partition。
            print 'No partitions could be reassigned.'
            print 'Either none need to be or none can be due to ' \
                  'min_part_hours [%s].' % builder.min_part_hours
            exit(EXIT_WARNING)
        if not devs_changed and abs(last_balance - balance) < 1 and \
                not (last_balance == MAX_BALANCE and balance == MAX_BALANCE):
            print 'Cowardly refusing to save rebalance as it did not change ' \
                  'at least 1%.'
            exit(EXIT_WARNING)
        try:
            #验证生成的Ring的一致性
            builder.validate()
        except exceptions.RingValidationError as e:
            print '-' * 79
            print("An error has occurred during ring validation. Common\n"
                  "causes of failure are rings that are empty or do not\n"
                  "have enough devices to accommodate the replica count.\n"
                  "Original exception message:\n %s" % e.message
                  )
            print '-' * 79
            exit(EXIT_ERROR)
        print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \
              (parts, 100.0 * parts / builder.parts, balance)
        status = EXIT_SUCCESS
        #如果balance的值大于5,这意味着此时的Ring处于一个非常不稳定的状态。
        #需要在min_part_hours时间之后,再次rebalance
        if balance > 5:
            print '-' * 79
            print 'NOTE: Balance of %.02f indicates you should push this ' % \
                  balance
            print ' ring, wait at least %d hours, and rebalance/repush.' \
                  % builder.min_part_hours
            print '-' * 79
            status = EXIT_WARNING
        ts = time()
        #将builder文件和reblance之后生成的ring文件存放到backup目录下。
        builder.get_ring().save(
            pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
        builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1])))
        builder.get_ring().save(ring_file)
        builder.save(argv[1])
        exit(status)
这个函数首先调用swift.common.ring.builder.RingBuilder类的get_balance()函数来获取当前Ring的balance值,这个值标识了一个Ring的平衡程度,也就是健康状态,这个值越高表示这个Ring的balance越需要rebalance。一个健康的Ring的balance的值应该是0
Ring的balance的值取决于所有device的balance值,一个device的balance的值指的是超过这个device所希望接纳的partition个数的partition的数量,除以该device所希望接纳的partition的个数,然后乘以100.比如一个device所希望接纳partition个数是123,结果现在它接纳了124个partition,那么这个device的balance的值为(124-123)/123*100=0.83。在这个Ring中,取所有device的balance的值的最大值作为该Ring的balance值。
如果Ring没有device的变化(添加或删除),并且rebalance之前和之后的balance的值相差小于1,则认为该Ring不需要rebalance,不会生成新的ring文件。
rebalance命令的实际工作仍是由swift.common.ring.builder.RingBuilder类的rebalance()来完成。
#swift/common/ring/builder.py
    def rebalance(self, seed=None):
        """
        这是Ringbuild的主要功能函数,它会根据设备权重、zone的信息(尽可能地将partition的副本分配到不在一个zone的设备上),
        以及近期的分配情况等信息,重新对partition进行分配。
        这个函数并不是partition分配的最佳算法(最佳算法会进行更多的分析从而占用更多的时间)
        此函数会一直做rebalance操作直到这个Ring的balance值小于1%,或者balance的值变化小于1%
        """
        old_replica2part2dev = copy.deepcopy(self._replica2part2dev)
        if seed is not None:
            random.seed(seed)
        self._ring = None
        if self._last_part_moves_epoch is None:
                  #对于新创建的Ring执行_initial_balance()
            self._initial_balance()
            self.devs_changed = False
            return self.parts, self.get_balance()
        changed_parts = 0
        self._update_last_part_moves()
        last_balance = 0
        #函数_adjust_replica2part2dev_size()的主要功能是调整设备查询表
        #也就是_replica2part2dev数组,使其大小和维度调整为和当前的replicas数量以及partition数量一致
        #并且返回需要新添加的partition与replicas列表
        new_parts, removed_part_count = self._adjust_replica2part2dev_size()
        changed_parts += removed_part_count
        if new_parts or removed_part_count:
            self._set_parts_wanted()
        self._reassign_parts(new_parts)
        changed_parts += len(new_parts)
        while True:
            reassign_parts = self._gather_reassign_parts()
            self._reassign_parts(reassign_parts)
            changed_parts += len(reassign_parts)
            while self._remove_devs:
                self.devs[self._remove_devs.pop()['id']] = None
            balance = self.get_balance()
            if balance < 1 or abs(last_balance - balance) < 1 or \
                    changed_parts == self.parts:
                break
            last_balance = balance
        self.devs_changed = False
        self.version += 1
        changed_parts = 0
        for rep_id, _rep in enumerate(self._replica2part2dev):
            for part_id, new_device in enumerate(_rep):
                # IndexErrors will be raised if the replicas are increased or
                # decreased, and that actually means the partition has changed
                try:
                    old_device = old_replica2part2dev[rep_id][part_id]
                except IndexError:
                    changed_parts += 1
                    continue
                if old_device != new_device:
                    changed_parts += 1
        return changed_parts, balance
如果是新创建的Ring,那么控制逻辑进入到_initial_balance()函数中。如果不是新创建的Ring,会直接调用_reassign_parts(),反复对partitions进行重新分配,直到balance值小于1为止(或者两次rebalance操作得到的balance值的变化小于1%,或者所有的partition都已经移动过一次)。
新建Ring和非新建Ring的rebalance操作过程类似,区别只是在于新建Ring的情况在_initial_balance()函数里做了初始化工作。
     def _initial_balance(self):
        """
        Initial partition assignment is the same as rebalancing an
        existing ring, but with some initial setup beforehand.
        """
        self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
        self._last_part_moves_epoch = int(time())
        self._set_parts_wanted()
        self._reassign_parts(self._adjust_replica2part2dev_size()[0])
对于新建Ring的情况,_adjust_replica2part2dev_size()所返回的tuple的第一个值是一个这样的数组,该数组的每一个元素是一个(partition,replica)的tuple,表示该partition的replica需要被重新分配到device上去。
_reassign_parts()函数以上面返回的数组为参数进行分配。
_reassign_parts()首先将设备按照每个设备还想接收多少个partiton(根据weight)来排队,并且在rebalance的过程中一直按照这个标准队列,然后遍历整个partition的列表,将partition分配到想接收partition最多的设备上,并且同时保证这个partition的副本之间距离最远。
所谓距离最远指的是副本之间尽可能地在不同的region里面,如果一定要在同一个region里面,尽可能地分配到不同zone里面,如果没有满足的情况,就尽可能地分配到具有不同的(IP地址、端口)的设备上。
Rebalance()最后将新生成的Ring文件保存以及备份,整个Ring生成过程就完成了。Ring rebalance的过程仅仅是生成不同的Ring文件,也就是修改partition到device的映射,最终partition的移动是由Replicator进程来完成的。
参考
2018-03-30 18:39:07 chengqiuming 阅读数 117
  • lucky带你玩转Python基础进阶

    基础进阶内容包含了很多非常重要且实用性Python知识 也是在工作中必不可缺少的知识点 其中包含了正则,正则实战匹配数据, 面向对象,面向对象实战,TCP、 UDP底层通信协议,测试模块、高阶函数、发送邮件短信、虚拟环境搭建的使用等,让你从基础到进阶,从懵懂到熟练,逐渐进阶大神之路  本课程就让lucky老师就带你打通Python基础进阶

    156 人正在学习 去看看 夏利刚
通过setup.cfg文件可以知道swift-ring-builder工具的源码入口位于/bin/swift-ring-builder脚本,这个脚本仅仅是对swift.cli.ringbuilder模块的封装,直接调用了swift.cli.ringbuilder中的main函数。
def main(arguments=None):
    if len(argv) == 2:
        command = "default"
    else:
        command = argv[2]
    if argv[0].endswith('-safe'):
        try:
            with lock_parent_directory(abspath(argv[1]), 15):
                Commands.__dict__.get(command, Commands.unknown.im_func)()
        except exceptions.LockTimeout:
            print "Ring/builder dir currently locked."
            exit(2)
    else:
        #调动Commands类的名为"command"函数,如果该函数不存在则调用
        #Commands类的unknown()函数,对于Ring的创建,"command"为create
        Commands.__dict__.get(command, Commands.unknown.im_func)()
完成一定的参数解析工作后,最终使用swift.cli.ringbuilder.Commands类的create()函数去完成Ring的创建。
class Commands(object):
    def create():
        """
        swift-ring-builder <builder_file> create <part_power> <replicas> <min_part_hours>
        Creates <builder_file> with 2^<part_power> partitions and <replicas>.
        <min_part_hours> is number of hours to restrict moving a partition more than once.
        """
        if len(argv) < 6:
            print Commands.create.__doc__.strip()
            exit(EXIT_ERROR)
        #创建RingBuilder对象实例
        builder = RingBuilder(int(argv[3]), float(argv[4]), int(argv[5]))
        #为builder文件创建一个备份目录,该目录下会备份builder文件
        backup_dir = pathjoin(dirname(argv[1]), 'backups')
        try:
            mkdir(backup_dir)
        except OSError as err:
            if err.errno != EEXIST:
                raise
        #将Ring的初始化信息保存到备份文件中去
        builder.save(pathjoin(backup_dir, '%d.' % time() + basename(argv[1])))
        #将Ring的初始化信息保存到builer文件中去
        builder.save(argv[1])
        exit(EXIT_SUCCESS)
这个函数的逻辑非常简单,主要就是创建一个swift.common.ring.builder.RingBuilder类的实例,然后将它的初始化信息保存到Ring的builder文件和备份文件里。
接下来看看这类的初始化过程
# swift/common/ring/builder.py
class RingBuilder(object):
    def __init__(self, part_power, replicas, min_part_hours):
        self.part_power = part_power
        self.replicas = replicas
        self.min_part_hours = min_part_hours
        self.parts = 2 ** self.part_power
        self.devs = []
        self.devs_changed = False
        self.version = 0
        # _replica2part2dev 是一个二维数组,第一维从replica映射到partition
        # 第二维从partition映射到device。所以对一个replica个数为3,
        # partition个数为 2^23 的ring来说,_replica2part2dev是一个3*2^23数组
        # 该数据的每一个元素都是device ID(数据类型unsigned short)。
        self._replica2part2dev = None
        # _last_part_moves是一个长度为2^23的数组,数组中每个元素为unsigned byte
        # 这个数组的每个元素表示该元素所对应的partition据上次移动所过去的时间
        # (以小时为单位)。这个数组存在的目的是为了保证同一个partition在一定的时间内(一般是24小时)
        # 不会被移动两次。不过删除或者把一个设备的weight设为0不受这个时间的限制。这是因为
        # 删除设备或者把wight设为0是因为该设备已经出现故障
        # _last_part_moves_epoch表示_last_part_moves的基准时间
        self._last_part_moves_epoch = None
        self._last_part_moves = None
        self._last_part_gather_start = 0
        self._remove_devs = []
        self._ring = None
RingBuild类实例初始化后,在保存了传递进来的part_power、replicas以及min_part_hours等参数之后,初始化了一个重要的二维数组_replica2part2dev。
_replica2part2dev数组就是设备查询表,它的第一维以replicas为索引,也就是说如果设定replicas等于3,那么该数组第一维就是3个成员,每一个成员都是一个数组(第二维数组)。第二维数组负责partition到device的映射,长度为partition的个数。
_replica2part2dev之外的一个重要数组即devs[]数组,该数组即为我们前面提到的设备列表。根据从_replica2part2dev中检索到的设备号,到该表中查询设备的具体位置信息。目前设备表的内容为空,因为此时还不知道设备的情况。
到此,构建Ring的第一个步骤创建Ring文件已经完成。
2019-10-14 16:47:29 weixin_43319216 阅读数 3
  • lucky带你玩转Python基础进阶

    基础进阶内容包含了很多非常重要且实用性Python知识 也是在工作中必不可缺少的知识点 其中包含了正则,正则实战匹配数据, 面向对象,面向对象实战,TCP、 UDP底层通信协议,测试模块、高阶函数、发送邮件短信、虚拟环境搭建的使用等,让你从基础到进阶,从懵懂到熟练,逐渐进阶大神之路  本课程就让lucky老师就带你打通Python基础进阶

    156 人正在学习 去看看 夏利刚

在学习tensorflow网络框架中,碰到获取一层神经网络的权重定义函数,代码如下:

def get_weight(shape, lambda):
    var=tf.Variable(tf.random_normal(shape),dtype=tf.float32)

    #add_to_collection函数将这个新生成变量的L2正则化损失项加入集合,这个函数的第一个参数'losses'是集合的名字,
    tf.add_to_collection('losses',tf.contrib.layers.l2_regularizer(lambda)(var)) #第二个参数是要加入这个集合的内容
    return var  #返回生成的变量

在python3.7的环境下运行报错:

原因:lambda是python3.7中的关键字,不能作为函数参数,将其修改为其他非关键字名称即可。

 

2019-01-17 14:24:26 qq_34732729 阅读数 1597
  • lucky带你玩转Python基础进阶

    基础进阶内容包含了很多非常重要且实用性Python知识 也是在工作中必不可缺少的知识点 其中包含了正则,正则实战匹配数据, 面向对象,面向对象实战,TCP、 UDP底层通信协议,测试模块、高阶函数、发送邮件短信、虚拟环境搭建的使用等,让你从基础到进阶,从懵懂到熟练,逐渐进阶大神之路  本课程就让lucky老师就带你打通Python基础进阶

    156 人正在学习 去看看 夏利刚

import tensorflow as tf

# 获取一层神经网络边上的权重,并将这个权重的L2正则化损失加入名称为‘losses’的集合中
def get_weight(shape, lambda):
    # 生成一个变量
    var = tf.Variable(tf.random_normal(shape), dtype = tf.float32)
    # add_to_collection 函数将这个新生成变量的L2正则化损失项 加入集合
    # 这个函数的第一个参数'losses' 是集合的名字, 第二个参数是要加入这个集合的内容
    tf.add_to_collection('losses', tf.contrib.layers.l2_regularizer(lambda)(var))
    # 返回生成的变量
    return var

 

其中 lambda为关键字,不允许使用,随意加一个序号即可,例如:改为  lambda1   , 完美解决

2013-07-10 23:29:37 lcnsir 阅读数 713
  • lucky带你玩转Python基础进阶

    基础进阶内容包含了很多非常重要且实用性Python知识 也是在工作中必不可缺少的知识点 其中包含了正则,正则实战匹配数据, 面向对象,面向对象实战,TCP、 UDP底层通信协议,测试模块、高阶函数、发送邮件短信、虚拟环境搭建的使用等,让你从基础到进阶,从懵懂到熟练,逐渐进阶大神之路  本课程就让lucky老师就带你打通Python基础进阶

    156 人正在学习 去看看 夏利刚

接上篇 

通过wift-proxy-server 脚本,swift调用swift.common.wsgi中的run_wsgi(conf_file, 'proxy-server', default_port=8080, **options) 启动 wsgi服务

def run_wsgi(conf_path, app_section, *args, **kwargs):
    """
    Runs the server using the specified number of workers.

    :param conf_path: Path to paste.deploy style configuration file/directory
    :param app_section: App name from conf file to load config from
    """
    # Load configuration, Set logger and Load request processor
    try:
        (conf, logger, log_name) = \
            _initrp(conf_path, app_section, *args, **kwargs)
    except ConfigFileError, e:
        print e
        return

    # bind to address and port
    sock = get_socket(conf, default_port=kwargs.get('default_port', 8080)) #socket 绑定端口和conf中的ip地址得到socket
    # remaining tasks should not require elevated privileges
    drop_privileges(conf.get('user', 'swift'))

    # Ensure the application can be loaded before proceeding.
    loadapp(conf_path, global_conf={'log_name': log_name})

    # set utils.FALLOCATE_RESERVE if desired
    reserve = int(conf.get('fallocate_reserve', 0))
    if reserve > 0:
        utils.FALLOCATE_RESERVE = reserve
    # redirect errors to logger and close stdio
    capture_stdio(logger)

    worker_count = int(conf.get('workers', '1'))
    # Useful for profiling [no forks].
    if worker_count == 0:
        run_server(conf, logger, sock) #使用得到的socket,logger日志, 和配置运行server
        return


下来转入 run_server()函数 真正启动wsgi 服务 使用基于eventlet的并发多线程网络IO库,实现了协程 (博大精深,要慢慢学习)

def run_server(conf, logger, sock):
    # Ensure TZ environment variable exists to avoid stat('/etc/localtime') on
    # some platforms. This locks in reported times to the timezone in which
    # the server first starts running in locations that periodically change
    # timezones.
    os.environ['TZ'] = time.strftime("%z", time.gmtime())

    wsgi.HttpProtocol.default_request_version = "HTTP/1.0" #以下都设置此wsgi服务的http协议
    # Turn off logging requests by the underlying WSGI software.
    wsgi.HttpProtocol.log_request = lambda *a: None
    # Redirect logging other messages by the underlying WSGI software.
    wsgi.HttpProtocol.log_message = \
        lambda s, f, *a: logger.error('ERROR WSGI: ' + f % a)
    wsgi.WRITE_TIMEOUT = int(conf.get('client_timeout') or 60)

    eventlet.hubs.use_hub(get_hub())
    eventlet.patcher.monkey_patch(all=False, socket=True)
    eventlet_debug = config_true_value(conf.get('eventlet_debug', 'no'))
    eventlet.debug.hub_exceptions(eventlet_debug)
    # utils.LogAdapter stashes name in server; fallback on unadapted loggers
    if hasattr(logger, 'server'):
        log_name = logger.server
    else:
        log_name = logger.name
    app = loadapp(conf['__file__'], global_conf={'log_name': log_name})#loadapp方法加载conf文件中的app的app_factory实例化这些app
    max_clients = int(conf.get('max_clients', '1024'))
    pool = RestrictedGreenPool(size=max_clients) #RestrictedGreenPool线程池用来实现协程,保证并行
    try:
        wsgi.server(sock, app, NullLogger(), custom_pool=pool) #使用eventlet库中的wsgi.server实现携程(多线程的并发运行app并管理socket,实现对请求的非阻塞IO)
    except socket.error, err:
        if err[0] != errno.EINVAL:
            raise
    pool.waitall()

上述的app就是 配置文件中 wsgi管道中的app 他们都要实现__call__方法保证是callable的,这些wsgi app 响应请求的操作就在__call__方法中实现(以后会说到) 并且服务的启动顺序和响应顺序应该是相反的。

get_article_info

阅读数 766

没有更多推荐了,返回首页