老师,请问activeWorker <- activeRequest是什么意思?

来源:16-4 队列实现调度器

阿憨阿憨

2021-11-27

func (s *QueuedScheduler) Run() {
	s.workerChan = make(chan chan engine.Request)
	s.requestChan = make(chan engine.Request)
	go func() {
		var requestQ []engine.Request
		var workerQ []chan engine.Request
		for {
			var activeRequest engine.Request
			var activeWorker chan engine.Request
			if len(requestQ) > 0 && len(workerQ) > 0 {
				activeWorker = workerQ[0]
				activeRequest = requestQ[0]
			}

			select {
			case r := <-s.requestChan:
				requestQ = append(requestQ, r)
			case w := <-s.workerChan:
				workerQ = append(workerQ, w)
			case activeWorker <- activeRequest:
				workerQ = workerQ[1:]
				requestQ = requestQ[1:]
			}
		}
	}()
}

老师,请问activeWorker <- activeRequest是什么意义?activeWorker也只是一个chan啊,而且是一个无人消费的空chan啊.

对不起老师,是我没表达清楚,是这样的
如果要activeWorker <- activeRequest执行成功,则需要requestQ和workerQ中同时有数据.
activeWorker是从workerQ中获取的
workQ是从<-workerChan中获取的
workChan来自WorkerReady()的推送
但是在createWorker()中推入WorkerReady()的只是一个空的make(chan engine.Request).
也就是说如果走到activeWorker <- activeRequest区间, activeWorker也只是一个 make(chan engine.Request)而已
既然activeWorker是一个空的产,不断的向里面推activeRequest,又没有弄出来让别人消费activeWorker,怎么不会死锁呢.

实在是不明白, activeWorker <- activeRequest 的意义, 是不是我activeWorker理解的不对.activeWorker里面究竟是什么,又是谁在消费他.
希望老师指点一下.困扰良久了

queued.go

//WorkerReady 将传入的chan request 推入到workerChan中,以供run函数消费
func (qs *QueuedScheduler) WorkerReady(w chan engine.Request) {
	qs.workerChan <- w
}

cocurrent.go

//createWorker 创建协程
func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {
	go func() {
		for {
			//将一个空的chan推入到workerChan中,里面的run函数会不断的从中获取
			ready.WorkerReady(in)

			//调用worker处理传入的request,并将处理结果推入到out中,传出
			result, err := worker(<-in)
			if err != nil {
				continue
			}
			out <- result
		}
	}()
}

cocurrent.go

//Run 执行引擎
func (e *ConcurrentEngine) Run(seeds ...Request) {
	//执行传入的调度器
	e.Scheduler.Run()

	out := make(chan ParseResult)
	for i := 0; i < e.WorkerCount; i++ {
		//创建指定数量的协程,并将worker通过WorkerChan()方法初始化一个channel
		createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
	}
	......

queued.go

//WorkerChan 初始化一个request chan
func (qs *QueuedScheduler) WorkerChan() chan engine.Request {
	return make(chan engine.Request)
}
写回答

1回答

ccmouse

2021-11-27

后面就有

if len(requestQ) > 0 && len(workerQ) > 0 {

activeWorker = workerQ[0]

activeRequest = requestQ[0]

}

如果workerQ不空,那我们就认为第一(0)个就是activeWorker,并尝试将activeRequest发送给它。

反之如果workerQ空,就像同学说的,activeWorker就是个无人消费的空chan,这句activeWorker <- activeRequest就永远不会被select选中。

0
2
ccmouse
回复
阿憨阿憨
ready.WorkerReady(in) 这个chan本身是空的,不过这个调用将导致这个in被推到了s.workerChan中,而 case w := <-s.workerChan: workerQ = append(workerQ, w) 就会把这个推进workerChan的channel从workerChan取出来,放入workerQ中。那么我们的activeWorker就有了。 从worker的角度来讲,系统(scheduler)会分配一个chan Request(in)给worker,worker必须通过把此chan用WorkerReady调用来通知系统我准备接收数据了。那么系统就会把新的request(activeRequest)通过这个chan Request(in,此时成为了activeWorker)发送给worker。
2021-11-29
共2条回复

Google资深工程师深度讲解Go语言 由浅入深掌握Go语言

语法+分布式爬虫实战 为转型工程师量身打造

5995 学习 · 1909 问题

查看课程