Ahogrammer

Deep Dive Into NLP, ML and Cloud

今日からはじめるレコメンデーション -探索と利用のジレンマとベイジアンアプローチ-

前回の記事では平均評価による推薦の問題点とその解決策について紹介しました。推薦の際に確信度が考慮されない問題点を信頼区間で、評価数が0の場合にスコアが不定になる問題点をスムージングによって解決する方法について紹介しました。

hironsan.hatenablog.com

今回は情報推薦における探索と利用のジレンマとその解決策としてのベイジアンアプローチについて紹介します。

探索と利用のジレンマ

探索(Exploration)と利用(Exploitation)のジレンマとは、有用そうな選択肢を選んでいると、他の選択肢がより有用であることを発見する機会が失われ、他の選択肢ばかりを試していると、有用そうな選択肢を利用できないというジレンマです。

ぼくはチーズバーガーが大好きです。しかし、チーズバーガーばかり食べていると他のより美味いハンバーガーを発見する機会がなくなります。かといって、アボガドバーガーや照り焼きバーガーなどのバーガーばかり食べているとチーズバーガを食べることができません。

もう少し真面目な例として、スロットマシンのプレイを挙げられます。スロットマシンでなるべく多くのコインを得るためには、どのマシンが良いか調べる必要があります。そのためには、実際にプレイしてスコアを出す方法が考えられます。仮に、勝利を1、負けを0とするとスコアはP(win) = \frac{#win}{#total} で計算できます。たとえば、3回プレイして1回勝つとスコアはP(win) = \frac{1}{3} です。

ここで問題となるのは、いったい何回プレイすればいいのかという点です。信頼区間的には、3回プレイして1回勝つより、300回プレイして100回勝つ方が信頼区間は狭くなって良い推定と言えます。つまり、プレイ数が少なすぎればあまり良い推定はできません。一方、プレイ数が多ければ良い推定はできますがその分コストがかかります。

このように、良い推定をするためにはより多くのデータを集める必要があるのですが、より多くのデータを集めるには局所最適な部分にもコストをかけなければならないというジレンマがあります。データを集めるほどよい推定ができるが、最高の解を利用したくもある。しかし、集めるときは利用できないし、利用するためには集めることができない。

探索と利用のジレンマは情報推薦にも関係しています。たとえば、YouTubeで猫の動画を見まくった場合、レコメンデーションシステムはそれを学んで、猫の動画を推薦してくるかもしれません。しかし、普通に考えると、他の動画も見たいはずです。これは利用はしているけど探索はしていない状態で、局所最適に陥っています。一方で、たとえ探索して推薦しても良いことは保証されません。このように、推薦においても探索と利用のジレンマは見られます。

ここまでで探索と利用をなるべくバランスよく行いたいということはなんとなく理解できました。次に問題となるのは、実際にどうやるかです。その方法の一つが、次に紹介するベイジアンアプローチです。

ベイジアンアプローチ

ベイジアンによるアプローチでは確率分布のパラメータも確率変数と考えます。たとえば、ベルヌーイ分布はf(k;\pi) = \pi^{k}(1-\pi)^{1-k}という形の分布で、パラメータとして\piを持ちますが、この\piも確率変数として考えます。確率変数が従う分布としてはベータ分布が使われます。

ベイズでない場合は尤度関数を定義して、最適なパラメータである\hat{\pi}最尤推定していました。ベイズの場合は\piは確率分布なので、もはや\hat{\pi}は固定値ではなくそれ自身が確率分布を持っています。したがって、形としてはp(\pi|x_1,\ldots, x_N) = p(\pi|X)のように表すことができます。なので、たとえばコインを100回投げて、その結果を使って\piの分布を推定します。

ここで重要となるのが以下のベイズの定理です。

\displaystyle{
p(\pi|X) = \frac{p(X|\pi)p(\pi)}{\int p(X|\pi)p(\pi)}
}

言葉で書くと以下のようになるでしょう。

\displaystyle{
\piの事後確率 = \frac{尤度 \cdot 事前確率}{正規化項}
}

p(\pi|X)は事後確率で、要するにデータXを観測した後のパラメータ\piの分布です。p(X|\pi)はパラメータ\piの元でのデータXの出現確率を表しています。p(\pi)は事前確率で、ここにベータ分布を使っていきます。少々複雑な式に見えますが、実際には分母を計算する必要がないので、p(\pi|X) \propto p(X|\pi)p(\pi)のようにシンプルになります。

実際にp(\pi|X)をシミュレーションしてみましょう。以下はmachine_learning_examplesbayesian_bandit.pyを使ってシミュレーションした結果です。データを集めるほどp(\pi|X)の形が細くなり、確信度が上がっていくということを表現できています。

f:id:Hironsan:20190426092821p:plain
事後分布のシミュレーション結果

事後分布を使うときは、分布から値をサンプリングして使います。ベイズでない場合は値が一つに決まるのでそれを使えばいいのですが、ベイズの場合は分布なので、そこからサンプリングして使うというわけです。

ベイズを使うことで使うことで、探索と利用を上手くバランスさせることができます。たとえば、以下の2つの分布について考えてみましょう。

f:id:Hironsan:20190426154823p:plain

2つの分布は狭い形と広い形をしています。狭い形の方はデータを多く集められていて、広い方は少ないと考えられます。ここからサンプリングすると、p(0.42<x<0.58)の範囲では狭い分布からサンプリングされる可能性が高く、それより小さい範囲では広い分布からサンプリングされる可能性が高くなっています。それにより、両方の分布を探索でき、かつ、狭い分布の知識を利用できるので探索と利用をバランスさせることができます。

アプローチの一般化

最後にベイズ的なアプローチを一般化しておきましょう。

まずは、以下のような尤度について考えました。

\displaystyle{
P(X|\theta) = \prod_i^Np(X_i|\theta)
}

次に事前分布について考えました。ここでの特徴はパラメータが特定の値ではなく分布という点です。ここに、事前の信念を入れることができます。

\displaystyle{
P(\theta)
}

最後に事後分布について考えました。これはつまり、より良いパラメータ分布をデータから得るということを表しています。

\displaystyle{
P(\theta|X) ∝ P(X|\theta)P(\theta)
}

ここまで用意できたら、あとは事後分布からサンプリングするだけです。サンプリングによってランク付けを行うことができるので、レコメンデーションに使うことができます。

おわりに

今回は情報推薦における探索と利用のジレンマとその解決策としてベイジアンアプローチを紹介しました。次回は協調フィルタリングについて紹介したいと思います。

参考資料

Kerasで固有表現認識のf1スコアを計算する

一般に固有表現認識では、学習済みモデルの性能を評価するためにf1が使用されます。その際、評価はトークンごとではなくエンティティごとに行われるのが特徴となっています。

f:id:Hironsan:20190424075228p:plain

f1スコアを評価する関数は、多くの機械学習フレームワークに実装されています。しかし、そのターゲットは分類タスクであり、固有表現認識のような系列ラベリングタスクではありません。

幸いなことに、Kerasでは、コールバックを介して学習中に検証データにアクセスすることができます。したがって、コールバックを使うことで固有表現認識用にf1スコアを計算することができます。

以下は、各エポックの終わりにf1スコア、再現率、および精度を計算して出力するためのコードです。ここにはコードを載せましたが、実際には自分で実装する必要はありません。このコールバックはseqevalパッケージに含まれています。

import numpy as np
from keras.callbacks import Callback
from seqeval.metrics import f1_score, classification_report


class F1Metrics(Callback):

    def __init__(self, id2label, pad_value=0, validation_data=None):
        """
        Args:
            id2label (dict): id to label mapping.
            (e.g. {1: 'B-LOC', 2: 'I-LOC'})
            pad_value (int): padding value.
        """
        super(F1Metrics, self).__init__()
        self.id2label = id2label
        self.pad_value = pad_value
        self.validation_data = validation_data
        self.is_fit = validation_data is None

    def find_pad_index(self, array):
        """Find padding index.
        Args:
            array (list): integer list.
        Returns:
            idx: padding index.
        Examples:
             >>> array = [1, 2, 0]
             >>> self.find_pad_index(array)
             2
        """
        try:
            return list(array).index(self.pad_value)
        except ValueError:
            return len(array)

    def get_length(self, y):
        """Get true length of y.
        Args:
            y (list): padded list.
        Returns:
            lens: true length of y.
        Examples:
            >>> y = [[1, 0, 0], [1, 1, 0], [1, 1, 1]]
            >>> self.get_length(y)
            [1, 2, 3]
        """
        lens = [self.find_pad_index(row) for row in y]
        return lens

    def convert_idx_to_name(self, y, lens):
        """Convert label index to name.
        Args:
            y (list): label index list.
            lens (list): true length of y.
        Returns:
            y: label name list.
        Examples:
            >>> # assumes that id2label = {1: 'B-LOC', 2: 'I-LOC'}
            >>> y = [[1, 0, 0], [1, 2, 0], [1, 1, 1]]
            >>> lens = [1, 2, 3]
            >>> self.convert_idx_to_name(y, lens)
            [['B-LOC'], ['B-LOC', 'I-LOC'], ['B-LOC', 'B-LOC', 'B-LOC']]
        """
        y = [[self.id2label[idx] for idx in row[:l]]
             for row, l in zip(y, lens)]
        return y

    def predict(self, X, y):
        """Predict sequences.
        Args:
            X (list): input data.
            y (list): tags.
        Returns:
            y_true: true sequences.
            y_pred: predicted sequences.
        """
        y_pred = self.model.predict_on_batch(X)

        # reduce dimension.
        y_true = np.argmax(y, -1)
        y_pred = np.argmax(y_pred, -1)

        lens = self.get_length(y_true)

        y_true = self.convert_idx_to_name(y_true, lens)
        y_pred = self.convert_idx_to_name(y_pred, lens)

        return y_true, y_pred

    def score(self, y_true, y_pred):
        """Calculate f1 score.
        Args:
            y_true (list): true sequences.
            y_pred (list): predicted sequences.
        Returns:
            score: f1 score.
        """
        score = f1_score(y_true, y_pred)
        print(' - f1: {:04.2f}'.format(score * 100))
        print(classification_report(y_true, y_pred, digits=4))
        return score

    def on_epoch_end(self, epoch, logs={}):
        if self.is_fit:
            self.on_epoch_end_fit(epoch, logs)
        else:
            self.on_epoch_end_fit_generator(epoch, logs)

    def on_epoch_end_fit(self, epoch, logs={}):
        X = self.validation_data[0]
        y = self.validation_data[1]
        y_true, y_pred = self.predict(X, y)
        score = self.score(y_true, y_pred)
        logs['f1'] = score

    def on_epoch_end_fit_generator(self, epoch, logs={}):
        y_true = []
        y_pred = []
        for X, y in self.validation_data:
            y_true_batch, y_pred_batch = self.predict(X, y)
            y_true.extend(y_true_batch)
            y_pred.extend(y_pred_batch)
        score = self.score(y_true, y_pred)
        logs['f1'] = score

id2label = {1: 'B-LOC', 2: 'I-LOC'}
f1score = F1Metrics(id2label)

使い方は簡単です。モデルを定義し、fitメソッドにコールバックを追加するだけです。

model.fit(x_train, y_train, 
          validation_data=(x_valid, y_valid),
          epochs=1,
          batch_size=32,
          callbacks=[f1score])

学習中の出力は以下のようになります。

Epoch 1/1
541/541 [==============================] - 46s 85ms/step - loss: 9.5729
 - f1: 53.24

             precision    recall  f1-score   support

        PER     0.5754    0.4484    0.5040      1617
        ORG     0.3798    0.4395    0.4075      1661
       MISC     0.4202    0.4387    0.4293       702
        LOC     0.6886    0.7650    0.7248      1668

avg / total     0.5320    0.5381    0.5315      5648

フォーマットはscikit-learnのclassification_report関数に似せているので使いやすいのではないかと思います。

github.com

fit_generatorで使う場合はF1Metricsの初期化メソッドに検証用データを渡してください。これは、fit_generatorを使った場合にコールバック内部から検証用データにアクセスできない仕様によるものです。

github.com

Django Rest Frameworkでソーシャル認証

Djangoには、組み込みのユーザ認証の仕組みがありますが、GitHubTwitterFacebookなどのサービスを介したソーシャル認証はサポートされていません。しかし、幸いなことに、サードパーティー製のパッケージを使って簡単に実装することができます。

本記事では、django-allauthを使ってTwitterアカウントによるログインを行う方法について説明します。Twitterでのログイン方法について説明しますが、FacebookGoogleGitHubアカウントに対してもほぼ同様の実装を使ってログインすることができます。

準備

まずは環境構築を行いましょう。ここでは、pipenvを使って必要なパッケージをインストールし、Djangoのプロジェクトとアプリケーションを作成します。

$ mkdir django-social-auth && cd django-social-auth
$ pipenv install django djangorestframework django-allauth django-rest-auth
$ pipenv shell
(django-social-auth) $ django-admin startproject auth .
(django-social-auth) $ python manage.py startapp src

パッケージとしては、以下の4つをインストールしました。

インストールできたら、開発サーバを起動してhttp://127.0.0.1:8000/を開きます。

(django-social-auth) $ python manage.py migrate
(django-social-auth) $ python manage.py runserver

以下の画面が確認できたらOKです。

f:id:Hironsan:20190423080236p:plain
DjangoのWelcomeページ

プロジェクトの設定

必要なパッケージをインストールできたので、プロジェクトの設定を行います。まずは、settings.pyINSTALLED_APPSにアプリケーションを追加します。以下のように設定しています。

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',  # new
    'rest_framework.authtoken',  # new
    'rest_auth',  # new
    'django.contrib.sites',  # new
    'allauth',  # new
    'allauth.account',  # new
    'allauth.socialaccount',  # new
    'allauth.socialaccount.providers.twitter',  # new
]

