[스파크] UDF(User-Defined Function)란?

  • sql문 안에서 쓸 수 있는 하나의 함수를 정의하는 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from pyspark.sql import SparkSession


# 스파크 인스턴스 생성
spark = SparkSession.builder.appName("udf").getOrCreate()


# 실습을 위한 데이터
transactions = [
    ('찹쌀탕수육+짜장2', '2021-11-07 13:20:00', 22000, 'KRW'),
    ('등심탕수육+크립새우+짜장면', '2021-10-24 11:19:00', 21500, 'KRW'), 
    ('월남 쌈 2인 세트', '2021-07-25 11:12:40', 42000, 'KRW'), 
    ('콩국수+열무비빔국수', '2021-07-10 08:20:00', 21250, 'KRW'), 
    ('장어소금+고추장구이', '2021-07-01 05:36:00', 68700, 'KRW'), 
    ('족발', '2020-08-19 19:04:00', 32000, 'KRW'),  
]

schema = ["name", "datetime", "price", "currency"]


# 데이터 프레임 생성
df = spark.createDataFrame(data=transactions, schema=schema)


# TempView에 담기
df.createOrReplaceTempView("transactions")


spark.sql("SELECT * FROM transactions").show()
'''
+--------------------------+-------------------+-----+--------+
|                      name|           datetime|price|currency|
+--------------------------+-------------------+-----+--------+
|              찹쌀탕수육+짜장2|2021-11-07 13:20:00|22000|     KRW|
|        심탕수육+크립새우+짜장면|2021-10-24 11:19:00|21500|     KRW|
|             월남 쌈 2인 세트|2021-07-25 11:12:40|42000|     KRW|
|            콩국수+열무비빔국수|2021-07-10 08:20:00|21250|     KRW|
|            장어소금+고추장구이|2021-07-01 05:36:00|68700|     KRW|
|                       족발|2020-08-19 19:04:00|32000|     KRW|
+--------------------------+-------------------+-----+--------+
'''


# UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType # 리턴 타입이 문자열이 되지 않게하기 위한 라이브러리

# @udf('long') # 좀 더 파이써닉한 방법이며, 레지스팅과 타입 지정까지 할 수 있다. 왜인지 이 방법은 안된다.
def squared(n): # 제곱하는 함수 생성
    return n * n

## udf
# dataFrame = spark.table('transactions')
# display(dataFrame.select('price', squared_udf('price').alias('price_squared')))

spark.udf.register("squared", squared, LongType()) # 함수를 레지스터 하는 과정이 필요하다.


spark.sql("SELECT price, squared(price) FROM transactions").show()
'''
+-----+--------------+
|price|squared(price)|
+-----+--------------+
|22000|     484000000|
|21500|     462250000|
|42000|    1764000000|
|21250|     451562500|
|68700|    4719690000|
|32000|    1024000000|
+-----+--------------+
'''


# 숫자를 한글로 읽는 함수 생성
def read_number(n):
    units = ["", "십", "백", "천", "만"]
    nums = '일이삼사오육칠팔구'
    result = []
    i = 0
    while n > 0:
        n, r = divmod(n, 10) # 나눈 결과와 나머지를 반환하는 함수
        if r > 0:
            result.append(nums[r-1]+units[i])
        i += 1
    return "".join(reversed(result))

print(read_number(21250))
print(read_number(68700))
'''
이만일천이백오십
육만팔천칠백
'''


spark.udf.register("read_number", read_number)


spark.sql("SELECT price, read_number(price) FROM transactions").show()
'''
+-----+------------------+
|price|read_number(price)|
+-----+------------------+
|22000|            이만이천|
|21500|         이만일천오백|
|42000|            사만이천|
|21250|      이만일천이백오십|
|68700|         육만팔천칠백|
|32000|            삼만이천|
+-----+------------------+
'''


# 요일을 반환하는 함수 생성
def get_weekday(date):
    import calendar
    return calendar.day_name[date.weekday()]

spark.udf.register("get_weekday", get_weekday)


query = """
SELECT
    datetime,
    get_weekday(TO_DATE(datetime)) as day_of_week
FROM
    transactions
"""
spark.sql(query).show()
'''
+-------------------+-----------+
|           datetime|day_of_week|
+-------------------+-----------+
|2021-11-07 13:20:00|     Sunday|
|2021-10-24 11:19:00|     Sunday|
|2021-07-25 11:12:40|     Sunday|
|2021-07-10 08:20:00|   Saturday|
|2021-07-01 05:36:00|   Thursday|
|2020-08-19 19:04:00|  Wednesday|
+-------------------+-----------+
'''
0%