期貨

期貨和承諾

語言

作者:Philipp Haller、Aleksandar Prokopec、Heather Miller、Viktor Klang、Roland Kuhn 和 Vojin Jovanovic

簡介

期貨提供一種方法,可以有效率且非封鎖的方式,推理執行許多平行作業。

Future 是可能尚未存在的數值的佔位元物件。一般而言,Future 的值會同時提供,並可在之後使用。以這種方式組合同時作業,往往會產生更快速的非同步非封鎖平行程式碼。

預設情況下,futures 和 promises 是非封鎖的,使用回呼而不是典型的封鎖操作。為了在語法和概念上簡化回呼的使用,Scala 提供了組合器,例如 flatMapforeachfilter,用於以非封鎖方式組合 futures。封鎖仍然是可能的 - 在絕對必要的情況下,可以封鎖 futures(儘管不建議這樣做)。

典型的 future 如下所示

val inverseFuture: Future[Matrix] = Future {
  fatMatrix.inverse() // non-blocking long lasting computation
}(executionContext)

或使用更慣用的

implicit val ec: ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
  fatMatrix.inverse()
} // ec is implicitly passed
given ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
  fatMatrix.inverse()
} // execution context is implicitly passed

這兩個程式碼片段都將 fatMatrix.inverse() 的執行委派給 ExecutionContext,並在 inverseFuture 中體現計算結果。

執行內容

Future 和 Promises 圍繞 ExecutionContexts 展開,負責執行計算。

ExecutionContext 類似於 Executor:它可以自由地在新的執行緒、已合併的執行緒或目前的執行緒中執行計算(儘管不建議在目前的執行緒中執行計算 - 如下所述)。

scala.concurrent 套件開箱即用,提供 ExecutionContext 實作,一個全域靜態執行緒池。也可以將 Executor 轉換為 ExecutionContext。最後,使用者可以自由地擴充 ExecutionContext 特質來實作自己的執行內容,儘管只應在罕見的情況下執行此操作。

全域執行內容

ExecutionContext.global 是由 ForkJoinPool 支援的 ExecutionContext。它應足以應付大多數情況,但需要小心使用。 ForkJoinPool 管理有限數量的執行緒(執行緒的最大數量稱為平行處理層級)。並行封鎖運算的數量只能在每個封鎖呼叫都包覆在 blocking 呼叫內時,才能超過平行處理層級(稍後會詳細說明)。否則,全球執行緒環境中的執行緒池可能會發生飢餓,而且無法進行運算。

預設情況下,ExecutionContext.global 會將其底層 fork-join 池的平行處理層級設定為可用處理器的數量(Runtime.availableProcessors)。可以透過設定下列其中一個(或多個)VM 屬性來覆寫此組態

  • scala.concurrent.context.minThreads - 預設為 1
  • scala.concurrent.context.numThreads - 可以是數字或乘數 (N),格式為『xN』;預設為 Runtime.availableProcessors
  • scala.concurrent.context.maxThreads - 預設為 Runtime.availableProcessors

平行處理層級會設定為 numThreads,只要它保持在 [minThreads; maxThreads] 內。

如上所述,ForkJoinPool 可以增加執行緒數量,超過其 parallelismLevel,在封鎖運算的情況下。如 ForkJoinPool API 中所述,這只有在明確通知池時才有可能

import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.forkjoin._

// the following is equivalent to `implicit val ec = ExecutionContext.global`
import ExecutionContext.Implicits.global

Future {
  ForkJoinPool.managedBlock(
    new ManagedBlocker {
       var done = false

       def block(): Boolean = {
         try {
           myLock.lock()
           // ...
         } finally {
          done = true
         }
         true
       }

       def isReleasable: Boolean = done
    }
  )
}
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.forkjoin.*

// the following is equivalent to `given ExecutionContext = ExecutionContext.global`
import ExecutionContext.Implicits.global

Future {
  ForkJoinPool.managedBlock(
    new ManagedBlocker {
       var done = false

       def block(): Boolean =
         try
           myLock.lock()
           // ...
         finally
           done = true
         true

       def isReleasable: Boolean = done
    }
  )
}

很幸運地,concurrent 套件提供了一個方便的方法來執行此操作

import scala.concurrent.Future
import scala.concurrent.blocking

Future {
  blocking {
    myLock.lock()
    // ...
  }
}

請注意,blocking 是一個一般性的建構,將會在 下方更深入地討論。

最後但並非最不重要的一點,您必須記住,ForkJoinPool 並非設計用於長時間的封鎖操作。即使已使用 blocking 通知,但池可能不會如您預期般產生新的工作執行緒,而當新的工作執行緒建立時,它們的數量可能多達 32767。舉例來說,以下程式碼將使用 32000 個執行緒

implicit val ec = ExecutionContext.global

for (i <- 1 to 32000) {
  Future {
    blocking {
      Thread.sleep(999999)
    }
  }
}
given ExecutionContext = ExecutionContext.global

for i <- 1 to 32000 do
  Future {
    blocking {
      Thread.sleep(999999)
    }
  }

如果您需要封裝長時間的封鎖操作,我們建議使用專用的 ExecutionContext,例如透過封裝 Java Executor

調整 Java Executor

使用 ExecutionContext.fromExecutor 方法,您可以將 Java Executor 封裝到 ExecutionContext 中。例如

