沒有組合器的平行集合
就像可以定義自訂順序集合而不定義其建構器一樣,也可以定義平行集合而不定義其組合器。沒有組合器的後果是,轉換方法(例如 map
、flatMap
、collect
、filter
,…)預設會傳回層級中最近的標準集合類型。例如,範圍沒有建構器,因此對範圍的元素進行對應會建立一個向量。
在以下範例中,我們定義一個平行字串集合。由於字串在邏輯上是不可變的序列,因此我們讓平行字串繼承 immutable.ParSeq[Char]
class ParString(val str: String)
extends immutable.ParSeq[Char] {
接著,我們定義每個不可變序列中找到的方法
def apply(i: Int) = str.charAt(i)
def length = str.length
我們還必須定義此平行集合的順序對應。在這種情況下,我們傳回 WrappedString
類別
def seq = new collection.immutable.WrappedString(str)
最後,我們必須為平行字串集合定義一個分割器。我們將分割器命名為 ParStringSplitter
,並讓它繼承一個序列分割器,也就是 SeqSplitter[Char]
def splitter = new ParStringSplitter(str, 0, str.length)
class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
extends SeqSplitter[Char] {
final def hasNext = i < ntl
final def next = {
val r = s.charAt(i)
i += 1
r
}
在上面,ntl
代表字串的總長度,i
是目前位置,s
是字串本身。
平行集合反覆運算器或分割器除了順序集合反覆運算器中找到的 next
和 hasNext
之外,還需要更多方法。首先,它們有一個稱為 remaining
的方法,用來傳回此分割器尚未橫跨的元素數量。接著,它們有一個稱為 dup
的方法,用來複製目前的分割器。
def remaining = ntl - i
def dup = new ParStringSplitter(s, i, ntl)
最後,方法 split
和 psplit
用於建立分割器,用以遍歷當前分割器元素的子集。方法 split
的合約是,它會傳回一個分割器序列,用以遍歷當前分割器遍歷的不相交、不重疊的元素子集,且這些子集中沒有一個是空的。如果當前分割器有 1 個或更少的元素,則 split
只會傳回一個此分割器的序列。方法 psplit
必須傳回一個分割器序列,用以遍歷與 sizes
參數指定的元素數量完全相同的元素。如果 sizes
參數指定的元素比當前分割器少,則會在最後附加一個包含其餘元素的分割器。如果 sizes
參數要求的元素比當前分割器中剩下的元素多,則會為每個大小附加一個空的分割器。最後,呼叫 split
或 psplit
會使當前分割器失效。
def split = {
val rem = remaining
if (rem >= 2) psplit(rem / 2, rem - rem / 2)
else Seq(this)
}
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
val splitted = new ArrayBuffer[ParStringSplitter]
for (sz <- sizes) {
val next = (i + sz) min ntl
splitted += new ParStringSplitter(s, i, next)
i = next
}
if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
splitted
}
}
}
在上面,split
是根據 psplit
實作的,這在並行序列中很常見。實作並行映射、集合或可迭代項目的分割器通常比較容易,因為它不需要 psplit
。
因此,我們取得一個並行字串類別。唯一的缺點是,呼叫轉換器方法(例如 filter
)不會產生並行字串,而是並行向量,這可能是次佳的 - 在過濾後從向量中再次產生字串可能會很昂貴。
具備合併器的平行集合
假設我們想要 filter
平行字串的字元,例如移除逗號。如上所述,呼叫 filter
會產生平行向量,而我們想要取得平行字串(因為 API 中的某些介面可能需要順序字串)。
為避免此情況,我們必須為平行字串集合撰寫合併器。這次我們也會繼承 ParSeqLike
特質,以確保 filter
的回傳類型更具體,也就是 ParString
,而不是 ParSeq[Char]
。ParSeqLike
有第三個類型參數,用來指定平行集合順序對應類型的類型(與只有兩個類型參數的順序 *Like
特質不同)。
class ParString(val str: String)
extends immutable.ParSeq[Char]
with ParSeqLike[Char, ParString, collection.immutable.WrappedString]
所有方法都與之前相同,但我們新增一個受保護的方法 newCombiner
,filter
會在內部使用此方法。
protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner
接下來我們定義 ParStringCombiner
類別。組合器是建構子子類型,它們引入一個稱為 combine
的額外方法,它將另一個組合器作為參數,並傳回一個包含目前和參數組合器元素的新組合器。呼叫 combine
之後,目前和參數組合器會失效。如果參數與目前組合器為同一物件,則 combine
只會傳回目前組合器。預期此方法會很有效率,在最壞情況下,相對於元素數目,執行時間為對數,因為它會在平行運算期間被呼叫多次。
我們的 ParStringCombiner
內部會維護一連串字串建構子。它會透過將元素新增到序列中最後一個字串建構子來實作 +=
,並透過串接目前和參數組合器的字串建構子清單來實作 combine
。在平行運算結束時呼叫的 result
方法,會透過將所有字串建構子附加在一起來產生一個平行字串。這樣一來,元素只會在最後複製一次,而不是每次呼叫 combine
時都複製。理想情況下,我們希望將此程序平行化,並平行複製它們(這會針對平行陣列進行),但如果不觸及字串的內部表示,這是我們能做的最好的事情–我們必須忍受這個順序瓶頸。
private class ParStringCombiner extends Combiner[Char, ParString] {
var sz = 0
val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
var lastc = chunks.last
def size: Int = sz
def +=(elem: Char): this.type = {
lastc += elem
sz += 1
this
}
def clear = {
chunks.clear
chunks += new StringBuilder
lastc = chunks.last
sz = 0
}
def result: ParString = {
val rsb = new StringBuilder
for (sb <- chunks) rsb.append(sb)
new ParString(rsb.toString)
}
def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) = if (other eq this) this else {
val that = other.asInstanceOf[ParStringCombiner]
sz += that.sz
chunks ++= that.chunks
lastc = chunks.last
this
}
}
我一般如何實作我的組合器?
沒有預先定義的食譜–這取決於手邊的資料結構,而且通常需要實作者的創意。不過,通常會採取以下幾種方法
-
串接和合併。某些資料結構有這些操作的高效實作(通常是對數)。如果手邊的集合是由此類資料結構支援,它的組合器可以是集合本身。手指樹、繩子和各種堆特別適合這種方法。
-
兩階段評估。並行陣列和並行雜湊表採取的方法,它假設元素可以有效地部分排序成可串接的儲存區,最終的資料結構可以並行建構。在第一階段,不同的處理器獨立地填充這些儲存區,並將儲存區串接在一起。在第二階段,分配資料結構,並使用來自不相交儲存區的元素,讓不同的處理器並行填充資料結構的不同部分。必須小心,不同的處理器絕不會修改資料結構的同一部分,否則可能會發生微妙的並行錯誤。如前一節所示,這種方法很容易應用於隨機存取序列。
-
並行資料結構。雖然後兩種方法實際上不需要資料結構本身的任何同步原語,但它們假設資料結構可以並行建構,這樣兩個不同的處理器絕不會修改同一個記憶體位置。有大量的並行資料結構可以由多個處理器安全地修改,例如並行跳躍表、並行雜湊表、分割排序清單、並行 AVL 樹。在這種情況下,一個重要的考量是並行資料結構具有水平可擴充的插入方法。對於並行集合,組合器可以是集合本身,並且所有執行並行操作的處理器之間共用一個組合器實例。
與集合架構整合
我們的 ParString
類別尚未完成。儘管我們已實作自訂組合器,而 filter
、partition
、takeWhile
或 span
等方法將會使用該組合器,但大多數轉換器方法都需要隱含的 CanBuildFrom
證據(請參閱 Scala 集合指南以取得完整說明)。為了讓 ParString
可用並與集合架構完全整合,我們必須混合稱為 GenericParTemplate
的其他特質,並定義 ParString
的伴隨物件。
class ParString(val str: String)
extends immutable.ParSeq[Char]
with GenericParTemplate[Char, ParString]
with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {
def companion = ParString
在伴隨物件內,我們提供 CanBuildFrom
參數的隱含證據。
object ParString {
implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
new CanCombinerFrom[ParString, Char, ParString] {
def apply(from: ParString) = newCombiner
def apply() = newCombiner
}
def newBuilder: Combiner[Char, ParString] = newCombiner
def newCombiner: Combiner[Char, ParString] = new ParStringCombiner
def apply(elems: Char*): ParString = {
val cb = newCombiner
cb ++= elems
cb.result
}
}
進一步自訂– 並行和其他集合
實作並行集合(與並行集合不同,並行集合是可以並行修改的集合,例如 collection.concurrent.TrieMap
)並不總是容易的。特別是組合器通常需要很多想法。在到目前為止描述的大多數並行集合中,組合器使用兩步驟評估。在第一步中,元素會由不同的處理器加入組合器,而組合器會合併在一起。在第二步中,在所有元素都可用後,會建構結果集合。
組合器的另一種方法是將結果集合建構為元素。這需要集合是執行緒安全的– 組合器必須允許並行元素插入。在這種情況下,所有處理器會共用一個組合器。
若要將並行集合平行化,其組合器必須覆寫方法 canBeShared
以傳回 true
。這將確保在呼叫並行操作時只會建立一個組合器。接下來,+=
方法必須是執行緒安全的。最後,如果目前組合器和引數組合器相同,方法 combine
仍會傳回目前組合器,否則可以自由引發例外狀況。
將 Splitter 分割成較小的 Splitter 以達成更好的負載平衡。預設情況下,會使用 remaining
方法傳回的資訊來決定何時停止分割 Splitter。對於某些集合,呼叫 remaining
方法可能會很耗費資源,因此應該使用其他方式來決定何時分割 Splitter。在此情況下,應該覆寫 Splitter 中的 shouldSplitFurther
方法。
預設實作會在剩餘元素的數量大於集合大小除以平行處理層級的八倍時分割 Splitter。
def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
remaining > thresholdFromSize(coll.size, parallelismLevel)
等效地,Splitter 可以計算它被分割的次數,並實作 shouldSplitFurther
,如果分割次數大於 3 + log(parallelismLevel)
,則傳回 true
。這樣可以避免呼叫 remaining
。
此外,如果呼叫 remaining
對於特定集合而言並非便宜的運算(例如,它需要評估集合中的元素數量),則應該覆寫 Splitter 中的 isRemainingCheap
方法,並傳回 false
。
最後,如果 splitter 中的 remaining
方法實作起來非常麻煩,您可以在其集合中覆寫 isStrictSplitterCollection
方法,傳回 false
。此類集合無法執行某些方法,這些方法仰賴 splitter 為嚴格,也就是在 remaining
方法中傳回正確的值。重要的是,這不會影響 for-comprehension 中使用的函式。