관리 메뉴

TEAM EDA

03. 협업필터링 기반 추천시스템 - ALS 본문

EDA Study/추천시스템

03. 협업필터링 기반 추천시스템 - ALS

김현우 2020. 12. 21. 01:10

해당 글은 T-아카데미에서 발표한 추천시스템 - 입문하기의 자료에 딥러닝을 이용한 추천시스템과 추천시스템 대회를 분석한 내용을 추가한 글입니다. 해당 자료보다 더욱더 좋은 자료들이 페이스북 그룹 Recommender System KR에 있으니 많은 관심 부탁합니다.

협업필터링 기반 추천시스템 - ALS

이전 2개의 포스팅에서는 KNN을 이용한 이웃 기반의 협업 필터링과 SGD를 이용한 잠재요인 기반의 협업 필터링을 살펴봤습니다. 이번 포스팅에서는 잠재요인 기반의 협업 필터링이지만, SGD와는 다른 방식의 학습을 가지는 Alternating Least Squares(ALS)에 대해서 살펴보도록 하겠습니다.

 

ALS이란?

기존의 SGD가 두 개의 행렬(User Latent, Item Latent)을 동시에 최적화하는 방법이라면, ALS는 두 행렬 중 하나를 고정시키고 다른 하나의 행렬을 순차적으로 반복하면서 최적화하는 방법입니다. 이렇게 하면, 기존의 최적화 문제가 convex 형태로 바뀌기에 수렴된 행렬을 찾을 수 있는 장점이 있습니다. 학습에 진행되는 알고리즘은 아래의 4개 과정이 순차적으로 진행되면서 User Latent와 Item Latent를 찾게 됩니다.

  1. 초기 아이템, 사용자 행렬을 초기화
  2. 아이템 행렬을 고정하고 사용자 행렬을 최적화
  3. 사용자 행렬을 고정하고 아이템 행렬을 최적화
  4. 위의 2, 3 과정을 반복

Convex 형태의 문제이기에 수렴된 행렬의 최적 해를 알고 있습니다. 해당 식은 아래와 같이 적을 수 있고 이론적으로 해당 값이 어떻게 나온지는 링크를 참조하시기 바랍니다.

ALS의 경우 입력값에 결측치가 있으면 안 됩니다. 그렇기에, 모든 결측치는 0으로 바꿔주고 학습을 진행하도록 하겠습니다.

 

먼저, 초기 아이템, 사용자 행렬을 초기화해주는 과정은 SGD와 동일합니다.

 

이후에, 아이템 행렬을 고정하고 사용자 행렬을 최적화합니다. SGD에서는 아이템 행렬을 고정하지 않고 동시에 아이템과 사용자 행렬을 최적화한 반면에, ALS는 하나의 행렬의 수렴을 끝내고 다른 하나의 행렬의 수렴을 진행한다는 점에서 가장 큰 차이를 보입니다.

 

마찬가지로, 이제 사용자 행렬을 고정하고 아이템 행렬을 최적화해줍니다.

 

보통 이런 식의 반복되는 최적화 과정을 거치면, 아이템 행렬의 값이 달라져서 사용자 행렬의 최적화된 값도 달라집니다. 그렇기에, 해당 과정을 이제 반복해가면서 두 행렬 모두 수렴의 근접한 값을 찾아주는 게 ALS 알고리즘의 핵심입니다.

한번, 코드를 통해서 ALS 과정을 세부적으로 살펴보겠습니다. 전체의 풀 코드는 아래와 같습니다.

import numpy as np
from tqdm import tqdm_notebook as tqdm

