Ahogrammer

Deep Dive Into NLP, ML and Cloud

無限関係モデルを用いたクラスタリングのPython実装

本記事では、無限関係モデル(Infinite Relational Model; IRM)の概要と周辺化ギブスサンプラーによる推論について簡単に解説した後、Pythonを用いた実装例を示します。さらに、Zachary’s Karate Clubデータセットを用いて、無限関係モデルをクラスタリングに適用し、その結果をARI(Adjusted Rand Index)で評価します。内容としては以下の記事の発展版となっています。

hironsan.hatenablog.com

無限関係モデルとは?

無限関係モデルは、関係データのクラスタリングに用いられる確率的生成モデルです。以前に紹介した確率的ブロックモデルと同様、無限関係モデルも関係データ行列に潜在的なブロック構造が存在すると仮定し、その推定を通じてクラスタリングを行います。ただし、確率的ブロックモデルとは異なり、無限関係モデルではクラスター数KLを自動的に決定できます。

無限関係モデルは、以下のように定式化されます。関係データ行列 \boldsymbol{X} = (x_{i,j}) \in \{0, 1\}^{N_1 \times N_2}が与えられたとき、行と列に対するクラスターの割り当てを表す変数 \boldsymbol{Z}_1 = (z_{1,i}) (i=1,\ldots,N_1) \boldsymbol{Z}_2 = (z_{2,j}) (j=1,\ldots,N_2)を導入します。このとき、関係データ行列\boldsymbol{X}の要素x_{i,j}は、行のクラスタz_{1,i}と列のクラスタz_{2,j}の関係の強さ\theta_{z_{1,i}, z_{2,j}}に依存して次のように生成されると仮定します。確率的ブロックモデルの生成モデルと比べると、中華料理店過程(Chinese Restaurant Process; CRP)という分布から直接\boldsymbol{Z}_1\boldsymbol{Z}_2を生成している点が異なります。

 \displaystyle
\begin{align}
\boldsymbol{Z}_1 | \alpha_1 &\sim \text{CRP}(\alpha_1) \\
\boldsymbol{Z}_2 | \alpha_2 &\sim \text{CRP}(\alpha_2) \\
\theta_{k,l} | a_0, b_0 &\sim \text{Beta}(a_0, b_0) \\
x_{i,j} | {\theta_{k,l}}, z_{1, i}, z_{2, j}  &\sim \text{Bernoulli}(\theta_{z_{1,i}, z_{2,j}})
\end{align}

推論に関しても確率的ブロックモデルと同様、周辺化ギブスサンプラーを用いて行います。ただし、事後分布や統計量の計算について、クラスター数を自動的に決定する都合上、少し複雑化しています。詳細については、『関係データ学習』を参照してください。

無限関係モデルの実装

ここでは、Zachary’s Karate Clubデータセットを使い、周辺化ギブスサンプラーにより無限関係モデルを推定する実装例を示します。あくまで学習・検証を目的としたサンプルコードです。

まずは必要なパッケージをインストールします。

pip install numpy networkx matplotlib scipy scikit-learn tqdm

パッケージをインストールしたら、インポートします。

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import numpy.typing as npt
import tqdm
from scipy.special import betaln, logsumexp
from sklearn.metrics import adjusted_rand_score

次に、Zachary’s Karate Clubデータセットを読み込みます。このデータセットは34の頂点からなる無向グラフで、各頂点は空手クラブのメンバーを表しています。エッジの有無はメンバー間の交友の有無を表しています。この空手クラブは、最終的に2つのグループに分裂してしまいます。そのため、クラスタリングによりこの分裂を推定できるか確認するために使うことができます。

def load_dataset() -> tuple[npt.NDArray, npt.NDArray]:
    # 空手クラブのグラフを読み込む
    graph = nx.karate_club_graph()

    # グラフを行列に変換
    X = (nx.to_numpy_array(graph) > 0).astype(np.int32)
    np.fill_diagonal(X, 1)

    # クラブ情報を数値に変換
    mapping = {"Mr. Hi": 0, "Officer": 1}
    Z = [mapping[node["club"]] for node in graph.nodes.values()]

    return X, np.array(Z)

X, Z = load_dataset()