SITE_ID = 1

アプリケーションを追加したらマイグレーションをしておきます。

$ python manage.py migrate

ビューの作成

アプリケーションを作成したら、ビューを作成します。ソーシャル認証用のビューやTwitter用のSerializerはすでにallauthrest_authに用意されているのでそれらを使って以下のように書きます。

from allauth.socialaccount.providers.twitter.views import TwitterOAuthAdapter
from rest_auth.registration.views import SocialLoginView
from rest_auth.social_serializers import TwitterLoginSerializer

class TwitterLogin(SocialLoginView):
    serializer_class = TwitterLoginSerializer
    adapter_class = TwitterOAuthAdapter

ビューを書いたらURLに紐づけて登録しておきましょう。settings.pyに以下のように書いておきます。

from django.contrib import admin
from django.urls import path
from src.views import TwitterLogin

urlpatterns = [
    path('admin/', admin.site.urls),
    path('rest-auth/twitter/', TwitterLogin.as_view(), name='twitter_login')
]

この時点で、http://127.0.0.1:8000/rest-auth/twitter/にアクセスすると以下の画面が表示されます。

f:id:Hironsan:20190423083113p:plain
http://127.0.0.1:8000/rest-auth/twitter/

Twitter OAuthキーの取得

Twitter OAuth用のキーを取得するために以下のページへ移動します。

