平行集合

概觀

語言

Aleksandar Prokopec、Heather Miller

如果您使用的是 Scala 2.13+,而且想使用 Scala 的平行集合,您必須匯入一個獨立的模組,如 這裡 所述。

動機

近年來,處理器製造商已從單核心架構轉移到多核心架構,學術界和業界都承認熱門平行程式設計仍然是一項艱鉅的挑戰。

平行集合包含在 Scala 標準函式庫中,旨在透過免除使用者處理低階平行化細節,同時提供他們熟悉且簡單的高階抽象,來簡化平行程式設計。希望,並且仍然是,集合抽象背後的隱式平行化會讓可靠的平行執行更接近主流開發人員的工作流程。

這個想法很簡單 - 集合是一個理解良好且經常使用的程式設計抽象。而且,由於它們的規律性,它們能夠有效地並行化,而且是透明的。透過允許使用者「替換」順序集合,以在平行中操作的集合,Scala 的平行集合向前邁進一大步,讓平行化更輕易地帶入更多程式碼中。

採用以下順序範例,我們對某個大型集合執行單子操作

val list = (1 to 10000).toList
list.map(_ + 42)

若要平行執行相同的操作,只要呼叫順序集合 list 上的 par 方法即可。之後,可以像一般使用順序集合一樣使用平行集合。只要執行下列動作,即可將上述範例平行化

list.par.map(_ + 42)

Scala 平行集合函式庫的設計靈感來自 Scala 的(順序)集合函式庫(在 2.8 中引入),並與其深度整合。它提供許多來自 Scala(順序)集合函式庫的重要資料結構的平行對應,包括

  • ParArray
  • ParVector
  • mutable.ParHashMap
  • mutable.ParHashSet
  • immutable.ParHashMap
  • immutable.ParHashSet
  • ParRange
  • ParTrieMapcollection.concurrent.TrieMap 在 2.10 中是新的)

除了共用架構外,Scala 的平行集合程式庫還與順序集合程式庫共用可擴充性。也就是說,使用者可以整合自己的集合類型,並自動繼承標準程式庫中其他平行集合上所有預先定義的(平行)運算,就像一般的順序集合一樣。

一些範例

為了說明平行集合的普遍性和實用性,我們提供一些簡單的範例用法,所有範例都會透明地平行執行。

注意:以下一些範例會對小型集合進行運算,這並不建議。它們僅提供作為說明用途的範例。一般來說,當集合大小較大時,通常數千個元素,加速效果才會顯著。(有關平行集合大小與效能之間關係的更多資訊,請參閱本指南效能區段的相關小節。)

map

使用平行 mapString 集合轉換為全大寫

scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.map(_.toUpperCase)
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)

fold

ParArray 上透過 fold 進行加總

scala> val parArray = (1 to 10000).toArray.par
parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, ...

scala> parArray.fold(0)(_ + _)
res0: Int = 50005000

filter

使用平行 filter 選擇字母「I」之後的姓氏。

scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.filter(_.head >= 'J')
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)

建立平行集合

平行集合的用法應該與順序集合完全相同,唯一值得注意的差異是取得平行集合的方式。

一般來說,建立平行集合有兩種選擇

首先,使用 new 關鍵字和適當的匯入陳述

import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]

其次,從順序集合進行轉換

val pv = Vector(1,2,3,4,5,6,7,8,9).par

這裡要擴充說明的是這些轉換方法,順序集合可以透過呼叫順序集合的 par 方法轉換成平行集合,同樣地,平行集合可以透過呼叫平行集合的 seq 方法轉換成順序集合。

請注意:本質上是順序的集合(指元素必須一個接著一個存取),例如清單、佇列和串流,會透過將元素複製到類似的平行集合中,轉換成對應的平行集合。例如 List,它會轉換成標準的不可變平行序列,也就是 ParVector。當然,這些集合類型所需的複製會產生其他集合類型(例如 ArrayVectorHashMap 等)不會產生的額外負擔。

如需進一步了解平行集合的轉換,請參閱本指南的 轉換具體平行集合類別 部分。

語意

雖然平行集合抽象感覺與一般的順序集合非常相似,但重要的是要注意它的語意有所不同,特別是在副作用和非關聯運算方面。

