关于队列实现调度器问题
来源:16-4 队列实现调度器

nitros
2020-02-27
老师好,调度器的处理这里用到了goroutine和channel的数据传输。所以一直有点蒙,在这里阐述下自己的理解。
先说下简单的调度器
1,要先开很多的worker,这时候就初次用到了goroutine
2,要往createWorker这个goroutine里传数据,就利用到了channel
由于实际工作者是worker,这时候定义了 chan request 和 chan parserResult
3,简单调度器的工作就是,
①把大量的request请求,发送到 chan request里
②之后goroutine里会接收chan request里的数据传达给worker工作,生成大量的parserResult 并把这些parserResult发送到 chan parserResult里。
③最后,外围有个地方会不停的接收来自chan parserResult的数据,来进行最后编译。
再说下队列实现调度器
1,要先开很多的worker,这时候就初次用到了goroutine
2,要往createWorker这个goroutine里传数据,就利用到了channel
由于实际工作者是worker,这时候定义了 chan request 和 chan parserResult(到这里为止是一样的)
3,队列调度器的工作是,
①把大量的request请求,发送到 chan request里
②chan request里的请求先不去分配工作,而是由队列 []Request进行接收
③func (s *QueuedScheduler) WorkerChan() chan engine.Request { return make(chan engine.Request) }
的意义不太理解
④workerChan chan chan engine.Request
存在意义不太理解
⑤var workerQ []chan engine.Request
这个队列又是要做什么的
总体来说,有了chan request已经够了,为什么又多了workerChan 这里没太懂。是想把createrWorker也用队列来管理是吧?
⑥之后goroutine里会接收workerChan里的数据传达给worker工作,生成大量的parserResult 并把这些parserResult发送到 chan parserResult里。
⑦最后,外围有个地方会不停的接收来自chan parserResult的数据,来进行最后编译。
上述内容有点偏长,可能问题阐述的也不是很明确。希望老师能给讲解一下其中的原理。
1回答
-
抱歉这个问题隔了很久。
首先这里的调度器总体上说做了一件事,它有一个chan request,又维护了很多worker,每次从chan requet里收到一个request,就发给一个worker。简单调度器和队列调度器都做这件事。
简单调度器直接用goroutine和channel的特性就做到了这一点,但是我们对整个系统各个环节控制较低,并发请求过多之后,系统到底应该是怎样的行为,是不确定的。所以又尝试了一个队列调度器。
为了阅读方便,具体代码在 https://git.imooc.com/coding-180/coding-180/src/master/crawler/scheduler/queued.go
队列调度器同样维护一个
requestChan chan engine.Request
用于接收请求。为了维护很多worker,我们有
workerChan chan chan engine.Request
这里有一个难点就是workerChan里的chan Request就是代表了worker。我们甚至可以写成
type worker chan engine.Request
workerChan chan worker
到这里,调度器做的就是从手里的两个channel,各取一个元素,那么就有了一个request和一个worker,把这个request发给这个worker,就解决了,然后不断重复。
但是各取一个元素这件事并不那么容易。channel的读取是会阻塞的。如果我先取request,取不到,那么在等待的时候我就没有办法取worker,这是不好的。所以要用select。用了select我们要保证很重要的一点:
select的每个case里面都要快速执行,不能有阻塞
这样我们所有的等待都发生在select上面,这是非常高效的,因为select可以等待多个信道,任何一个信道有了数据就会触发,不会发生数据送过来但来不及接收,或者有了数据来不及发送的情况。
所以我们在select里收到一个request,不能马上发给一个worker,而是把它存起来,存在哪里呢,队列里。所以我们有对应的
var requestQ []engine.Request
var workerQ []chan engine.Request
分别用来存储从reuqestChan和workerChan里接收到的东西(workerQ里的chan request就代表了worker)。
到这里,我们的select做同时做三件事,注意这三件事在“并”之后都非常快:
从requestChan收一个request,并存在requestQ
从workerChan收一个worker,并存在workerQ
把第一个requestQ里的request发给第一个workerQ里的worker,并把它们从队列里分别取出
最后还剩一个小问,
func (s *QueuedScheduler) WorkerChan() chan engine.Request {
return make(chan engine.Request)
}
这是干什么的?我们看一下它的调用者,在https://git.imooc.com/coding-180/coding-180/src/master/crawler/engine/concurrent.go#L28 就是告诉worker,你会从这里接收发给你的request。这个返回的新的channel又回从WorkerReady https://git.imooc.com/coding-180/coding-180/src/master/crawler/scheduler/queued.go#L18 这个函数被送回来。这样不是很直观,但是是为了和SimpleScheduler兼容。使得SimpleScheduler和QueuedScheduler可以互换。这里我在16-5重构和总结中进行了详细的讲述。
332022-01-25
相似问题