當您想在 Scala 中撰寫平行和並行應用程式時,可以使用原生 Java Thread
—但 Scala Future 提供更高級別和慣用的方法,因此較為優先,且在此章節中涵蓋。
簡介
以下是 Scala Future
的 Scaladoc 說明
「
Future
代表一個值,它可能目前無法取得,但會在某個時間點取得,或是在無法取得該值時產生例外狀況。」
為了說明這代表什麼意思,我們先來看看單執行緒程式設計。在單執行緒的世界中,您可以將方法呼叫的結果繫結到變數,如下所示
def aShortRunningTask(): Int = 42
val x = aShortRunningTask()
在此程式碼中,值 42
會立即繫結到 x
。
當您使用 Future
時,指派程序看起來類似
def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()
但此案例中的主要差異在於,由於 aLongRunningTask
需要不確定的時間才能傳回,因此 x
中的值可能目前無法取得,但它會在某個時間點(未來)取得。
另一種看待此問題的方式是從封鎖的角度。在此單執行緒範例中,println
陳述式不會印出,直到 aShortRunningTask
完成
def aShortRunningTask(): Int =
Thread.sleep(500)
42
val x = aShortRunningTask()
println("Here")
相反地,如果 aShortRunningTask
是以 Future
建立,則 println
陳述式會幾乎立即印出,因為 aShortRunningTask
會在其他執行緒上產生,它不會封鎖。
在本章中,您將會看到如何使用 future,包括如何在平行處理中執行多個 future,以及在 for
運算式中結合其結果。您還會看到用於處理 future 中的值(一旦傳回)的方法範例。
當您思考未來時,重要的是要知道它們被視為一次性的,「在其他執行緒上處理這個相對較慢的運算,並在完成時回呼我一個結果」建構。作為對比點,Akka actor 被用於執行很長的時間,並在其生命週期中回應許多請求。雖然一個 actor 可能永遠存在,但一個 future 最終會包含僅執行一次的運算結果。
REPL 中的範例
future 用於建立一個暫時的並行處理區塊。例如,當您需要呼叫執行時間不確定的演算法時,您會使用 future,例如呼叫遠端微服務,因此您想在主執行緒之外執行它。
為了展示其運作方式,讓我們從 REPL 中的 Future
範例開始。首先,貼上這些必要的 import
陳述式
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
現在您已準備好建立一個 future。對於這個範例,首先定義一個執行時間長、單執行緒的演算法
def longRunningAlgorithm() =
Thread.sleep(10_000)
42
那個精巧的演算法會在延遲十秒後傳回整數值 42
。現在透過將它包進 Future
建構函式中,並將結果指定給一個變數,來呼叫那個演算法
scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)
您的運算(呼叫 longRunningAlgorithm()
)會立即開始執行。如果您立即檢查變數 eventualInt
的值,您會看到 future 尚未完成
scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)
但如果您在十秒後再次檢查,您會看到它已成功完成
scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))
雖然那是一個相對簡單的範例,但它顯示了基本方法:只要用您的執行時間長的演算法建構一個新的 Future
即可。
需要注意的是,您預期的 42
包在 Success
中,而 Success
又包在 Future
中。這是要了解的一個關鍵概念:Future
中的值永遠是 scala.util.Try
類型之一的實例:Success
或 Failure
。因此,當您使用 future 的結果時,請使用一般的 Try
處理技術。
使用 map
與 future
Future
有 map
方法,您可以像使用集合上的 map
方法一樣使用它。這是您在建立變數 a
之後立即呼叫 map
時,結果的樣子
scala> val a = Future(longRunningAlgorithm()).map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)
如所示,對於使用 longRunningAlgorithm
建立的 future,初始輸出顯示 Future(<not completed>)
。但當您在十秒後檢查 a
的值時,您會看到它包含預期的結果 84
scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))
再次強調,成功的結果包在 Success
和 Future
中。
使用 callback 方法與 future
除了像 map
這樣的更高階函數之外,您也可以使用 callback 方法與 future。一個常用的 callback 方法是 onComplete
,它會取一個部分函數,您可以在其中處理 Success
和 Failure
的情況
Future(longRunningAlgorithm()).onComplete {
case Success(value) => println(s"Got the callback, value = $value")
case Failure(e) => e.printStackTrace
}
當您將該程式碼貼到 REPL 時,最終會看到結果
Got the callback, value = 42
其他未來方法
Future
類別有其他您可以使用的函式。它有一些您在 Scala 集合類別中可以找到的函式,包括
filter
flatMap
map
它的回呼函式是
onComplete
andThen
foreach
其他轉換函式包括
fallbackTo
recover
recoverWith
請參閱 未來和承諾 頁面,以了解未來可用的其他函式。
執行多個未來並加入其結果
若要並行執行多個運算,並在所有未來完成時加入其結果,請使用 for
表達式。
正確的方法是
- 啟動傳回
Future
結果的運算 - 在
for
表達式中合併其結果 - 使用
onComplete
或類似技術提取合併的結果
範例
正確方法的三個步驟顯示在以下範例中。重點是您首先啟動傳回未來的運算,然後在 for
表達式中加入它們
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)
@main def multipleFutures1 =
println(s"creating the futures: ${delta()}")
// (1) start the computations that return futures
val f1 = Future { sleep(800); 1 } // eventually returns 1
val f2 = Future { sleep(200); 2 } // eventually returns 2
val f3 = Future { sleep(400); 3 } // eventually returns 3
// (2) join the futures in a `for` expression
val result =
for
r1 <- f1
r2 <- f2
r3 <- f3
yield
println(s"in the 'yield': ${delta()}")
(r1 + r2 + r3)
// (3) process the result
result.onComplete {
case Success(x) =>
println(s"in the Success case: ${delta()}")
println(s"result = $x")
case Failure(e) =>
e.printStackTrace
}
println(s"before the 'sleep(3000)': ${delta()}")
// important for a little parallel demo: keep the jvm alive
sleep(3000)
當您執行該應用程式時,您會看到類似這樣的輸出
creating the futures: 1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6
如輸出所示,未來很快就會建立,而且在方法結束處的 sleep(3000)
語句正前方的列印語句在短短兩毫秒內就會執行完畢。所有這些程式碼都是在 JVM 的主執行緒上執行的。然後,在 806 毫秒時,三個未來完成,yield
區塊中的程式碼就會執行。然後,程式碼會立即轉到 onComplete
方法中的 Success
案例。
806 毫秒的輸出是看出三個運算平行執行的關鍵。如果它們是循序執行的,總時間會大約是 1,400 毫秒,也就是三個運算的睡眠時間總和。但由於它們是平行執行的,總時間只會比執行時間最長的運算稍長:f1
,也就是 800 毫秒。
請注意,如果運算是在
for
表達式中執行的,它們會循序執行,而不是平行執行。// Sequential execution (no parallelism!) for r1 <- Future { sleep(800); 1 } r2 <- Future { sleep(200); 2 } r3 <- Future { sleep(400); 3 } yield r1 + r2 + r3
因此,如果你希望運算有可能平行執行,請記得在
for
表達式外執行它們。
傳回未來的函式
到目前為止,你已經看過如何將單執行緒演算法傳遞到 Future
建構函式中。你可以使用相同的技巧建立傳回 Future
的函式。
// simulate a slow-running method
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
sleep(delay)
x * 2
}
與前述範例一樣,只要將函式呼叫的結果指定給新的變數即可。然後,當你立即檢查結果時,你會看到它尚未完成,但在延遲時間後,未來將會有結果。
scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res1: concurrent.Future[Int] = Future(Success(4))
關於未來的重點
希望這些範例能讓你了解 Scala 未來的運作方式。總而言之,關於未來的幾個重點如下:
- 您建立 future 以在主執行緒之外執行任務
- future 用於一次性的、潛在長時間執行的並行任務,這些任務最終會傳回一個值;它們會建立一個暫時的並行處理空間
- future 會在您建立它的時候開始執行
- future 比執行緒好的地方在於它們與
for
運算式一起使用,並附帶各種簡化並行執行緒處理程序的回呼方法 - 當您使用 future 時,您不必擔心執行緒管理的低階細節
- 您可以使用
onComplete
和andThen
等回呼方法,或filter
、map
等轉換方法來處理 future 的結果 Future
內的值永遠是Try
類型的其中一個實例:Success
或Failure
- 如果您使用多個 future 來產生單一結果,請在
for
運算式中將它們結合起來
此外,正如您在這些範例中的 import
陳述式中所見,Scala Future
取決於 ExecutionContext
。
有關 future 的更多詳細資訊,請參閱 Futures and Promises,這篇文章討論 future、promise 和執行內容。它也提供有關如何將 for
表達式轉換為 flatMap
動作的討論。