Coroutine study notes
  xHZVWyBuuoyF 2023年11月02日 34 0

Ways to create coroutine:

Launch {} :  Non blocking.  Code after the coroutine block can run immediately while the coroutine is running. 

Launch can only be invoked within a coroutine scope. 

Launch returns a job. Job.join() waits for the launch block to complete. 


runBlocking {} :  Blocking. Wait for its body and all its children to complete.  Code after the coroutine block can only run after the coroutine block completes.  The underlying thread is blocked.


coroutineScope {}: Blocking. Wait for its body and all its children to complete. But underlying thread is released for other usage.


async {}:  Non blocking.

Similar to launch, the difference is that launch does not return value, async returns a “Deferred” instance, which can retrieve value by calling “Deferred.await()”.

async(start = CoroutineStart.LAZY), lazily starts the coroutine only when “start” or “await” method is called explicitly.

enclose the async call in coroutineScope. So that if one async call has problem, all coroutines within the scope are auto cancelled.


Pause execution within coroutine:  Use “delay” instead of “sleep”.  The current coroutine is suspended and other coroutine can run.

Structured concurrency:  An outer scope cannot complete until all its children coroutines complete.

Suspend function:

Suspend functions like “join” etc can only be called within a coroutine block. So enclose it with a “runBlocking”.

Functions invoked within launch or async block is not required to have “suspend” prefix. However, if coroutine methods such as “delay” or “yield” are called from the function, or function sends/receives data via channel. Then “suspend” prefix is mandatory, otherwise, you will get compilation error.   

Cancel coroutine:

val job = launch {}   job.cancel()

Cancellation is cooperative, means the launch block should contain “yield”, “delay” or “kotlinx.coroutines.isActive” check. Otherwise, launch block will still be executed completely.

Launch block can contain try {} finally {}, so that after cancellation, the finally block will be still be executed. This is useful for resource clean up.

Put “withContext(NonCancellable) {} ” within finally block, making it suspendable.

withTimeout(timeout time) { }  will cancel the block if execution time exceeds timeout time and throw TimeoutCancellationException.  withTimeoutOrNull(timeout time) {}  will return a null value on timeout instead of throwing exception.

By default, cancellation is bidirectional propagated, cancelation signal sent from child scope will be propagated back to parent, and then cancel all children within the parent scope. Alternatively, using “SupervisorJob” with makes cancellation unidirectional (cancel children will not cancel parent).

Exception handling:

Exception thrown from “launch” or “actor” will not interrupt parent coroutineScope execution.

Eg:

runBlocking {
    val job = launch {
        try {
            println(“Start of coroutine scope”) //printed  
            async {
                throw Exception(“Exception within async block”)      
            }    
            async {
                println(“Execution within sub-scope”)      // not printed, as first async block throws exception and cancels other children within parent scope
            }     
            println(“Continue executing after async block”)  // printed, parent scope execution continues even if child async block throws exception 
        }
        catch(e: Exception) {
            println(“Error caught within children block”)  //not printed, exception will be directly propagated to top level
        }        
    }	
}

Create a CoroutineExceptionHandler and pass it to “launch” or “async”.  Eg:

val handler = CoroutineExceptionHandler { _, exception -> 
    	  ...
}

GlobalScope.launch(handler) or GlobalScope.async(handler)  -> only works with “GlobalScope”. (Note: within runBlocking, launch {} and GlobalScope.launch{} is different). Also, catching the first exception cancels the remaining children coroutine blocks within the block, unless the children coroutine blocks is enclosed with a withContext(NonCancellable) {} block.

If there are multiple exceptions thrown from the children blocks, the handler only catches the first one.

Kotlin.coroutines.cancellation.CancellationException is transparent and are unwrapped by default. CancellationException thrown from async scope will not cancel other children blocks.  

Use “SupervisorJob” can also prevent exception throw being canncelled.

val supervisor = SupervisorJob()
runBlocking {
    val asyncScope = CoroutineScope(coroutineContext + supervisor)  
    val job = launch {
         for (i in 1 .. 10) {
            asyncScope.async {
	 throw Exception("Terminate child coroutine")
            }
        }
      }
      job.join()
}

Share mutable state and concurrency:

“Volatile” keyword will not make concurrency operation in coroutine block safe

Thread safe data structures such as “AtomicInteger” etc can be used safely within coroutine block

Put concurrency sensitive logic within “SingleThreadContext”

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
runBlocking {
   for (i in 1 .. 100) {
	      async {
	         withContext(counterContext) {
                           counter++
                        }
	      }
   } 
}
//more coarse grained approach
runBlocking {
   withContext(counterContext) {
      for (i in 1 .. 100) {
         async {
            couter ++
         }
      }
   }
}

Create an instance of kotlinx.coroutines.sync.Mutex and put concurrency sensitive logic within “mutex.withLock {}”

Create “kotlinx.coroutines.channels.actor”  block.  Internally, it has a variable “channel” to read message sent to the actor via “actor.send”.  Two message types needs to be defined:   Operator message: to perform thread unsafe operation.  Query message with a CompletableDeferred instance: to accept response callback from actor

Convert API to coroutine suspend function:

interface DatabaseSync {
	fun getLoyaltyMemberId(id: Int): Int 
}

interface DatabaseAsync {
	fun getLoyaltyMemberId(id: Int, callback: Consumer<Int>)
}
class CoroutineDBService {
	suspend fun getLoyaltyMemberIdSync(id: Int): Int = withContext(Dispatchers.IO) {
		dbSync.getLoyaltyMemberId(id)
	}
	
	suspend fun getLoyaltyMemberIdAsync(id: Int): Int = suspendCoroutine { cont ->
		dbAsync.getLoyaltyMemberId(id) {
			cont.resume(memberId)
		}
	}
}


Stream: 

Use sequence {} to generate a synchronous stream of data as Sequence via “yield” call

Use flow {} to generate an asynchronous stream of data as Flow via “emit” call

Use channelFlow {} to generate a flow of coroutine.  combine the pros of coroutine channel and flow. Data production is triggered on "collect" invocation

chanelFlow {
	val list = getDataList()
	list.foreach {
		launch {
    	send(data)
		}
	}
 }.map { data ->
 	//process data 
 }.flowOn(Dispatchers.DEFAULT)
 .collect {
 }

A flow is triggered until flow.collect() is called, it is possible to call collect multiple times to generate the same stream of data again

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  xHZVWyBuuoyF   2023年11月02日   35   0   0 coroutinekotline
xHZVWyBuuoyF