# Base code : https://github.com/mickeykedia/Matrix-Factorization-ALS/blob/master/ALS%20Python%20Implementation.py
class AlternatingLeastSquares():
    def __init__(self, R, k, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose


    def fit(self):
        # init latent features
        self._users = np.random.normal(size=(self._num_users, self._k))
        self._items = np.random.normal(size=(self._num_items, self._k))

        # train while epochs
        self._training_process = []
        self._user_error = 0; self._item_error = 0; 
        for epoch in range(self._epochs):
            for i, Ri in enumerate(self._R):
                self._users[i] = self.user_latent(i, Ri)
                self._user_error = self.cost()

            for j, Rj in enumerate(self._R.T):
                self._items[j] = self.item_latent(j, Rj)
                self._item_error = self.cost()

            cost = self.cost()
            self._training_process.append((epoch, cost))

            # print status
            if self._verbose == True and ((epoch + 1) % 10 == 0):
                print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))


    def cost(self):
        """
        compute root mean square error
        :return: rmse cost
        """
        xi, yi = self._R.nonzero()
        cost = 0
        for x, y in zip(xi, yi):
            cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
        return np.sqrt(cost/len(xi))


    def user_latent(self, i, Ri):
        """
        :param error: rating - prediction error
        :param i: user index
        :param Ri: Rating of user index i
        :return: convergence value of user latent of i index
        """

        du = np.linalg.solve(np.dot(self._items.T, np.dot(np.diag(Ri), self._items)) + self._reg_param * np.eye(self._k),
                                   np.dot(self._items.T, np.dot(np.diag(Ri), self._R[i].T))).T
        return du

    def item_latent(self, j, Rj):
        """
        :param error: rating - prediction error
        :param j: item index
        :param Rj: Rating of item index j
        :return: convergence value of itemr latent of j index
        """

        di = np.linalg.solve(np.dot(self._users.T, np.dot(np.diag(Rj), self._users)) + self._reg_param * np.eye(self._k),
                                 np.dot(self._users.T, np.dot(np.diag(Rj), self._R[:, j])))
        return di


    def get_prediction(self, i, j):
        """
        get predicted rating: user_i, item_j
        :return: prediction of r_ij
        """
        return self._users[i, :].dot(self._items[j, :].T)


    def get_complete_matrix(self):
        """
        :return: complete matrix R^
        """
        return self._users.dot(self._items.T)



# run example
if __name__ == "__main__":
    # rating matrix - User X Item : (7 X 5)
    R = np.array([
        [1, 0, 0, 1, 3],
        [2, 0, 3, 1, 1],
        [1, 2, 0, 5, 0],
        [1, 0, 0, 4, 4],
        [2, 1, 5, 4, 0],
        [5, 1, 5, 4, 0],
        [0, 0, 0, 1, 0],
    ])
als = AlternatingLeastSquares(R = R, reg_param = 0.01, epochs=100, verbose=True, k=3)
als.fit()
Iteration: 10 ; cost = 0.0275
Iteration: 20 ; cost = 0.0181
Iteration: 30 ; cost = 0.0135
Iteration: 40 ; cost = 0.0108
Iteration: 50 ; cost = 0.0090
Iteration: 60 ; cost = 0.0078
Iteration: 70 ; cost = 0.0069
Iteration: 80 ; cost = 0.0062
Iteration: 90 ; cost = 0.0057
Iteration: 100 ; cost = 0.0053
als.get_complete_matrix()
array([[ 0.99651642,  0.90191888, -0.39191548,  1.01607611,  2.99890128],
       [ 1.99847564, -0.41920731,  2.99788432,  1.01629562,  1.000374  ],
       [ 1.00104212,  1.99718815,  4.3484917 ,  4.99954775,  4.71578424],
       [ 1.00217607,  1.59913874,  3.47460431,  3.99959159,  3.99805924],
       [ 1.99686201,  1.00464112,  4.99759015,  3.99918833,  3.69888277],
       [ 5.00036511,  0.99911566,  5.00075304,  3.99759404,  7.00029428],
       [ 0.27193143,  0.33822924,  1.02261363,  0.99777396,  0.87894603]])

기본적인 구조는 위와 같습니다. 한번 __init__ 부터 차근차근 코드의 설명을 시작하겠습니다. 대부분의 내용이 SGD의 설명과 유사한 부분이 있으니 이전 포스팅을 한번 보고 오면 도움이 되실 것입니다.

