Ahogrammer

Deep Dive Into NLP, ML and Cloud

KerasのLambda層でreshapeしたとき、保存に失敗する(場合がある)話

TL;DR

  • keras.backendのreshapeを使ってLambda層でreshapeしたい
  • reshapeのshapeにテンソルを指定するとモデルの保存(save)に失敗する
  • saveではなくsave_weightsを使うと保存できる

背景

まず問題が起きる状況について説明しておきたい。 簡単にまとめると以下のような状況だ。

  • ミニバッチごとにshapeの異なるデータを入力する
  • モデルの途中にreshapeが入っている
  • reshapeには入力のshapeを使用する

ミニバッチごとにshapeの異なるデータを入力するというのは、言語処理なんかではよく行われると思う。 たとえば、LSTMでの計算を効率化するために、ミニバッチ内の最大の系列長に合わせてpaddingを行い、系列長を揃える場合が挙げられる。

上記の状況を示すプログラムは以下の通り:

import keras.backend as K
from keras.layers import Input, Lambda

emb_size = 25
x = Input(batch_shape=(None, None, None, emb_size))
s = K.shape(x)
pred = Lambda(K.reshape, 
              arguments={'shape': (-1, s[-2], emb_size)})(x)
model = Model(inputs=[x], outputs=pred)
model.compile('sgd', 'categorical_crossentropy')
model.save('model.h5')

実際にはこんなモデルはありえないが、説明のためのtoy programだと了承していただきたい。

コードについて簡単に説明しておく。 まず、入力のうち3つの次元は動的に決まる。このうち一つはバッチサイズである。 この入力に対して、keras.backendのshapeを使ってshapeを取得している。ちなみに取得したshapeはTensorで表される:

>>> s = K.shape(x)
>>> s
<tf.Tensor 'Shape:0' shape=(4,) dtype=int32>

次に、取得したshapeとbackendのreshapeを使って入力を変換している。 Reshape層を使わないのは、Reshape層のshapeにTensorを指定できないからである。 Tensorを指定すると「Tensorはboolみたいには使用できないよ」という例外が出る:

>>> s = K.shape(x)
>>> Reshape(target_shape=(-1, s[-2], emb_size))(x)
Traceback (most recent call last):
...
    raise TypeError("Using a `tf.Tensor` as a Python `bool` is not allowed. "
TypeError: Using a `tf.Tensor` as a Python `bool` is not allowed. Use `if t is not None:` instead of `if t:` to test if a tensor is defined, and use TensorFlow ops such as tf.cond to execute subgraphs conditioned on the value of a tensor.

それならば、K.int_shapeで得られるTensorではないshapeを指定すればよいではないか?という話になるがそれもできない。 というのも、K.int_shapeで得られたshapeのs[-2]はNoneであり、ReshapeにはNoneを指定できないからである。 そのときに発生する例外は以下の通り:

>>> s = K.int_shape(x)
>>> Reshape(target_shape=(-1, s[-2], emb_size))(x)
Traceback (most recent call last):
...
TypeError: unorderable types: NoneType() < int()

したがって、keras.backendのreshapeを使うことになる。 ただ、Kerasの場合、レイヤーを接続してモデルを構築しないと「keras_historyがないよ」という例外が発生する。 したがって、Lambdaでreshapeをラップしている。

pred = Lambda(K.reshape, 
              arguments={'shape': (-1, s[-2], emb_size)})(x)

このtoy programではcompileはうまくいく。また学習も順調に進む。

model = Model(inputs=[x], outputs=pred)
model.compile('sgd', 'categorical_crossentropy')

問題

しかし以下のようにsaveメソッドを使ってモデルを保存すると問題が起きる。

model.save('model.h5')

確認しているだけでも以下の5つの例外がランダム(に見えるよう)に発生する:

  • TypeError: cannot serialize ‘_io.TextIOWrapper’ object
  • TypeError: object.new(PyCapsule) is not safe, use PyCapsule.new()
  • AttributeError: ‘NoneType’ object has no attribute ‘update’
  • TypeError: cannot deepcopy this pattern object
  • TypeError: can’t pickle module objects

調べていくと、これらの例外はreshape時にテンソルを指定すると、Lambdaでシリアライズができないことに起因するようだ。

解決策

save_weightsでネットワークの重みだけ保存することで解決する。

model.save_weights('model_weights.h5')

まとめ

  • keras.backendのreshapeを使ってLambda層でreshapeしたい
  • reshapeのshapeにテンソルを指定するとモデルの保存(save)に失敗する
  • saveではなくsave_weightsを使うと保存できる

Keras の fit と fit_generator の速度を比較する

Kerasでモデルを学習させるときによく使われるのが、fitメソッドfit_generatorメソッドだ。 各メソッドについて簡単に説明すると、fitは訓練用データを一括で与えると内部でbatch_size分に分割して学習してくれる。 それに対し、fit_generatorではbatch_size分のデータを生成するgeneratorを自分で作成して与える必要がある。 ミニバッチごとに入力の前処理をしたい場合なんかはfit_generatorを使うことになる。

本記事では、これらfitメソッドとfit_generatorメソッドを使って同じモデルを学習させ、学習時間を比較してみる。 なぜ比較するのかというと、「ひょっとして学習時間に差があるのでは?」と気になったからだ。 検証してはっきりさせておくことで、fitとfit_generatorの使い分けに役立てられればいいと思う。

検証は以下のように進める:

  • LSTMによる文書分類モデルを構築
  • fitとfit_generatorそれぞれで学習時間を計測

検証に用いたコードはこちら

モデルの構築

まずは検証に使うモデルを構築する。 文書分類タスクとして、IMDBデータセットを使った評価分析を行うことにする。 また、モデルとしては単純なLSTMのネットワークを構築する。 モデルの構築について順をおって説明する。

まずは必要なモジュールをインポートする。 今回は単純なモデルなので、Functional APIではなくSequentialモデルでモデルを構築していく。 インポートのコードは以下の通り:

import numpy as np
from keras.preprocessing import sequence
from keras.models import Sequential
from keras.layers import Dense, Embedding
from keras.layers import LSTM
from keras.datasets import imdb

インポートの次はハイパーパラメータを設定する。 今回は語彙数を20000語、文長は80に制限し、バッチサイズは32とした。 ハイパーパラメータの設定は以下の通り:

max_features = 20000
maxlen = 80
batch_size = 32

次に、データのロードと前処理を行う。 ロードしたデータはミニバッチ時に効率よく計算するためにpaddingして長さを揃える。 本来はfit_generatorに与えるgenerator内で前処理すべきだと思うが、今回はfitとfit_generatorの速度を比較したいため、予め前処理して条件を揃える。 コードは以下の通り:

(x_train, y_train), (x_test, y_test) = 
imdb.load_data(num_words=max_features)

x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)

