[스파크] 추천 알고리즘

  • 추천 파이프 라인을 구축하기에 앞서 추천 알고리즘에 대해 알아볼 것이다.
  • ALS에 대해 알아볼 것이며, ALS는 Alternating Least Squares의 약자이다.
    • 직역하면 제곱된 숫자를 왔다갔다하며 최적화한다는 뜻이다.

추천 알고리즘 예제

image

  • A, B 유저는 영화에 점수를 주고 있다.
  • A 유저는 ‘Casablanca’라는 영화를 보지 않았지만, 그 외의 영화의 점수를 보면 둘의 영화 취향이 꽤나 비슷한 것을 볼 수 있다.
  • 그러므로 A 유저는 ‘Casablanca’를 재밌게 볼 확률이 높고, 이런 예측을 전달해주는 것이 추천 알고리즘이다. 이것을 협업 필터링이라는 것으로 풀게 된다.

image

  • 실제 비즈니스에선 한 유저가 볼 수 있는 영화의 수가 굉장히 많다.

image

  • 아직 안 본 영화의 평점들을 예측하여 값을 정렬하고, 제일 위에서부터 유저에게 전달하는 것이 바로 추천 알고리즘의 원리라고 할 수 있다.

ALS(Alternating Least Squares)란?

image

  • 빈칸이 많은 레이팅 매트릭스(Rating Matrix)는 유저 매트릭스(User Matrix)와 아이템 매트릭스(Item Matrix)로 이루어진다.
  • ALS는 두 행렬 중 하나를 보정시키고 다른 하나의 행렬을 순차적으로 반복하면서 최적화하는 방식이다.
  • 첫번째로 아이템 매트릭스 값과 유저 매트릭스 값이 랜덤하게 채워지고, 아이템 매트릭스 행렬을 고정시키고 유저 매트릭스의 행렬을 최적화한다.
  • 최적화 할 때 두 개의 값이 곱해졌을 때 레이팅 매트릭스의 값과 비슷하게 최적화를 진행한다.
  • 다음 유저 매트릭스의 값을 고정시키고 아이템 매트릭스를 최적화한다.
  • 위 과정을 계속 반복하고, 레이팅 매트릭스의 값과 가장 비슷해질 때 빈칸이 예측값으로 채워지게 되고 그것이 바로 추천 결과값이다.

영화 추천 파이프 라인 구축

1
2
3
4
5
6
7
8
from pyspark.sql import SparkSession


MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("movie-recommendation")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()
  • 모델링 시 ‘Out of Memory’ 에러가 발생했기 때문에 MAX_MEMORY를 지정하여 세션을 띄운다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 데이터 프레임 생성
ratings_df = spark.read.csv("/Users/6mini/ml-25m/ratings.csv", inferSchema=True, header=True)


ratings_df.show()
'''
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
'''
  • 추천 알고리즘 모델링에 타임 스탬프 컬럼은 필요없기 때문에 삭제해준다.
1
2
3
4
5
6
7
8
9
10
11
# 타임 스탬프 컬럼 삭제
ratings_df = ratings_df.select(["userId", "movieId", "rating"])


ratings_df.printSchema()
'''
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
'''

간단한 EDA

  • 데이터셋 자체가 굉장히 심플하지만, 그래도 레이팅(rating)에 대해 통계를 내어 확인해본다.
1
2
3
4
5
6
7
8
9
10
11
12
ratings_df.select("rating").describe().show()
'''
+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          25000095|
|   mean| 3.533854451353085|
| stddev|1.0607439611423535|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+
'''

알고리즘 모델링

  • ALS를 활용한 추천 알고리즘 모델링을 진행한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 트레인 테스트셋 분리
train_df, test_df = ratings_df.randomSplit([0.8, 0.2])


from pyspark.ml.recommendation import ALS


# ALS 이용 모델링
als = ALS(
    maxIter=5, # 반복 횟수
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop" # 학습되지 못한 데이터의 처리 방법
)


# 학습
model = als.fit(train_df)


# 예측
predictions = model.transform(test_df)