ExecutionContext.fromExecutor(new ThreadPoolExecutor( /* your configuration */ ))
ExecutionContext.fromExecutor(ThreadPoolExecutor( /* your configuration */ ))

同步執行內容

有人可能會想建立一個 ExecutionContext,在目前執行緒中執行運算

val currentThreadExecutionContext = ExecutionContext.fromExecutor(
  new Executor {
    // Do not do this!
    def execute(runnable: Runnable) = runnable.run()
})

這應該避免,因為它會在您未來的執行中引入不確定性。

Future {
  doSomething
}(ExecutionContext.global).map {
  doSomethingElse
}(currentThreadExecutionContext)

doSomethingElse 呼叫可能會在 doSomething 的執行緒或主執行緒中執行,因此可能是非同步或同步的。正如 此處 所解釋的,回呼不應同時具備這兩種特性。

期貨

一個 Future 是一個物件,持有某個可能在某個時間點可用的值。這個值通常是某個其他運算的結果

  1. 如果運算尚未完成,我們說這個 Future未完成的。
  2. 如果運算已完成,並產生一個值或一個例外,我們說這個 Future已完成的。

完成可以採取兩種形式

  1. 當一個 Future 以一個值完成時,我們說這個 future 以那個值成功完成。
  2. 當一個 Future 以運算拋出的例外完成時,我們說這個 Future 以那個例外失敗。

一個 Future 有個重要的特性,就是它只能被指定一次。一旦一個 Future 物件被賦予一個值或一個例外,它就等於不可變的 – 它永遠不會被覆寫。

建立一個 future 物件最簡單的方法是呼叫 Future.apply 方法,它會啟動一個非同步運算,並回傳一個持有那個運算結果的 future。一旦 future 完成,結果就會可用。

請注意 Future[T] 是一個表示 future 物件的型別,而 Future.apply 是建立並排程一個非同步運算的方法,然後回傳一個將以那個運算的結果完成的 future 物件。

這可以用一個範例最好說明。

假設我們想要使用某個熱門社群網站的假設 API 來取得某個特定使用者的朋友清單。我們將開啟一個新的工作階段,然後送出一個要求來取得某個特定使用者的朋友清單

import scala.concurrent._
import ExecutionContext.Implicits.global

val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
  session.getFriends()
}
import scala.concurrent.*
import ExecutionContext.Implicits.global

val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
  session.getFriends()
}

在上面,我們首先匯入 scala.concurrent 套件的內容,以使 Future 型別可見。我們稍後會說明第二個匯入。

我們接著初始化一個會話變數,我們將使用它來使用假設的 createSessionFor 方法,將要求傳送至伺服器。若要取得使用者的朋友清單,必須透過網路傳送要求,這可能會花費很長的時間。這會以呼叫 getFriends 方法來說明,該方法會傳回 List[Friend]。為了在回應抵達之前更有效地使用 CPU,我們不應封鎖程式其餘部分,這個運算應非同步地排程。Future.apply 方法執行完全相同的動作,它會同時執行指定的運算區塊,在本例中會傳送要求至伺服器並等待回應。

一旦伺服器回應,朋友清單就會在未來 f 中提供。

嘗試失敗可能會導致例外。在下列範例中,session 值初始化錯誤,因此 Future 區塊中的運算會擲回 NullPointerException。這個未來 f 接著會因這個例外而失敗,而不是順利完成。

val session = null
val f: Future[List[Friend]] = Future {
  session.getFriends()
}

上方的程式碼列 import ExecutionContext.Implicits.global 會匯入預設的全球執行緒環境。執行緒環境會執行提交給它們的任務,您可以將執行緒環境視為執行緒池。它們對於 Future.apply 方法至關重要,因為它們會處理如何以及何時執行非同步運算。您可以定義自己的執行緒環境並將它們與 Future 搭配使用,但目前來說,只要知道您可以匯入預設的執行緒環境(如上所示)就已足夠。

我們的範例基於假設的社群網路 API,其中運算包含傳送網路要求並等待回應。提供一個包含非同步運算的範例,以便您可以立即試用,是很公平的。假設您有一個文字檔,而且您想要找出特定關鍵字第一次出現的位置。此運算可能包含在從磁碟中擷取檔案內容時的封鎖,因此與運算的其他部分同時執行是有意義的。

val firstOccurrence: Future[Int] = Future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}

回呼

我們現在知道如何啟動非同步運算來建立新的未來值,但我們尚未顯示如何使用結果,一旦結果可用,我們就可以對其執行有用的操作。我們通常對運算的結果感興趣,而不仅仅是其副作用。

在許多未來的實作中,一旦未來的客戶端對其結果感興趣,它就必須封鎖自己的運算並等到未來完成 - 只有這樣才能使用未來的值來繼續自己的運算。儘管這由 Scala Future API 允許,正如我們稍後將展示的,但從效能的角度來看,更好的方法是透過在未來註冊回呼,以完全非封鎖的方式執行。此回呼會在未來完成後非同步呼叫。如果在註冊回呼時未來已經完成,則回呼可能會非同步執行,或在同一個執行緒上循序執行。

註冊回呼最通用的形式是使用 onComplete 方法,它會採用類型為 Try[T] => U 的回呼函式。如果未來完成成功,則會將回呼套用至類型為 Success[T] 的值,否則會套用至類型為 Failure[T] 的值。

