精华内容
下载资源
问答
  • 但这个过程中running_mean和running_var会进行更新,所以也验证了 running_mean和running_var只受模型的模式(train模型或eval模型)的影响, 与是否进行反向传播(loss.backward)和梯度更新(optimiter.step)没有关系...
    以下的过程中都是在train模式下进行的,并且没有进行损失计算和梯度更新,
    但这个过程中running_mean和running_var会进行更新,所以也验证了
    running_mean和running_var只受模型的模式(train模型或eval模型)的影响,
    与是否进行反向传播(loss.backward)和梯度更新(optimiter.step)没有关系。
    实验一:
    1. 标准库函数的参数设置为 torch.nn.BatchNorm1d(linear1_features,momentum=0.1)
    2. 自定义函数的参数设置为 MyBatchnorm1d(linear1_features,momentum=0.9)
    3. 相同的输入,对比输出的参数值是否相同
    实验二:
    1. 标准库函数的参数设置为 torch.nn.BatchNorm1d(linear1_features,momentum=None)
    2. 自定义函数的参数设置为 MyBatchnorm1d(linear1_features,momentum=None)
    3. 相同的输入,对比输出的参数值是否相同
    

     针对标准库torch.nn.BatchNorm1d()中running_mean和running_var计算方法的结论:

    为方便描述,规定:

    • rm表示running_mean;
    • rv表示running_var;
    • m表示momemtum
    • b_num表示batchnum,代表当前batch之前的batch个数

    一、在train模式下

    1. 带动量时,即指定momentum为一个大于0小于1的数值时,相当于当前值与历史值的加权平均

    \small rm_{new} = (1-m)*rm_{old}+m*mean\_bn

    其中mean_bn是当前batch数据的batch平均值。这里的momentum是对当前值的加权系数!!默认为0.1

     \small rv_{new} = (1-m)*rv_{old}+m*var\_bn

    其中var_bn是当前batch数据的方差平均值(这里是方差,不是标准差,标准差的平方=方差)

    2. 不带动量时,即momentum=None,直接让历史值与当前值取平均

    \small rm_{new} = rm_{old}*b\_num+mean\_bn)/(b\_num+1)

    \small rv_{new} = rv_{old}*b\_num+var\_bn)/(b\_num+1)

    二、在eval模型下

    直接用rm、rv作为当前batch均值和方差 

    代码如下:

    # -*- coding: utf-8 -*-
    import torch
    from torch.autograd import Variable
    import torch.optim as optim
    
    batchsize = 16
    batchnum = 100
    # rawdata = torch.randn([batchsize*batchnum,3])
    # torch.save(rawdata,'./debug_running_var_mean_rawdata.pth')
    #加载数据,保证数据是不变的
    rawdata = torch.load('./debug_running_var_mean_rawdata.pth')
    print(rawdata.size())
    
    y = Variable(torch.FloatTensor([4,5]))
    dataset = [Variable(rawdata[curpos:curpos + batchsize]) for curpos in range(0,len(rawdata),batchsize)]
    
    
    class SimpleModel(torch.nn.Module):
        def __init__(self):
            super(SimpleModel,self).__init__()
            linear1_features = 5
            self.linear1 = torch.nn.Linear(3,linear1_features)
            self.relu = torch.nn.ReLU()
            self.linear2 = torch.nn.Linear(linear1_features,2)
            #设计时batchnorm放在linear1后面,所以这里用linear1的输出维度
            self.batch_norm = torch.nn.BatchNorm1d(linear1_features,momentum=0.1)  #标准库中的Barchnorm,track_running_stats默认为True
            # self.batch_norm = torch.nn.BatchNorm1d(linear1_features,momentum=None)  #标准库中的Barchnorm,track_running_stats默认为True
    
        def forward(self,x):
            x = self.linear1(x)
            x = self.relu(x)
            x = self.batch_norm(x)
            x = self.linear2(x)
            return x
    
    #先进行一下简单训练之后,保存参数,后面的模型可以加载此函数,这样相当于给用于实验的两个模型初始化了相同的参数
    train_demo = 0
    if train_demo == 1:
        model = SimpleModel()
        # print(list(model.parameters()))
        # #查看模型的初始参数
        # print(model.state_dict().keys())
        # # for i, j in model.named_parameters():
        # for i,j in model.state_dict().items():
        #     print('++++',i)
        #     print('\t',j)
    
        loss_fn = torch.nn.MSELoss(size_average=False)
        optimizer = optim.SGD(model.parameters(),lr=0.001,momentum=0.9)
        model.train()
        for t,x in enumerate(dataset):
            y_pred = model(x)
            loss = loss_fn(y_pred,y)
            print(t,loss.data)
            model.zero_grad()
            loss.backward()
            optimizer.step()
    
        #查看训练后的模型参数
        print('##################The trained Model parameters###############')
        print(model.state_dict().keys())
        # for i, j in model.named_parameters():
        for i,j in model.state_dict().items():
            print('++++',i)
            print('\t',j)
        #保存模型参数
        state = {'model': model.state_dict()}
        torch.save(state,'debug_batchnorm.pth')
    
    
    class MyBatchnorm1d(torch.nn.Module):
        def __init__(self,num_features,momentum=0.9):
            '''
            自定义的batchnorm
            :param num_features:
            :param momentum: 动量系数,大于等于0小于1,表示保留原来变量值的比例,与标准库torch.nn.Batchnorm1d正好相反
                             当取None时,采用简单的取平均的方式计算running_mean和running_var
            '''
            super(MyBatchnorm1d,self).__init__()
            self.weight = torch.nn.Parameter(torch.ones(num_features).float())
            self.bias = torch.nn.Parameter(torch.zeros(num_features).float())
            #register_buffer相当于requires_grad=False的Parameter,所以两种方法都可以
            #方法一:
            self.register_buffer('running_mean',torch.zeros(num_features))
            self.register_buffer('running_var',torch.zeros(num_features))
            self.register_buffer('num_batches_tracked',torch.tensor(0))
            #方法二:
            # self.running_mean = torch.nn.Parameter(torch.zeros(num_features),requires_grad=False)
            # self.running_var = torch.nn.Parameter(torch.ones(num_features),requires_grad=False)
            # self.num_batches_tracked = torch.nn.Parameter(torch.tensor(0),requires_grad=False)
    
            self.momentum = momentum
    
        def forward(self,x):
            if self.training: #训练模型
                #数据是二维的情况下,可以这么处理,其他维的时候不是这样的,但原理都一样。
                mean_bn = x.mean(0, keepdim=True).squeeze(0) #相当于x.mean(0, keepdim=False)
                var_bn = x.var(0, keepdim=True).squeeze(0) #相当于x.var(0, keepdim=False)
    
                if self.momentum is not None:
                    self.running_mean.mul_(self.momentum)
                    self.running_mean.add_((1 - self.momentum) * mean_bn.data)
                    self.running_var.mul_(self.momentum)
                    self.running_var.add_((1 - self.momentum) * var_bn.data)
                else:  #直接取平均,以下是公式变形,即 m_new = (m_old*n + new_value)/(n+1)
                    self.running_mean = self.running_mean+(mean_bn.data-self.running_mean)/(self.num_batches_tracked+1)
                    self.running_var = self.running_var+(var_bn.data-self.running_var)/(self.num_batches_tracked+1)
                self.num_batches_tracked += 1
            else: #eval模式
                mean_bn = torch.autograd.Variable(self.running_mean)
                var_bn = torch.autograd.Variable(self.running_var)
    
            eps = 1e-5
            x_normalized = (x - mean_bn) / torch.sqrt(var_bn + eps)
            results = self.weight * x_normalized + self.bias
            return results
    
    
    class DebugSimpleModel(torch.nn.Module):
        def __init__(self):
            super(DebugSimpleModel,self).__init__()
            linear1_features = 5
            self.linear1 = torch.nn.Linear(3,linear1_features)
            self.relu = torch.nn.ReLU()
            self.linear2 = torch.nn.Linear(linear1_features,2)
            self.batch_norm = MyBatchnorm1d(linear1_features,momentum=0.9)  #使用自定义的Batchnorm
            # self.batch_norm = MyBatchnorm1d(linear1_features,momentum=None)  #使用自定义的Batchnorm
    
        def forward(self,x):
            x = self.linear1(x)
            x = self.relu(x)
            x = self.batch_norm(x)
            x = self.linear2(x)
            return x
    
    
    #查看训练后的模型参数
    print('##################The trained Model parameters###############')
    model_param_dict = torch.load('debug_batchnorm.pth')['model']
    print(model_param_dict.keys())
    # for i, j in model.named_parameters():
    for i,j in model_param_dict.items():
        print('++++',i)
        print('\t',j)
    
    '''
    以下的过程中都是在train模式下进行的,并且没有进行损失计算和梯度更新,
    但这个过程中running_mean和running_var会进行更新,所以也验证了
    running_mean和running_var只受模型的模式(train模型或eval模型)的影响,
    与是否进行反向传播(loss.backward)和梯度更新(optimiter.step)没有关系。
    实验一:
    1. 标准库函数的参数设置为 torch.nn.BatchNorm1d(linear1_features,momentum=0.1)
    2. 自定义函数的参数设置为 MyBatchnorm1d(linear1_features,momentum=0.9)
    3. 相同的输入,对比输出的参数值是否相同
    实验二:
    1. 标准库函数的参数设置为 torch.nn.BatchNorm1d(linear1_features,momentum=None)
    2. 自定义函数的参数设置为 MyBatchnorm1d(linear1_features,momentum=None)
    3. 相同的输入,对比输出的参数值是否相同
    
    '''
    test_demo = 1
    if test_demo == 1:
        test_model = SimpleModel()
        test_model.load_state_dict(torch.load('debug_batchnorm.pth')['model'])
        test_model.train()
        for t,x in enumerate(dataset):
            y_pred = test_model(x)
        print('\n++++++++++  Norm output  ++++++++++++++++')
        for i,j in test_model.state_dict().items():
            print('++++',i)
            print('\t',j)
    
    debug_demo = 1
    if debug_demo == 1:
        debug_model = DebugSimpleModel()
        #因为自定义的模型参数与标准模型的参数完全一样,所以把标准模型作为预训练的模型(即可以加载标准模型的训练后的参数作为自己的参数)
        debug_model.load_state_dict(torch.load('debug_batchnorm.pth')['model'])
        debug_model.train()
        for t,x in enumerate(dataset):
            y_pred = debug_model(x)
    
        print('\n++++++++++++ Mymodel Output ++++++++++++++')
        for i,j in debug_model.state_dict().items():
            print('++++',i)
            print('\t',j)

    展开全文
  • 前面两篇帖子分别总结了innodb_thread_concurrency和thread pool的原理:前者是在存储引擎层面限制并发运行的线程数,代码路径过于靠后,此时query已在...除了这两种解决方案,还可以在server层进行running threa...

    前面两篇帖子分别总结了innodb_thread_concurrency和thread pool的原理:

    前者是在存储引擎层面限制并发运行的线程数,代码路径过于靠后,此时query已在server层完成解析;

    后者则是在server层创建多组常驻线程,用于接收客户端连接发送的query并代为执行,而不是为每个连接单独创建一个线程。

    除了这两种解决方案,还可以在server层进行running thread数量判断,如果达到阈值则直接报错或sleep。

    thread_running的意义

    thread_running状态变量记录了当前并发执行stmt/command的数量,执行前加1执行后减1;

    代码逻辑

    do_command

    -->dispatch_command

    ...

    inc_thread_running

    ...

    mysql_execute_command or execute_some_command

    ...

    dec_thread_running

    ...

    Thread_running突然飙高的诱因:

    1客户端连接暴增;

    2系统性能瓶颈,如CPU,IO或者mem swap;

    3异常sql;

    往往在这种情况下,MySQL server会表现出hang住的假象。

    解决方案

    暂时禁止新sql执行,为此引入两个阈值low_watermark和high_watermark,以及变量threads_running_ctl_mode(selects或者all );

    执行query前,检查thread_running,

    1若其已达high_watermark阈值则直接拒绝执行并返回错误:mysql

    server is too busy

    2若其位于low和high之间,则sleep 5ms,然后继续尝试,累计等待100ms后则执行

    3对于已经开启事务和super用户,不做限制

    4

    threads_running_ctl_mode控制query类型:SELECTS/ALL,默认为SELECTS,表示只影响SELECT语句

    Patch部分源码见注1

    进一步改进

    将低水位限流从sleep-retry优化为基于FIFO的cond-wait/signal(实现8个FIFO);

    1高水位限流(这点保持不变);

    2低水位优化;其他解决方案:mariadb开发thread pool,percona在其上实现了优先队列;

    本patch优势:思路与thread pool一致,但代码更简洁(不到1000行);而且增加了特定query的过滤;

    Patch部分代码见注2

    低水位优化细节

    1新增thread_active记录并发线程数,位于mysql_execute_command(sql解析之后),高水位则在query解析之前判断;

    Thread_active只统计select/DML,而commit/rollback则放过。

    2采用FIFO,当thread_active >=

    thread_running_low_watermark时进程进入FIFO等待,其他线程执行完sql后唤醒FIFO;

    保证并发线程控制在thread_running_low_watermark内,同时引入threads_running_wait_timeout控制线程在FIFO最大等待时间,超时则直接报错返回。

    3引入8个FIFO,降低了进出FIFO的锁竞争,线程采用RR分配到不同fifo,每个队列限制并发运行线程为threads_running_low_watermark/8。

    已经通过高水位验证的thread,开始执行query,[解析后进行低水位判断,若通过则执行],执行当前sql完毕后,thread可能发起新query,则重复[]过程。

    新增系统变量

    threads_running_wait_timeout:进入FIFO排队最长时间,等待超时后sql被拒,默认100,单位为毫秒ms。

    新增状态变量

    threads_active:当前并发SELECT/INSERT/UPDATE/DELETE执行的线程数目;

    threads_wait:当前进入到FIFO中等待的线程数目;

    测试效果

    ./sysbench

    --test=tests/db/select.lua --max-requests=0 --mysql-host=myxxxx.cm3

    --mysql-user=test --mysql-table-engine=innodb --oltp-table-size=5000000

    --oltp-tables-count=32

    e26759f88fea47fd2d34c9cdc059f757.png

    normal

    mysql-0 :未打补丁版本,设置innodb_thread_concurrency=0

    normal

    mysql-1 :未打补丁版本,innodb_thread_concurrency=32

    patched

    mysql :低水位限流补丁版本(活跃线程数不超过64)

    注1

    +static my_bool thread_running_control(THD *thd, ulong tr)

    +{

    +  int slept_cnt= 0;

    +  ulong tr_low, tr_high;

    +  DBUG_ENTER("thread_running_control");

    +

    +  /*

    +    Super user/slave thread will not be affected at any time,

    +    transactions that have already started will continue.

    +  */

    +  if (thd->security_ctx->master_access & SUPER_ACL|| --对于super权限的用户和已经开启的事务不做限制

    +     thd->in_active_multi_stmt_transaction() ||

    +      thd->slave_thread)

    +    DBUG_RETURN(FALSE);

    +

    +  /*

    +    To promise that tr_low will never be greater than tr_high,

    +    as values may be changed between these two statements.

    +    eg.

    +        (low, high) = (200, 500)

    +        1. read low = 200

    +        2. other sessions: set low = 20; set high = 80

    +        3. read high = 80

    +    Don't take a lock here to avoid lock contention.

    +  */

    +  do

    +  {

    +    tr_low= thread_running_low_watermark;

    +    tr_high= thread_running_high_watermark;

    +

    +  } while (tr_low > tr_high);

    +

    +check_buzy:

    +

    +  /* tr_high is promised to be non-zero.*/

    +  if ((tr_low == 0 && tr < tr_high) || (tr_low != 0 && tr < tr_low))

    +    DBUG_RETURN(FALSE);

    +

    +  if (tr >= tr_high)

    +  {

    +    int can_reject= 1;

    +

    +    /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */

    +    if (thread_running_ctl_mode == 0)

    +    {

    +      int query_is_select= 0;

    +      if (thd->query_length() >= 8)

    +      {

    +        char *p=thd->query();  --读取query text的前6个字符,以判断是否为select

    +        if (my_toupper(system_charset_info, p[0]) == 'S' &&

    +            my_toupper(system_charset_info, p[1]) == 'E' &&

    +            my_toupper(system_charset_info, p[2]) == 'L' &&

    +            my_toupper(system_charset_info, p[3]) == 'E' &&

    +            my_toupper(system_charset_info, p[4]) == 'C' &&

    +            my_toupper(system_charset_info, p[5]) == 'T')

    +

    +          query_is_select= 1;

    +      }

    +

    +      if (!query_is_select)

    +        can_reject= 0;

    +    }

    +

    +    if (can_reject)

    +    {

    +      inc_thread_rejected();

    +      DBUG_RETURN(TRUE);

    +    }

    +    else

    +      DBUG_RETURN(FALSE);

    +  }

    +

    +  if (tr_low != 0 && tr >= tr_low)

    +  {

    +    /*

    +      If total slept time exceed 100ms and thread running does not

    +      reach high watermark, let it in.

    +    */

    +    if (slept_cnt >= 20)

    +      DBUG_RETURN(FALSE);

    +

    +    dec_thread_running()

    +

    +    /* wait for 5ms. */

    +    my_sleep(5000UL);

    +

    +    slept_cnt++;

    +    tr= inc_thread_running() - 1;

    +

    +    goto check_buzy;

    +  }

    +

    +  DBUG_RETURN(FALSE);

    +}

    +

    +/**

    Perform one connection-level (COM_XXXX) command.

    @param command         type of command to perform

    @@ -1016,7 +1126,8 @@

    thd->set_query_id(get_query_id());

    if (!(server_command_flags[command] & CF_SKIP_QUERY_ID))

    next_query_id();

    -  inc_thread_running();

    +  /* remember old value of thread_running for *thread_running_control*. */

    +  int32 tr= inc_thread_running() - 1;

    if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))

    statistic_increment(thd->status_var.questions, &LOCK_status);

    @@ -1129,6 +1240,13 @@

    {

    if (alloc_query(thd, packet, packet_length))

    break;                                 // fatal error is set

    +

    +    if (thread_running_control(thd, (ulong)tr))

    +    {

    +      my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));

    +      break;

    +    }

    +

    MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""),  &thd->security_ctx->priv_user[0])

    注2

    http://www.gpfeng.com/wp-content/uploads/2014/01/tr-control.diff_.txt

    +/**

    Perform one connection-level (COM_XXXX) command.

    @param command         type of command to perform

    @@ -1177,7 +1401,7 @@

    command= COM_SHUTDOWN;

    }

    thd->set_query_id(next_query_id());

    -  inc_thread_running();

    +  int32 tr= inc_thread_running();

    if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))

    statistic_increment(thd->status_var.questions, &LOCK_status);

    @@ -1209,6 +1433,15 @@

    goto done;

    }

    +  if (command == COM_QUERY && alloc_query(thd, packet, packet_length))

    +    goto endof_case;                 // fatal error is set

    +

    +  if (thread_running_control_high(thd, tr))

    +  {

    +    my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));

    +    goto endof_case;

    +  }

    +

    switch (command) {

    case COM_INIT_DB:

    {

    @@ -1311,8 +1544,6 @@

    }

    case COM_QUERY:

    {

    -    if (alloc_query(thd, packet, packet_length))

    -      break;                                 // fatal error is set

    MYSQL_QUERY_START(thd->query(), thd->thread_id,

    (char *) (thd->db ? thd->db : ""),

    &thd->security_ctx->priv_user[0],

    @@ -1751,6 +1982,7 @@

    my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));

    break;

    }

    +endof_case:

    done:

    DBUG_ASSERT(thd->derived_tables == NULL &&

    @@ -2502,12 +2734,37 @@

    Opt_trace_array trace_command_steps(&thd->opt_trace, "steps");

    DBUG_ASSERT(thd->transaction.stmt.cannot_safely_rollback() == FALSE);

    +  bool count_active= false;

    if (need_traffic_control(thd, lex->sql_command))

    {

    thd->killed = THD::KILL_QUERY;

    goto error;

    }

    +

    +  switch (lex->sql_command) {

    +

    +  case SQLCOM_SELECT:

    +  case SQLCOM_UPDATE:

    +  case SQLCOM_UPDATE_MULTI:

    +  case SQLCOM_DELETE:

    +  case SQLCOM_DELETE_MULTI:

    +  case SQLCOM_INSERT:

    +  case SQLCOM_INSERT_SELECT:

    +  case SQLCOM_REPLACE:

    +  case SQLCOM_REPLACE_SELECT:

    +    count_active= true;

    +    break;

    +  default:

    +    break;

    +  }

    +

    +  if (count_active && thread_running_control_low_enter(thd))

    +  {

    +    my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, myf(0));

    +    goto error;

    +  }

    +

    status_var_increment(thd->status_var.com_stat[lex->sql_command]);

    switch (gtid_pre_statement_checks(thd))

    @@ -4990,6 +5247,9 @@

    finish:

    +  if (count_active)

    +    thread_running_control_low_exit(thd);

    +

    DBUG_ASSERT(!thd->in_active_multi_stmt_transaction() ||

    thd->in_multi_stmt_transaction_mode());

    +static my_bool thread_running_control_high(THD *thd, int32 tr)

    +{

    +  int32 tr_high;

    +  DBUG_ENTER("thread_running_control_high");

    +

    +  tr_high= (int32)thread_running_high_watermark;

    +

    +  /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */

    +  if ((!tr_high || tr <= tr_high) ||

    +      thd->transaction.is_active() ||

    +      thd->get_command() != COM_QUERY ||

    +      thd->security_ctx->master_access & SUPER_ACL ||

    +      thd->slave_thread)

    +    DBUG_RETURN(FALSE);

    +

    +  const char *query= thd->query();

    +  uint32 len= thd->query_length();

    +

    +  if ((!has_prefix(query, len, "SELECT", 6) && thread_running_ctl_mode == 0) || --不再是逐个字符判断

    +      has_prefix(query, len, "COMMIT", 6) ||

    +      has_prefix(query, len, "ROLLBACK", 8))

    +    DBUG_RETURN(FALSE);

    +

    +  /* confirm again*/

    +  if (tr > tr_high && get_thread_running() > tr_high)

    +  {

    +    __sync_add_and_fetch(&thread_rejected, 1);

    +    DBUG_RETURN(TRUE);

    +  }

    +

    +  DBUG_RETURN(FALSE);

    +}

    +

    +static my_bool thread_running_control_low_enter(THD *thd)

    +{

    +  int res= 0;

    +  int32 tr_low;

    +  my_bool ret= FALSE;

    +  my_bool slept= FALSE;

    +  struct timespec timeout;

    +  Thread_conc_queue *queue;

    +  DBUG_ENTER("thread_running_control_low_enter");

    +

    +  /* update global status */

    +  __sync_add_and_fetch(&thread_active, 1);

    +

    +  tr_low= (int32)queue_tr_low_watermark;

    +  queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE;

    +

    +  queue->lock();--问1:在进行低水位判断前,先锁定FIFO,避免低水位验证失败时无法获取FIFO锁进而不能放入FIFO;

    +

    +retry:

    +

    +  if ((!tr_low || queue->thread_active < tr_low) ||

    +      (thd->lex->sql_command != SQLCOM_SELECT && thread_running_ctl_mode == 0) ||

    +      (!slept && (thd->transaction.is_active() ||

    +        thd->security_ctx->master_access & SUPER_ACL || thd->slave_thread)))

    +  {

    +    queue->thread_active++; --判断是否满足进入FIFO条件,如不满足则立即更新thread_active++,解锁queue并退出;

    +    queue->unlock();

    +    DBUG_RETURN(ret);

    +  }

    +

    +  if (!slept)

    +  {

    +    queue->unlock();

    +

    +    /* sleep for 500 us */

    +    my_sleep(500);

    +    slept= TRUE;

    +    queue->lock();

    +

    +    goto retry;

    +  }

    +

    +  /* get a free wait-slot */

    +  Thread_wait_slot *slot= queue->pop_free();

    +

    +  /* can't find a free wait slot, must let the query enter */

    +  if (!slot)-- 当FIFO都满了,即无法把当前线程放入,则必须放行让该sql正常执行

    +  {

    +    queue->thread_active++;

    +    queue->unlock();

    +    DBUG_RETURN(ret);

    +  }

    +

    +  slot->signaled= false;

    +  slot->wait_ended= false;

    +

    +  /* put slot into waiting queue. */

    +  queue->push_back_wait(slot);

    +  queue->thread_wait++;

    +

    +  queue->unlock();

    +

    +  /* update global status */

    +  thd_proc_info(thd, "waiting in server fifo");

    +  __sync_sub_and_fetch(&thread_active, 1);

    +  __sync_add_and_fetch(&thread_wait, 1);

    +

    +  /* cond-wait for at most thread_running_wait_timeout(ms). */

    +  set_timespec_nsec(timeout, thread_running_wait_timeout_ns);

    +

    +  mysql_mutex_lock(&slot->mutex);

    +  while (!slot->signaled)

    +  {

    +    res= mysql_cond_timedwait(&slot->cond, &slot->mutex, &timeout);

    +    /* no need to signal if cond-wait timedout */

    +    slot->signaled= true;

    +  }

    +  mysql_mutex_unlock(&slot->mutex);

    +

    +  queue->lock();

    +  queue->thread_wait--;

    +  queue->thread_active++;

    +

    +  /* remove slot from waiting queue. */

    +  queue->remove_wait(slot);

    +  /* put slot into the free queue for reuse. */

    +  queue->push_back_free(slot);

    +

    +  queue->unlock();

    +

    +  /* update global status */

    +  __sync_sub_and_fetch(&thread_wait, 1);

    +  __sync_add_and_fetch(&thread_active, 1);

    +  thd_proc_info(thd, 0);

    +

    +  if (res == ETIMEDOUT || res == ETIME)

    +  {

    +    ret= TRUE; // indicate that query is rejected.

    +    __sync_add_and_fetch(&thread_rejected, 1);

    +  }

    +

    +  DBUG_RETURN(ret);

    +}

    展开全文
  • 每秒查询次数(QPS)可以衡量数据库的吞吐量,但不能反映MySQL的工作强度。后者由Threads_running度量,表示为量表(而QPS是速率)。在讨论Threads_running之前,让我们考虑一个类比:

    文章目录

    正文

        每秒查询次数(Queries per second,QPS)可以衡量数据库的吞吐量,但不能反映MySQL的工作强度。后者由Threads_running度量,表示为量表(而QPS是速率)。在讨论Threads_running之前,让我们考虑一个类比:
    在这里插入图片描述
        上图是来自汽车的数字仪表板。左边的大圆圈是车速表,右边的大圆圈是转速表。车速表是大多数人所熟悉的:它们显示汽车行驶的速度。在上面,车速表的读数为84 km/h。转速表很常见,但并不是每个仪表盘都包含转速表。它们显示了发动机的工作强度,以每分钟转数(RPM)为单位。在上面,转速表的读数接近3,000 RPM。(转速表中间的“ M3”是变速箱齿轮。我们可以忽略这一点)
        QPS与速度(84 km/h)类似,而Threads_running与RPM(〜3k RPM)类似。
        速度和RPM的变化受多种因素的影响:加速和减速,上坡或下坡,逆风或逆风,以及(在越来越多的新车中)计算机编程可优化燃油经济性。因此,在不同的转速下可以达到相同的速度,但是通常汽车在较低的转速下燃烧较少的燃料,而在较高的转速下燃烧较多的燃料。同样,较高的RPM意味着发动机上的应力更大,从而增加了机械故障的机会。我们关心燃烧燃料和机械故障,因为两者都使我们付出了金钱。这就是为什么汽车配备转速表的原因:RPM是一项重要指标。
        QPS 和 Threads_running 也因许多因素而有很大差异:硬件规格(CPU内核和速度,存储类型和IOPs等),查询类型(读或写),查询计划,表大小,行大小,表模式(列类型,索引-特别是索引),数据访问模式(全读或全写或混合),“嘈杂的邻居”,一天中的时间(例如营业时间),一年中的时间(例如纳税季节),特殊促销,糟糕的演员,等等-随便你说吧!因此,与汽车一样,相同的QPS可以在不同的Threads_running上实现,因此这是一个重要的指标。
        Threads_running表示MySQL的工作强度。我工作的几个数据库仅运行10个线程即可完成10,000 QPS。其他人则需要运行100个线程才能达到3,000 QPS。从前面的段落中,我们知道原因:各种变化的因素。
        仅QPS不能告诉我们MySQL是否在给系统施加压力,或者,形象的说,它是否开始烧油并停止运转。
    监视和处理运行中的高Threads_running是至关重要的。转速表有一个最大值,你通常不能把车推到它之外,但是MySQL非常有野心:它没有最大值,它会根据需要运行尽可能多的线程。它会尝试,但会随着Threads_running的增加而变慢并最终失败:

    Threads_runningMySQL
    0 - 10Normal:几乎所有硬件都没问题
    10 - 30Busy:大多数硬件通常都可以,因为服务器多核
    30 - 50High:很少有工作负载需要运行这么多线程。它可以短期爆发(<5min),但如果持续时间较长,则响应时间很可能很慢
    50 - 100Overloaded:某些硬件可以处理此问题,但是不能期望在此范围内成功运行。对于我们的本地部署硬件而言,此范围内的瞬时突发(<5s)通常是可以的。
    > 100Failing:在极少数情况下,MySQL可以运行大于100个线程,但在此范围内可能会失败

        建议指导值:

    • Threads_running < 50
    • 1:1000 Threads_running:QPS

        让我们换个角度来阐明一个重要的问题:MySQL线程是一个数据库连接。Threads_running是活动查询的数据库连接数。请记住,每个应用程序实例都有其自己的数据库连接池,这一点很重要。因此,最大可能的连接(线程)为:

    Max Connections = App 实例数 * 数据库连接池大小
    

        连接池大小为100是合理的,但是如果将应用程序部署到5个应用程序实例,则可能有500个数据库连接。通常是这样:应用程序通常具有数百个空闲数据库连接,这是连接池的用途。(对连接的线程,MySQL也有一个尺度)直到同时运行过多的连接(线程),这才成为问题。
        一个应用程序不止一次被扩展(即部署更多的应用程序实例)以处理更多的请求,但这样做会使数据库过载,运行的线程太多。没有快速或简单的解决方案来解决这种类型的数据库性能限制。原因很简单:如果您希望MySQL在相同的时间(每秒)内执行更多的工作(查询),则每个工作必须花费较少的时间,否则无法进行计算。如果每个查询花费100毫秒,则执行速度不可能超过10QPS。您问:“啊,但是要是使用更多的CPU内核呢?”
        明白了!使用另一个CPU内核意味着运行另一个线程,现在我们正朝着一个更低的上限快速前进:运行50个线程。但这正是MySQL试图做的。如前所述,MySQL非常雄心勃勃:它没有设置Threads_running的最大值,它将根据需要运行尽可能多的线程。QPS的上限(如果存在)非常高,而Threads_running的上限非常低。为什么是这样?
        运行50个线程是一个合理的要求。到2020年,甚至应该运行1,000个线程。让我用另一个问题回答这个问题:为什么丰田没有法拉利的速度和力量?丰田的最高时速为 200km/h,法拉利的最高时速为 340km/h。为什么丰田不能跑得更快呢?为什么MySQL不能走得更快?
        两者的答案在工程的每一个细节上。例如,法拉利跑得很快,因为它有一个非常强大的引擎,但你不能只关注这里。这辆车的每一个细节都设计得既能提供动力,又不会在动力之下断裂。例如,飞机可以以258公里/小时的速度起飞和飞行,因此一辆时速340公里的法拉利将能飞行,除非其车身的空气动力学使它保持在地面上。如果设计成这样,丰田汽车的时速可以达到340公里,而MySQL可以运行1000个线程。
        像丰田一样,MySQL构建良好、可靠,而且超出了您的需要,但它本身或其周围的任何硬件、操作系统和应用程序都不是设计成法拉利的。
        实际经验表明,对于运行50个线程以上的大多数应用程序,MySQL的性能不佳。早在2014年,MySQL专家Alexey Straganov就对MySQL5.6进行了基准测试,测试运行的Threads_running非常高:Percona Server: Improve Scalability with Percona Thread Pool。在运行64个threads running时性能达到峰值。这些是实验结果,实际的应用程序查询比综合基准查询更具挑战性。
        解决方案?通过分片扩展数据库。但这是另一个漫长的话题。
        让我们以积极的结尾结束。Vadim Tkachenko也许是全球MySQL性能方面的最杰出专家,他最近实现了100,000个线程的运行:MySQL Challenge: 100k Connections

    参考文档

    [1]Daniel Nichter.MySQL Threads Running How Hard is MySQL Working?[EB/OL].https://hackmysql.com/post/mysql-threads-running-how-hard-is-mysql-working/,2020-04-25.
    [2]Oracle Corporation.Server Status Variables[EB/OL].https://dev.mysql.com/doc/refman/8.0/en/server-status-variables.html,2021-01-01.

    展开全文
  • 本文整理汇总了Python中asyncio._get_running_loop方法的典型用法代码示例。如果您正苦于以下问题:Python asyncio._get_running_loop方法的具体用法?Python asyncio._get_running_loop怎么用?Python asyncio._get...

    本文整理汇总了Python中asyncio._get_running_loop方法的典型用法代码示例。如果您正苦于以下问题:Python asyncio._get_running_loop方法的具体用法?Python asyncio._get_running_loop怎么用?Python asyncio._get_running_loop使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块asyncio的用法示例。

    在下文中一共展示了asyncio._get_running_loop方法的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

    示例1: run_main

    ​点赞 6

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def run_main(main):

    """Main entrypoint for asyncio tasks.

    This differs from `asyncio.run` in one key way - the main task is cancelled

    *first*, then any outstanding tasks are cancelled (and logged, remaining

    tasks are indicative of bugs/failures).

    """

    if asyncio._get_running_loop() is not None:

    raise RuntimeError("Cannot be called from inside a running event loop")

    main_task = None

    loop = asyncio.new_event_loop()

    try:

    asyncio.set_event_loop(loop)

    main_task = loop.create_task(main)

    return loop.run_until_complete(main_task)

    finally:

    try:

    if main_task is not None:

    loop.run_until_complete(cancel_task(main_task))

    _cancel_all_tasks(loop)

    loop.run_until_complete(loop.shutdown_asyncgens())

    finally:

    asyncio.set_event_loop(None)

    loop.close()

    开发者ID:dask,项目名称:dask-gateway,代码行数:27,

    示例2: test_get_event_loop_returns_running_loop

    ​点赞 6

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def test_get_event_loop_returns_running_loop(self):

    class Policy(asyncio.DefaultEventLoopPolicy):

    def get_event_loop(self):

    raise NotImplementedError

    loop = None

    old_policy = asyncio.get_event_loop_policy()

    try:

    asyncio.set_event_loop_policy(Policy())

    loop = asyncio.new_event_loop()

    self.assertIs(asyncio._get_running_loop(), None)

    async def func():

    self.assertIs(asyncio.get_event_loop(), loop)

    self.assertIs(asyncio._get_running_loop(), loop)

    loop.run_until_complete(func())

    finally:

    asyncio.set_event_loop_policy(old_policy)

    if loop is not None:

    loop.close()

    self.assertIs(asyncio._get_running_loop(), None)

    开发者ID:ShikyoKira,项目名称:Project-New-Reign---Nemesis-Main,代码行数:26,

    示例3: get_running_loop

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def get_running_loop():

    """Gets the currently running event loop

    Uses asyncio.get_running_loop() if available (Python 3.7+) or a backported

    version of the same function in 3.5/3.6.

    """

    try:

    loop = asyncio.get_running_loop()

    except AttributeError:

    loop = asyncio._get_running_loop()

    if loop is None:

    raise RuntimeError("no running event loop")

    return loop

    开发者ID:Azure,项目名称:azure-iot-sdk-python,代码行数:15,

    示例4: acquire_loop

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def acquire_loop(running: bool = False) -> asyncio.AbstractEventLoop:

    """Gracefully acquire a loop.

    The function tries to get an event loop via :func:`asyncio.get_event_loop`.

    On fail, returns a new loop using :func:`asyncio.new_event_loop`.

    Parameters

    ----------

    running: :class:`bool`

    Indicates if the function should get a loop that is already running.

    """

    try:

    loop = asyncio._get_running_loop()

    except Exception: # an error might occur actually

    loop = None

    if running and loop is not None:

    return loop

    else:

    try:

    loop = asyncio.get_event_loop()

    if loop.is_running() and not running:

    # loop is running while we have to get the non-running one,

    # let us raise an error to go into clause.

    raise ValueError("Current event loop is already running.")

    except Exception:

    loop = asyncio.new_event_loop()

    return loop

    开发者ID:NeKitDS,项目名称:gd.py,代码行数:35,

    示例5: _get_running_loop

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def _get_running_loop():

    return _running_loop._loop

    开发者ID:open-telemetry,项目名称:opentelemetry-python,代码行数:4,

    示例6: _get_event_loop

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def _get_event_loop():

    current_loop = _get_running_loop()

    if current_loop is not None:

    return current_loop

    return asyncio.events.get_event_loop_policy().get_event_loop()

    开发者ID:open-telemetry,项目名称:opentelemetry-python,代码行数:7,

    示例7: _get_running_loop

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def _get_running_loop():

    return _running_loop._loop

    开发者ID:eventbrite,项目名称:pysoa,代码行数:4,

    示例8: _get_event_loop

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def _get_event_loop():

    current_loop = _get_running_loop()

    if current_loop is not None:

    return current_loop

    return asyncio.events.get_event_loop_policy().get_event_loop()

    开发者ID:eventbrite,项目名称:pysoa,代码行数:7,

    示例9: close

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def close(self) -> None:

    """

    关闭天勤接口实例并释放相应资源

    Example::

    # m1901开多3手

    from tqsdk import TqApi

    from contextlib import closing

    with closing(TqApi()) as api:

    api.insert_order(symbol="DCE.m1901", direction="BUY", offset="OPEN", volume=3)

    """

    if self._loop.is_closed():

    return

    if self._loop.is_running():

    raise Exception("不能在协程中调用 close, 如需关闭 api 实例需在 wait_update 返回后再关闭")

    elif asyncio._get_running_loop():

    raise Exception(

    "TqSdk 使用了 python3 的原生协程和异步通讯库 asyncio,您所使用的 IDE 不支持 asyncio, 请使用 pycharm 或其它支持 asyncio 的 IDE")

    # 总会发送 serial_extra_array 数据,由 TqWebHelper 处理

    for _, serial in self._serials.items():

    self._process_serial_extra_array(serial)

    self._run_until_idle() # 由于有的处于 ready 状态 task 可能需要报撤单, 因此一直运行到没有 ready 状态的 task

    for task in self._tasks:

    task.cancel()

    while self._tasks: # 等待 task 执行完成

    self._run_once()

    self._loop.run_until_complete(self._loop.shutdown_asyncgens())

    self._loop.close()

    _clear_logs() # 清除过期日志文件

    开发者ID:shinnytech,项目名称:tqsdk-python,代码行数:34,

    示例10: sync_wrapper

    ​点赞 5

    # 需要导入模块: import asyncio [as 别名]

    # 或者: from asyncio import _get_running_loop [as 别名]

    def sync_wrapper(coro):

    def inner_sync_wrapper(self, *args, **kwargs):

    start_new_loop = None

    # try to get the running loop

    # `get_running_loop()` is new in Python 3.7, fall back on privateinternal for 3.6

    try:

    get_running_loop = asyncio.get_running_loop

    except AttributeError:

    get_running_loop = asyncio._get_running_loop

    # If there is no running loop we will need to start a new one and run it to completion

    try:

    if get_running_loop():

    start_new_loop = False

    else:

    start_new_loop = True

    except RuntimeError:

    start_new_loop = True

    if start_new_loop is True:

    f = asyncio.ensure_future(coro(self, *args, **kwargs))

    asyncio.get_event_loop().run_until_complete(f)

    f = f.result()

    else:

    # don't use create_task. It's python3.7 only

    f = asyncio.ensure_future(coro(self, *args, **kwargs))

    return f

    return inner_sync_wrapper

    # Monkey patch in our sync_wrapper

    开发者ID:FlorianKempenich,项目名称:Appdaemon-Test-Framework,代码行数:34,

    注:本文中的asyncio._get_running_loop方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。

    展开全文
  • is your activity running?”,从报错信息也可以看出需要activity的token。 2021-12-06 20:40:53.146 1910-5502/? W/WindowManager: Attempted to add application window with unknown token null. Aborting. 2021...
  • MySQL replication 中slave机器上有两个关键的进程,死一个都不行,一个是slave_sql_running,一个是Slave_IO_Running,一个负责与主机的io通信,一个负责自己的slave mysql进程。如果是slave_io_running no了,那么...
  • Looks like the mysql server is not started. Look to the official documentation of MySQL how you can start a service under ... Install the..., If it is there and not running then start it. It wil...
  • ERROR running qmake

    2021-08-04 09:53:21
    1>Reading Qt configuration...GHViewerDetect.vcxproj : error : ERROR running qmake 1>GHViewerDetect.vcxproj : error : qmake: (D:/SoftWare/QT5.9.3/5.9.3/msvc2017_64/bin/qmake) 1>GHViewerDetect.vcx
  • 发现k8s-node1节点的coredns出现0/1 Running状态; 查看详细信息:kubectl describe pod coredns-57d4cbf879-xgk2f -n kube-system [root@k8s-master kubernetes]# kubectl describe pod coredns-57d4cbf879-...
  • show slave status\G.......Relay_Log_File: localhost-relay-bin.000535Relay_Log_Pos:21795072Relay_Master_Log_File: localhost-bin.000094Slave_IO_Running: YesSlave_SQL_Running: NoRe...
  • eth0 up但是没有running的小问题

    千次阅读 2021-05-14 13:16:08
    但是ifconfig eth0结果中没有running,tcpdump显示 PC eth0上既收不到包也没发出去包。奇怪,检查ifcfg-eth0的配置文件,没发现异常。该pc为双网卡,eth1工作正常,gw也在eth1上配置。pc自己ping eth0地址,pi...
  • 报错:vmware tools is not running。 查看租户的Admin UI,发现用户事件里,确实有相关桌面故障报错: 二、分析处理 查看事件台有如下事件: 2)可能原因:当C:\Program Files\VMware Tools和C:\...
  • 关于dba_datapump_jobs中的not running的作业的清除的方法。not running状态的作业有两个意思1.作业被暂停。2.dw/dm进程crash.但是master table还存在其实多数情况下是属于2.关于怎么清除相当作业.可以看下面的MOS...
  • 本文主要讲述如何解决HDFS运行Balancer提示“Another Balancer is running..”导致Balancer失败的问题。 测试环境: 1.操作系统版本为Redhat7.2 2.CM和CDH版本为5.11.2 3.HDFS已启用HA 文章目录结构:...
  • 遇到一个线上问题,应用执行sql报错,排查mysql:select * from information_schema.innodb_trx;select * from information_schema.INNODB_LOCKS;select * from information_schema.INNODB_LOCK_WAITS;...
  • HBase报错server is not running yet解决方法问题描述解决过程 问题描述 尝试安装Hadoop 3.1.1与HBase 2.3.5,安装并配置完成后,Hadoop正常运行,HBase启动Shell后运行list指令测试,出现如下错误 ERROR: org....
  • Oracle dba_datapump_jobs中not running作业的清除方法 not running状态的作业有两个意思 1.作业被暂停。 2.dw/dm进程crash.但是master table还存在 其实多数情况下是属于2.关于怎么清除相当作业.可以看下面的...
  • 诡异的一点,在对应一直在running的事务,发现了trx_query是为空的,(经人指点之后,说这是事务没有提交也没有回滚时出现的状态)。在 kill 对应的trx_mysql_thread_id 之后,让测试并发的请求该接口,也没有出现...
  • torch.nn.BatchNorm2d(num_features,eps=1e-05,momentum=0.1,affine=True,track_running_stats=True,device=None,dtype=None) 参数: Num_features:输入 (N,C,H,W) Eps: 为保证数值稳定性在分母上加入的一个极...
  • waiting for more updates Master_Retry_Count: 86400 ~~ 1 row in set (0.00 sec) ERROR: No query specified 其中Slave_IO_Running: Yes Slave_SQL_Running: Yes状态需要是Yes表示成功,接下来就可以进行验证操作...
  • 二、Slave两个关键进程mysql replication 中slave机器上有两个关键的进程,死一个都不行,一个是slave_sql_running,一个是Slave_IO_Running,一个负责与主机的io通信,一个负责自己的slave mysql进程。三、如果是S....
  • I am running MySql 5.6 and I noticed that the CPU utilization never crosses 50% on an m1.large aws instance, when I benchmark the server (a few hundred queries executed in parallel over a period of ti...
  • ---恢复内容开始---摘要Qt帮助文档中是这样介绍的:bool QThread::isRunning () constReturnstrue if the thread is running; otherwise returns false.See also isFinished().解释的一本正经,使用却不然。问题:...
  • 前言:运营人员反映,有一单子提交卡住了,页面一直没有返回。...2,最后去看下是否是表锁住了,查看到有2个事务一直RUNNING,没有结束。,mysql> select * from INNODB_TRXG;3,通过trx_mysql_thread_id: ...
  • gradle build running很慢

    千次阅读 2020-12-20 18:17:11
    gradle build running很慢 本人测试ok: Android Studio Gradle Build Running 特别慢的问题 有位同学说: Gradle Build Running每次都要2分钟,另一个同学回答说:我特想知道你那2分钟在干嘛?盯着屏幕发呆?我想说...
  • Slave_SQL_Running: No mysql同步故障解决今天检查数据库发现一台MySQL Slave未和主机同步,查看Slave状态:mysql> show slave status\GSlave_IO_Running: YesSlave_SQL_Running: NoLast_Errno: 1062....Seconds_...
  • [WARNING] [apscheduler.scheduler] Execution of job "xxx” next run at: “xxx" skipped: maximum number of running instances reached (1) 可能的原因 模式为interval,设置的seconds小于任务执行所需时间,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 955,812
精华内容 382,324
关键字:

running