class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose
# Base code : https://github.com/mickeykedia/Matrix-Factorization-ALS/blob/master/ALS%20Python%20Implementation.py
class AlternatingLeastSquares():
    def __init__(self, R, k, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose

일단, __init__ 함수는 AlternatingLeastSquares이라는 클래스가 호출될 때 자동으로 실행되는 부분입니다. 파라미터로는 6개의 인자를 받는데 각각 아래의 의미를 가집니다.

  • R: 평점 행렬
  • k: User Latent와 Item Latent의 차원의 수
  • reg_param: Weight의 Regularization 값
  • epochs: 전체 학습 횟수 (Total Epoch)
  • verbose: 학습 과정을 출력할지 여부 (True : 10번마다 cost 출력, False : cost를 출력하지 않음)
%%time

R = np.array([
    [1, 0, 0, 1, 3],
    [2, 0, 3, 1, 1],
    [1, 2, 0, 5, 0],
    [1, 0, 0, 4, 4],
    [2, 1, 5, 4, 0],
    [5, 1, 5, 4, 0],
    [0, 0, 0, 1, 0],
])

als = AlternatingLeastSquares(R = R, reg_param = 0.01, epochs=100, verbose=True, k=3)
als.fit()

위의 실행은 7개의 User, 5개의 Item에 대한 평점 행렬(R)에 대해서 k=3, reg_param=0.01, epochs=100, verbose=True 와 같은 파라미터를 가지는 AlternatingLeastSquares객체를 생성하라는 의미입니다. 생성된 객체에서 als.fit()을 통해서 fit() 함수를 실행하면, 아래의 코드가 실행되게 됩니다.

def fit(self):
    # init latent features
    self._users = np.random.normal(size=(self._num_users, self._k))
    self._items = np.random.normal(size=(self._num_items, self._k))

    # train while epochs
    self._training_process = []
    self._user_error = 0; self._item_error = 0; 
    for epoch in range(self._epochs):
        # rating이 존재하는 index를 기준으로 training
        for i, Ri in enumerate(self._R):
            self._users[i] = self.user_latent(i, Ri)
            self._user_error = self.cost()

        for j, Rj in enumerate(self._R.T):
            self._items[j] = self.item_latent(j, Rj)
            self._item_error = self.cost()

        cost = self.cost()
        self._training_process.append((epoch, cost))

        # print status
        if self._verbose == True and ((epoch + 1) % 10 == 0):
            print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))

fit 함수에서 가장 먼저 실행되는 부분은 아래의 Latent Matrix를 초기화해주는 부분입니다.

self._users = np.random.normal(size=(self._num_users, self._k))
self._items = np.random.normal(size=(self._num_items, self._k))

이때, np.random.normal 함수를 이용해서 행렬을 초기화해줍니다. 함수의 의미는 아래와 같습니다.

  • np.random.normal : (self._num_users, self._k)의 크기로 행렬을 정규분포 형태로 초기화합니다. 위의 예시에서는 User Latent Matrix는 (7, 3)의 크기를 Item Latent Matrix는 (5, 3)의 크기를 가짐

이후, 학습 과정을 진행해줍니다.

# train while epochs
self._training_process = []
self._user_error = 0; self._item_error = 0; 
for epoch in range(self._epochs):
    for i, Ri in enumerate(self._R):
        self._users[i] = self.user_latent(i, Ri)
        self._user_error = self.cost()

    for j, Rj in enumerate(self._R.T):
        self._items[j] = self.item_latent(j, Rj)
        self._item_error = self.cost()

    cost = self.cost()
    self._training_process.append((epoch, cost))

self._training_process = [] 는 for문 안의 self._training_process.append((epoch, cost)) 에 사용되는 부분으로 학습 시에 Epoch와 Cost를 저장하는 부분입니다. for문의 경우 처음 파라미터로 받은 self._epochs 만큼 반복학습을 진행하게 됩니다.

먼저 시행되는 for문 안의 내용을 보면 User Latent Matrix를 계산하는 부분(self.user_latent(i, Ri) )과 Item Latent Matrix를 계산하는 부분 (self.item_latent(j, Rj))으로 나뉩니다. 각각의 코드는 아래와 같습니다. 해당 부분은 아래의 수식을 구현한 것과 같습니다.

 

def user_latent(self, i, Ri):
    du = np.linalg.solve(np.dot(self._items.T, np.dot(np.diag(Ri), self._items)) + self._reg_param * np.eye(self._k), np.dot(self._items.T, np.dot(np.diag(Ri), self._R[i].T))).T
    return du

def item_latent(self, j, Rj):
    di = np.linalg.solve(np.dot(self._users.T, np.dot(np.diag(Rj), self._users)) + self._reg_param * np.eye(self._k), np.dot(self._users.T, np.dot(np.diag(Rj), self._R[:, j])))
    return di

해당 과정을 이제 반복하게 되면, 아래와 같이 원 평점 행렬을 잘 복원한 Matrix가 생성됩니다.