Try[T] 類似於 Option[T]Either[T, S],因為它是一個單子,可能持有某種類型的值。不過,它特別設計為持有值或某個可拋出物件。 Option[T] 可以是值 (即 Some[T]) 或完全沒有值 (即 None),而 Try[T] 在持有值時為 Success[T],否則為 Failure[T],後者會持有例外狀況。 Failure[T] 持有的資訊不只是一個單純的 None,它會說明為何沒有值。同時,你可以將 Try[T] 視為 Either[Throwable, T] 的特殊版本,專門用於左邊值為 Throwable 的情況。

回到我們的社群網路範例,假設我們想要擷取最近自己的貼文清單並將它們呈現在螢幕上。我們透過呼叫 getRecentPosts 方法來執行此動作,它會傳回 List[String],也就是最近文字貼文的清單

import scala.util.{Success, Failure}

val f: Future[List[String]] = Future {
  session.getRecentPosts()
}

f.onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}
import scala.util.{Success, Failure}

val f: Future[List[String]] = Future {
  session.getRecentPosts()
}

f.onComplete {
  case Success(posts) => for post <- posts do println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}

onComplete 方法在於允許用戶端處理失敗和成功的未來運算結果,因此具有普遍性。如果只需要處理成功的結果,可以使用 foreach 回呼

val f: Future[List[String]] = Future {
  session.getRecentPosts()
}

for {
  posts <- f
  post <- posts
} println(post)
val f: Future[List[String]] = Future {
  session.getRecentPosts()
}

for
  posts <- f
  post <- posts
do println(post)

Future 提供一種乾淨的方式,使用 failed 投影來處理僅失敗的結果,它會將 Failure[Throwable] 轉換為 Success[Throwable]。在 投影片 下方的區段中提供這樣做的範例。

回到先前的範例,搜尋關鍵字的第一個出現位置,您可能想要將關鍵字的位置列印到螢幕上

val firstOccurrence: Future[Int] = Future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}

firstOccurrence.onComplete {
  case Success(idx) => println("The keyword first appears at position: " + idx)
  case Failure(t) => println("Could not process file: " + t.getMessage)
}

onCompleteforeach 方法的結果類型都是 Unit,表示無法串連呼叫這些方法。請注意,這是故意的設計,以避免暗示串連呼叫可能暗示已註冊的回呼執行順序(在同一個未來上註冊的回呼是無序的)。

話雖如此,我們現在應該說明回呼確切在何時被呼叫。由於它需要未來中的值可用,因此只能在未來完成後呼叫它。但是,無法保證它會由完成未來的執行緒或建立回呼的執行緒呼叫。相反地,回呼是由某個執行緒在未來物件完成後的一段時間執行。我們說回呼會最終執行。

此外,即使在同一應用程式的不同執行之間,回呼執行的順序也並未預先定義。事實上,回呼可能不會依序一個接著一個呼叫,而可能同時並行執行。這表示在下列範例中,變數 totalA 可能無法設定為計算文字中正確數量的 a 字元大小寫。

@volatile var totalA = 0

val text = Future {
  "na" * 16 + "BATMAN!!!"
}

text.foreach { txt =>
  totalA += txt.count(_ == 'a')
}

text.foreach { txt =>
  totalA += txt.count(_ == 'A')
}

在上方,兩個回呼可能一個接著一個執行,在這種情況下,變數 totalA 會持有預期的值 18。然而,它們也可能並行執行,因此 totalA 可能會變成 162,因為 += 並非原子操作(亦即它包含一個讀取步驟和一個寫入步驟,可能會與其他讀取和寫入任意交錯)。

為了完整起見,此處列出回呼的語意

  1. 在未來註冊 onComplete 回呼可確保在未來最終完成後呼叫對應的封閉。

  2. 註冊 foreach 回呼具有與 onComplete 相同的語意,不同之處在於只有在未來成功完成時才會呼叫封閉。

  3. 在已經完成的未來註冊回呼,最終將執行回呼(如 1 所暗示)。

  4. 如果在未來註冊多個回呼,則其執行的順序未定義。事實上,回呼可能彼此並行執行。然而,特定 ExecutionContext 實作可能導致順序明確定義。

  5. 在某些呼叫回擲出例外情況時,其他呼叫回仍會執行。

  6. 在某些呼叫回從未完成時(例如呼叫回包含無限迴圈),其他呼叫回可能完全不會執行。在這些情況下,潛在的封鎖呼叫回必須使用 blocking 建構(請參閱下方)。

  7. 執行後,呼叫回會從未來物件中移除,因此符合 GC 資格。

函數組合和 For-Comprehensions

我們展示的呼叫回機制足以將未來結果與後續運算串連。然而,它有時不方便且會產生龐大的程式碼。我們透過範例來說明這一點。假設我們有一個用於與貨幣交易服務介接的 API。假設我們想要購買美元,但僅在有利可圖時才購買。我們首先展示如何使用呼叫回來完成此操作

val rateQuote = Future {
  connection.getCurrentValue(USD)
}

for (quote <- rateQuote) {
  val purchase = Future {
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
  }

  for (amount <- purchase)
    println("Purchased " + amount + " USD")
}
val rateQuote = Future {
  connection.getCurrentValue(USD)
}

for quote <- rateQuote do
  val purchase = Future {
    if isProfitable(quote) then connection.buy(amount, quote)
    else throw Exception("not profitable")
  }

  for amount <- purchase do
    println("Purchased " + amount + " USD")