為了了解這是如何發生的,首先,我們想像一下平行運算如何執行。從概念上來說,Scala 的平行集合架構會透過遞迴「分割」給定的集合,平行地對集合的每個分割套用運算,然後重新「組合」所有平行完成的結果,來將平行集合上的運算平行化。

平行集合的這些並行和「無序」語義導致以下兩個含意

  1. 有副作用的運算會導致非確定性
  2. 非關聯運算會導致非確定性

有副作用的運算

考量平行集合架構的並行執行語義,為維持確定性,應避免對集合執行的運算造成副作用。一個簡單的範例是使用存取方法,例如 foreach 來遞增傳遞給 foreach 的封閉運算之外宣告的 var

scala> var sum = 0
sum: Int = 0

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.foreach(sum += _); sum
res01: Int = 467766

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res02: Int = 457073

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res03: Int = 468520

在此,我們可以看到每次 sum 重新初始化為 0,且 foreach 再次呼叫 list 時,sum 會持有不同的值。此非確定性的來源是資料競爭– 對同一個可變變數進行並行的讀取/寫入。

在上方的範例中,兩個執行緒有可能在 sum 中讀取相同的值,花費一些時間對 sum 的該值執行一些運算,然後嘗試寫入新的值到 sum,可能會導致覆寫(因此遺失)有價值的結果,如下所示

ThreadA: read value in sum, sum = 0                value in sum: 0
ThreadB: read value in sum, sum = 0                value in sum: 0
ThreadA: increment sum by 760, write sum = 760     value in sum: 760
ThreadB: increment sum by 12, write sum = 12       value in sum: 12

上述範例說明了一個場景,其中兩個執行緒讀取同一個值 0,在其中一個執行緒或另一個執行緒可以將 0 與平行集合分區中的元素相加之前。在此情況下,ThreadA 讀取 0 並將其與其元素相加,0+760,而在 ThreadB 的情況下,將 0 與其元素相加,0+12。在計算各自的總和後,他們各自將計算出的值寫入 sum。由於 ThreadA 優先於 ThreadB,因此它會先寫入,僅在 sum 中的值在不久後被 ThreadB 覆寫,實際上完全覆寫(因此遺失)值 760

非關聯運算

給定這個「無序」語意,還必須小心僅執行關聯運算,以避免非確定性。也就是說,給定平行集合 pcoll,在對 pcoll 呼叫高階函式時,例如 pcoll.reduce(func),應確定 func 套用於 pcoll 元素的順序可以是任意的。一個簡單但顯而易見的範例是非關聯運算,例如減法

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.reduce(_-_)
res01: Int = -228888

scala> list.reduce(_-_)
res02: Int = -61000

scala> list.reduce(_-_)
res03: Int = -331818

在上述範例中,我們取一個 ParVector[Int],呼叫 reduce,並傳遞 _-_ 給它,它只取兩個未命名元素,並從第二個元素中減去第一個元素。由於平行集合架構會產生執行緒,這些執行緒實際上會獨立對集合的不同區段執行 reduce(_-_),因此在同一個集合上執行 reduce(_-_) 兩次的結果將會不同。

注意:通常,人們認為,就像非關聯運算一樣,傳遞給平行集合上高階函式的非交換運算也會導致非決定性行為。事實並非如此,一個簡單的範例是字串串接–一個關聯但非交換的運算

scala> val strings = List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par
strings: scala.collection.parallel.immutable.ParSeq[java.lang.String] = ParVector(abc, def, ghi, jk, lmnop, qrs, tuv, wx, yz)

scala> val alphabet = strings.reduce(_++_)
alphabet: java.lang.String = abcdefghijklmnopqrstuvwxyz

平行集合的「無序」語意只表示運算會以無序的方式執行(在時間意義上。也就是非順序),這並不表示結果會以無序的方式重新「組合」(在空間意義上)。相反地,結果通常總是會依序重新組合–也就是說,一個依序分割成區段 A、B、C 的平行集合,會再次依序重新組合成 A、B、C。而不是像 B、C、A 這樣的其他任意順序。

如需進一步瞭解平行集合如何分割和組合不同平行集合類型上的運算,請參閱本指南的架構區段。

此頁面的貢獻者