Ahogrammer

Deep Dive Into NLP, ML and Cloud

SageMaker Processingでカスタムイメージを使ってデータを加工する

SageMaker Processingは、データの前処理や後処理、特徴エンジニアリング、モデルの評価といった機械学習のワークロードをSageMaker上で実行するための機能です。SageMaker Processingを使うことで、これらの処理をするためのジョブをコンテナ上で実行することが出来ます。ジョブは入力のデータをS3から受け取り、処理した結果をS3へ出力する仕組みになっています。

f:id:Hironsan:20201002114554p:plain
SageMaker Processingの処理フロー。画像はAmazon SageMaker Processing | Documentationより。

SageMaker Processingは、Scikit-learnやSparkの他に、カスタムイメージから作成したコンテナ上でデータを処理できます。今回はGPU上でTensorFlow Hubを動かすためのコンテナを作成し、テキストを加工してみます。やることとしては、Universal Sentence Encoderを使って、テキストを固定長のベクトルに変換し、S3へ書き出します。

以下の手順で進めていきます。

  • Dockerfileの作成
  • ECRへのイメージのPush
  • スクリプトの実装
  • ジョブの実行

Dockerfileの作成

まずは、コンテナを定義しましょう。今回はTensorFlow Hubを使ってGPU上でテキストの前処理を行いたいので、以下のAWS公式のDockerfileを基にDockerfileを作成します。

github.com

TensorFlow Text、TensorFlow Hubをインストールし、ENTRYPOINTを書き換えています。TensorFlowのバージョンを書き換える心配がなければ、ECR上にあるGPU用のイメージをベースイメージに指定して書くと、記述が簡潔になると思います。

RUN ${PIP} install --no-cache-dir -U \
    ...
    tensorflow-text==2.3.0 \
    tensorflow-hub==0.9.0 \
...
ENTRYPOINT ["python3"]

ECRへのPush

ECRへのPushについては、公式ドキュメントが詳しいですが、ここでは以下の手順で行います。

  1. ECRにリポジトリを作成
  2. DockerクライアントをECRにPushできるように認証
  3. イメージをビルド
  4. イメージにタグ付け
  5. イメージをECRにPush

最初にリポジトリを作ると、リポジトリへプッシュするために必要なコマンドが「View Push Command」から表示できるので(以下図)、コピペするだけで済みます。

f:id:Hironsan:20200929095647p:plain
リポジトリへのPushの手順

まずは、Dockerクライアントを認証します。regionaws_account_idを変更します。

aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{region}.amazonaws.com

次に、イメージをビルドします。

docker build -t sagemaker-processing-tensorflow .

ビルドが完了したら、タグを付けます。aws_account_idrepository_nameを変更します。

docker tag sagemaker-processing-tensorflow:latest {aws_account_id}.dkr.ecr.region.amazonaws.com/{repository_name}

最後にイメージをPushします。

docker push {aws_account_id}.dkr.ecr.{region}.amazonaws.com/{repository_name}

以上で、ECR上にイメージを用意できました。

スクリプトの実装

続いて、コンテナ上で実行するスクリプトを実装します。今回は、TensorFlow HubのUniversal Sentence Encoderを使って、読み込んだテキストを固定長のベクトルに変換するスクリプトを実装してみました。今回は小さなテキストファイルで試すので、一括で変換しています。

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text


class USEEncoder:

  def __init__(self, model_url='https://tfhub.dev/google/universal-sentence-encoder-qa/3'):
    self.model = hub.load(model_url)

  def encode_question(self, questions):
    embeddings = self.model.signatures['question_encoder'](tf.constant(questions))
    return embeddings['outputs'].numpy()

  def encode_response(self, responses, contexts):
    assert len(responses) == len(contexts)
    embeddings = self.model.signatures['response_encoder'](
        input=tf.constant(responses),
        context=tf.constant(contexts)
    )
    return embeddings['outputs'].numpy()


# Read data locally 
lines = [line.rstrip() for line in open('/opt/ml/processing/input/dataset.txt')]

# Preprocess the data set
encoder = USEEncoder()
embeddings = encoder.encode_question(lines)

# Save data locally
with open('/opt/ml/processing/output/train/train.txt', 'w') as f:
    for line in embeddings.tolist():
        f.write('{}\n'.format(line))
print('Finished running processing job')

ジョブの実行

ここまできたら、あとはノートブックからSageMaker Processingを実行するだけです。

まずは、ScriptProcessorを用意します。ScriptProcessorにイメージのURIを渡すことで、渡したイメージから作成したコンテナ上で処理を実行できます。