我們從建立未來 rateQuote 開始,它會取得目前的匯率。從伺服器取得此值且未來成功完成後,運算會在 foreach 呼叫回中進行,我們準備決定是否購買。因此,我們建立另一個未來 purchase,它會在有利可圖時決定購買,然後發送請求。最後,在購買完成後,我們會將通知訊息印出到標準輸出。

這是有用的,但由於兩個原因而不方便。首先,我們必須使用 foreach 並在其中嵌套第二個 purchase future。想像一下,在 purchase 完成後,我們想要賣出其他貨幣。我們必須在 foreach 回呼中重複此模式,這會讓程式碼過度縮排、龐大且難以理解。

其次, purchase future 不在其他程式碼的範圍內,只能在 foreach 回呼中對其執行動作。這表示應用程式的其他部分看不到 purchase future,也無法註冊另一個 foreach 回呼,例如,賣出其他貨幣。

由於這兩個原因,futures 提供組合器,允許更直接的組合。其中一個基本的組合器是 map,它給定一個 future 和一個 future 值的對應函式,產生一個新的 future,一旦原始 future 成功完成,就會以對應的值完成。您可以用與對應集合相同的方式來對應 future。

讓我們使用 map 組合器重新編寫前一個範例

val rateQuote = Future {
  connection.getCurrentValue(USD)
}

val purchase = rateQuote.map { quote =>
  if (isProfitable(quote)) connection.buy(amount, quote)
  else throw new Exception("not profitable")
}

purchase.foreach { amount =>
  println("Purchased " + amount + " USD")
}
val rateQuote = Future {
  connection.getCurrentValue(USD)
}

val purchase = rateQuote.map { quote =>
  if isProfitable(quote) then connection.buy(amount, quote)
  else throw Exception("not profitable")
}

purchase.foreach { amount =>
  println("Purchased " + amount + " USD")
}

藉由在 rateQuote 上使用 map,我們消除了 foreach 回呼,更重要的是,消除了巢狀結構。如果我們現在決定要出售其他貨幣,就足夠在 purchase 上再次使用 map

但是,如果 isProfitable 傳回 false 會發生什麼事,因此導致引發例外狀況?在這種情況下,purchase 會因該例外狀況而失敗。此外,假設連線中斷,而且 getCurrentValue 引發例外狀況,導致 rateQuote 失敗。在這種情況下,我們沒有值可以對應,因此 purchase 會自動因與 rateQuote 相同的例外狀況而失敗。

總之,如果原始 future 順利完成,則傳回的 future 會以原始 future 的對應值完成。如果對應函式引發例外狀況,則 future 會以該例外狀況完成。如果原始 future 因例外狀況而失敗,則傳回的 future 也會包含相同的例外狀況。這種例外狀況傳播語意也存在於其他組合子中。

future 的設計目標之一是讓它們可以在 for-comprehension 中使用。基於這個原因,future 也有 flatMapwithFilter 組合子。flatMap 方法會採用一個函式,將值對應到新的 future g,然後傳回一個 future,在 g 完成後完成。

假設我們想要將美元兌換成瑞士法郎 (CHF)。我們必須取得這兩種貨幣的報價,然後根據這兩個報價決定是否買入。以下是 for-comprehension 中使用 flatMapwithFilter 的範例

val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }

val purchase = for {
  usd <- usdQuote
  chf <- chfQuote
  if isProfitable(usd, chf)
} yield connection.buy(amount, chf)

purchase foreach { amount =>
  println("Purchased " + amount + " CHF")
}
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }

val purchase = for
  usd <- usdQuote
  chf <- chfQuote
  if isProfitable(usd, chf)
yield connection.buy(amount, chf)

purchase.foreach { amount =>
  println("Purchased " + amount + " CHF")
}

usdQuotechfQuote 都完成後,purchase 未來才會完成,它取決於這兩個未來的值,因此它自己的計算無法提早開始。

上面的 for-comprehension 被翻譯成

val purchase = usdQuote.flatMap {
  usd =>
    chfQuote
      .withFilter(chf => isProfitable(usd, chf))
      .map(chf => connection.buy(amount, chf))
}

這比 for-comprehension 難以理解,但我們分析它以更好地理解 flatMap 操作。flatMap 操作將其自己的值映射到其他未來。一旦這個不同的未來完成,結果的未來將用它的值完成。在我們的例子中,flatMap 使用 usdQuote 未來的值將 chfQuote 的值映射到第三個未來,該未來發送請求購買一定數量的瑞士法郎。結果的未來 purchase 僅在從 map 返回的第三個未來完成後才完成。

這可能會令人費解,但幸運的是,flatMap 操作很少在 for-comprehension 之外使用,而 for-comprehension 更容易使用和理解。

filter 組合器會建立一個新的未來,其中僅包含原始未來的值(如果它滿足某些謂詞)。否則,新的未來會因 NoSuchElementException 而失敗。對於呼叫 filter 的未來,其效果與呼叫 withFilter 完全相同。

collectfilter 組合器之間的關係類似於集合 API 中這些方法的關係。

由於 Future 特質在概念上可以包含兩種類型的值(運算結果和例外),因此需要處理例外的組合器。

假設我們根據 rateQuote 決定購買一定數量。connection.buy 方法會取得要購買的 amount 和預期的 quote。它會傳回已購買的數量。如果 quote 在此期間已變更,它會擲回 QuoteChangedException,而且不會購買任何東西。如果我們希望我們的未來包含 0,而不是例外,我們會使用 recover 組合器

