はじめに
こんにちは。さかぱ(@zacapa_23)です。
12月になると修論でとても忙しい。しかも2ヶ月前の自分が「12月でも余裕あるでしょ」と思っていたせいでAdvent Calenderに参加してしまいました...。本記事は18日目の記事です。
qiita.com
せっかくの機会なので、pythonで機械学習やデータ分析などをするときに便利な並列処理(multiprocessing)について知識をおすそわけできればいいなと思います。
まえおき
pythonのmultiprocessingについてググると、だいたいPoolかProcessの記事がほとんど。本記事は主にPoolに関する知識です(というかPoolしか使ったことがない...)。
と言っても、調べれば出てくるような数値計算の並列方法(よくあるのは二乗和)ではなく、実践的なテーマを用意しました。
実行環境
- Ubuntu 14.04.5 LTS
- Intel(R) Xeon(R) CPU E5-1620 v3 @ 3.50GHz (8スレッド)
- Python 3.5.2 :: Anaconda custom (64-bit)
- Jupyter 4.2.0
ニコニコ動画コメントの並列処理
10月の記事での前処理で並列処理をしていたので、題材はニコニコ動画コメントを使用(データの取得方法は以下の記事を参考に)。
www.monthly-hack.com
今回のゴールは、tar.gzファイルから各動画のラスト10コメにNFCK処理をして取得することにします。
共通部分
以下、それぞれの方法に共通するコードです。
import tarfile import glob import multiprocessing import unicodedata folder_path = '/path/to/data/thread' targz_list = glob.glob('{}/*.tar.gz'.format(folder_path))
方法1(脳筋)
for文を使って1つずつコメントを抽出してresultにappendします。並列処理をしないシンプルなコードです。とても時間がかかる。
result = [] for targz in targz_list: tf = tarfile.open(targz, 'r') item_list = tf.getnames()[1:] for ti in item_list: f = tf.extractfile(ti).read() comments = f.decode('utf-8').split('\n') for comment in comments[-11:-1]: comment = eval(comment)['comment'] comment = unicodedata.normalize('NFKC', comment) result.append(comment)
方法2(落とし穴)
multiprocessingでresultにappendする。しかしこれはできません。multiprocessingではグローバル変数の参照は可能ですが、追加や削除の操作はできないようです。これが最大の問題で、並列処理の結果をどう出力(保存)するかを考えなきゃいけません。
result = [] def get_comment(targz): tf = tarfile.open(targz, 'r') item_list = tf.getnames()[1:] for ti in item_list: f = tf.extractfile(ti).read() comments = f.decode('utf-8').split('\n') for comment in comments[-11:-1]: comment = eval(comment)['comment'] comment = unicodedata.normalize('NFKC', comment) result.append(comment) processes = max(1, multiprocessing.cpu_count() - 1) p = multiprocessing.Pool(processes) p.map(get_comment, targz_list)
方法3(逃げ道のようで落とし穴)
Queueを使って結果を出力する。result.get()で取り出せるのですが、1つずつ取り出さなきゃいけないのが面倒です。それとコメントの順番は考慮できません。
result = multiprocessing.Queue() def get_comment(targz): tf = tarfile.open(targz, 'r') item_list = tf.getnames()[1:] for ti in item_list: f = tf.extractfile(ti).read() comments = f.decode('utf-8').split('\n') for comment in comments[-11:-1]: comment = eval(comment)['comment'] comment = unicodedata.normalize('NFKC', comment) result.put(comment) processes = max(1, multiprocessing.cpu_count() - 1) p = multiprocessing.Pool(processes) p.map(get_comment, targz_list)
方法4(小規模データ向け)
関数のreturnを使う。個人的にはQueueよりも扱いやすいかもしれません。result_allは各プロセスのreturn値のlistになっています。
def get_comment(targz): result = [] tf = tarfile.open(targz, 'r') item_list = tf.getnames()[1:] for ti in item_list: f = tf.extractfile(ti).read() comments = f.decode('utf-8').split('\n') for comment in comments[-11:-1]: comment = eval(comment)['comment'] # string to dict comment = unicodedata.normalize('NFKC', comment) result.append(comment) return result processes = max(1, multiprocessing.cpu_count() - 1) p = multiprocessing.Pool(processes) result_all = p.map(get_comment, targz_list)
方法5(大規模データ向け)
対象データをミニバッチにして処理と保存をします。大量データセットで並列処理する場合には、この方法が一番おすすめです。まず実行途中で何かしらのエラーが起きても途中まで保存されているので安心です。
import pickle batchsize = 10000 data = [[i, targz_list[i:i+batchsize]] for i in range(0, len(targz_list), batchsize)] def get_comment(item): result = [] tf = tarfile.open(item[1], 'r') item_list = tf.getnames()[1:] for ti in item_list: f = tf.extractfile(ti).read() comments = f.decode('utf-8').split('\n') for comment in comments[-11:-1]: comment = eval(comment)['comment'] # string to dict comment = unicodedata.normalize('NFKC', comment) result.append(comment) with open('./result_{}.pkl'.format(item[0]), 'wb') as f: pickle.dump(result, f) processes = max(1, multiprocessing.cpu_count() - 1) p = multiprocessing.Pool(processes) p.map(get_comment, data)
おわりに
データが膨大な場合は、メモリが足りなくなるのでreturnもQueueも使わずに出力してしまったほうがいいのが結論です。ニコニコデータだけでなく修論でも使っています。世の中にはもっと効率のいい方法があるとは思うけど、現在自分が知っているのはここまで。ここに辿り着くのに色々試行錯誤したので、知識のおすそわけ的な感じで書き残してみました。