データセットを読み込んだら、可視化してみます。

# 関係データを可視化。X=1は黒、X=0は白
plt.imshow(X, cmap="gray_r")
plt.show()

関係データ行列を可視化すると、以下のようになります(下図左)。黒は交友があること、白は交友がないことを表しています。参考までにグラフ表現として可視化した例も示しておきます(下図右)。

Zachary’s Karate Clubデータセット

次に、周辺化ギブスサンプラーによる無限関係モデルの推定をするためのクラスを定義します。このクラスは、中華料理店過程のハイパーパラメーターalpha1alpha2、Beta分布のハイパーパラメーターa0b0、反復回数num_iter等を引数に取ります。fitメソッド内では、周辺化ギブスサンプラーによりクラスター割り当てを推定しています。コード全体については、Gistに載せています。

class InfiniteRelationalModel:
    def __init__(
        self,
        alpha1: float,
        alpha2: float,
        a0: float,
        b0: float,
        num_iter: int,
        burn_in: int,
        interval: int,
        seed: Optional[int] = None,
    ) -> None:
        self.alpha1 = alpha1
        self.alpha2 = alpha2
        self.a0 = a0
        self.b0 = b0
        self.num_iter = num_iter
        self.burn_in = burn_in
        self.interval = interval
        self.seed = seed
        self.zs1 = []
        self.zs2 = []

    def fit(self, X: npt.NDArray) -> None:
        N1, N2 = X.shape
        objects = Objects(N1, N2)
        rng = np.random.default_rng(self.seed)

        # 中華料理店過程を用いてZ1とZ2の初期値を計算
        z1 = chinese_restaurant_process(N1, self.alpha1, self.seed)
        z2 = chinese_restaurant_process(N2, self.alpha2, self.seed)

        # Z1とZ2を用いて統計量を計算
        stats = SufficientStatistics(z1, z2, X)
        row_cluster = ClusterManager(z1)
        col_cluster = ClusterManager(z2)

        for _ in tqdm.tqdm(range(self.num_iter)):
            for o in objects:
                # 第1ドメインの場合
                if o.is_first_domain():
                    # 1つ抜きの統計量を計算
                    stats.update_row(X, z1, z2, o.index, increment=False)
                    row_cluster.remove_index(z1[o.index], o.index)

                    # クラスターが空の場合は削除
                    if row_cluster.is_empty(z1[o.index]):
                        row_cluster.remove_cluster(z1[o.index])
                        stats.remove_row_cluster(z1[o.index], col_cluster.cluster_ids)

                    # 事後確率の計算とサンプリング
                    probs = self.calculate_first_domain_posterior_prob(
                        X, stats, row_cluster, col_cluster, o.index,
                    )
                    k = rng.choice(row_cluster.cluster_ids_with_new, p=probs)

                    # 統計量の更新
                    z1[o.index] = k
                    row_cluster.add_index(k, o.index)
                    stats.update_row(X, z1, z2, o.index, increment=True)
                else:
                    # 第2ドメインの場合も同様に計算
                    stats.update_col(X, z1, z2, o.index, increment=False)
                    col_cluster.remove_index(z2[o.index], o.index)
                    if col_cluster.is_empty(z2[o.index]):
                        col_cluster.remove_cluster(z2[o.index])
                        stats.remove_col_cluster(z2[o.index], row_cluster.cluster_ids)
                    probs = self.calculate_second_domain_posterior_prob(
                        X, stats, row_cluster, col_cluster, o.index,
                    )
                    l = rng.choice(col_cluster.cluster_ids_with_new, p=probs)
                    z2[o.index] = l
                    col_cluster.add_index(l, o.index)
                    stats.update_col(X, z1, z2, o.index, increment=True)
            self.zs1.append(z1.copy())
            self.zs2.append(z2.copy())

    def fit_predict(self, X: npt.NDArray) -> tuple[npt.NDArray, npt.NDArray]:
        self.fit(X)
        z1 = mode(self.zs1[self.burn_in::self.interval], keepdims=False).mode
        z2 = mode(self.zs2[self.burn_in::self.interval], keepdims=False).mode
        _, z1 = np.unique(z1, return_inverse=True)
        _, z2 = np.unique(z2, return_inverse=True)
        return z1, z2

    def calculate_first_domain_posterior_prob(
        self,
        X: npt.NDArray,
        stats: SufficientStatistics,
        row_cluster: ClusterManager,
        col_cluster: ClusterManager,
        i: int,
    ) -> npt.NDArray:
        log_probs = np.zeros(len(row_cluster.cluster_ids_with_new))
        for idx, k in enumerate(row_cluster.cluster_ids_with_new):
            log_probs[idx] = np.log(self.alpha1) if k == row_cluster.new_cluster_id else np.log(row_cluster.m(k))
            for l, indices in col_cluster.clusters.items():
                a_hat = self.a0 + stats.n_pos[(k, l)]
                b_hat = self.b0 + stats.n_neg[(k, l)]
                pos = X[i][indices].sum()
                neg = len(indices) - pos
                log_probs[idx] += betaln(
                    a_hat + pos,
                    b_hat + neg,
                ) - betaln(a_hat, b_hat)
        log_probs -= logsumexp(log_probs)
        return np.exp(log_probs)

    def calculate_second_domain_posterior_prob(
        self,
        X: npt.NDArray,
        stats: SufficientStatistics,
        row_cluster: ClusterManager,
        col_cluster: ClusterManager,
        j: int,
    ) -> npt.NDArray:
        log_probs = np.zeros(len(col_cluster.cluster_ids_with_new))
        for idx, l in enumerate(col_cluster.cluster_ids_with_new):
            log_probs[idx] = np.log(self.alpha2) if l == col_cluster.new_cluster_id else np.log(col_cluster.m(l))
            for k, indices in row_cluster.clusters.items():
                a_hat = self.a0 + stats.n_pos[(k, l)]
                b_hat = self.b0 + stats.n_neg[(k, l)]
                pos = X[indices, j].sum()
                neg = len(indices) - pos
                log_probs[idx] += betaln(
                    a_hat + pos,
                    b_hat + neg,
                ) - betaln(a_hat, b_hat)
        log_probs -= logsumexp(log_probs)
        return np.exp(log_probs)

