SageMaker Processingでカスタムイメージを使ってデータを加工する
SageMaker Processingは、データの前処理や後処理、特徴エンジニアリング、モデルの評価といった機械学習のワークロードをSageMaker上で実行するための機能です。SageMaker Processingを使うことで、これらの処理をするためのジョブをコンテナ上で実行することが出来ます。ジョブは入力のデータをS3から受け取り、処理した結果をS3へ出力する仕組みになっています。
SageMaker Processingは、Scikit-learnやSparkの他に、カスタムイメージから作成したコンテナ上でデータを処理できます。今回はGPU上でTensorFlow Hubを動かすためのコンテナを作成し、テキストを加工してみます。やることとしては、Universal Sentence Encoderを使って、テキストを固定長のベクトルに変換し、S3へ書き出します。
以下の手順で進めていきます。
- Dockerfileの作成
- ECRへのイメージのPush
- スクリプトの実装
- ジョブの実行
Dockerfileの作成
まずは、コンテナを定義しましょう。今回はTensorFlow Hubを使ってGPU上でテキストの前処理を行いたいので、以下のAWS公式のDockerfileを基にDockerfileを作成します。
TensorFlow Text、TensorFlow Hubをインストールし、ENTRYPOINTを書き換えています。TensorFlowのバージョンを書き換える心配がなければ、ECR上にあるGPU用のイメージをベースイメージに指定して書くと、記述が簡潔になると思います。
RUN ${PIP} install --no-cache-dir -U \ ... tensorflow-text==2.3.0 \ tensorflow-hub==0.9.0 \ ... ENTRYPOINT ["python3"]
ECRへのPush
ECRへのPushについては、公式ドキュメントが詳しいですが、ここでは以下の手順で行います。
- ECRにリポジトリを作成
- DockerクライアントをECRにPushできるように認証
- イメージをビルド
- イメージにタグ付け
- イメージをECRにPush
最初にリポジトリを作ると、リポジトリへプッシュするために必要なコマンドが「View Push Command」から表示できるので(以下図)、コピペするだけで済みます。
まずは、Dockerクライアントを認証します。region
とaws_account_id
を変更します。
aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{region}.amazonaws.com
次に、イメージをビルドします。
docker build -t sagemaker-processing-tensorflow .
ビルドが完了したら、タグを付けます。aws_account_id
とrepository_name
を変更します。
docker tag sagemaker-processing-tensorflow:latest {aws_account_id}.dkr.ecr.region.amazonaws.com/{repository_name}
最後にイメージをPushします。
docker push {aws_account_id}.dkr.ecr.{region}.amazonaws.com/{repository_name}
以上で、ECR上にイメージを用意できました。
スクリプトの実装
続いて、コンテナ上で実行するスクリプトを実装します。今回は、TensorFlow HubのUniversal Sentence Encoderを使って、読み込んだテキストを固定長のベクトルに変換するスクリプトを実装してみました。今回は小さなテキストファイルで試すので、一括で変換しています。
import tensorflow as tf import tensorflow_hub as hub import tensorflow_text class USEEncoder: def __init__(self, model_url='https://tfhub.dev/google/universal-sentence-encoder-qa/3'): self.model = hub.load(model_url) def encode_question(self, questions): embeddings = self.model.signatures['question_encoder'](tf.constant(questions)) return embeddings['outputs'].numpy() def encode_response(self, responses, contexts): assert len(responses) == len(contexts) embeddings = self.model.signatures['response_encoder']( input=tf.constant(responses), context=tf.constant(contexts) ) return embeddings['outputs'].numpy() # Read data locally lines = [line.rstrip() for line in open('/opt/ml/processing/input/dataset.txt')] # Preprocess the data set encoder = USEEncoder() embeddings = encoder.encode_question(lines) # Save data locally with open('/opt/ml/processing/output/train/train.txt', 'w') as f: for line in embeddings.tolist(): f.write('{}\n'.format(line)) print('Finished running processing job')
ジョブの実行
ここまできたら、あとはノートブックからSageMaker Processingを実行するだけです。
まずは、ScriptProcessor
を用意します。ScriptProcessor
にイメージのURIを渡すことで、渡したイメージから作成したコンテナ上で処理を実行できます。
import sagemaker from sagemaker.processing import ScriptProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput role = sagemaker.get_execution_role() script_processor = ScriptProcessor( image_uri='YOUR IMAGE URI', role=role, instance_count=1, instance_type='ml.p3.2xlarge', command=['python3'] )
ScriptProcessor
を用意したら、run
メソッドで実行します。ProcessingInput
のsource
には処理したいファイルが存在する場所を指定します。destination
にはsource
で指定したファイルをコンテナ上のどこにコピーするか指定します。ProcessingOutput
のsource
には処理済みのファイルが存在する場所を指定します。今回は指定していませんが、destination
を指定することで処理済みのファイルをコピーする先を指定できます。
script_processor.run( code='processing_script.py', inputs=[ ProcessingInput( source='dataset.txt', destination='/opt/ml/processing/input' ) ], outputs=[ ProcessingOutput( source='/opt/ml/processing/output/train' ) ] )
実行が完了すると、S3上に処理したファイルが出力されます。以下のように固定長のベクトルに変換されていることを確認できます。
[0.027010727673768997, 0.04176567867398262, ...] ...
おわりに
今回は、SageMaker ProcessingにてGPU上でTensorFlow Hubを使って前処理をしてみました。イメージさえ用意すれば、TensorFlow Hub以外のパッケージを使うこともできるので、自分の必要なパッケージをインストールしたコンテナさえ用意すれば、簡単に使えるかと思います。
参考資料
benchmark関数を使ってデータセットの処理時間の計測と改善に取り組む
TensorFlowには、tf.data.Dataset
APIという入力のパイプラインを実現するための強力な機能があります。入力のパイプラインを最適化することで学習全体を高速化できるため、定量的に計測して改善する価値があります。そこで、本記事ではTensorFlow Datasetsに含まれるbenchmark
関数を使ってデータセットの処理時間を測定する方法について紹介します。
ベンチマーク関数
benchmark関数は、tf.data.Datasetのベンチマークを行うための関数です。Dataset
を渡すことで、データセットの処理時間に関する以下の情報を出力できます。
- 総実行時間
- セットアップ時間(最初のバッチにかかる時間)
- 秒あたりに処理できるデータ数(examples/sec)
ここまではドキュメントに書いてある話ですが、実際に実行すると以下のような出力をされて面食らいます。ドキュメントにはstatistics
を返すとさらっと書いてあるだけなので、ソースコードを読んでわかったことを説明しましょう。
{ "first":{ "avg":13.807230435075414, "duration":0.07242582100025174, "num_examples":1 }, "first+last":{ "avg":4597.854424099337, "duration":26.09912992699992, "num_examples":120000 }, "last":{ "avg":4610.610683215085, "duration":26.02670410599967, "num_examples":119999 }, "raw":{ "end_time":4150.842121449, "first_batch_time":4124.815417343, "num_iter":120000, "start_time":4124.742991522 } }
first
は最初の1バッチに関する統計情報です。最初の1バッチはファイルオープンやネットワーク接続の時間がかかる場合があるため、分けているようです。last
は最初の1バッチ以外を表しています。したがって、first+last
で全バッチを表しています。duration
はかかった時間(s)、num_examples
はデータ数、avg
は1秒あたりに処理したデータ数を表しています。raw
にはパフォーマンスカウンタの値が格納されており、これらの値からduration
を計算しています。
以下では、first+last
のduration
を見て、全体にかかっている時間を計測してみます。
ベンチマーク
benchmark
関数を使うためには、TensorFlow Datasetsをインストールする必要があります。この関数は2020年6月25日のPull Requestでマージされているので、TensorFlow Datasets v3.2.0から利用可能です。バージョン3.2.0以上のTensorFlow Datasetsをインストールしましょう。
pip install -U tensorflow-datasets
インストールしたら、必要なパッケージをインポートします。
import tensorflow as tf import tensorflow_datasets as tfds
まずは、単にデータセットを渡してみましょう。以下のようになりました。
dataset = tfds.load('mnist', split='train', as_supervised=True) stats = tfds.core.benchmark( dataset .repeat(2) ) stats['first+last']['duration'] # 26.867365005000465
次に、map
に処理をさせてみます。少し遅くなることがわかります。
def normalize_img(image, label): """Normalizes images: `uint8` -> `float32`.""" return tf.cast(image, tf.float32) / 255., label dataset = tfds.load('mnist', split='train', as_supervised=True) stats = tfds.core.benchmark( dataset .map(normalize_img) .repeat(2) ) stats['first+last']['duration'] # 39.00072752699998
では、map
を並列実行したらどうでしょうか?先ほどより高速になりました。
dataset = tfds.load('mnist', split='train', as_supervised=True) stats = tfds.core.benchmark( dataset .map( normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE ) .repeat(2) ) stats['first+last']['duration'] # 30.73008285800006
map
の結果をキャッシュしてみましょう。さらに速くなりました。
dataset = tfds.load('mnist', split='train', as_supervised=True) stats = tfds.core.benchmark( dataset .map( normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE ) .cache() .repeat(2) ) stats['first+last']['duration'] # 23.48708672599969
map
の前にbatch
を入れて、map
をベクトル化します。効果的なようです。
dataset = tfds.load('mnist', split='train', as_supervised=True) stats = tfds.core.benchmark( dataset .batch(32) .map( normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE ) .cache() .repeat(2), batch_size=32 ) print(stats['first+last']['duration']) # 6.9288019289997465
以上で、benchmark
関数の使い方の説明は終わりです。benchmark
関数を使ってデータセットの処理速度を定量化することで、改善のための一つの指標とすることができます。TensorFlow Profilerなどと合わせて使うことで、ボトルネックを分析し、学習全体を高速化して、生産性の向上やマシンコストの低下を実現させるのに役立ちそうです。
参考資料
Wikipediaの前処理はもうやめて「Wiki-40B」を使う
最近の自然言語処理では、大規模なテキストから単語の分散表現や言語モデルを学習させて使っています。学習する際のテキストとしては、分量や利用しやすさの都合からWikipediaが選ばれることが多いですが、その前処理は意外と面倒で時間のかかる作業です。そこで、本記事では比較的最近リリースされた前処理済みのデータセット「Wiki-40B」とその使い方を紹介します。
Wiki-40Bとは?
Wiki-40Bは、40言語以上のWikipediaを前処理して作られたデータセットです。このデータセットは言語ごとに学習/検証/テスト用に分かれているので、単語分散表現や言語モデルの学習・評価に使えます。言語ごとの対応状況については、以下のページを参照するとよいでしょう。
前処理としては、大きくは以下の2つに分けられます。
- ページのフィルタリング
- ページ内の処理
ページのフィルタリングでは、不要なページそのものをフィルタリングして取り除いています。Wikipediaにはコンテンツを表すページだけでなく、以下のようなページが含まれています。これらのページは十分な量のテキストが含まれていなかったり、含まれていてもリストで細切れだったりするので、言語モデルや分散表現の学習には不向きです。そのため、最初に取り除きます。
- 曖昧さ回避ページ
- リダイレクトページ
- 削除済みページ
- 非エンティティページ(リスト、インフォボックス、画像などのページ)
ページ内の処理では、マークアップやページ内の非コンテンツ部分を除去します。マークアップの除去はみなさんやられていると思いますが、それだけでは十分ではありません。たとえば、参考文献や外部リンク、脚注などの節や、画像やキャプション、リスト、テーブルといった構造化された情報を取り除かなければ質の高いデータセットになりません。以下は非コンテンツ部分の例です。
以下は、従来のWikipediaのデータセットとの比較です。不要な節が除去され、学習に必要な部分は残っていることを確認できます。また、従来の前処理では誤って除去されていたフレーズが残っていることも確認できます。
Wiki-40Bの使い方
Wiki-40Bは、TensorFlow Datasetsを利用することで、簡単に使えます。まずは、TensorFlow Datasetsをインストールしましょう。ここで注意なのですが、インストールするTensorFlow Datasetsのバージョンはv3.2.0
以上にします。Wiki-40B自体はv3.0.0
以降で利用可能ですが、v3.2.0
からデータセット名が変更されているので、v3.2.0
以降を推奨します。
pip install -U tensorflow-datasets
次に、データセットを読み込みます。データセット名はwiki40b/{lang}
の形式になっており、{lang}
の部分に言語を指定します。今回は日本語を読み込むので、wiki40b/ja
を指定します。また、データセットは学習/検証/テスト用に分かれており、それぞれ全記事の90/5/5%を占めています。今回はsplit='test'
と指定して、テスト用データセットを読み込むことにしましょう。
import tensorflow_datasets as tfds ds = tfds.load('wiki40b/ja', split='test')
データセットのレコードは以下のような形式の辞書です。text
には前処理済みのテキスト、wikidata_id
は対応するWikidataのIDです。version_id
が何のバージョンを表すかはよくわかりませんでした。わかる方がいたらコメントで教えて頂けると助かります。
FeaturesDict({ 'text': Text(shape=(), dtype=tf.string), 'version_id': Text(shape=(), dtype=tf.string), 'wikidata_id': Text(shape=(), dtype=tf.string), })
実際のデータは以下のようになっています。
>>> list(ds.as_numpy_iterator())[0] {'text': b'\n_START_ARTICLE_\n\xe3\x83\x93\xe3\x83\xbc\xe3\x83\x88\xe3\x81' b'\x9f\xe3\x81\x91\xe3\x81\x97\xe3\x81\xae\xe6\x95\x99\xe7\xa7\x91' b'\xe6\x9b\xb8\xe3\x81\xab\xe8\xbc\x89\xe3\x82\x89\xe3\x81\xaa\xe3' b'\x81\x84\xe6\x97\xa5\xe6\x9c\xac\xe4\xba\xba\xe3\x81\xae\xe8\xac' b'\x8e\n_START_SECTION_\n\xe6\xa6\x82\xe8\xa6\x81\n_START_PARAGRAP' b'H_\n\xe3\x80\x8c\xe6\x95\x99\xe7\xa7\x91\xe6\x9b\xb8\xe3' b'\x81\xab\xe3\x81\xaf\xe6\xb1\xba\xe3\x81\x97\xe3\x81\xa6\xe8\xbc' b'\x89\xe3\x82\x89\xe3\x81\xaa\xe3\x81\x84\xe3\x80\x8d\xe6\x97\xa5' b'\xe6\x9c\xac\xe4\xba\xba\xe3\x81\xae\xe8\xac\x8e\xe3\x82\x84\xe3' b'\x81\x97\xe3\x81\x8d\xe3\x81\x9f\xe3\x82\x8a\xe3\x82\x92\xe5\xa4' b'\x9a\xe8\xa7\x92\xe7\x9a\x84\xe3\x81\xab\xe6\xa4\x9c\xe8\xa8\xbc' b'\xe3\x81\x97\xe3\x80\x81\xe6\x97\xa5\xe6\x9c\xac\xe4\xba\xba\xe3' b'\x81\xaeDNA\xe3\x82\x92\xe8\xa7\xa3\xe6\x98\x8e\xe3\x81' b'\x99\xe3\x82\x8b\xe3\x80\x82_NEWLINE_\xe6\x96\xb0\xe6' b'\x98\xa5\xe7\x95\xaa\xe7\xb5\x84\xe3\x81\xa8\xe3\x81\x97\xe3\x81' b'\xa6\xe5\xae\x9a\xe6\x9c\x9f\xe7\x9a\x84\xe3\x81\xab\xe6\x94\xbe' b'\xe9\x80\x81\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xa6\xe3\x81\x8a\xe3' b'\x82\x8a\xe3\x80\x81\xe5\xb9\xb4\xe6\x9c\xab\xe3\x81\xae\xe5\x8d' b'\x88\xe5\x89\x8d\xe4\xb8\xad\xe3\x81\xab\xe5\x86\x8d\xe6\x94\xbe' b'\xe9\x80\x81\xe3\x81\x95\xe3\x82\x8c\xe3\x82\x8b\xe3\x81\xae\xe3' b'\x81\x8c\xe6\x81\x92\xe4\xbe\x8b\xe3\x81\xa8\xe3\x81\xaa\xe3\x81' b'\xa3\xe3\x81\xa6\xe3\x81\x84\xe3\x82\x8b\xe3\x80\x82', 'version_id': b'1848243370795951995', 'wikidata_id': b'Q11331136'}
以下はテキストをデコードした結果です。なんらかの構造があることがわかります。
_START_ARTICLE_ ビートたけしの教科書に載らない日本人の謎 _START_SECTION_ 概要 _START_PARAGRAPH_ 「教科書には決して載らない」日本人の謎やしきたりを多角的に検証し、日本人のDNAを解明する。_NEWLINE_新春番組として定期的に放送されており、年末の午前中に再放送されるのが恒例となっている。
文書の構造は以下の4つのマークアップで表されます。
_START_ARTICLE_
_START_SECTION_
_START_PARAGRAPH_
_NEWLINE_
_START_ARTICLE_
は記事の始まりを表しており、その後にページタイトルが続きます。_START_SECTION_
は節の始まりを表しており、その後には節のタイトルが続きます。_START_PARAGRAPH_
は節のタイトルと節内のパラグラフを分割する役割があります。_NEWLINE_
は1パラグラフの終わりを表しています。
以上で、Wiki-40Bの使い方の説明は終わりです。今回はTensorFlow Datasetsからの使い方を説明しましたが、Huggingface Datasetsを使って読み込むこともできます。いずれにせよ、最新版のWikipediaを使いたいとか、前処理で落とした情報の中に使いたい情報があるという状況でないなら、Wiki-40Bを使うことで生産性を向上させられるでしょう。