Ahogrammer

Deep Dive Into NLP, ML and Cloud

fastTextの学習済みモデルのリンク

5年近く前から、fastTextの学習済みモデルをGoogle Driveで公開しているのですが、どうやらGoogle Driveのセキュリティアップデートにより、ダウンロードできなくなっていたようです。

結果として何が起きたかと言うと、私のGmailにアクセス権限を求めるメッセージが毎日来るようになりました。新しいリンクへ変更したので、そちらからダウンロードしてください。ファイル名は「vector.zip」と「vector_neologd.zip」です。

Facebookもモデルを公開しているので、そちらを使うのも検討してみると良いと思います。

SageMaker Processingでカスタムイメージを使ってデータを加工する

SageMaker Processingは、データの前処理や後処理、特徴エンジニアリング、モデルの評価といった機械学習のワークロードをSageMaker上で実行するための機能です。SageMaker Processingを使うことで、これらの処理をするためのジョブをコンテナ上で実行することが出来ます。ジョブは入力のデータをS3から受け取り、処理した結果をS3へ出力する仕組みになっています。

f:id:Hironsan:20201002114554p:plain
SageMaker Processingの処理フロー。画像はAmazon SageMaker Processing | Documentationより。

SageMaker Processingは、Scikit-learnやSparkの他に、カスタムイメージから作成したコンテナ上でデータを処理できます。今回はGPU上でTensorFlow Hubを動かすためのコンテナを作成し、テキストを加工してみます。やることとしては、Universal Sentence Encoderを使って、テキストを固定長のベクトルに変換し、S3へ書き出します。

以下の手順で進めていきます。

  • Dockerfileの作成
  • ECRへのイメージのPush
  • スクリプトの実装
  • ジョブの実行

Dockerfileの作成

まずは、コンテナを定義しましょう。今回はTensorFlow Hubを使ってGPU上でテキストの前処理を行いたいので、以下のAWS公式のDockerfileを基にDockerfileを作成します。

github.com

TensorFlow Text、TensorFlow Hubをインストールし、ENTRYPOINTを書き換えています。TensorFlowのバージョンを書き換える心配がなければ、ECR上にあるGPU用のイメージをベースイメージに指定して書くと、記述が簡潔になると思います。

RUN ${PIP} install --no-cache-dir -U \
    ...
    tensorflow-text==2.3.0 \
    tensorflow-hub==0.9.0 \
...
ENTRYPOINT ["python3"]

ECRへのPush

ECRへのPushについては、公式ドキュメントが詳しいですが、ここでは以下の手順で行います。

  1. ECRにリポジトリを作成
  2. DockerクライアントをECRにPushできるように認証
  3. イメージをビルド
  4. イメージにタグ付け
  5. イメージをECRにPush

最初にリポジトリを作ると、リポジトリへプッシュするために必要なコマンドが「View Push Command」から表示できるので(以下図)、コピペするだけで済みます。

f:id:Hironsan:20200929095647p:plain
リポジトリへのPushの手順

まずは、Dockerクライアントを認証します。regionaws_account_idを変更します。

aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{region}.amazonaws.com

次に、イメージをビルドします。

docker build -t sagemaker-processing-tensorflow .

ビルドが完了したら、タグを付けます。aws_account_idrepository_nameを変更します。

docker tag sagemaker-processing-tensorflow:latest {aws_account_id}.dkr.ecr.region.amazonaws.com/{repository_name}

最後にイメージをPushします。

docker push {aws_account_id}.dkr.ecr.{region}.amazonaws.com/{repository_name}

以上で、ECR上にイメージを用意できました。

スクリプトの実装

続いて、コンテナ上で実行するスクリプトを実装します。今回は、TensorFlow HubのUniversal Sentence Encoderを使って、読み込んだテキストを固定長のベクトルに変換するスクリプトを実装してみました。今回は小さなテキストファイルで試すので、一括で変換しています。

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text


class USEEncoder:

  def __init__(self, model_url='https://tfhub.dev/google/universal-sentence-encoder-qa/3'):
    self.model = hub.load(model_url)

  def encode_question(self, questions):
    embeddings = self.model.signatures['question_encoder'](tf.constant(questions))
    return embeddings['outputs'].numpy()

  def encode_response(self, responses, contexts):
    assert len(responses) == len(contexts)
    embeddings = self.model.signatures['response_encoder'](
        input=tf.constant(responses),
        context=tf.constant(contexts)
    )
    return embeddings['outputs'].numpy()


# Read data locally 
lines = [line.rstrip() for line in open('/opt/ml/processing/input/dataset.txt')]

# Preprocess the data set
encoder = USEEncoder()
embeddings = encoder.encode_question(lines)

# Save data locally
with open('/opt/ml/processing/output/train/train.txt', 'w') as f:
    for line in embeddings.tolist():
        f.write('{}\n'.format(line))
print('Finished running processing job')

ジョブの実行

ここまできたら、あとはノートブックからSageMaker Processingを実行するだけです。

まずは、ScriptProcessorを用意します。ScriptProcessorにイメージのURIを渡すことで、渡したイメージから作成したコンテナ上で処理を実行できます。

