[스파크] 키-밸류 페어(Key-Value Pair) RDD란?

키-밸류(Key-Value) RDD

  • 스트럭쳐(Structured) 데이터를 스파크와 연결하여 사용할 수 있게 해주는 키-밸류 RDD에 대해 알아볼 것이다.
  • 이름 그대로 키-밸류 쌍을 갖기 때문에 페어(Pairs) RDD로도 불린다.
  • 형태가 NoSQL처럼 생겼기 때문에 간단한 데이터 베이스처럼 다룰 수 있다.
  • 싱글 밸류(Sigle Value) RDD는 간단하게 갯수를 세거나 정형 데이터를 다루거나 1차원적인 연산이 가능 했다면, 키-밸류 RDD는 고차원적인 연산이 가능하다.(키를 기준으로 통계를 구하거나, 평균을 구한다는 등)
    • 싱글 밸류 RDD: 텍스트에 등장하는 단어 수 세기
    • 키-밸류 RDD: 넷플릭스 드라마가 받은 평균 별점

예제

  • 지역 ID 별 택시 운행 수
    • K: 지역 ID
    • V: 운행 수
1
2
3
4
[
    (지역 ID, 운행 )
    (지역 ID, 운행 )
]
  • 그 외, 드라마 별 별점 수 모아보기, 평균 구하기, 이커머스 사이트에서 상품당 별 평점 구하기 등이 있다.

코드

  • 코드상으로는 많이 다르지 않다.
1
pairs = rdd.map(lambda x: (x, 1))

image

image

  • 단순 값 뿐 아니라 리스트도 값이 될 수 있다.

기능

Reduction

  • 키 값을 기준으로 데이터를 묶어서 처리하거나 데이터를 줄이는데 쓰인다.
    • reduceByKey(): 키값을 기준으로 테스크를 처리한다.
    • groupByKey(): 키값을 기준으로 밸류를 묶는다.
    • sortByKey(): 키값을 기준으로 정렬한다.
    • keys(): 키 값을 추출한다.
    • values(): 밸류 값을 추출한다.
1
2
3
4
5
# 짜장면 짜장면 짬뽕 짬뽕 짜장면 우동
pairs = rdd.map(lambda x: (x, 1))
# (짜장면, 1) (짜장면, 1) (짬뽕, 1) (짬뽕, 1) (짜장면, 1) (우동, 1)
count = pairs.reduceByKey(lambda a, b,: a + b)
# (짜장면, 3) (짬뽕, 2) (김밥, 1) (우동, 1)
  • 절차는 주석과 같이 진행된다.

Join

  • 데이터 베이스와 많이 닮아있기 때문에 여러개의 RDD를 키 값을 기준으로 붙이는 Join도 가능하다.
    • join
    • rightOuterJoin
    • leftOuterJoin
    • subtractByKey

Mapping values

  • 키 밸류 데이터에서 키를 바꾸지 않는 경우 map()대신 밸류만 다루는 mapValues() 함수를 써주는 것이 좋다.
  • 그 이유는 스파크 내부에서 파티션을 유지할 수 있어 더욱 효율적이기 때문이다.
    • mapValues()
    • flatMapValues()
      • 위 함수들은 밸류만 다루는 연산들이지만 RDD에서 키는 유지되기 때문에 더 좋은 성능을 기대할 수 있다.

코드

  • 간단한 csv 파일로 키-밸류 RDD를 살펴보자.

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#rdd.ipynb

from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster("local").setAppName("category-review-average") # 로컬 환경과 앱 이름 지정
sc = SparkContext(conf=conf) # 컨텍스트 초기화
# 막간 에러: 자바 로컬 환경 변수를 변경해도 conda 가상환경 내 버전이 변경이 되지 않아 conda로 재인스톨하여 해결하였다.
# !conda install -c anaconda openjdk


lines = sc.textFile("/Users/6mini/spark/res.csv")
lines.collect() # 간단한 액션
'''
['id,item,category,reviews',
 '0,짜장면,중식,125',
 '1,짬뽕,중식,235',
 '2,김밥,분식,32',
 '3,떡볶이,분식,534',
 '4,라멘,일식,223',
 '5,돈가스,일식,52',
 '6,우동,일식,12',
 '7,쌀국수,아시안,312',
 '8,햄버거,패스트푸드,12',
 '9,치킨,패스트푸드,23']
'''


header = lines.first() # 헤더 추출
filtered_lines = lines.filter(lambda row: row != header)
filtered_lines.collect()
'''
['0,짜장면,중식,125',
 '1,짬뽕,중식,235',
 '2,김밥,분식,32',
 '3,떡볶이,분식,534',
 '4,라멘,일식,223',
 '5,돈가스,일식,52',
 '6,우동,일식,12',
 '7,쌀국수,아시안,312',
 '8,햄버거,패스트푸드,12',
 '9,치킨,패스트푸드,23']
'''


def parse(row): # 카테고리와 리뷰 수 만을 파싱하는 함수
    fields = row.split(",")
    category = fields[2]
    reviews = int(fields[3])
    return (category, reviews) # KV RDD를 위해 튜플 형태로 두가지 리턴

category_reviews = filtered_lines.map(parse) # KV RDD 생성
category_reviews.collect()
'''
[('중식', 125),
 ('중식', 235),
 ('분식', 32),
 ('분식', 534),
 ('일식', 223),
 ('일식', 52),
 ('일식', 12),
 ('아시안', 312),
 ('패스트푸드', 12),
 ('패스트푸드', 23)]
'''


category_reviews_count = category_reviews.mapValues(lambda x: (x, 1)) # 각 카테고리마다 값 하나를 추가
category_reviews_count.collect()
'''
[('중식', (125, 1)),
 ('중식', (235, 1)),
 ('분식', (32, 1)),
 ('분식', (534, 1)),
 ('일식', (223, 1)),
 ('일식', (52, 1)),
 ('일식', (12, 1)),
 ('아시안', (312, 1)),
 ('패스트푸드', (12, 1)),
 ('패스트푸드', (23, 1))]
'''


reduced = category_reviews_count.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) # 카테고리의 수와 리뷰 수의 총 합
reduced.collect()
'''
[('중식', (360, 2)),
 ('분식', (566, 2)),
 ('일식', (287, 3)),
 ('아시안', (312, 1)),
 ('패스트푸드', (35, 2))]
'''


averages = reduced.mapValues(lambda x: x[0] / x[1]) # 평균
averages.collect()
'''
[('중식', 180.0),
 ('분식', 283.0),
 ('일식', 95.66666666666667),
 ('아시안', 312.0),
 ('패스트푸드', 17.5)]
'''
0%