val purchase: Future[Int] = rateQuote.map {
  quote => connection.buy(amount, quote)
}.recover {
  case QuoteChangedException() => 0
}

組合子 recover 會建立一個新的 future,如果原始 future 成功完成,則它會包含與原始 future 相同的結果。如果沒有,則會將部分函數參數套用至導致原始 future 失敗的 Throwable。如果它將 Throwable 映射至某個值,則新的 future 會使用該值成功完成。如果部分函數未定義於該 Throwable,則產生的 future 會使用相同的 Throwable 失敗。

組合子 recoverWith 會建立一個新的 future,如果原始 future 成功完成,則它會包含與原始 future 相同的結果。否則,會將部分函數套用至導致原始 future 失敗的 Throwable。如果它將 Throwable 映射至某個 future,則此 future 會使用該 future 的結果完成。它與 recover 的關係類似於 flatMapmap 的關係。

組合子 fallbackTo 會建立一個新的 future,如果此 future 成功完成,則它會包含此 future 的結果,否則會包含參數 future 的成功結果。如果此 future 和參數 future 都失敗,則新的 future 會使用此 future 的例外完成,如下列範例所示,它會嘗試列印美元值,但如果無法取得美元值,則會列印瑞士法郎值

val usdQuote = Future {
  connection.getCurrentValue(USD)
}.map {
  usd => "Value: " + usd + "$"
}
val chfQuote = Future {
  connection.getCurrentValue(CHF)
}.map {
  chf => "Value: " + chf + "CHF"
}

val anyQuote = usdQuote.fallbackTo(chfQuote)

anyQuote.foreach { println(_) }

組合器 andThen 純粹用於產生副作用。它會傳回一個新 future,其結果與目前的 future 完全相同,不論目前的 future 是否失敗。一旦目前的 future 完成結果,對應於 andThen 的封閉就會被呼叫,然後新的 future 會完成與這個 future 相同的結果。這確保了多個 andThen 呼叫會依序執行,就像以下範例中將社群網路的近期貼文儲存到可變動集合,然後將所有貼文呈現在畫面上

val allPosts = mutable.Set[String]()

Future {
  session.getRecentPosts()
}.andThen {
  case Success(posts) => allPosts ++= posts
}.andThen {
  case _ =>
    clearAll()
    for (post <- allPosts) render(post)
}
val allPosts = mutable.Set[String]()

Future {
  session.getRecentPosts()
}.andThen {
  case Success(posts) => allPosts ++= posts
}.andThen {
  case _ =>
    clearAll()
    for post <- allPosts do render(post)
}

總之,future 上的組合器是純粹函數式的。每個組合器都會傳回一個新的 future,與它衍生的 future 有關聯。

投影

為了讓 for-comprehension 能在以例外傳回的結果上執行,future 也有投影。如果原始的 future 失敗,failed 投影會傳回包含 Throwable 型別值的 future。如果原始的 future 成功,failed 投影會失敗,並出現 NoSuchElementException。以下範例會將例外印在畫面上

val f = Future {
  2 / 0
}
for (exc <- f.failed) println(exc)
val f = Future {
  2 / 0
}
for exc <- f.failed do println(exc)

這個範例中的 for-comprehension 會轉譯成

f.failed.foreach(exc => println(exc))

因為 f 在此並未成功,所以封閉會註冊到新成功的 Future[Throwable] 上的 foreach 回呼。以下範例不會在畫面上印出任何東西

val g = Future {
  4 / 2
}
for (exc <- g.failed) println(exc)
val g = Future {
  4 / 2
}
for exc <- g.failed do println(exc)

延伸 Future

計畫支援延伸 Futures API,並提供額外的公用程式方法。這將允許外部架構提供更專業的公用程式。

封鎖

Future 通常是非同步的,且不會封鎖底層執行緒。不過,在某些情況下,有必要進行封鎖。我們區分執行緒封鎖的兩種形式:呼叫從 future 內部封鎖執行緒的任意程式碼,以及從另一個 future 外部封鎖,等待該 future 完成。

在 Future 中進行封鎖

正如在全域 ExecutionContext 中所見,可以使用 blocking 結構來通知 ExecutionContext 有關封鎖呼叫。然而,實作完全由 ExecutionContext 自行決定。雖然有些 ExecutionContext 例如 ExecutionContext.global 會透過 ManagedBlocker 來實作 blocking,但有些執行緒環境例如固定執行緒池

ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))

將不會執行任何動作,如下所示

implicit val ec =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

Future {
  blocking { blockingStuff() }
}
given ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

Future {
  blocking { blockingStuff() }
}

與下列效果相同

Future { blockingStuff() }

封鎖程式碼也可能會擲回例外。在此情況下,例外會轉送給呼叫者。

在 Future 外部進行封鎖

如前所述,強烈建議不要封鎖 future,以確保效能並防止死結。建議使用未來的回呼和組合器來使用其結果。然而,在某些情況下可能需要進行封鎖,而 Futures and Promises API 也支援此功能。

在上述貨幣交易範例中,一個封鎖的地方是在應用程式的最後,以確保所有 future 都已完成。以下是如何封鎖 future 結果的範例

import scala.concurrent._
import scala.concurrent.duration._

