Juliaで並列計算

Juliaでの並列計算に関する調査メモ。

Juliaでの並列計算の概要

Juliaでのマルチプロセッシング環境は、メッセージパッシングに基づいている。MPIなどの通常のメッセージパッシングは、プロセス間でデータや命令などを相互にやりとりする。しかし、Juliaのメッセージパッシングの実装はあるプロセスから他のプロセスへの一方通行であることが特色となっている。そのため、ユーザは片方のプロセスの管理だけを行えば良い。

あるプロセスから他のプロセスへのメッセージとして重要なのが、"remote call"(remotecall関数)と"remote reference"(fetch関数)である。これらはそれぞれ、「あるプロセスから他のプロセスへの処理の指示」、「指示された処理を行ったプロセスでの処理結果を指示を出したプロセスが参照」に対応する。

  • remote call(remotecall関数)
    あるプロセスから他のプロセスに引数を渡して関数を実行するための呼び出し。返り値は下記のremote referenceとなる。
  • remote reference(fetch関数)
    処理を行ったプロセスに保持されたオブジェクトを他のオブジェクトから参照すること。

基本的な実行方法

以下、実行環境はUbuntu14.04(VM Ware上で実行)、Juliaのバージョンは0.3.0。実行結果と次のプロンプトの間の空行は省略。

公式ドキュメントの"Parallel Map and Loops"までを参考にして、Juliaでの並列計算の基本的な実行方法を確認する。

次の例では、remotecall関数を用いて2つのワーカープロセスに一様乱数を生成させ、その値をマスタープロセスがfeatch関数を用いて参照している。

$ ./julia -p 2
julia> # 乱数種の設定
julia> srand(123)
julia> # 2つのワーカープロセスで区間[0, 1]の一様乱数を2個ずつ生成
julia> r = remotecall(2, rand, 2, 2)
RemoteRef(2,1,4)
julia> # 生成した一様乱数の値を取得
julia> fetch(r)
2x2 Array{Float64,2}:
 0.341326  0.0673966
 0.766994  0.0680754

この「remotecall関数によるワーカープロセスへの処理の指示」+「fetch関数による処理結果の参照」という手間を軽減するために、remotecall_fetch関数が用意されている。

julia> remotecall_fetch(2, getindex, r, 1, 1)
0.3413258405054578

@spawnatマクロを使用すると、第2引数で指定した式を第1引数で指定したワーカープロセスで実行する。次の例では、先に生成した一様乱数に1を加算する処理を行って、その結果をマスタープロセスがfetch関数により参照している。

julia> s = @spawnat 2 1 + fetch(r)
RemoteRef(2,1,6)
julia> fetch(s)
2x2 Array{Float64,2}:
 1.34133  1.0674 
 1.76699  1.06808

以上の例では、ワーカープロセスの個数を指定してJuliaを起動していた。一方で、Juliaを起動した後に、ワーカープロセスを起動することもできる。

julia> # 起動しているプロセス数(マスタープロセス+ワーカープロセス)の確認
julia> nprocs()
3
julia> nworkers()
2
julia> addprocs(2)
2-element Array{Any,1}:
 4
 5
julia> # ワーカープロセスが4個に、マスタープロセスとあわせてプロセス数が5個になったことの確認
julia> nworkers()
4
julia> nprocs()
5
julia> # 4番目と5番目のワーカープロセスを停止
julia> rmprocs(4, 5)
:ok
julia> nworkers()
2

remotecall関数を用いた表記は特に便利でもない。代替として、@spawnマクロを使用することもできる。

julia> r = @spawn rand(2,2)
RemoteRef(3,1,10)
julia> s = @spawn 1 .+ fetch(r)
RemoteRef(3,1,11)
julia> fetch(s)
2x2 Array{Float64,2}:
 1.03     1.16674
 1.99027  1.89547

2行目の処理で、"1 .+r"ではなく"1 .+fetch(r)"にしているのは、どのワーカープロセスでこの処理が実行されるかが分からないためである。

マスタープロセスで定義した関数はワーカープロセスで認識することができないので、単純に@spawnマクロで関数を呼び出すと以下のエラーが発生する。

julia> function rand2(dims...)
         return 2*rand(dims...)
       end
rand2 (generic function with 1 method)
julia> rand2(2,2)
2x2 Array{Float64,2}:
 1.5369   1.34792 
 1.88103  0.790906
julia> @spawn rand2(2,2)
RemoteRef(2,1,13)
julia> exception on 2: ERROR: function rand2 not defined on process 2
 in error at error.jl:21
 in anonymous at serialize.jl:397
 in anonymous at multi.jl:1279
 in anonymous at multi.jl:848
 in run_work_thunk at multi.jl:621
 in run_work_thunk at multi.jl:630
 in anonymous at task.jl:6
julia> @spawn rand2(2,2)
RemoteRef(3,1,14)
julia> exception on 3: ERROR: function rand2 not defined on process 3
 in error at error.jl:21
 in anonymous at serialize.jl:397
 in anonymous at multi.jl:1279
 in anonymous at multi.jl:848
 in run_work_thunk at multi.jl:621
 in run_work_thunk at multi.jl:630
 in anonymous at task.jl:6

すべてのワーカープロセスで変数や関数を認識させて、上記のような事態を回避するには変数や関数の前に@everywhereマクロを付加する。

julia> @everywhere function rand2(dims...)
         2 * rand(dims...)
       end