推定のためのクラスを定義したので、読み込んだデータセットを用いて推定をします。パラメーターの設定については、『関係データ学習』の79ページに記載されていた無限関係モデルに対する推奨設定を参考に決定しています。

# モデルの作成
model = InfiniteRelationalModel(
    alpha1=1.0, # 中華料理店過程のハイパーパラメーター(行側)
    alpha2=1.0, # 中華料理店過程のハイパーパラメーター(列側)
    a0=0.5,     # Beta分布のハイパーパラメーター
    b0=0.5,     # Beta分布のハイパーパラメーター
    num_iter=2000,
    burn_in=1500,
    interval=5,
    seed=41,
)
# 周辺化ギブスサンプラーの実行
z1, z2 = model.fit_predict(X)

推定を終えたら、ARI(Adjusted Rand Index)を用いてクラスタリングの性能を評価します。それぞれ、行と列のクラスタリング結果に対応しています。

print(adjusted_rand_score(Z, z1))
print(adjusted_rand_score(Z, z2))
0.5932
0.5932

クラスタリングの性能は、ARIで行と列ともに0.5932となりました。クラスター数はそれぞれ5という結果になっています。関係データ学習の本に記載されていた無限関係モデルの例では、行と列のクラスタリング結果に対するARIはそれぞれ0.6077と0.6404だったので、それには及ばないものの近い性能が得られていることを確認できました。以下に、推定されたクラスターを可視化した結果を載せます。

クラスタリング結果

まとめ

本記事では、無限関係モデル(IRM)の概要と周辺化ギブスサンプラーによる推論を説明し、Pythonによる実装例を示しました。今回は対称関係データに対してクラスタリングを行いましたが、無限関係モデルは非対称関係データにも適用可能です。確率的ブロックモデルの場合と比べて実装が少々めんどうでしたが、実装を通じて理解が深まったため、機会があれば利用してみたいと思います。

参考文献

  1. Learning Systems of Concepts with an Infinite Relational Model
  2. An Information Flow Model for Conflict and Fission in Small Groups
  3. 関係データ学習