2011年07月13日

ScalaのSynchronizedQueueを使ったメモ

所用でscala.collection.mutable.SynchronizedQueueを使ったので、メモ。

SynchronizedQueueは名前の通り、enqueue(追加)やdequeue(取り出し)する時の処理をsynchronizedにやってくれるQueueクラス。

処理的にはQueueを継承して各処理でsynchronized{ super.処理 }してるだけ。

ScalaにはQueue以外にも、Stack、Set、Buffer、MapなんかのSynchronized版が存在する。



試しにsynchronizedでない通常のQueue(scala.collection.mutable.Queue)に対して、並列で要素の追加をしてみる。

scala> val queue = new scala.collection.mutable.Queue[Int]()

scala> // Queueに0〜100までの数値を並列で追加
scala> (0 to 100).par.foreach { i => queue.enqueue(i) }

scala> // Queueのサイズ
scala> queue.size
Int = 71

101件入れたはずなのに71件しか入っていない。

並列なので同時アクセスがあった時に追加分が上書かれている模様。しかも中には0〜100が入っているはずなのに、取り出すと0がたくさん出てくる。

scala> queue.dequeue
Int = 6
scala> queue.dequeue
Int = 0
scala> queue.dequeue
Int = 0
scala> queue.dequeue
Int = 0

というわけで、一般的なQueueに並列で要素を加えると見事なまでに破壊されるようだ。

それに対してSynchronizedQueueを使用すると、こうなる。

scala> val queue = new scala.collection.mutable.SynchronizedQueue[Int]()

scala> // Queueに0〜100までの数値を並列で追加
scala> (0 to 100).par.foreach { i => queue.enqueue(i) }

scala> // Queueのサイズ
scala> queue.size
Int = 101 ← ちゃんと全部入れられてる

要素の追加だけだとなんなので、取り出す方も並列でやってみる。

scala> (0 to 100).par.foreach ( i => println(queue.dequeue) )
12
16
18
5
20
(以下略)
ちゃんと数値が入ってるようだ。synchronizedしてるんだから当たり前だけど。



蛇足1

せっかくなのでSynchronizedMapも使ってみる。SynchronizedMapはclassではなくtraitなので、インスタンス作る時にwithすればいいらしい。

まずは普通のMapで実験。

scala> import scala.collection.mutable.{ HashMap, SynchronizedMap }

scala> // 普通に並列で50件加える
scala> val map = new HashMap[Int, Int]()
scala> (0 to 100).par foreach (i => map += i -> i)
//=> 終わらない

5分待ってみたけど上記の処理は終了しなかった。なんだ、これ。

(0 to 20)にしたら普通に実行できた。(0 to 30)以上にしたらいつまでも結果が返ってこなかった。

実行した際のCPU使用率は、Core i7の計8コアのうち、2つのコアが100%を指しており、時間が経つと片方のコアが空きになり、代わりに別のコアが100%になるという流れの繰り返しだった。要素の追加が競合した場合によろしくない挙動になっているらしい。

追加の直前にprintlnを入れると正常に(要素が一部落ちた状態だけど)処理が完了した。標準出力のために同期したことで本当の意味での同時実行がなくなったせいだろうか。LinkedHashMapでも同様だった。

まぁ、いいや。本題に戻ってこれをSynchronizedMapにしてみる。

scala> val map = new HashMap[Int, String]() with SynchronizedMap[Int, String]
scala> (0 to 100).par foreach (i => map += i -> "foo")
scala> map
//=> Map(5 -> foo, 60 -> foo, 23 -> foo, 10 -> foo...

こっちはちゃんと一瞬で実行できた。



蛇足2

Bufferの場合も変なことにならないか試してみる。Synchronizedでちゃんと動くかより、そうじゃない場合の崩れ方に興味がいってしまっている気がするけど、まぁ、いいや。

scala> import scala.collection.mutable.{ ArrayBuffer, SynchronizedBuffer }

scala> // 普通のArrayBufferに並列で要素を追加してみる
scala> val list = new ArrayBuffer[Int]()
scala> (0 to 100).par foreach (i => list += i)
//=> java.lang.ArrayIndexOutOfBoundsException

上記の処理はたまに完了することもあるけど、たいていはArrayIndexOutOfBoundsExceptionで落ちる。処理が完了した場合も要素が一部欠落している。

SynchronizedBufferで実行すれば、ちゃんと要素が入る。

val list = new ArrayBuffer[Int]() with SynchronizedBuffer[Int]
(0 to 100).par foreach (i => list += i)
list
//=> ArrayBuffer(0, 9, 7, 8, 37, 12, 1, 2, 13, 10, 14, 11 ...




結論

Scalaは並列処理のサンプルを書くのが楽だということが分かった。

以上。