[스파크] Spark SQL 소개 및 튜토리얼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
tichers = sc.parallelize({
    (1, ('Google', 'GOOGL', 'USA')),
    (2, ('Netflix', 'NFLX', 'USA')),
    (3, ('Amazon', 'AMZN', 'USA')),
    (4, ('Tesla', 'TSLA', 'USA')),
    (5, ('Samsung', '005930', 'Korea')),
    (6, ('KaKao', '035720', 'Korea'))
})

prices = sc.parallelize([
    (1, (2984, 'USD')),
    (2, (645, 'USD')),
    (3, (3518, 'USD')),
    (4, (1222, 'USD')),
    (5, (70600, 'USD')),
    (6, (125000, 'USD')),
])
  • 만약 위 코드에서 미국의 2000불 이상의 주식만 가져오기 위한 방법은 3가지로 생각해볼 수 있다.
    • Inner Join
    • Filter by Coutry
    • Filter by Currency
1
2
3
4
5
6
7
8
9
10
11
# CASE 1: join 먼저, filter 나중에
tickerPrice = tickers.join(prices)
tickerPrice.filter(lambda x: x[1][0][2] == 'USA' and x[1][1][0] > 2000).collect()
'''
[(1, (('Google', 'GOOGL', 'USA'), (2984, 'USD'))), 3, (('Amazon', 'AMZN', 'USA'), (3518, 'USD')))]
'''

# CASE 2: filter 먼저, join 나중에
filteredTicker = tickers.filter(lambda x: x[1][2] == 'USA')
filteredTicker = prices.filter(lambda x: x[1][0] > 2000)
filteredTicker.join(filteredPrice).collect()
  • 두 가지의 경우 같은 결과를 낳지만 퍼포먼스 자체는 두 번째 케이스가 좋다.
  • 연산에 대하여 일일이 신경쓰기란 까다롭다.
  • 네트워크 연산 성능에 대하여 만약 데이터가 구조화되어 있다면 자동으로 최적화가 가능하다.
  • 구조화된 데이터란 정형, 비정형, 반정형데이터를 뜻한다.

정형(Structured), 비정형(Unstructured), 반정형(Semi structured)

  • 정형: 행과 열이 있고 데이터 타입이 스키마인 데이터이다.
    • 데이터 베이스
  • 비정형: 자유 형식으로 정리가 되지 않은 파일이다.
    • 로그 파일
    • 이미지
  • 반정형: 행과 열이 있는 데이터이다.
    • CSV
    • JSON
    • XML

정형 데이터와 RDD

  • RDD에서는 데이터의 구조를 모르기 때문에 데이터를 다루는 것을 개발자에게 의존할 수 밖에 없다.
    • map, flatMap, filter 등을 통해 유저가 만든 함수를 수행한다.
  • 하지만 정형 데이터에서는 데이터의 구조를 이미 알고 있으므로 어떤 테스크를 수행할 것인지 정의만 하면 된다.
    • 최적화도 자동으로 진행된다.

Spark SQL

  • Spark SQL은 구조화된 데이터를 다룰 수 있게 해준다.
  • 유저가 일일이 함수를 정의하는 일 없이 작업을 수행할 수 있고 자동으로 연산이 최적화된다.

스파크(Spark) SQL

목적

  • 스파크 프로그래밍 내부에서 관계형 처리를 할 수 있다.
  • 스키마 정보를 이용해 자동으로 최적화를 할 수 있다.
  • 외부 데이터셋을 쉽게 사용할 수 있다.

소개

  • 스파크 위에 구현된 하나의 패키지이다.
  • 3개의 주요 API가 존재한다.
    • SQL
    • DataFrame
    • Datasets
  • 2개의 백엔드 컴포넌트로 최적화를 진행한다.
    • Catalyst: 쿼리 최적화 엔진
    • Tungsten: 시리얼라이저(용량)

데이터 프레임(DataFrame)

  • 스파크 코어(Core)에 RDD가 있다면 스파크 SQL에는 데이터 프레임이 있다.
  • 데이터 프레임은 테이블 데이터셋이다.
  • 개념적으로는 RDD에 스키마가 적용된 것이라 볼 수 있다.

데이터 프레임 생성

  • RDD에서 스키마를 정의한 다음 변형 하는 방법과 CSV, JSON 등의 데이터를 받아오는 방법이 있다.
RDD로 데이터 프레임 생성
  • 스키마를 자동으로 유추하여 데이터 프레임을 만들거나, 스키마를 사용자가 정의하는 방법이 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# RDD
