平行集合

建立自訂平行集合

語言

沒有組合器的平行集合

就像可以定義自訂順序集合而不定義其建構器一樣,也可以定義平行集合而不定義其組合器。沒有組合器的後果是,轉換方法(例如 mapflatMapcollectfilter,…)預設會傳回層級中最近的標準集合類型。例如,範圍沒有建構器,因此對範圍的元素進行對應會建立一個向量。

在以下範例中,我們定義一個平行字串集合。由於字串在邏輯上是不可變的序列,因此我們讓平行字串繼承 immutable.ParSeq[Char]

class ParString(val str: String)
extends immutable.ParSeq[Char] {

接著,我們定義每個不可變序列中找到的方法

  def apply(i: Int) = str.charAt(i)

  def length = str.length

我們還必須定義此平行集合的順序對應。在這種情況下,我們傳回 WrappedString 類別

  def seq = new collection.immutable.WrappedString(str)

最後,我們必須為平行字串集合定義一個分割器。我們將分割器命名為 ParStringSplitter,並讓它繼承一個序列分割器,也就是 SeqSplitter[Char]

  def splitter = new ParStringSplitter(str, 0, str.length)

  class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
  extends SeqSplitter[Char] {

    final def hasNext = i < ntl

    final def next = {
      val r = s.charAt(i)
      i += 1
      r
    }

在上面,ntl 代表字串的總長度,i 是目前位置,s 是字串本身。

平行集合反覆運算器或分割器除了順序集合反覆運算器中找到的 nexthasNext 之外,還需要更多方法。首先,它們有一個稱為 remaining 的方法,用來傳回此分割器尚未橫跨的元素數量。接著,它們有一個稱為 dup 的方法,用來複製目前的分割器。

    def remaining = ntl - i

    def dup = new ParStringSplitter(s, i, ntl)

最後,方法 splitpsplit 用於建立分割器,用以遍歷當前分割器元素的子集。方法 split 的合約是,它會傳回一個分割器序列,用以遍歷當前分割器遍歷的不相交、不重疊的元素子集,且這些子集中沒有一個是空的。如果當前分割器有 1 個或更少的元素,則 split 只會傳回一個此分割器的序列。方法 psplit 必須傳回一個分割器序列,用以遍歷與 sizes 參數指定的元素數量完全相同的元素。如果 sizes 參數指定的元素比當前分割器少,則會在最後附加一個包含其餘元素的分割器。如果 sizes 參數要求的元素比當前分割器中剩下的元素多,則會為每個大小附加一個空的分割器。最後,呼叫 splitpsplit 會使當前分割器失效。

   def split = {
      val rem = remaining
      if (rem >= 2) psplit(rem / 2, rem - rem / 2)
      else Seq(this)
    }

    def psplit(sizes: Int*): Seq[ParStringSplitter] = {
      val splitted = new ArrayBuffer[ParStringSplitter]
      for (sz <- sizes) {
        val next = (i + sz) min ntl
        splitted += new ParStringSplitter(s, i, next)
        i = next
      }
      if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
      splitted
    }
  }
}

在上面,split 是根據 psplit 實作的,這在並行序列中很常見。實作並行映射、集合或可迭代項目的分割器通常比較容易,因為它不需要 psplit

因此,我們取得一個並行字串類別。唯一的缺點是,呼叫轉換器方法(例如 filter)不會產生並行字串,而是並行向量,這可能是次佳的 - 在過濾後從向量中再次產生字串可能會很昂貴。

具備合併器的平行集合

假設我們想要 filter 平行字串的字元,例如移除逗號。如上所述,呼叫 filter 會產生平行向量,而我們想要取得平行字串(因為 API 中的某些介面可能需要順序字串)。

為避免此情況,我們必須為平行字串集合撰寫合併器。這次我們也會繼承 ParSeqLike 特質,以確保 filter 的回傳類型更具體,也就是 ParString,而不是 ParSeq[Char]ParSeqLike 有第三個類型參數,用來指定平行集合順序對應類型的類型(與只有兩個類型參數的順序 *Like 特質不同)。

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString]

所有方法都與之前相同,但我們新增一個受保護的方法 newCombinerfilter 會在內部使用此方法。

  protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

接下來我們定義 ParStringCombiner 類別。組合器是建構子子類型,它們引入一個稱為 combine 的額外方法,它將另一個組合器作為參數,並傳回一個包含目前和參數組合器元素的新組合器。呼叫 combine 之後,目前和參數組合器會失效。如果參數與目前組合器為同一物件,則 combine 只會傳回目前組合器。預期此方法會很有效率,在最壞情況下,相對於元素數目,執行時間為對數,因為它會在平行運算期間被呼叫多次。

我們的 ParStringCombiner 內部會維護一連串字串建構子。它會透過將元素新增到序列中最後一個字串建構子來實作 +=,並透過串接目前和參數組合器的字串建構子清單來實作 combine。在平行運算結束時呼叫的 result 方法,會透過將所有字串建構子附加在一起來產生一個平行字串。這樣一來,元素只會在最後複製一次,而不是每次呼叫 combine 時都複製。理想情況下,我們希望將此程序平行化,並平行複製它們(這會針對平行陣列進行),但如果不觸及字串的內部表示,這是我們能做的最好的事情–我們必須忍受這個順序瓶頸。

private class ParStringCombiner extends Combiner[Char, ParString] {
  var sz = 0
  val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
  var lastc = chunks.last