object awaitPurchase {
  def main(args: Array[String]): Unit = {
    val rateQuote = Future {
      connection.getCurrentValue(USD)
    }

    val purchase = rateQuote.map { quote =>
      if (isProfitable(quote)) connection.buy(amount, quote)
      else throw new Exception("not profitable")
    }

    Await.result(purchase, 0.nanos)
  }
}
import scala.concurrent.*
import scala.concurrent.duration.*

@main def awaitPurchase =
  val rateQuote = Future {
    connection.getCurrentValue(USD)
  }

  val purchase = rateQuote.map { quote =>
    if isProfitable(quote) then connection.buy(amount, quote)
    else throw Exception("not profitable")
  }

  Await.result(purchase, 0.nanos)

如果 future 失敗,呼叫者會收到 future 失敗的例外。這包括 failed 投影 - 如果原始 future 成功完成,封鎖它會導致擲回 NoSuchElementException

或者,呼叫 Await.ready 會等到 future 完成,但不會擷取其結果。同樣地,如果 future 失敗,呼叫該方法不會擲回例外。

Future 特徵實作具備 ready()result() 方法的 Awaitable 特徵。這些方法無法由客戶端直接呼叫,只能由執行內容呼叫。

例外

當非同步運算拋出未處理的例外時,與這些運算相關聯的未來會失敗。失敗的未來會儲存 Throwable 的執行個體,而不是結果值。Future 提供 failed 投影方法,允許將此 Throwable 視為另一個 Future 的成功值。下列例外會收到特殊處理

  1. scala.runtime.NonLocalReturnControl[_] - 此例外會保留與回傳相關聯的值。通常,方法主體中的 return 建構會轉譯為使用此例外的 throw。不會保留此例外,而是將相關聯的值儲存到未來或承諾中。

  2. ExecutionException - 當運算因未處理的 InterruptedExceptionErrorscala.util.control.ControlThrowable 而失敗時儲存。在此情況下,ExecutionException 會將未處理的例外作為其原因。其背後的原因是防止傳播通常不會由客戶端程式碼處理的關鍵和控制流程相關例外,同時告知客戶端運算在哪些未來失敗。

致命例外 (由 NonFatal 決定) 會從執行失敗非同步運算的執行緒中重新擲出。這會通知管理執行執行緒問題的程式碼,並允許它在必要時快速失敗。請參閱 NonFatal 以取得更精確的說明,說明哪些例外被視為致命。

ExecutionContext.global 預設會透過列印堆疊追蹤來處理致命例外。

致命例外表示與運算相關聯的 Future 永遠不會完成。也就是說,「致命」表示錯誤無法由 ExecutionContext 復原,且也不打算由使用者程式碼處理。相比之下,應用程式程式碼可能會嘗試從「失敗」的 Future 復原,它已完成但有例外。

執行內容文字可以自訂為處理致命例外的報告者。請參閱工廠方法 fromExecutorfromExecutorService

由於有必要為執行執行緒設定 UncaughtExceptionHandler,因此當傳遞 null 執行器時,fromExecutor 會建立一個設定與 global 相同的內容文字,但會使用提供的報告者來處理例外,以提供便利。

以下範例示範如何取得具有自訂錯誤處理的 ExecutionContext,並顯示如上所述的不同例外結果

import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

object Test extends App {
  def crashing(): Int  = throw new NoSuchMethodError("test")
  def failing(): Int   = throw new NumberFormatException("test")
  def interrupt(): Int = throw new InterruptedException("test")
  def erroring(): Int  = throw new AssertionError("test")

  // computations can fail in the middle of a chain of combinators, after the initial Future job has completed
  def testCrashes()(implicit ec: ExecutionContext): Future[Int] =
    Future.unit.map(_ => crashing())
  def testFails()(implicit ec: ExecutionContext): Future[Int] =
    Future.unit.map(_ => failing())
  def testInterrupted()(implicit ec: ExecutionContext): Future[Int] =
    Future.unit.map(_ => interrupt())
  def testError()(implicit ec: ExecutionContext): Future[Int] =
    Future.unit.map(_ => erroring())

  // Wait for 1 second for the the completion of the passed `future` value and print it
  def check(future: Future[Int]): Unit =
    try {
      Await.ready(future, 1.second)
      for (completion <- future.value) {
        println(s"completed $completion")
        // In case of failure, also print the cause of the exception, when defined
        completion match {
          case Failure(exception) if exception.getCause != null =>
            println(s"  caused by ${exception.getCause}")
          _ => ()
        }
      }
    } catch {
      // If the future value did not complete within 1 second, the call
      // to `Await.ready` throws a TimeoutException
      case _: TimeoutException => println(s"did not complete")
    }

  def reporter(t: Throwable) = println(s"reported $t")