lines = sc.textFile('example.csv')
data = lines.map(lambda x: x.split(','))
preprocessed = data.map(lambda x: Rou(name=x[0], price=int(x[1])))

# Infer
df = spark.createDataFrame(preprocessed)

# Specify
schema = StructType(
    StructField('name', StringType(), True),
    StructField('price', StringType(), True)
)
spark.createDataFrame(preprocessed, schema).show()
  • 두 번째 블록은 자동으로 유추하는 것이고, 세번째는 사용자가 정의하여 데이터 프레임을 만드는 방법이다.
파일로부터 데이터 프레임 생성
1
2
3
4
5
6
7
8
9
10
11
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test-app'). getOrCreate()

# json
df = spark.read.json('data.json)
# text
df_txt = spark.read.text('data.txt)
# csv
df_csv = spark.read.csv('data.csv)
# parquet
df_parquet = spark.read.load('data.parquet)

createOrReplaceTempView

  • 데이터 프레임을 하나의 데이터 베이스 테이블 처럼 사용하려면 createOrReplaceTempView()함수로 temporary view를 만들어줘야한다.
1
2
data.creatOrReplaceTempView('mobility_data') # 닉네임 지정
spark.sql('SELECT pickup_datetime FROM mobility_data LIMIT 5').show()
  • 닉네임을 지정하면 SQL문처럼 사용할 수 있다.

스파크 세션(SparkSession)

  • 스파크 코어에 스파크 컨텍스트가 있었다면 스파크 SQL엔 스파크 세션이 있다.
  • 파이썬에서 스파크 SQL을 사용하기 위한 방법이며 스파크 세션으로 불러오는 데이터는 데이터 프레임이다.
1
spark = SparkSession.builder.appName("test-app").getOrCreate()
  • 위와 같은 코드로 스파크 세션을 만들어 줄 것이다.
  • SQL문 뿐만 아니라 함수를 사용해서도 가능하다.
  • 데이터 프레임을 RDD로 변환하여 사용할 수도 있지만(rdd = df.rdd.map(tuple)), RDD를 덜 사용하는 쪽이 좋다.

스파크에서 사용할 수 있는 SQL문

  • 하이브 쿼리 언어(Hive Query Language)와 거의 동일하다.
    • Select
    • From
    • Where
    • Count
    • Having
    • Group By
    • Order By
    • Sort By
    • Distinct
    • Join

데이터 프레임의 이점

  • 위에서 RDD를 덜 사용하는 편이 좋다고 했는데, 그 이유는 MLlib이나 스파크 스트리밍(Spark Streaming)과 같은 다른 스파크 모듈과 사용하기엔 데이터 프레임이 좋기 때문이다.
  • 개발하기에도 편하고 최적화도 알아서 된다.

데이터셋(Datasets)

  • 타입이 있는 데이터프레임이며 파이스파크에선 크게 신경쓰지 않아도 된다.

파이썬 튜토리얼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
from pyspark.sql import SparkSession


# 스파크 세션 생성
spark = SparkSession.builder.master("local").appName("learn-sql").getOrCreate()


# 주식 데이터 생성
stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]


# 스키마 생성
# 컬럼의 이름만 입력하고 데이터 타입은 정하지 않는다.
stockSchema = ["name", "ticker", "country", "price", "currency"]


# 데이터 프레임 생성
df = spark.createDataFrame(data=stocks, schema=stockSchema)


# 데이터 타입 확인
df.dtypes
'''
[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]
'''


# 데이터 프레임 확인
df.show()
'''
+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+
'''


# 스파크 SQL
# TempView에 등록을 하여야 사용할 수 있다.
df.createOrReplaceTempView("stocks")


spark.sql("SELECT nameFROM stocks").show()
'''
+-------+
|   name|
+-------+
| Google|
|Netflix|
| Amazon|
|  Tesla|
|Tencent|
| Toyota|
|Samsung|
|  Kakao|
+-------+
'''


spark.sql("SELECT name, price FROM stocks").show()
'''
+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
|Netflix|   645|
| Amazon|  3518|
|  Tesla|  1222|
|Tencent|   483|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+
'''


spark.sql("SELECT name, price FROM stocks WHERE country = 'Korea'").show()
'''
+-------+------+
|   name| price|
+-------+------+
|Samsung| 70600|
|  Kakao|125000|
+-------+------+
'''


spark.sql("SELECT name, price FROM stocks WHERE price > 2000").show()
'''
+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
| Amazon|  3518|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+
'''


spark.sql("SELECT name, price FROM stocks WHERE price > 2000 and country = 'USA'").show()
'''
+------+-----+
|  name|price|
+------+-----+
|Google| 2984|
|Amazon| 3518|
+------+-----+
'''


spark.sql("SELECT name, price FROM stocks WHERE country LIKE 'U%'").show() # U로 시작
'''
+-------+-----+
|   name|price|
+-------+-----+
| Google| 2984|
|Netflix|  645|
| Amazon| 3518|
|  Tesla| 1222|
+-------+-----+
'''


spark.sql("SELECT name, price FROM stocks WHERE country LIKE 'U%' AND name NOT LIKE '%e%'").show() # U로 시작하는 컨트리, e를 가진 회사명을 제외
'''
+------+-----+
|  name|price|
+------+-----+
|Amazon| 3518|
+------+-----+
'''


spark.sql("SELECT name, price FROM stocks WHERE price BETWEEN 1000 AND 10000").show() # 1000에서 10000 사이
'''
+------+-----+
|  name|price|
+------+-----+
|Google| 2984|
|Amazon| 3518|
| Tesla| 1222|
|Toyota| 2006|
+------+-----+
'''


spark.sql("SELECT name, price FROM stocks WHERE country='USA'").show()
'''
+-------+-----+
|   name|price|
+-------+-----+
| Google| 2984|
|Netflix|  645|
| Amazon| 3518|
|  Tesla| 1222|
+-------+-----+
'''


spark.sql("SELECT name, price, currency FROM stocks \
WHERE currency = 'USD' AND \
price > (SELECT price FROM stocks WHERE NAME = 'Tesla')").show()
'''
+------+-----+--------+
|  name|price|currency|
+------+-----+--------+
|Google| 2984|     USD|
|Amazon| 3518|     USD|
+------+-----+--------+
'''


spark.sql("SELECT name, price FROM stocks ORDER BY price ASC").show()
'''
+-------+------+
|   name| price|
+-------+------+
|Tencent|   483|
|Netflix|   645|
|  Tesla|  1222|
| Toyota|  2006|
| Google|  2984|
| Amazon|  3518|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+
'''


spark.sql("SELECT name, price FROM stocks ORDER BY price DESC").show()
'''
+-------+------+
|   name| price|
+-------+------+
|  Kakao|125000|
|Samsung| 70600|
| Amazon|  3518|
| Google|  2984|
| Toyota|  2006|
|  Tesla|  1222|
|Netflix|   645|
|Tencent|   483|
+-------+------+
'''


spark.sql("SELECT name, price FROM stocks ORDER BY LENGTH(name)").show()
'''
+-------+------+
|   name| price|
+-------+------+
|  Tesla|  1222|
|  Kakao|125000|
| Amazon|  3518|
| Toyota|  2006|
| Google|  2984|
|Netflix|   645|
|Samsung| 70600|
|Tencent|   483|
+-------+------+
'''


spark.sql("SELECT sum(price) FROM stocks WHERE country = 'Korea'").show()
'''
+----------+
|sum(price)|
+----------+
|    195600|
+----------+
'''


spark.sql("SELECT mean(price) FROM stocks WHERE country = 'Korea'").show()
'''
+-----------+
|mean(price)|
+-----------+
|    97800.0|
+-----------+
'''


spark.sql("SELECT count(price) FROM stocks WHERE country = 'Korea'").show()
'''
+------------+
|count(price)|
+------------+
|           2|
+------------+
'''


spark.sql("SELECT count(price) FROM stocks WHERE country IN ('Korea', 'USA')").show()
'''
+------------+
|count(price)|
+------------+
|           6|
+------------+
'''


# 여러개의 데이터를 다루는 SQL 예제
earnings = [
    ('Google', 27.99, 'USD'), 
    ('Netflix', 2.56, 'USD'),
    ('Amazon', 6.12, 'USD'),
    ('Tesla', 1.86, 'USD'),
    ('Tencent', 11.01, 'HKD'),
    ('Toyota', 224.82, 'JPY'),
    ('Samsung', 1780., 'KRW'),
    ('Kakao', 705., 'KRW')
]


from pyspark.sql.types import StringType, FloatType, StructType, StructField


# 직접 스키마 타입 설정
earningsSchema = StructType([
    StructField("name", StringType(), True),
    StructField("eps", FloatType(), True),
    StructField("currency", StringType(), True),
])


# 데이터 프레임 생성
earningsDF = spark.createDataFrame(data=earnings, schema=earningsSchema)


earningsDF.dtypes


earningsDF.createOrReplaceTempView("earnings")


earningsDF.select("*").show()
'''
+-------+------+--------+
|   name|   eps|currency|
+-------+------+--------+
| Google| 27.99|     USD|
|Netflix|  2.56|     USD|
| Amazon|  6.12|     USD|
|  Tesla|  1.86|     USD|
|Tencent| 11.01|     HKD|
| Toyota|224.82|     JPY|
|Samsung|1780.0|     KRW|
|  Kakao| 705.0|     KRW|
+-------+------+--------+
'''


spark.sql("SELECT * FROM stocks JOIN earnings ON stocks.name = earnings.name").show()
'''
+-------+------+---------+------+--------+-------+------+--------+
|   name|ticker|  country| price|currency|   name|   eps|currency|
+-------+------+---------+------+--------+-------+------+--------+
| Amazon|  AMZN|      USA|  3518|     USD| Amazon|  6.12|     USD|
| Google| GOOGL|      USA|  2984|     USD| Google| 27.99|     USD|
|  Kakao|035720|    Korea|125000|     KRW|  Kakao| 705.0|     KRW|
|Netflix|  NFLX|      USA|   645|     USD|Netflix|  2.56|     USD|
|Samsung|005930|    Korea| 70600|     KRW|Samsung|1780.0|     KRW|
|Tencent|  0700|Hong Kong|   483|     HKD|Tencent| 11.01|     HKD|
|  Tesla|  TSLA|      USA|  1222|     USD|  Tesla|  1.86|     USD|
| Toyota|  7203|    Japan|  2006|     JPY| Toyota|224.82|     JPY|
+-------+------+---------+------+--------+-------+------+--------+
'''


# PER: Price / EPS 
spark.sql("SELECT stocks.name, (stocks.price / earnings.eps) FROM stocks JOIN earnings ON stocks.name = earnings.name").show()
'''
+-------+------------------+
|   name|     (price / eps)|
+-------+------------------+
| Amazon| 574.8366120563447|
| Google| 106.6095042658442|
|  Kakao| 177.3049645390071|
|Netflix| 251.9531306315913|
|Samsung|39.662921348314605|
|Tencent| 43.86920889728746|
|  Tesla|  656.989242258975|
| Toyota| 8.922693419839167|
+-------+------------------+
'''

데이터 프레임(DataFrame)

  • 데이터 프레임은 관계형 데이터셋(RDD + Relation)이다.
  • RDD가 함수형 API를 가졌다면 데이터 프레임은 선언형 API이다.
  • 스키마를 가졌기 때문에 자동으로 최적화가 가능하다.
  • 타입이 없다.(데이터 프레임 내부적으로 타입을 관제하지 않는다.)

데이터 프레임의 특징

  • 데이터 프레임은 RDD의 확장판이다.
  • RDD와 같이 지연 실행(Lazy Execution)된다.
  • 분산 저장된다.
  • 불변(immutabel) 데이터이다.
  • 열(row) 객체가 있다.
  • SQL 쿼리를 직접 바로 실행할 수 있다.
  • 스키마를 가질 수 있고, 이를 통해 성능을 더욱 최적화 할 수 있다.
  • CSV, JSON, Hive 등으로 읽어오거나 변환이 가능하다.

데이터 프레임의 스키마 확인

dtypes

  • 내부 스키마를 볼 수 있다.
1
2
3
4
5
6
7
8
df.dtypes
'''
[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]
'''

show()

  • 테이블 형태로 데이터를 출력하며 첫 20개의 열만 전시한다.
  • 디버깅할 때 유용하게 쓰인다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.show()
'''
+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+
'''

printSchema()

  • 스키마를 트리 형태로 볼 수 있다.
  • 중첩된 스키마라면 이 방법이 편하다.
1
2
3
4
5
6
7
8
9
df.printSchema()
'''
root
 |-- name: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- country: string (nullable = true)
 |-- price: long (nullable = true)
 |-- currency: string (nullable = true)
'''

복잡한 데이터 타입

  • ArrayType: 변수 타입
  • MapType: 파이썬의 딕셔너리와 같은 형태
  • StructType: 오브젝트

데이터 프레임의 작업

  • SQL과 비슷한 작업이 가능하다.
    • Select
    • Where
    • Limit
    • OrderBy
    • GroupBy
    • Join

Select

  • 사용자가 원하는 컬럼이나 데이터를 추출하는데 사용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
df.select('*').collect() # 모든 컬럼
'''
[Row(name='Google', ticker='GOOGL', country='USA', price=2984, currency='USD'),
 Row(name='Netflix', ticker='NFLX', country='USA', price=645, currency='USD')]
'''


df.select('name', 'price').collect() # 원하는 컬럼
'''
[Row(name='Google', price=2984),
 Row(name='Netflix', price=645)]
'''


# 셀렉트문에서 연산까지 가능하다.
# alias는 별명을 지정해주는 함수이다.
df.select(df.name, (df.price + 10000).alias('price')).collect()
'''
[Row(name='Google', price=12984),
 Row(name='Netflix', price=10645)]
'''

Agg

  • Aggregate의 약자로, 그룹핑 후 데이터를 하나로 합치는 작업이다.
1
2
3
4
5
6
7
8
9
10
11
12
df.agg({'price': 'max'}).collect() # 딕셔너리 형태로 컬럼의 max를 가져온다.
'''
[Row(max(price)=125000)]
'''


# 파이스파크의 함수를 이용하여 정의할 수도 있다.
from pyspark.sql import functions as F
df.agg(F.min(df.price)).collect()
'''
[Row(min(price)=483)]
'''

GropBy

  • 사용자가 지정한 컬럼을 기준으로 데이터를 그룹핑하는 작업이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
df.groupBy('currency').avg().collect()
'''
[Row(currency='KRW', avg(price)=97800.0),
 Row(currency='JPY', avg(price)=2006.0),
 Row(currency='HKD', avg(price)=483.0),
 Row(currency='USD', avg(price)=2092.25)]
'''


df.groupBy('currency').agg({'price': 'mean'}).collect()
'''
[Row(currency='KRW', avg(price)=97800.0),
 Row(currency='JPY', avg(price)=2006.0),
 Row(currency='HKD', avg(price)=483.0),
 Row(currency='USD', avg(price)=2092.25)]
'''


df.groupBy(df.currency).avg().collect()
'''
[Row(currency='KRW', avg(price)=97800.0),
 Row(currency='JPY', avg(price)=2006.0),
 Row(currency='HKD', avg(price)=483.0),
 Row(currency='USD', avg(price)=2092.25)]
'''


df.groupBy([df.currency, df.price]).count().collect()
'''
[Row(currency='USD', price=1222, count=1),
 Row(currency='USD', price=3518, count=1),
 Row(currency='HKD', price=483, count=1),
 Row(currency='USD', price=645, count=1),
 Row(currency='KRW', price=70600, count=1),
 Row(currency='JPY', price=2006, count=1),
 Row(currency='USD', price=2984, count=1),
 Row(currency='KRW', price=125000, count=1)]
'''

Join

  • 다른 데이터 프레임과 사용자가 지정한 컬럼을 기준으로 합치는 작업이다.
1
2
3
4
5
6
7
8
9
10
11
df.join(earningsDF, 'name').select(df.name, earningsDF.eps).collect()
'''
[Row(name='Amazon', eps=6.119999885559082),
 Row(name='Google', eps=27.989999771118164),
 Row(name='Kakao', eps=705.0),
 Row(name='Netflix', eps=2.559999942779541),
 Row(name='Samsung', eps=1780.0),
 Row(name='Tencent', eps=11.010000228881836),
 Row(name='Tesla', eps=1.8600000143051147),
 Row(name='Toyota', eps=224.82000732421875)]
'''

데이터 프레임 조작 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
df.show()
'''
+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+
'''


usa_df = df.select('name', 'country', 'price').where('country == "USA"').orderBy('price')
usa_df.show()
'''
+-------+-------+-----+
|   name|country|price|
+-------+-------+-----+
|Netflix|    USA|  645|
|  Tesla|    USA| 1222|
| Google|    USA| 2984|
| Amazon|    USA| 3518|
+-------+-------+-----+
'''


df.groupBy('currency').max('price').show()
'''
+--------+----------+
|currency|max(price)|
+--------+----------+
|     KRW|    125000|
|     JPY|      2006|
|     HKD|       483|
|     USD|      3518|
+--------+----------+
'''


from pyspark.sql.functions import avg, count

df.groupBy('currency').agg(avg('price')).show()
'''
+--------+----------+
|currency|avg(price)|
+--------+----------+
|     KRW|   97800.0|
|     JPY|    2006.0|
|     HKD|     483.0|
|     USD|   2092.25|
+--------+----------+
'''


df.groupBy('currency').agg(count('price')).show()
'''
+--------+------------+
|currency|count(price)|
+--------+------------+
|     KRW|           2|
|     JPY|           1|
|     HKD|           1|
|     USD|           4|
+--------+------------+
'''
0%