精华内容
下载资源
问答
  • 调度过程 以下就将会详细介绍golang的调度流程,方便阅读,将会省略部分无关代码。 初始化 调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等 func schedinit() { ...

    调度过程

    以下就将会详细介绍golang的调度流程,方便阅读,将会省略部分无关代码。

    初始化

    调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等

    func schedinit() {
        sched.maxmcount = 10000  // 设置m的最大值为10000
        mcommoninit(_g_.m) //初始化当前m
        // 确认P的个数
        // 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改
        procs := ncpu
        if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
            procs = n
        }
        // 调整P的个数,这里是新分配procs个P
        // 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了
        if procresize(procs) != nil {
            throw("unknown runnable goroutine during bootstrap")
        }
        ...
    }
    

    procresize方法主要完成以下任务:

    1. 比较目标个数和原始p的个数,进行全局缓存的扩容或收缩
    2. 遍历p的缓存,将未初始化的p进行初始化
    3. 对于收缩的情况,将收缩的p进行回收处理
    4. 分别将空闲的p和有任务的p加入空闲链表和工作链表

    下面是procresize()的源码:

    //全局数据结构:
    allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable
    sched schedt //全局调度器(综述文中有介绍)
    
    // 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整
    func procresize(nprocs int32) *p {
        ...
        old := gomaxprocs
    	// 扩张allp数组
    	if nprocs > int32(len(allp)) {
    		lock(&allpLock)
    		if nprocs <= int32(cap(allp)) {
    			allp = allp[:nprocs]
    		} else {
    			// 分配nprocs个*p
    			nallp := make([]*p, nprocs)
    			copy(nallp, allp[:cap(allp)])
    			allp = nallp
    		}
    		unlock(&allpLock)
    	}
    	// 初始化新的p
    	for i := int32(0); i < nprocs; i++ {
    		pp := allp[i]
    		if pp == nil {
    			pp = new(p)
    		    ...
    			// 将pp保存到allp数组里, allp[i] = pp
    			atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    		}
            ...
    	}
    	// 释放无用的p
    	for i := nprocs; i < old; i++ {
    		p := allp[i]
    		// 任务转移
    		// 本地任务队列转换到全局队列
    		for p.runqhead != p.runqtail {
    			p.runqtail--
    			gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
    			globrunqputhead(gp)
    		}
    		// 优先执行的也转移到全局
    		if p.runnext != 0 {
    			globrunqputhead(p.runnext.ptr())
    			p.runnext = 0
    		}
    		// 后台标记的g也转移
    		if gp := p.gcBgMarkWorker.ptr(); gp != nil {
    			casgstatus(gp, _Gwaiting, _Grunnable)
    			globrunqput(gp)
    			p.gcBgMarkWorker.set(nil)
    		}
    		// 做一些内存释放等操作
           ...
    	}
        ...
        //将p放入队列
    	var runnablePs *p
    	for i := nprocs - 1; i >= 0; i-- {
    		p := allp[i]
    		// 如果是当前的M绑定的P,不放入P空闲链表
    		// 否则更改P的状态为_Pidle,放入P空闲链表
    		if _g_.m.p.ptr() == p {
    			continue
    		}
    		p.status = _Pidle
    		if runqempty(p) { 
    			pidleput(p)// 将空闲p放入全局空闲链表
    		} else {
               // 非空闲的通过绑定m,链起来     
    			p.m.set(mget())
    			p.link.set(runnablePs)
                // 最后一个空闲的不加入空闲列表 直接返回去调度使用
    			runnablePs = p
    		}
    	}
    }
    

    新建的无任务p都会被放到空闲链表中:

    func pidleput(_p_ *p) {
        if !runqempty(_p_) {
            throw("pidleput: P has non-empty run queue")
        }
        _p_.link = sched.pidle //通过p的link形成链表
        sched.pidle.set(_p_)
        // 将sched.npidle加1
        atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
    }
    

    默认只有schedinit和startTheWorld会调用procresize()schedinit初始化p,startTheWorld会激活所有有任务的p。
    完成调度器初始化后,系统会引导生成 main goroutine,之前是在全局的g0上执行初始化工作

    golang支持在运行间修改p数量:runtime.GOMAXPROCS(),但是带价很大,会触发STW

     lock(&sched.lock)
        ret := int(gomaxprocs)
        unlock(&sched.lock)
        if n <= 0 || n == ret {
            return ret
        }
        // 有stw和重启世界的过程
        stopTheWorld("GOMAXPROCS")
        // newprocs will be processed by startTheWorld
        newprocs = int32(n)
        startTheWorld()
        return ret
    

    以上便是golang初始化调度器的所有步骤,具体:

    1. 调用schedinit,初始化maxmcount和gomaxprocs的数量
    2. sechdinit中调用procresize(),初始化所有的p,并放入空闲链表中
    3. schedinit结束后,引导创建main goroutine,执行main(之前是在全局的g0中执行),汇编执行引导,文中并没有描述
    4. 运行时可以调用runtime.GOMAXPROCS()函数修改p的数量,会触发STW,有带价。如果真的有需求,可以考虑启动前修改系统环境变量实现。

    g的创建

    在编写程序中,使用 go func() {}来创建一个goroutine(g),这条语句会被编译器翻译成函数 newproc()。

    func newproc(siz int32, fn *funcval) {
        //用fn + PtrSize 获取第一个参数的地址,也就是argp
        //这里要了解一下go的堆栈
        argp := add(unsafe.Pointer(&fn), sys.PtrSize)
        //用siz - 8 获取pc地址 (汇编实现)
        pc := getcallerpc()
        // 用g0的栈创建G对象
        systemstack(func() {
            newproc1(fn, (*uint8)(argp), siz, pc)
        })
    }
    

    了解一下保存需要执行的业务函数的funcval:
    funcval 是一个变长结构,第一个成员是函数指针,往后是fn的参数,个数长度可变,但是起始位置固定(在有参数的的情况下)。将*funcval的地址跳过一个指针长度(fn)便是参数的起始地址了。

    // 上面的 add 是跳过这个 fn,到参数的起始位置
    type funcval struct {
        fn uintptr
        // variable-size, fn-specific data here
    }
    

    newproc()获取到参数的地址和callerpc,然后调用newproc1().
    流程如下图:
    在这里插入图片描述

    代码如下:

    // 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
    func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
        _g_ := getg()
        if fn == nil {
            _g_.m.throwing = -1 // do not dump full stacks
            throw("go of nil func value")
        }
        _g_.m.locks++ // disable preemption because it can be holding p in a local var
        siz := narg
        // 从m中获取p
        _p_ := _g_.m.p.ptr()
        // 从gfree list获取g
        newg := gfget(_p_)
        // 如果没获取到g,则新建一个
        if newg == nil {
            // 分配栈为 2k 大小的G对象
            newg = malg(_StackMin)
            casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead
            // 添加到allg数组,防止gc扫描清除掉
            allgadd(newg) 
        }
        // 参数大小+稍微一点空间
        totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize 
        totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
        // 新协程的栈顶计算,将栈顶减去参数占用的空间
        sp := newg.stack.hi - totalSize
        spArg := sp
        // 如果有参数
        if narg > 0 {
            // copy参数到栈上
            memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
            ... //一些gc相关的工作省略
        }
        // 初始化G的gobuf,保存sp,pc,任务函数等
        memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
        newg.sched.sp = sp
        newg.stktopsp = sp
        // 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit
        newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
        // sched.g保存当前新的G
        newg.sched.g = guintptr(unsafe.Pointer(newg))
        // 将当前的pc压入栈,保存g的任务函数为pc
        gostartcallfn(&newg.sched, fn)
        // gopc保存newproc的pc
        newg.gopc = callerpc
        // 任务函数的地址
        newg.startpc = fn.fn
        ...
        // 更改当前g的状态为_Grunnable
        casgstatus(newg, _Gdead, _Grunnable)
        // 生成唯一的goid
        newg.goid = int64(_p_.goidcache)
        // 将当前新生成的g,放入队列
        runqput(_p_, newg, true)
        // 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
        if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
            wakep()
        }
    }
    

    g 默认会复用,会从p的free中获取,当p free为空,从全局的schedt中的gfreeStack或者gfreeNoStack中拉取到本地freelist

    //从缓存列表获取一个空闲的g
    func gfget(_p_ *p) *g {
    retry:
        gp := _p_.gfree
        if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
            //本地空闲队列为空的时候,从全局中获取,需要加锁
            lock(&sched.gflock)
            //一次转移最多32个空闲到本地p
            for _p_.gfreecnt < 32 {
                if sched.gfreeStack != nil {
                    gp = sched.gfreeStack                    //获取g
                    sched.gfreeStack = gp.schedlink.ptr() //链表头指向下一个g
                } else if sched.gfreeNoStack != nil {
                    gp = sched.gfreeNoStack
                    sched.gfreeNoStack = gp.schedlink.ptr()
                } else {
                    break
                }
                _p_.gfreecnt++
                sched.ngfree--
                gp.schedlink.set(_p_.gfree)
                _p_.gfree = gp
            }
            unlock(&sched.gflock)
            goto retry
        }
        // 获取到g
        if gp != nil {
            // 调整链表头及个数
            _p_.gfree = gp.schedlink.ptr()
            _p_.gfreecnt--
            // 堆栈为空就分配
            if gp.stack.lo == 0 {
                // Stack was deallocated in gfput. Allocate a new one.
                systemstack(func() {
                    gp.stack = stackalloc(_FixedStack)
                })
                gp.stackguard0 = gp.stack.lo + _StackGuard
            } else {
              ...
            }
        }
        return gp
    }
    

    当一次调度执行完g后,调度器会将g放回p或者全局队列,当空闲任务个数超过64个的时候,会调整部分到全局任务队列,直到p本地空闲队列为32个的时候停止。

    func gfput(_p_ *p, gp *g) {
        // 处理堆栈
        stksize := gp.stack.hi - gp.stack.lo
        // 不是默认堆栈,直接释放(扩张后的堆栈可能会很大,留着占内存,下次重新分配就好了)
        if stksize != _FixedStack {
            stackfree(gp.stack)
            gp.stack.lo = 0
            gp.stack.hi = 0
            gp.stackguard0 = 0
        }
        // 处理p的复用链表
        gp.schedlink.set(_p_.gfree)
        _p_.gfree = gp
        _p_.gfreecnt++
        // 超过64个,放回部分到全局队列
        if _p_.gfreecnt >= 64 {
            lock(&sched.gflock)
            for _p_.gfreecnt >= 32 {
                _p_.gfreecnt--
                gp = _p_.gfree
                _p_.gfree = gp.schedlink.ptr()
                if gp.stack.lo == 0 {
                    gp.schedlink.set(sched.gfreeNoStack)
                    sched.gfreeNoStack = gp
                } else {
                    gp.schedlink.set(sched.gfreeStack)
                    sched.gfreeStack = gp
                }
                sched.ngfree++
            }
            unlock(&sched.gflock)
        }
    }
    

    malg()函数创建一个新的g,包括为该g申请栈空间(支持程序分配栈的系统)。系统中的每个g都是由该函数创建而来的。

    //一般传入的堆栈大小默认为2k
    func malg(stacksize int32) *g {
        newg := new(g)
        if stacksize >= 0 {
            stacksize = round2(_StackSystem + stacksize)// 对齐
            systemstack(func() {
                newg.stack = stackalloc(uint32(stacksize))// 调用 stackalloc 分配栈
            })
            newg.stackguard0 = newg.stack.lo + _StackGuard        // 设置 stackguard
            newg.stackguard1 = ^uintptr(0)
        }
        return newg
    }
    

    创建成功会被放入到 allg的全局队列中,gc回收遍历扫描会使用,也防止gc回收分配好的g

    var (
        // 存储所有g的数组
        allgs []*g
        // 保护allgs的互斥锁
        allglock mutex  
        allglen uintptr
    )
    func allgadd(gp *g) {
        lock(&allglock)
        allgs = append(allgs, gp)
        allglen = uintptr(len(allgs))
        unlock(&allglock)
    }
    
    

    当获取到一个可用的g之后:

    1. 初始化g的gobuf信息(上下文信息,包括sp,pc以及函数g执行完之后的返回指令pc(goexit函数))
    2. 添加到g到p的本地队列
    3. p的本地队列满了,便添加到全局队列,顺便转移部分本地队列的数据到全局队列,供其他的p获取。
    4. 若存在有空闲的p及未自旋的m,调用wakep()方法,这里会获取一个空闲的m或新建一个m,去和空闲的p绑点,调度。后文会有对该方法的解释
    // 尝试将G放到P的本地队列
    func runqput(_p_ *p, gp *g, next bool) {
        if next {
        retryNext:
            oldnext := _p_.runnext
            // 将G赋值给_p_.runnext
            // 最新的G优先级最高,最可能先被执行。
            // 剩下的G如果go运行时调度器发现有空闲的core,就会把任务偷走点,
            // 让别的core执行,这样才能充分利用多核,提高并发能
            if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
                goto retryNext
            }
            gp = oldnext.ptr()
        }
    retry:
        h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
        t := _p_.runqtail
        // 如果本地队列还有剩余的位置,将G插入本地队列的尾部
        if t-h < uint32(len(_p_.runq)) {
            _p_.runq[t%uint32(len(_p_.runq))].set(gp)
            atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
            return
        }
        // 本地队列已满,放入全局队列
        if runqputslow(_p_, gp, h, t) {
            return
        }
        goto retry
    }
    
    // 如果本地满了以后,一次将本地的一半的G转移到全局队列
    func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
        //首先转移一半到全局队列,省略
        ...
        // 将拿到的G,添加到全局队列末尾, 全局数据处理是需要加锁的,所以slow。
        lock(&sched.lock)
        globrunqputbatch(batch[0], batch[n], int32(n+1))
        unlock(&sched.lock)
        return true
    }
    

    放入队列时,p队列满了会分一半到全局队列,其他的p可以获取全局队列中的g执行。newproc1最后会唤醒其他m p去执行任务

    到这里go fun()流程就完成了。 g不会被删除,但是会清理过大的栈空间,防止内存爆炸。gc过程中也会调用shrinkstack()将栈空间回收。这就是golang和可以创建大量g来支持并发的原因之一,g是复用的并且初始栈大小只有2k,超过2k的栈在g空闲的时候是会被回收的,这也减轻了系统内存的压力

    系统线程m

    在golang中有三种系统线程:

    • 主线程:golang程序启动加载的时候就运行在主线程上,代码中由一个全局的m0表示
    • 运行sysmon的线程
    • 普通用户线程,用来与p绑定,运行g中的任务的线程,
      主线程和运行sysmon都是单实例,单独一个线程。而用户线程会有很多事例,他会根据调度器的需求新建,休眠和唤醒。

    在newproc1中我们发现创建g成功后,会尝试wakep唤醒一个用户线程m执行任务,这里详细描述下这个方法:

    // 尝试获取一个M来运行可运行的G
    func wakep() {
        // 如果有其他的M处于自旋状态,那么就不管了,直接返回
        // 因为自旋的M回拼命找G来运行的,就不新找一个M(劳动者)来运行了。
        if !atomic.Cas(&sched.nmspinning, 0, 1) {
            return
        }
        startm(nil, true)
    }
    
    // startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回
    // 获取到P后,在尝试获取M,如果获取不到就新建一个M
    func startm(_p_ *p, spinning bool) {
        lock(&sched.lock)
        // 如果P为nil,则尝试获取一个空闲P
        if _p_ == nil {
            _p_ = pidleget()
            if _p_ == nil {
                unlock(&sched.lock)
                return
            }
        }
        // 获取一个空闲的M
        mp := mget()
        unlock(&sched.lock)
        if mp == nil {
            var fn func()
            if spinning {
                // The caller incremented nmspinning, so set m.spinning in the new M.
                fn = mspinning
            }
            // 如果获取不到,则新建一个,新建完成后就立即返回
            newm(fn, _p_)
            return
        }
        // The caller incremented nmspinning, so set m.spinning in the new M.
        mp.spinning = spinning //标记该M是否在自旋
        mp.nextp.set(_p_) // 暂存P
        notewakeup(&mp.park) // 唤醒M
    }
    

    上述代码可以发现m回去调用mget()方法,获取不成功后才会选择创建,这里表明m也是支持复用的。获取不到任务的m也会被加入到空闲的m链表中,等待唤醒。

    下面从新建m开始:

    func newm(fn func(), _p_ *p) {
        // 根据fn和p和绑定一个m对象
        mp := allocm(_p_, fn)
        // 设置当前m的下一个p为_p_
        mp.nextp.set(_p_)
        ...
        // 真正的分配os thread
        newm1(mp)
    }
    
    // 分配一个m,且不关联任何一个os thread
    func allocm(_p_ *p, fn func()) *m {
        _g_ := getg()
        _g_.m.locks++ // disable GC because it can be called from sysmon
        if _g_.m.p == 0 {
            acquirep(_p_) // 如果没有绑定p的话,申请一个p,只有p有cache,可以供m来申请内存。
        }
        ...
        mp := new(m)
        mp.mstartfn = fn
        mcommoninit(mp)   //初始化当前m
        // 给g0分配一定的堆栈
        if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
            mp.g0 = malg(-1)   //这些系统必须使用系统的栈
        } else {
            mp.g0 = malg(8192 * sys.StackGuardMultiplier) //go的栈是大小是8k
        }
        mp.g0.m = mp
        //绑定的p和当前m的p一样,解绑
        if _p_ == _g_.m.p.ptr() {
            releasep()
        }
        return mp
    }
    

    m初始化:检查数量,超过10000个异常停机;接受信号的g创建初始化;

    func mcommoninit(mp *m) {
        _g_ := getg()
        // g0 stack won't make sense for user (and is not necessary unwindable).
        if _g_ != _g_.m.g0 {
            callers(1, mp.createstack[:])
        }
        lock(&sched.lock)
        if sched.mnext+1 < sched.mnext {
            throw("runtime: thread ID overflow")
        }
        mp.id = sched.mnext
        sched.mnext++
        // m数量检查
        checkmcount()
        ...
        // signal g创建初始化
        mpreinit(mp)
        if mp.gsignal != nil {
            mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
        }
        //加入全局m链表
        mp.alllink = allm //链表
        atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
        unlock(&sched.lock)
    }
    
    func newm1(mp *m) {
        // 对cgo的处理
        ...
        execLock.rlock() // Prevent process clone.
        // 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针
        // 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中
        newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
        execLock.runlock()
    }
    

    每个操作系统分配系统线程的流程是不一样的,下面代码展示了在linux和windows系统下该函数的实现,其他的环境暂时不做讨论:

    //linux
    // 分配一个系统线程,且完成 g0 和 g0上的栈分配
    // 传入 mstart 函数,让线程执行 mstart
    func newosproc(mp *m, stk unsafe.Pointer) { 
        // Disable signals during clone, so that the new thread starts
        // with signals disabled. It will enable them in minit.
        var oset sigset
        sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
        ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
        sigprocmask(_SIG_SETMASK, &oset, nil)
       ...
    }
    
    //windows
    func newosproc(mp *m, stk unsafe.Pointer) {
        const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000
        // stackSize must match SizeOfStackReserve in cmd/link/internal/ld/pe.go.
        const stackSize = 0x00200000*_64bit + 0x00100000*(1-_64bit)
        thandle := stdcall6(_CreateThread, 0, stackSize,
            funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)),
            _STACK_SIZE_PARAM_IS_A_RESERVATION, 0)
        ...
        // Close thandle to avoid leaking the thread object if it exits.
        stdcall1(_CloseHandle, thandle)
    }
    

    创建m的时候,会给m的g0分配分配栈空间。g0是该m私有的,golang中系统命令都是在g0上执行的,函数systemstack(func())(汇编实现)会将方法转到g0栈上执行,然后转回当前的g。管理命令操作执行都在g0栈上执行,隔离了业务内容和指令的执行,避免做g共享内存。

    下面将描述获取一个空闲的m
    在startm中,m是优先去空闲队列中获取,未获取到空闲队列才选择创建

    func mget() *m {
       //从idle 的m链表中搞一个
        mp := sched.midle.ptr()
        if mp != nil {
            sched.midle = mp.schedlink
            sched.nmidle--
        }
        return mp
    }
    

    被唤醒的进入工作状态的m会陷入调度循环,竭尽全力获取g执行,当找不到可执行的任务,或者任务用时过长,系统调用阻塞等原因被剥夺p,m会再次进入休眠状态。

    // 停止M,使其休眠,但不会被系统回收
    // 调用notesleep使M进入休眠,唤醒后就会从休眠出直接开始执行
    // 线程可以处于三种状态: 等待中(Waiting)、待执行(Runnable)或执行中(Executing)。
    func stopm() {
        _g_ := getg()
    retry:
        lock(&sched.lock)
        mput(_g_.m)
        unlock(&sched.lock)
        // 在lock_futex.go 中
        // 休眠,等待被唤醒
        notesleep(&_g_.m.park)
        noteclear(&_g_.m.park)
       ...
        // 这里是被wakenote唤醒后的操作了
        // 绑定p
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    
    // 把mp添加到midle列表
    func mput(mp *m) {
        mp.schedlink = sched.midle
        sched.midle.set(mp)
        sched.nmidle++
        checkdead()
    }
    

    到这里可以看到,m也是不会主动删除释放的,支持复用。当大量的m被创建的时候,对性能是有影响的:

    • 系统线程调度上下文切换是有消耗的
    • m本身是有占资源的(自身的栈内存,寄存器等)

    执行goroutine

    m 执行 g有两个起点:

    1. 从m的启动函数(创建m的时候绑定的)mstart()开始,触发m的调度
    2. 调度过程中调用stopm()睡眠后,通过 notewakeup(&mp.park)恢复m的执行,并从stopm()的位置开始执行,重新调度。

    mstart() 流程:

    func mstart() {
        _g_ := getg()
        osStack := _g_.stack.lo == 0     // 检查栈边界,为0的话是系统栈
        if osStack {
            // 处理系统栈
            size := _g_.stack.hi
            if size == 0 {
                size = 8192 * sys.StackGuardMultiplier
            }
            _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
            _g_.stack.lo = _g_.stack.hi - size + 1024
        }
        _g_.stackguard0 = _g_.stack.lo + _StackGuard
        _g_.stackguard1 = _g_.stackguard0
        mstart1(0)   //启动m
    }
    
    // dummy一直为0,给getcallersp当参数
    func mstart1(dummy int32) {
        _g_ := getg()
        if _g_ != _g_.m.g0 {
            throw("bad runtime·mstart")
        }
        // 记录mstart1 函数结束后的地址pc和mstart1 函数参数到当前g的运行现场
        save(getcallerpc(), getcallersp(unsafe.Pointer(&dummy)))
        asminit()
        // 初始化m
        minit()
        // 如果当前g的m是初始m0,执行mstartm0()
        if _g_.m == &m0 {
            // 对于初始m,需要一些特殊处理
            mstartm0()
        }
        // 如果有m的起始任务函数,则执行,比如 sysmon 函数
        if fn := _g_.m.mstartfn; fn != nil {
            fn()
        }
        // GC startworld的时候,会检查闲置m是否少于并发标记需求(needaddgcproc)
        // 新建m,设置 m.helpgc=-1,加入限制队列等待唤醒
        if _g_.m.helpgc != 0 {
            _g_.m.helpgc = 0
            stopm()
        } else if _g_.m != &m0 {
            // 绑定p
            acquirep(_g_.m.nextp.ptr())
            _g_.m.nextp = 0
        }
        // 进入调度,而且不会在返回
        schedule()
    }
    

    绑定号p之后,m拥有了可分配cache和执行队列,进入核心调度循环,核心调度从schedule函数开始,调度完一次之后会引导重新执行schedule,实现循环调度。

    schedule方法会主要功能:尽可能给m找到可以运行的g,这其中主要是分为以下几种:

    1. 当前m已经指定了g。该情况下会将m与p解绑,然后m睡眠,等待被绑定的g被调度然后唤醒该m执行该g
    2. gc触发STW的时候,m直接睡眠
    3. gcmark(标记)阶段,大概有1/4的g用来并行标记,这里也会检测是否调度gc标记的g(gcBlackenEnabled!=0)
    4. 调度61次后会从全局的g队列中尝试获取g
    5. 全局队列中未获取到便去绑定p的本地任务队列获取g
    6. 还未获取便调用findrunnable()去尽可能获取,取不到便会睡眠,不返回。
    7. 获取到的g有绑定的m,交出当前的p和g,与指定的m绑定,唤醒指定的m,自己睡眠,等待唤醒。
    8. 执行获取到的g

    综上,在该方法中,m在以下情况会休眠:

    • 当m.lockedg != 0(m有绑定固定执行的g),m会在stoplockedm()解绑p并休眠,等待被绑定的g被其他m调度的时候来唤醒该m,直接被绑定的g
    • sched.gcwaiting != 0(gc STW)m会休眠
    • findrunnable()中想尽一切办法都没有获取到可执行的g的时候,m会休眠
    • 获取到g的时候,g绑定了其他的m(gp.lockedm != 0),当前m会解绑p,休眠,然后唤醒g绑定的m,执行该g

    当m休眠被唤醒的时候,并不会从固定的位置开始执行,会直接从休眠的位置开始执行。

    以下是schedule方法的g获取流程(省略了lockm和lockg的处理以及gc STW的处理):

    在这里插入图片描述

    另外一个展示当m绑定了g即该m只能执行特定的g(m.lockedg != 0, g.lockedm != 0)以及检测gc STW的调度:

    在这里插入图片描述

    具体代码:

    func schedule() {
        _g_ := getg()
        if _g_.m.locks != 0 {
            throw("schedule: holding locks")
        }
        // 如果当前M锁定了某个G,那么应该交出P,进入休眠
        // 等待某个M调度拿到lockedg,然后唤醒lockedg的M
        if _g_.m.lockedg != 0 {
            stoplockedm()
            execute(_g_.m.lockedg.ptr(), false) // Never returns.
        }
    top:
        // 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M
        if sched.gcwaiting != 0 {
            gcstopm()
            goto top
        }
        var gp *g
        var inheritTime bool
        // 如果当前GC正在标记阶段, 则查找有没有待运行的GC Worker, GC Worker也是一个G
        if gp == nil && gcBlackenEnabled != 0 {
            gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        }
        if gp == nil {
            // 每隔61次调度,尝试从全局队列种获取G
            // ? 为何是61次? https://github.com/golang/go/issues/20168
            if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
                lock(&sched.lock)
                gp = globrunqget(_g_.m.p.ptr(), 1)
                unlock(&sched.lock)
            }
        }
        if gp == nil {
            // 从p的本地队列中获取
            gp, inheritTime = runqget(_g_.m.p.ptr())
        }
        if gp == nil {
            // 想尽办法找到可运行的G,找不到就不用返回了
            gp, inheritTime = findrunnable() // blocks until work is available
        }
        // M即将要执行G,如果M还是spinning,那么重置为false
        if _g_.m.spinning {
            // 重置为非自旋,并根据需要唤醒或新建一个M来运行
            resetspinning()
        }
        // 如果找到的G已经锁定M了,dolockOSThread和cgo会将G和M绑定
        // 则用startlockedm执行,将P和G都交给对方lockedm,唤醒绑定M-lockedm,自己回空闲队列。
        if gp.lockedm != 0 {
            startlockedm(gp)
            goto top
        }
        execute(gp, inheritTime)
    }
    

    excute()方法将会去执行g

    // 执行goroutine的任务函数
    // 如果inheritTime=true,那么当前的G继承剩余的时间片,其实就是不让schedtick累加,
    // 这样的话就不会触发每61次从全局队列找G
    func execute(gp *g, inheritTime bool) {
        _g_ := getg()
        // 更改gp的状态为_Grunning
        casgstatus(gp, _Grunnable, _Grunning)
        // 置等待时间为0
        gp.waitsince = 0
        // 置可抢占标志为fasle
        gp.preempt = false
        gp.stackguard0 = gp.stack.lo + _StackGuard
        // 如果不是inheritTime,schedtick累加
        if !inheritTime {
            _g_.m.p.ptr().schedtick++
        }
        // 当前的M的G改为gp
        _g_.m.curg = gp
        // gp的M改为当前的M
        gp.m = _g_.m
        ...
        // gogo由汇编实现, runtime/asm_amd64.s
        // 实现当前的G切换到gp,然后用JMP跳转到G的任务函数
        // 当任务函数执行完后会调用 goexit
        gogo(&gp.sched)
    }
    

    gogo由汇编实现,主要是由g0切换到g栈,然后执行函数。

    // 从g0栈切换到G栈,然后JMP到任务函数代码
    TEXT runtime·gogo(SB), NOSPLIT, $16-8
        MOVQ    buf+0(FP), BX       // gobuf
        MOVQ    gobuf_g(BX), DX   //G
        MOVQ    0(DX), CX       // make sure g != nil
        get_tls(CX)
        MOVQ    DX, g(CX)
        MOVQ    gobuf_sp(BX), SP    // restore SP 恢复sp寄存器值切换到g栈
        MOVQ    gobuf_ret(BX), AX
        MOVQ    gobuf_ctxt(BX), DX
        MOVQ    gobuf_bp(BX), BP
        MOVQ    $0, gobuf_sp(BX)    // clear to help garbage collector
        MOVQ    $0, gobuf_ret(BX)
        MOVQ    $0, gobuf_ctxt(BX)
        MOVQ    $0, gobuf_bp(BX)
        MOVQ    gobuf_pc(BX), BX // 获取G任务函数的地址
        JMP BX                           // 转到任务函数执行
    

    当调用任务函数结束返回的时候,会执行到我们在创建g流程中就初始化好的指令:goexit

    TEXT runtime·goexit(SB),NOSPLIT,$0-0
        BYTE    $0x90   // NOP
        CALL    runtime·goexit1(SB) // does not return 调用goexit1函数
        // traceback from goexit1 must hit code range of goexit
        BYTE    $0x90   // NOP
    

    下面是goexit1()函数:

    // 当goroutine结束后,会调用这个函数
    func goexit1() {
        // 切换到g0执行goexit0
        mcall(goexit0)
    }
    
    func goexit0(gp *g) {
        _g_ := getg()
        // gp的状态置为_Gdead
        casgstatus(gp, _Grunning, _Gdead)
        // 状态重置
        gp.m = nil
        // G和M是否锁定
        locked := gp.lockedm != 0
        // G和M解除锁定
        gp.lockedm = 0
        _g_.m.lockedg = 0
        ...
        // 处理G和M的清除工作
        dropg()
        if _g_.m.lockedInt != 0 {
            print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
            throw("internal lockOSThread error")
        }
        _g_.m.lockedExt = 0
        // 将G放入P的G空闲链表
        gfput(_g_.m.p.ptr(), gp)
        // 再次进入调度
        schedule()
    }
    

    注意:无论是mcall systemstack还是gogo都不会更新g0.sched栈现场,需要切换到g0的时候,直接从“g_sched+gobuf_sp”读取地址恢复sp。在调用goexit0/schedule时,g0栈将初始化,从头开始。

    至此一个m的调度流程已经清晰:
    在这里插入图片描述

    1. 由创建m绑定的mstart()函数或者notewakeup()唤醒的执行位置(一般都在schedule()方法中休眠,详细请看上方有介绍)
    2. 进入schedule()方法,开始获取可执行的g,获取不到就休眠,等待wakep()调度,获取到p后调用execute()启动执行
    3. execute()调用gogo执行g
    4. gogo切换到g栈,并执行fn
    5. 结束后调用goexit()->goexit1()->goexit0()->sechedule()重新调度。

    无论是mcall systemstack还是gogo都不会更新g0.sched栈现场,需要切换到g0的时候,直接从
    “g_sched+gobuf_sp”读取地址恢复sp。在调用goexit0/schedule时,g0栈从头开始。mstart1中保存了g0的gobuf信息。

    findrunable

    该方法会想尽一切办法找到可以执行的任务,核心调度函数
    这里逻辑较为复杂,下面将以代码中的两个标签top和stop将流程分开:
    top label:
    在这里插入图片描述

    stop label:
    在这里插入图片描述

    上面是top和stop标签内的流程图,结合在一起便是findrunnable的全部流程,其中gcmark的调度部分有省略,将会在gc中详细描述。
    结合代码看看这个方法:

    // 找到一个可以运行的G,不找到就让M休眠,然后等待唤醒,直到找到一个G返回
    func findrunnable() (gp *g, inheritTime bool) {
        _g_ := getg()
        // 此处和handoffp中的条件必须一致:如果findrunnable将返回G运行,则handoffp必须启动M.
    top:
        _p_ := _g_.m.p.ptr()
        // 如果gc正等着运行,停止M,也就是STW
        if sched.gcwaiting != 0 {
            gcstopm()
            goto top
        }
        if _p_.runSafePointFn != 0 {
            runSafePointFn()
        }
        // fing是执行finalizer的goroutine
        if fingwait && fingwake {
            if gp := wakefing(); gp != nil {
                ready(gp, 0, true)
            }
        }
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        // local runq
        // 再尝试从本地队列中获取G
        if gp, inheritTime := runqget(_p_); gp != nil {
            return gp, inheritTime
        }
        // global runq
        // 尝试从全局队列中获取G
        if sched.runqsize != 0 {
            lock(&sched.lock)
            gp := globrunqget(_p_, 0)
            unlock(&sched.lock)
            if gp != nil {
                return gp, false
            }
        }
        // 从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
        if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
            if gp := netpoll(false); gp != nil { // non-blocking
                // netpoll returns list of goroutines linked by schedlink.
                // 如果找到的可运行的网络IO的G列表,则把相关的G插入全局队列
                injectglist(gp.schedlink.ptr())
                // 更改G的状态为_Grunnable,以便下次M能找到这些G来执行
                casgstatus(gp, _Gwaiting, _Grunnable)
                // goroutine trace事件记录-unpark
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
        }
        // Steal work from other P's.
        procs := uint32(gomaxprocs)
        // 如果其他P都是空闲的,就不从其他P哪里偷取G了
        if atomic.Load(&sched.npidle) == procs-1 {
            goto stop
        }
        // 如果当前的M没在自旋 且 正在自旋的M数量大于等于正在使用的P的数量,那么block
        // 当GOMAXPROCS远大于1,但程序并行度低时,防止过多的CPU消耗。
        if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
            goto stop
        }
        // 如果M为非自旋,那么设置为自旋状态
        if !_g_.m.spinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
        }
        // 随机选一个P,尝试从这P中偷取一些G
        for i := 0; i < 4; i++ { // 尝试四次
            for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
                if sched.gcwaiting != 0 {
                    goto top
                }
                stealRunNextG := i > 2 // first look for ready queues with more than 1 g
                // 从allp[enum.position()]偷去一半的G,并返回其中的一个
                if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                    return gp, false
                }
            }
        }
    stop:
        // 当前的M找不到G来运行。如果此时P处于 GC mark 阶段
        // 那么此时可以安全的扫描和黑化对象,和返回 gcBgMarkWorker 来运行
        if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
            // 设置gcMarkWorkerMode 为 gcMarkWorkerIdleMode
            _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
            // 获取gcBgMarkWorker goroutine
            gp := _p_.gcBgMarkWorker.ptr()
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
        allpSnapshot := allp
        // return P and block
        lock(&sched.lock)
        if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
            unlock(&sched.lock)
            goto top
        }
        // 再次从全局队列中获取G
        if sched.runqsize != 0 {
            gp := globrunqget(_p_, 0)
            unlock(&sched.lock)
            return gp, false
        }
        // 将当前对M和P解绑
        if releasep() != _p_ {
            throw("findrunnable: wrong p")
        }
        // 将p放入p空闲链表
        pidleput(_p_)
        unlock(&sched.lock)
        wasSpinning := _g_.m.spinning
        // M取消自旋状态
        if _g_.m.spinning {
            _g_.m.spinning = false
            if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                throw("findrunnable: negative nmspinning")
            }
        }
        // check all runqueues once again
        // 再次检查所有的P,有没有可以运行的G
        for _, _p_ := range allpSnapshot {
            // 如果p的本地队列有G
            if !runqempty(_p_) {
                lock(&sched.lock)
                // 获取另外一个空闲P
                _p_ = pidleget()
                unlock(&sched.lock)
                if _p_ != nil {
                    // 如果P不是nil,将M绑定P
                    acquirep(_p_)
                    // 如果是自旋,设置M为自旋
                    if wasSpinning {
                        _g_.m.spinning = true
                        atomic.Xadd(&sched.nmspinning, 1)
                    }
                    // 返回到函数开头,从本地p获取G
                    goto top
                }
                break
            }
        }
        // gcmark的goroutine,这里会控制这类g的数量
        if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
            lock(&sched.lock)
            _p_ = pidleget()
            if _p_ != nil && _p_.gcBgMarkWorker == 0 {
                pidleput(_p_)
                _p_ = nil
            }
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto stop
            }
        }
        // poll network
        // 再次检查netpoll
        if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
            gp := netpoll(true) // block until new work is available
            if gp != nil {
                lock(&sched.lock)
                _p_ = pidleget()
                unlock(&sched.lock)
                acquirep(_p_)
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                return gp, false
            }
        }
        // 实在找不到G,那就休眠吧
        // 且此时的M一定不是自旋状态
        stopm()
        goto top
    }
    

    系统调用

    系统调用有用户态到内核态的切换,并且部分系统调用甚至会阻塞,当goroutine在处理系统调用的时候,如果不采取措施的话,会导致一个goroutine占用p时间过长,导致p中其他的goroutine无法及时调度。针对系统调用调度器做了一些操作,保证系统调用阻塞的同时,其他的goroutine可以被合理调度。

    golang封装的系统到到最后都会调用到 Syscall() 或者 RawSyscall()这两个方法,RawSyscall是一个直接进行系统调用,而Syscall方法是做了部分处理,来配合go的调度器工作,以下代码省略了部分无关内容,展示了golang系统调用的过程:

    EXT    ·Syscall(SB),NOSPLIT,$0-56
        CALL    runtime·entersyscall(SB)   //调用entersyscall进行调用前准备
        MOVQ    trap+0(FP), AX  // syscall entry
        SYSCALL                                 // linux amd64平台下的系统调用指令
        CMPQ    AX, $0xfffffffffffff001
        JLS ok
        CALL    runtime·exitsyscall(SB)     //结束系统调用
        RET
    ok:
        CALL    runtime·exitsyscall(SB)
        RET
    // func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2, err uintptr)
    TEXT ·RawSyscall(SB),NOSPLIT,$0-56
        MOVQ    trap+0(FP), AX  // syscall entry
        SYSCALL  //linux amd64平台下的系统调用指令
        CMPQ    AX, $0xfffffffffffff001
        JLS ok1
        RET
    ok1:
        RET
    

    明显SysCall比RawSyscall多调用了两个方法,entersyscall和exitsyscall,增加这两个函数的调用,让调度器有机会去对即将要进入系统调用的goroutine进行调整,方便调度。

    entersyscall():

    // 系统调用的时候调用该函数
    // 进入系统调用,G将会进入_Gsyscall状态,也就是会被暂时挂起,直到系统调用结束。
    // 此时M进入系统调用,那么P也会放弃该M。但是,此时M还指向P,在M从系统调用返回后还能找到P
    func entersyscall(dummy int32) {
        reentersyscall(getcallerpc(), getcallersp(unsafe.Pointer(&dummy)))
    }
    
    func reentersyscall(pc, sp uintptr) {
        _g_ := getg()
        _g_.m.locks++
        // 让G进入_Gsyscall状态,此时G已经被挂起了,直到系统调用结束,才会让G重新写进入running
        casgstatus(_g_, _Grunning, _Gsyscall)
        //唤醒 sysmon m,这个监控长时间执行的g
        if atomic.Load(&sched.sysmonwait) != 0 {
            systemstack(entersyscall_sysmon)
            save(pc, sp)
        }
        // 这里很关键:P的M已经陷入系统调用,于是P忍痛放弃该M
        // 但是请注意:此时M还指向P,在M从系统调用返回后还能找到P
        _g_.m.mcache = nil
        _g_.m.p.ptr().m = 0
        // P的状态变为Psyscall
        atomic.Store(&_g_.m.p.ptr().status, _Psyscall)
    
    }
    

    该方法主要是为系统调用前做了准备工作:

    • 修改g的状态为_Gsyscall
    • 检查sysmon线程是否在执行,睡眠需要唤醒
    • p放弃m,但是m依旧持有p的指针,结束调用后优先选择p
    • 修改p的状态为_Psyscal

    做好这些准备工作便可以真正的执行系统调用了。当该线程m长时间阻塞在系统调用的时候,一直在运行的sysmon线程会检测到该p的状态,并将其剥离,驱动其他的m(新建或获取)来调度执行该p上的任务,这其中主要是在retake方法中实现的,该方法还处理了goroutine抢占调度,这里省略,后面介绍抢占调度在介绍:

    //实现go调度系统的抢占
    func retake(now int64) uint32 {
        n := 0
        lock(&allpLock)
        for i := 0; i < len(allp); i++ {
            _p_ := allp[i]
            if _p_ == nil {
                continue
            }
            pd := &_p_.sysmontick
            s := _p_.status
            //p在系统调用中
            if s == _Psyscall {
                t := int64(_p_.syscalltick)
                if int64(pd.syscalltick) != t {
                    pd.syscalltick = uint32(t)
                    pd.syscallwhen = now
                    continue
                }
                //没有可以调度的任务且时间阻塞时间未到阀值,直接跳过
                if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                    continue
                }
                // 这里出发了系统调用长时间阻塞的调度
                unlock(&allpLock)
                incidlelocked(-1)
                if atomic.Cas(&_p_.status, s, _Pidle) {
                    n++
                    _p_.syscalltick++
                    //关键方法,将对长时间阻塞的p进行重新调度
                    handoffp(_p_)
                }
                incidlelocked(1)
                lock(&allpLock)
            } else if s == _Prunning {
              //暂时省略
            }
        }
        unlock(&allpLock)
        return uint32(n)
    }
    

    当系统调用时间过长的时候,会调用handoffp()方法:

    // p的切换,系统调用或者绑定M时使用
    func handoffp(_p_ *p) {
        //当前p有任务或者全局任务队列有任务,触发一次调度
        //startm()上文有描述,会获取一个m来调度当前p的任务,当前p为nil时,会调度其他p任务队列
        if !runqempty(_p_) || sched.runqsize != 0 {
            startm(_p_, false)
            return
        }
        //gc标记阶段且当前p有标记任务,触发调度
        if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
            startm(_p_, false)
            return
        }
        //有自旋m或空闲p,触发调度
        if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
            startm(_p_, true)
            return
        }
      ...
        //全局队列不为空
        if sched.runqsize != 0 {
            unlock(&sched.lock)
            startm(_p_, false)
            return
        }
        
        if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
            unlock(&sched.lock)
            startm(_p_, false)
            return
        }
        //实在没任务,放入空闲队列
        pidleput(_p_)
        unlock(&sched.lock)
    }
    

    可以看到,通过handoffp方法,阻塞在系统调用的p会被重新调度,不会阻塞其他任务的执行。
    没有空闲m的时候,这里有可能会创建出新的m来进行调度。

    回到Syscall的执行流程中,当系统Syscall返回的时,会调用exitsyscall方法恢复调度:

    // goroutine g退出系统调用。安排它再次在cpu上运行。
    // 这个函数只能从go syscall库中调用,而不是从运行时使用的低级系统调用中调用
    func exitsyscall(dummy int32) {
        _g_ := getg()
        // 重新获取p
        if exitsyscallfast() {
            casgstatus(_g_, _Gsyscall, _Grunning)
            return
        }
        // 没有获取到p,只能解绑当前g,重新调度该m了
        mcall(exitsyscall0)
    }
    

    exitsyscall会尝试重新绑定p,优先选择之前m绑定的p(进入系统的调用的时候,p只是单方面解绑了和m的关系,通过m依旧可以找到p):

    
    func exitsyscallfast() bool {
        _g_ := getg()
        //stw,直接解绑p,然后退出
        if sched.stopwait == freezeStopWait {
            _g_.m.mcache = nil
            _g_.m.p = 0
            return false
        }
        // 如果之前附属的P尚未被其他M,尝试绑定该P
        if _g_.m.p != 0 && _g_.m.p.ptr().status == _Psyscall && atomic.Cas(&_g_.m.p.ptr().status, _Psyscall, _Prunning) {
            exitsyscallfast_reacquired()
            return true
        }
        // 否则从空闲P列表中取出一个来
        oldp := _g_.m.p.ptr()
        _g_.m.mcache = nil
        _g_.m.p = 0
        if sched.pidle != 0 {
            var ok bool
            systemstack(func() {
                ok = exitsyscallfast_pidle()
            })
            if ok {
                return true
            }
        }
        return false
    }
    

    当获取p失败的时候,只能选择重新调度:

    func exitsyscall0(gp *g) {
        _g_ := getg()
        //修改g状态为 _Grunable
        casgstatus(gp, _Gsyscall, _Grunnable)
        dropg() //解绑
        lock(&sched.lock)
        //尝试获取p
        _p_ := pidleget()
        if _p_ == nil {
            //未获取到p,g进入全局队列等待调度
            globrunqput(gp)
        } else if atomic.Load(&sched.sysmonwait) != 0 {
            atomic.Store(&sched.sysmonwait, 0)
            notewakeup(&sched.sysmonnote)
        }
        unlock(&sched.lock)
        //获取到p,绑定,然后执行
        if _p_ != nil {
            acquirep(_p_)
            execute(gp, false) // Never returns.
        }
        // m有绑定的g,解绑p然后绑定的g来唤醒,执行
        if _g_.m.lockedg != 0 {
            stoplockedm()
            execute(gp, false) // Never returns.
        }
        //关联p失败了,休眠,等待唤醒,在进行调度。
        stopm()
        schedule() // Never returns.
    }
    

    上述便是golang系统调用的整个流程,大致如下:

    5997541b85f6b23182472b3aec3df831.png

    1. 业务调用封装好的系统调用函数,编译器翻译到Syscall
    2. 执行entersyscall()方法,修改g,p的状态,p单方面解绑m,并检查唤醒sysmon线程,检测系统调用。
    3. 当sysmon线程检测到系统调用阻塞时间过长的时候,调用retake,重新调度该p,让p上可执行的得以执行,不浪费资源
    4. 系统调用返回,进入exitsyscall方法,优先获取之前的p,如果该p已经被占有,重新获取空闲的p,绑定,然后继续执行该g。当获取不到p的时候,调用exitsyscall0,解绑g,休眠,等待下次唤醒调度。

    抢占调度

    golang调度高效秘诀之一是它的抢占式调度。当任务函数执行的时间超过了一定的时间,
    sysmon方法会不断的检测所有p上任务的执行情况,当有超过预定执行时间的g时,会发起抢占。这一切也是在retake函数中实现的,上文描述了该函数在系统调用中的功能,这里讲下该函数如何执行抢占。

    // retake()函数会遍历所有的P,如果一个P处于执行状态,
    // 且已经连续执行了较长时间,就会被抢占。
    // retake()调用preemptone()将P的stackguard0设为
    // stackPreempt(关于stackguard的详细内容,可以参考 Split Stacks),
    // 这将导致该P中正在执行的G进行下一次函数调用时,
    // 导致栈空间检查失败。进而触发morestack()(汇编代码,位于asm_XXX.s中)
    // 然后进行一连串的函数调用,主要的调用过程如下:
    // morestack()(汇编代码)-> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
    // http://ga0.github.io/golang/2015/09/20/golang-runtime-scheduler.html
    func retake(now int64) uint32 {
        n := 0
        lock(&allpLock)
        for i := 0; i < len(allp); i++ {
            _p_ := allp[i]
            pd := &_p_.sysmontick
            s := _p_.status
            if s == _Psyscall {
            //系统调用部分可看系统调用的分析
           ...
            } else if s == _Prunning {
                // 超时抢占
                if pd.schedwhen+forcePreemptNS > now {
                    continue
                }
                preemptone(_p_)
            }
        }
        unlock(&allpLock)
        return uint32(n)
    }
    

    当检测到某个p上的任务执行超过一定时间后,调用preemptone对当前g进行抢占:

    func preemptone(_p_ *p) bool {
        // 标记可抢占
        gp.preempt = true
        // gorotuine 中的每个调用都会通过将当前堆栈指针与 gp->stackguard0 进行比较来检查堆栈溢出。
        // 将 gp->stackguard0 设置为 stackPreempt 会将抢占折叠为正常的堆栈溢出检查。
        gp.stackguard0 = stackPreempt
        return true
    }
    

    可以看到只是设置了两个参数,并没有执行实际的抢占工作,事实上这个过程是异步的,将在其他的地方执行真正的抢占操作。

    stackguard0本身是用来检测goroutine的栈是否需要扩充的,当设置为stackPreempt时,在执行函数的时候,便会触发栈扩充,调用morestack()方法,morestack会调用newstack,该方法会扩充g的栈空间,也兼职了goroutine的抢占功能。
    preempt 为抢占的备用手段,在stackguard0设置stackPreempt且在newstack中未能被抢占时,该标记也会在其他地方设置stackguard0的值为stackPreempt,再次触发抢占。

    func newstack() {
        thisg := getg()
        gp := thisg.m.curg
        // 注意:如果另一个线程即将尝试抢占gp,则stackguard0可能会在发生变化。
        // 所以现在读一次,判断是否被抢占。
        preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
      
        if preempt {
            //以下情况不会被抢占
            if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
                // Let the goroutine keep running for now.
                // gp->preempt is set, so it will be preempted next time.
                gp.stackguard0 = gp.stack.lo + _StackGuard
                gogo(&gp.sched) // never return
            }
        }
      
        if preempt {
            casgstatus(gp, _Grunning, _Gwaiting)
            //gc扫描抢占
            if gp.preemptscan {
                for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
                }
                if !gp.gcscandone {
                    //扫描当前gp栈
                    gcw := &gp.m.p.ptr().gcw
                    scanstack(gp, gcw)
                    if gcBlackenPromptly {
                        gcw.dispose()
                    }
                    gp.gcscandone = true
                }
                gp.preemptscan = false
                gp.preempt = false
                casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
                // This clears gcscanvalid.
                casgstatus(gp, _Gwaiting, _Grunning)
                gp.stackguard0 = gp.stack.lo + _StackGuard
                gogo(&gp.sched) //  恢复后继续执行
            }
            //转换状态为 _Gwaiting
            casgstatus(gp, _Gwaiting, _Grunning)
            gopreempt_m(gp) // never return
        }
      ...
    }
    

    以上关于gc的抢占可以先忽略,关注一下普通抢占:

    func gopreempt_m(gp *g) {
        goschedImpl(gp)
    }
    
    // 将当前的G入全剧队列,然后调用调度器
    func goschedImpl(gp *g) {
        status := readgstatus(gp)
        // 将gp的状态改为_Grunnable
        casgstatus(gp, _Grunning, _Grunnable)
        // 解除与当前M的关联
        dropg()
        lock(&sched.lock)
        // 入全局队列
        globrunqput(gp)
        unlock(&sched.lock)
        // 启动调度
        xx()
    }
    

    这里最终会取消m和g的绑定,并将g放入全局队列中,然后开始调度m执行新的任务

    以上是golang抢占调度的基本内容,总结如下:

    1. 正常goroutine的抢占都时由监控线程的sysmon发起的,超时执行的goroutine会被打上可抢占的标志。(gc scan阶段也会发生抢占,主要是为了扫描正在运行的g的栈空间)
    2. 在任务的每个函数中,编译器会加上栈空间检测代码,有需要栈空间扩容或者抢占便会进入morestack,然后调用newstack方法
    3. newstack中会检测是否抢占和抢占类型。gc扫描触发的抢占回扫描当前g栈上的内容,然后继续执行当前g。而普通抢占则会解绑当前g,将g放入全局队列,然后继续调度。

    sysmon

    上文中的系统调用和抢占调度都离不开这个函数。现在简单介绍下,在以后分析完内存,gc后会做详细的介绍

    sysmon独立的运行在一个特殊的m上,它定期执行一次,每次会做以下事情:

    • 2分钟没有gc则触发一次gc
    • 系统调用和抢占调度的实现
    • 处理长时间未返回结果的
    展开全文
  • 处理器调度层次与调度算法

    千次阅读 2019-04-08 23:14:19
    一、处理器调度的层次 【处理器调度的层次】 高级调度:又称长程调度,作业调度 #决定能否加入到执行的进程池中 低级调度:又称短程调度,进程调度 #决定哪个可用进程占用处理器执行 中级调度:又称平衡负载调度 #...

    一、处理器调度的层次
    【处理器调度的层次】

    • 高级调度:又称长程调度,作业调度
      #决定能否加入到执行的进程池中
    • 低级调度:又称短程调度,进程调度
      #决定哪个可用进程占用处理器执行
    • 中级调度:又称平衡负载调度
      #决定主存中的可用进程集合

    【处理器调用层次与关键状态转换】
    在这里插入图片描述

    【处理器调度层次与关键状态转换】
    在这里插入图片描述

    【高级调度】

    • 分时OS中,高级调度决定:
      #是否接受一个终端用户的连接
      #命令能否被系统接纳并构成进程
      #新建态进程是否加入就绪进程队列
    • 批处理OS中,高级调度又称为作业调度,功能是按照某种原则从后备作业队列中选取作业进入主存,并为作业做好运行前的准备工作和完成后的善后工作

    【中级调度】

    • 引进中级调度是为了提高内存利用率和作业吞吐量
    • 中级调度决定哪些进程被允许驻留在主存中参与竞争处理器及其他资源,起到短期调整系统负荷的作用
    • 中级调度把一些进程换出主存,从而使之进入“挂起”状态,不参与进程调度,以平顺系统的负载

    【低级调度】

    • 低级调度:又称处理器调度、进程调度、短程调度,按照某种原则把处理器分配给就绪态进程或内核级线程
    • 进程调度程序:又称分派程序,操作系统中实现处理器调度的程序,是操作系统的最核心部分
    • 处理器调度策略的优劣直接影响到整个系统的性能

    【低级调度的主要功能】

    • 记住进程或内核级线程的状态
    • 决定某个进程或内核级线程什么时候获得处理器,以及占用多长时间
    • 把处理器分配给进程或内核级线程
    • 收回处理器

    二、处理器调度算法
    【选择处理器调度算法的原则】

    • 资源利用率:使得CPU或其他资源的利用率尽可能高且能够并行工作
    • 响应时间:使交互用户的响应时间尽可能小,或尽快处理实时任务
    • 周转时间:提交给系统开始到执行完成获得结果为止的这段时间间隔称周转时间,应该使周转时间或平均周转时间尽可能短
    • 吞吐量:单位时间处理的进程数尽可能多
    • 公平性:确保每个用户每个进程获得合理的CPU份额或其他资源份额

    【优先数调度算法】

    • 根据分配给进程的优先数决定运行进程
      #抢占式优先数调度算法
      #非抢占式优先数调度算法
    • 优先数的确定准则
      #进程负担任务的紧迫程度
      #进程的交互性
      #进程使用外设的频度
      #进程进入系统的时间长短

    【与进入系统时间相关的优先数】

    • 计算时间短(作业/进程)优先
    • 剩余计算时间短进程优先
    • 响应比高者(作业/进程)优先
      响应比=等待事件/进入时间
    • 先来先服务:先进队先被选择
      #多用于高级调度、低级调度中,以计算为主的进程过于优越

    【时间片轮转调度算法】

    • 根据各个进程进入就绪队列的时间先后轮流占有CPU一个时间片
    • 时间片中断
    • 时间片的确定:选择长短合适的时间片,过长则退化为先来先服务算法,过短则调度开销大
    • 单时间片,多时间片和动态时间片

    【分级调度算法】

    • 又称多队列策略,反馈循环队列
    • 基本思想
      #建立多个不同优先级的就绪进程队列
      #多个就绪进程队列间按照优先数调度
      #高优先级就绪进程,分配的时间片短
      #单个就绪进程队列中进程的优先数和时间片相同

    【分级调度算法的例】
    在这里插入图片描述

    【分级调度算法的分级原则】

    • 一般分级原则
      #外设访问,交互型,时间紧迫程度,系统效率,用户立场
    • 现代操作系统的实现模型
      #多个高优先级的实时进程队列,如:硬实时、网络、软实时
      #多个分时任务的进程队列,根据基准优先数和执行行为调整
      #对列数可能多达32-128个

    【彩票调度算法】

    • 基本思想:为进城发放针对系统各种资源(如CPU时间)的彩票;当调度程序需要做出决策时,随机选择一张彩票,持有该彩票的进程将获得系统资源
    • 合作进程之间的彩票交换
    展开全文
  • 1、概述 ... 1.1、为什么需要工作调度系统 一个完整的数据分析系统通常都是由大量任务单元组成: ...为了很好地组织起这样的复杂执行计划,需要一个工作调度系统来调度执行; 例如,我们可能有这样一个需求...

    1、概述

    azkaban官网:https://azkaban.github.io/

    1.1、为什么需要工作流调度系统

    一个完整的数据分析系统通常都是由大量任务单元组成:
        shell脚本程序,java程序,mapreduce程序、hive脚本等
    各任务单元之间存在时间先后及前后依赖关系
    为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;
    例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示:
    1、    通过Hadoop先将原始数据同步到HDFS上;
    2、    借助MapReduce计算框架对原始数据进行转换,生成的数据以分区表的形式存储到多张Hive表中;
    3、    需要对Hive中多个表的数据进行JOIN处理,得到一个明细数据Hive大表;
    4、    将明细数据进行各种统计分析,得到结果报表信息;
    5、    需要将统计分析得到的结果数据同步到业务系统中,供业务调用使用。

     


    1.2、工作流调度实现方式

    简单的任务调度:直接使用linux的crontab来定义;

    复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如ooize、azkaban、airflow等

    1.3、常见工作流调度系统

    市面上目前有许多工作流调度器

    在hadoop领域,常见的工作流调度器有Oozie, Azkaban,Cascading,Hamake等

    1.4、各种调度工具特性对比

    下面的表格对上述四种hadoop工作流调度器的关键特性进行了比较,尽管这些工作流调度器能够解决的需求场景基本一致,但在设计理念,目标用户,应用场景等方面还是存在显著的区别,在做技术选型的时候,可以提供参考

    特性

    Hamake

    Oozie

    Azkaban

    Cascading

    工作流描述语言

    XML

    XML (xPDL based)

    text file with key/value pairs

    Java API

    依赖机制

    data-driven

    explicit

    explicit

    explicit

    是否要web容器

    No

    Yes

    Yes

    No

    进度跟踪

    console/log messages

    web page

    web page

    Java API

    Hadoop job调度支持

    no

    yes

    yes

    yes

    运行模式

    command line utility

    daemon

    daemon

    API

    Pig支持

    yes

    yes

    yes

    yes

    事件通知

    no

    no

    no

    yes

    需要安装

    no

    yes

    yes

    no

    支持的hadoop版本

    0.18+

    0.20+

    currently unknown

    0.18+

    重试支持

    no

    workflownode evel

    yes

    yes

    运行任意命令

    yes

    yes

    yes

    yes

    Amazon EMR支持

    yes

    no

    currently unknown

    yes

    1.5、Azkaban与Oozie对比

    对市面上最流行的两种调度器,给出以下详细对比,以供技术选型参考。总体来说,ooize相比azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器azkaban是很不错的候选对象。

    功能
        两者均可以调度mapreduce,pig,java,脚本工作流任务
        两者均可以定时执行工作流任务

    工作流定义
        Azkaban使用Properties文件定义工作流
        Oozie使用XML文件定义工作流

    工作流传参
            Azkaban支持直接传参,例如${input}
            Oozie支持参数和EL表达式,例如${fs:dirSize(myInputDir)}

    定时执行
            Azkaban的定时执行任务是基于时间的
            Oozie的定时执行任务基于时间和输入数据

    资源管理
            Azkaban有较严格的权限控制,如用户对工作流进行读/写/执行等操作
            Oozie暂无严格的权限控制

    工作流执行
            Azkaban有两种运行模式,分别是solo server mode(executor server和web server部署在同一台节点)和multi server mode(executor server和web server可以部署在不同节点)
            Oozie作为工作流服务器运行,支持多用户和多工作流

    工作流管理
            Azkaban支持浏览器以及ajax方式操作工作流
            Oozie支持命令行、HTTP REST、Java API、浏览器操作工作流


    2、Azkaban介绍

    Azkaban是由Linkedin开源的一个批量工作流任务调度器。用于在一个工作流内以一个特定的顺序运行一组工作和流程。

    Azkaban定义了一种KV文件(properties)格式来建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。

    它有如下功能特点:
        Web用户界面
        方便上传工作流
        方便设置任务之间的关系
        调度工作流
        认证/授权(权限的工作)
        能够杀死并重新启动工作流
        模块化和可插拔的插件机制
        项目工作区
        工作流和任务的日志记录和审计
     

    3、Azkaban安装部署

    3.1、azkaban的编译

    我们这里选用azkaban3.51.0这个版本自己进行重新编译,编译完成之后得到我们需要的安装包进行安装

    注意:我们这里编译需要使用jdk1.8的版本来进行编译,如果编译服务器使用的jdk版本是1.7的,记得切换成jdk1.8,我们这里使用的是jdk8u141这个版本来进行编译

    cd /export/softwares/
    wget https://github.com/azkaban/azkaban/archive/3.51.0.tar.gz
    tar -zxvf 3.51.0.tar.gz -C ../servers/
    cd /export/servers/azkaban-3.51.0/
    yum -y install git
    yum -y install gcc-c++
    ./gradlew build installDist -x test
    

    编译之后需要的安装文件列表如下

    azkaban-exec-server

    编译完成之后得到我们需要的安装包在以下目录下即可获取得到

    azkaban-exec-server存放目录

    /export/servers/azkaban-3.51.0/azkaban-exec-server/build/distributions

    azkaban-web-server

    azkaban-web-server存放目录

    /export/servers/azkaban-3.51.0/azkaban-web-server/build/distributions

    azkaban-solo-server

    azkaban-solo-server存放目录

    /export/servers/azkaban-3.51.0/azkaban-solo-server/build/distributions

    execute-as-user.c

    azkaban two  server模式下需要的C程序在这个路径下面

    /export/servers/azkaban-3.51.0/az-exec-util/src/main/c

    数据库脚本文件

    数据库脚本文件在这个路径下面

    /export/servers/azkaban-3.51.0/azkaban-db/build/install/azkaban-db

    3.2、azkaban单服务模式安装与使用

    所需软件

    azkaban-solo-server

    单服务模式安装

    第一步:解压

    azkaban 的solo  server使用的是一个单节点的模式来进行启动服务的,只需要一个azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz的安装包即可启动,所有的数据信息都是保存在H2这个azkaban默认的数据当中,上传我们的压缩包,然后修改配置文件启动即可

    cd /export/softwares
    tar -zxvf azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz -C ../servers/
    

    第二步:修改两个配置文件

    修改时区配置文件
    cd /export/servers/azkaban-solo-server-0.1.0-SNAPSHOT/conf
    vim azkaban.properties
    
    
    default.timezone.id=Asia/Shanghai
    

    修改commonprivate.properties配置文件
    cd /export/servers/azkaban-solo-server-0.1.0-SNAPSHOT/plugins/jobtypes
    vim commonprivate.properties
    
    
    
    execute.as.user=false
    memCheck.enabled=false
    

    第三步:启动solo-server

    启动azkaban-solo-server
    cd  /export/servers/azkaban-solo-server-0.1.0-SNAPSHOT
    bin/start-solo.sh
    
    

    第四步:浏览器页面访问

    浏览器页面访问      http://node03:8081/

    单服务模式使用

    需求:使用azkaban调度我们的shell脚本,执行linux的shell命令

    创建普通文本文件  foo.job,文件内容如下
    
    
    type=command
    command=echo "hello world"
    

    然后将这个文件打包为压缩文件,如下:

    azkaban上传我们的压缩包

    3.3、azkaban两个服务模式安装

    1、确认所需软件:
    Azkaban Web服务安装包
    azkaban-web-server-0.1.0-SNAPSHOT.tar.gz
    Azkaban执行服务安装包
    azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz
    
    编译之后的sql脚本
    create-all-sql-0.1.0-SNAPSHOT.sql
    

    C程序文件脚本

    execute-as-user.c程序

    2、数据库准备

    进入mysql的客户端执行以下命令
    
    mysql  -uroot -p
    
    执行以下命令:
    CREATE DATABASE azkaban;
    CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban';    
    GRANT all privileges ON azkaban.* to 'azkaban'@'%' identified by 'azkaban' WITH GRANT OPTION; 
    flush privileges;
    use azkaban; 
    source /export/softwares/create-all-sql-0.1.0-SNAPSHOT.sql;
    

    3、解压软件安装包

    解压azkaban-web-server
    cd /export/softwares
    tar -zxvf azkaban-web-server-0.1.0-SNAPSHOT.tar.gz -C ../servers/
    cd /export/servers
    mv azkaban-web-server-0.1.0-SNAPSHOT/ azkaban-web-server-3.51.0
    
    解压azkaban-exec-server
    cd /export/softwares
    tar -zxvf azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz -C ../servers/
    cd /export/servers
    mv azkaban-exec-server-0.1.0-SNAPSHOT/ azkaban-exec-server-3.51.0
    

    4、安装SSL安全认证

    安装ssl安全认证,允许我们使用https的方式访问我们的azkaban的web服务
    密码一定要一个个的字母输入,或者粘贴也行
    
    
    cd /export/servers/azkaban-web-server-3.51.0
    keytool -keystore keystore -alias jetty -genkey -keyalg RSA
    

    5、azkaban web server安装

    修改azkaban-web-server的配置文件
    cd /export/servers/azkaban-web-server-3.51.0/conf
    vim azkaban.properties
    
    
    
    # Azkaban Personalization Settings
    azkaban.name=Azkaban
    azkaban.label=My Azkaban
    azkaban.color=#FF3601
    azkaban.default.servlet.path=/index
    web.resource.dir=web/
    default.timezone.id=Asia/Shanghai
    # Azkaban UserManager class
    user.manager.class=azkaban.user.XmlUserManager
    user.manager.xml.file=conf/azkaban-users.xml
    # Loader for projects
    executor.global.properties=conf/global.properties
    azkaban.project.dir=projects
    # Velocity dev mode
    velocity.dev.mode=false
    # Azkaban Jetty server properties.
    jetty.use.ssl=true
    jetty.maxThreads=25
    jetty.port=8081
    
    
    
    jetty.ssl.port=8443
    jetty.keystore=/export/servers/azkaban-web-server-3.51.0/keystore
    jetty.password=azkaban
    jetty.keypassword=azkaban
    jetty.truststore=/export/servers/azkaban-web-server-3.51.0/keystore
    jetty.trustpassword=azkaban
    
    
    
    
    # Azkaban Executor settings
    # mail settings
    mail.sender=
    mail.host=
    # User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.
    # enduser -> myazkabanhost:443 -> proxy -> localhost:8081
    # when this parameters set then these parameters are used to generate email links.
    # if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
    # azkaban.webserver.external_hostname=myazkabanhost.com
    # azkaban.webserver.external_ssl_port=443
    # azkaban.webserver.external_port=8081
    job.failure.email=
    job.success.email=
    lockdown.create.projects=false
    cache.directory=cache
    # JMX stats
    jetty.connector.stats=true
    executor.connector.stats=true
    # Azkaban mysql settings by default. Users should configure their own username and password.
    database.type=mysql
    mysql.port=3306
    mysql.host=node03
    mysql.database=azkaban
    mysql.user=azkaban
    mysql.password=azkaban
    mysql.numconnections=100
    #Multiple Executor
    azkaban.use.multiple.executors=true
    #azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
    azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
    azkaban.executorselector.comparator.Memory=1
    azkaban.executorselector.comparator.LastDispatched=1
    azkaban.executorselector.comparator.CpuUsage=1
    
    
    
    azkaban.activeexecutor.refresh.milisecinterval=10000
    azkaban.queueprocessing.enabled=true
    azkaban.activeexecutor.refresh.flowinterval=10
    azkaban.executorinfo.refresh.maxThreads=10
    

    6、azkaban  executor server 安装

    第一步:修改azkaban-exex-server配置文件

    修改azkaban-exec-server的配置文件
    cd /export/servers/azkaban-exec-server-3.51.0/conf
    vim azkaban.properties
    
    # Azkaban Personalization Settings
    azkaban.name=Azkaban
    azkaban.label=My Azkaban
    azkaban.color=#FF3601
    azkaban.default.servlet.path=/index
    web.resource.dir=web/
    default.timezone.id=Asia/Shanghai
    # Azkaban UserManager class
    user.manager.class=azkaban.user.XmlUserManager
    user.manager.xml.file=conf/azkaban-users.xml
    # Loader for projects
    executor.global.properties=conf/global.properties
    azkaban.project.dir=projects
    # Velocity dev mode
    velocity.dev.mode=false
    # Azkaban Jetty server properties.
    jetty.use.ssl=true
    jetty.maxThreads=25
    jetty.port=8081
    
    
    jetty.keystore=/export/servers/azkaban-web-server-3.51.0/keystore
    jetty.password=azkaban
    jetty.keypassword=azkaban
    jetty.truststore=/export/servers/azkaban-web-server-3.51.0/keystore
    jetty.trustpassword=azkaban
    
    
    # Where the Azkaban web server is located
    azkaban.webserver.url=https://node03:8443
    # mail settings
    mail.sender=
    mail.host=
    # User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.
    # enduser -> myazkabanhost:443 -> proxy -> localhost:8081
    # when this parameters set then these parameters are used to generate email links.
    # if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
    # azkaban.webserver.external_hostname=myazkabanhost.com
    # azkaban.webserver.external_ssl_port=443
    # azkaban.webserver.external_port=8081
    job.failure.email=
    job.success.email=
    lockdown.create.projects=false
    cache.directory=cache
    # JMX stats
    jetty.connector.stats=true
    executor.connector.stats=true
    # Azkaban plugin settings
    azkaban.jobtype.plugin.dir=plugins/jobtypes
    # Azkaban mysql settings by default. Users should configure their own username and password.
    database.type=mysql
    mysql.port=3306
    mysql.host=node03
    mysql.database=azkaban
    mysql.user=azkaban
    mysql.password=azkaban
    mysql.numconnections=100
    # Azkaban Executor settings
    executor.maxThreads=50
    executor.flow.threads=30
    

    第二步:添加插件

    将我们编译后的C文件execute-as-user.c

    上传到这个目录来/export/servers/azkaban-exec-server-3.51.0/plugins/jobtypes

    或者直接将我们/export/softwares下面的文件拷贝过来也行

    cp /export/softwares/execute-as-user.c /export/servers/azkaban-exec-server-3.51.0/plugins/jobtypes/
    
    
    然后执行以下命令生成execute-as-user
    yum -y install gcc-c++
    cd /export/servers/azkaban-exec-server-3.51.0/plugins/jobtypes
    gcc execute-as-user.c -o execute-as-user 
    chown root execute-as-user
    chmod 6050 execute-as-user
    

    第三步:修改配置文件

    修改配置文件
    cd  /export/servers/azkaban-exec-server-3.47.0/plugins/jobtypes
    vim commonprivate.properties
    execute.as.user=false
    memCheck.enabled=false
    azkaban.native.lib=/export/servers/azkaban-exec-server-3.51.0/plugins/jobtypes
    

    最终生成如下

    7、启动服务

    第一步:启动azkaban exec server

    cd /export/servers/azkaban-exec-server-3.51.0
    bin/start-exec.sh
    

    第二步:激活我们的exec-server

    node03机器任意目录下执行以下命令
    curl -G "node03:$(<./executor.port)/executor?action=activate" && echo
    

    第三步:启动azkaban-web-server

    cd /export/servers/azkaban-web-server-3.51.0/
    bin/start-web.sh
    

    访问地址:https://node03:8443

    修改linux的时区问题

    由于先前做好了时钟同步,所以不用担心时区问题,不需要修改时区了

    注:先配置好服务器节点上的时区

    1. 先生成时区配置文件Asia/Shanghai,用交互式命令 tzselect 即可

    2. 拷贝该时区文件,覆盖系统本地时区配置

    cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime 

    4、Azkaban实战

    Azkaba内置的任务类型支持command、java

    Command类型单一job示例

    创建job描述文件

    创建文本文件,更改名称为mycommand.job

    注意后缀.txt一定不要带上,保存为格式为UFT-8 without bom

    内容如下

    type=command
    command=echo 'hello world'
    

    将job资源文件打包成zip文件

    创建project并上传压缩包

    通过azkaban的web管理平台创建project并上传job压缩包

    首先创建project

    上传zip包

    启动执行job

    Command类型多job工作流flow

    1、创建有依赖关系的多个job描述

    第一个job:foo.job

    type=command
    command=echo 'foo'
    

    第二个job:bar.job依赖foo.job

    type=command
    dependencies=foo
    command=echo 'bar'
    

    2、将所有job资源文件打到一个zip包中

    3、在azkaban的web管理界面创建工程并上传zip包
    4、启动工作流flow

    HDFS操作任务

    1. 创建job描述文件fs.job

    type=command
    command=/export/servers/hadoop-2.6.0-cdh5.14.0/bin/hadoop fs -mkdir /azkaban
    

    2、将job资源文件打包成zip文件

    3、通过azkaban的web管理平台创建project并上传job压缩包

    4、启动执行该job

    MAPREDUCE任务

    Mr任务依然可以使用command的job类型来执行

    1. 创建job描述文件,及mr程序jar包(示例中直接使用hadoop自带的example jar)

    type=command
    command=/export/servers/hadoop-2.6.0-cdh5.14.0/bin/hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.14.0.jar pi 3 5
    

    2、将所有job资源文件打到一个zip包中

    3、在azkaban的web管理界面创建工程并上传zip包

    4、启动job

    HIVE脚本任务

        创建job描述文件和hive脚本
    Hive脚本: hive.sql
     

    create database if not exists azhive;
    use azhive;
    create table if not exists aztest(id string,name string) row format delimited fields terminated by '\t';
    

    Job描述文件:hive.job        

    type=command
    command=/export/servers/hive-1.1.0-cdh5.14.0/bin/hive -f 'hive.sql'
    

    将所有job资源文件打到一个zip包中

    在azkaban的web管理界面创建工程并上传zip包

    启动job

    azkaban的定时任务

    使用azkaban的scheduler功能可以实现对我们的作业任务进行定时调度功能

    */1 * ? * *  每分钟执行一次定时调度任务
    0 1 ? * *  每天晚上凌晨一点钟执行这个任务
    0 */2 ? * *  每隔两个小时定时执行这个任务
    30 21 ? * * 每天晚上九点半定时执行这个任务
    

     

    展开全文
  • 调度系统,更确切地说,作业调度系统(Job Scheduler)或者说工作调度系统(workflow Scheduler)是任何一个稍微有点规模,不是简单玩玩的大数据开发平台都必不可少的重要组成部分。 除了Crontab,Quartz这类偏...

    什么是调度系统

    调度系统,更确切地说,作业调度系统(Job Scheduler)或者说工作流调度系统(workflow Scheduler)是任何一个稍微有点规模,不是简单玩玩的大数据开发平台都必不可少的重要组成部分。

    除了Crontab,Quartz这类偏单机的定时调度程序/库。开源的分布式作业调度系统也有很多,比较知名的比如:oozie,azkaban,chronos,zeus等等,此外,还有包括阿里的TBSchedule,SchedulerX,腾讯的Lhotse以及我司历尽十年磨砺的TASKCTL

    作业系统的两大种类

    现在市面上的调度系统根据功能性可以分为两类定时类作业调度系统&DAG工作流类作业调度系统这两类系统的架构和功能实现通常存在很大的差异,下面就来跟大家普及一下这两种作业系统的不同之处;

    定时类作业系统

    定时类系统的方向,重点定位于大量并发的任务分片执行场景;

    在实际应用场景中,通常平时维护工作需要定时执行的业务逻辑相对离散无序,仅仅存在一定的简单关联。

    例如:

    • 需要定时批量清理一批机器的磁盘空间,
    • 需要定时生成一批商品清单,
    • 需要定时批量对一批数据建索引,
    • 需要定时对一批用户发送推送通知等等。

    核心目标基本两点:

    1.作业分片逻辑支持:将一个大的任务拆分成多个小任务分配到不同的服务器上执行, 难点在于要做到不漏,不重,保证负载平衡,节点崩溃时自动进行任务迁移等

    2.高可用精确定时触发:由于平时经常涉及到实际业务流程的及时性和准确性,所以通常需要保证任务触发的强实时和可靠性

    所以"负载均衡,弹性扩容",“状态同步”和“失效转移”通常是这类调度系统在架构设计时重点考虑的特性

    DAG工作流类作业调度系统

    主要定位于有序作业的调度依赖关系的正确处理,分片执行的逻辑通常不是系统关注的粒度,如果某些作业真的关注分片逻辑,通常交给后端集群(比如MR任务自带分片能力)或者具体类型的任务执行后端去实现。

    DAG工作流类调度系统所服务的通常是作业繁多,作业之间的流程依赖比较复杂的场景;

    如:大数据开发平台的离线数仓报表处理业务,从数据采集,清洗,到各个层级的报表的汇总运算,到最后数据导出到外部业务系统,一个完整的业务流程,可能涉及到成百上千个相互交叉依赖关联的作业。

    所以DAG工作流类调度系统关注的重点,通常会包括:

    • 足够丰富灵活的依赖触发机制(如:时间触发任务,依赖触发任务,混合触发任务)
    • 作业的计划,变更和执行流水的管理和同步
    • 任务的优先级管理,业务隔离,权限管理等
    • 各种特殊流程的处理(如:暂停任务,重刷历史数据,人工标注失败/成功,临时任务和周期任务的协同等)
    • 完备的监控报警通知机制

    小结:这两类系统的定位目标,并不是绝对冲突矛盾的,并且从目前定时类调度系统的发展来看,也需要处理一些复杂的作业间强依赖关系了,比如 "微批(少量DAG批量作业处理)" 概念的提出。只不过,要同时圆满的支持这两大类需求,从实现的角度来说是很困难的,因为侧重点的不同,在架构上多少会对某些方面做些取舍,当前这两类系统都没有能够做到完美的两者兼顾。

    为什么需要调度系统

    我们都知道大数据的计算、分析和处理,一般由多个任务单元组成(Hive、Sparksql、Spark、Shell等),每个任务单元完成特定的数据处理逻辑。

    多个任务单元之间往往有着强依赖关系,上游任务执行并成功,下游任务才可以执行。比如上游任务结束后拿到 A 结果,下游任务需结合 A 结果才能产出 B 结果,因此下游任务的开始一定是在上游任务成功运行拿到结果之后才可以开始。

    而为了保证数据处理结果的准确性,就必须要求这些任务按照上下游依赖关系有序、高效的执行。一个较为基础的处理方式是,预估出每个任务处理所需时间,根据先后顺序,计算出每个任务的执行的起止时间,通过定时跑任务的方式,让整个系统保持稳定的运行。

    一个完整的数据分析任务最少执行一次,在数据量较少,依赖关系较为简单的低频数据处理过程中,这种调度方式完全可以满足需求。

    然而在企业级场景中,更多的是需要每天执行,如果任务数量较多,在任务启动的时间计算上就将耗费大量时间,另外如果出现上游任务执行时长超出原定预计时间或者运行异常的问题,上述的处理方式将完全无法应对,也会对人力物力造成重复损耗,因此,对于企业数据开发过程来说,一个完整且高效的工作流调度系统将起到至关重要的作用。

    写在最后

    TASKCTL目前是暂时唯一提出 "无序定时和有序DAG作业流" 完整概念的调度产品。既可以在定时中处理 "微批" 的控制,也能够在DAG作业流中处理 "定时" 的控制。

    例如:

    • 在大数据分布式(分片)计算中,对数据进行实时ETL跑批处理,
    • 在ETL作业跑批中,对某个作业或一段分支进行时间窗口内循环定时处理

    了解产品信息可以参读:

    随着大数据应用需求的不断膨胀,数据处理的复杂度和实时性要求越来越高。TASKCTL作为国内自主研发的专业调度产品,为企业进入大数据2.0时代做好提前布局。

    对啦,我们的公众号“敏捷调度taskctl”(ID:gh_79ababc7910b)长期更新互联网最新资讯、行业内职场趣闻、以及有趣且实用性较高的编程插件和开发框架知识分享之类的

    如果你开心的话可以关注一下下呢!期待(๑˙ー˙๑)

    社群讨论75273038(qq)

    展开全文
  • 中文名调度命令外文名Dispatching command定义行车调度处理日常行车工作中问题分类口头和书面作用保证运输秩序的正常运转学科交通工程调度命令调度命令概念编辑语音调度命令是指行车调度处理日常行车工作中有关...
  • 所谓作业调度是指按照某种原则,从后备作业队列中选取作业进入内存,并为作业做好运行前的准备工作以及作业完成 后的善后处理工作。设计作业调度算法时应达到如下目标: (1) 某段时间内尽可能运行更多的作业,...
  • 线程的调度

    2013-03-25 08:46:59
    一个抢占式操作系统(比如Microsoft Windows)必须使用某种算法来决定那个线程应该被调度,应该调度多长时间。  每个线程的线程内核对象都维护了一个CONTEXT上下文结构,里面存放了线程最近一次被CPU执行的...
  • 摘要:本文将会从最基础的调度算法说起,逐个分析各种主流调度算法的原理,带大家一起探索CPU调度的奥秘。
  • 【Linux】Linux 2.6 对调度器的改进

    万次阅读 2018-07-30 19:21:04
    从进程调度的角度来看,Linux2.6之前的版本有如下的缺点: 由于只设置了一个进程就绪队列,于是在一轮调度中先耗尽时间片的进程虽然已经无法取得处理器控制权,但是还要参与weight值的计算,导致白白浪费了处理器的...
  • 日期 内核版本 CPU架构 作者 2019.04.06 Linux-5.0 ...时钟中断是系统中调度和抢占的驱动因素,在时钟中断中会进行进程运行时间的更新等,并更新调度标志,以决定是否进行调度。下面以Pow...
  • 前面放完建设四个现代化大数据平台乌托邦理想的大卫星,接下来的文章得谈谈具体...本文重点谈理论,会先从大的场景划分的角度对市面上的各种调度系统进行分类讨论,然后再针对具体的作业调度系统,探讨一下各自的优缺点
  • linux进程调度器模拟

    千次阅读 2011-04-25 17:23:00
    Linux 中的调度任务是一项复杂的任务。Linux 能在各种机型(如企业服务器、客户端桌面、...甚至,建立调度方案以便在各种处理器拓扑中验证给定的工作负荷,您要做好应对烦恼的准备。幸运的是,类似 LinSched(用户空
  • 日期 内核版本 架构 作者 GitHub CSDN 2016-06-14 ...1 前言1.1 进程调度内存中保存了对每个进程的唯一描述, 并通过若干结构与其他进程连接起来.调度器面对的情形就是这样, 其任务是在程序之间共享CPU时间, 创
  • 随阿里经济体和阿里云丰富的业务需求(尤其是双十一)和磨练,伏羲的内涵不断扩大,从单一的资源调度器(对标开源系统的YARN)扩展成大数据的核心调度服务,覆盖数据调度(Data Placement)、资源调度(Resouce ...
  • 调度系统的分类解析一、什么是调度系统二、调度系统的两大种类1、资源调度系统2、作业调度系统三、作业调度系统的两大种类1、定时分片类作业调度系统2、DAG工作流类调度系统 一、什么是调度系统 调度系统,更确切地...
  • ESXi CPU调度原理

    千次阅读 2019-04-13 21:57:22
    要想做好优化,需要首先了解一下ESXi的CPU调度原理。首先,作为虚拟化的基础架构平台,ESXi提供计算资源的共享,在ESXi上运行的所有虚拟机都可以共享该物理机的计算资源,包括CPU和内存,但是当虚拟机的CPU需求增多...
  • 作业调度算法有哪些

    千次阅读 2016-03-14 16:54:01
    作业调度是指按照时间周期(年、月、日、时、分、秒等)对作业进行分割,并根据业务需求、作业长度、存储...从被选中的作业做好执行前的准备工作; 在作业执行结束时,做善后处理工作。 进行作业调度有很多作业调度
  • 利用四步算法解决薄板厂生产调度问题,刘洪伟,袁林艳,作为钢铁行业的一部分,薄板厂的收益很受重视,而提高生产效率是改善收益最经济有效的方法,这要求薄板厂做好生产调度工作。本文
  • Linux 调度器模拟

    2011-04-14 00:09:00
    from: http://www.ibm.com/developerworks/cn/linux/l-linux-scheduler-simulator/index.html?ca=drs-<br />    Linux 调度器模拟 用 LinSched 在用户...为单核计算机开发的调度器很难合适地在
  • 操作系统的作业调度

    千次阅读 2017-08-03 14:55:54
     所谓作业调度是指按照某种原则,从后备作业队列中选取作业进入内存,并为作业做好运行前的准备工作以及作业完成后的善后处理工作。设计作业调度算法时应达到如下目标: • (1) 某段时间内尽可能运行更多的作业...
  • 防止误调度,显著提高调度工作效率,规范调度操作命令。 一、为什么调度员要使用该系统 1、保障调度操作安全 各级电网调度承担着保障管辖范围内电网安全、可靠、经济运行的重任,一旦发生误调度事故,将给国家...
  • ucos-ii 任务调度

    2011-10-22 20:30:14
    (1)任务级的任务切换原理  μC/OS-II是一个多任务的操作系统,在没有用户自己定义的中断情况下... 如果(不是中断嵌套并且系统可以被调度){  确定优先级最高的任务  如果(最高级的任务不是当前的任务){
  • Quartz 任务调度全攻略

    千次阅读 2012-02-03 14:49:13
    Quartz是一个开源的任务调度系统,它能用来调度很多任务的执行。 运行环境•Quartz 能嵌入在其他应用程序里运行。 •Quartz 能在一个应用服务器里被实例化(或servlet容器), 并且参与XA事务 •Quartz能独立运行...
  • 文章目录一、调度算法的原理和分类1.进程调度简介2.按不同需求对调度的进程分类3.调度算法分类二、使用步骤1.引入库1.引入库总结 一、调度算法的原理和分类 1.进程调度简介   进程调度的研究是整个操作系统理论的...
  • Hadoop 现在几乎已经成为业界在大数据上事实的标准,越来越多...既然涉及数据处理,一个不可不提的术语就是“作业” or “job”,大量的作业必然要引入作业管理及调度,hadoop也不能例外。 传统企业中的调度工具
  • 深入解读Linux进程调度Schedule

    千次阅读 2019-04-02 22:11:15
    调度系统是现代操作系统非常核心的基础子系统之一,尤其在多任务并行操作系统(Multitasking OS)上,系统可能运行于单核或者多核CPU上,进程可能处于运行状态或者在内存中可运行等待状态。如何实现多任务同时使用...
  • 分布式爬虫调度策略

    千次阅读 2017-08-31 22:55:27
    前言: 爬虫是偏IO型的任务,分布式爬虫的实现难度比分布式计算和分布式存储简单得多。...Python分布式爬虫比较常用的应该是scrapy框架加上Redis内存数据库,中间的调度任务等用scrapy-redis模块实现
  • 返璞归真的Linux BFS调度

    千次阅读 2012-04-13 23:05:53
    自Linux 2.6以来(严格说应该是2.5),O(n)调度器被人们认为是一种千年之前就应该抛弃的东西被重重的甩开了,此后出现了O(1),CFS等,再也没人提起O(n)了。说实话,Linux的调度器远比标准Unix的来得复杂,因为Linux被...
  • 本文是苏宁大数据离线任务开发调度平台实践系列文章之上篇,详解苏宁的任务调度模块。目 录1.绪言\t12.设计目标与主要功能\t23.专业术语\t34.调度架构设计\t55.服务重启和任务状态恢复\t65.1 Master Active 组合服务...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 31,620
精华内容 12,648
关键字:

如何做好调度工作