ベースライン的なレコメンドモデルを作るのに RecTools が便利そう
レコメンド用のツールについて
レコメンド界隈はみんなが使っている定番のライブラリというものがないように思う。例えば、自然言語処理では(色々と文句を言われることもあるが)Hugging Face が標準的に使われるようになっている。それに比べると、レコメンドはとりあえずこれ使っておけ、と言えるものが思い浮かばない。
ロジック/モデル部分中心のものであれば implicit や RecBole などポピュラーなライブラリはいくつかあるが、それに与えるデータの前処理とか結果のオフライン評価に関しては、それぞれの現場で固有のツールやライブラリが作られがちな気がしている。
大規模サービスを運営していて KPI を0.1%でも向上させることに意味があるようなテック企業を除けば、一般的な協調フィルタリングや評価指標を実装すればまずは十分だろうし、そうなると中核となるデータの処理方法も概ね共通してくる。
その割にはその辺まで含めたレコメンドシステムの構築を一通りカバーしているライブラリってないなと思っていたのだけど、最近知った RecTools はこの領域で有用なライブラリなのではないかと思ったので触ってみた。
RecTools はロシアでクラウドサービスを提供している MWS という会社が開発している OSS である。ドキュメントには
RecTools is an easy-to-use Python library which makes the process of building recommendation systems easier, faster and more structured than ever before. The aim is to collect ready-to-use solutions and best practices in one place to make processes of creating your first MVP and deploying model to production as fast and easy as possible.
と書かれており、レコメンドシステムを構築するプロセス全体をカバーするようなライブラリとして作成されているのがわかる。
RecTools での学習・推論・評価パイプラインは以下のようになる。
import pandas as pd
from rectools import Columns
from rectools.dataset import Dataset
from rectools.models import SASRecModel
from rectools.metrics import NDCG, Recall, Serendipity, calc_metrics
from rectools.models import load_model
dataset = Dataset.construct(interaction)
model = SASRecModel(...)
model.fit(dataset)
model.save("model.pkl")
model = load_model("model.pkl")
recommendations = model.recommend(
users=interaction[Columns.User].unique(),
dataset=dataset,
k=10,
filter_viewed=True,
)
metrics = {
"NDCG@10": NDCG(k=10, divide_by_achievable=True),
"Recall@10": Recall(k=10),
"Serendipity@10": Serendipity(k=10),
}
metric_values = calc_metrics(
metrics,
reco=recommendations,
interactions=interactions_test, catalog=interaction[Columns.Item].unique(),
)
学習には (user, item) の interaction データセットを pandas データフレーム として用意する。一部のモデルではさらに追加で特徴量を与えることもできる。
推薦モデルを作るときは生のユーザー・アイテムID(外部ID)を、0から始まる整数のID(内部ID)に変換する処理をよく実行するが、そのようなよくある前処理は全て Dataset クラスが受け持ってくれるため自分で実装する必要がない。
また使えるモデルも ItemkNN など基本のものから、implicit や LightFM などの定番ライブラリのラッパー、SASRec や BERT4Rec などのニューラル系列推薦モデルまで基本的なものは一通り揃っており、またそれらが共通のインターフェースで使える。BPRなど単純な (user, item) interaction を学習するモデルにも、系列推薦をするモデルにも同じ形式のデータセットを与えれば学習・推論・評価ができるのは便利である。
評価指標についても基本の Recall@K や NDCG@K から、serendipity などの beyond accuracy 指標、さらにバイアス除去系のオプションまで揃っているので困ることはなさそうである。
概要は上に紹介したので、ここからは実際に実務でモデルを作るのに必要な機能があるか調べていこうと思う。
データ分割
実務でのレコメンドでは、系列推薦モデルを使っていなくても、train/test のデータ分割を時系列で行うことは多い。サービスで使う以上モデルに持たせたい能力は未来予測だからだ。RecTools では interaction データの最後の N 個を評価用に分割するための LastNSplitter、決まった時間幅のデータを評価用に分割するための TimeRangeSplitter が用意されており、時系列に沿った分割が簡単にできる。
以下はユーザーごとに行動を時系列で並べ、最後の一個を評価用に、それ以前を学習用に分割する例である
from rectools.dataset import Dataset
from rectools.model_selection import LastNSplitter
ds = Dataset.construct(df)
splitter = LastNSplitter(
n=1, n_splits=1, filter_cold_items=False,
filter_cold_users=False,
filter_already_seen=True,
)
iterator = splitter.split(ds.interactions)
train_idx, test_idx, _ = next(iterator)
ds_train = ds.filter_interactions(train_idx)
ds_test = ds.filter_interactions(test_idx)
n_splits を増やすことで cross validation 用に複数 fold を生成できる。
カスタム Trainer の利用
ニューラルネットワーク系のモデルは内部的には pytorch と pytorch lightning で実装されている。何も指定しなくても内部でデフォルトの Trainer インスタンスを生成してくれるが、学習の制御のためカスタマイズしたいことは多い。そのような場合は、モデルのコンストラクタに Trainer を返す関数を与える。
SASRec モデルを例とすると以下のように書ける
from rectools.models import SASRecModel
from pytorch_lightning import Trainer
def get_trainer(checkpoint_dir: str) -> Trainer:
...
model = SASRecModel(
get_trainer_func=get_trainer,
get_trainer_func_kwargs={"checkpoint_dir": checkpoint_dir},
)
get_trainer の引数は別途 get_trainer_func_kwargs で渡す必要がある。
Early stopping やチェックポイント
これもニューラルネットワーク系のモデルの場合だが、学習時に validation セットにおけるメトリクスを監視して early stopping をしたり、途中経過をチェックポイントとして残したいことは多い。
その場合、ドキュメントの Transformer Models Advanced Training Guide にあるように、まずは毎エポック validation メトリクスを計算する callback を実装する必要がある。
Validation メトリクスを計算する callback
from pytorch_lightning.callbacks import Callback
class ValidationMetrics(Callback):
def __init__(self, top_k: int, val_metrics: dict, verbose: int = 0) -> None:
self.top_k = top_k
self.val_metrics = val_metrics
self.verbose = verbose
self.epoch_n_users: int = 0
self.batch_metrics: list[dict[str, float]] = []
def on_validation_batch_end(
self,
trainer: Trainer,
pl_module: LightningModule,
outputs: dict[str, torch.Tensor],
batch: dict[str, torch.Tensor],
batch_idx: int,
dataloader_idx: int = 0,
) -> None:
logits = outputs["logits"]
if logits is None:
logits = pl_module.torch_model.encode_sessions(batch, pl_module.item_embs)[
:, -1, :
]
_, sorted_batch_recos = logits.topk(k=self.top_k)
batch_recos = sorted_batch_recos.tolist()
targets = batch["y"].tolist()
batch_val_users = list(
itertools.chain.from_iterable(
itertools.repeat(idx, len(recos))
for idx, recos in enumerate(batch_recos)
)
)
batch_target_users = list(
itertools.chain.from_iterable(
itertools.repeat(idx, len(targets))
for idx, targets in enumerate(targets)
)
)
batch_recos_df = pd.DataFrame(
{
Columns.User: batch_val_users,
Columns.Item: list(itertools.chain.from_iterable(batch_recos)),
}
)
batch_recos_df[Columns.Rank] = (
batch_recos_df.groupby(Columns.User, sort=False).cumcount() + 1
)
interactions = pd.DataFrame(
{
Columns.User: batch_target_users,
Columns.Item: list(itertools.chain.from_iterable(targets)),
}
)
prev_interactions = pl_module.data_preparator.train_dataset.interactions.df
catalog = prev_interactions[Columns.Item].unique()
batch_metrics = calc_metrics(
self.val_metrics, batch_recos_df, interactions, prev_interactions, catalog
)
batch_n_users = batch["x"].shape[0]
self.batch_metrics.append(
{metric: value * batch_n_users for metric, value in batch_metrics.items()}
)
self.epoch_n_users += batch_n_users
def on_validation_epoch_end(
self, trainer: Trainer, pl_module: LightningModule
) -> None:
epoch_metrics = dict(sum(map(Counter, self.batch_metrics), Counter()))
epoch_metrics = {
metric: value / self.epoch_n_users
for metric, value in epoch_metrics.items()
}
self.log_dict(
epoch_metrics, on_step=False, on_epoch=True, prog_bar=self.verbose > 0
)
self.batch_metrics.clear()
self.epoch_n_users = 0
そしてこの callback を上の方法で Trainer に設定する
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from rectools.metrics import NDCG
from pytorch_lightning import Trainer
def get_trainer(checkpoint_dir: str) -> Trainer:
metrics = {"val_NDCG@10": NDCG(k=10, divide_by_achievable=True)}
val_metrics_callback = ValidationMetrics(
top_k=10, val_metrics=metrics, verbose=1
)
best_ndcg_ckpt = ModelCheckpoint(
dirpath=checkpoint_dir,
monitor="val_NDCG@10",
mode="max",
filename="{epoch}-{val_NDCG@10:.4f}",
)
early_stopping_callback = EarlyStopping(
monitor="val_NDCG@10",
mode="max",
)
return Trainer(
enable_checkpointing=True,
callbacks=[val_metrics_callback, early_stopping_callback, best_ndcg_ckpt],
...
)
また validation 用のデータを作るため学習データをさらに分割する必要があるが、これは LastNSplitter などを使わず、学習データのうちどこが validation かを表すマスクを渡すことが想定されているようである。以下は学習データのなかで各ユーザーごとに最後のアイテムを validation として使う例。
from rectools import Columns
def get_val_mask_func(interactions: pd.DataFrame) -> np.ndarray:
rank = (
interactions.sort_values(Columns.Datetime, ascending=False, kind="stable")
.groupby(Columns.User, sort=False)
.cumcount()
)
val_mask = rank == 0
return val_mask.values
model = SASRecModel(
get_val_mask_func=get_val_mask_func,
...
)
以上の準備をして model.fit(ds_train) をすれば、指定したメトリクスに基づいた early stopping が実行され、その過程のチェックポイントを残すことができる。
以上見てきたようにベースライン的なモデルであればそれなりの柔軟性を持ちつつ簡単に学習から評価まで回せそうである。一つ難点を挙げるとすれば、基本的にデータセットを pandas のデータフレームとして扱うため、一度にメモリに乗り切らないような大規模データを扱うことは向いていない。あくまで小〜中規模のデータセットに対してサクッとベースライン性能を持つレコメンドシステムを作るためのツールだと思うのが良さそうである。