관리 메뉴

TEAM EDA

03. 협업필터링 기반 추천시스템 - SGD 본문

EDA Study/추천시스템

03. 협업필터링 기반 추천시스템 - SGD

김현우 2020. 12. 21. 00:54

해당 글은 T-아카데미에서 발표한 추천시스템 - 입문하기의 자료에 딥러닝을 이용한 추천시스템과 추천시스템 대회를 분석한 내용을 추가한 글입니다. 해당 자료보다 더욱더 좋은 자료들이 페이스북 그룹 Recommender System KR에 있으니 많은 관심 부탁합니다.

협업필터링 기반 추천시스템 - SGD

이전에 봤던 협업 필터링은 Neighborhood based method이라면 이번에 살펴볼 SGD와 다음의 ALS는 Latent Factor Collaborative Filtering입니다. 둘의 차이를 한번 살펴보면 아래의 그림과 같습니다.

 

잠재 요인 협업 필터링이란?

잠재 요인 협업 필터링은 Rating Matrix에서 빈 공간을 채우기 위해서 사용자와 상품을 잘 표현하는 차원(Latent Factor)을 찾는 방법입니다. 잘 알려진 행렬 분해는 추천 시스템에서 사용되는 협업 필터링 알고리즘의 한 종류입니다. 행렬 분해 알고리즘은 사용자-아이템 상호 작용 행렬을 두 개의 저 차원 직사각형 행렬의 곱으로 분해하여 작동합니다. 이 방법은 Netflix 챌린지에서 널리 알려지게 되었습니다.

 

잠재 요인 협업 필터링의 원리?

사용자의 잠재요인과 아이템의 잠재요인을 내적 해서 평점 매트릭스를 계산하는 방식을 사용합니다. 즉, 사용자의 잠재요인과 아이템의 잠재요인의 내적이 기존의 평점 매트릭스와 유사해지도록 만드는 잠재요인 값을 찾아야 합니다. 하지만, 이런 잠재 요인 협업 필터링의 방법은 여러 가지가 있습니다. 대표적으로 넷플릭스 대회에서 사용한 SVD를 시작으로 최근 많이 사용하는 SGD, ALS가 있습니다.

 

SVD 방식은 고윳값 분해(eigen value Decomposition)와 같은 행렬을 대각화 하는 방법입니다.

 

하지만, 이런 SVD는 아래와 같은 한계가 있습니다.

  1. 데이터에 결측치가 없어야 함
  2. 대부분의 현업 데이터는 Sparse 한 데이터

SGD를 이용한 협업 필터링은 결측치와 상관없이 진행하는 장점이 있습니다.

 

기본적인 식은 위와 같습니다. 일단, 평점이 있는 부분에 대해서 Matrix Factorization을 시행하고 해당 부분의 오차를 최소화하는 것입니다. 이때, Gradient Descent처럼 오차를 최소화하는 방향을 찾기 위해 편미분이 수행됩니다.

 

하지만, 딥러닝을 수행하다 보면 보통 특정 Weight가 커지는 문제점이 생기고 이를 방지하기 위해서 Regularization을 수행합니다. SGD에서도 마찬가지로 Regularization term을 두어서 Weight를 업데이트합니다.

 

한번 Explict Feedback 된 평점 행렬의 예시에 대해 물음표를 채워보도록 하겠습니다.

 

? 3 2
5 1 2
4 2 1
2 ? 4

 

먼저, User Latent와 Item Latent를 임의로 초기화해주는 과정으로 시작합니다. 이렇게 해서 구한 User Latent와 Item Latent의 곱을 통해 평점 행렬을 계산합니다. 이제 이 행렬을 가지고 본격적으로 SGD의 학습을 진행합니다.

 

먼저, 값이 존재하는 모든 평점에 대해서 순차적으로 Gradient Descent 과정을 진행해줍니다. 아래는 평점 3인 부분에 대해서 오차를 계산하여 편미분을 계산한 것입니다.

 

