Matrix FactorizationとDeep Matrix Factorization(Keras)でのレコメンド
レコメンドの手法であるMatrix Factorizationについて勉強したのでその記録です。
以下の検証に関するコードはgithubにあげてあります。
1. 本記事の概要
- レコメンドの手法であるMatrix Factorizationについての概要
- Matrix FactorizationのNeural Network形式の表現とKerasでの実装
- Deep Matrix Factorizationの表現とKerasでの実装
- MovieLensでの精度検証
2. Matrix Factorizationの概要
ざっくりと。
Matrix Factorizationはレコメンドの手法のひとつ。
- レコメンドの参考サイト
レコメンドつれづれ ~1-1. 協調フィルタリングのコンセプトを知る~ - Platinum Data Blog by BrainPad
レコメンドつれづれ ~1-2. 協調フィルタリングの実装 ~ - Platinum Data Blog by BrainPad
Matrix Factorizationでできること
ユーザーによるアイテムの評価データを基に、あるユーザーがあるアイテムをどう評価するかを予測できる。
例)食べログだと…
Aさんは飲食店Bの点数をつけたことがない。Matrix Factorizationを使うと、①他の人が飲食店Bにつけた評価、②他の人が他の飲食店につけた評価、③Aさんが他の飲食店につけた評価から、Aさんが飲食店Bにつけるであろう点数を予測できる。この予測した評価を利用して、Aさんに飲食店Bをお薦めする、などすることができる。
Matrix Factorizationの特徴
Matrix Factorizationは行列分解の考え方に基づいている。行列分解を使うレコメンド手法はSVD(特異値分解)やNon-negative Matrix Factorization(NMF)などがあるが、これらの手法とことなり、Matrix Factorizationではユーザーやアイテムのバイアスを考慮することができる。
- Matrix Factorizationの参考サイト
SVDでMovieLensのレコメンドを実装する - け日記
NMFでMovieLensのレコメンドを実装する - け日記
Python: レコメンドの行列分解を確率的勾配降下法で実装する - け日記
Python: レコメンドの行列分解を確率的勾配降下法で実装する - け日記
Matrix Factorizationを使った評価予測
Matrix Factorizationの実装(numpy)
ご参考まで。
class MatrixFactorization: def __init__(self, latent_num): self.latent_num = latent_num # r = mu + u_bias + i_bias + dot(u_latent,i_latent) self.mu = None self.u_bias = None self.i_bias = None self.u_latent = None self.i_latent = None # id_index_dict self.user_id_index_dict = None self.item_id_index_dict = None return def fit(self, user_ids, item_ids, rating, batch_size, epochs, lerning_rate=0.1, l2=0): print('run MatrixFactorization fit') # num user_num = len(np.unique(user_ids)) item_num = len(np.unique(item_ids)) sample_num = len(user_ids) # id_index_dict self.user_id_index_dict = self.id_index_dict(np.unique(user_ids)) self.item_id_index_dict = self.id_index_dict(np.unique(item_ids)) # make index user_idxs = self.convert_ids_to_index(user_ids, self.user_id_index_dict) item_idxs = self.convert_ids_to_index(item_ids, self.item_id_index_dict) # mu self.mu = np.average(rating) # initialization self.u_bias = self.__initialization_bias(user_num) self.i_bias = self.__initialization_bias(item_num) self.u_latent = self.__initialization_latent(user_num, self.latent_num) self.i_latent = self.__initialization_latent(item_num, self.latent_num) # calc errors_in_epoch = self.__minibatch_sgd(user_idxs, item_idxs, rating, batch_size, epochs, lerning_rate, l2) return def __initialization_bias(self, id_num): # itnialize -0.05 ~ 0.05 b = (np.random.rand(id_num) - 0.5) * 0.1 return b def __initialization_latent(self, id_num, latent_num): ''' return latent (shape=[id_num, latent_num]) ''' # itnialize -0.05 ~ 0.05 lt = (np.random.rand(id_num, latent_num) - 0.5) * 0.1 return lt def __minibatch_sgd(self, user_idxs, item_idxs, rating, batch_size, epochs, lerning_rate=0.1, l2=0, verbose=True): # sample_num = len(user_idxs) steps_per_epoch = int(np.ceil(len(user_idxs) / batch_size)) loss_in_epoch = [] error_in_epoch = [] ## for epoch for iep in range(epochs): rand_sample_idxs = np.random.permutation(np.arange(sample_num)) ## for steps_per_epoch for istp in range(steps_per_epoch): # indexs in this mini batch batch_idxs = rand_sample_idxs[batch_size*istp : batch_size*(istp+1)] # update delta_u_bias, delta_i_bias, delta_u_latent, delta_i_latent = self.__delta_param(user_idxs[batch_idxs], item_idxs[batch_idxs], rating[batch_idxs]) self.u_bias += lerning_rate * (delta_u_bias - l2 * self.u_bias) self.i_bias += lerning_rate * (delta_i_bias - l2 * self.i_bias) self.u_latent += lerning_rate * (delta_u_latent - l2 * self.u_latent) self.i_latent += lerning_rate * (delta_i_latent - l2 * self.i_latent) # recording error loss_in_epoch.append(self.__loss_function(user_idxs, item_idxs, rating, l2)) error_in_epoch.append(self.__error_function(user_idxs, item_idxs, rating)) # verbose print(' epoch {0}: error = {1:.4f}, loss = {2:.4f}'.format(iep+1, error_in_epoch[iep], loss_in_epoch[iep])) return error_in_epoch def __delta_param(self, user_idxs, item_idxs, rating): # delta_u_bias = np.zeros_like(self.u_bias) delta_i_bias = np.zeros_like(self.i_bias) delta_u_latent = np.zeros_like(self.u_latent) delta_i_latent = np.zeros_like(self.i_latent) # loss = rating - self.__calc_rating(user_idxs, item_idxs) # num_sample = len(user_idxs) # u_counter = np.zeros_like(self.u_bias) i_counter = np.zeros_like(self.i_bias) # calculate delta for ismp in range(num_sample): u_idx = user_idxs[ismp] i_idx = item_idxs[ismp] ls = loss[ismp] # delta_u_bias[u_idx] += ls delta_i_bias[i_idx] += ls delta_u_latent[u_idx] += ls * self.i_latent[i_idx] delta_i_latent[i_idx] += ls * self.u_latent[u_idx] # u_counter[u_idx] += 1 i_counter[i_idx] += 1 # average delta u_counter = np.maximum(u_counter, 1) i_counter = np.maximum(i_counter, 1) #u_counter = np.maximum(u_counter, num_sample) #i_counter = np.maximum(i_counter, num_sample) delta_u_bias /= u_counter delta_i_bias /= i_counter delta_u_latent /= u_counter[:,np.newaxis] delta_i_latent /= i_counter[:,np.newaxis] return delta_u_bias, delta_i_bias, delta_u_latent, delta_i_latent def __loss_function(self, user_idxs, item_idxs, rating, l2): e = np.sum(np.square(rating - self.__calc_rating(user_idxs, item_idxs))) l = l2 * (np.sum(np.square(self.u_bias[user_idxs])) + np.sum(np.square(self.i_bias[item_idxs])) + np.sum(np.square(self.u_latent[user_idxs]))+ + np.sum(np.square(self.i_latent[item_idxs]))) e_l = e + l return e_l def __error_function(self, user_idxs, item_idxs, rating): e = np.average(np.square(rating - self.__calc_rating(user_idxs, item_idxs))) return e def __calc_rating(self, user_idxs, item_idxs): mu = self.mu u_b = self.u_bias[user_idxs] i_b = self.i_bias[item_idxs] t1 = self.u_latent[user_idxs] t2 = self.i_latent[item_idxs] cross_term = np.sum(self.u_latent[user_idxs] * self.i_latent[item_idxs], axis=1) rt = mu + u_b + i_b + cross_term return rt def predict(self, user_ids, item_ids): # make index user_idxs = self.convert_ids_to_index(user_ids, self.user_id_index_dict) item_idxs = self.convert_ids_to_index(item_ids, self.item_id_index_dict) rt = self.__calc_rating(user_idxs, item_idxs) return rt
3. Matrix FactorizationのNeural Network形式の表現とKerasでの実装
Matrix Factorizationは上記のようにnumpyを使ってわりと簡単に実装できる。
しかし、Kerasで実装できれば、Adamなどの高度な最適化手法を使えることに加え、Matrix Factorizationの構造を複雑にするなどの発展にも簡単に対応できる。そこで、Matrix FactorizationのNeural Network形式の表現を導出し、Kerasで実装する。
Matrix FactorizationのNeural Network形式の表現
Matrix Factorizationでは、評価値が次式で表されると仮定する。
(1)式
:ユーザiによるアイテムjに対する評価値
:全アイテムの評価値の平均値。評価値のスケールを表す。
:ユーザiによる評価値のバイアス。このユーザがつける評価値が全体的に高いか低いかを表す。
:アイテムjに対する評価値のバイアス。このアイテムに対する評価値が全体的に高いか低いかを表す。
:ユーザiの特徴ベクトル。
:アイテムjの特徴ベクトル。
:ユーザiとアイテムjの特徴ベクトルの内積。バイアスのみでは表せられなかった相互作用を表す。
省略していますがL2正則化の項も含めたりする。普通は勾配降下法等を使ってb, vを求める。ちなみに上の実装は確率的勾配降下法を使っている。
ここで、ユーザとアイテムをそれぞれone-hotベクトル、で表す。
]:i番目が1でその他は0
]:j番目が1でその他は0
、を使うと、(1)式を次式のように行列形式で表現できる。
(2)式
:ユーザーのバイアス行列。次元は1×ユーザー数
:アイテムのバイアス行列。次元は1×アイテム数
:ユーザーの特徴行列。次元は特徴数(※)×ユーザー数
:アイテムの特徴行列。次元は特徴数(※)×アイテム数
(※)特徴数はユーザとアイテムで同じ。任意の数値を指定可能。
(2)式より、Matrix Factorizationは以下の画像のニューラルネットワーク構造で表現できる。
※dropoutは普通使いませんが、一般化しているので一応書いてます。
Kerasでの実装
上記の(2)式をKerasで実装しました。のちのDeepを意識した実装になっているので、かなりごちゃごちゃしてます。。
class DeepMatrixFactorization: def __init__(self, unique_user_num, unique_item_num, all_rating_mean=0, rating_scale=1): self.unique_user_num = unique_user_num self.unique_item_num = unique_item_num self.all_rating_mean = all_rating_mean self.rating_scale = rating_scale self.model = None self.history = None self.__count_call_sum_model = 0 self.__count_call_mean_model = 0 return #make model def make_model_mf(self, user_bias=True, item_bias=True, cross_term=True, latent_num=10, cross_term_l2=0): ''' make normal matrix factorization model with keras. rating = all_mean + user_bias + item_bias + cross_term ''' input_user_id = Input(shape=(1,), name='user_id') input_item_id = Input(shape=(1,), name='item_id') #user bias u_bias = None if user_bias: u_bias = self.__bias_term(input_id=input_user_id, unique_id_num=self.unique_user_num, l2=0) #item bias i_bias = None if item_bias: i_bias = self.__bias_term(input_id=input_item_id, unique_id_num=self.unique_item_num, l2=0) #cross term crs_trm = None if cross_term: crs_u = self.__single_term(input_id=input_user_id, unique_id_num=self.unique_user_num, output_dim=latent_num, l2=cross_term_l2) crs_i = self.__single_term(input_id=input_item_id, unique_id_num=self.unique_item_num, output_dim=latent_num, l2=cross_term_l2) crs_trm = self.__cross_term(crs_u, crs_i, merge='sum') #concatenate def append_isNotNone(lst, v): tls = copy.copy(lst) if v is not None: tls.append(v) return tls concats = [] concats = append_isNotNone(concats, u_bias) concats = append_isNotNone(concats, i_bias) concats = append_isNotNone(concats, crs_trm) if len(concats) > 1: y = Add(name='add_bias_crossTerm')(concats) else: y = concats[0] # add mean y = Lambda(lambda x: x*self.rating_scale + self.all_rating_mean, name='scaling')(y) self.model = Model(inputs=[input_user_id, input_item_id], outputs=y) return #model of bias term def __single_term(self, input_id, unique_id_num, output_dim=1, hidden_nodes=[], activation='lrelu', activation_last='linear', l2=0, hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[], latent_layer_name=None): ''' input -> embedding -> flatten -> dropout -> hidden_layer (-> dense -> activation -> dropout -> ... -> dense -> activation -> dropout -> dense -> activation_last -> dropout) -> output ''' # hidden_nodes_ = copy.copy(hidden_nodes) hidden_nodes_.append(output_dim) # hl = input_id # for ih, h_dim in enumerate(hidden_nodes_): # first layer if ih == 0: # embedding layer # input_shape = [batch_size, unique_id_num+1] # output_shape = [batch_size, input_length, output_dim] hl = Embedding(input_dim=unique_id_num, output_dim=hidden_nodes_[0], #input_length=1, embeddings_regularizer=regularizers.l2(l2), name=latent_layer_name )(hl) # flatten hl = Flatten()(hl) #dropout hl = Dropout(dropout_rate)(hl) # 2~ layer else: l2_h = 0 if len(hidden_l2s)==0 else hidden_l2s[ih-1] # hidden layer hl = Dense(h_dim, kernel_regularizer=regularizers.l2(l2_h))(hl) #activation act = activation if ih != len(hidden_nodes_)-1 else activation_last hl = KerasBase.activation(act)(hl) #dropout drp_rt = 0 if len(hidden_dropout_rates)==0 else hidden_dropout_rates[ih-1] hl = Dropout(drp_rt)(hl) return hl def __bias_term(self, input_id, unique_id_num, l2=0, latent_layer_name=None): ''' input -> embedding -> flatten -> output ''' bias = self.__single_term(input_id=input_id, unique_id_num=unique_id_num, output_dim=1, hidden_nodes=[], activation='lrelu', activation_last='linear', l2=l2, hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[], latent_layer_name=latent_layer_name) return bias #model of cross term def __cross_term(self, input1, input2, merge='sum', hidden_nodes=[], activation='lrelu', activation_last='lrelu', hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[]): ''' input1 and input2 must be already embedded. (input1, input2) -> Multiply -> dropout -> hidden_layer (-> dense -> activation -> dropout -> ... -> dense -> activation -> dropout -> dense -> activation_last -> dropout) -> merge(ex. sum, mean) -> output ''' multiplied = Multiply()([input1, input2]) #hidden layer hl = multiplied for ih, h_dim in enumerate(hidden_nodes): l2_h = 0 if len(hidden_l2s)==0 else hidden_l2s[ih] # dense hl = Dense(h_dim, kernel_regularizer=regularizers.l2(l2_h))(hl) # activation act = activation if ih != len(hidden_nodes)-1 else activation_last hl = KerasBase.activation(act)(hl) # dropout drp_rt = 0 if len(hidden_dropout_rates)==0 else hidden_dropout_rates[ih] hl = Dropout(drp_rt)(hl) #merge layer if merge=='sum': self.__count_call_sum_model += 1 crs_trm = KerasBase.sum_layer(name='sum' + str(self.__count_call_sum_model))(hl) elif merge=='mean': self.__count_call_mean_model += 1 crs_trm = KerasBase.mean_layer(name='mean' + str(self.__count_call_mean_model))(hl) return crs_trm def compile(self, optimizer='adam', loss='mean_squared_error'): self.model.compile(optimizer=optimizer, loss=loss) return def fit(self, user_ids, item_ids, rating, batch_size, epochs, user_ids_val=None, item_ids_val=None, rating_val=None): # validation data val_data = None if (user_ids_val is not None) and (item_ids_val is not None) and (rating_val is not None): val_data = ([user_ids_val, item_ids_val], rating_val) # fit self.history = self.model.fit(x=[user_ids, item_ids], y=rating, batch_size=batch_size, epochs=epochs, verbose=1, validation_data=val_data) return def predict(self, user_ids, item_ids): return self.model.predict([user_ids, item_ids])[:,0]
4. Deep Matrix Factorizationの表現とKerasでの実装
Deep Matrix Factorizationはすでにいろいろやられている。自分が思いつくものはだいたい既に誰かがやっているもんだよなぁ。。Matrix Factorizationを一般化したFactorization Machinesというものもあるみたい。
- Deep Matrix Factorization 参考サイト
PyTorchでより深いMatrix Factorization - nardtree - Medium
PyTorchでもMatrix Factorizationがしたい! | takuti.me
Matrix Factorization in PyTorch | Ethan Rosenthal
DEEP BEERS: Playing with Deep Recommendation Engines Using Keras
Recommender Systems in Keras
さて、ここではKerasでDeep Matrix Factorizationを実装した。上記のようにMatrix Factorizationをニューラルネットワーク形式で表現できれば、Deepにするのは簡単。ただ、どの部分をdeepにするかは工夫のしどころ。ここでは、①相互作用の入力部分、②userとitemの積をとった後の部分、③②をResNetのような構造にしたもの、④①と②両方、を例として実装した。おそらくもっといいものがあると思うが、ひとまずお試し。
Kerasを使った実装は以下のとおり。基本的に上記のMatrix Factorizationの実装を変えただけ。Kerasであれば構造の変更を簡単に実装できるからいいね。
class DeepMatrixFactorization: def __init__(self, unique_user_num, unique_item_num, all_rating_mean=0, rating_scale=1): self.unique_user_num = unique_user_num self.unique_item_num = unique_item_num self.all_rating_mean = all_rating_mean self.rating_scale = rating_scale self.model = None self.history = None self.__count_call_sum_model = 0 self.__count_call_mean_model = 0 return #make model def make_model_mf(self, user_bias=True, item_bias=True, cross_term=True, latent_num=10, cross_term_l2=0): ''' make normal matrix factorization model with keras. rating = all_mean + user_bias + item_bias + cross_term ''' input_user_id = Input(shape=(1,), name='user_id') input_item_id = Input(shape=(1,), name='item_id') #user bias u_bias = None if user_bias: u_bias = self.__bias_term(input_id=input_user_id, unique_id_num=self.unique_user_num, l2=0) #item bias i_bias = None if item_bias: i_bias = self.__bias_term(input_id=input_item_id, unique_id_num=self.unique_item_num, l2=0) #cross term crs_trm = None if cross_term: crs_u = self.__single_term(input_id=input_user_id, unique_id_num=self.unique_user_num, output_dim=latent_num, l2=cross_term_l2) crs_i = self.__single_term(input_id=input_item_id, unique_id_num=self.unique_item_num, output_dim=latent_num, l2=cross_term_l2) crs_trm = self.__cross_term(crs_u, crs_i, merge='sum') #concatenate def append_isNotNone(lst, v): tls = copy.copy(lst) if v is not None: tls.append(v) return tls concats = [] concats = append_isNotNone(concats, u_bias) concats = append_isNotNone(concats, i_bias) concats = append_isNotNone(concats, crs_trm) if len(concats) > 1: y = Add(name='add_bias_crossTerm')(concats) else: y = concats[0] # add mean y = Lambda(lambda x: x*self.rating_scale + self.all_rating_mean, name='scaling')(y) self.model = Model(inputs=[input_user_id, input_item_id], outputs=y) return def make_model_dmf_deepLatent(self, user_bias=True, item_bias=True, cross_term=True, latent_num=10, cross_term_l2=0, hidden_nodes_latent=[10], hidden_l2=[0], hidden_dropout_rates=[]): ''' make normal matrix factorization model with keras. rating = all_mean + user_bias + item_bias + cross_term ''' input_user_id = Input(shape=(1,), name='user_id') input_item_id = Input(shape=(1,), name='item_id') #user bias u_bias = None if user_bias: u_bias = self.__bias_term(input_id=input_user_id, unique_id_num=self.unique_user_num, l2=0) #item bias i_bias = None if item_bias: i_bias = self.__bias_term(input_id=input_item_id, unique_id_num=self.unique_item_num, l2=0) #cross term crs_trm = None if cross_term: crs_u = self.__single_term(input_id=input_user_id, unique_id_num=self.unique_user_num, output_dim=latent_num, l2=cross_term_l2, hidden_nodes=hidden_nodes_latent, hidden_l2s=hidden_l2, hidden_dropout_rates=hidden_dropout_rates) crs_i = self.__single_term(input_id=input_item_id, unique_id_num=self.unique_item_num, output_dim=latent_num, l2=cross_term_l2, hidden_nodes=hidden_nodes_latent, hidden_l2s=hidden_l2, hidden_dropout_rates=hidden_dropout_rates) crs_trm = self.__cross_term(crs_u, crs_i, merge='sum') #concatenate def append_isNotNone(lst, v): tls = copy.copy(lst) if v is not None: tls.append(v) return tls concats = [] concats = append_isNotNone(concats, u_bias) concats = append_isNotNone(concats, i_bias) concats = append_isNotNone(concats, crs_trm) if len(concats) > 1: y = Add(name='add_bias_crossTerm')(concats) else: y = concats[0] # add mean y = Lambda(lambda x: x*self.rating_scale + self.all_rating_mean, name='scaling')(y) self.model = Model(inputs=[input_user_id, input_item_id], outputs=y) return def make_model_dmf_deepCrossterm(self, user_bias=True, item_bias=True, cross_term=True, latent_num=10, cross_term_l2=0, hidden_nodes_crossterm=[10], hidden_l2=[0], hidden_dropout_rates=[]): ''' make normal matrix factorization model with keras. rating = all_mean + user_bias + item_bias + cross_term ''' input_user_id = Input(shape=(1,), name='user_id') input_item_id = Input(shape=(1,), name='item_id') #user bias u_bias = None if user_bias: u_bias = self.__bias_term(input_id=input_user_id, unique_id_num=self.unique_user_num, l2=0) #item bias i_bias = None if item_bias: i_bias = self.__bias_term(input_id=input_item_id, unique_id_num=self.unique_item_num, l2=0) #cross term crs_trm = None if cross_term: crs_u = self.__single_term(input_id=input_user_id, unique_id_num=self.unique_user_num, output_dim=latent_num, l2=cross_term_l2) crs_i = self.__single_term(input_id=input_item_id, unique_id_num=self.unique_item_num, output_dim=latent_num, l2=cross_term_l2) crs_trm = self.__cross_term(crs_u, crs_i, merge='sum', hidden_nodes=hidden_nodes_crossterm, hidden_l2s=hidden_l2, hidden_dropout_rates=hidden_dropout_rates) #concatenate def append_isNotNone(lst, v): tls = copy.copy(lst) if v is not None: tls.append(v) return tls concats = [] concats = append_isNotNone(concats, u_bias) concats = append_isNotNone(concats, i_bias) concats = append_isNotNone(concats, crs_trm) if len(concats) > 1: y = Add(name='add_bias_crossTerm')(concats) else: y = concats[0] # add mean y = Lambda(lambda x: x*self.rating_scale + self.all_rating_mean, name='scaling')(y) self.model = Model(inputs=[input_user_id, input_item_id], outputs=y) return def make_model_dmf_deepLatent_deepCrossterm(self, user_bias=True, item_bias=True, cross_term=True, latent_num=10, cross_term_l2=0, hidden_nodes_latent=[10], hidden_nodes_crossterm=[10], hidden_l2=[0], hidden_dropout_rates=[]): ''' make normal matrix factorization model with keras. rating = all_mean + user_bias + item_bias + cross_term ''' input_user_id = Input(shape=(1,), name='user_id') input_item_id = Input(shape=(1,), name='item_id') #user bias u_bias = None if user_bias: u_bias = self.__bias_term(input_id=input_user_id, unique_id_num=self.unique_user_num, l2=0) #item bias i_bias = None if item_bias: i_bias = self.__bias_term(input_id=input_item_id, unique_id_num=self.unique_item_num, l2=0) #cross term crs_trm = None if cross_term: crs_u = self.__single_term(input_id=input_user_id, unique_id_num=self.unique_user_num, output_dim=latent_num, l2=cross_term_l2, hidden_nodes=hidden_nodes_latent, hidden_dropout_rates=hidden_dropout_rates) crs_i = self.__single_term(input_id=input_item_id, unique_id_num=self.unique_item_num, output_dim=latent_num, l2=cross_term_l2, hidden_nodes=hidden_nodes_latent, hidden_dropout_rates=hidden_dropout_rates) crs_trm = self.__cross_term(crs_u, crs_i, merge='sum', hidden_nodes=hidden_nodes_crossterm, hidden_l2s=hidden_l2, hidden_dropout_rates=hidden_dropout_rates) #concatenate def append_isNotNone(lst, v): tls = copy.copy(lst) if v is not None: tls.append(v) return tls concats = [] concats = append_isNotNone(concats, u_bias) concats = append_isNotNone(concats, i_bias) concats = append_isNotNone(concats, crs_trm) if len(concats) > 1: y = Add(name='add_bias_crossTerm')(concats) else: y = concats[0] # add mean y = Lambda(lambda x: x*self.rating_scale + self.all_rating_mean, name='scaling')(y) self.model = Model(inputs=[input_user_id, input_item_id], outputs=y) return def make_model_dmf_residualDeepCrossterm(self, user_bias=True, item_bias=True, cross_term=True, latent_num=10, cross_term_l2=0, hidden_nodes_crossterm=[10], hidden_l2=[0], hidden_dropout_rates=[]): ''' make normal matrix factorization model with keras. rating = all_mean + user_bias + item_bias + cross_term ''' input_user_id = Input(shape=(1,), name='user_id') input_item_id = Input(shape=(1,), name='item_id') #user bias u_bias = None if user_bias: u_bias = self.__bias_term(input_id=input_user_id, unique_id_num=self.unique_user_num, l2=0) #item bias i_bias = None if item_bias: i_bias = self.__bias_term(input_id=input_item_id, unique_id_num=self.unique_item_num, l2=0) #cross term crs_trm = None res_crs_trm = None if cross_term: crs_u = self.__single_term(input_id=input_user_id, unique_id_num=self.unique_user_num, output_dim=latent_num, l2=cross_term_l2) crs_i = self.__single_term(input_id=input_item_id, unique_id_num=self.unique_item_num, output_dim=latent_num, l2=cross_term_l2) res_crs_trm = self.__res_cross_term(crs_u, crs_i, merge='sum', hidden_nodes=hidden_nodes_crossterm, hidden_l2s=hidden_l2, hidden_dropout_rates=hidden_dropout_rates) #concatenate def append_isNotNone(lst, v): tls = copy.copy(lst) if v is not None: tls.append(v) return tls concats = [] concats = append_isNotNone(concats, u_bias) concats = append_isNotNone(concats, i_bias) concats = append_isNotNone(concats, res_crs_trm) if len(concats) > 1: y = Add(name='add_bias_crossTerm')(concats) else: y = concats[0] # add mean y = Lambda(lambda x: x*self.rating_scale + self.all_rating_mean, name='scaling')(y) self.model = Model(inputs=[input_user_id, input_item_id], outputs=y) return #model of bias term def __single_term(self, input_id, unique_id_num, output_dim=1, hidden_nodes=[], activation='lrelu', activation_last='linear', l2=0, hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[], latent_layer_name=None): ''' input -> embedding -> flatten -> dropout -> hidden_layer (-> dense -> activation -> dropout -> ... -> dense -> activation -> dropout -> dense -> activation_last -> dropout) -> output ''' # hidden_nodes_ = copy.copy(hidden_nodes) hidden_nodes_.append(output_dim) # hl = input_id # for ih, h_dim in enumerate(hidden_nodes_): # first layer if ih == 0: # embedding layer # input_shape = [batch_size, unique_id_num+1] # output_shape = [batch_size, input_length, output_dim] hl = Embedding(input_dim=unique_id_num, output_dim=hidden_nodes_[0], #input_length=1, embeddings_regularizer=regularizers.l2(l2), name=latent_layer_name )(hl) # flatten hl = Flatten()(hl) #dropout hl = Dropout(dropout_rate)(hl) # 2~ layer else: l2_h = 0 if len(hidden_l2s)==0 else hidden_l2s[ih-1] # hidden layer hl = Dense(h_dim, kernel_regularizer=regularizers.l2(l2_h))(hl) #activation act = activation if ih != len(hidden_nodes_)-1 else activation_last hl = KerasBase.activation(act)(hl) #dropout drp_rt = 0 if len(hidden_dropout_rates)==0 else hidden_dropout_rates[ih-1] hl = Dropout(drp_rt)(hl) return hl def __bias_term(self, input_id, unique_id_num, l2=0, latent_layer_name=None): ''' input -> embedding -> flatten -> output ''' bias = self.__single_term(input_id=input_id, unique_id_num=unique_id_num, output_dim=1, hidden_nodes=[], activation='lrelu', activation_last='linear', l2=l2, hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[], latent_layer_name=latent_layer_name) return bias #model of cross term def __cross_term(self, input1, input2, merge='sum', hidden_nodes=[], activation='lrelu', activation_last='lrelu', hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[]): ''' input1 and input2 must be already embedded. (input1, input2) -> Multiply -> dropout -> hidden_layer (-> dense -> activation -> dropout -> ... -> dense -> activation -> dropout -> dense -> activation_last -> dropout) -> merge(ex. sum, mean) -> output ''' multiplied = Multiply()([input1, input2]) #hidden layer hl = multiplied for ih, h_dim in enumerate(hidden_nodes): l2_h = 0 if len(hidden_l2s)==0 else hidden_l2s[ih] # dense hl = Dense(h_dim, kernel_regularizer=regularizers.l2(l2_h))(hl) # activation act = activation if ih != len(hidden_nodes)-1 else activation_last hl = KerasBase.activation(act)(hl) # dropout drp_rt = 0 if len(hidden_dropout_rates)==0 else hidden_dropout_rates[ih] hl = Dropout(drp_rt)(hl) #merge layer if merge=='sum': self.__count_call_sum_model += 1 crs_trm = KerasBase.sum_layer(name='sum' + str(self.__count_call_sum_model))(hl) elif merge=='mean': self.__count_call_mean_model += 1 crs_trm = KerasBase.mean_layer(name='mean' + str(self.__count_call_mean_model))(hl) return crs_trm def __res_cross_term(self, input1, input2, merge='sum', hidden_nodes=[], activation='lrelu', activation_last='lrelu', hidden_l2s=[], dropout_rate=0, hidden_dropout_rates=[]): ''' input1 and input2 must be already embedded. (input1, input2) -> Multiply -> dropout -> hidden_layer (-> dense -> activation -> dropout -> ... -> dense -> activation -> dropout -> dense -> activation_last -> dropout) -> merge(ex. sum, mean) -> output ''' multiplied = Multiply()([input1, input2]) #hidden layer hl = multiplied for ih, h_dim in enumerate(hidden_nodes): l2_h = 0 if len(hidden_l2s)==0 else hidden_l2s[ih] # dense hl = Dense(h_dim, kernel_regularizer=regularizers.l2(l2_h))(hl) # activation act = activation if ih != len(hidden_nodes)-1 else activation_last hl = KerasBase.activation(act)(hl) # dropout drp_rt = 0 if len(hidden_dropout_rates)==0 else hidden_dropout_rates[ih] hl = Dropout(drp_rt)(hl) #add hl = Add()([multiplied, hl]) #merge layer if merge=='sum': self.__count_call_sum_model += 1 crs_trm = KerasBase.sum_layer(name='sum' + str(self.__count_call_sum_model))(hl) elif merge=='mean': self.__count_call_mean_model += 1 crs_trm = KerasBase.mean_layer(name='mean' + str(self.__count_call_mean_model))(hl) return crs_trm def compile(self, optimizer='adam', loss='mean_squared_error'): self.model.compile(optimizer=optimizer, loss=loss) return def fit(self, user_ids, item_ids, rating, batch_size, epochs, user_ids_val=None, item_ids_val=None, rating_val=None): # validation data val_data = None if (user_ids_val is not None) and (item_ids_val is not None) and (rating_val is not None): val_data = ([user_ids_val, item_ids_val], rating_val) # fit self.history = self.model.fit(x=[user_ids, item_ids], y=rating, batch_size=batch_size, epochs=epochs, verbose=1, validation_data=val_data) return def predict(self, user_ids, item_ids): return self.model.predict([user_ids, item_ids])[:,0]
5. MovieLensでの精度検証
MovieLensの映画評価データセットを使ってDeep Matrix Factorizationの精度検証を実施した。
MovieLens | GroupLens
Recommendation and Ratings Public Data Sets For Machine Learning · GitHub
使用したMovieLensの映画評価データセットについては以下のとおり。
- ユーザが映画に0.5~5の評価点をつけたデータセット
- 2019/3/3の最新データを使用。
- ユーザ数231978、映画数21851(ただし、評価した映画が10以下のユーザ、評価されたユーザ数が10以下の映画は除外)
精度検証の流れは以下のとおり。
- (1) ハイパーパラメータチューニング。training dataの一部を使用。
- (2) 学習、精度検証。training dataで学習後、test dataで検証。全データセットの80%をtraining data、20%をtest dataとして分割。
(1) ハイパーパラメータチューニング
ハイパーパラメータチューニングのため、training dataの一部を使用。ユーザ数8558、映画数6945。80%をtraining data、20%をtest dataとして使う。チューニングしたハイパーパラメータは、相互作用部分のノード数、Deep層のノード数、L2正則化の係数。
チューニングのデータセットと学習データセットでユーザ数及び映画数が違うことに伴って、これらで学習パラメータ数が変わってしまう。そのため、せっかくチューニングしたハイパーパラメータが妥当なのかかなり微妙。今思うと全体を60%をtraining data、20%をvalidation data、20%をtest dataとしてvalidation dataでチューニングした方がよかった。でもtraining data多いと学習時間かかってチューニングがしんどいんだよなー。。
(2) 学習、精度検証
training dataで学習後、test dataで精度検証する。全データセットの80%をtraining data、20%をtest dataとして使う。なお、予測した評価点と実際の評価点のRMSE (Root mean squared error)を精度とする。RMSEが小さいほど精度が良い。
精度の比較対象は以下のとおり。
- Random:評価点を0.5~5の乱数で予測(予測というのか??笑)。
- 全体平均:評価点をtraining dataの平均評価点で予測。
- 映画単位平均:評価点をtraining data中のその映画の平均評価点で予測。
- ユーザ単位平均:評価点をtraining data中のそのユーザの平均評価点で予測。
- Matrix Factorization:評価点をMatrix Factorizationで予測。
- Deep Matrix Factorization(①相互作用の入力部分をDeep):評価点をDeep Matrix Factorizationで予測。構造は上記の①
- Deep Matrix Factorization(②userとitemの積をとった後の部分をDeep):評価点をDeep Matrix Factorizationで予測。構造は上記の②
- Deep Matrix Factorization(③②をResNetのような構造にしたもの):評価点をDeep Matrix Factorizationで予測。構造は上記の③
- Deep Matrix Factorization(④①と②両方):評価点をDeep Matrix Factorizationで予測。構造は上記の④
結果は下表のとおり。
結果から以下のことがわかる。
- Matrix Factorizationを使うことで、ただの平均と比べて大幅に精度が向上する。
- Deep Matrix Factorization(①、②、③)はMatrix Factorizationと精度変わらず。パラメータチューニング不足??
- Deep Matrix Factorization(④①と②両方)は精度少し向上。パラメータチューニングをもっとがっつりやればもっとよくなるかも。
6. まとめ
Matrix Factorizationのニューラルネットワーク形式の表現を導出し、Kerasで実装した。
MovieLensのデータセットを使い、映画評価点の予測精度を検証した。その結果、ただの平均を使用する方法と比べてMatrix Factorizationはかなり精度が高くなった。また、Deep Matrix Factorizationは構造によってはMatrix Factorizationよりも精度が高くなった。
検証の感想は以下のとおり。
- Matrix Factorizationめっちゃ強い!!
- Deep Matrix Factorizationはポテンシャルありそうだけど、パラメータチューニングしんどい。
- 得られた教訓は、ニューラルネットワークの構造を考えるときにMatrix Factorizationのようにドメイン情報を反映することを意識すべき、ということ。機械学習エンジニアとしてレベルアップしていくには、やはり適用対象に対する分析力が必要。
せっかくMatrix Factorizationについて勉強して実装もしたので、実際のデータを使って分類とかしてみたいなー