f:id:Hironsan:20190423083558p:plain
Developer Apps

移動したら、アプリケーションを作成します。そうすると、Consumer KeyとAccess Tokenが得られます。これらのトークンは外に漏れると悪用される可能性があるので、その扱いには十分に気をつけてください。

f:id:Hironsan:20190423093744p:plain

トークンの設定

取得したトークンはDjangoのadminページから設定する必要があります。そのために、以下のコマンドを使ってDjangoのsuperuserを作成します。

$ python manage.py createsuperuser

作成したら、作成したアカウントを使ってhttp://127.0.0.1:8000/adminにログインします。そうすると、以下の画面に遷移します。

f:id:Hironsan:20190423095820p:plain
Adminページ

Adminページにログインしたら、「Site」を選択してドメイン名を変更します。今回はテストなので127.0.0.1にしておきます。

f:id:Hironsan:20190423100022p:plain

次に、取得したトークンを設定するため、adminページから「Social Applications」を選択します。ここでOAuthの設定を行うことができます。今回はProviderとしてTwitterを選択し、Client IDとSecret keyに取得したConsumer keyとConsumer Secretを入力します。最後に、サイトを追加して保存します。

f:id:Hironsan:20190423100325p:plain
Social Applications

以上でadminページでの設定は完了です。

ためしにログインしてみましょう。http://127.0.0.1:8001/rest-auth/twitter/へ移動すると以下の画面が表示されます。ここに取得したTwitterAccess TokenとAccess Token Secretを入力してPOSTします。

f:id:Hironsan:20190423100702p:plain

以下のようにkeyが含まれたJSONを取得できれば認証に成功しています。

HTTP 200 OK
Allow: POST, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "key": "335a0kmcrypv56h3xgf7jabit8l2oewnqz4du91s"
}

参考資料