精华内容
下载资源
问答
  • 并发执行是多道程序系统中多个程序(逻辑上互相独立)或者一个程序中的多个程序段在执行的过程当中,时间互相重叠,一个程序执行没结束,另一个已经开始。并行执行是指一组程序按照独立的,不同步的速度执行,时间上...
    并发执行是多道程序系统中多个程序(逻辑上互相独立)或者一个程序中的多个程序段在执行的过程当中,时间互相重叠,一个程序执行没结束,另一个已经开始。

    并行执行是指一组程序按照独立的,不同步的速度执行,时间上不重叠;
    串行就是指令一个一个的执行。并行是指令同时并行执行。

    总结:
      并发是指多个线程轮流执行(单核CPU);
      并行是指多个线程同时执行(多核CPU),微观上是同时的;
      串行是指一个一个的执行,处理完一个才能处理下一个,不轮换;

    转载于:https://www.cnblogs.com/feixian-blog/p/8850273.html

    展开全文
  • 同一个多线程程序,如果在一个CUP上并发执行与 在多个CPCU上并行执行,从运行结果上有区别吗?(不关心执行效率)
  • 并发并行与Go并发编程

    千次阅读 2017-06-20 12:44:21
    并发与并行 并发(concurrency) 并发的关注点在于任务切分。举例来说,你是一个创业公司的CEO,开始只有你一个人,你一人分饰多角,一会做产品规划,一会写代码,一会见客户,虽然你不能见客户的同时写代码,但...

    并发与并行

    • 并发(concurrency) 并发的关注点在于任务切分。举例来说,你是一个创业公司的CEO,开始只有你一个人,你一人分饰多角,一会做产品规划,一会写代码,一会见客户,虽然你不能见客户的同时写代码,但由于你切分了任务,分配了时间片,表现出来好像是多个任务一起在执行。

    • 并行(parallelism) 并行的关注点在于同时执行。还是上面的例子,你发现你自己太忙了,时间分配不过来,于是请了工程师,产品经理,市场总监,各司一职,这时候多个任务可以同时执行了。

    GreenThread

    • 用户空间 首先是在用户空间,避免内核态和用户态的切换导致的成本。

    • 由语言或者框架层调度

    • 更小的栈空间允许创建大量实例(百万级别)

    几个概念

    • Continuation 这个概念不熟悉 FP 编程的人可能不太熟悉,不过这里可以简单的顾名思义,可以理解为让我们的程序可以暂停,然后下次调用继续(contine)从上次暂停的地方开始的一种机制。相当于程序调用多了一种入口。

    • Coroutine 是 Continuation 的一种实现,一般表现为语言层面的组件或者类库。主要提供 yield,resume 机制。

    • Fiber 和 Coroutine 其实是一体两面的,主要是从系统层面描述,可以理解成 Coroutine 运行之后的东西就是 Fiber。

    Goroutine

    Goroutine 其实就是前面 GreenThread 系列解决方案的一种演进和实现。

    • 首先,它内置了 Coroutine 机制。因为要用户态的调度,必须有可以让代码片段可以暂停/继续的机制。

    • 其次,它内置了一个调度器,实现了 Coroutine 的多线程并行调度,同时通过对网络等库的封装,对用户屏蔽了调度细节。

    • 最后,提供了 Channel 机制,用于 Goroutine 之间通信,实现 CSP 并发模型(Communicating Sequential Processes)。因为 Go 的 Channel 是通过语法关键词提供的,对用户屏蔽了许多细节。其实 Go 的 Channel 和 Java 中的 SynchronousQueue 是一样的机制,如果有 buffer 其实就是 ArrayBlockQueue。

    Goroutine 调度器

    这个图一般讲 Goroutine 调度器的地方都会引用,想要仔细了解的可以看看原博客(小编:点击阅读原文获取)。这里只说明几点:

    1. M 代表系统线程,P 代表处理器(核),G 代表 Goroutine。Go 实现了 M : N 的调度,也就是说线程和 Goroutine 之间是多对多的关系。这点在许多GreenThread / Coroutine 的调度器并没有实现。比如 Java 1.1 版本之前的线程其实是 GreenThread(这个词就来源于 Java),但由于没实现多对多的调度,也就是没有真正实现并行,发挥不了多核的优势,所以后来改成基于系统内核的 Thread 实现了。

    2. 某个系统线程如果被阻塞,排列在该线程上的 Goroutine 会被迁移。当然还有其他机制,比如 M 空闲了,如果全局队列没有任务,可能会从其他 M 偷任务执行,相当于一种 rebalance 机制。这里不再细说,有需要看专门的分析文章。

    3. 具体的实现策略和我们前面分析的机制类似。系统启动时,会启动一个独立的后台线程(不在 Goroutine 的调度线程池里),启动 netpoll 的轮询。当有 Goroutine 发起网络请求时,网络库会将 fd(文件描述符)和 pollDesc(用于描述 netpoll 的结构体,包含因为读 / 写这个 fd 而阻塞的 Goroutine)关联起来,然后调用 runtime.gopark 方法,挂起当前的 Goroutine。当后台的 netpoll 轮询获取到 epoll(Linux 环境下)的 event,会将 event 中的 pollDesc 取出来,找到关联的阻塞 Goroutine,并进行恢复。

    Goroutine 是银弹么?

    Goroutine 很大程度上降低了并发的开发成本,是不是我们所有需要并发的地方直接 go func 就搞定了呢?

    Go 通过 Goroutine 的调度解决了 CPU 利用率的问题。但遇到其他的瓶颈资源如何处理?比如带锁的共享资源,比如数据库连接等。互联网在线应用场景下,如果每个请求都扔到一个 Goroutine 里,当资源出现瓶颈的时候,会导致大量的 Goroutine 阻塞,最后用户请求超时。这时候就需要用 Goroutine 池来进行控流,同时问题又来了:池子里设置多少个 Goroutine 合适?

    所以这个问题还是没有从更本上解决。

    go没有严格的内置的logical processor数量限制,但是go的runtime默认限制了每个program最多使用10,000个线程,可以通过SetMaxThreads修改.下图展示了Concurrency和Parallelism的区别 

    goroutine使用

    go块

    go的用法很简单,如下. 如果没有最外面的括号{}(),会显示go块必须是一个函数调用.没有()只是一个函数的声明,有了()是一个调用(没有参数的)

    go func() {
      for _,n := range nums {
        out <- n
      }
      close(out)
    }()
    

    channel

    channel默认上是阻塞的,也就是说,如果Channel满了,就阻塞写,如果Channel空了,就阻塞读。于是,我们就可以使用这种特性来同步我们的发送和接收端。

    channel <-,发送一个新的值到通道中 <-channel,从通道中接收一个值,这个更像有两层含义,一个是会返回一个结果,当做赋值来用:msg := <-channel;另外一个含义是等待这个channel发送消息,所以还有一个等的含义在.所以如果你直接写fmt.Print(<-channel)本意只是想输出下这个chan传来的值,但是其实他还会阻塞住等着channel来发.

    默认发送和接收操作是阻塞的,直到发送方和接收方都准备完毕。

    func main() {
        messages := make(chan string)
        go func() { messages <- "ping" }()
        msg := <-messages
        fmt.Println(msg)
    }
    

    所以你要是这么写:是一辈子都不会执行到print的(会死锁)

    func main() {
        messages := make(chan string)
        messages <- "ping"
        msg := <-messages
        fmt.Println(msg)
    }
    

    所以在一个go程中,发送messages <- "msg"channel的时候,要格外小心,不然一不留神就死锁了.(解决方法:1. 用带缓存的chan; 2. 使用带有default的select发送)

    select {
    case messages <- "msg":
        fmt.Println("sent message")
    default:
        fmt.Println("no message sent")
    }
    

    range

    用于channel的range是阻塞的.下面程序会显示deadloc,去掉注释就好了.

    queue := make(chan string, 2)
    //queue <- "one"
    //queue <- "two"
    //close(queue)
    for elem := range queue {
      fmt.Println(elem)
    }
    

    通道缓冲

    加了缓存之后,就像你向channel发送消息的时候(message <- "ping"),"ping"就已经发送出去了(到缓存).就像一个异步的队列?到时候,<-message直接从缓存中取值就好了(异步...)

    但是你要这么写,利用通道缓冲,就可以.无缓冲的意味着只有在对应的接收(<-chan)通道准备好接收时,才允许发送(chan <-),可缓存通道允许在没有对应接收方的情况下,缓存限定数量的值。

    func main() {
      message := make(chan string,1)
      message <- "ping"
      msg := <-message
      fmt.Print(msg)
    }
    

    要是多发一个messages <- "channel",fatal error: all goroutines are asleep - deadlock!,要是多接受一个fmt.Println(<-messages),会打印出buffered channel,然后报同样的error

    func main() {
        messages := make(chan string, 2)
        messages <- "buffered"
        messages <- "channel"
        fmt.Println(<-messages)
        fmt.Println(<-messages)
    }
    

    通道同步

    使用通道同步,如果你把 <- done 这行代码从程序中移除,程序甚至会在 worker还没开始运行时就结束了。

    func worker(done chan bool) {
        fmt.Print("working...")
        time.Sleep(time.Second) // working
        fmt.Println("done")
        done <- true
    }
    func main() {
        done := make(chan bool, 1)
        go worker(done)
        <-done //blocking 阻塞在这里,知道worker执行完毕
    }
    

    发送方向

    可以指定这个通道是不是只用来发送或者接收值。这个特性提升了程序的类型安全性。pong 函数允许通道(pings)来接收数据,另一通道(pongs)来发送数据。

    func ping(pings chan<- string, msg string) {
        pings <- msg
    }
    
    func pong(pings <-chan string, pongs chan<- string) {
        msg := <-pings
        pongs <- msg
    }
    
    func main() {
        pings := make(chan string, 1)
        pongs := make(chan string, 1)
        ping(pings, "passed message")
        pong(pings, pongs)
        fmt.Println(<-pongs)
    }
    

    select

    Go 的select 让你可以同时等待多个通道操作。(poll/epoll?) 注意select 要么写个死循环用超时,要不就定好次数.或者加上default让select变成非阻塞的

    go func() {
        time.Sleep(time.Second * 1)
        c1 <- "one"
    }()
    
    go func() {
        time.Sleep(time.Second * 2)
        c2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
    

    超时处理

    其中time.After返回<-chan Time,直接向select发送消息

    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
    

    非阻塞通道操作

    default,当监听的channel都没有准备好的时候,默认执行的.

    select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    default:
        fmt.Println("no message received")
    }
    

    可以使用 select 语句来检测 chan 是否已经满了

    ch := make (chan int, 1)
    ch <- 1
    select {
    case ch <- 2:
    default:
        fmt.Println("channel is full !")
    }
    

    通道关闭

    一个非空的通道也是可以关闭的,但是通道中剩下的值仍然可以被接收到

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)
    for elem := range queue {
        fmt.Println(elem)
    }
    

    定时器

    在未来某一刻执行一次时使用的

    定时器表示在未来某一时刻的独立事件。你告诉定时器需要等待的时间,然后它将提供一个用于通知的通道。可以显示的关闭

    timer1 := time.NewTimer(time.Second * 2)
    <-timer1.C
    

    <-timer1.C 直到这个定时器的通道 C 明确的发送了定时器失效的值(2s)之前,将一直阻塞。如果你只是要单纯的等待用time.Sleep,定时器是可以在它失效之前把它给取消的stop2 := timer2.Stop()

    打点器

    当你想要在固定的时间间隔重复执行,定时的执行,直到我们将它停止

    func main() {
        //打点器和定时器的机制有点相似:一个通道用来发送数据。这里我们在这个通道上使用内置的 range 来迭代值每隔500ms 发送一次的值。
        ticker := time.NewTicker(time.Millisecond * 500)
        go func() {
            for t := range ticker.C {
                fmt.Println("Tick at", t)
            }
        }()
    
        //打点器可以和定时器一样被停止。一旦一个打点停止了,将不能再从它的通道中接收到值。我们将在运行后 1600ms停止这个打点器。
        time.Sleep(time.Millisecond * 1600)
        ticker.Stop()
        fmt.Println("Ticker stopped")
    }
    

    生成器

    类似于提供了一个服务,不过只是适用于调用不是很频繁

    func rand_generator_2() chan int {
        out := make(chan int)
        go func() {
            for {
                out <- rand.Int()
            }
        }()
        return out
    }
    
    func main() {
        // 生成随机数作为一个服务
        rand_service_handler := rand_generator_2()
        fmt.Printf("%dn", <-rand_service_handler)
    }
    

    多路复用

    Apache使用处理每个连接都需要一个进程,所以其并发性能不是很好。而Nighx使用多路复用的技术,让一个进程处理多个连接,所以并发性能比较好。

    多路复用技术可以用来整合多个通道。提升性能和操作的便捷。

    其实就是整合了多个上面的生成器

    func rand_generator_3() chan int {
        rand_generator_1 := rand_generator_2()
        rand_generator_2 := rand_generator_2()
        out := make(chan int)
    
        go func() {
            for {
                //读取生成器1中的数据,整合
                out <- <-rand_generator_1
            }
        }()
        go func() {
            for {
                //读取生成器2中的数据,整合
                out <- <-rand_generator_2
            }
        }()
        return out
    }
    

    Furture技术

    可以在不准备好参数的情况下调用函数。函数调用和函数参数准备这两个过程可以完全解耦。可以在调用的时候不关心数据是否准备好,返回值是否计算好的问题。让程序中的组件在准备好数据的时候自动跑起来。 这个最后取得<-q.result也是可以放到execQuery上面的把

    Furture技术可以和各个其他技术组合起来用。可以通过多路复用技术,监听多个结果Channel,当有结果后,自动返回。也可以和生成器组合使用,生成器不断生产数据,Furture技术逐个处理数据。Furture技术自身还可以首尾相连,形成一个并发的pipe filter。这个pipe filter可以用于读写数据流,操作数据流。

    type query struct {
        sql chan string
        result chan string
    }
    
    func execQuery(q query) {
        go func() {
            sql := <-q.sql
            q.result <- "get " + sql
        }()
    
    }
    
    func main() {
        q := query{make(chan string, 1), make(chan string, 1)}
        execQuery(q)
    
        //准备参数
        q.sql <- "select * from table"
        fmt.Println(<-q.result)
    }
    

    Chain Filter技术

    程序创建了10个Filter,每个分别过滤一个素数,所以可以输出前10个素数。

    func Generate(ch chan<- int) {
        for i := 2; ; i++ {
            ch <- i 
        }
    }
    
    func Filter(in <-chan int, out chan<- int, prime int) {
        for {
            i := <-in // Receive value from 'in'.
            if i%prime != 0 {
                out <- i // Send 'i' to 'out'.
            }
        }
    }
    
    // The prime sieve: Daisy-chain Filter processes.
    func main() {
        ch := make(chan int) // Create a new channel.
        go Generate(ch)      // Launch Generate goroutine.
        for i := 0; i < 10; i++ {
            prime := <-ch
            print(prime, "n")
            ch1 := make(chan int)
            go Filter(ch, ch1, prime)
            ch = ch1
        }
    }
    

    共享变量

    有些时候使用共享变量可以让代码更加简洁

    type sharded_var struct {
        reader chan int
        writer chan int
    }
    
    func sharded_var_whachdog(v sharded_var) {//共享变量维护协程
        go func() {
            var value int = 0
            for { //监听读写通道,完成服务
                select {
                case value = <-v.writer:
                case v.reader <- value:
                }
            }
        }()
    }
    
    func main() {
        v := sharded_var{make(chan int), make(chan int)} //初始化,并开始维护协程
        sharded_var_whachdog(v)
    
        fmt.Println(<-v.reader)
        v.writer <- 1
        fmt.Println(<-v.reader)
    }
    

    Concurrency patterns

    下面介绍了一些常用的并发模式.

    Runner

    当你的程序会运行在后台,可以是cron job或者是Iron.io这样的worker-based云环境.这个程序就可以监控和中断你的程序,如果你的程序运行的太久了.

    定义了三个channel来通知任务状态.

    • interrupt:接收系统的终止信号(比如ctrl-c),接收到之后系统就优雅的退出
    • complete:指示任务完成状态或者返回错误
    • timeout:当超时了之后,系统就优雅的退出

    tasks是一个函数类型的slice,你可以往里面存放签名为func funcName(id int){}的函数,作为你的任务.task(id)就是在执行任务了(当然只是用来模拟任务,可以定义一个任务接口来存放任务,此处是为了简便). 注意tasks里面的任务是串行执行的,这些任务的执行发生在一个单独的goroutine中.

    New方法里的interrupt channel buffer设置为1,也就是说当用户重复ctrl+c的时候,程序也只会收到一个信号,其他的信号会被丢弃.

    在run()方法中,在开始执行任务前(task(id)),会前检查执行流程有没有被中断(if r.gotInterrupt() {}),这里用了一个带default语句的select.一旦收到中断的事件,程序就不再接受任何其他事件了(signal.Stop(r.interrupt)).

    在Start()方法中,在go块中执行run()方法,任何当前的goroutine会阻塞在select这边,直到收到run()返回的complete channel或者超时返回.

    // Runner runs a set of tasks within a given timeout and can be shut down on an operating system interrupt.
    type Runner struct {
        // interrupt channel reports a signal from the operating system.
        interrupt chan os.Signal
    
        // complete channel reports that processing is done.
        complete chan error
    
        // timeout reports that time has run out.
        timeout <-chan time.Time
    
        // tasks holds a set of functions that are executed
        // synchronously in index order.
        tasks []func(int)
    }
    
    // ErrTimeout is returned when a value is received on the timeout channel.
    var ErrTimeout = errors.New("received timeout")
    
    // ErrInterrupt is returned when an event from the OS is received.
    var ErrInterrupt = errors.New("received interrupt")
    
    // New returns a new ready-to-use Runner.
    func New(d time.Duration) *Runner {
        return &Runner{
            interrupt: make(chan os.Signal, 1),
            complete:  make(chan error),
            timeout:   time.After(d),
        }
    }
    
    // Add attaches tasks to the Runner. A task is a function that takes an int ID. ...表示可以传入多个参数
    func (r *Runner) Add(tasks ...func(int)) { 
        r.tasks = append(r.tasks, tasks...)
    }
    
    // Start runs all tasks and monitors channel events.
    func (r *Runner) Start() error {
        // We want to receive all interrupt based signals.
        signal.Notify(r.interrupt, os.Interrupt)
    
        // Run the different tasks on a different goroutine.
        go func() {
            r.complete <- r.run()
        }()
    
        select {
        // Signaled when processing is done.
        case err := <-r.complete:
            return err
    
        // Signaled when we run out of time.
        case <-r.timeout:
            return ErrTimeout
        }
    }
    
    // run executes each registered task.
    func (r *Runner) run() error {
        for id, task := range r.tasks {
            // Check for an interrupt signal from the OS.
            if r.gotInterrupt() {
                return ErrInterrupt
            }
    
            // Execute the registered task.
            task(id)
        }
    
        return nil
    }
    
    // gotInterrupt verifies if the interrupt signal has been issued.
    func (r *Runner) gotInterrupt() bool {
        select {
        // Signaled when an interrupt event is sent.
        case <-r.interrupt:
            // Stop receiving any further signals.
            signal.Stop(r.interrupt)
            return true
    
        // Continue running as normal.
        default:
            return false
        }
    }
    

    main方法

    const timeout = 3 * time.Second
    
    // main is the entry point for the program.
    func main() {
        log.Println("Starting work.")
    
        // Create a new timer value for this run.
        r := runner.New(timeout)
    
        // Add the tasks to be run.
        r.Add(createTask(), createTask(), createTask())
    
        // Run the tasks and handle the result.
        if err := r.Start(); err != nil {
            switch err {
            case runner.ErrTimeout:
                log.Println("Terminating due to timeout.")
                os.Exit(1)
            case runner.ErrInterrupt:
                log.Println("Terminating due to interrupt.")
                os.Exit(2)
            }
        }
    
        log.Println("Process ended.")
    }
    
    // createTask returns an example task that sleeps for the specified
    // number of seconds based on the id.
    func createTask() func(int) {
        return func(id int) {
            log.Printf("Processor - Task #%d.", id)
            time.Sleep(time.Duration(id) * time.Second)
        }
    }
    

    Pooling

    当你有一些特定的资源要共享,比如数据库连接或者内存buffers,这个模式就非常有用

    goroutine要用一个资源,就去pool中去拿,用完了就还回去.

    例子中的资源是只要实现了io.Closer接口即可.

    • m用来保证多goroutine下对Poll的操作都是value-safe的.
    • resources将会是一个buffered channel,会包含将要分享的资源.
    • factory的作用是创建一个新的资源,当poll有需要的时候.
    • closed用来指示pool有无被关闭

    New函数接受一个用来创建新资源的函数对象(fn func() (io.Closer, error),返回一个资源)还有一个size参数.

    Acquire函数先从pool中取资源,要是取不到用factory新建一个

    func (p *Pool) Acquire() (io.Closer, error) {
        select {
        // Check for a free resource.
        case r, _ := <-p.resources:
            return r, nil
    
        // Provide a new resource since there are none available.
        default:
            return p.factory()
        }
    }
    

    Release函数:如果pool已经关闭,就直接return.否则就向resource这个buffered channel里发送要释放的资源.default语句是如果resource已经满了,就关闭这个pool.

    Close函数:当程序运行完关闭pool的时候,应该调用Close函数,这个函数首先关闭resource这个buffered channel,然后再把buffered channel中的任务关闭(io.Closer).注意这个加锁.

    // Pool manages a set of resources that can be shared safely by multiple goroutines.
    // The resource being managed must implement  the io.Closer interface.
    type Pool struct {
        m         sync.Mutex
        resources chan io.Closer
        factory   func() (io.Closer, error)
        closed    bool
    }
    
    // ErrPoolClosed is returned when an Acquire returns on a closed pool.
    var ErrPoolClosed = errors.New("Pool has been closed.")
    
    // New creates a pool that manages resources. A pool requires a
    // function that can allocate a new resource and the size of the pool.
    func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
        if size <= 0 {
            return nil, errors.New("Size value too small.")
        }
    
        return &Pool{
            factory:   fn,
            resources: make(chan io.Closer, size),
        }, nil
    }
    
    // Acquire retrieves a resource    from the pool.
    func (p *Pool) Acquire() (io.Closer, error) {
        select {
        // Check for a free resource.
        case r, ok := <-p.resources:
            log.Println("Acquire:", "Shared Resource")
            if !ok {
                return nil, ErrPoolClosed
            }
            return r, nil
    
        // Provide a new resource since there are none available.
        default:
            log.Println("Acquire:", "New Resource")
            return p.factory()
        }
    }
    
    // Release places a new resource onto the pool.
    func (p *Pool) Release(r io.Closer) {
        // Secure this operation with the Close operation.
        p.m.Lock()
        defer p.m.Unlock()
    
        // If the pool is closed, discard the resource.
        if p.closed {
            r.Close()
            return
        }
    
        select {
        // Attempt to place the new resource on the queue.
        case p.resources <- r:
            log.Println("Release:", "In Queue")
    
        // If the queue is already at cap we close the resource.
        default:
            log.Println("Release:", "Closing")
            r.Close()
        }
    }
    
    // Close will shutdown the pool and close all existing resources.
    func (p *Pool) Close() {
        // Secure this operation with the Release operation.
        p.m.Lock()
        defer p.m.Unlock()
    
        // If the pool is already close, don't do anything.
        if p.closed {
            return
        }
    
        // Set the pool as closed.
        p.closed = true
    
        // Close the channel before we drain the channel of its
        // resources. If we don't do this, we will have a deadlock.
        close(p.resources)
    
        // Close the resources
        for r := range p.resources {
            r.Close()
        }
    }
    

    main

    const (
        maxGoroutines   = 25 // the number of routines to use.
        pooledResources = 2  // number of resources in the pool
    )
    
    // dbConnection simulates a resource to share.
    type dbConnection struct {
        ID int32
    }
    
    // Close implements the io.Closer interface so dbConnection can be managed by the pool. Close performs any resource release management.
    func (dbConn *dbConnection) Close() error {
        log.Println("Close: Connection", dbConn.ID)
        return nil
    }
    
    // idCounter provides support for giving each connection a unique id.
    var idCounter int32
    
    // createConnection is a factory method that will be called by the pool when a new connection is needed.
    func createConnection() (io.Closer, error) {
        id := atomic.AddInt32(&idCounter, 1)
        log.Println("Create: New Connection", id)
    
        return &dbConnection{id}, nil
    }
    
    // main is the entry point for all Go programs.
    func main() {
        var wg sync.WaitGroup
        wg.Add(maxGoroutines)
    
        // Create the pool to manage our connections.
        p, err := pool.New(createConnection, pooledResources)
        if err != nil {
            log.Println(err)
        }
    
        // Perform queries using connections from the pool.
        for query := 0; query < maxGoroutines; query++ {
            // Each goroutine needs its own copy of the query value else they will all be sharing the same query variable.
            go func(q int) {
                performQueries(q, p)
                wg.Done()
            }(query)
        }
    
        // Wait for the goroutines to finish.
        wg.Wait()
    
        // Close the pool.
        log.Println("Shutdown Program.")
        p.Close()
    }
    
    // performQueries tests the resource pool of connections.
    func performQueries(query int, p *pool.Pool) {
        // Acquire a connection from the pool.
        conn, err := p.Acquire()
        if err != nil {
            log.Println(err)
            return
        }
    
        // Release the connection back to the pool.
        defer p.Release(conn)
    
        // Wait to simulate a query response.
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
    }
    

    Work

    New函数开启了固定个数(maxGoroutines)个goroutine,注意这边work是一个unbuffered channel.这个for range会阻塞直到channel中有值可以取.要是work这个channel被关闭了,这个for range就结束,然后调用wg.Done

    Run函数提交任务到pool中去w.work <- w.注意这个work是一个unbuffered channel,所以得等一个goroutine把它取走,否则会阻塞住.这是我们需要保证的,因为我们想要调用者保证这个任务被提交之后立即开始运行

    type Worker interface {
        Task()
    }
    
    // Pool provides a pool of goroutines that can execute any Worker
    // tasks that are submitted.
    type Pool struct {
        work chan Worker
        wg   sync.WaitGroup
    }
    
    // New creates a new work pool.
    func New(maxGoroutines int) *Pool {
        p := Pool{
            work: make(chan Worker),
        }
    
        p.wg.Add(maxGoroutines)
        for i := 0; i < maxGoroutines; i++ {
            go func() {
                for w := range p.work {
                    w.Task()
                }
                p.wg.Done()
            }()
        }
    
        return &p
    }
    
    // Run submits work to the pool.
    func (p *Pool) Run(w Worker) {
        p.work <- w
    }
    
    // Shutdown waits for all the goroutines to shutdown.
    func (p *Pool) Shutdown() {
        close(p.work)
        p.wg.Wait()
    }
    

    main

    // names provides a set of names to display.
    var names = []string{
        "steve",
        "bob",
        "mary",
        "therese",
        "jason",
    }
    
    // namePrinter provides special support for printing names.
    type namePrinter struct {
        name string
    }
    
    // Task implements the Worker interface.
    func (m *namePrinter) Task() {
        log.Println(m.name)
        time.Sleep(time.Second)
    }
    
    // main is the entry point for all Go programs.
    func main() {
        // Create a work pool with 2 goroutines.
        p := work.New(2)
    
        var wg sync.WaitGroup
        wg.Add(100 * len(names))
    
        for i := 0; i < 100; i++ {
            // Iterate over the slice of names.
            for _, name := range names {
                // Create a namePrinter and provide the
                // specific name.
                np := namePrinter{
                    name: name,
                }
    
                go func() {
                    // Submit the task to be worked on. When RunTask
                    // returns we know it is being handled.
                    p.Run(&np)
                    wg.Done()
                }()
            }
        }
    
        wg.Wait()
    
        // Shutdown the work pool and wait for all existing work
        // to be completed.
        p.Shutdown()
    }
    

    另一种worker的写法

    创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。把工作发送到工作队列中去JobQueue <- work

    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )
    
    // Job represents the job to be run
    type Job struct {
        Payload Payload
    }
    
    // A buffered channel that we can send work requests on.
    var JobQueue chan Job
    
    // Worker represents the worker that executes the job
    type Worker struct {
        WorkerPool  chan chan Job
        JobChannel  chan Job
        quit        chan bool
    }
    
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
            quit:       make(chan bool)}
    }
    
    // Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it
    func (w Worker) Start() {
        go func() {
            for {
                // register the current worker into the worker queue.
                w.WorkerPool <- w.JobChannel
    
                select {
                case job := <-w.JobChannel:
                    // we have received a work request.
                    if err := job.Payload.UploadToS3(); err != nil {
                        log.Errorf("Error uploading to S3: %s", err.Error())
                    }
    
                case <-w.quit:
                    // we have received a signal to stop
                    return
                }
            }
        }()
    }
    
    // Stop signals the worker to stop listening for work requests.
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }
    

    我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。dispatcher.Run()(这个类似资源池)

    type Dispatcher struct {
        // A pool of workers channels that are registered with the dispatcher
        WorkerPool chan chan Job
    }
    
    func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool}
    }
    
    func (d *Dispatcher) Run() {
        // starting n number of workers
        for i := 0; i < d.maxWorkers; i++ {
            worker := NewWorker(d.pool)
            worker.Start()
        }
    
        go d.dispatch()
    }
    
    func (d *Dispatcher) dispatch() {
        for {
            select {
            case job := <-JobQueue:
                // a job request has been received
                go func(job Job) {
                    // try to obtain a worker job channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-d.WorkerPool
    
                    // dispatch the job to the worker job channel
                    jobChannel <- job
                }(job)
            }
        }
    }
    展开全文
  • 《 Quartz 线程处理之——并行执行与串行执行 》 Quartz定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞...




    《 Quartz 线程处理之——并行执行与串行执行 》





    Quartz定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。
    1.在Spring中这时需要设置concurrent的值为false, 禁止并发执行。


    <property name="concurrent" value="true" />




    2.当不使用spring的时候就需要在Job的实现类上加@DisallowConcurrentExecution的注释
    @DisallowConcurrentExecution 禁止并发执行多个相同定义的JobDetail, 这个注解是加在Job类上的, 但意思并不是不能同时执行多个Job, 而是不能并发执行同一个Job Definition(由JobDetail定义), 但是可以同时执行多个不同的JobDetail, 举例说明,我们有一个Job类,叫做SayHelloJob, 并在这个Job上加了这个注解, 然后在这个Job上定义了很多个JobDetail, 如sayHelloToJoeJobDetail, sayHelloToMikeJobDetail, 那么当scheduler启动时, 不会并发执行多个sayHelloToJoeJobDetail或者sayHelloToMikeJobDetail, 但可以同时执行sayHelloToJoeJobDetail跟sayHelloToMikeJobDetail


    @PersistJobDataAfterExecution 同样, 也是加在Job上,表示当正常执行完Job后, JobDataMap中的数据应该被改动, 以被下一次调用时用。当使用@PersistJobDataAfterExecution 注解时, 为了避免并发时, 存储数据造成混乱, 强烈建议把@DisallowConcurrentExecution注解也加上。


     


     


    @DisallowConcurrentExecution


    此标记用在实现Job的类上面,意思是不允许并发执行,按照我之前的理解是 不允许调度框架在同一时刻调用Job类,后来经过测试发现并不是这样,而是Job(任务)的执行时间[比如需要10秒]大于任务的时间间隔 [Interval(5秒)],那么默认情况下,调度框架为了能让 任务按照我们预定的时间间隔执行,会马上启用新的线程执行任务。否则的话会等待任务执行完毕以后 再重新执行!(这样会导致任务的执行不是按照我们预先定义的时间间隔执行)


    测试代码,这是官方提供的例子。设定的时间间隔为3秒,但job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,否则会在3秒时再启用新的线程执行


    org.quartz.threadPool.threadCount = 5 这里配置框架的线程池中线程的数量,要多配置几个,否则@DisallowConcurrentExecution不起作用


    org.quartz.scheduler.instanceName = MyScheduler
    org.quartz.threadPool.threadCount = 5
    org.quartz.jobStore.class =org.quartz.simpl.RAMJobStore



    /*
     * Copyright 2005 - 2009 Terracotta, Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy
     * of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations
     * under the License.
     *
     */
    
    
    package org.quartz.examples.example5;
    
    
    import java.util.Date;
    
    
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.Job;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.PersistJobDataAfterExecution;
    
    
    /**
     * <p> A dumb implementation of Job, for unit testing purposes. </p>
     * 
     * @author James House
     */
    @PersistJobDataAfterExecution
    @DisallowConcurrentExecution
    public class StatefulDumbJob implements Job {
    
    
        /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
         * 
         * Constants.
         * 
         * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
    
    
        public static final String NUM_EXECUTIONS = "NumExecutions";
    
    
        public static final String EXECUTION_DELAY = "ExecutionDelay";
    
    
        /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
         * 
         * Constructors.
         * 
         * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
    
    
        public StatefulDumbJob() {
        }
    
    
        /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
         * 
         * Interface.
         * 
         * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
    
    
        /**
         * <p> Called by the <code>{@link org.quartz.Scheduler}</code> when a <code>{@link org.quartz.Trigger}</code> fires that is associated with the <code>Job</code>. </p>
         * 
         * @throws JobExecutionException if there is an exception while executing the job.
         */
        public void execute(JobExecutionContext context) throws JobExecutionException {
            System.err.println("---" + context.getJobDetail().getKey() + " executing.[" + new Date() + "]");
    
    
            JobDataMap map = context.getJobDetail().getJobDataMap();
    
    
            int executeCount = 0;
            if (map.containsKey(NUM_EXECUTIONS)) {
                executeCount = map.getInt(NUM_EXECUTIONS);
            }
    
    
            executeCount++;
    
    
            map.put(NUM_EXECUTIONS, executeCount);
    
    
            long delay = 5000l;
            if (map.containsKey(EXECUTION_DELAY)) {
                delay = map.getLong(EXECUTION_DELAY);
            }
    
    
            try {
                Thread.sleep(delay);
            } catch (Exception ignore) {
            }
    
    
            System.err.println(" -" + context.getJobDetail().getKey() + " complete (" + executeCount + ").");
    
    
        }
    
    
    }




    /*
     * Copyright 2005 - 2009 Terracotta, Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy
     * of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations
     * under the License.
     *
     */
    
    
    package org.quartz.examples.example5;
    
    
    import static org.quartz.JobBuilder.newJob;
    import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
    import static org.quartz.TriggerBuilder.newTrigger;
    import static org.quartz.DateBuilder.*;
    
    
    import java.util.Date;
    
    
    import org.quartz.JobDetail;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerFactory;
    import org.quartz.SchedulerMetaData;
    import org.quartz.SimpleTrigger;
    import org.quartz.impl.StdSchedulerFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    /**
     * Demonstrates the behavior of <code>StatefulJob</code>s, as well as how misfire instructions affect the firings of triggers of <code>StatefulJob</code> s - when the jobs take longer to execute that the frequency of the trigger's repitition.
     * 
     * <p> While the example is running, you should note that there are two triggers with identical schedules, firing identical jobs. The triggers "want" to fire every 3 seconds, but the jobs take 10 seconds to execute. Therefore, by the time the jobs complete their execution, the triggers have already "misfired" (unless the scheduler's "misfire threshold" has been set to more than 7 seconds). You should see that one of the jobs has its misfire instruction set to <code>SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT</code>, which causes it to fire immediately, when the misfire is detected. The other trigger uses the default "smart policy" misfire instruction, which causes the trigger to advance to its next fire time (skipping those that it has missed) - so that it does not refire immediately, but rather at the next scheduled time. </p>
     * 
     * @author <a href="mailto:bonhamcm@thirdeyeconsulting.com">Chris Bonham</a>
     */
    public class MisfireExample {
        public void run() throws Exception {
            Logger log = LoggerFactory.getLogger(MisfireExample.class);
    
    
            log.info("------- Initializing -------------------");
    
    
            // First we must get a reference to a scheduler
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sched = sf.getScheduler();
    
    
            log.info("------- Initialization Complete -----------");
    
    
            log.info("------- Scheduling Jobs -----------");
    
    
            // jobs can be scheduled before start() has been called
    
    
            // get a "nice round" time a few seconds in the future...
            Date startTime = nextGivenSecondDate(null, 15);
    
    
            // statefulJob1 will run every three seconds
            // (but it will delay for ten seconds)
            JobDetail job = newJob(StatefulDumbJob.class).withIdentity("statefulJob1", "group1").usingJobData(StatefulDumbJob.EXECUTION_DELAY, 10000L).build();
    
    
            SimpleTrigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(startTime).withSchedule(simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();
    
    
            Date ft = sched.scheduleJob(job, trigger);
            log.info(job.getKey() + " will run at: " + ft + " and repeat: " + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
    
    
            log.info("------- Starting Scheduler ----------------");
    
    
            // jobs don't start firing until start() has been called...
            sched.start();
    
    
            log.info("------- Started Scheduler -----------------");
    
    
            try {
                // sleep for ten minutes for triggers to file....
                Thread.sleep(600L * 1000L);
            } catch (Exception e) {
            }
    
    
            log.info("------- Shutting Down ---------------------");
    
    
            sched.shutdown(true);
    
    
            log.info("------- Shutdown Complete -----------------");
    
    
            SchedulerMetaData metaData = sched.getMetaData();
            log.info("Executed " + metaData.getNumberOfJobsExecuted() + " jobs.");
        }
    
    
        public static void main(String[] args) throws Exception {
    
    
            MisfireExample example = new MisfireExample();
            example.run();
        }
    
    
    }


     


     


    @PersistJobDataAfterExecution


    此标记说明在执行完Job的execution方法后保存JobDataMap当中固定数据,在默认情况下 也就是没有设置 @PersistJobDataAfterExecution的时候 每个job都拥有独立JobDataMap


    否则改任务在重复执行的时候具有相同的JobDataMap


     


    /*
     * Copyright 2005 - 2009 Terracotta, Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy
     * of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations
     * under the License.
     *
     */
    
    
    package com.quartz.demo.example6;
    
    
    import java.util.Date;
    
    
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.Job;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.JobKey;
    import org.quartz.PersistJobDataAfterExecution;
    
    
    @PersistJobDataAfterExecution
    @DisallowConcurrentExecution
    public class BadJob1 implements Job {
    
    
        public BadJob1() {
        }
    
    
        public void execute(JobExecutionContext context) throws JobExecutionException {
            JobKey jobKey = context.getJobDetail().getKey();
            JobDataMap dataMap = context.getJobDetail().getJobDataMap();
    
    
            int denominator = dataMap.getInt("denominator");
            System.out.println("---" + jobKey + " executing at " + new Date() + " with denominator " + denominator);
    
    
            denominator++;
            dataMap.put("denominator", denominator);
        }
    
    
    }


     


     


    /*
     * Copyright 2005 - 2009 Terracotta, Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy
     * of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations
     * under the License.
     *
     */
    
    
    package com.quartz.demo.example6;
    
    
    import static org.quartz.JobBuilder.newJob;
    import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
    import static org.quartz.TriggerBuilder.newTrigger;
    import static org.quartz.DateBuilder.*;
    
    
    import java.util.Date;
    
    
    import org.quartz.JobDetail;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerFactory;
    import org.quartz.SimpleTrigger;
    import org.quartz.impl.StdSchedulerFactory;
    
    
    public class JobExceptionExample {
    
    
        public void run() throws Exception {
    
    
            // First we must get a reference to a scheduler
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sched = sf.getScheduler();
    
    
            // jobs can be scheduled before start() has been called
    
    
            // get a "nice round" time a few seconds in the future...
            Date startTime = nextGivenSecondDate(null, 2);
    
    
            JobDetail job = newJob(BadJob1.class).withIdentity("badJob1", "group1").usingJobData("denominator", "0").build();
    
    
            SimpleTrigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(startTime).withSchedule(simpleSchedule().withIntervalInSeconds(2).repeatForever()).build();
    
    
            Date ft = sched.scheduleJob(job, trigger);
    
    
            //任务每2秒执行一次 那么在BadJob1的方法中拿到的JobDataMap的数据是共享的.
            //这里要注意一个情况: 就是JobDataMap的数据共享只针对一个BadJob1任务。
            //如果在下面在新增加一个任务 那么他们之间是不共享的 比如下面
    
    
            JobDetail job2 = newJob(BadJob1.class).withIdentity("badJob1", "group1").usingJobData("denominator", "0").build();
    
    
            SimpleTrigger trigger2 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime).withSchedule(simpleSchedule().withIntervalInSeconds(2).repeatForever()).build();
    
    
            //这个job2与job执行的JobDataMap不共享
            sched.scheduleJob(job2, trigger2);
    
    
            sched.start();
    
    
            try {
                // sleep for 30 seconds
                Thread.sleep(30L * 1000L);
            } catch (Exception e) {
            }
    
    
            sched.shutdown(false);
        }
    
    
        public static void main(String[] args) throws Exception {
    
    
            JobExceptionExample example = new JobExceptionExample();
            example.run();
        }
    
    
    }













    来自:https://my.oschina.net/blueskyer/blog/325812

    展开全文
  • 1.5.1 并行与并发性的区别和联系

    千次阅读 2016-06-19 22:18:31
    并行性和并发性是既相似又有区别的两个概念。并行性是指两个或多个事件在同一时刻发生,...倘若在计算机系统中有多个处理器,则这些可以并发执行的程序便被分配到多个处理器上,实现并行执行,即利用每个处理器来处理

    并行性和并发性是既相似又有区别的两个概念。并行性是指两个或多个事件在同一时刻发生,并发性是指两个或多个事件在同一时间间隔内发生。

    在多道程序环境下,并发性是指在一段时间内,宏观上有多个程序在同时运行,但在单处理器系统中每个时刻却仅有一道程序执行,故微观上这些程序只能分时地交替执行。倘若在计算机系统中有多个处理器,则这些可以并发执行的程序便被分配到多个处理器上,实现并行执行,即利用每个处理器来处理一个可并发执行的程序。

    展开全文
  • shell 并行执行与串行执行

    千次阅读 2020-06-16 17:47:26
    并行执行:& 串行执行:&& 出现错误退出 什么都不加,出现错误会继续执行
  • 并行与并发

    千次阅读 2017-11-27 13:18:23
    程序一般指命令处理器执行一系列指令的语句的集合,而硬件描述语言并不是在命令处理器处理指令,其本身描述实际上是一个数字逻辑电路,所以硬件描述语言并不是程序。 在单核处理器中,从时钟周期这一微观角度上
  • 并行与并发的区别

    2016-11-14 14:36:45
    并行与并发的区别 并发(concurrent)和并行(parallel)是两个相似但又有区别的概念。并行是指多个事件在同一时刻发生,而并发是指多个事件在同一时间段发生。在多道程序环境下,并
  • 并发并行 概念非常相似,难以区分。分别体现在两个方面。 并发(Concurrency),体现在(1)单个处理器;(2)逻辑上同步运行。 并行(Parallelism),体现在(1)多处理器,多核心;(2)物理上同步运行。 并行的...
  • Hive 并行执行

    2019-11-28 21:40:20
    Hive 并行执行 Hive会将一个查询转化成一个或者多个阶段。 这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。 不过,...
  • 并行并发的区别

    千次阅读 多人点赞 2019-05-13 09:40:22
    并行并发有什么区别? 并行(parallelism),是同一时刻,两个线程都在执行并发(concurrency),是同一时刻,只有一个执行,但是一个时间段内,两个线程都执行了。 并行 并发 【Java面试题答案...
  • 并发执行并行执行

    2020-11-16 17:35:33
    并发执行:多个进程在使用同一个cpu,每个进程都独占cpu一会,然后让出cpu资源,供其他进程执行 并行执行:多个进程,在同一时刻,每个家进程都有一个cpu进行运算。 程序是如何变成进程的? 涉及到冯诺依曼体系,首先...
  • Java并发并行与并发的区别

    千次阅读 2017-07-14 22:07:34
     在服务器只有一个cpu的情况下,多个线程同时执行时,其实即为并发执行,多个线程要交替执行,  而在服务器有多个cpu或有多核的cpu的情况下,才能实现真正的并行,即不同的线程在不同的cpu内核中执行;
  • 关于多核单核、并行与并发

    千次阅读 2017-10-14 15:44:10
    单核cpu的话只能是并发,多核cpu才能做到并行执行。可能有这样的疑问:那多进程的并发有什么意义,不但没有提高cpu的利用效率,由于调度等开销,还降低了cpu的使用。这样的想法是片面的:1.进程并不是时时刻刻都占用...
  • 并行并发在计算机编程中是非常重要的两个概念,但是它们常常被混淆。下面我来用一句话来概括,然后用通俗易懂的语言解释并举例说明。 并发:两个任务共享时间片段。在计算的场景中是指,只有一个CPU的情况下,有...
  • 假设有AB两个任务,则串行、并行并发的区别如图1所示。 串行 A和B两个任务运行在一个CPU线程上,在A任务执行完之前不可以执行B。即,在整个程序的运行过程中,仅存在一个运行上下文,即一个调用栈一个堆。程序会...
  • 在操作系统中是指,一组程序按独立异步的速度执行,不等同于时间上的重叠(同一个时刻发生),也可以表述为8为数据同时通过并行线进行传送,数据传送速度大大提高,但并行传送的线路长度收到限制,长度增加,干扰会...
  • Python中的并发编程(2): 并行与并发

    千次阅读 2018-03-19 15:15:10
    并行与并发 很多人都会有一个问题,并发(Concurrency)和并行(Parallelism)是一个概念吗?它们之间有什么区别呢?只有在充分了解概念的情况下,才能在接下来的学习中,不被文献中充斥的各种概念弄混淆;在实践中,也...
  • 并发并行、高并发和多线程

    千次阅读 2018-06-25 10:51:38
    1.并发并行的区别并发:当有多个线程在操作时,如果系统只有一个CPU,把CPU运行时间划分成若干个时间段,分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状态。这种方式我们称之为并发...
  • 并行”是指无论从微观还是宏观,二者都是一起执行的,就好像两个人各拿一把铁锨在挖坑,一小时后,每人一个大坑。 而“并发”在微观上不是同时执行的,只是把时间分成若干段,使多个进程快速交替的执行,从宏观...
  • 并行与并发的理解

    2012-11-05 10:49:50
    # 并行与并发的理解 **背景**:根据Golang-china里面一帖子的讨论,到网上查资料做的笔记。 >"并行是关于性能的;并发是关于程序设计的。--ROB PIKE" ## 博客参考 - [并发并行的区别:吃馒头的比喻]...
  • 并行程序设计 目前并行程序设计的状况是:①并行软件的发展落后于并行硬件;②和串行系统的应用软件相比,现今的并行系统应用软件甚少且不成熟;③并行软件的缺乏是发展并行计算的主要障碍;④而且这种状态仍在...
  • 并行是什么意思?并发的区别是什么? 并行:指两个或两个以上事件或活动在同一时刻发生。如多个任务在多个 CPU 或 CPU 的多个核上同时执行,不存在 CPU 资源的竞争、等待行为。...并发执行的线程...
  • 并发并行

    2019-11-02 13:30:18
    并发和并行 并行和并发是容易被混淆的两个概念。他们都可以标识两个或者多个任务一起执行,但是侧重点有所不同。...但是在外部观察者来看,即时多个任务之间是串行并发的,也会造成多个任务并行执行的错觉。 ...
  • 并发并行什么是并发什么是并行,他们的区别是什么?你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行.你吃饭吃到一半,电话来了,你停了下来接了电话,接完后电话以后继续...
  • golang 并发与并行

    千次阅读 2019-06-12 20:00:30
    一个线程是一个执行空间,这个空间会被操作系统调度来运行函数中所写的代码。每个进程至少包含一个线程,每个进程的初始线程被称作主线程。因为执行这个线程的空间是应用程序的本身的空间,所以当主线程终止时,应用...
  • Shell-使用&和wait让你的脚本并行执行

    万次阅读 多人点赞 2019-02-19 21:57:56
    假定业务上多个业务逻辑没有先后关系,每个脚本的执行时间也很长 ,推荐并行执行。 一般情况下,我们会把每个业务逻辑写到一个单独的脚本里,在服务器上逐一调用,每次都要手工去敲命令。 如果我们把这些脚本放到一个...
  • iOS 并发与并行

    千次阅读 2015-11-01 23:16:26
    并发行和并行性的区别可以用馒头做比喻。前者相当于一个人同时吃三个馒头和三个人同时吃一个馒头。   并发性(Concurrence):指两个或两个以上的事件或活动在同一时间间隔内发生。并发的实质是一个物理CPU(也...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 200,938
精华内容 80,375
关键字:

并行执行与并发执行