Scala 3 — 書籍

並行性

語言

當您想在 Scala 中撰寫平行和並行應用程式時,可以使用原生 Java Thread—但 Scala Future 提供更高級別和慣用的方法,因此較為優先,且在此章節中涵蓋。

簡介

以下是 Scala Future 的 Scaladoc 說明

Future 代表一個值,它可能目前無法取得,但會在某個時間點取得,或是在無法取得該值時產生例外狀況。」

為了說明這代表什麼意思,我們先來看看單執行緒程式設計。在單執行緒的世界中,您可以將方法呼叫的結果繫結到變數,如下所示

def aShortRunningTask(): Int = 42
val x = aShortRunningTask()

在此程式碼中,值 42 會立即繫結到 x

當您使用 Future 時,指派程序看起來類似

def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()

但此案例中的主要差異在於,由於 aLongRunningTask 需要不確定的時間才能傳回,因此 x 中的值可能目前無法取得,但它會在某個時間點(未來)取得。

另一種看待此問題的方式是從封鎖的角度。在此單執行緒範例中,println 陳述式不會印出,直到 aShortRunningTask 完成

def aShortRunningTask(): Int =
  Thread.sleep(500)
  42
val x = aShortRunningTask()
println("Here")

相反地,如果 aShortRunningTask 是以 Future 建立,則 println 陳述式會幾乎立即印出,因為 aShortRunningTask 會在其他執行緒上產生,它不會封鎖。

在本章中,您將會看到如何使用 future,包括如何在平行處理中執行多個 future,以及在 for 運算式中結合其結果。您還會看到用於處理 future 中的值(一旦傳回)的方法範例。

當您思考未來時,重要的是要知道它們被視為一次性的,「在其他執行緒上處理這個相對較慢的運算,並在完成時回呼我一個結果」建構。作為對比點,Akka actor 被用於執行很長的時間,並在其生命週期中回應許多請求。雖然一個 actor 可能永遠存在,但一個 future 最終會包含僅執行一次的運算結果。

REPL 中的範例

future 用於建立一個暫時的並行處理區塊。例如,當您需要呼叫執行時間不確定的演算法時,您會使用 future,例如呼叫遠端微服務,因此您想在主執行緒之外執行它。

為了展示其運作方式,讓我們從 REPL 中的 Future 範例開始。首先,貼上這些必要的 import 陳述式

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

現在您已準備好建立一個 future。對於這個範例,首先定義一個執行時間長、單執行緒的演算法

def longRunningAlgorithm() =
  Thread.sleep(10_000)
  42

那個精巧的演算法會在延遲十秒後傳回整數值 42。現在透過將它包進 Future 建構函式中,並將結果指定給一個變數,來呼叫那個演算法

scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)

您的運算(呼叫 longRunningAlgorithm())會立即開始執行。如果您立即檢查變數 eventualInt 的值,您會看到 future 尚未完成

scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)

但如果您在十秒後再次檢查,您會看到它已成功完成

scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))

雖然那是一個相對簡單的範例,但它顯示了基本方法:只要用您的執行時間長的演算法建構一個新的 Future 即可。

需要注意的是,您預期的 42 包在 Success 中,而 Success 又包在 Future 中。這是要了解的一個關鍵概念:Future 中的值永遠是 scala.util.Try 類型之一的實例:SuccessFailure。因此,當您使用 future 的結果時,請使用一般的 Try 處理技術。

使用 map 與 future

Futuremap 方法,您可以像使用集合上的 map 方法一樣使用它。這是您在建立變數 a 之後立即呼叫 map 時,結果的樣子

scala> val a = Future(longRunningAlgorithm()).map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)

如所示,對於使用 longRunningAlgorithm 建立的 future,初始輸出顯示 Future(<not completed>)。但當您在十秒後檢查 a 的值時,您會看到它包含預期的結果 84

scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))

再次強調,成功的結果包在 SuccessFuture 中。

使用 callback 方法與 future

除了像 map 這樣的更高階函數之外,您也可以使用 callback 方法與 future。一個常用的 callback 方法是 onComplete,它會取一個部分函數,您可以在其中處理 SuccessFailure 的情況

Future(longRunningAlgorithm()).onComplete {
  case Success(value) => println(s"Got the callback, value = $value")
  case Failure(e) => e.printStackTrace
}

當您將該程式碼貼到 REPL 時,最終會看到結果

Got the callback, value = 42

其他未來方法

Future 類別有其他您可以使用的函式。它有一些您在 Scala 集合類別中可以找到的函式,包括

  • filter
  • flatMap
  • map