いよいよモデルを構築する。先にも書いたように、今回は単純なモデルなのでSequentialモデルで構築する。 モデルのアーキテクチャは、分散表現をLSTMに入力し、LSTMの出力結果を全結合層に与えてneg/posの確率を出力するというものだ。 モデルを定義するコードは以下の通り:

model = Sequential()
model.add(Embedding(max_features, 128))
model.add(LSTM(128, dropout=0.2, recurrent_dropout=0.2))
model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

ここまででデータの準備とモデルの構築は完了した。 次は学習を行っていく。

fitメソッドによる学習

いよいよfitメソッドを使って構築したモデルを学習させる。 コードは以下のように書ける:

model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=1,
          validation_data=(x_test, y_test))

学習結果は以下の通り:

25000/25000 [==============================] - 329s - loss: 0.4559 - acc: 0.7876 - val_loss: 0.3681 - val_acc: 0.8416

学習は329秒で完了している。

fit_generatorメソッドによる学習

次にfit_generatorメソッドで学習を行う。 fit_generatorメソッドには、データを生成するジェネレータとステップ数を与える必要がある。 そのためのジェネレータとステップ数を生成するコードは以下の通り:

def batch_iter(data, labels, batch_size, shuffle=True):
    num_batches_per_epoch = int((len(data) - 1) / batch_size) + 1

    def data_generator():
        data_size = len(data)
        while True:
            # Shuffle the data at each epoch
            if shuffle:
                shuffle_indices = np.random.permutation(np.arange(data_size))
                shuffled_data = data[shuffle_indices]
                shuffled_labels = labels[shuffle_indices]
            else:
                shuffled_data = data
                shuffled_labels = labels

            for batch_num in range(num_batches_per_epoch):
                start_index = batch_num * batch_size
                end_index = min((batch_num + 1) * batch_size, data_size)
                X = shuffled_data[start_index: end_index]
                y = shuffled_labels[start_index: end_index]
                yield X, y

    return num_batches_per_epoch, data_generator()

train_steps, train_batches = batch_iter(x_train, y_train, batch_size)
valid_steps, valid_batches = batch_iter(x_test, y_test, batch_size)

いよいよfit_generatorメソッドを使って構築したモデルを学習させる。 コードは以下のように書ける:

model.fit_generator(train_batches, train_steps,
                    epochs=1, 
                    validation_data=valid_batches,
                    validation_steps=valid_steps)

学習結果は以下の通り:

782/782 [==============================] - 307s - loss: 0.4601 - acc: 0.7791 - val_loss: 0.3605 - val_acc: 0.8416

学習には307秒かかっている。

結論

fitの学習に329秒とfit_generatorの学習に307秒かかったので学習時間はほぼ同じという結果になった。 したがって、使い分けとしては、予め前処理しておける場合はfitを使い、ミニバッチごとに前処理したい場合はfit_generatorを使えば良さそう。

形態素解析を並列化して高速化するTip

自然言語処理ではその第一歩として形態素解析が行われることが多いと思います。 しかし、形態素解析をする際に、解析対象が大量にあると実行時間が結構かかります。 本記事では、Pythonconcurrent.futures モジュールを使った高速化方法を紹介します。

ナイーブな実装

想定している状況は、あるフォルダの中に大量のテキストファイルが存在し、そのファイルを読み込んで形態素解析するというものです。 以下のプログラムは、globを使ってファイルのリストを取得し、それらを読み込んでMeCab形態素解析をしています:

import glob
import MeCab

t = MeCab.Tagger()


def tokenize_parse(text_file):
    with open(text_file) as f:
        text = f.read()
        chunks = t.parse(text).splitlines()
        for chunk in chunks:
            pass  # do something                                                                                                                                                                        


for text_file in glob.glob("data/text/*/*.txt"):
    tokenize_parse(text_file)
    print('{} was processed'.format(text_file))

プログラムの内容はよくあるデータ処理のパターンに従っています:

  1. 処理したいファイルのリストを列挙する
  2. データを処理するhelper関数を書く
  3. ループ内でhelper関数を呼ぶことでデータを1件ずつ処理していく

このプログラムを7000ほどのテキストファイルに対して実行し、実行時間を計測してみましょう:

$ time python tokenizer_1.py
data/text/dokujo-tsushin/dokujo-tsushin-4778030.txt was processed
[... about 7375 more lines of output ...]
real    0m22.641s
user    0m10.134s
sys 0m1.139s

実行結果からこのプログラムの実行に 22.6秒 かかっていることがわかります。 並列に処理することで実行時間をどれくらい削減できるでしょうか?

パラレルな実装

データを並列に処理するためのアプローチとしては以下の方法が考えられます:

  1. テキストファイルのリストを分割する
  2. 分割数分のPythonプロセスを実行する
  3. 各プロセスは1で分割したデータを処理する
  4. 各プロセスの処理結果を統合する

並列処理するためにPython組み込みのモジュールである concurrent.futures を使うことができます。まずはimportしましょう:

import concurrent.futures

モジュールをimportしたら複数のPythonを並列に走らせるための命令を書きます。そのために ProcessPool を使います:

with concurrent.futures.ProcessPoolExecutor() as executor:

あとは executor.map() を使って通常のループを置き換えてしまいます:

text_files = glob.glob("data/text/*/*.txt")
for text_file, _ in zip(text_files, executor.map(tokenize_parse, text_files)):
    print('{} was processed'.format(text_file))

executor.map()関数の引数にはhelper関数と処理したデータを渡します。 そうすることで、リストの分割や分割したリストを子プロセスへ渡す、子プロセスを実行する、実行結果を統合するといった面倒な処理をしてくれます。

以上を踏まえて、ナイーブな実装を並列化してみました:

import concurrent.futures
import glob
import MeCab

t = MeCab.Tagger()


def tokenize_parse(text_file):
    with open(text_file) as f:
        text = f.read()
        chunks = t.parse(text).splitlines()
        for chunk in chunks:
            pass  # do something                                                                                                                                                                        


with concurrent.futures.ProcessPoolExecutor() as executor:
    text_files = glob.glob("data/text/*/*.txt")
    for text_file, _ in zip(text_files, executor.map(tokenize_parse, text_files)):
        print('{} was processed'.format(text_file))

実行時間を計測してみましょう:

$ time python tokenizer_2.py
data/text/dokujo-tsushin/dokujo-tsushin-4778030.txt was processed
[... about 7375 more lines of output ...]
real    0m10.284s
user    0m19.136s
sys 0m2.209s

実行時間は 10.3秒 になりました。ナイーブな場合と比べて約2倍に高速化されています。

まとめ

Python組み込みの concurrent.futures モジュールを使うことで簡単に並列処理を実装することができました。 実行時間を削減したいという方は並列化してみてください。