SageMaker Processingでカスタムイメージを使ってデータを加工する
SageMaker Processingは、データの前処理や後処理、特徴エンジニアリング、モデルの評価といった機械学習のワークロードをSageMaker上で実行するための機能です。SageMaker Processingを使うことで、これらの処理をするためのジョブをコンテナ上で実行することが出来ます。ジョブは入力のデータをS3から受け取り、処理した結果をS3へ出力する仕組みになっています。
SageMaker Processingは、Scikit-learnやSparkの他に、カスタムイメージから作成したコンテナ上でデータを処理できます。今回はGPU上でTensorFlow Hubを動かすためのコンテナを作成し、テキストを加工してみます。やることとしては、Universal Sentence Encoderを使って、テキストを固定長のベクトルに変換し、S3へ書き出します。
以下の手順で進めていきます。
- Dockerfileの作成
- ECRへのイメージのPush
- スクリプトの実装
- ジョブの実行
Dockerfileの作成
まずは、コンテナを定義しましょう。今回はTensorFlow Hubを使ってGPU上でテキストの前処理を行いたいので、以下のAWS公式のDockerfileを基にDockerfileを作成します。
TensorFlow Text、TensorFlow Hubをインストールし、ENTRYPOINTを書き換えています。TensorFlowのバージョンを書き換える心配がなければ、ECR上にあるGPU用のイメージをベースイメージに指定して書くと、記述が簡潔になると思います。
RUN ${PIP} install --no-cache-dir -U \ ... tensorflow-text==2.3.0 \ tensorflow-hub==0.9.0 \ ... ENTRYPOINT ["python3"]
ECRへのPush
ECRへのPushについては、公式ドキュメントが詳しいですが、ここでは以下の手順で行います。
- ECRにリポジトリを作成
- DockerクライアントをECRにPushできるように認証
- イメージをビルド
- イメージにタグ付け
- イメージをECRにPush
最初にリポジトリを作ると、リポジトリへプッシュするために必要なコマンドが「View Push Command」から表示できるので(以下図)、コピペするだけで済みます。
まずは、Dockerクライアントを認証します。region
とaws_account_id
を変更します。
aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{region}.amazonaws.com
次に、イメージをビルドします。
docker build -t sagemaker-processing-tensorflow .
ビルドが完了したら、タグを付けます。aws_account_id
とrepository_name
を変更します。
docker tag sagemaker-processing-tensorflow:latest {aws_account_id}.dkr.ecr.region.amazonaws.com/{repository_name}
最後にイメージをPushします。
docker push {aws_account_id}.dkr.ecr.{region}.amazonaws.com/{repository_name}
以上で、ECR上にイメージを用意できました。
スクリプトの実装
続いて、コンテナ上で実行するスクリプトを実装します。今回は、TensorFlow HubのUniversal Sentence Encoderを使って、読み込んだテキストを固定長のベクトルに変換するスクリプトを実装してみました。今回は小さなテキストファイルで試すので、一括で変換しています。
import tensorflow as tf import tensorflow_hub as hub import tensorflow_text class USEEncoder: def __init__(self, model_url='https://tfhub.dev/google/universal-sentence-encoder-qa/3'): self.model = hub.load(model_url) def encode_question(self, questions): embeddings = self.model.signatures['question_encoder'](tf.constant(questions)) return embeddings['outputs'].numpy() def encode_response(self, responses, contexts): assert len(responses) == len(contexts) embeddings = self.model.signatures['response_encoder']( input=tf.constant(responses), context=tf.constant(contexts) ) return embeddings['outputs'].numpy() # Read data locally lines = [line.rstrip() for line in open('/opt/ml/processing/input/dataset.txt')] # Preprocess the data set encoder = USEEncoder() embeddings = encoder.encode_question(lines) # Save data locally with open('/opt/ml/processing/output/train/train.txt', 'w') as f: for line in embeddings.tolist(): f.write('{}\n'.format(line)) print('Finished running processing job')
ジョブの実行
ここまできたら、あとはノートブックからSageMaker Processingを実行するだけです。
まずは、ScriptProcessor
を用意します。ScriptProcessor
にイメージのURIを渡すことで、渡したイメージから作成したコンテナ上で処理を実行できます。
import sagemaker from sagemaker.processing import ScriptProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput role = sagemaker.get_execution_role() script_processor = ScriptProcessor( image_uri='YOUR IMAGE URI', role=role, instance_count=1, instance_type='ml.p3.2xlarge', command=['python3'] )
ScriptProcessor
を用意したら、run
メソッドで実行します。ProcessingInput
のsource
には処理したいファイルが存在する場所を指定します。destination
にはsource
で指定したファイルをコンテナ上のどこにコピーするか指定します。ProcessingOutput
のsource
には処理済みのファイルが存在する場所を指定します。今回は指定していませんが、destination
を指定することで処理済みのファイルをコピーする先を指定できます。
script_processor.run( code='processing_script.py', inputs=[ ProcessingInput( source='dataset.txt', destination='/opt/ml/processing/input' ) ], outputs=[ ProcessingOutput( source='/opt/ml/processing/output/train' ) ] )
実行が完了すると、S3上に処理したファイルが出力されます。以下のように固定長のベクトルに変換されていることを確認できます。
[0.027010727673768997, 0.04176567867398262, ...] ...
おわりに
今回は、SageMaker ProcessingにてGPU上でTensorFlow Hubを使って前処理をしてみました。イメージさえ用意すれば、TensorFlow Hub以外のパッケージを使うこともできるので、自分の必要なパッケージをインストールしたコンテナさえ用意すれば、簡単に使えるかと思います。