作者:Philipp Haller、Aleksandar Prokopec、Heather Miller、Viktor Klang、Roland Kuhn 和 Vojin Jovanovic
簡介
期貨提供一種方法,可以有效率且非封鎖的方式,推理執行許多平行作業。
Future
是可能尚未存在的數值的佔位元物件。一般而言,Future 的值會同時提供,並可在之後使用。以這種方式組合同時作業,往往會產生更快速的非同步非封鎖平行程式碼。
預設情況下,futures 和 promises 是非封鎖的,使用回呼而不是典型的封鎖操作。為了在語法和概念上簡化回呼的使用,Scala 提供了組合器,例如 flatMap
、foreach
和 filter
,用於以非封鎖方式組合 futures。封鎖仍然是可能的 - 在絕對必要的情況下,可以封鎖 futures(儘管不建議這樣做)。
典型的 future 如下所示
val inverseFuture: Future[Matrix] = Future {
fatMatrix.inverse() // non-blocking long lasting computation
}(executionContext)
或使用更慣用的
implicit val ec: ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
fatMatrix.inverse()
} // ec is implicitly passed
given ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
fatMatrix.inverse()
} // execution context is implicitly passed
這兩個程式碼片段都將 fatMatrix.inverse()
的執行委派給 ExecutionContext
,並在 inverseFuture
中體現計算結果。
執行內容
Future 和 Promises 圍繞 ExecutionContext
s 展開,負責執行計算。
ExecutionContext
類似於 Executor:它可以自由地在新的執行緒、已合併的執行緒或目前的執行緒中執行計算(儘管不建議在目前的執行緒中執行計算 - 如下所述)。
scala.concurrent
套件開箱即用,提供 ExecutionContext
實作,一個全域靜態執行緒池。也可以將 Executor
轉換為 ExecutionContext
。最後,使用者可以自由地擴充 ExecutionContext
特質來實作自己的執行內容,儘管只應在罕見的情況下執行此操作。
全域執行內容
ExecutionContext.global
是由 ForkJoinPool 支援的 ExecutionContext
。它應足以應付大多數情況,但需要小心使用。 ForkJoinPool
管理有限數量的執行緒(執行緒的最大數量稱為平行處理層級)。並行封鎖運算的數量只能在每個封鎖呼叫都包覆在 blocking
呼叫內時,才能超過平行處理層級(稍後會詳細說明)。否則,全球執行緒環境中的執行緒池可能會發生飢餓,而且無法進行運算。
預設情況下,ExecutionContext.global
會將其底層 fork-join 池的平行處理層級設定為可用處理器的數量(Runtime.availableProcessors)。可以透過設定下列其中一個(或多個)VM 屬性來覆寫此組態
- scala.concurrent.context.minThreads - 預設為
1
- scala.concurrent.context.numThreads - 可以是數字或乘數 (N),格式為『xN』;預設為
Runtime.availableProcessors
- scala.concurrent.context.maxThreads - 預設為
Runtime.availableProcessors
平行處理層級會設定為 numThreads
,只要它保持在 [minThreads; maxThreads]
內。
如上所述,ForkJoinPool
可以增加執行緒數量,超過其 parallelismLevel
,在封鎖運算的情況下。如 ForkJoinPool
API 中所述,這只有在明確通知池時才有可能
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.forkjoin._
// the following is equivalent to `implicit val ec = ExecutionContext.global`
import ExecutionContext.Implicits.global
Future {
ForkJoinPool.managedBlock(
new ManagedBlocker {
var done = false
def block(): Boolean = {
try {
myLock.lock()
// ...
} finally {
done = true
}
true
}
def isReleasable: Boolean = done
}
)
}
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.forkjoin.*
// the following is equivalent to `given ExecutionContext = ExecutionContext.global`
import ExecutionContext.Implicits.global
Future {
ForkJoinPool.managedBlock(
new ManagedBlocker {
var done = false
def block(): Boolean =
try
myLock.lock()
// ...
finally
done = true
true
def isReleasable: Boolean = done
}
)
}
很幸運地,concurrent 套件提供了一個方便的方法來執行此操作
import scala.concurrent.Future
import scala.concurrent.blocking
Future {
blocking {
myLock.lock()
// ...
}
}
請注意,blocking
是一個一般性的建構,將會在 下方更深入地討論。
最後但並非最不重要的一點,您必須記住,ForkJoinPool
並非設計用於長時間的封鎖操作。即使已使用 blocking
通知,但池可能不會如您預期般產生新的工作執行緒,而當新的工作執行緒建立時,它們的數量可能多達 32767。舉例來說,以下程式碼將使用 32000 個執行緒
implicit val ec = ExecutionContext.global
for (i <- 1 to 32000) {
Future {
blocking {
Thread.sleep(999999)
}
}
}
given ExecutionContext = ExecutionContext.global
for i <- 1 to 32000 do
Future {
blocking {
Thread.sleep(999999)
}
}
如果您需要封裝長時間的封鎖操作,我們建議使用專用的 ExecutionContext
,例如透過封裝 Java Executor
。
調整 Java Executor
使用 ExecutionContext.fromExecutor
方法,您可以將 Java Executor
封裝到 ExecutionContext
中。例如
ExecutionContext.fromExecutor(new ThreadPoolExecutor( /* your configuration */ ))
ExecutionContext.fromExecutor(ThreadPoolExecutor( /* your configuration */ ))
同步執行內容
有人可能會想建立一個 ExecutionContext
,在目前執行緒中執行運算
val currentThreadExecutionContext = ExecutionContext.fromExecutor(
new Executor {
// Do not do this!
def execute(runnable: Runnable) = runnable.run()
})
這應該避免,因為它會在您未來的執行中引入不確定性。
Future {
doSomething
}(ExecutionContext.global).map {
doSomethingElse
}(currentThreadExecutionContext)
doSomethingElse
呼叫可能會在 doSomething
的執行緒或主執行緒中執行,因此可能是非同步或同步的。正如 此處 所解釋的,回呼不應同時具備這兩種特性。
期貨
一個 Future
是一個物件,持有某個可能在某個時間點可用的值。這個值通常是某個其他運算的結果
- 如果運算尚未完成,我們說這個
Future
是未完成的。 - 如果運算已完成,並產生一個值或一個例外,我們說這個
Future
是已完成的。
完成可以採取兩種形式
- 當一個
Future
以一個值完成時,我們說這個 future 以那個值成功完成。 - 當一個
Future
以運算拋出的例外完成時,我們說這個Future
以那個例外失敗。
一個 Future
有個重要的特性,就是它只能被指定一次。一旦一個 Future
物件被賦予一個值或一個例外,它就等於不可變的 – 它永遠不會被覆寫。
建立一個 future 物件最簡單的方法是呼叫 Future.apply
方法,它會啟動一個非同步運算,並回傳一個持有那個運算結果的 future。一旦 future 完成,結果就會可用。
請注意 Future[T]
是一個表示 future 物件的型別,而 Future.apply
是建立並排程一個非同步運算的方法,然後回傳一個將以那個運算的結果完成的 future 物件。
這可以用一個範例最好說明。
假設我們想要使用某個熱門社群網站的假設 API 來取得某個特定使用者的朋友清單。我們將開啟一個新的工作階段,然後送出一個要求來取得某個特定使用者的朋友清單
import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
session.getFriends()
}
import scala.concurrent.*
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
session.getFriends()
}
在上面,我們首先匯入 scala.concurrent
套件的內容,以使 Future
型別可見。我們稍後會說明第二個匯入。
我們接著初始化一個會話變數,我們將使用它來使用假設的 createSessionFor
方法,將要求傳送至伺服器。若要取得使用者的朋友清單,必須透過網路傳送要求,這可能會花費很長的時間。這會以呼叫 getFriends
方法來說明,該方法會傳回 List[Friend]
。為了在回應抵達之前更有效地使用 CPU,我們不應封鎖程式其餘部分,這個運算應非同步地排程。Future.apply
方法執行完全相同的動作,它會同時執行指定的運算區塊,在本例中會傳送要求至伺服器並等待回應。
一旦伺服器回應,朋友清單就會在未來 f
中提供。
嘗試失敗可能會導致例外。在下列範例中,session
值初始化錯誤,因此 Future
區塊中的運算會擲回 NullPointerException
。這個未來 f
接著會因這個例外而失敗,而不是順利完成。
val session = null
val f: Future[List[Friend]] = Future {
session.getFriends()
}
上方的程式碼列 import ExecutionContext.Implicits.global
會匯入預設的全球執行緒環境。執行緒環境會執行提交給它們的任務,您可以將執行緒環境視為執行緒池。它們對於 Future.apply
方法至關重要,因為它們會處理如何以及何時執行非同步運算。您可以定義自己的執行緒環境並將它們與 Future
搭配使用,但目前來說,只要知道您可以匯入預設的執行緒環境(如上所示)就已足夠。
我們的範例基於假設的社群網路 API,其中運算包含傳送網路要求並等待回應。提供一個包含非同步運算的範例,以便您可以立即試用,是很公平的。假設您有一個文字檔,而且您想要找出特定關鍵字第一次出現的位置。此運算可能包含在從磁碟中擷取檔案內容時的封鎖,因此與運算的其他部分同時執行是有意義的。
val firstOccurrence: Future[Int] = Future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
回呼
我們現在知道如何啟動非同步運算來建立新的未來值,但我們尚未顯示如何使用結果,一旦結果可用,我們就可以對其執行有用的操作。我們通常對運算的結果感興趣,而不仅仅是其副作用。
在許多未來的實作中,一旦未來的客戶端對其結果感興趣,它就必須封鎖自己的運算並等到未來完成 - 只有這樣才能使用未來的值來繼續自己的運算。儘管這由 Scala Future
API 允許,正如我們稍後將展示的,但從效能的角度來看,更好的方法是透過在未來註冊回呼,以完全非封鎖的方式執行。此回呼會在未來完成後非同步呼叫。如果在註冊回呼時未來已經完成,則回呼可能會非同步執行,或在同一個執行緒上循序執行。
註冊回呼最通用的形式是使用 onComplete
方法,它會採用類型為 Try[T] => U
的回呼函式。如果未來完成成功,則會將回呼套用至類型為 Success[T]
的值,否則會套用至類型為 Failure[T]
的值。
Try[T]
類似於 Option[T]
或 Either[T, S]
,因為它是一個單子,可能持有某種類型的值。不過,它特別設計為持有值或某個可拋出物件。 Option[T]
可以是值 (即 Some[T]
) 或完全沒有值 (即 None
),而 Try[T]
在持有值時為 Success[T]
,否則為 Failure[T]
,後者會持有例外狀況。 Failure[T]
持有的資訊不只是一個單純的 None
,它會說明為何沒有值。同時,你可以將 Try[T]
視為 Either[Throwable, T]
的特殊版本,專門用於左邊值為 Throwable
的情況。
回到我們的社群網路範例,假設我們想要擷取最近自己的貼文清單並將它們呈現在螢幕上。我們透過呼叫 getRecentPosts
方法來執行此動作,它會傳回 List[String]
,也就是最近文字貼文的清單
import scala.util.{Success, Failure}
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
f.onComplete {
case Success(posts) => for (post <- posts) println(post)
case Failure(t) => println("An error has occurred: " + t.getMessage)
}
import scala.util.{Success, Failure}
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
f.onComplete {
case Success(posts) => for post <- posts do println(post)
case Failure(t) => println("An error has occurred: " + t.getMessage)
}
onComplete
方法在於允許用戶端處理失敗和成功的未來運算結果,因此具有普遍性。如果只需要處理成功的結果,可以使用 foreach
回呼
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
for {
posts <- f
post <- posts
} println(post)
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
for
posts <- f
post <- posts
do println(post)
Future
提供一種乾淨的方式,使用 failed
投影來處理僅失敗的結果,它會將 Failure[Throwable]
轉換為 Success[Throwable]
。在 投影片 下方的區段中提供這樣做的範例。
回到先前的範例,搜尋關鍵字的第一個出現位置,您可能想要將關鍵字的位置列印到螢幕上
val firstOccurrence: Future[Int] = Future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence.onComplete {
case Success(idx) => println("The keyword first appears at position: " + idx)
case Failure(t) => println("Could not process file: " + t.getMessage)
}
onComplete
和 foreach
方法的結果類型都是 Unit
,表示無法串連呼叫這些方法。請注意,這是故意的設計,以避免暗示串連呼叫可能暗示已註冊的回呼執行順序(在同一個未來上註冊的回呼是無序的)。
話雖如此,我們現在應該說明回呼確切在何時被呼叫。由於它需要未來中的值可用,因此只能在未來完成後呼叫它。但是,無法保證它會由完成未來的執行緒或建立回呼的執行緒呼叫。相反地,回呼是由某個執行緒在未來物件完成後的一段時間執行。我們說回呼會最終執行。
此外,即使在同一應用程式的不同執行之間,回呼執行的順序也並未預先定義。事實上,回呼可能不會依序一個接著一個呼叫,而可能同時並行執行。這表示在下列範例中,變數 totalA
可能無法設定為計算文字中正確數量的 a 字元大小寫。
@volatile var totalA = 0
val text = Future {
"na" * 16 + "BATMAN!!!"
}
text.foreach { txt =>
totalA += txt.count(_ == 'a')
}
text.foreach { txt =>
totalA += txt.count(_ == 'A')
}
在上方,兩個回呼可能一個接著一個執行,在這種情況下,變數 totalA
會持有預期的值 18
。然而,它們也可能並行執行,因此 totalA
可能會變成 16
或 2
,因為 +=
並非原子操作(亦即它包含一個讀取步驟和一個寫入步驟,可能會與其他讀取和寫入任意交錯)。
為了完整起見,此處列出回呼的語意
-
在未來註冊
onComplete
回呼可確保在未來最終完成後呼叫對應的封閉。 -
註冊
foreach
回呼具有與onComplete
相同的語意,不同之處在於只有在未來成功完成時才會呼叫封閉。 -
在已經完成的未來註冊回呼,最終將執行回呼(如 1 所暗示)。
-
如果在未來註冊多個回呼,則其執行的順序未定義。事實上,回呼可能彼此並行執行。然而,特定
ExecutionContext
實作可能導致順序明確定義。 -
在某些呼叫回擲出例外情況時,其他呼叫回仍會執行。
-
在某些呼叫回從未完成時(例如呼叫回包含無限迴圈),其他呼叫回可能完全不會執行。在這些情況下,潛在的封鎖呼叫回必須使用
blocking
建構(請參閱下方)。 -
執行後,呼叫回會從未來物件中移除,因此符合 GC 資格。
函數組合和 For-Comprehensions
我們展示的呼叫回機制足以將未來結果與後續運算串連。然而,它有時不方便且會產生龐大的程式碼。我們透過範例來說明這一點。假設我們有一個用於與貨幣交易服務介接的 API。假設我們想要購買美元,但僅在有利可圖時才購買。我們首先展示如何使用呼叫回來完成此操作
val rateQuote = Future {
connection.getCurrentValue(USD)
}
for (quote <- rateQuote) {
val purchase = Future {
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
for (amount <- purchase)
println("Purchased " + amount + " USD")
}
val rateQuote = Future {
connection.getCurrentValue(USD)
}
for quote <- rateQuote do
val purchase = Future {
if isProfitable(quote) then connection.buy(amount, quote)
else throw Exception("not profitable")
}
for amount <- purchase do
println("Purchased " + amount + " USD")
我們從建立未來 rateQuote
開始,它會取得目前的匯率。從伺服器取得此值且未來成功完成後,運算會在 foreach
呼叫回中進行,我們準備決定是否購買。因此,我們建立另一個未來 purchase
,它會在有利可圖時決定購買,然後發送請求。最後,在購買完成後,我們會將通知訊息印出到標準輸出。
這是有用的,但由於兩個原因而不方便。首先,我們必須使用 foreach
並在其中嵌套第二個 purchase
future。想像一下,在 purchase
完成後,我們想要賣出其他貨幣。我們必須在 foreach
回呼中重複此模式,這會讓程式碼過度縮排、龐大且難以理解。
其次, purchase
future 不在其他程式碼的範圍內,只能在 foreach
回呼中對其執行動作。這表示應用程式的其他部分看不到 purchase
future,也無法註冊另一個 foreach
回呼,例如,賣出其他貨幣。
由於這兩個原因,futures 提供組合器,允許更直接的組合。其中一個基本的組合器是 map
,它給定一個 future 和一個 future 值的對應函式,產生一個新的 future,一旦原始 future 成功完成,就會以對應的值完成。您可以用與對應集合相同的方式來對應 future。
讓我們使用 map
組合器重新編寫前一個範例
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase.foreach { amount =>
println("Purchased " + amount + " USD")
}
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if isProfitable(quote) then connection.buy(amount, quote)
else throw Exception("not profitable")
}
purchase.foreach { amount =>
println("Purchased " + amount + " USD")
}
藉由在 rateQuote
上使用 map
,我們消除了 foreach
回呼,更重要的是,消除了巢狀結構。如果我們現在決定要出售其他貨幣,就足夠在 purchase
上再次使用 map
。
但是,如果 isProfitable
傳回 false
會發生什麼事,因此導致引發例外狀況?在這種情況下,purchase
會因該例外狀況而失敗。此外,假設連線中斷,而且 getCurrentValue
引發例外狀況,導致 rateQuote
失敗。在這種情況下,我們沒有值可以對應,因此 purchase
會自動因與 rateQuote
相同的例外狀況而失敗。
總之,如果原始 future 順利完成,則傳回的 future 會以原始 future 的對應值完成。如果對應函式引發例外狀況,則 future 會以該例外狀況完成。如果原始 future 因例外狀況而失敗,則傳回的 future 也會包含相同的例外狀況。這種例外狀況傳播語意也存在於其他組合子中。
future 的設計目標之一是讓它們可以在 for-comprehension 中使用。基於這個原因,future 也有 flatMap
和 withFilter
組合子。flatMap
方法會採用一個函式,將值對應到新的 future g
,然後傳回一個 future,在 g
完成後完成。
假設我們想要將美元兌換成瑞士法郎 (CHF)。我們必須取得這兩種貨幣的報價,然後根據這兩個報價決定是否買入。以下是 for-comprehension 中使用 flatMap
和 withFilter
的範例
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for {
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
purchase foreach { amount =>
println("Purchased " + amount + " CHF")
}
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
yield connection.buy(amount, chf)
purchase.foreach { amount =>
println("Purchased " + amount + " CHF")
}
當 usdQuote
和 chfQuote
都完成後,purchase
未來才會完成,它取決於這兩個未來的值,因此它自己的計算無法提早開始。
上面的 for-comprehension 被翻譯成
val purchase = usdQuote.flatMap {
usd =>
chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf))
}
這比 for-comprehension 難以理解,但我們分析它以更好地理解 flatMap
操作。flatMap
操作將其自己的值映射到其他未來。一旦這個不同的未來完成,結果的未來將用它的值完成。在我們的例子中,flatMap
使用 usdQuote
未來的值將 chfQuote
的值映射到第三個未來,該未來發送請求購買一定數量的瑞士法郎。結果的未來 purchase
僅在從 map
返回的第三個未來完成後才完成。
這可能會令人費解,但幸運的是,flatMap
操作很少在 for-comprehension 之外使用,而 for-comprehension 更容易使用和理解。
filter
組合器會建立一個新的未來,其中僅包含原始未來的值(如果它滿足某些謂詞)。否則,新的未來會因 NoSuchElementException
而失敗。對於呼叫 filter
的未來,其效果與呼叫 withFilter
完全相同。
collect
和 filter
組合器之間的關係類似於集合 API 中這些方法的關係。
由於 Future
特質在概念上可以包含兩種類型的值(運算結果和例外),因此需要處理例外的組合器。
假設我們根據 rateQuote
決定購買一定數量。connection.buy
方法會取得要購買的 amount
和預期的 quote
。它會傳回已購買的數量。如果 quote
在此期間已變更,它會擲回 QuoteChangedException
,而且不會購買任何東西。如果我們希望我們的未來包含 0
,而不是例外,我們會使用 recover
組合器
val purchase: Future[Int] = rateQuote.map {
quote => connection.buy(amount, quote)
}.recover {
case QuoteChangedException() => 0
}
組合子 recover
會建立一個新的 future,如果原始 future 成功完成,則它會包含與原始 future 相同的結果。如果沒有,則會將部分函數參數套用至導致原始 future 失敗的 Throwable
。如果它將 Throwable
映射至某個值,則新的 future 會使用該值成功完成。如果部分函數未定義於該 Throwable
,則產生的 future 會使用相同的 Throwable
失敗。
組合子 recoverWith
會建立一個新的 future,如果原始 future 成功完成,則它會包含與原始 future 相同的結果。否則,會將部分函數套用至導致原始 future 失敗的 Throwable
。如果它將 Throwable
映射至某個 future,則此 future 會使用該 future 的結果完成。它與 recover
的關係類似於 flatMap
與 map
的關係。
組合子 fallbackTo
會建立一個新的 future,如果此 future 成功完成,則它會包含此 future 的結果,否則會包含參數 future 的成功結果。如果此 future 和參數 future 都失敗,則新的 future 會使用此 future 的例外完成,如下列範例所示,它會嘗試列印美元值,但如果無法取得美元值,則會列印瑞士法郎值
val usdQuote = Future {
connection.getCurrentValue(USD)
}.map {
usd => "Value: " + usd + "$"
}
val chfQuote = Future {
connection.getCurrentValue(CHF)
}.map {
chf => "Value: " + chf + "CHF"
}
val anyQuote = usdQuote.fallbackTo(chfQuote)
anyQuote.foreach { println(_) }
組合器 andThen
純粹用於產生副作用。它會傳回一個新 future,其結果與目前的 future 完全相同,不論目前的 future 是否失敗。一旦目前的 future 完成結果,對應於 andThen
的封閉就會被呼叫,然後新的 future 會完成與這個 future 相同的結果。這確保了多個 andThen
呼叫會依序執行,就像以下範例中將社群網路的近期貼文儲存到可變動集合,然後將所有貼文呈現在畫面上
val allPosts = mutable.Set[String]()
Future {
session.getRecentPosts()
}.andThen {
case Success(posts) => allPosts ++= posts
}.andThen {
case _ =>
clearAll()
for (post <- allPosts) render(post)
}
val allPosts = mutable.Set[String]()
Future {
session.getRecentPosts()
}.andThen {
case Success(posts) => allPosts ++= posts
}.andThen {
case _ =>
clearAll()
for post <- allPosts do render(post)
}
總之,future 上的組合器是純粹函數式的。每個組合器都會傳回一個新的 future,與它衍生的 future 有關聯。
投影
為了讓 for-comprehension 能在以例外傳回的結果上執行,future 也有投影。如果原始的 future 失敗,failed
投影會傳回包含 Throwable
型別值的 future。如果原始的 future 成功,failed
投影會失敗,並出現 NoSuchElementException
。以下範例會將例外印在畫面上
val f = Future {
2 / 0
}
for (exc <- f.failed) println(exc)
val f = Future {
2 / 0
}
for exc <- f.failed do println(exc)
這個範例中的 for-comprehension 會轉譯成
f.failed.foreach(exc => println(exc))
因為 f
在此並未成功,所以封閉會註冊到新成功的 Future[Throwable]
上的 foreach
回呼。以下範例不會在畫面上印出任何東西
val g = Future {
4 / 2
}
for (exc <- g.failed) println(exc)
val g = Future {
4 / 2
}
for exc <- g.failed do println(exc)
延伸 Future
計畫支援延伸 Futures API,並提供額外的公用程式方法。這將允許外部架構提供更專業的公用程式。
封鎖
Future 通常是非同步的,且不會封鎖底層執行緒。不過,在某些情況下,有必要進行封鎖。我們區分執行緒封鎖的兩種形式:呼叫從 future 內部封鎖執行緒的任意程式碼,以及從另一個 future 外部封鎖,等待該 future 完成。
在 Future 中進行封鎖
正如在全域 ExecutionContext
中所見,可以使用 blocking
結構來通知 ExecutionContext
有關封鎖呼叫。然而,實作完全由 ExecutionContext
自行決定。雖然有些 ExecutionContext
例如 ExecutionContext.global
會透過 ManagedBlocker
來實作 blocking
,但有些執行緒環境例如固定執行緒池
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))
將不會執行任何動作,如下所示
implicit val ec =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
Future {
blocking { blockingStuff() }
}
given ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
Future {
blocking { blockingStuff() }
}
與下列效果相同
Future { blockingStuff() }
封鎖程式碼也可能會擲回例外。在此情況下,例外會轉送給呼叫者。
在 Future 外部進行封鎖
如前所述,強烈建議不要封鎖 future,以確保效能並防止死結。建議使用未來的回呼和組合器來使用其結果。然而,在某些情況下可能需要進行封鎖,而 Futures and Promises API 也支援此功能。
在上述貨幣交易範例中,一個封鎖的地方是在應用程式的最後,以確保所有 future 都已完成。以下是如何封鎖 future 結果的範例
import scala.concurrent._
import scala.concurrent.duration._
object awaitPurchase {
def main(args: Array[String]): Unit = {
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
Await.result(purchase, 0.nanos)
}
}
import scala.concurrent.*
import scala.concurrent.duration.*
@main def awaitPurchase =
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if isProfitable(quote) then connection.buy(amount, quote)
else throw Exception("not profitable")
}
Await.result(purchase, 0.nanos)
如果 future 失敗,呼叫者會收到 future 失敗的例外。這包括 failed
投影 - 如果原始 future 成功完成,封鎖它會導致擲回 NoSuchElementException
。
或者,呼叫 Await.ready
會等到 future 完成,但不會擷取其結果。同樣地,如果 future 失敗,呼叫該方法不會擲回例外。
Future
特徵實作具備 ready()
和 result()
方法的 Awaitable
特徵。這些方法無法由客戶端直接呼叫,只能由執行內容呼叫。
例外
當非同步運算拋出未處理的例外時,與這些運算相關聯的未來會失敗。失敗的未來會儲存 Throwable
的執行個體,而不是結果值。Future
提供 failed
投影方法,允許將此 Throwable
視為另一個 Future
的成功值。下列例外會收到特殊處理
-
scala.runtime.NonLocalReturnControl[_]
- 此例外會保留與回傳相關聯的值。通常,方法主體中的return
建構會轉譯為使用此例外的throw
。不會保留此例外,而是將相關聯的值儲存到未來或承諾中。 -
ExecutionException
- 當運算因未處理的InterruptedException
、Error
或scala.util.control.ControlThrowable
而失敗時儲存。在此情況下,ExecutionException
會將未處理的例外作為其原因。其背後的原因是防止傳播通常不會由客戶端程式碼處理的關鍵和控制流程相關例外,同時告知客戶端運算在哪些未來失敗。
致命例外 (由 NonFatal
決定) 會從執行失敗非同步運算的執行緒中重新擲出。這會通知管理執行執行緒問題的程式碼,並允許它在必要時快速失敗。請參閱 NonFatal
以取得更精確的說明,說明哪些例外被視為致命。
ExecutionContext.global
預設會透過列印堆疊追蹤來處理致命例外。
致命例外表示與運算相關聯的 Future
永遠不會完成。也就是說,「致命」表示錯誤無法由 ExecutionContext
復原,且也不打算由使用者程式碼處理。相比之下,應用程式程式碼可能會嘗試從「失敗」的 Future
復原,它已完成但有例外。
執行內容文字可以自訂為處理致命例外的報告者。請參閱工廠方法 fromExecutor
和 fromExecutorService
。
由於有必要為執行執行緒設定 UncaughtExceptionHandler
,因此當傳遞 null
執行器時,fromExecutor
會建立一個設定與 global
相同的內容文字,但會使用提供的報告者來處理例外,以提供便利。
以下範例示範如何取得具有自訂錯誤處理的 ExecutionContext
,並顯示如上所述的不同例外結果
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
object Test extends App {
def crashing(): Int = throw new NoSuchMethodError("test")
def failing(): Int = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int = throw new AssertionError("test")
// computations can fail in the middle of a chain of combinators, after the initial Future job has completed
def testCrashes()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => crashing())
def testFails()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => failing())
def testInterrupted()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => interrupt())
def testError()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => erroring())
// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
try {
Await.ready(future, 1.second)
for (completion <- future.value) {
println(s"completed $completion")
// In case of failure, also print the cause of the exception, when defined
completion match {
case Failure(exception) if exception.getCause != null =>
println(s" caused by ${exception.getCause}")
_ => ()
}
}
} catch {
// If the future value did not complete within 1 second, the call
// to `Await.ready` throws a TimeoutException
case _: TimeoutException => println(s"did not complete")
}
def reporter(t: Throwable) = println(s"reported $t")
locally {
// using the `global` implicit context
import ExecutionContext.Implicits._
// a successful Future
check(Future(42)) // completed Success(42)
// a Future that completes with an application exception
check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
// same, but the exception is thrown somewhere in the chain of combinators
check(testFails()) // completed Failure(java.lang.NumberFormatException: test)
// a Future that does not complete because of a linkage error;
// the trace is printed to stderr by default
check(testCrashes()) // did not complete
// a Future that completes with an operational exception that is wrapped
check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.InterruptedException: test
// a Future that completes due to a failed assert, which is bad for the app,
// but is handled the same as interruption
check(testError()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.AssertionError: test
}
locally {
// same as `global`, but adds a custom reporter that will handle uncaught
// exceptions and errors reported to the context
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
}
locally {
// does not handle uncaught exceptions; the executor would have to be
// configured separately
val executor = ForkJoinPool.commonPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
// the reporter is not invoked and the Future does not complete
check(testCrashes()) // did not complete
}
locally {
// sample minimal configuration for a context and underlying pool that
// use the reporter
val handler: Thread.UncaughtExceptionHandler =
(_: Thread, t: Throwable) => reporter(t)
val executor = new ForkJoinPool(
Runtime.getRuntime.availableProcessors,
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
handler,
/*asyncMode=*/ false
)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
}
}
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
def crashing(): Int = throw new NoSuchMethodError("test")
def failing(): Int = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int = throw new AssertionError("test")
// computations can fail in the middle of a chain of combinators,
// after the initial Future job has completed
def testCrashes()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => crashing())
def testFails()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => failing())
def testInterrupted()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => interrupt())
def testError()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => erroring())
// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
try
Await.ready(future, 1.second)
for completion <- future.value do
println(s"completed $completion")
// In case of failure, also print the cause of the exception, when defined
completion match
case Failure(exception) if exception.getCause != null =>
println(s" caused by ${exception.getCause}")
case _ => ()
catch
// If the future value did not complete within 1 second, the call
// to `Await.ready` throws a TimeoutException
case _: TimeoutException => println(s"did not complete")
def reporter(t: Throwable) = println(s"reported $t")
@main def test(): Unit =
locally:
// using the `global` implicit context
import ExecutionContext.Implicits.given
// a successful Future
check(Future(42)) // completed Success(42)
// a Future that completes with an application exception
check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
// same, but the exception is thrown somewhere in the chain of combinators
check(testFails()) // completed Failure(java.lang.NumberFormatException: test)
// a Future that does not complete because of a linkage error;
// the trace is printed to stderr by default
check(testCrashes()) // did not complete
// a Future that completes with an operational exception that is wrapped
check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.InterruptedException: test
// a Future that completes due to a failed assert, which is bad for the app,
// but is handled the same as interruption
check(testError()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.AssertionError: test
locally:
// same as `global`, but adds a custom reporter that will handle uncaught
// exceptions and errors reported to the context
given ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
locally:
// does not handle uncaught exceptions; the executor would have to be
// configured separately
val executor = ForkJoinPool.commonPool()
given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
// the reporter is not invoked and the Future does not complete
check(testCrashes()) // did not complete
locally:
// sample minimal configuration for a context and underlying pool that
// use the reporter
val handler: Thread.UncaughtExceptionHandler =
(_: Thread, t: Throwable) => reporter(t)
val executor = new ForkJoinPool(
Runtime.getRuntime.availableProcessors,
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
handler,
/*asyncMode=*/ false
)
given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
end test
承諾
到目前為止,我們只考慮使用 Future
方法啟動非同步運算所建立的 Future
物件。不過,也可以使用承諾來建立未來。
未來被定義為為尚未存在的結果建立的唯讀佔位符物件類型,而承諾可以視為可寫入的單一指派容器,它會完成未來。也就是說,承諾可以用於成功完成未來的值(透過「完成」承諾)使用 success
方法。相反地,承諾也可以用於完成未來的例外,透過讓承諾失敗,使用 failure
方法。
承諾 p
完成 p.future
傳回的未來。此未來特定於承諾 p
。根據實作,可能是 p.future eq p
。
考慮以下的生產者-消費者範例,其中一個運算產生一個值,並將它傳遞給另一個使用該值的運算。此值的傳遞是使用承諾完成的。
import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
val p = Promise[T]()
val f = p.future
val producer = Future {
val r = produceSomething()
p.success(r)
continueDoingSomethingUnrelated()
}
val consumer = Future {
startDoingSomething()
f.foreach { r =>
doSomethingWithResult()
}
}
在此,我們建立一個承諾,並使用它的 future
方法取得它完成的 Future
。然後,我們開始兩個非同步運算。第一個執行一些運算,產生一個值 r
,然後用來完成未來 f
,透過履行承諾 p
。第二個執行一些運算,然後讀取已完成未來 f
的結果 r
。請注意,consumer
可以取得結果,在 producer
任務完成執行 continueDoingSomethingUnrelated()
方法之前。
如前所述,承諾具有單一指定語義。因此,它們只能完成一次。對已完成(或失敗)的承諾呼叫 success
會擲出 IllegalStateException
。
以下範例顯示如何使承諾失敗。
val p = Promise[T]()
val f = p.future
val producer = Future {
val r = someComputation
if (isInvalid(r))
p.failure(new IllegalStateException)
else {
val q = doSomeMoreComputation(r)
p.success(q)
}
}
val p = Promise[T]()
val f = p.future
val producer = Future {
val r = someComputation
if isInvalid(r) then
p.failure(new IllegalStateException)
else
val q = doSomeMoreComputation(r)
p.success(q)
}
在此,producer
計算中間結果 r
,並檢查它是否有效。如果它無效,它會透過以例外完成承諾 p
來使承諾失敗。在此情況下,關聯的未來 f
會失敗。否則,producer
會繼續計算,並最後透過完成承諾 p
來以有效結果完成未來 f
。
承諾也可以使用 complete
方法完成,該方法會採用潛在值 Try[T]
– 類型為 Failure[Throwable]
的失敗結果或類型為 Success[T]
的成功結果。
類似於 success
,對已完成的承諾呼叫 failure
和 complete
會擲出 IllegalStateException
。
使用承諾(其作業已透過單子作業描述)和透過單子作業組成的未來編寫的程式的一個優點是這些程式是確定性的。此處的確定性表示,假設程式中沒有擲出例外,則程式的結果(在未來中觀察到的值)將永遠相同,不論平行程式的執行排程為何。
在某些情況下,客戶端可能只在承諾尚未完成時才想要完成承諾(例如,從幾個不同的未來執行多個 HTTP 要求,而客戶端只對第一個 HTTP 回應感興趣 - 對應於第一個未來完成承諾)。基於這些原因,承諾上存在方法 tryComplete
、trySuccess
和 tryFailure
。客戶端應當意識到使用這些方法會導致程式不是確定性的,而是取決於執行時程。
方法 completeWith
會使用另一個未來完成承諾。在未來完成後,承諾也會使用該未來的結果完成。下列程式會列印 1
val f = Future { 1 }
val p = Promise[Int]()
p.completeWith(f)
p.future.foreach { x =>
println(x)
}
當使用例外中斷承諾時,三個 Throwable
子類型會被特別處理。如果用於中斷承諾的 Throwable
是 scala.runtime.NonLocalReturnControl
,則承諾會使用對應的值完成。如果用於中斷承諾的 Throwable
是 Error
、InterruptedException
或 scala.util.control.ControlThrowable
的實例,則 Throwable
會被包裝為新的 ExecutionException
的原因,而後者會中斷承諾。
使用承諾,onComplete
方法的未來和 future
建構,您可以實作前面所述的任何功能組合組合子。假設您想要實作一個新的組合子 first
,它採用兩個未來 f
和 g
,並產生一個第三個未來,由 f
或 g
(以先完成者為準)完成,但僅在它成功的情況下。
以下是如何這樣做的範例
def first[T](f: Future[T], g: Future[T]): Future[T] = {
val p = Promise[T]
f.foreach { x =>
p.trySuccess(x)
}
g.foreach { x =>
p.trySuccess(x)
}
p.future
}
def first[T](f: Future[T], g: Future[T]): Future[T] =
val p = Promise[T]
f.foreach { x =>
p.trySuccess(x)
}
g.foreach { x =>
p.trySuccess(x)
}
p.future
請注意,在此實作中,如果 f
和 g
都沒有成功,則 first(f, g)
永遠不會完成(無論是具有值或例外)。
公用程式
為了簡化並行應用程式中的時間處理,scala.concurrent
引入 Duration
抽象。不應將 Duration
視為另一個一般時間抽象。它應與並行庫一同使用,並存在於 scala.concurrent
套件中。
Duration
是一個表示時間長度的基本類別。它可以是有限的或無限的。有限的持續時間用 FiniteDuration
類別表示,該類別由 Long
長度和 java.util.concurrent.TimeUnit
構成。無限的持續時間也從 Duration
延伸,僅存在兩個實例,Duration.Inf
和 Duration.MinusInf
。該函式庫還提供幾個 Duration
子類別用於隱式轉換目的,不應使用這些子類別。
抽象 Duration
包含允許的方法
- 轉換為不同的時間單位 (
toNanos
、toMicros
、toMillis
、toSeconds
、toMinutes
、toHours
、toDays
和toUnit(unit: TimeUnit)
)。 - 持續時間比較 (
<
、<=
、>
和>=
)。 - 算術運算 (
+
、-
、*
、/
和unary_-
)。 - 此持續時間與參數中提供的持續時間之間的最小值和最大值 (
min
、max
)。 - 檢查持續時間是否有限 (
isFinite
)。
Duration
可以透過以下方式實例化
- 例如,隱含地從類型
Int
和Long
,val d = 100 millis
。 - 通過傳遞
Long
長度和java.util.concurrent.TimeUnit
,例如,val d = Duration(100, MILLISECONDS)
。 - 通過解析表示時間段的字串,例如,
val d = Duration("1.2 µs")
。
Duration 也提供 unapply
方法,因此可以在模式匹配結構中使用它。範例
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100 millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String
// pattern matching
val Duration(length, unit) = 5 millis
import scala.concurrent.duration.*
import java.util.concurrent.TimeUnit.*
// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100.millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String
// pattern matching
val Duration(length, unit) = 5.millis