  locally {
    // using the `global` implicit context
    import ExecutionContext.Implicits._
    // a successful Future
    check(Future(42))        // completed Success(42)
    // a Future that completes with an application exception
    check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
    // same, but the exception is thrown somewhere in the chain of combinators
    check(testFails())       // completed Failure(java.lang.NumberFormatException: test)
    // a Future that does not complete because of a linkage error;
    // the trace is printed to stderr by default
    check(testCrashes())     // did not complete
    // a Future that completes with an operational exception that is wrapped
    check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
                             //   caused by java.lang.InterruptedException: test
    // a Future that completes due to a failed assert, which is bad for the app,
    // but is handled the same as interruption
    check(testError())       // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
                             //   caused by java.lang.AssertionError: test
  }
  locally {
    // same as `global`, but adds a custom reporter that will handle uncaught
    // exceptions and errors reported to the context
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
    check(testCrashes())     // reported java.lang.NoSuchMethodError: test
                             // did not complete
  }
  locally {
    // does not handle uncaught exceptions; the executor would have to be
    // configured separately
    val executor = ForkJoinPool.commonPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
    // the reporter is not invoked and the Future does not complete
    check(testCrashes())     // did not complete
  }
  locally {
    // sample minimal configuration for a context and underlying pool that
    // use the reporter
    val handler: Thread.UncaughtExceptionHandler =
      (_: Thread, t: Throwable) => reporter(t)
    val executor = new ForkJoinPool(
      Runtime.getRuntime.availableProcessors,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
      handler,
      /*asyncMode=*/ false
    )
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
    check(testCrashes())     // reported java.lang.NoSuchMethodError: test
                             // did not complete
  }
}
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

def crashing(): Int  = throw new NoSuchMethodError("test")
def failing(): Int   = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int  = throw new AssertionError("test")

// computations can fail in the middle of a chain of combinators,
// after the initial Future job has completed
def testCrashes()(using ExecutionContext): Future[Int] =
  Future.unit.map(_ => crashing())
def testFails()(using ExecutionContext): Future[Int] =
  Future.unit.map(_ => failing())
def testInterrupted()(using ExecutionContext): Future[Int] =
  Future.unit.map(_ => interrupt())
def testError()(using ExecutionContext): Future[Int] =
  Future.unit.map(_ => erroring())

// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
  try
    Await.ready(future, 1.second)
    for completion <- future.value do
      println(s"completed $completion")
      // In case of failure, also print the cause of the exception, when defined
      completion match
        case Failure(exception) if exception.getCause != null =>
          println(s"  caused by ${exception.getCause}")
        case _ => ()
  catch
    // If the future value did not complete within 1 second, the call
    // to `Await.ready` throws a TimeoutException
    case _: TimeoutException => println(s"did not complete")

def reporter(t: Throwable) = println(s"reported $t")

@main def test(): Unit =
  locally:
    // using the `global` implicit context
    import ExecutionContext.Implicits.given
    // a successful Future
    check(Future(42))        // completed Success(42)
    // a Future that completes with an application exception
    check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
    // same, but the exception is thrown somewhere in the chain of combinators
    check(testFails())       // completed Failure(java.lang.NumberFormatException: test)
    // a Future that does not complete because of a linkage error;
    // the trace is printed to stderr by default
    check(testCrashes())     // did not complete
    // a Future that completes with an operational exception that is wrapped
    check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
                             //   caused by java.lang.InterruptedException: test
    // a Future that completes due to a failed assert, which is bad for the app,
    // but is handled the same as interruption
    check(testError())       // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
                             //   caused by java.lang.AssertionError: test

  locally:
    // same as `global`, but adds a custom reporter that will handle uncaught
    // exceptions and errors reported to the context
    given ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
    check(testCrashes())     // reported java.lang.NoSuchMethodError: test
                             // did not complete

  locally:
    // does not handle uncaught exceptions; the executor would have to be
    // configured separately
    val executor = ForkJoinPool.commonPool()
    given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
    // the reporter is not invoked and the Future does not complete
    check(testCrashes())     // did not complete

  locally:
    // sample minimal configuration for a context and underlying pool that
    // use the reporter
    val handler: Thread.UncaughtExceptionHandler =
      (_: Thread, t: Throwable) => reporter(t)
    val executor = new ForkJoinPool(
      Runtime.getRuntime.availableProcessors,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
      handler,
      /*asyncMode=*/ false
    )
    given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
    check(testCrashes())     // reported java.lang.NoSuchMethodError: test
                             // did not complete
end test

承諾

到目前為止,我們只考慮使用 Future 方法啟動非同步運算所建立的 Future 物件。不過,也可以使用承諾來建立未來。

未來被定義為為尚未存在的結果建立的唯讀佔位符物件類型,而承諾可以視為可寫入的單一指派容器,它會完成未來。也就是說,承諾可以用於成功完成未來的值(透過「完成」承諾)使用 success 方法。相反地,承諾也可以用於完成未來的例外,透過讓承諾失敗,使用 failure 方法。

承諾 p 完成 p.future 傳回的未來。此未來特定於承諾 p。根據實作,可能是 p.future eq p

考慮以下的生產者-消費者範例,其中一個運算產生一個值,並將它傳遞給另一個使用該值的運算。此值的傳遞是使用承諾完成的。

import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global

val p = Promise[T]()
val f = p.future

val producer = Future {
  val r = produceSomething()
  p.success(r)
  continueDoingSomethingUnrelated()
}

val consumer = Future {
  startDoingSomething()
  f.foreach { r =>
    doSomethingWithResult()
  }
}

在此,我們建立一個承諾,並使用它的 future 方法取得它完成的 Future。然後,我們開始兩個非同步運算。第一個執行一些運算,產生一個值 r,然後用來完成未來 f,透過履行承諾 p。第二個執行一些運算,然後讀取已完成未來 f 的結果 r。請注意,consumer 可以取得結果,在 producer 任務完成執行 continueDoingSomethingUnrelated() 方法之前。

