Monthly Hacker's Blog

毎月のテーマに沿ったプログラミング記事を中心に書きます。

multiprocessingのあれこれ

はじめに

こんにちは。さかぱ(@zacapa_23)です。

12月になると修論でとても忙しい。しかも2ヶ月前の自分が「12月でも余裕あるでしょ」と思っていたせいでAdvent Calenderに参加してしまいました...。本記事は18日目の記事です。
qiita.com

せっかくの機会なので、pythonで機械学習やデータ分析などをするときに便利な並列処理(multiprocessing)について知識をおすそわけできればいいなと思います。

まえおき

pythonのmultiprocessingについてググると、だいたいPoolかProcessの記事がほとんど。本記事は主にPoolに関する知識です(というかPoolしか使ったことがない...)。

と言っても、調べれば出てくるような数値計算の並列方法(よくあるのは二乗和)ではなく、実践的なテーマを用意しました。

実行環境

  • Ubuntu 14.04.5 LTS
  • Intel(R) Xeon(R) CPU E5-1620 v3 @ 3.50GHz (8スレッド)
  • Python 3.5.2 :: Anaconda custom (64-bit)
  • Jupyter 4.2.0

ニコニコ動画コメントの並列処理

10月の記事での前処理で並列処理をしていたので、題材はニコニコ動画コメントを使用(データの取得方法は以下の記事を参考に)。
www.monthly-hack.com

今回のゴールは、tar.gzファイルから各動画のラスト10コメにNFCK処理をして取得することにします。

共通部分

以下、それぞれの方法に共通するコードです。

import tarfile
import glob
import multiprocessing
import unicodedata

folder_path = '/path/to/data/thread'
targz_list = glob.glob('{}/*.tar.gz'.format(folder_path))

方法1(脳筋)

for文を使って1つずつコメントを抽出してresultにappendします。並列処理をしないシンプルなコードです。とても時間がかかる。

result = []
for targz in targz_list:
    tf = tarfile.open(targz, 'r')
    item_list = tf.getnames()[1:]
    for ti in item_list:
        f = tf.extractfile(ti).read()
        comments = f.decode('utf-8').split('\n')
        for comment in comments[-11:-1]:
            comment = eval(comment)['comment']
            comment = unicodedata.normalize('NFKC', comment)
            result.append(comment)

方法2(落とし穴)

multiprocessingでresultにappendする。しかしこれはできません。multiprocessingではグローバル変数の参照は可能ですが、追加や削除の操作はできないようです。これが最大の問題で、並列処理の結果をどう出力(保存)するかを考えなきゃいけません。

result = []
def get_comment(targz):
    tf = tarfile.open(targz, 'r')
    item_list = tf.getnames()[1:]
    for ti in item_list:
        f = tf.extractfile(ti).read()
        comments = f.decode('utf-8').split('\n')
        for comment in comments[-11:-1]:
            comment = eval(comment)['comment']
            comment = unicodedata.normalize('NFKC', comment)
            result.append(comment)

processes = max(1, multiprocessing.cpu_count() - 1)
p = multiprocessing.Pool(processes)
p.map(get_comment, targz_list)

方法3(逃げ道のようで落とし穴)

Queueを使って結果を出力する。result.get()で取り出せるのですが、1つずつ取り出さなきゃいけないのが面倒です。それとコメントの順番は考慮できません。

result = multiprocessing.Queue()
def get_comment(targz):
    tf = tarfile.open(targz, 'r')
    item_list = tf.getnames()[1:]
    for ti in item_list:
        f = tf.extractfile(ti).read()
        comments = f.decode('utf-8').split('\n')
        for comment in comments[-11:-1]:
            comment = eval(comment)['comment']
            comment = unicodedata.normalize('NFKC', comment)
            result.put(comment)

processes = max(1, multiprocessing.cpu_count() - 1)
p = multiprocessing.Pool(processes)
p.map(get_comment, targz_list)

方法4(小規模データ向け)

関数のreturnを使う。個人的にはQueueよりも扱いやすいかもしれません。result_allは各プロセスのreturn値のlistになっています。

def get_comment(targz):
    result = []
    tf = tarfile.open(targz, 'r')
    item_list = tf.getnames()[1:]
    for ti in item_list:
        f = tf.extractfile(ti).read()
        comments = f.decode('utf-8').split('\n')
        for comment in comments[-11:-1]:
            comment = eval(comment)['comment'] # string to dict
            comment = unicodedata.normalize('NFKC', comment)
            result.append(comment)
    return result

processes = max(1, multiprocessing.cpu_count() - 1)
p = multiprocessing.Pool(processes)
result_all = p.map(get_comment, targz_list)

方法5(大規模データ向け)

対象データをミニバッチにして処理と保存をします。大量データセットで並列処理する場合には、この方法が一番おすすめです。まず実行途中で何かしらのエラーが起きても途中まで保存されているので安心です。

import pickle
batchsize = 10000
data = [[i, targz_list[i:i+batchsize]] for i in range(0, len(targz_list), batchsize)]
def get_comment(item):
    result = []
    tf = tarfile.open(item[1], 'r')
    item_list = tf.getnames()[1:]
    for ti in item_list:
        f = tf.extractfile(ti).read()
        comments = f.decode('utf-8').split('\n')
        for comment in comments[-11:-1]:
            comment = eval(comment)['comment'] # string to dict
            comment = unicodedata.normalize('NFKC', comment)
            result.append(comment)
    with open('./result_{}.pkl'.format(item[0]), 'wb') as f:
        pickle.dump(result, f)

processes = max(1, multiprocessing.cpu_count() - 1)
p = multiprocessing.Pool(processes)
p.map(get_comment, data)

おわりに

データが膨大な場合は、メモリが足りなくなるのでreturnもQueueも使わずに出力してしまったほうがいいのが結論です。ニコニコデータだけでなく修論でも使っています。世の中にはもっと効率のいい方法があるとは思うけど、現在自分が知っているのはここまで。ここに辿り着くのに色々試行錯誤したので、知識のおすそわけ的な感じで書き残してみました。