  def size: Int = sz

  def +=(elem: Char): this.type = {
    lastc += elem
    sz += 1
    this
  }

  def clear = {
    chunks.clear
    chunks += new StringBuilder
    lastc = chunks.last
    sz = 0
  }

  def result: ParString = {
    val rsb = new StringBuilder
    for (sb <- chunks) rsb.append(sb)
    new ParString(rsb.toString)
  }

  def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) = if (other eq this) this else {
    val that = other.asInstanceOf[ParStringCombiner]
    sz += that.sz
    chunks ++= that.chunks
    lastc = chunks.last
    this
  }
}

我一般如何實作我的組合器?

沒有預先定義的食譜–這取決於手邊的資料結構,而且通常需要實作者的創意。不過,通常會採取以下幾種方法

  1. 串接和合併。某些資料結構有這些操作的高效實作(通常是對數)。如果手邊的集合是由此類資料結構支援,它的組合器可以是集合本身。手指樹、繩子和各種堆特別適合這種方法。

  2. 兩階段評估。並行陣列和並行雜湊表採取的方法,它假設元素可以有效地部分排序成可串接的儲存區,最終的資料結構可以並行建構。在第一階段,不同的處理器獨立地填充這些儲存區,並將儲存區串接在一起。在第二階段,分配資料結構,並使用來自不相交儲存區的元素,讓不同的處理器並行填充資料結構的不同部分。必須小心,不同的處理器絕不會修改資料結構的同一部分,否則可能會發生微妙的並行錯誤。如前一節所示,這種方法很容易應用於隨機存取序列。

  3. 並行資料結構。雖然後兩種方法實際上不需要資料結構本身的任何同步原語,但它們假設資料結構可以並行建構,這樣兩個不同的處理器絕不會修改同一個記憶體位置。有大量的並行資料結構可以由多個處理器安全地修改,例如並行跳躍表、並行雜湊表、分割排序清單、並行 AVL 樹。在這種情況下,一個重要的考量是並行資料結構具有水平可擴充的插入方法。對於並行集合,組合器可以是集合本身,並且所有執行並行操作的處理器之間共用一個組合器實例。

與集合架構整合

我們的 ParString 類別尚未完成。儘管我們已實作自訂組合器,而 filterpartitiontakeWhilespan 等方法將會使用該組合器,但大多數轉換器方法都需要隱含的 CanBuildFrom 證據(請參閱 Scala 集合指南以取得完整說明)。為了讓 ParString 可用並與集合架構完全整合,我們必須混合稱為 GenericParTemplate 的其他特質,並定義 ParString 的伴隨物件。

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with GenericParTemplate[Char, ParString]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {

  def companion = ParString

在伴隨物件內,我們提供 CanBuildFrom 參數的隱含證據。

object ParString {
  implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
    new CanCombinerFrom[ParString, Char, ParString] {
      def apply(from: ParString) = newCombiner
      def apply() = newCombiner
    }

  def newBuilder: Combiner[Char, ParString] = newCombiner

  def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

  def apply(elems: Char*): ParString = {
	val cb = newCombiner
	cb ++= elems
	cb.result
  }
}

進一步自訂– 並行和其他集合

實作並行集合(與並行集合不同,並行集合是可以並行修改的集合,例如 collection.concurrent.TrieMap)並不總是容易的。特別是組合器通常需要很多想法。在到目前為止描述的大多數並行集合中,組合器使用兩步驟評估。在第一步中,元素會由不同的處理器加入組合器,而組合器會合併在一起。在第二步中,在所有元素都可用後,會建構結果集合。

組合器的另一種方法是將結果集合建構為元素。這需要集合是執行緒安全的– 組合器必須允許並行元素插入。在這種情況下,所有處理器會共用一個組合器。

若要將並行集合平行化,其組合器必須覆寫方法 canBeShared 以傳回 true。這將確保在呼叫並行操作時只會建立一個組合器。接下來,+= 方法必須是執行緒安全的。最後,如果目前組合器和引數組合器相同,方法 combine 仍會傳回目前組合器,否則可以自由引發例外狀況。

將 Splitter 分割成較小的 Splitter 以達成更好的負載平衡。預設情況下,會使用 remaining 方法傳回的資訊來決定何時停止分割 Splitter。對於某些集合,呼叫 remaining 方法可能會很耗費資源,因此應該使用其他方式來決定何時分割 Splitter。在此情況下,應該覆寫 Splitter 中的 shouldSplitFurther 方法。

預設實作會在剩餘元素的數量大於集合大小除以平行處理層級的八倍時分割 Splitter。

def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
    remaining > thresholdFromSize(coll.size, parallelismLevel)

等效地,Splitter 可以計算它被分割的次數,並實作 shouldSplitFurther,如果分割次數大於 3 + log(parallelismLevel),則傳回 true。這樣可以避免呼叫 remaining

此外,如果呼叫 remaining 對於特定集合而言並非便宜的運算(例如,它需要評估集合中的元素數量),則應該覆寫 Splitter 中的 isRemainingCheap 方法,並傳回 false

最後,如果 splitter 中的 remaining 方法實作起來非常麻煩,您可以在其集合中覆寫 isStrictSplitterCollection 方法,傳回 false。此類集合無法執行某些方法,這些方法仰賴 splitter 為嚴格,也就是在 remaining 方法中傳回正確的值。重要的是,這不會影響 for-comprehension 中使用的函式。

此頁面的貢獻者