[스파크] UDF(User-Defined Function)란? Posted on 2021-12-21 | In Data Engineering 스파크 UDF의 간단한 실습 sql문 안에서 쓸 수 있는 하나의 함수를 정의하는 것이다. 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140from pyspark.sql import SparkSession # 스파크 인스턴스 생성 spark = SparkSession.builder.appName("udf").getOrCreate() # 실습을 위한 데이터 transactions = [ ('찹쌀탕수육+짜장2', '2021-11-07 13:20:00', 22000, 'KRW'), ('등심탕수육+크립새우+짜장면', '2021-10-24 11:19:00', 21500, 'KRW'), ('월남 쌈 2인 세트', '2021-07-25 11:12:40', 42000, 'KRW'), ('콩국수+열무비빔국수', '2021-07-10 08:20:00', 21250, 'KRW'), ('장어소금+고추장구이', '2021-07-01 05:36:00', 68700, 'KRW'), ('족발', '2020-08-19 19:04:00', 32000, 'KRW'), ] schema = ["name", "datetime", "price", "currency"] # 데이터 프레임 생성 df = spark.createDataFrame(data=transactions, schema=schema) # TempView에 담기 df.createOrReplaceTempView("transactions") spark.sql("SELECT * FROM transactions").show() ''' +--------------------------+-------------------+-----+--------+ | name| datetime|price|currency| +--------------------------+-------------------+-----+--------+ | 찹쌀탕수육+짜장2|2021-11-07 13:20:00|22000| KRW| | 심탕수육+크립새우+짜장면|2021-10-24 11:19:00|21500| KRW| | 월남 쌈 2인 세트|2021-07-25 11:12:40|42000| KRW| | 콩국수+열무비빔국수|2021-07-10 08:20:00|21250| KRW| | 장어소금+고추장구이|2021-07-01 05:36:00|68700| KRW| | 족발|2020-08-19 19:04:00|32000| KRW| +--------------------------+-------------------+-----+--------+ ''' # UDF from pyspark.sql.functions import udf from pyspark.sql.types import LongType # 리턴 타입이 문자열이 되지 않게하기 위한 라이브러리 # @udf('long') # 좀 더 파이써닉한 방법이며, 레지스팅과 타입 지정까지 할 수 있다. 왜인지 이 방법은 안된다. def squared(n): # 제곱하는 함수 생성 return n * n ## udf # dataFrame = spark.table('transactions') # display(dataFrame.select('price', squared_udf('price').alias('price_squared'))) spark.udf.register("squared", squared, LongType()) # 함수를 레지스터 하는 과정이 필요하다. spark.sql("SELECT price, squared(price) FROM transactions").show() ''' +-----+--------------+ |price|squared(price)| +-----+--------------+ |22000| 484000000| |21500| 462250000| |42000| 1764000000| |21250| 451562500| |68700| 4719690000| |32000| 1024000000| +-----+--------------+ ''' # 숫자를 한글로 읽는 함수 생성 def read_number(n): units = ["", "십", "백", "천", "만"] nums = '일이삼사오육칠팔구' result = [] i = 0 while n > 0: n, r = divmod(n, 10) # 나눈 결과와 나머지를 반환하는 함수 if r > 0: result.append(nums[r-1]+units[i]) i += 1 return "".join(reversed(result)) print(read_number(21250)) print(read_number(68700)) ''' 이만일천이백오십 육만팔천칠백 ''' spark.udf.register("read_number", read_number) spark.sql("SELECT price, read_number(price) FROM transactions").show() ''' +-----+------------------+ |price|read_number(price)| +-----+------------------+ |22000| 이만이천| |21500| 이만일천오백| |42000| 사만이천| |21250| 이만일천이백오십| |68700| 육만팔천칠백| |32000| 삼만이천| +-----+------------------+ ''' # 요일을 반환하는 함수 생성 def get_weekday(date): import calendar return calendar.day_name[date.weekday()] spark.udf.register("get_weekday", get_weekday) query = """ SELECT datetime, get_weekday(TO_DATE(datetime)) as day_of_week FROM transactions """ spark.sql(query).show() ''' +-------------------+-----------+ | datetime|day_of_week| +-------------------+-----------+ |2021-11-07 13:20:00| Sunday| |2021-10-24 11:19:00| Sunday| |2021-07-25 11:12:40| Sunday| |2021-07-10 08:20:00| Saturday| |2021-07-01 05:36:00| Thursday| |2020-08-19 19:04:00| Wednesday| +-------------------+-----------+ '''