als.get_complete_matrix()
array([[ 0.99651642,  0.90191888, -0.39191548,  1.01607611,  2.99890128],
       [ 1.99847564, -0.41920731,  2.99788432,  1.01629562,  1.000374  ],
       [ 1.00104212,  1.99718815,  4.3484917 ,  4.99954775,  4.71578424],
       [ 1.00217607,  1.59913874,  3.47460431,  3.99959159,  3.99805924],
       [ 1.99686201,  1.00464112,  4.99759015,  3.99918833,  3.69888277],
       [ 5.00036511,  0.99911566,  5.00075304,  3.99759404,  7.00029428],
       [ 0.27193143,  0.33822924,  1.02261363,  0.99777396,  0.87894603]])

특히, 행렬의 사용자 5의 아이템 4는 평점이 7.00으로 기존에 0으로 구매를 안 했지만 볼 경우 7로 평점을 높게 매길 거라고 볼 수 있습니다. 이러한 상품들만 추천하는 게 ALS 및 SGD와 같은 Latent 기반의 협업 필터링입니다. 하지만, 위와 같이 코드를 짜면 조금 귀찮습니다. 그래서 implicit 패키지에서는 ALS 함수를 제공합니다.

import numpy as np 
import scipy 

R = np.array([
    [1, 0, 0, 1, 3],
    [2, 0, 3, 1, 1],
    [1, 2, 0, 5, 0],
    [1, 0, 0, 4, 4],
    [2, 1, 5, 4, 0],
    [5, 1, 5, 4, 0],
    [0, 0, 0, 1, 0],
])

rating_matrix = scipy.sparse.csr_matrix(R)

기본적으로 implcit의 ALS 패키지는 입력을 Sparse Matrix형태로 받기에 scipy.sparse.csr_matrix(R)을 통해서 형태를 바꿔줍니다. 이후, 아래의 코드를 통해서 학습을 진행해줍니다.

from implicit.evaluation import  *
from implicit.als import AlternatingLeastSquares as ALS
from implicit.bpr import BayesianPersonalizedRanking as BPR

als_model = ALS(factors=3, regularization=0.01, iterations = 100)
als_model.fit(rating_matrix.T)
als_model.user_factors
array([[-0.43390596,  0.7792046 ,  0.72156245],
       [ 0.25987417,  1.585035  , -0.07761745],
       [ 0.5883245 ,  0.67921823,  1.4624232 ],
       [-0.44013542,  0.76680833,  0.68539524],
       [ 1.2584542 ,  1.5149189 ,  0.70578897],
       [ 1.2584835 ,  1.5153128 ,  0.7062366 ],
       [-0.01211063,  0.24986167,  0.54429907]], dtype=float32)
als_model.item_factors
array([[-0.2741828 ,  0.70229995,  0.39969537],
       [ 0.6381009 , -0.07443979,  0.44144744],
       [ 0.3407201 ,  0.5544682 , -0.38120595],
       [-0.32385796,  0.68887347,  0.51305306],
       [-0.9246371 ,  0.78346837, -0.02537217]], dtype=float32)
np.dot(als_model.user_factors, als_model.item_factors.T) * 5
array([[ 4.7730503 , -0.08173858,  0.04569907,  5.237486  ,  4.966901  ],
       [ 5.054468  ,  0.06786128,  4.9849205 ,  4.8395214 ,  5.017524  ],
       [ 4.5011516 ,  4.8521624 ,  0.09787245,  5.1383123 , -0.24472743],
       [ 4.6657815 , -0.17682949,  0.06965568,  5.1120973 ,  4.9517283 ],
       [ 5.0049076 ,  5.009096  ,  4.99852   ,  4.990671  ,  0.02685123],
       [ 5.0071454 ,  5.0100317 ,  4.998809  ,  4.993129  ,  0.0282017 ],
       [ 1.9817609 ,  1.0697598 , -0.3653801 ,  2.2764974 ,  0.96573305]],
      dtype=float32)

그럼, 이제 위와 같이 완성된 Rating Matrix를 만들 수 있습니다. 참고로 Implicit 패키지의 단어 의미처럼 0과 1을 가지는 행렬에 최적화되어있습니다. (내부적인 알고리즘에 해당 장치가 있는데 관련 내용은 추후에 추가하도록 하겠습니다.) 그래서 Rating Matrix의 0~5의 범위를 맞춰주려면 5를 곱해주면 됩니다. 이번 포스팅을 끝으로 협업 필터링 기반의 추천 시스템에 대한 내용을 모두 살펴봤습니다. 다음 포스팅에서는 딥러닝 기반의 추천시스템에 대해서 알아보도록 하겠습니다.

 

 

donaricano-btn

커피 선물하기

 

참고