import sagemaker
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

role = sagemaker.get_execution_role()
script_processor = ScriptProcessor(
    image_uri='YOUR IMAGE URI',
    role=role,
    instance_count=1,
    instance_type='ml.p3.2xlarge',
    command=['python3']
)

ScriptProcessorを用意したら、runメソッドで実行します。ProcessingInputsourceには処理したいファイルが存在する場所を指定します。destinationにはsourceで指定したファイルをコンテナ上のどこにコピーするか指定します。ProcessingOutputsourceには処理済みのファイルが存在する場所を指定します。今回は指定していませんが、destinationを指定することで処理済みのファイルをコピーする先を指定できます。

script_processor.run(
    code='processing_script.py',
    inputs=[
        ProcessingInput(
            source='dataset.txt',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output/train'
        )
    ]
)

実行が完了すると、S3上に処理したファイルが出力されます。以下のように固定長のベクトルに変換されていることを確認できます。

[0.027010727673768997, 0.04176567867398262, ...]
... 

おわりに

今回は、SageMaker ProcessingにてGPU上でTensorFlow Hubを使って前処理をしてみました。イメージさえ用意すれば、TensorFlow Hub以外のパッケージを使うこともできるので、自分の必要なパッケージをインストールしたコンテナさえ用意すれば、簡単に使えるかと思います。

参考資料

benchmark関数を使ってデータセットの処理時間の計測と改善に取り組む

TensorFlowには、tf.data.Dataset APIという入力のパイプラインを実現するための強力な機能があります。入力のパイプラインを最適化することで学習全体を高速化できるため、定量的に計測して改善する価値があります。そこで、本記事ではTensorFlow Datasetsに含まれるbenchmark関数を使ってデータセットの処理時間を測定する方法について紹介します。

ベンチマーク関数

benchmark関数は、tf.data.Datasetベンチマークを行うための関数です。Datasetを渡すことで、データセットの処理時間に関する以下の情報を出力できます。

  • 総実行時間
  • セットアップ時間(最初のバッチにかかる時間)
  • 秒あたりに処理できるデータ数(examples/sec)

ここまではドキュメントに書いてある話ですが、実際に実行すると以下のような出力をされて面食らいます。ドキュメントにはstatisticsを返すとさらっと書いてあるだけなので、ソースコードを読んでわかったことを説明しましょう。

{
  "first":{
    "avg":13.807230435075414,
    "duration":0.07242582100025174,
    "num_examples":1
  },
  "first+last":{
    "avg":4597.854424099337,
    "duration":26.09912992699992,
    "num_examples":120000
  },
  "last":{
    "avg":4610.610683215085,
    "duration":26.02670410599967,
    "num_examples":119999
  },
  "raw":{
    "end_time":4150.842121449,
    "first_batch_time":4124.815417343,
    "num_iter":120000,
    "start_time":4124.742991522
  }
}

firstは最初の1バッチに関する統計情報です。最初の1バッチはファイルオープンやネットワーク接続の時間がかかる場合があるため、分けているようです。lastは最初の1バッチ以外を表しています。したがって、first+lastで全バッチを表しています。durationはかかった時間(s)、num_examplesはデータ数、avgは1秒あたりに処理したデータ数を表しています。rawにはパフォーマンスカウンタの値が格納されており、これらの値からdurationを計算しています。

以下では、first+lastdurationを見て、全体にかかっている時間を計測してみます。

ベンチマーク

benchmark関数を使うためには、TensorFlow Datasetsをインストールする必要があります。この関数は2020年6月25日のPull Requestでマージされているので、TensorFlow Datasets v3.2.0から利用可能です。バージョン3.2.0以上のTensorFlow Datasetsをインストールしましょう。

pip install -U tensorflow-datasets

インストールしたら、必要なパッケージをインポートします。

import tensorflow as tf
import tensorflow_datasets as tfds

まずは、単にデータセットを渡してみましょう。以下のようになりました。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .repeat(2)
)
stats['first+last']['duration']
# 26.867365005000465

次に、mapに処理をさせてみます。少し遅くなることがわかります。

def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .map(normalize_img)
    .repeat(2)
)
stats['first+last']['duration']
# 39.00072752699998

では、mapを並列実行したらどうでしょうか?先ほどより高速になりました。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .map(
        normalize_img,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .repeat(2)
)
stats['first+last']['duration']
# 30.73008285800006

mapの結果をキャッシュしてみましょう。さらに速くなりました。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .map(
        normalize_img,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()
    .repeat(2)
)
stats['first+last']['duration']
# 23.48708672599969