이제, 이렇게 계산한 Descent 값을 통해서 User Latent와 Item Latent의 업데이트를 진행합니다. 업데이트의 식은 아래와 같습니다.

  • New Latent = Latent - learning rate * Descent

참고로 업데이트되는 부분은 0번째 행, 1번째 열의 계산을 담당하는 User Latent의 0번째 행과 Item Latent의 1번째 열입니다.

 

이제 위의 과정을 모든 평점에 대해서 반복해줍니다. 0번째 사용자의 2번째 아이템에 대해서 학습을 진행하므로 User Latent의 0번째와 Item Latent의 2번째가 반응하는 것을 볼 수 있습니다.

 

마찬가지로, 1번째 사용자의 0번째 아이템에 대해서 학습을 진행하므로 User Latent의 1번째와 Item Latent의 0번째가 반응하는 것을 볼 수 있습니다.

 

마지막으로, 3번째 사용자의 2번째 아이템에 대해서 학습을 진행하므로 User Latent의 3번째와 Item Latent의 2번째가 반응하고 1 epoch에 대해서 학습이 완료되게 됩니다.

 

이 과정을 여러 번 반복하게 되면 loss 값은 계속 떨어지게 되고 원 평점 행렬이 잘 생성된 것을 볼 수 있습니다.

이제 이렇게 생성된 평점 행렬을 가지고 ?에 대해서 추천을 아래와 같이 추천을 할 수 있습니다.

  • 평점이 높은 1번 상품은 1번 유저에게 추천을 해준다.
  • 평점이 낮은 2번 상품은 4번 유저에게 추천을 하지 않는다.

SGD 같은 경우는 GPU를 사용하고 결측치에 관계없이 사용할 수 있기에 현업에서도 많이 사용하는 방법입니다.

 

[장점]

  • 매우 유연한 모델로 다른 Loss function을 사용할 수 있음
  • parallelized가 가능함 (여러 GPU를 통해서 계산이 가능)

[단점]

  • 수렴까지 속도가 매우 느림 (여러 번 반복해서 계산해야 함)

한번 위의 과정을 코드를 통해서 살펴보며 이해해보도록 하겠습니다. 해당 코드는 Y.LAB 블로그의 [Recommender System] - Python으로 Matrix Factorization 구현하기에 있는 코드를 기반으로 작성했습니다. 먼저 풀코드는 아래와 같고 한번 차근차근 코드가 어떻게 작동하는지 살펴보겠습니다.

import numpy as np
from tqdm import tqdm_notebook as tqdm

import numpy as np

