A question about await

Source: 7-11 Analysis of how coroutine suspension works, part 2

骑着面包去草原

2019-07-30

Hello, teacher!
I work with Spring Boot, and I am using Spring Boot 2.0 WebFlux.
I start a coroutine asynchronously like this:

mono {
 val oldPartner = partnerService.findByCorpid(newPartner.corpid)
                .awaitSingle() // database query
 if (oldPartner != null) {
     newPartner.id = oldPartner.id
 }

 val authInfoJSON = json.getJSONObject("auth_info")

 val agentObj = authInfoJSON.getJSONArray("agent")
         .getJSONObject(0)

 var agent = agentService
         .findByCorpidAndAgentTypeOne(newPartner.corpid, SuiteEnum.CENTER)
         .awaitSingle() // database query
}

The database is accessed through MongoDB Reactive.
Will calling awaitSingle() at the "database query" points above block the thread?
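
To make the question concrete, here is a minimal sketch, using only the Kotlin standard library, of how an await-style suspend function can wait for a callback-driven source without blocking the calling thread. `FakeMono`, `awaitSingleSketch` and `runDemo` are hypothetical names invented for this illustration. The real `awaitSingle()` also deals with cancellation, errors and empty publishers, so this only approximates the idea and is not the library's implementation.

```kotlin
import kotlin.coroutines.*

// Hypothetical stand-in for a reactive source: it only stores a callback
// and invokes it later, when a value is emitted.
class FakeMono<T> {
    private var callback: ((T) -> Unit)? = null
    fun subscribe(onNext: (T) -> Unit) { callback = onNext }
    fun emit(value: T) { callback?.invoke(value) }
}

// Simplified await: register a callback, suspend, and resume the coroutine
// when the value arrives. No thread is parked while waiting.
suspend fun <T> FakeMono<T>.awaitSingleSketch(): T = suspendCoroutine { cont ->
    subscribe { value -> cont.resume(value) }
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val source = FakeMono<String>()

    val block: suspend () -> Unit = {
        log += "before await"
        val value = source.awaitSingleSketch() // suspends here
        log += "after await: $value"
    }
    // Starts the coroutine on the current thread; returns at the first suspension.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { })

    log += "caller continues" // the caller was not blocked by the await
    source.emit("partner")    // the "query result" arrives, the coroutine resumes
    return log
}

fun main() {
    println(runDemo())
}
```

In this sketch the caller regains control as soon as the coroutine suspends, and the rest of the coroutine body runs only when the source invokes the callback.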

Source code of mono:

public fun <T> mono(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> {
    require(context[Job] === null) { "Mono context cannot contain job in it." +
            "Its lifecycle should be managed via Disposable handle. Had $context" }
    return monoInternal(GlobalScope, context, block)
}

@Deprecated(
    message = "CoroutineScope.mono is deprecated in favour of top-level mono",
    level = DeprecationLevel.WARNING,
    replaceWith = ReplaceWith("mono(context, block)")
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
@LowPriorityInOverloadResolution
public fun <T> CoroutineScope.mono(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> = monoInternal(this, context, block)

private fun <T> monoInternal(
    scope: CoroutineScope, // support for legacy mono in scope
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
    val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
    val newContext = scope.newCoroutineContext(context + reactorContext)
    val coroutine = MonoCoroutine(newContext, sink)
    sink.onDispose(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

private class MonoCoroutine<in T>(
    parentContext: CoroutineContext,
    private val sink: MonoSink<T>
) : AbstractCoroutine<T>(parentContext, true), Disposable {
    var disposed = false

    override fun onCompleted(value: T) {
        if (!disposed) {
            if (value == null) sink.success() else sink.success(value)
        }
    }

    override fun onCancelled(cause: Throwable, handled: Boolean) {
        if (!disposed) {
            sink.error(cause)
        } else if (!handled) {
            handleCoroutineException(context, cause)
        }
    }

    override fun dispose() {
        disposed = true
        cancel()
    }

    override fun isDisposed(): Boolean = disposed
}

It is provided by this library: implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0-RC")


1 answer

bennyhuo

2019-07-30

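I'm not familiar with this library... Is it implemented with coroutines? If it is a suspend function, then it depends on how it is dispatched.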
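
As a rough illustration of "it depends on how it is dispatched", the sketch below uses only the Kotlin standard library to show a `ContinuationInterceptor` deciding which thread a coroutine resumes on. `ExecutorInterceptor` and `threadAfterAwait` are hypothetical names made up for this example. In kotlinx.coroutines, dispatchers such as `Dispatchers.Default` play this interceptor role, but their implementation is considerably more elaborate than this.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.*

// Hypothetical interceptor: every resumption is handed over to the executor.
class ExecutorInterceptor(private val executor: Executor) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Returns the name of the thread that runs the code after the suspension point.
fun threadAfterAwait(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "worker") }
    val suspended = CountDownLatch(1)
    val done = CountDownLatch(1)
    var pending: Continuation<Unit>? = null
    var seenThread = ""

    val block: suspend () -> Unit = {
        suspendCoroutine<Unit> { cont ->
            pending = cont        // keep the continuation so the caller can resume it
            suspended.countDown()
        }
        seenThread = Thread.currentThread().name
    }
    block.startCoroutine(Continuation(ExecutorInterceptor(executor)) { done.countDown() })

    suspended.await()
    pending!!.resume(Unit) // resume is requested from the caller's thread
    done.await()
    executor.shutdown()
    return seenThread
}

fun main() {
    println(threadAfterAwait())
}
```

Although `resume` is called from the caller's thread, the code after the suspension point runs on the thread chosen by the interceptor, which is why the answer to "which thread" lies in the dispatcher in the coroutine context rather than in the await call itself.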
bennyhuo, replying to 骑着面包去草原:
You're welcome.
2019-07-30