mapの前にbatchを入れて、mapをベクトル化します。効果的なようです。

dataset = tfds.load('mnist', split='train', as_supervised=True)
stats = tfds.core.benchmark(
    dataset
    .batch(32)
    .map(
        normalize_img,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()
    .repeat(2),
    batch_size=32
)
print(stats['first+last']['duration'])
# 6.9288019289997465

以上で、benchmark関数の使い方の説明は終わりです。benchmark関数を使ってデータセットの処理速度を定量化することで、改善のための一つの指標とすることができます。TensorFlow Profilerなどと合わせて使うことで、ボトルネックを分析し、学習全体を高速化して、生産性の向上やマシンコストの低下を実現させるのに役立ちそうです。

参考資料

Wikipediaの前処理はもうやめて「Wiki-40B」を使う

最近の自然言語処理では、大規模なテキストから単語の分散表現や言語モデルを学習させて使っています。学習する際のテキストとしては、分量や利用しやすさの都合からWikipediaが選ばれることが多いですが、その前処理は意外と面倒で時間のかかる作業です。そこで、本記事では比較的最近リリースされた前処理済みのデータセットWiki-40B」とその使い方を紹介します。

Wiki-40Bとは?

Wiki-40Bは、40言語以上のWikipediaを前処理して作られたデータセットです。このデータセットは言語ごとに学習/検証/テスト用に分かれているので、単語分散表現や言語モデルの学習・評価に使えます。言語ごとの対応状況については、以下のページを参照するとよいでしょう。

前処理としては、大きくは以下の2つに分けられます。

  • ページのフィルタリング
  • ページ内の処理

ページのフィルタリングでは、不要なページそのものをフィルタリングして取り除いています。Wikipediaにはコンテンツを表すページだけでなく、以下のようなページが含まれています。これらのページは十分な量のテキストが含まれていなかったり、含まれていてもリストで細切れだったりするので、言語モデルや分散表現の学習には不向きです。そのため、最初に取り除きます。

  • 曖昧さ回避ページ
  • リダイレクトページ
  • 削除済みページ
  • 非エンティティページ(リスト、インフォボックス、画像などのページ)

ページ内の処理では、マークアップやページ内の非コンテンツ部分を除去します。マークアップの除去はみなさんやられていると思いますが、それだけでは十分ではありません。たとえば、参考文献や外部リンク、脚注などの節や、画像やキャプション、リスト、テーブルといった構造化された情報を取り除かなければ質の高いデータセットになりません。以下は非コンテンツ部分の例です。

f:id:Hironsan:20200926115716p:plain
不要な節の例: 参考文献

以下は、従来のWikipediaのデータセットとの比較です。不要な節が除去され、学習に必要な部分は残っていることを確認できます。また、従来の前処理では誤って除去されていたフレーズが残っていることも確認できます。

f:id:Hironsan:20200926121252p:plain
従来のデータセットとの比較。画像は「Wiki-40B: Multilingual Language Model Dataset」より。

Wiki-40Bの使い方

Wiki-40Bは、TensorFlow Datasetsを利用することで、簡単に使えます。まずは、TensorFlow Datasetsをインストールしましょう。ここで注意なのですが、インストールするTensorFlow Datasetsのバージョンはv3.2.0以上にします。Wiki-40B自体はv3.0.0以降で利用可能ですが、v3.2.0からデータセット名が変更されているので、v3.2.0以降を推奨します。

pip install -U tensorflow-datasets

次に、データセットを読み込みます。データセット名はwiki40b/{lang}の形式になっており、{lang}の部分に言語を指定します。今回は日本語を読み込むので、wiki40b/jaを指定します。また、データセットは学習/検証/テスト用に分かれており、それぞれ全記事の90/5/5%を占めています。今回はsplit='test'と指定して、テスト用データセットを読み込むことにしましょう。

import tensorflow_datasets as tfds

ds = tfds.load('wiki40b/ja', split='test')

データセットのレコードは以下のような形式の辞書です。textには前処理済みのテキスト、wikidata_idは対応するWikidataのIDです。version_idが何のバージョンを表すかはよくわかりませんでした。わかる方がいたらコメントで教えて頂けると助かります。

FeaturesDict({
    'text': Text(shape=(), dtype=tf.string),
    'version_id': Text(shape=(), dtype=tf.string),
    'wikidata_id': Text(shape=(), dtype=tf.string),
})

実際のデータは以下のようになっています。

>>> list(ds.as_numpy_iterator())[0]
{'text': b'\n_START_ARTICLE_\n\xe3\x83\x93\xe3\x83\xbc\xe3\x83\x88\xe3\x81'
         b'\x9f\xe3\x81\x91\xe3\x81\x97\xe3\x81\xae\xe6\x95\x99\xe7\xa7\x91'
         b'\xe6\x9b\xb8\xe3\x81\xab\xe8\xbc\x89\xe3\x82\x89\xe3\x81\xaa\xe3'
         b'\x81\x84\xe6\x97\xa5\xe6\x9c\xac\xe4\xba\xba\xe3\x81\xae\xe8\xac'
         b'\x8e\n_START_SECTION_\n\xe6\xa6\x82\xe8\xa6\x81\n_START_PARAGRAP'
         b'H_\n\xe3\x80\x8c\xe6\x95\x99\xe7\xa7\x91\xe6\x9b\xb8\xe3'
         b'\x81\xab\xe3\x81\xaf\xe6\xb1\xba\xe3\x81\x97\xe3\x81\xa6\xe8\xbc'
         b'\x89\xe3\x82\x89\xe3\x81\xaa\xe3\x81\x84\xe3\x80\x8d\xe6\x97\xa5'
         b'\xe6\x9c\xac\xe4\xba\xba\xe3\x81\xae\xe8\xac\x8e\xe3\x82\x84\xe3'
         b'\x81\x97\xe3\x81\x8d\xe3\x81\x9f\xe3\x82\x8a\xe3\x82\x92\xe5\xa4'
         b'\x9a\xe8\xa7\x92\xe7\x9a\x84\xe3\x81\xab\xe6\xa4\x9c\xe8\xa8\xbc'
         b'\xe3\x81\x97\xe3\x80\x81\xe6\x97\xa5\xe6\x9c\xac\xe4\xba\xba\xe3'
         b'\x81\xaeDNA\xe3\x82\x92\xe8\xa7\xa3\xe6\x98\x8e\xe3\x81'
         b'\x99\xe3\x82\x8b\xe3\x80\x82_NEWLINE_\xe6\x96\xb0\xe6'
         b'\x98\xa5\xe7\x95\xaa\xe7\xb5\x84\xe3\x81\xa8\xe3\x81\x97\xe3\x81'
         b'\xa6\xe5\xae\x9a\xe6\x9c\x9f\xe7\x9a\x84\xe3\x81\xab\xe6\x94\xbe'
         b'\xe9\x80\x81\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xa6\xe3\x81\x8a\xe3'
         b'\x82\x8a\xe3\x80\x81\xe5\xb9\xb4\xe6\x9c\xab\xe3\x81\xae\xe5\x8d'
         b'\x88\xe5\x89\x8d\xe4\xb8\xad\xe3\x81\xab\xe5\x86\x8d\xe6\x94\xbe'
         b'\xe9\x80\x81\xe3\x81\x95\xe3\x82\x8c\xe3\x82\x8b\xe3\x81\xae\xe3'
         b'\x81\x8c\xe6\x81\x92\xe4\xbe\x8b\xe3\x81\xa8\xe3\x81\xaa\xe3\x81'
         b'\xa3\xe3\x81\xa6\xe3\x81\x84\xe3\x82\x8b\xe3\x80\x82',
 'version_id': b'1848243370795951995',
 'wikidata_id': b'Q11331136'}

以下はテキストをデコードした結果です。なんらかの構造があることがわかります。

_START_ARTICLE_
ビートたけしの教科書に載らない日本人の謎
_START_SECTION_
概要
_START_PARAGRAPH_
「教科書には決して載らない」日本人の謎やしきたりを多角的に検証し、日本人のDNAを解明する。_NEWLINE_新春番組として定期的に放送されており、年末の午前中に再放送されるのが恒例となっている。

文書の構造は以下の4つのマークアップで表されます。

  • _START_ARTICLE_
  • _START_SECTION_
  • _START_PARAGRAPH_
  • _NEWLINE_

_START_ARTICLE_は記事の始まりを表しており、その後にページタイトルが続きます。_START_SECTION_は節の始まりを表しており、その後には節のタイトルが続きます。_START_PARAGRAPH_は節のタイトルと節内のパラグラフを分割する役割があります。_NEWLINE_は1パラグラフの終わりを表しています。

以上で、Wiki-40Bの使い方の説明は終わりです。今回はTensorFlow Datasetsからの使い方を説明しましたが、Huggingface Datasetsを使って読み込むこともできます。いずれにせよ、最新版のWikipediaを使いたいとか、前処理で落とした情報の中に使いたい情報があるという状況でないなら、Wiki-40Bを使うことで生産性を向上させられるでしょう。

huggingface.co

参考資料