如前所述,承諾具有單一指定語義。因此,它們只能完成一次。對已完成(或失敗)的承諾呼叫 success 會擲出 IllegalStateException

以下範例顯示如何使承諾失敗。

val p = Promise[T]()
val f = p.future

val producer = Future {
  val r = someComputation
  if (isInvalid(r))
    p.failure(new IllegalStateException)
  else {
    val q = doSomeMoreComputation(r)
    p.success(q)
  }
}
val p = Promise[T]()
val f = p.future

val producer = Future {
  val r = someComputation
  if isInvalid(r) then
    p.failure(new IllegalStateException)
  else
    val q = doSomeMoreComputation(r)
    p.success(q)
}

在此,producer 計算中間結果 r,並檢查它是否有效。如果它無效,它會透過以例外完成承諾 p 來使承諾失敗。在此情況下,關聯的未來 f 會失敗。否則,producer 會繼續計算,並最後透過完成承諾 p 來以有效結果完成未來 f

承諾也可以使用 complete 方法完成,該方法會採用潛在值 Try[T] – 類型為 Failure[Throwable] 的失敗結果或類型為 Success[T] 的成功結果。

類似於 success,對已完成的承諾呼叫 failurecomplete 會擲出 IllegalStateException

使用承諾(其作業已透過單子作業描述)和透過單子作業組成的未來編寫的程式的一個優點是這些程式是確定性的。此處的確定性表示,假設程式中沒有擲出例外,則程式的結果(在未來中觀察到的值)將永遠相同,不論平行程式的執行排程為何。

在某些情況下,客戶端可能只在承諾尚未完成時才想要完成承諾(例如,從幾個不同的未來執行多個 HTTP 要求,而客戶端只對第一個 HTTP 回應感興趣 - 對應於第一個未來完成承諾)。基於這些原因,承諾上存在方法 tryCompletetrySuccesstryFailure。客戶端應當意識到使用這些方法會導致程式不是確定性的,而是取決於執行時程。

方法 completeWith 會使用另一個未來完成承諾。在未來完成後,承諾也會使用該未來的結果完成。下列程式會列印 1

val f = Future { 1 }
val p = Promise[Int]()

p.completeWith(f)

p.future.foreach { x =>
  println(x)
}

當使用例外中斷承諾時,三個 Throwable 子類型會被特別處理。如果用於中斷承諾的 Throwablescala.runtime.NonLocalReturnControl,則承諾會使用對應的值完成。如果用於中斷承諾的 ThrowableErrorInterruptedExceptionscala.util.control.ControlThrowable 的實例,則 Throwable 會被包裝為新的 ExecutionException 的原因,而後者會中斷承諾。

使用承諾,onComplete 方法的未來和 future 建構,您可以實作前面所述的任何功能組合組合子。假設您想要實作一個新的組合子 first,它採用兩個未來 fg,並產生一個第三個未來,由 fg(以先完成者為準)完成,但僅在它成功的情況下。

以下是如何這樣做的範例

def first[T](f: Future[T], g: Future[T]): Future[T] = {
  val p = Promise[T]

  f.foreach { x =>
    p.trySuccess(x)
  }

  g.foreach { x =>
    p.trySuccess(x)
  }

  p.future
}
def first[T](f: Future[T], g: Future[T]): Future[T] =
  val p = Promise[T]

  f.foreach { x =>
    p.trySuccess(x)
  }

  g.foreach { x =>
    p.trySuccess(x)
  }

  p.future

請注意,在此實作中,如果 fg 都沒有成功,則 first(f, g) 永遠不會完成(無論是具有值或例外)。

公用程式

為了簡化並行應用程式中的時間處理,scala.concurrent 引入 Duration 抽象。不應將 Duration 視為另一個一般時間抽象。它應與並行庫一同使用,並存在於 scala.concurrent 套件中。

Duration 是一個表示時間長度的基本類別。它可以是有限的或無限的。有限的持續時間用 FiniteDuration 類別表示,該類別由 Long 長度和 java.util.concurrent.TimeUnit 構成。無限的持續時間也從 Duration 延伸,僅存在兩個實例,Duration.InfDuration.MinusInf。該函式庫還提供幾個 Duration 子類別用於隱式轉換目的,不應使用這些子類別。

抽象 Duration 包含允許的方法

  1. 轉換為不同的時間單位 (toNanostoMicrostoMillistoSecondstoMinutestoHourstoDaystoUnit(unit: TimeUnit))。
  2. 持續時間比較 (<<=>>=)。
  3. 算術運算 (+-*/unary_-)。
  4. 此持續時間與參數中提供的持續時間之間的最小值和最大值 (minmax)。
  5. 檢查持續時間是否有限 (isFinite)。

Duration 可以透過以下方式實例化

  1. 例如,隱含地從類型 IntLongval d = 100 millis
  2. 通過傳遞 Long 長度和 java.util.concurrent.TimeUnit,例如,val d = Duration(100, MILLISECONDS)
  3. 通過解析表示時間段的字串,例如,val d = Duration("1.2 µs")

Duration 也提供 unapply 方法,因此可以在模式匹配結構中使用它。範例

import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._

// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100 millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String

// pattern matching
val Duration(length, unit) = 5 millis
import scala.concurrent.duration.*
import java.util.concurrent.TimeUnit.*

// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100.millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String

// pattern matching
val Duration(length, unit) = 5.millis

此頁面的貢獻者