import copy
import logging
import numpy as np
import pandas as pd
from pyfm import pylibfm
from cornac.models.recommender import Recommender
from cornac.data.dataset import Dataset
from scipy import sparse
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def _prepare_data(train_set):
"""
Reformat cornac dataset with format (user, item, rating) tuples to
1. training_df of df (user, item);
2. y_train of array of ratings;
3. map cornac feature modality from (item_id, feature) to (item_idx, feature) using iid_map
Args: cornac dataset (rs.train_set, rs.test_set or rs.val_set)
user_id and item_id are treated as with dtype = string
"""
training_df = pd.DataFrame(
{
"user_id": list(map(str, train_set.uir_tuple[0])),
"item_id": list(map(str, train_set.uir_tuple[1])),
}
)
y_train = train_set.uir_tuple[2]
item_info_short = pd.DataFrame()
user_info_short = pd.DataFrame()
if train_set.item_feature:
item_info_short = pd.DataFrame(
{
"item_id": list(map(str, train_set.item_feature.features[:, 0])),
"feature": list(map(str, train_set.item_feature.features[:, 1])),
}
)
##rs.item_features keep already item_id
##update item_id to item_indices so training_df and item_feature can be merged into one dataframe
iid_map = pd.DataFrame(
{
"item_id": list(map(str, train_set.iid_map.keys())),
"item_indices": list(map(str, train_set.iid_map.values())),
}
)
item_info_short = item_info_short.merge(iid_map, on="item_id", how="left")
item_info_short = item_info_short.dropna().drop(columns=["item_id"])
item_info_short.columns = ["feature", "item_id"]
if len(training_df) > len(
training_df[training_df["item_id"].isin(item_info_short["item_id"])]
):
raise ValueError("training data contain items which features are unknown")
if train_set.user_feature:
user_info_short = pd.DataFrame(
{
"user_id": list(map(str, train_set.user_feature.features[:, 0])),
"feature": list(map(str, train_set.user_feature.features[:, 1])),
}
)
##rs.item_features keep already item_id
##update item_id to item_indices so training_df and item_feature can be merged into one dataframe
uid_map = pd.DataFrame(
{
"user_id": list(map(str, train_set.uid_map.keys())),
"user_indices": list(map(str, train_set.uid_map.values())),
}
)
user_info_short = user_info_short.merge(uid_map, on="user_id", how="left")
user_info_short = user_info_short.dropna().drop(columns=["user_id"])
user_info_short.columns = ["feature", "user_id"]
if len(training_df) > len(
training_df[training_df["user_id"].isin(user_info_short["user_id"])]
):
raise ValueError("training data contain users which features are unknown")
return training_df, y_train, item_info_short, user_info_short
[docs]
class FMRec(Recommender):
"""Factoriazation machine recommender algorithm
Parameters
----------
name: str
recommender name
trainable: bool
whether model can be trained
verbose: bool
Whether or not to print current iteration, training error
users_features: bool
whether features are used
num_factors: int
The dimensionality of the factorized 2-way interactions
num_iter: int,
Number of iterations
k0: bool, optional, default True
Use bias.
k1: bool, optional, default True
Use 1-way interactions (learn feature weights).
init_stdev: double, optional, default 0.1
Standard deviation for initialization of 2-way factors.
validation_size: double, optional, default 0.01
Proportion of the training set to use for validation.
power_t: double, optional, default 0.5
The exponent for inverse scaling learning rate [default 0.5].
t0: double, optional, default 0.001
Constant in the denominator for optimal learning rate schedule.
task: str, optional, default 'regression'
regression: Labels are real values.
classification: Labels are either positive or negative.
initial_learning_rate: double, optional, default 0.001
learning_rate_schedule: str, optional, default 'optimal'
The learning rate:
constant: eta = eta0
optimal: eta = 1.0/(t+t0) [default]
invscaling: eta = eta0 / pow(t, power_t)
"""
def __init__(
self,
name="FMRec",
trainable=True,
verbose=True,
uses_features=True,
num_factors=50,
num_iter=10,
k0=True,
k1=True,
init_stdev=0.1,
validation_size=0.01,
power_t=0.5,
t0=0.001,
task="regression",
initial_learning_rate=0.001,
learning_rate_schedule="optimal",
):
Recommender.__init__(self, name=name, trainable=trainable, verbose=verbose)
self.uses_features = uses_features
self.num_factors = num_factors
self.num_iter = num_iter
self.k0 = k0
self.k1 = k1
self.init_stdev = init_stdev
self.validation_size = validation_size
self.power_t = power_t
self.t0 = t0
self.task = task
self.initial_learning_rate = initial_learning_rate
self.learning_rate_schedule = learning_rate_schedule
self.one_hot_columns = None
# default rec
self.fm = pylibfm.FM(
num_factors=self.num_factors,
num_iter=self.num_iter,
k0=self.k0,
k1=self.k1,
init_stdev=self.init_stdev,
validation_size=self.validation_size,
power_t=self.power_t,
t0=self.t0,
task=self.task,
initial_learning_rate=self.initial_learning_rate,
learning_rate_schedule=self.learning_rate_schedule,
verbose=self.verbose,
)
def __deepcopy__(self, memo):
cls = self.__class__
new = cls.__new__(cls)
memo[id(self)] = new
for k, v in self.__dict__.items():
setattr(new, k, copy.deepcopy(v, memo))
return new
[docs]
def fit(self, train_set, val_set=None):
"""Fit the model to observations.
Parameters
----------
train_set: :obj:`cornac.data.Dataset`, required
User-Item preference data as well as additional modalities.
val_set: :obj:`cornac.data.Dataset`, optional, default: None
User-Item preference data for model selection purposes (e.g., early stopping).
Returns
-------
self : object
"""
# TODO: modify item_features to wide format. OR add a function in Dataset to perform this transformation??
self.train_set = self.LimeRSDataset(train_set)
self.num_users = self.train_set.num_users
self.num_items = self.train_set.num_items
self.uid_map = self.train_set.uid_map
self.iid_map = self.train_set.iid_map
self.min_rating = self.train_set.min_rating
self.max_rating = self.train_set.max_rating
self.global_mean = self.train_set.global_mean
df = self.train_set.merge_uir_with_features(
self.train_set.training_df, self.uses_features
)
training_data, training_columns = self.train_set.convert_to_pyfm_format(df)
self.one_hot_columns = training_columns
self.fm.fit(
training_data, self.train_set.y_train
) # training_data is a sparse matrix; y_train is an array
self.val_set = None if val_set is None else self.LimeRSDataset(val_set)
self.is_fitted = True
return self
[docs]
def score(self, user_id, item_id=None):
"""Predict the scores/ratings of a user for an item.
Parameters
----------
user_idx: int, required
The index of the user for whom to perform score prediction.
item_idx: int, optional, default: None
The index of the item for which to perform score prediction.
If None, scores for all known items will be returned.
Returns
-------
res : A scalar or a Numpy array
Relative scores that the user gives to the item or to all known items
"""
if item_id is not None:
df = pd.DataFrame(
{"user_id": str(user_id), "item_id": str(item_id)}, index=[0]
)
else:
all_items = [
str(item_idx) for _, item_idx in self.train_set.iid_map.items()
]
df = pd.DataFrame(
{"user_id": [str(user_id) for _ in all_items], "item_id": all_items}
)
df = self.train_set.merge_uir_with_features(df, self.uses_features)
all_predictions = list()
# divide in chunks to avoid memory errors
chunk_size = 10
chunks = np.array_split(df, chunk_size) if len(df) > 10 else [df]
for chunk in chunks:
# convert
test_data, _ = self.train_set.convert_to_pyfm_format(chunk)
# get predictions
preds = self.fm.predict(test_data)
all_predictions.extend(preds.round(3))
return all_predictions[0] if item_id != None else np.array(all_predictions)
[docs]
def score_neighbourhood(self, neighborhood_df):
"""make prediction on a list of items for each user. dataframe of [user_idx, item_idx] are
passed and predictions are made by chunks.
This is to void iterating each row and speed up the explain_instance function in limers
Returns: numpy array of predictions ordred by item_idx
"""
df = self.train_set.merge_uir_with_features(neighborhood_df, self.uses_features)
all_predictions = list()
# divide in chunks to avoid memory errors
chunk_size = 10
chunks = np.array_split(df, chunk_size)
for chunck in chunks:
# convert
test_data, _ = self.train_set.convert_to_pyfm_format(chunck)
# get predictions
preds = self.fm.predict(test_data)
all_predictions.extend(preds.round(3))
return np.array(all_predictions)
[docs]
class LimeRSDataset(Dataset):
"""Dataset object used for FM and Limers model training"""
def __init__(self, dataset):
"""
reformat rs dataset so to be able to merge uir with features
"""
uir_tuples = dataset.uir_tuple
Dataset.__init__(
self,
num_users=dataset.num_users,
num_items=dataset.num_items,
uid_map=dataset.uid_map,
iid_map=dataset.iid_map,
uir_tuple=uir_tuples,
)
training_df, y_train, train_item_features, train_user_features = (
_prepare_data(dataset)
)
self.training_df = training_df
self.y_train = y_train
self.user_frequency = self.set_train_frequency(item="False")
self.item_frequency = self.set_train_frequency(item="True")
self.item_features = train_item_features
self.user_features = train_user_features
[docs]
def convert_to_feature_long(self):
"""
Convert features from row to column representation
example: [(feature 1, item_idx), (feature 2, item_idx)] -> [item_idx, feature1, feature2]
"""
item_features_wide = pd.DataFrame()
user_features_wide = pd.DataFrame()
if not self.item_features.empty:
# if dataset contains item_feature
item_features_wide = self.item_features.copy()
item_features_wide["value"] = 1
item_features_wide = (
item_features_wide.pivot(
index="item_id", columns="feature", values="value"
)
.reset_index()
.fillna(0)
)
if not self.user_features.empty:
# if dataset contains user_feature
user_features_wide = self.user_features.copy()
user_features_wide["value"] = 1
user_features_wide = (
user_features_wide.pivot(
index="user_id", columns="feature", values="value"
)
.reset_index()
.fillna(0)
)
return item_features_wide, user_features_wide
def merge_uir_with_features(self, train_df, uses_features=True):
if uses_features:
# item_features are set as vectors, add a value column and convert to long format
if self.user_features.empty:
item_features, _ = self.convert_to_feature_long()
df = pd.merge(
train_df,
item_features,
on="item_id",
how="left",
)
elif self.item_features.empty:
_, user_features = self.convert_to_feature_long()
df = pd.merge(
train_df,
user_features,
on="user_id",
how="left",
)
else:
item_features, user_features = self.convert_to_feature_long()
df = pd.merge(
pd.merge(train_df, item_features, on="item_id", how="left"),
user_features,
on="user_id",
how="left",
suffixes=("_i_f", "_u_f"),
)
else:
df = train_df.copy()
return df
[docs]
def set_train_frequency(self, item="True"):
"""calculate user or item frequency appeared in the training set, frequency is mapped against id"""
if item == "True":
frequency = self.training_df["item_id"].value_counts().to_dict()
frequency = [
(key, frequency[str(value)])
for key, value in self.iid_map.items()
if str(value) in frequency
]
frequency = sorted(frequency, key=lambda x: x[1], reverse=True)
else:
frequency = self.training_df["user_id"].value_counts().to_dict()
frequency = [
(key, frequency[str(value)])
for key, value in self.uid_map.items()
if str(value) in frequency
]
frequency = sorted(frequency, key=lambda x: x[1], reverse=True)
return frequency
[docs]
def pick_top_users(self, count=1, train="True"):
"""pick the top n users based on train/test user frequency"""
if train:
return [x for x, _ in self.user_frequency[:count]]
else:
pass
[docs]
def pick_top_items(self, count=1, train="True"):
"""pick the top n items based on train/test item frequency"""
if train:
return [x for x, _ in self.item_frequency[:count]]
else:
pass
[docs]
def map_to_df(self, item="True"):
"""helper function to map item_idx to item_id or user_idx to user_id; returned df has xid.map.keys() as id and xid.map.items() as idx"""
if item == "True":
df = pd.DataFrame(
{
"item_id": list(self.iid_map.keys()),
"item_idx": list(self.iid_map.values()),
}
)
df["item_idx"] = df["item_idx"].astype(str)
else:
df = pd.DataFrame(
{
"user_id": list(self.uid_map.keys()),
"user_idx": list(self.uid_map.values()),
}
)
df["user_idx"] = df["user_idx"].astype(str)
return df