julia> @spawn rand2(2,2)
RemoteRef(2,1,20)
julia> fetch(r)
2x2 Array{Float64,2}:
 1.92811  0.535094 
 1.40141  0.0465075

同様に、Juliaのソースコードやパッケージをすべてのワーカープロセスに認識させるためには、include関数やusing関数の前に@everywhereを付加する。

@everywhere include("defs.jl")
@everywhere using Clustering

Juliaを起動するときに、すべてのワーカープロセスにJuliaのソースコードの記述内容を認識させるためには、-Lオプションでファイル名を指定すれば良い。

$ julia -p <n> -L file1.jl -L file2.jl driver.jl

データの移動

並列計算においては、プロセス間でのデータや命令のやり取りを極力削減することが処理時間の短縮化につながる。
次の例では、 1000 \times 1000の行列の2乗を計算する処理において、

  • 方法1: マスタープロセスで行列を生成した後にワーカープロセスに転送して、ワーカープロセスが行列を2乗
  • 方法2: ワーカープロセスで行列を生成し2乗

という2つの方法を示している。

julia> # 方法1: マスタープロセスでデータを生成し、ワーカープロセスに転送
julia> A = rand(1000,1000)
julia> Bref = @spawn A^2
RemoteRef(3,1,24)
julia> fetch(Bref)
julia> 
julia> # 方法2: ワーカープロセスでデータを生成して、マスタープロセスから参照
julia> Bref = @spawn rand(1000,1000)^2
julia> fetch(Bref)

このケースの場合は、方法2ではプロセス間のデータ転送がなく、方法1に比べて効率的である。しかし、いつでも方法2のようにワーカープロセスでデータを生成する方法が良いとは限らず、マスタープロセスや他のワーカープロセスでどのようなデータを使用するかについて考慮しながら、その場その場で適したデータ生成・転送方法を選択する必要がある。

並列Map処理とループ

各ワーカープロセス間でデータのやり取りが必要ない場合は、「@parallel for」構文と「pmap関数」の2つの方法によって、並列計算が可能になる。

次の例では、ワーカープロセスで \{0, 1\}の論理値の乱数を合計200000000回発生させ(Map処理)、その乱数の和を計算(Reduce処理)している。

julia> nheads = @parallel (+) for i=1:200000000
         int(randbool())
       end
99999830

このようにfor文の前に@parallelや集約処理を記述することによって、各ワーカープロセスでデータのやり取りを必要としない並列計算が可能になる。この「@parallel for」構文は、1つ1つの処理が軽いときに用いる。

次の例では、ワーカープロセスで成分が区間 [0, 1]上の 1000 \times 1000の行列を10個生成し、pmap関数を用いて特異値分解を実行している。

julia> M = {rand(1000,1000) for i=1:10}
julia> pmap(svd, M)

このように1つ1つの処理が重いときは、pmap関数を用いる。

k平均法

並列計算の応用の一つとして、k平均法を実行してみよう。
Juliaでは、Clusteringパッケージの関数を用いてk平均法を実行する。ここでは、各成分が区間[0, 1]の一様乱数からなる 1000 \times 1000の行列 xに対して、k平均法により50個( =k)のクラスターを生成する処理を初期値を変えながら200回繰り返してみる。

まずは、逐次処理から。

$ julia
julia> using Clustering
julia> srand(123)
julia> x = rand(1000, 1000)
julia> k = 50
julia> @time [result_seq = kmeans(x, k; maxiter=50) for i=1:200];
elapsed time: 49.761205104 seconds (1121608128 bytes allocated, 1.58% gc time)

約50秒計算に要していることが分かる。

続いて、4つのワーカープロセスによる並列計算。

$ julia -p 4
julia> @everywhere using Clustering
julia> srand(123)
julia> @everywhere x = rand(1000, 1000)
julia> @everywhere k = 50
julia> @time @parallel [result_par = kmeans(x, k; maxiter=50) for i=1:200]
elapsed time: 20.371857333 seconds (11684024 bytes allocated)

約20秒で計算が完了しているから、2.5倍近くスピードアップしていることが分かる。

まとめと残課題

以上で、基本的な並列計算は実行できるようになったと思われる。一方で、今後調査しなければならない課題もいくつかある。

  • 並列計算での乱数生成
    RではL'ecuyerのアルゴリズムなど、baseパッケージのRNGkind関数で並列計算に向いた乱数生成法が提供されている。このような並列計算用の乱数生成法が存在しないと、乱数を大量に発生させる必要があるケースでは実用的ではなくなってしまう。Juliaでそのような機能は提供されているのだろうか。おそらくこの問題意識に基づいたソースコードGithub上で公開されていると思われるので、要調査。
  • ロードバランシング
    Juliaではタスクのスケジューリングを行っているようであるが(公式ドキュメント"Synchronization With Remote References Scheduling"参照)、ロードバランシングに相当する負荷調整も行っているのだろうか。
  • タイミングの測定
    Rのsnowパッケージには、マスタープロセスからワーカープロセスに処理やデータを送信したタイミング、ワーカープロセスが処理を行った時間、マスタープロセスがワーカープロセスから処理の結果を受信したタイミングを測定する機能が提供されている。Juliaではこの3つのタイミングのうち、最後のタイミングは存在しないと思われる。前者2つのタイミングを測定することは可能だろうか。
  • メッセージパッシング以外の並列計算方法
    Rでは、ソケット、フォーク、MPI、PVMなど様々な通信方式が提供されている。Juliaでメッセージパッシング以外の並列計算方法はないだろうか。

参考文献