Combine中錯誤處理和Scheduler使用詳解
錯誤處理
到目前為止,在我們編寫的大部分代碼中,我們沒有處理錯誤,而處理的都是“happy path”。在前面的文章中,我們了解到,Combine Publisher 聲明了兩個約束:
Output
定義 Publisher 發(fā)出的值的類型;Failure
定義 Publisher 發(fā)出的失敗的類型。
現(xiàn)在,我們將深入了解 Failure
在 Publisher 中的作用。
Never
失敗類型為 Never
的 Publisher 表示永遠(yuǎn)不會發(fā)出失敗。它為這些 Publisher 提供了強(qiáng)大的保證。這類 Publisher 可讓我們專注于使用值,同時絕對確保 Publisher 只有成功完成的事件。
在新的 Playground 頁面添加以下代碼:
import Combine import Foundation import PlaygroundSupport PlaygroundPage.current.needsIndefiniteExecution = true func example(_ desc: String, _ action:() -> Void) { print("--- (desc) ---") action() } var subscriptions = Set<AnyCancellable>() example("Just") { Just("Hello") }
我們創(chuàng)建了一個帶有 Hello
字符串值的 Just
。 Just 是不會發(fā)出失敗的。 請按住 Command 并單擊 Just 初始化程序并選擇 Jump to Definition,查看定義:
In contrast with Result.Publisher, a Just publisher can’t fail with an error. And unlike Optional.Publisher, a Just publisher always produces a value.
Combine 對 Never
的障保證不僅是理論上的,而是深深植根于框架及其各種 API 中。Combine 提供了幾個 Operator,這些 Operator 僅在保證 Publisher 永遠(yuǎn)不會發(fā)出失敗事件時才可用。第一個是 sink
的變體,只處理值:
example("Just") { Just("Hello") .sink(receiveValue: { print($0) }) .store(in: &subscriptions) }
在上面的示例中,我們使用 sink(receiveValue:)
,這種特定的重載使我們可以忽略 Publisher 的完成事件,而只處理其發(fā)出的值。
此重載僅適用于這類“可靠”的 Publisher。在錯誤處理方面,Combine 是智能且安全的,如果可能拋出錯誤,它會強(qiáng)制我們處理完成事件。要看到這一點(diǎn),我們需要將 Never
的 Publisher 變成可能發(fā)出失敗事件的 Publisher。
setFailureType(to:)
func setFailureType<E>(to failureType: E.Type) -> Publishers.SetFailureType<Self, E> where E : Error
將 Never
Publisher 轉(zhuǎn)變?yōu)榭赡馨l(fā)出失敗事件的 Publisher 的第一種方法是使用 setFailureType
。這是另一個僅適用于失敗類型為 Never 的 Publisher 的 Operator:
example("setFailureType") { Just("Hello") .setFailureType(to: MyError.self) }
可以使用 .eraseToAnyPublisher()
,來確認(rèn)已改變的 Publisher 類型:
繼續(xù)修改上述代碼:
enum MyError: Error { case ohNo } example("setFailureType") { Just("Hello") .setFailureType(to: MyError.self) .sink( receiveCompletion: { completion in switch completion { case .failure(.ohNo): print("Finished with OhNo!") case .finished: print("Finished successfully!") } }, receiveValue: { value in print("Got value: (value)") } ) .store(in: &subscriptions) }
現(xiàn)在我們只能使用 sink(receiveCompletion:receiveValue:)
。 sink(receiveValue:)
重載不再可用,因?yàn)榇?Publisher 可能會發(fā)出失敗事件??梢試L試注釋掉 receiveCompletion
查看編譯錯誤。
此外,失敗類型為為 MyError
,這使我們可以針對.failure(.ohNo)
情況而無需進(jìn)行不必要的強(qiáng)制轉(zhuǎn)換來處理該錯誤。
當(dāng)然,setFailureType
的作用只是類型定義。 由于原始 Publisher 是 Just
,因此實(shí)際上也不會引發(fā)任何錯誤。
assign(to:on:)
assign
Operator 僅適用于不會發(fā)出失敗事件的 Publisher,與 setFailureType
相同。 向提供的 keypath 發(fā)送錯誤會導(dǎo)致未定義的行為。添加以下示例進(jìn)行測試:
example("assign(to:on:)") { class Person { var name = "Unknown" } let person = Person() print(person.name) Just("Layer") .handleEvents( receiveCompletion: { _ in print(person.name) } ) .assign(to: .name, on: person) .store(in: &subscriptions) }
我們定義一個具有 name
屬性的 Person
類。創(chuàng)建一個 Person
實(shí)例并立即打印其 name
。一旦 Publisher 發(fā)送完成事件,使用 handleEvents
再次打印此 name
。最后,使用 assign
將 name
設(shè)置為 Publisher 發(fā)出的值:
--- assign(to:on:) --- Unknown Layer
在 Just("Layer")
正下方添加以下行:
.setFailureType(to: Error.self)
這意味著它不再是 Publisher<String, Never>
,而是現(xiàn)在的 Publisher<String, Error>
。運(yùn)行 Playground,我們將進(jìn)行驗(yàn)證:
Referencing instance method 'assign(to:on:)' on 'Publisher' requires the types 'any Error' and 'Never' be equivalent
assign(to:)
assign(to:on:)
有一個棘手的部分——它會 strong 捕獲提供給 on 參數(shù)的對象。在上一個示例之后添加以下代碼:
example("assign(to:)") { class MyViewModel: ObservableObject { @Published var currentDate = Date() init() { Timer.publish(every: 1, on: .main, in: .common) .autoconnect() .prefix(3) .assign(to: .currentDate, on: self) .store(in: &subscriptions) } } let vm = MyViewModel() vm.$currentDate .sink(receiveValue: { print($0) }) .store(in: &subscriptions) }
我們 MyViewModel
中定義一個 @Published
屬性。 它的初始值為當(dāng)前日期。在 init
中創(chuàng)建一個 Timer Publisher,它每秒發(fā)出當(dāng)前日期。使用 prefix
Operator 只接受 3 個更新。使用 assign(to:on:)
將每個日期更新給@Published
屬性。實(shí)例化 MyViewModel
,sink
vm.$currentDate
,并打印出每個值:
--- assign(to:) --- 2022-12-24 07:32:33 +0000 2022-12-24 07:32:34 +0000 2022-12-24 07:32:35 +0000 2022-12-24 07:32:36 +0000
看起來一切都很好。但是對assign(to:on:)
的調(diào)用創(chuàng)建了一個 strong 持有 self 的 Subscription。 導(dǎo)致 self 掛在Subscription 上,而 Subscription 掛在 self 上,創(chuàng)建了一個導(dǎo)致內(nèi)存泄漏的引用循環(huán)。
因此引入了該 Operator 的另一個重載 assign(to:)
。該 Operator 通過對 Publisher 的 inout 引用來將值分配給 @Published
屬性。因此以下兩行:
.assign(to: .currentDate, on: self) .store(in: &subscriptions)
可以被替換為:
.assign(to: &$currentDate)
使用 assign(to:)
Operator 將 inout 引用 Publisher 會打破引用循環(huán)。此外,它會在內(nèi)部自動處理 Subscription 的內(nèi)存管理,這樣我們就可以省略 store(in: &subscriptions)
。
assertNoFailure(_:file:line:)
當(dāng)我們在開發(fā)過程確認(rèn) Publisher 以失敗事件完成時,assertNoFailure
Operator 非常有用。它不會阻止上游發(fā)出失敗事件。但是,如果它檢測到錯誤,它會因錯誤而崩潰:
example("assertNoFailure") { Just("Hello") .setFailureType(to: MyError.self) .assertNoFailure() .sink(receiveValue: { print("Got value: ($0) ")}) .store(in: &subscriptions) }
我們使用 Just
創(chuàng)建一個“可靠”的 Publisher 并將其錯誤類型設(shè)置為 MyError
。如果 Publisher 以錯誤事件完成,則使用 assertNoFailure
以崩潰。這會將 Publisher 的失敗類型轉(zhuǎn)回 Never。使用 sink
打印出任何接收到的值。請注意,由于 assertNoFailure
將失敗類型設(shè)置回 Never
,因此 sink(receiveValue:)
重載可以直接使用。
運(yùn)行 Playground,它可以正常工作:
--- assertNoFailure --- Got value: Hello
在 setFailureType
之后,添加以下行:
.tryMap { _ in throw MyError.ohNo }
一旦 Hello 被推送到下游,使用 tryMap
拋出錯誤。再次運(yùn)行 Playground:
Playground execution failed: error: Execution was interrupted, reason: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0). ... frame #0: 0x00007fff232fbbf2 Combine`Combine.Publishers.AssertNoFailure...
由于 Publisher 發(fā)出失敗事件,playground 會 crash。 在某種程度上,我們可以將 assertNoFailure()
視為代碼的保護(hù)機(jī)制。 雖然我們不應(yīng)該在生產(chǎn)環(huán)境中使用它,但在開發(fā)過程中提前發(fā)現(xiàn)問題非常有用。
處理錯誤
try* Operator
Combine 提供了一個區(qū)分可能引發(fā)錯誤和可能不會引發(fā)錯誤的 Operator 的方法:try
前綴。
注意:Combine 中所有以 try 為前綴的 Operator 在遇到錯誤時的行為相同。我們將只在本章中嘗試使用 tryMap Operator。
example("tryMap") { enum NameError: Error { case tooShort(String) case unknown } ["Aaaa", "Bbbbb", "Cccccc"] .publisher .map { value in return value.count } .sink( receiveCompletion: { print("Completed with ($0)") }, receiveValue: { print("Got value: ($0)") } ) }
在上面的示例中,我們定義一個 NameError
錯誤枚舉。創(chuàng)建發(fā)布三個字符串的 Publisher。將每個字符串映射到它的長度。運(yùn)行示例并查看控制臺輸出:
--- tryMap ---
Got value: 4
Got value: 5
Got value: 6
Completed with finished
將上面示例中的 map 替換為以下內(nèi)容:
.tryMap { value -> Int in let length = value.count guard length >= 5 else { throw NameError.tooShort(value) } return value.count }
我們檢查字符串的長度是否大于等于 5。否則,我們會拋出錯誤:
--- tryMap --- Completed with failure(Page_Contents.(unknown context at $10e3cb984).(unknown context at $10e3cba6c).(unknown context at $10e3cbaa8).NameError.tooShort("Aaaa"))
映射錯誤
map
和 tryMap
之間的區(qū)別不僅僅是后者允許拋出錯誤。 map
繼承了現(xiàn)有的失敗類型并且只操作 Publisher 的值,但 tryMap
沒有——它實(shí)際上將錯誤類型擦除為普通的 Swift 錯誤。 與帶有 try 前綴的所有 Operator 都是如此。
example("map vs tryMap") { enum NameError: Error { case tooShort(String) case unknown } Just("Hello") .setFailureType(to: NameError.self) .map { $0 + " World!" } .sink( receiveCompletion: { completion in switch completion { case .finished: print("Done!") case .failure(.tooShort(let name)): print("(name) is too short!") case .failure(.unknown): print("An unknown name error occurred") } }, receiveValue: { print("Got value ($0)") } ) .store(in: &subscriptions) }
我們定義一個用于此示例的 NameError
。創(chuàng)建一個只發(fā)出字符串 Hello
的 Just
。使用 setFailureType
設(shè)置失敗類型為 NameError
。使用 map
將另一個字符串附加。最后,使用 sink
的 receiveCompletion
為 NameError
的每個情況打印出適當(dāng)?shù)南?。運(yùn)行 Playground:
--- map vs tryMap --- Got value Hello World! Done!
Completion
的失敗類型是 NameError
,這正是我們想要的。 setFailureType
允許我們專門針對 NameError
進(jìn)行處理,例如 failure(.tooShort(let name))
。
將 map
更改為 tryMap
。
.tryMap { throw NameError.tooShort($0) }
我們會立即注意到 Playground 不再編譯。 再次點(diǎn)擊 completion
:
tryMap
刪除了我們的類型錯誤并將其替換為通用 Swift.Error
類型。即使我們實(shí)際上并沒有從 tryMap
中拋出錯誤,也會發(fā)生這種情況。
原因很簡單:Swift 還不支持類型化 throws
,盡管自 2015 年以來 Swift Evolution 中一直在討論這個主題。這意味著當(dāng)我們使用帶有 try 前綴的 Operator 時,我們的錯誤類型將總是被抹去到最常見的父類:Swift.Error
。
一種方法是將通用錯誤手動轉(zhuǎn)換為特定的錯誤類型,但這不是最理想的。它打破了嚴(yán)格類型錯誤的整個目的。幸運(yùn)的是,Combine 為這個問題提供了一個很好的解決方案,稱為 mapError
。
在調(diào)用 tryMap 之后,添加以下行:
.mapError { $0 as? NameError ?? .unknown }
mapError
接收上游 Publisher 拋出的任何錯誤,并將其映射到我們想要的任何錯誤。在這種情況下,我們可以利用它將錯誤轉(zhuǎn)換回 NameError
。這會將 Failure
恢復(fù)為所需要的類型,并將我們的 Publisher 轉(zhuǎn)回 Publisher<String, NameError>
。構(gòu)建并運(yùn)行 Playground,最終可以按預(yù)期編譯和工作:
--- map vs tryMap --- Hello is too short!
捕獲錯誤并重試
很多時候,當(dāng)我們請求資源或執(zhí)行某些計算時,失敗可能是由于網(wǎng)絡(luò)不穩(wěn)定或其他資源不可用而導(dǎo)致的一次性 事件。
在這些情況下,我們通常會編寫一個機(jī)制來重試不同的工作,跟蹤嘗試次數(shù),并處理如果所有嘗試都失敗的情況。Combine 讓這一切變得非常簡單。
retry
Operator 接受一個數(shù)字。如果 Publisher 失敗,它將重新訂閱上游并重試至我們指定的次數(shù)。如果所有重試都失敗,它將錯誤推送到下游,就像沒有 retry
Operator 一樣:
example("Catching and retrying") { enum MyError: Error { case network } var service1 = PassthroughSubject<Int, MyError>() service1.send(completion: .failure(.network)) service1 .handleEvents( receiveSubscription: { _ in print("Trying ...") }, receiveCompletion: { guard case .failure(let error) = $0 else { return } print("Got error: (error)") } ) .retry(3) .sink( receiveCompletion: { print("($0)") }, receiveValue: { number in print("Got Number: (number)") } ) .store(in: &subscriptions) }
我們有一個 service1
,它發(fā)出了失敗事件。因此,訂閱 service1
肯定會獲得失敗事件。我們嘗試三次,并通過 handleEvents
打印訂閱和完成:
--- Catching and retrying --- Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network failure(Page_Contents.(unknown context at $10fc7b584).(unknown context at $10fc7b77c).(unknown context at $10fc7b7b8).MyError.network)
運(yùn)行 Playerground,我們會看到有四次 Trying。初始 Trying,加上由 retry
Operator 觸發(fā)的三次重試。 由于 service1
不斷失敗,因此 Operator 會耗盡所有重試嘗試并將錯誤推送到 sink
。
調(diào)整代碼:
example("Catching and retrying") { enum MyError: Error { case network } var service1 = PassthroughSubject<Int, MyError>() service1.send(completion: .failure(.network)) service1 .handleEvents( receiveSubscription: { _ in print("Trying ...") }, receiveCompletion: { guard case .failure(let error) = $0 else { return } print("Got error: (error)") } ) .retry(3) .replaceError(with: 1) .sink( receiveCompletion: { print("($0)") }, receiveValue: { number in print("Got Number: (number)") } ) .store(in: &subscriptions) }
在 service1
重試后,若還是失敗,我們將通過 replaceError
將失敗替換為 1:
--- Catching and retrying --- Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Got Number: 1 finished
或者,我們可以使用 catch
捕獲 service1
的失敗,并為下游提供另一個 Publisher:
example("Catching and retrying") { enum MyError: Error { case network } var service1 = PassthroughSubject<Int, MyError>() service1.send(completion: .failure(.network)) var service2 = PassthroughSubject<Int, MyError>() service1 .handleEvents( receiveSubscription: { _ in print("Trying ...") }, receiveCompletion: { guard case .failure(let error) = $0 else { return } print("Got error: (error)") } ) .retry(3) .catch { error in return service2 } .sink( receiveCompletion: { print("($0)") }, receiveValue: { number in print("Got Number: (number)") } ) .store(in: &subscriptions) service2.send(2) service2.send(completion: .finished) }
此時,下游將獲得到 service2
發(fā)出的值 2 和完成事件:
--- Catching and retrying --- Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Got Number: 2 finished
cheduler
我們已經(jīng)遇到了一些將 Scheduler 作為參數(shù)的 Operator。大多數(shù)情況下,我們會簡單地使用 DispatchQueue.main
,因?yàn)樗奖?、易于理解。除?DispatchQueue.main
,我們肯定已經(jīng)使用了全局并發(fā)隊列,或創(chuàng)建一個串行調(diào)度隊列來運(yùn)行操作。
但是為什么 Combine 需要一個新的類似概念呢?我們接著將了解為什么會出現(xiàn) Scheduler 的概念,將探索 Combine 如何使異步事件和操作更易于使用,當(dāng)然,我們還會試使用 Combine 提供的所有 Scheduler。
Scheduler 簡介
根據(jù) Apple 的文檔,Scheduler 是一種定義何時及如何執(zhí)行閉包的協(xié)議。Scheduler 提供上下文以盡快或在將來的某個事件執(zhí)行未來的操作。該操作就是協(xié)議本身中定義的閉包。閉包也可以隱藏 Publisher 在特定 Scheduler 上執(zhí)行的某些值的傳遞。
我們會注意到此定義有意避免對線程的任何引用,這是因?yàn)榫唧w的實(shí)現(xiàn)是在 Scheduler 協(xié)議中,提供的“上下文”中的。因此,我們的代碼將在哪個線程上執(zhí)行取決于選擇的 Scheduler。
記住這個重要的概念:Scheduler 不等于線程。我們將在后面詳細(xì)了解這對每個 Scheduler 意味著什么。讓我們從事件流的角度來看 Scheduler 的概念:
我們在上圖中看到的內(nèi)容:
- 在主 (UI) 線程上發(fā)生用戶操作,如按鈕按下;
- 它會觸發(fā)一些工作在 Background Scheduler 上進(jìn)行處理;
- 要顯示的最終數(shù)據(jù)在主線程上傳遞給 Subscriber,Subscriber 可以更新 UI。
我們可以看到 Scheduler 的概念深深植根于前臺/后臺執(zhí)行的概念。此外,根據(jù)我們選擇的實(shí)現(xiàn),工作可以串行化或并行化。
因此,要全面了解 Scheduler,需要查看哪些類符合 Scheduler 協(xié)議。首先,我們需要了解與 Scheduler 相關(guān)的兩個重要 Operator。
Scheduler Operator
Combine 提供了兩個基本的 Operator 來使用 Scheduler:
subscribe(on:)
和 subscribe(on:options:)
在指定的 Scheduler 上創(chuàng)建 Subscription(開始工作);
receive(on:)
和 receive(on:options:)
在指定的 Scheduler 上傳遞值。
此外,以下 Operator 將 Scheduler 和 Scheduler options 作為參數(shù):
debounce(for:scheduler:options:)
delay(for:tolerance:scheduler:options:)
measureInterval(using:options:)
throttle(for:scheduler:latest:)
timeout(_:scheduler:options:customError:)
subscribe(on:) 和 receive(on:)
在我們訂閱它之前,Publisher 是一個無生命的實(shí)體。但是當(dāng)我們訂閱 Publisher 時會發(fā)生什么?有幾個步驟:
- Publiser
receive
Subscriber 并創(chuàng)建 Subscription; - Subscriber
receive
Subscription 并從 Publiser 請求值(虛線); - Publiser 開始工作(通過 Subscription);
- Publiser 發(fā)出值(通過 Subscription);
- Operator 轉(zhuǎn)換值;
- Subscriber 收到最終值。
當(dāng)代碼訂閱 Publiser 時,步驟一、二和三通常發(fā)生在當(dāng)前線程上。 但是當(dāng)我們使用 subscribe(on:)
Operator 時,所有這些操作都在我們指定的 Scheduler 上運(yùn)行。
我們可能希望 Publiser 在后臺執(zhí)行一些昂貴的計算以避免阻塞主線程。 執(zhí)行此操作的簡單方法是使用 subscribe(on:)
。以下是偽代碼:
let queue = DispatchQueue(label: "serial queue") let subscription = publisher .subscribe(on: queue) .sink { value in ...
如果我們收到值后,想更新一些 UI 怎么辦?我們可以在閉包中執(zhí)行類似 DispatchQueue.main.async { ... }
的操作,從主線程執(zhí)行 UI 更新。有一種更有效的方法可以使用 Combine 的 receive(on:)
:
let subscription = publisher .subscribe(on: queue) .receive(on: DispatchQueue.main) .sink { value in ...
即使計算工作正常并從后臺線程發(fā)出結(jié)果,我們現(xiàn)在也可以保證始終在主隊列上接收值。這是安全地執(zhí)行 UI 更新所需要的。
Scheduler 實(shí)現(xiàn)
Apple 提供了幾種 Scheduler 協(xié)議的具體實(shí)現(xiàn):
ImmediateScheduler
:一個簡單的 Scheduler,它立即在當(dāng)前線程上執(zhí)行代碼,這是默認(rèn)的執(zhí)行上下文,除非使用subscribe(on:)
、receive(on:)
或任何其他將 Scheduler 作為參數(shù)的 Operator 進(jìn)行修改。RunLoop
:綁定到 Foundation 的 Thread 對象。DispatchQueue
:可以是串行的或并發(fā)的。OperationQueue
:規(guī)范工作項(xiàng)執(zhí)行的隊列。
這里省略了 TestScheduler
,是一個虛擬的、模擬的 Scheduler,它是任何響應(yīng)式編程框架測試時不可或缺的一部分。
ImmediateScheduler
在 Playground 中新增代碼:
example("ImmediateScheduler") { let source = Timer .publish(every: 1.0, on: .main, in: .common) .autoconnect() .scan(0) { counter, _ in counter + 1 } let publisher = source .receive(on: ImmediateScheduler.shared) .eraseToAnyPublisher() publisher.sink(receiveValue: { _ in print(Thread.current) }) .store(in: &subscriptions) }
運(yùn)行 Playground,我們會看到 Publisher 發(fā)出的每個值,都是在 MainThread
上:
--- ImmediateScheduler --- <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main}
當(dāng)前線程是主線程, ImmediateScheduler
立即在當(dāng)前線程上調(diào)度。當(dāng)我們在 .receive(on: ImmediateScheduler.shared)
前添加一行:
.receive(on: DispatchQueue.global())
執(zhí)行 Playground,我們將在不同的線程收到值:
--- ImmediateScheduler --- <NSThread: 0x12e7286c0>{number = 4, name = (null)} <NSThread: 0x12e7286c0>{number = 4, name = (null)} <NSThread: 0x11f005310>{number = 2, name = (null)} <NSThread: 0x11f005310>{number = 2, name = (null)} <NSThread: 0x12e7286c0>{number = 4, name = (null)}
ImmediateScheduler options 由于大多數(shù) Operator 在其參數(shù)中接受 Scheduler,我們還可以找到一個接受 SchedulerOptions
值的參數(shù)。在 ImmediateScheduler
的情況下,此類型被定義為 Never
,因此在使用 ImmediateScheduler
時,我們永遠(yuǎn)不應(yīng)該為 Operator 的 options 參數(shù)傳遞值。
ImmediateScheduler 的陷阱 關(guān)于 ImmediateScheduler
的一件事是它是即時的。我們無法使用 Scheduler 協(xié)議的任何 schedule(after:)
變體,因?yàn)槲覀冃枰付ǖ?SchedulerTimeType
沒有初始化方法,對于 ImmediateScheduler
無意義。
RunLoop scheduler
RunLoop 早于 DispatchQueue,它是一種在線程級別管理輸入源的方法。主線程有一個關(guān)聯(lián)的 RunLoop,我們還可以通過從當(dāng)前線程調(diào)用 RunLoop.current
為任何線程獲取一個 RunLoop。
在 Playground 中添加此代碼:
example("RunLoop") { let source = Timer .publish(every: 1.0, on: .main, in: .common) .autoconnect() .scan(0) { counter, _ in counter + 1 } let publisher = source .receive(on: DispatchQueue.global()) .handleEvents(receiveOutput: { _ in print("DispatchQueue.global: \(Thread.current)") }) .receive(on: RunLoop.current) .handleEvents(receiveOutput: { _ in print("RunLoop.current: \(Thread.current)") }) .eraseToAnyPublisher() publisher.sink(receiveValue: { _ in }) .store(in: &subscriptions) }
當(dāng)前 RunLoop.current 就是主線程的 RunLoop。執(zhí)行 Playground:
--- RunLoop --- DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)} RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main} DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)} RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main} DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)} RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}
每發(fā)出一個值,都通過一個全局并發(fā)隊列的線程,然后在主線程上繼續(xù)。
RunLoop Options 與 ImmediateScheduler
一樣,RunLoop 不提供 SchedulerOptions
參數(shù)。
RunLoop 陷阱 RunLoop 的使用應(yīng)僅限于主線程的 RunLoop,以及我們在需要時控制的 Foundation 線程中可用的 RunLoop。要避免的一個是在 DispatchQueue 上執(zhí)行的代碼中使用 RunLoop.current。這是因?yàn)?DispatchQueue 線程可能是短暫的,這使得它們幾乎不可能依賴 RunLoop。
DispatchQueue Scheduler
DispatchQueue 符合 Scheduler 協(xié)議,并且完全可用于所有將 Scheduler 作為參數(shù)的 Operator。Dispatch 框架是 Foundation 的一個強(qiáng)大組件,它允許我們通過向系統(tǒng)管理的調(diào)度隊列提交工作來在多核硬件上同時執(zhí)行代碼。DispatchQueue 可以是串行的(默認(rèn))或并發(fā)的。串行隊列按順序執(zhí)行你提供給它的所有工作項(xiàng)。并發(fā)隊列將并行啟動多個工作項(xiàng),以最大限度地提高 CPU 使用率:
- 串行隊列通常用于保證某些操作不重疊。因此,如果所有操作都發(fā)生在同一個隊列中,他們可以使用共享資源而無需加鎖。
- 并發(fā)隊列將同時執(zhí)行盡可能多的操作。因此,它更適合純計算。
我們一直使用的最熟悉的隊列是 DispatchQueue.main。它直接映射到主線程,在這個隊列上執(zhí)行的所有操作都可以自由地更新用戶界面。 當(dāng)然,UI 更新只能在主線程進(jìn)行。所有其他隊列,無論是串行的還是并發(fā)的,都在系統(tǒng)管理的線程池中執(zhí)行它們的代碼。這意味著我們永遠(yuǎn)不應(yīng)該對隊列中運(yùn)行的代碼中的當(dāng)前線程做出任何假設(shè)。尤其不應(yīng)使用 RunLoop.current 來安排工作,因?yàn)?DispatchQueue 管理其線程的方式有不同。
所有調(diào)度隊列共享同一個線程池,執(zhí)行的串行隊列將使用該池中的任何可用線程。一個直接的結(jié)果是,來自同一隊列的兩個連續(xù)工作項(xiàng)可能使用不同的線程,但仍可以按順序執(zhí)行。這是一個重要的區(qū)別:當(dāng)使用 subscribe(on:)
、receive(on:)
或任何其他有 Scheduler 參數(shù)的 Operator 時,我們永遠(yuǎn)不應(yīng)假設(shè)線程每次都是相同的。
在 Playground 中添加代碼:
example("DispatchQueue") { let source = PassthroughSubject<Void, Never>() let sourceQueue = DispatchQueue.main let subscription = sourceQueue.schedule(after: sourceQueue.now, interval: .seconds(1)) { source.send() } .store(in: &subscriptions) let serialQueue = DispatchQueue(label: "Serial queue") source .handleEvents(receiveOutput: { _ in print("\(Thread.current)") }) .receive(on: serialQueue) .handleEvents(receiveOutput: { _ in print("\(Thread.current)") }) .sink(receiveValue: { _ in }) .store(in: &subscriptions) }
Timer 在主隊列 sourceQueue
上觸發(fā)并通過 source
發(fā)送 Void 值。接著在串行隊列 serialQueue
上接收值:
--- DispatchQueue --- <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x128025cd0>{number = 2, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x1178243e0>{number = 6, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x117904d90>{number = 5, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x1178243e0>{number = 6, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x1178243e0>{number = 6, name = (null)}
將 sourceQueue 也改為 DispatchQueue(label: "Serial queue")
,也將在全局并發(fā)隊列上發(fā)出值:
--- DispatchQueue --- <NSThread: 0x137e275b0>{number = 6, name = (null)} <NSThread: 0x130905310>{number = 2, name = (null)} <NSThread: 0x130905310>{number = 2, name = (null)} <NSThread: 0x130905310>{number = 2, name = (null)} <NSThread: 0x127e0f400>{number = 4, name = (null)} <NSThread: 0x137e275b0>{number = 6, name = (null)}
DispatchQueue Options DispatchQueue 是唯一提供一組 Options 的 Scheduler,當(dāng) Operator 需要 SchedulerOptions
參數(shù)時,我們可以傳遞這些 Options。主要圍繞 QoS(服務(wù)質(zhì)量)值,獨(dú)立于 DispatchQueue 上已設(shè)置的值。例如:
.receive( on: serialQueue, options: DispatchQueue.SchedulerOptions(qos: .userInteractive) )
我們將 DispatchQueue.SchedulerOptions
的實(shí)例傳遞.userInteractive
。在實(shí)際開發(fā)中使用這些 Options 有助于操作系統(tǒng)決定在同時有許多隊列忙碌的情況下首先安排哪個任務(wù)。
OperationQueue Scheduler
由于 OperationQueue 在內(nèi)部使用 Dispatch,因此在表面上幾乎沒有區(qū)別:
example("OperationQueue") { let queue = OperationQueue() let subscription = (1...10).publisher .receive(on: queue) .print() .sink { value in print("Received \(value)") } .store(in: &subscriptions) }
創(chuàng)建一個簡單的 Publisher 發(fā)出 1 到 10 之間的數(shù)字,然后打印該值,執(zhí)行 Playground:
--- OperationQueue --- receive subscription: (ReceiveOn) request unlimited receive value: (1) Received 1 receive value: (8) Received 8 receive value: (9) Received 9 receive value: (6) Received 6 receive value: (3) Received 3 receive value: (5) Received 5 receive finished receive value: (10) receive value: (4) receive value: (7) receive value: (2)
按順序發(fā)出但無序到達(dá)!我們可以更改打印行以顯示當(dāng)前線程:
print("Received \(value) on thread \(Thread.current)")
再次執(zhí)行 Playground:
--- OperationQueue --- receive subscription: (ReceiveOn) request unlimited receive value: (4) Received 4 on thread <NSThread: 0x14d720980>{number = 2, name = (null)} receive value: (10) Received 10 on thread <NSThread: 0x14d720980>{number = 2, name = (null)} receive value: (3) Received 3 on thread <NSThread: 0x14e833620>{number = 6, name = (null)} receive value: (5) Received 5 on thread <NSThread: 0x14e80dfd0>{number = 4, name = (null)} receive value: (1) Received 1 on thread <NSThread: 0x14d70d840>{number = 5, name = (null)} receive finished receive value: (2) receive value: (9) receive value: (8) receive value: (6)
每個值都是在不同的線程上接收的!如果我們查看有關(guān) OperationQueue 的文檔,有一條關(guān)于線程的說明,OperationQueue 使用 Dispatch 框架(因此是 DispatchQueue)來執(zhí)行操作。這意味著它不保證它會為每個交付的值使用相同的底層線程。
此外,每個 OperationQueue 中都有一個參數(shù)可以解釋一切:它是 maxConcurrentOperationCount。它默認(rèn)為系統(tǒng)定義的數(shù)字,允許操作隊列同時執(zhí)行大量操作。由于 Publisher 幾乎在同一時間發(fā)出所有值,它們被 Dispatch 的并發(fā)隊列分派到多個線程。
對代碼進(jìn)行一些修改:
queue.maxConcurrentOperationCount = 1
再次執(zhí)行 Playground:
--- OperationQueue --- receive subscription: (ReceiveOn) request unlimited receive value: (1) Received 1 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (2) Received 2 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (3) Received 3 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (4) Received 4 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (5) Received 5 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (6) Received 6 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (7) Received 7 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (8) Received 8 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (9) Received 9 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (10) Received 10 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive finished
這一次,我們將獲得真正的順序執(zhí)行——將 maxConcurrentOperationCount
設(shè)置為 1 相當(dāng)于使用串行隊列。
OperationQueue Options OperationQueue 沒有可用的 SchedulerOptions
。它實(shí)際上是 RunLoop.SchedulerOptions
類型,本身沒有提供任何 Options。
OperationQueue 陷阱 我們剛剛看到 OperationQueue 默認(rèn)并發(fā)執(zhí)行操作,我們需要非常清楚這一點(diǎn),因?yàn)樗赡軙o我們帶來麻煩。當(dāng)我們的 Publisher 發(fā)出值時都有大量工作要執(zhí)行時,它可能是一個很好的工具。我們可以通過調(diào)整 maxConcurrentOperationCount
參數(shù)來控制負(fù)載。
內(nèi)容參考
- Combine | Apple Developer Documentation;
- 來自 Kodeco 的書籍《Combine: Asynchronous Programming with Swift》;
- 對上述 Kodeco 書籍的漢語自譯版 《Combine: Asynchronous Programming with Swift》整理與補(bǔ)充。
以上就是Combine中錯誤處理和Scheduler使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Combine錯誤處理Scheduler的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
switch實(shí)現(xiàn)一個兩數(shù)的運(yùn)算代碼示例
這篇文章主要介紹了switch實(shí)現(xiàn)一個兩數(shù)的運(yùn)算代碼示例,需要的朋友可以參考下2017-06-06簡陋的swift carthage copy-frameworks 輔助腳本代碼
下面小編就為大家分享一篇簡陋的swift carthage copy-frameworks 輔助腳本代碼,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-01-01Spring中BeanFactory與FactoryBean的區(qū)別解讀
這篇文章主要介紹了Spring中BeanFactory與FactoryBean的區(qū)別解讀,Java的BeanFactory是Spring框架中的一個接口,它是用來管理和創(chuàng)建對象的工廠接口,在Spring中,我們可以定義多個BeanFactory來管理不同的組件,需要的朋友可以參考下2023-12-12Swift利用Decodable解析JSON的一個小問題詳解
這篇文章主要給大家介紹了關(guān)于Swift利用Decodable解析JSON的一個小問題的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-04-04