它的回呼函式是

  • onComplete
  • andThen
  • foreach

其他轉換函式包括

  • fallbackTo
  • recover
  • recoverWith

請參閱 未來和承諾 頁面,以了解未來可用的其他函式。

執行多個未來並加入其結果

若要並行執行多個運算,並在所有未來完成時加入其結果,請使用 for 表達式。

正確的方法是

  1. 啟動傳回 Future 結果的運算
  2. for 表達式中合併其結果
  3. 使用 onComplete 或類似技術提取合併的結果

範例

正確方法的三個步驟顯示在以下範例中。重點是您首先啟動傳回未來的運算,然後在 for 表達式中加入它們

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)

@main def multipleFutures1 =

  println(s"creating the futures:   ${delta()}")

  // (1) start the computations that return futures
  val f1 = Future { sleep(800); 1 }   // eventually returns 1
  val f2 = Future { sleep(200); 2 }   // eventually returns 2
  val f3 = Future { sleep(400); 3 }   // eventually returns 3

  // (2) join the futures in a `for` expression
  val result =
    for
      r1 <- f1
      r2 <- f2
      r3 <- f3
    yield
      println(s"in the 'yield': ${delta()}")
      (r1 + r2 + r3)

  // (3) process the result
  result.onComplete {
    case Success(x) =>
      println(s"in the Success case: ${delta()}")
      println(s"result = $x")
    case Failure(e) =>
      e.printStackTrace
  }

  println(s"before the 'sleep(3000)': ${delta()}")

  // important for a little parallel demo: keep the jvm alive
  sleep(3000)

當您執行該應用程式時,您會看到類似這樣的輸出

creating the futures:   1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6

如輸出所示,未來很快就會建立,而且在方法結束處的 sleep(3000) 語句正前方的列印語句在短短兩毫秒內就會執行完畢。所有這些程式碼都是在 JVM 的主執行緒上執行的。然後,在 806 毫秒時,三個未來完成,yield 區塊中的程式碼就會執行。然後,程式碼會立即轉到 onComplete 方法中的 Success 案例。

806 毫秒的輸出是看出三個運算平行執行的關鍵。如果它們是循序執行的,總時間會大約是 1,400 毫秒,也就是三個運算的睡眠時間總和。但由於它們是平行執行的,總時間只會比執行時間最長的運算稍長:f1,也就是 800 毫秒。

請注意,如果運算是在 for 表達式中執行的,它們會循序執行,而不是平行執行。

// Sequential execution (no parallelism!)
for
  r1 <- Future { sleep(800); 1 }
  r2 <- Future { sleep(200); 2 }
  r3 <- Future { sleep(400); 3 }
yield
  r1 + r2 + r3

因此,如果你希望運算有可能平行執行,請記得在 for 表達式外執行它們。

傳回未來的函式

到目前為止,你已經看過如何將單執行緒演算法傳遞到 Future 建構函式中。你可以使用相同的技巧建立傳回 Future 的函式。

// simulate a slow-running method
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
  sleep(delay)
  x * 2
}

與前述範例一樣,只要將函式呼叫的結果指定給新的變數即可。然後,當你立即檢查結果時,你會看到它尚未完成,但在延遲時間後,未來將會有結果。

scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)

scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)

scala> f
val res1: concurrent.Future[Int] = Future(Success(4))

關於未來的重點

希望這些範例能讓你了解 Scala 未來的運作方式。總而言之,關於未來的幾個重點如下:

  • 您建立 future 以在主執行緒之外執行任務
  • future 用於一次性的、潛在長時間執行的並行任務,這些任務最終會傳回一個值;它們會建立一個暫時的並行處理空間
  • future 會在您建立它的時候開始執行
  • future 比執行緒好的地方在於它們與 for 運算式一起使用,並附帶各種簡化並行執行緒處理程序的回呼方法
  • 當您使用 future 時,您不必擔心執行緒管理的低階細節
  • 您可以使用 onCompleteandThen 等回呼方法,或 filtermap 等轉換方法來處理 future 的結果
  • Future 內的值永遠是 Try 類型的其中一個實例:SuccessFailure
  • 如果您使用多個 future 來產生單一結果,請在 for 運算式中將它們結合起來

此外,正如您在這些範例中的 import 陳述式中所見,Scala Future 取決於 ExecutionContext

有關 future 的更多詳細資訊,請參閱 Futures and Promises,這篇文章討論 future、promise 和執行內容。它也提供有關如何將 for 表達式轉換為 flatMap 動作的討論。

此頁面的貢獻者