# Base code : https://yamalab.tistory.com/92
class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """

        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose


    def fit(self):
        """
        training Matrix Factorization : Update matrix latent weight and bias

        참고: self._b에 대한 설명
        - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용
        - 정규화 기능. 최종 rating에 음수가 들어가는 것 대신 latent feature에 음수가 포함되도록 해줌.

        :return: training_process
        """

        # init latent features
        self._P = np.random.normal(size=(self._num_users, self._k))
        self._Q = np.random.normal(size=(self._num_items, self._k))

        # init biases
        self._b_P = np.zeros(self._num_users)
        self._b_Q = np.zeros(self._num_items)
        self._b = np.mean(self._R[np.where(self._R != 0)])

        # train while epochs
        self._training_process = []
        for epoch in range(self._epochs):
            # rating이 존재하는 index를 기준으로 training
            xi, yi = self._R.nonzero()
            for i, j in zip(xi, yi):
                self.gradient_descent(i, j, self._R[i, j])
            cost = self.cost()
            self._training_process.append((epoch, cost))

            # print status
            if self._verbose == True and ((epoch + 1) % 10 == 0):
                print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))


    def cost(self):
        """
        compute root mean square error
        :return: rmse cost
        """

        # xi, yi: R[xi, yi]는 nonzero인 value를 의미한다.
        # 참고: http://codepractice.tistory.com/90
        xi, yi = self._R.nonzero()
        # predicted = self.get_complete_matrix()
        cost = 0
        for x, y in zip(xi, yi):
            cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
        return np.sqrt(cost/len(xi))


    def gradient(self, error, i, j):
        """
        gradient of latent feature for GD

        :param error: rating - prediction error
        :param i: user index
        :param j: item index
        :return: gradient of latent feature tuple
        """

        dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :])
        dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :])
        return dp, dq


    def gradient_descent(self, i, j, rating):
        """
        graident descent function

        :param i: user index of matrix
        :param j: item index of matrix
        :param rating: rating of (i,j)
        """

        # get error
        prediction = self.get_prediction(i, j)
        error = rating - prediction

        # update biases
        self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
        self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

        # update latent feature
        dp, dq = self.gradient(error, i, j)
        self._P[i, :] += self._learning_rate * dp
        self._Q[j, :] += self._learning_rate * dq


    def get_prediction(self, i, j):
        """
        get predicted rating: user_i, item_j
        :return: prediction of r_ij
        """
        return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)


    def get_complete_matrix(self):
        """
        computer complete matrix PXQ + P.bias + Q.bias + global bias

        - PXQ 행렬에 b_P[:, np.newaxis]를 더하는 것은 각 열마다 bias를 더해주는 것
        - b_Q[np.newaxis:, ]를 더하는 것은 각 행마다 bias를 더해주는 것
        - b를 더하는 것은 각 element마다 bias를 더해주는 것

        - newaxis: 차원을 추가해줌. 1차원인 Latent들로 2차원의 R에 행/열 단위 연산을 해주기위해 차원을 추가하는 것.

        :return: complete matrix R^
        """
        return self._b + self._b_P[:, np.newaxis] + self._b_Q[np.newaxis:, ] + self._P.dot(self._Q.T)



# run example
if __name__ == "__main__":
    # rating matrix - User X Item : (7 X 5)
    R = np.array([
        [1, 0, 0, 1, 3],
        [2, 0, 3, 1, 1],
        [1, 2, 0, 5, 0],
        [1, 0, 0, 4, 4],
        [2, 1, 5, 4, 0],
        [5, 1, 5, 4, 0],
        [0, 0, 0, 1, 0],
    ])

    # P, Q is (7 X k), (k X 5) matrix
%%time
factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True)
factorizer.fit()
Iteration: 10 ; cost = 1.1768
Iteration: 20 ; cost = 0.8537
Iteration: 30 ; cost = 0.7165
Iteration: 40 ; cost = 0.6334
Iteration: 50 ; cost = 0.5643
Iteration: 60 ; cost = 0.4984
Iteration: 70 ; cost = 0.4340
Iteration: 80 ; cost = 0.3726
Iteration: 90 ; cost = 0.3162
Iteration: 100 ; cost = 0.2661
Wall time: 56.2 ms
factorizer.get_complete_matrix()
array([[ 0.43969656,  1.62911189,  2.66808392,  1.6868341 ,  3.04869067],
       [ 2.2000929 ,  2.27133439,  2.73745321,  1.24294904,  1.07460791],
       [ 1.04894765,  1.99596573,  2.57735796,  4.85898202,  0.77222116],
       [ 1.15453799,  0.76194646,  4.68674128,  3.77973742,  3.97056686],
       [ 2.41183566,  0.90755162,  4.74436383,  3.89203037,  2.01902094],
       [ 4.61386076,  1.09148215,  5.23299137,  3.97290047, -0.63551453],
       [ 1.07942605,  5.53956943,  2.83705583,  0.81685834,  7.19239648]])

기본적인 구조는 위와 같습니다. 한번 __init__ 부터 차근차근 코드의 설명을 시작하겠습니다.

class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose

일단, __init__ 함수는 Matrix Factorization이라는 클래스가 호출될 때 자동으로 실행되는 부분입니다. 파라미터로는 6개의 인자를 받는데 각각 아래의 의미를 가집니다.

  • R: 평점 행렬
  • k: User Latent와 Item Latent의 차원의 수
  • learning_rate: 학습률
  • reg_param: Weight의 Regularization 값
  • epochs: 전체 학습 횟수 (Total Epoch)
  • verbose: 학습 과정을 출력할지 여부 (True : 10번마다 cost 출력, False : cost를 출력하지 않음)
%%time

R = np.array([
    [1, 0, 0, 1, 3],
    [2, 0, 3, 1, 1],
    [1, 2, 0, 5, 0],
    [1, 0, 0, 4, 4],
    [2, 1, 5, 4, 0],
    [5, 1, 5, 4, 0],
    [0, 0, 0, 1, 0],
])

factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True)
factorizer.fit()

위의 실행은 7개의 User, 5개의 Item에 대한 평점 행렬(R)에 대해서 k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True 와 같은 파라미터를 가지는 MatrixFactorization 객체를 생성하라는 의미입니다. 생성된 객체에서 factorizer.fit()을 통해서 fit() 함수를 실행하면, 아래의 코드가 실행되게 됩니다.

def fit(self):
    """
    training Matrix Factorization : Update matrix latent weight and bias

    참고: self._b에 대한 설명
    - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용
    - 정규화 기능. 최종 rating에 음수가 들어가는 것 대신 latent feature에 음수가 포함되도록 해줌.

    :return: training_process
    """

    # init latent features
    self._P = np.random.normal(size=(self._num_users, self._k))
    self._Q = np.random.normal(size=(self._num_items, self._k))

    # init biases
    self._b_P = np.zeros(self._num_users)
    self._b_Q = np.zeros(self._num_items)
    self._b = np.mean(self._R[np.where(self._R != 0)])

    # train while epochs
    self._training_process = []
    for epoch in range(self._epochs):
        # rating이 존재하는 index를 기준으로 training
        xi, yi = self._R.nonzero()
        for i, j in zip(xi, yi):
            self.gradient_descent(i, j, self._R[i, j])
        cost = self.cost()
        self._training_process.append((epoch, cost))

        # print status
        if self._verbose == True and ((epoch + 1) % 10 == 0):
            print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))

fit 함수에서 가장 먼저 실행되는 부분은 아래의 Latent Matrix를 초기화해주는 부분입니다.

# init latent features
self._P = np.random.normal(size=(self._num_users, self._k))
self._Q = np.random.normal(size=(self._num_items, self._k))

# init biases
self._b_P = np.zeros(self._num_users)
self._b_Q = np.zeros(self._num_items)
self._b = np.mean(self._R[np.where(self._R != 0)])

이때, np.random.normal 함수를 이용해서 행렬을 초기화해주고 np.zeros 함수를 이용해서 bias 부분을 초기화해줍니다. 함수의 의미는 아래와 같습니다.

  • np.random.normal : (self._num_users, self._k)의 크기로 행렬을 정규분포 형태로 초기화합니다. 위의 예시에서는 User Latent Matrix는 (7, 3)의 크기를 Item Latent Matrix는 (5, 3)의 크기를 가짐
  • np.zeros : self._num_users 혹은 self._num_items의 크기만큼의 0 값을 가지는 벡터를 생성합니다.
  • np.mean(self._R[np.where(self._R != 0)]) : 전체 평점의 평균을 계산

이후, 전체 학습 과정을 진행합니다.

# train while epochs
self._training_process = []
for epoch in range(self._epochs):
    # rating이 존재하는 index를 기준으로 training
    xi, yi = self._R.nonzero()
    for i, j in zip(xi, yi):
        self.gradient_descent(i, j, self._R[i, j])
    cost = self.cost()
    self._training_process.append((epoch, cost))

    # print status
    if self._verbose == True and ((epoch + 1) % 10 == 0):
        print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))

self._training_process = [] 는 for문 안의 self._training_process.append((epoch, cost)) 에 사용되는 부분으로 학습 시에 Epoch와 Cost를 저장하는 부분입니다. for문의 경우 처음 파라미터로 받은 self._epochs 만큼 반복학습을 진행하게 됩니다.

 

먼저 시행되는 for문의 첫 번째로 시행되는 xi, yi = self._R.nonzero()는 평점 행렬에서 0이 아닌 부분 즉, 사용자가 평점을 매긴 부분에 대해서만 값을 추출하라는 의미입니다. 그 이유는 위의 이론에서 말했듯이 결측치가 아닌 부분을 통해서만 학습을 진행하려는 의도입니다. 이후에 해당 부분을 통해서 모든 평점 부분에 대해서 gradient_descent를 실행해줍니다.

for i, j in zip(xi, yi):
    self.gradient_descent(i, j, self._R[i, j])
def gradient_descent(self, i, j, rating):
    """
    graident descent function

    :param i: user index of matrix
    :param j: item index of matrix
    :param rating: rating of (i,j)
    """

    # get error
    prediction = self.get_prediction(i, j)
    error = rating - prediction

    # update biases
    self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
    self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

    # update latent feature
    dp, dq = self.gradient(error, i, j)
    self._P[i, :] += self._learning_rate * dp
    self._Q[j, :] += self._learning_rate * dq

gradient_descent 함수의 경우 행렬의 원소 위치(i, j)와 평점 값(self._R[i, j])을 받습니다. 바로 시작하는 prediction = self.get_prediction(i, j) 은 User Latent Matrix와 Item Latent Matrix의 곱을 통해서 평점 행렬의 값들을 생성하는 부분입니다.

 

self._P[i, :].dot(self._Q[j, :].T) 에서 User Latent P와 Item Latent Q가 곱해져서 평점을 계산하고 Bias를 없애기 위해서 전체 평균(self._b), User의 평균 평점(self._b_P[i]), Item의 평균 평점(self._b_Q[j])을 더해줌으로써 값을 생성합니다.

def get_prediction(self, i, j):
    """
    get predicted rating: user_i, item_j
    :return: prediction of r_ij
    """
    return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)

이후에, error = rating - prediction 을 통해서 얼마만큼의 차이가 있는지 계산을 합니다. 그리고 아래의 수식에서 사용한 공식을 이용해서 self.gradient(error, i, j) 에서 Gradient 값을 계산하고 Weight 및 Bais를 업데이트합니다.

# update biases
self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

# update latent feature
dp, dq = self.gradient(error, i, j)
self._P[i, :] += self._learning_rate * dp
self._Q[j, :] += self._learning_rate * dq

이후에, cost = self.cost() 에서 전체 Matrix에 대해서 오차를 계산하고 출력해줍니다. cost += pow(self._R[x, y] - self.get_prediction(x, y), 2) 에서 각 평점 별로 오차를 계산하게 됩니다.

def cost(self):
    """
    compute root mean square error
    :return: rmse cost
    """

    # xi, yi: R[xi, yi]는 nonzero인 value를 의미한다.
    # 참고: http://codepractice.tistory.com/90
    xi, yi = self._R.nonzero()
    # predicted = self.get_complete_matrix()
    cost = 0
    for x, y in zip(xi, yi):
        cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
    return np.sqrt(cost/len(xi))

이후, 모든 Epoch에 대해서 학습이 완료되면 factorizer.get_complete_matrix() 를 통해서 완성된 평점 행렬을 추출하게 되면 SGD를 이용한 협업 필터링이 완료되게 됩니다.

array([[ 0.43969656,  1.62911189,  2.66808392,  1.6868341 ,  3.04869067],
       [ 2.2000929 ,  2.27133439,  2.73745321,  1.24294904,  1.07460791],
       [ 1.04894765,  1.99596573,  2.57735796,  4.85898202,  0.77222116],
       [ 1.15453799,  0.76194646,  4.68674128,  3.77973742,  3.97056686],
       [ 2.41183566,  0.90755162,  4.74436383,  3.89203037,  2.01902094],
       [ 4.61386076,  1.09148215,  5.23299137,  3.97290047, -0.63551453],
       [ 1.07942605,  5.53956943,  2.83705583,  0.81685834,  7.19239648]])

참고로 이를 조금 수정해서 PyTorch를 이용한 코드는 다음의 링크에서 확인할 수 있습니다. 다음 포스팅에서는 ALS가 무엇인지에서부터 어떻게 사용하는지에 대해 알아보도록 하겠습니다.

 

donaricano-btn

커피 선물하기

 

참고자료