predictions.show()
'''
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   321|   3175|   3.0| 3.2872846|
|   321|   6620|   3.5| 3.6733642|
|   322|    463|   3.0|  3.269444|
|   368|   1580|   3.5| 3.6708436|
|   368|  54190|   4.0| 3.4968219|
|   375|   1580|   2.5|  3.421627|
|   458|   1580|   3.5| 3.1025493|
|   497|   2366|   4.0| 3.8671935|
|   588|   1580|   2.5| 2.6639755|
|   596|   1580|   3.0| 3.5359857|
|   597|   1088|   3.0| 3.2686214|
|   597|   1580|   4.0| 3.6997573|
|   597|   3997|   1.0| 1.9885403|
|   613|   1645|   4.0| 3.7589316|
|   756|  44022|   3.5|  3.107243|
|   847|   4818|   0.5| 1.4983841|
|   847|   7833|   3.0| 3.6151803|
|   847|  96488|   4.0| 3.8599513|
|   847| 180981|   3.5| 3.4091368|
|   879|  68135|   5.0| 4.4257684|
+------+-------+------+----------+
'''
  • 레이팅의 실제값과 예측값을 확인할 수 있다.
  • 어느정도 유사하게 잘 예측한 것 같다.
  • 실제값과 예측값의 통계를 확인해본다.
1
2
3
4
5
6
7
8
9
10
11
12
predictions.select('rating', 'prediction').describe().show()
'''
+-------+-----------------+------------------+
|summary|           rating|        prediction|
+-------+-----------------+------------------+
|  count|          4997993|           4997993|
|   mean|3.533390903108508| 3.423480612159125|
| stddev|1.060723007511501|0.6444551403360655|
|    min|              0.5|        -1.6993694|
|    max|              5.0|         6.4476376|
+-------+-----------------+------------------+
'''

평가

  • RMSE를 이용하여 평가를 진행한다.
1
2
3
4
5
6
7
8
9
10
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol='rating', predictionCol='prediction')


rmse = evaluator.evaluate(predictions)
rmse
'''
0.8087562307490825
'''
  • 성능이 그렇게 좋진 않지만, 파이프 라인 실습에 집중했으므로 넘어간다.
  • 레코멘데이션(recommendation)을 직접 뽑아본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 유저 별 탑 3개의 아이템 추천
model.recommendForAllUsers(3).show()
'''
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[{194434, 7.50267...|
|    31|[{203882, 3.97501...|
|    34|[{194434, 5.53526...|
|    53|[{192089, 6.53624...|
|    65|[{194434, 6.70630...|
|    78|[{203882, 6.70107...|
|    81|[{192689, 5.35233...|
|    85|[{203086, 5.54420...|
|   101|[{203882, 5.29644...|
|   108|[{194434, 5.46318...|
|   115|[{203882, 6.33117...|
|   126|[{203086, 6.45976...|
|   133|[{203086, 5.32927...|
|   137|[{203086, 5.65758...|
|   148|[{194434, 5.89812...|
|   155|[{194434, 5.75075...|
|   183|[{194434, 5.71291...|
|   193|[{183947, 5.34912...|
|   210|[{126941, 7.52532...|
|   211|[{203086, 6.51664...|
+------+--------------------+
'''


# 아이템 별 탑 3명의 유저 추천
model.recommendForAllItems(3).show()
'''
+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     28|[{105801, 5.53363...|
|     31|[{87426, 5.40749}...|
|     34|[{128562, 5.47395...|
|     53|[{7552, 5.1105866...|
|     65|[{87426, 5.288509...|
|     78|[{142811, 4.78454...|
|     81|[{105946, 4.77834...|
|     85|[{160416, 4.85580...|
|    101|[{57450, 4.929262...|
|    108|[{34485, 6.031539...|
|    115|[{142811, 5.85976...|
|    126|[{87426, 4.779694...|
|    133|[{142811, 5.69091...|
|    137|[{142811, 5.29639...|
|    148|[{160416, 4.21056...|
|    155|[{10417, 5.080611...|
|    183|[{10417, 5.191336...|
|    193|[{87426, 5.088001...|
|    210|[{87426, 4.919769...|
|    211|[{105801, 5.12127...|
+-------+--------------------+
'''
  • 위 데이터 프레임은 유저 별 영화를 추천하고, 아래 데이터 프레임은 영화 별 유저를 추천한다.