import sagemaker
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

role = sagemaker.get_execution_role()
script_processor = ScriptProcessor(
    image_uri='YOUR IMAGE URI',
    role=role,
    instance_count=1,
    instance_type='ml.p3.2xlarge',
    command=['python3']
)

ScriptProcessorを用意したら、runメソッドで実行します。ProcessingInputsourceには処理したいファイルが存在する場所を指定します。destinationにはsourceで指定したファイルをコンテナ上のどこにコピーするか指定します。ProcessingOutputsourceには処理済みのファイルが存在する場所を指定します。今回は指定していませんが、destinationを指定することで処理済みのファイルをコピーする先を指定できます。

script_processor.run(
    code='processing_script.py',
    inputs=[
        ProcessingInput(
            source='dataset.txt',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output/train'
        )
    ]
)

実行が完了すると、S3上に処理したファイルが出力されます。以下のように固定長のベクトルに変換されていることを確認できます。

[0.027010727673768997, 0.04176567867398262, ...]
... 

おわりに

今回は、SageMaker ProcessingにてGPU上でTensorFlow Hubを使って前処理をしてみました。イメージさえ用意すれば、TensorFlow Hub以外のパッケージを使うこともできるので、自分の必要なパッケージをインストールしたコンテナさえ用意すれば、簡単に使えるかと思います。

参考資料

benchmark関数を使ってデータセットの処理時間の計測と改善に取り組む

TensorFlowには、tf.data.Dataset APIという入力のパイプラインを実現するための強力な機能があります。入力のパイプラインを最適化することで学習全体を高速化できるため、定量的に計測して改善する価値があります。そこで、本記事ではTensorFlow Datasetsに含まれるbenchmark関数を使ってデータセットの処理時間を測定する方法について紹介します。

ベンチマーク関数

benchmark関数は、tf.data.Datasetベンチマークを行うための関数です。Datasetを渡すことで、データセットの処理時間に関する以下の情報を出力できます。

  • 総実行時間
  • セットアップ時間(最初のバッチにかかる時間)
  • 秒あたりに処理できるデータ数(examples/sec)

ここまではドキュメントに書いてある話ですが、実際に実行すると以下のような出力をされて面食らいます。ドキュメントにはstatisticsを返すとさらっと書いてあるだけなので、ソースコードを読んでわかったことを説明しましょう。

{
  "first":{
    "avg":13.807230435075414,
    "duration":0.07242582100025174,
    "num_examples":1
  },
  "first+last":{
    "avg":4597.854424099337,
    "duration":26.09912992699992,
    "num_examples":120000
  },
  "last":{
    "avg":4610.610683215085,
    "duration":26.02670410599967,
    "num_examples":119999
  },
  "raw":{
    "end_time":4150.842121449,
    "first_batch_time":4124.815417343,
    "num_iter":120000,
    "start_time":4124.742991522
  }
}

firstは最初の1バッチに関する統計情報です。最初の1バッチはファイルオープンやネットワーク接続の時間がかかる場合があるため、分けているようです。lastは最初の1バッチ以外を表しています。したがって、first+lastで全バッチを表しています。durationはかかった時間(s)、num_examplesはデータ数、avgは1秒あたりに処理したデータ数を表しています。rawにはパフォーマンスカウンタの値が格納されており、これらの値からdurationを計算しています。

以下では、first+lastdurationを見て、全体にかかっている時間を計測してみます。

ベンチマーク

benchmark関数を使うためには、TensorFlow Datasetsをインストールする必要があります。この関数は2020年6月25日のPull Requestでマージされているので、TensorFlow Datasets v3.2.0から利用可能です。バージョン3.2.0以上のTensorFlow Datasetsをインストールしましょう。

pip install -U tensorflow-datasets

インストールしたら、必要なパッケージをインポートします。

import tensorflow as tf
import tensorflow_datasets as tfds

まずは、単にデータセットを渡してみましょう。以下のようになりました。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .repeat(2)
)
stats['first+last']['duration']
# 26.867365005000465

次に、mapに処理をさせてみます。少し遅くなることがわかります。

def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .map(normalize_img)
    .repeat(2)
)
stats['first+last']['duration']
# 39.00072752699998

では、mapを並列実行したらどうでしょうか?先ほどより高速になりました。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .map(
        normalize_img,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .repeat(2)
)
stats['first+last']['duration']
# 30.73008285800006

mapの結果をキャッシュしてみましょう。さらに速くなりました。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .map(
        normalize_img,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()
    .repeat(2)
)
stats['first+last']['duration']
# 23.48708672599969

mapの前にbatchを入れて、mapをベクトル化します。効果的なようです。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .batch(32)
    .map(
        normalize_img,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()
    .repeat(2),
    batch_size=32
)
print(stats['first+last']['duration'])
# 6.9288019289997465

以上で、benchmark関数の使い方の説明は終わりです。benchmark関数を使ってデータセットの処理速度を定量化することで、改善のための一つの指標とすることができます。TensorFlow Profilerなどと合わせて使うことで、ボトルネックを分析し、学習全体を高速化して、生産性の向上やマシンコストの低下を実現させるのに役立ちそうです。

参考資料