精华内容
下载资源
问答
  • <9> go worker线程池
    千次阅读
    2015-11-24 16:03:28

    Worker Pools

    package main
    import "fmt"
    import "time"
    // 使用goroutine  开启大小为3的线程池
    // 其中1个channel为执行做通信,1个对结果进行保存
    
    // 创建的worker
    func worker(id int, jobs <-chan int, results chan<- int) {
        for j := range jobs {
            fmt.Println("worker", id, "processing job", j)
            time.Sleep(time.Second)
            results <- j * 2
        }
    }
    func main() {
        // 创建channel
        jobs := make(chan int, 100)
        results := make(chan int, 100)
        // 3个worker作为一个pool
        for w := 1; w <= 3; w++ {
            go worker(w, jobs, results)
        }
    
        // 发送9个jobs,然后关闭
        for j := 1; j <= 9; j++ {
            jobs <- j
        }
        close(jobs)
    
        // 最后收集结果
        for a := 1; a <= 9; a++ {
            <-results
        }
    }

    输出:
    worker 1 processing job 1
    worker 2 processing job 2
    worker 3 processing job 3
    worker 1 processing job 4
    worker 2 processing job 5
    worker 3 processing job 6
    worker 1 processing job 7
    worker 2 processing job 8
    worker 3 processing job 9
    real 0m3.149s

    更多相关内容
  • goworker是Resque兼容的基于Go的后台工作者。 它允许您使用诸如Ruby之类的表达语言将作业推入队列,同时利用Go的效率和并发性来最大程度地减少作业延迟和成本。 goworker工人可以与Ruby Resque客户端一起运行,...
  • var _ goworker.GoJob = (*Job)(nil) // DoIt - Job struct must implements GoJob interface, need this function func (j *Job) DoIt() { log.Println("Worker do this job: ", j.SomeJobData) } func main() { ...
  • Go worker并发模式

    千次阅读 2017-03-05 20:39:25
    Go语言借助于goroutine和channel可以非常方便的处理并发任务。 package main import "fmt" import "time" // go goroutine func worker(id int, jobs chan int, results chan) { for j := range jobs {

    Go语言借助于goroutine和channel可以非常方便的处理并发任务。

    package main
    
    import "fmt"
    import "time"
    
    // go goroutine
    func worker(id int, jobs <-chan int, results chan<- int) {
           for j := range jobs {
                  fmt.Println("worker", id, "started  job", j)
                  time.Sleep(time.Second)
                  fmt.Println("worker", id, "finished job", j)
                  results <- j
           }
    }
    
    func main() {
    
           // 定义两个channel,相当于队列
           // jobs接收请求
           // results输出响应结果
           jobs := make(chan int, 100)
           results := make(chan int, 100)
    
           // 创建workpool,大小为3,可以理解成线程池
           for w := 1; w <= 3; w++ {
                  go worker(w, jobs, results)
           }
    
           // 发送任务
           for j := 1; j <= 5; j++ {
                  jobs <- j
           }
           close(jobs)
    
           // 得到结果
           for a := 1; a <= 5; a++ {
                  <-results
           }
    }


    展开全文
  • go worker

    2019-06-01 05:07:24
    之前工作中有项目用到goroutine池,抽空封装了下。 package pool import ( "fmt" "sync" "sync/atomic" "time" ) type Pool struct { mu *sync.Mutex tasks chan ITask ... workers []*worker fr...

    之前工作中有项目用到goroutine池,抽空封装了下。

    package pool
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    type Pool struct {
    	mu         *sync.Mutex
    	tasks      chan ITask
    	workerNum  int
    	workers    []*worker
    	freeWorker int32
    	done       chan struct{}
    }
    
    type worker struct {
    	id      int
    	p       *Pool
    	release chan struct{}
    }
    
    type ITask interface {
    	Execute()
    }
    
    func NewPool(workerNum int) *Pool {
    	return &Pool{
    		tasks:      make(chan ITask),
    		workerNum:  workerNum,
    		workers:    make([]*worker, 0),
    		done:       make(chan struct{}),
    		freeWorker: int32(workerNum),
    		mu:         &sync.Mutex{},
    	}
    }
    
    func (p *Pool) Put(task ITask) {
    	p.tasks <- task
    }
    
    func (p *Pool) Run() {
    	for i := 0; i < p.workerNum; i++ {
    		w := newWorker(i, p)
    		p.workers = append(p.workers, w)
    		go w.run()
    	}
    
    	select {
    	case <-p.done:
    		return
    	}
    }
    
    func (p *Pool) Resize(workerNum int) error {
    	if workerNum < 0 {
    		return fmt.Errorf("invalid worker num: %d", workerNum)
    	}
    
    	p.mu.Lock()
    	defer p.mu.Unlock()
    
    	if workerNum > p.workerNum {
    		newWorkerNum := workerNum - p.workerNum
    		for i := 0; i < newWorkerNum; i++ {
    			w := newWorker(i, p)
    			p.workers = append(p.workers, w)
    			go w.run()
    		}
    		p.workerNum = len(p.workers)
    
    	} else {
    		releaseNum := p.workerNum - workerNum
    		if releaseNum == p.workerNum {
    			return fmt.Errorf("invalid operation, pool size should not be zero")
    		}
    
    		for i := 0; i < releaseNum && i < p.workerNum; i++ {
    			close(p.workers[0].release)
    			if len(p.workers) > 1 {
    				p.workers = p.workers[1:]
    			} else {
    				p.workers = []*worker{}
    			}
    		}
    		p.workerNum = len(p.workers)
    	}
    	return nil
    }
    
    func (p *Pool) FreeSize() int {
    	return int(atomic.LoadInt32(&p.freeWorker))
    }
    
    func (p *Pool) Size() int {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	return p.workerNum
    }
    
    func (p *Pool) Close() {
    	close(p.done)
    }
    
    func newWorker(id int, p *Pool) *worker {
    	return &worker{
    		id:      id,
    		p:       p,
    		release: make(chan struct{}),
    	}
    }
    
    func (w *worker) run() {
    	p := w.p
    
    	for {
    		select {
    		case <-w.release:
    			return
    
    		case <-p.done:
    			return
    
    		case t := <-p.tasks:
    			atomic.AddInt32(&p.freeWorker, -1)
    			t.Execute()
    			atomic.AddInt32(&p.freeWorker, 1)
    		}
    	}
    }
    
    复制代码

    调用

    
    type Task struct {
    }
    
    func (t *Task) Execute() {
    	fmt.Println("task printf", time.Now())
    }
    
    func main() {
    	task := &Task{}
    
    	p := NewPool(6)
    	go p.Run()
    	defer p.Close()
    
    	time.Sleep(time.Second * 1)
    	for i := 0; i < 5; i++ {
    		p.Put(task)
    		time.Sleep(time.Second * 1)
    		fmt.Println("before: ", p.Size())
    		p.Resize(p.Size() / 2)
    		fmt.Println(p.Size())
    		fmt.Println()
    		// p.Resize(p.Size() + 10)
    	}
    }
    
    复制代码

    大概写了下,先存着备忘,还没怎么测试,测完有bug再进行修复。

    展开全文
  • 我们曾经研究过如何让Python和Go互相调度,当时发现,将Go语言写的模块打包成动态链接库,就能在Python中进行调度:优劣互补! Python+Go结合开发的探讨Go的优势很明显,...

    我们曾经研究过如何让Python和Go互相调度,当时发现,将Go语言写的模块打包成动态链接库,就能在Python中进行调度:

    优劣互补! Python+Go结合开发的探讨

    Go的优势很明显,从1亿减到1,在我的设备上测试,用Go运行只需要50ms,Python可能需要接近100倍的时间。

    但是,这种写法也有缺点:实在太麻烦了,大大增加了整个项目的耦合性。

    那Python中有没有办法不通过打包成动态链接库的方法,用Python调度Go的任务呢?答案是Go celery.

    https://github.com/gocelery/gocelery

    我们可以用Go写一个计算密集型任务的Worker,然后用Python的Celery beat来调度这个Worker,下面给大家演示一下:

    1.编写Go Worker

    最好是将计算密集型的任务改造成Go语言版的,这样收益才能最大化。

    比如这里,我使用的是上回从1亿减到1的老梗。

    PS,别被下面这段代码吓到了,其实大部分是可以去掉的配置项,核心代码就几行。



    输入命令:

    go run main.go
    

    即可运行该worker

    2.编写Python客户端

    每5秒调度一次1亿减到1,不过不跑Python worker. 由于Go Worker在运行,这里的minus会被Go Worker消费。

    另外请注意,这里的minus函数实际上只是为了能被识别到而编写的,其内容毫无意义,直接写个pass都没问题(因为实际上是Go Worker在消费)。

    编写完后,针对go_tasks模块启动beat:

    celery -A go_tasks beat
    

    此时,调度器就会调度Go Worker执行任务:

    可以看到,我们成功用Python的Celery Beat调度了Go写的Worker!可喜可贺。

    接下来可以看看如果单纯用Python的Worker做这样的计算是有多耗时:

    启动worker:

    celery worker -A python_tasks -l info --pool=eventlet
    

    启动beat调度器:

    celery -A python_tasks beat
    

    结果如下:



    可以看到,Python从1亿减到1平均需要5.2秒左右的时间,和Go版相差了100倍左右。

    如果我们将调度器的频率提高到每秒计算1次,Python版的Worker,其任务队列一定会堵塞,因为Worker消费能力不够强大。相比之下,Go版的Worker可就非常给力了。

    因此,如果你的项目中有这种计算密集型的任务,可以尝试将其提取成Go版本试试,说不定有惊喜呢。

    我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

    有任何问题,可以在公众号后台回复:加群回答二维码上相应的验证信息,进入互助群询问。

    原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

    点击下方阅读原文可获得更好的阅读体验

    Python实用宝典 (pythondict.com)

    不只是一个宝典

    欢迎关注公众号:Python实用宝典

    展开全文
  • 之前写过一篇文章,它有个响亮的名字:Handling 1 Million Requests per Minute with Go使用 Go 每分钟处理百万请求这是国外的一个作者写的,我做了...
  • <p>I'm a beginner gopher, and I wrote an event listener worker queue for a project I'm working on. <p>I've deployed it on a staging server. After around 100 events have been triggered the listeners ...
  • port 3251 -type metagame -frontend=false PS D:\Work\pitaya-test-game\demo\worker> go run .\main.go -type worker -port 3252 测试 测试是过了的 看看代码, worker.go 就这一个是陌生的东西,可靠的rpc请求,...
  • go-worker-base

    2021-05-19 09:10:00
    go-worker-base ##实现的通用协程池 job 目录是一个 要执行业务的demo worker 目录是 业务协程池的封装 workerManage.go 为通用协程池实现 worker.go 为基础协程池数据结构 ###实例1: var poolOne worker.WorkPool ...
  • worker 功能点 获取ETCD中,被master写入的cmd任务; 根据corn表达式,确定任务调度列表; 根据任务列表,进行任务执行; 对Job做分布式锁,防止集群并发调用。 执行日志存储; worker 启动 同master ( init args...
  • golang worker pool ,工作池,线程池

    千次阅读 2019-09-15 21:27:19
    golang worker pool ,线程池 , 工作池 并发限制goroutine池。 限制任务执行的并发性,而不是排队的任务数。 无论排队多少任务,都不会阻止提交任务。 通过队列支持 golang 工作池公共库 支持最大任务数,...
  • v8worker 是 V8 的 Go 语言封装版本。输出一个非堵塞消息传递接口到 V8 引擎。Go 和 JavaScript 通过消息的收发进行交互。V8 只在计算 JavaScript 的时候堵塞 goroutine 线程,没有任何 syscalls 系统调用。只提供三...
  • go-task-pool

    2021-04-18 16:34:13
    go协程任务池 ...GoWorker 协程池任务Worker 描述:在GoPool中,Worker可以有很多个,但给定相同的id,其执行的任务一定在同一个Worker上,且按调用顺序执行,但不同id不保证一定在不同Worker中 主要方法
  • 使用 Golang 实现了一个...以此来控制固定时间内处理的请求数源码地址https://github.com/qianguozheng/go-workerpool.git用途 控制goroutine的数目 简练模型 实际使用场景及灵感来源http://marcio.io/2015/07/handling
  • kubernetes之client-go基础包workqueue

    千次阅读 2019-05-01 13:43:37
    上述kubernetes的控制器模型 通过client-go的informer watch资源变化,当资源发生变化时会通过回调函数将资源写入队列,由controller中的worker消费者完成业务处理。 版本是: client-go v11.0.0 通用队列 ...
  • 这是Go(golang)和V8 JavaScript之间的最小绑定。 基本概念是只向JavaScript公开两种方法:发送和接收。
  • 源码地址:https://github.com/go-workflow/go-workflow 整个工作流引擎设计思路参考java的工作流引擎Activiti,但是解耦了所有的数据,只保留流程的流转部分,以更适应微服务架构,同时整个架构更轻量。 在流程...
  • 使用v8worker2在Golang上构建的ReasonML运行时
  • Go协程池设计思路(Task-Job-Worker)

    千次阅读 2020-04-13 00:18:24
    1. 铺垫:Go 的接收器Receiver 在go语言中,没有类的概念,但是可以给类型(结构体,自定义类型)定义方法。所谓方法就是定义了接受者的函数。接受者定义在func关键字和函数名之间。可以理解成为结构体定义函数方法...
  • 简介继上一篇Go 每日一库之 ants,这篇文章我们来一起看看ants的源码。Pool通过上篇文章,我们知道ants池有两种创建方式:p, _ := ants.NewPool(cap):这...
  • Go协程与协程池

    2020-04-12 16:25:20
    1. Golang协程 golang和其它语言最大区别莫过于goroutine,也就是go的协程,example如下: package main import "fmt" import "time" func go_worker(name string) ... fmt.Println("this is go worker :" , na...
  • ws-worker

    2021-03-08 00:30:18
    工人 这是一个工作节点
  • Go 工作池在这个例子中,我们来看一下如何使用gorouotine和channel来实现工作池。package main import "fmt" import "time" // 我们将在worker函数里面运行几个并行实例,这个函数从jobs通道 //...
  • go语言管道(channel)

    2022-04-17 22:42:46
    channel式go语言协程中数据通信的双向通道。但是在实际应用中,为了代码的简单和易懂,一般使用的channel是单向的。 使用 1. channel的定义和收发数据 package channel func main(){ //var c chan int c的...
  • 工人 Vela正在积极开发中,并且是预发布产品。 在生产中使用风险自负。 请随时通过向我们发送反馈。 Vela是基于编写的技术构建的管道自动化(CI / CD)框架。 Vela使用类似于的语法来定义其配置。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 26,902
精华内容 10,760
热门标签
关键字:

goworker

友情链接: 超级玛丽v1.1.zip