서비스

  • 실 서비스에선 API로 유저를 위한 추천을 해야하기 때문에 디테일 작업을 해본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pyspark.sql.types import IntegerType

user_list = [65, 78, 81] # 임의 유저 리스트
# 데이터 프레임 생성
users_df = spark.createDataFrame(user_list, IntegerType()).toDF('userId')

users_df.show()
'''
+------+
|userId|
+------+
|    65|
|    78|
|    81|
+------+
'''
  • 위 데이터를 추천 모델에 넣어주기만 하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 유저마다 5개의 추천
user_recs = model.recommendForUserSubset(users_df, 5)


# user_recs에서 첫번째 값만 꺼내어 이름으로 바꾸기 위한 리스트 생성
movies_list = user_recs.collect()[0].recommendations


# 리스트 데이터를 스파크 데이터 프레임으로 변경
recs_df = spark.createDataFrame(movies_list)
recs_df.show()
'''
+-------+------------------+
|movieId|            rating|
+-------+------------------+
| 194434|  6.70630407333374|
| 126941| 6.530391216278076|
| 203882|   6.5257887840271|
| 203086| 6.447256088256836|
| 205453|6.0838799476623535|
+-------+------------------+
'''
  • 유저 별 추천 영화 Id와 점수가 나왔으니 다른 데이터셋을 합쳐서 영화명이 전시되게 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
movies_df = spark.read.csv("/Users/6mini/ml-25m/movies.csv", inferSchema=True, header=True)


movies_df.show()
'''
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sense and Sensibi...|       Drama|Romance|
|     18|   Four Rooms (1995)|              Comedy|
|     19|Ace Ventura: When...|              Comedy|
|     20|  Money Train (1995)|Action|Comedy|Cri...|
+-------+--------------------+--------------------+
'''


# SQL로 합치기 위해 TempView에 담는다.
recs_df.createOrReplaceTempView("recommendations")
movies_df.createOrReplaceTempView("movies")


query = """
SELECT *
FROM
    movies JOIN recommendations
    ON movies.movieId = recommendations.movieId
ORDER BY
    rating desc
"""
recommended_movies = spark.sql(query)
recommended_movies.show()
'''
+-------+--------------------+------------------+-------+------------------+
|movieId|               title|            genres|movieId|            rating|
+-------+--------------------+------------------+-------+------------------+
| 194434|   Adrenaline (1990)|(no genres listed)| 194434|  6.70630407333374|
| 126941|Joni's Promise (2...|            Comedy| 126941| 6.530391216278076|
| 203882|Dead in the Water...|            Horror| 203882|   6.5257887840271|
| 203086|Truth and Justice...|             Drama| 203086| 6.447256088256836|
| 205453|The Good Fight: T...|       Documentary| 205453|6.0838799476623535|
+-------+--------------------+------------------+-------+------------------+
'''
  • 유저에게 영화를 추천하는 순위가 나오게 된다.
  • 추가적으로 실제 서비스할 때는 하나의 함수를 사용하면 좋으니, 서빙하는 함수까지 작성해본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_recommendations(user_id, num_recs): # 유저 아이디와 추천 수
    users_df = spark.createDataFrame([user_id], IntegerType()).toDF('userId')
    user_recs_df = model.recommendForUserSubset(users_df, num_recs)
    
    recs_list = user_recs_df.collect()[0].recommendations
    recs_df = spark.createDataFrame(recs_list)
    recommended_movies = spark.sql(query)
    return recommended_movies


recs = get_recommendations(456, 10)


recs.toPandas()

image

  • 성공적이다.
  • 사용이 끝났으면 .stop()을 진행한다.
0%