[DE 프로젝트: 실시간 빅데이터 처리 'SIXAT'] 3. 스파크(Spark) SQL

FHVHV 데이터

  • 이전 포스팅에서 RDD를 이용하였는데, 스파크 SQL을 이용하여 데이터 분석을 진행해본다.

우버 트립 수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 스파크 세션 import
from pyspark.sql import SparkSession


# 어플리케이션 생성
spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()


# 데이터 프레임 생성
data = spark.read.csv("/Users/6mini/fhvhv_tripdata_2020-03.csv", inferSchema = True, header = True) # 스키마 자동 예측과 헤더 자동 설정


# 데이터 확인
data.show(5)
'''
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 5 rows
'''


# SQL을 사용하기 위해 TempView에 담는다.
data.createOrReplaceTempView("mobility_data")


spark.sql("SELECT * FROM mobility_data LIMIT 5").show()
'''
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
'''

# 일별 트립 수
# split을 이용하여 데이트 타임 분리

spark.sql("SELECT pickup_date, COUNT(*) AS trips FROM (SELECT split(pickup_datetime, ' ')[0] AS pickup_date FROM mobility_data) GROUP BY pickup_date ").show()
'''
+-----------+------+
|pickup_date| trips|
+-----------+------+
| 2020-03-02|648986|
| 2020-03-01|784246|
| 2020-03-03|697880|
| 2020-03-04|707879|
| 2020-03-05|731165|
| 2020-03-06|872012|
| 2020-03-07|886071|
| 2020-03-08|731222|
| 2020-03-10|626474|
| 2020-03-09|628940|
| 2020-03-11|628601|
| 2020-03-12|643257|
| 2020-03-13|660914|
| 2020-03-15|448125|
| 2020-03-14|569397|
| 2020-03-16|391518|
| 2020-03-17|312298|
| 2020-03-18|269232|
| 2020-03-20|261900|
| 2020-03-24|141686|
+-----------+------+
'''
  • 같은 기능이지만, 확실히 전에 봤던 RDD보다는 훨씬 간편하게 해결할 수 있다.

각 헹정구 별 데이터

  • TLC Trip Record Data에서 Taxi Zone Lookup Table CSV 파일을 다운로드 받는다.
    • 로케이션 아이디의 정보가 담겨있는 파일이다.
  • 조인(join)하여 러프하게 분석한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("trip_count_by_zone_sql").getOrCreate()


trip_data = spark.read.csv("/Users/6mini/fhvhv_tripdata_2020-03.csv", inferSchema = True, header = True)
zone_data = spark.read.csv("/Users/6mini/taxi+_zone_lookup.csv", inferSchema = True, header = True)


trip_data.createOrReplaceTempView("trip_data")
zone_data.createOrReplaceTempView("zone_data")


# 행정구 별 승차하는 승객을 구해본다.
spark.sql('SELECT borough, COUNT(*) FROM\
            (SELECT zone_data.Borough AS borough\
            FROM trip_data\
            JOIN zone_data ON trip_data.PULocationID = zone_data.LocationID)\
        GROUP BY borough').show()
'''
+-------------+--------+
|      borough|count(1)|
+-------------+--------+
|       Queens| 2437383|
|          EWR|     362|
|      Unknown|     845|
|     Brooklyn| 3735764|
|Staten Island|  178818|
|    Manhattan| 4953140|
|        Bronx| 2086592|
+-------------+--------+
'''


# 행정구 별 하차하는 승객을 구해본다.
spark.sql('SELECT borough, COUNT(*) FROM\
            (SELECT zone_data.Borough AS borough\
            FROM trip_data\
            JOIN zone_data ON trip_data.PULocationID = zone_data.LocationID)\
        GROUP BY borough').show()
'''
+-------------+--------+
|      borough|count(1)|
+-------------+--------+
|       Queens| 2468408|
|          EWR|   65066|
|      Unknown|  387759|
|     Brooklyn| 3696682|
|Staten Island|  177727|
|    Manhattan| 4553776|
|        Bronx| 2043486|
+-------------+--------+
'''
  • 승차든 하차든 맨해튼의 수가 굉장히 많은 걸 볼 수있다.
  • 공항(EMR)의 하차 수가 많은 걸 볼 수 있다.

택시 데이터

  • 본격적으로 택시 요금 예측 머신 러닝에 쓰일 데이터를 분석한다.
  • 뉴욕의 택시 데이터 분석을 진행할 것이다.
  • 데이터셋은 TLC Trip Record Data에서 2021년 1월부터 7월까지의 ‘Yellow Taxi Trip Records’ CSV파일을 다운받아 진행한다.

image

분석 준비

  • 라이브러리와 데이터를 불러온다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("taxi-analysis").getOrCreate()


trips_df = spark.read.csv("/Users/6mini/trips/*", inferSchema=True, header=True) # 여러개 파일 동시에 가져올 수 있다.
zone_df = spark.read.csv("/Users/6mini/taxi+_zone_lookup.csv", inferSchema=True, header=True)


trips_df.printSchema()
zone_df.printSchema()
'''
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)
'''

데이터 병합

  • 불러온 두 데이터 프레임을 조인하고, 필요한 컬럼만 선택한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
query = """
SELECT 
    t.VendorID as vendor_id,
    TO_DATE(t.tpep_pickup_datetime) as pickup_date,
    TO_DATE(t.tpep_dropoff_datetime) as dropoff_date,
    HOUR(t.tpep_pickup_datetime) as pickup_time,
    HOUR(t.tpep_dropoff_datetime) as dropoff_time,
    t.passenger_count,
    t.trip_distance,
    t.fare_amount,
    t.tip_amount,
    t.tolls_amount,
    t.total_amount,
    t.payment_type,
    pz.Zone as pickup_zone,
    dz.Zone as dropoff_zone
FROM 
    trips t
    LEFT JOIN 
        zone pz
    ON
        t.PULocationID = pz.LocationID
    LEFT JOIN
        zone dz
    ON 
        t.DOLocationID = dz.LocationID
"""
comb_df = spark.sql(query)

# 새로운 TempView에 담는다.
comb_df.createOrReplaceTempView("comb")


comb_df.printSchema()
'''
root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- dropoff_date: date (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- dropoff_time: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- dropoff_zone: string (nullable = true)
'''

이상치 탐색

  • 이상한 데이터가 있는 지 확인한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
spark.sql("SELECT pickup_date, pickup_time FROM comb WHERE pickup_date < '2020-12-31'").show()
'''
+-----------+-----------+
|pickup_date|pickup_time|
+-----------+-----------+
| 2009-01-01|          0|
| 2008-12-31|         23|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2008-12-31|         23|
| 2008-12-31|         23|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          1|
| 2009-01-01|         17|
| 2009-01-01|         17|
| 2009-01-01|         18|
| 2009-01-01|          1|
| 2009-01-01|          1|
| 2009-01-01|          2|
| 2009-01-01|          2|
| 2009-01-01|          0|
+-----------+-----------+
'''
  • 2021년 1월부터 7월까지의 데이터인데 옛날 데이터가 끼여있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
# 요금의 요약 확인
comb_df.select("total_amount").describe().show()
'''
+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|          15000700|
|   mean| 18.75545205706905|
| stddev|145.74424528059743|
|    min|            -647.8|
|    max|          398469.2|
+-------+------------------+
'''
  • 금액인데 음수도 있고 최대값은 원화로 4억이 넘어간다.
  • 택시 요금으로 4억….?
1
2
3
4
5
6
7
8
9
10
11
12
comb_df.select("trip_distance").describe().show()
'''
+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         15000700|
|   mean|6.628629402627825|
| stddev|671.7293482115845|
|    min|              0.0|
|    max|        332541.19|
+-------+-----------------+
'''
  • 이동 거리에도 이상치가 있다.
  • 최소값은 타자마자 내렸다 쳐도 최대값은 535,172km이다.
1
2
3
4
5
6
7
8
9
10
11
12
comb_df.select("passenger_count").describe().show()
'''
+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          14166672|
|   mean|1.4253783104458126|
| stddev|1.0443270490596768|
|    min|                 0|
|    max|                 9|
+-------+------------------+
'''
  • 승객 수는 그럴싸 하지만, 택시에 9명이 타는 건 좀 무리라고 본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 월 단위로 데이터를 그룹핑하여 확인한다.
query = """
SELECT
     DATE_TRUNC('MM', c.pickup_date) AS month,
     COUNT(*) AS trips
FROM
    comb c
GROUP BY
    month
ORDER BY
    month desc
"""
spark.sql(query).show()
'''
+-------------------+-------+
|              month|  trips|
+-------------------+-------+
|2029-05-01 00:00:00|      1|
|2021-12-01 00:00:00|      5|
|2021-11-01 00:00:00|      5|
|2021-10-01 00:00:00|      3|
|2021-09-01 00:00:00|      3|
|2021-08-01 00:00:00|     36|
|2021-07-01 00:00:00|2821430|
|2021-06-01 00:00:00|2834204|
|2021-05-01 00:00:00|2507075|
|2021-04-01 00:00:00|2171215|
|2021-03-01 00:00:00|1925130|
|2021-02-01 00:00:00|1371688|
|2021-01-01 00:00:00|1369749|
|2020-12-01 00:00:00|     16|
|2009-01-01 00:00:00|    111|
|2008-12-01 00:00:00|     26|
|2004-04-01 00:00:00|      1|
|2003-01-01 00:00:00|      1|
|2002-12-01 00:00:00|      1|
'''
  • 월 단위로 운행 수를 확인해보면 데이터셋 외 날짜의 데이터가 존재한다.
  • 1월부터 7월 외의 데이터를 삭제해도 무방할 것으로 생각된다.

데이터 정제

  • 위에서 확인한 이상치를 참고하여 정제한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
query = """
SELECT
    *
FROM 
    comb c
WHERE
    c.total_amount < 5000
    AND c.total_amount > 0
    AND c.trip_distance < 100
    AND c.passenger_count < 5
    AND c.pickup_date >= '2021-01-01'
    AND c.pickup_date < '2021-08-01'
"""
cleaned_df = spark.sql(query)
cleaned_df.createOrReplaceTempView("cleaned")


cleaned_df.describe().show()
'''
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+--------------------+--------------------+
|summary|          vendor_id|       pickup_time|      dropoff_time|   passenger_count|     trip_distance|       fare_amount|        tip_amount|       tolls_amount|      total_amount|      payment_type|         pickup_zone|        dropoff_zone|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+--------------------+--------------------+
|  count|           13297107|          13297107|          13297107|          13297107|          13297107|          13297107|          13297107|           13297107|          13297107|          13297107|            13297107|            13297107|
|   mean| 1.6708898409255486|14.196737004522863|14.282849871028338|1.2091016489526631|2.8381349567240175|12.161357451661463|2.1885364222464383|0.27013995299859384| 18.07238341634938|1.2470360658149175|                null|                null|
| stddev|0.46989007105205594| 5.120955674004399| 5.192133029412091|0.5416739633355407|3.6244092466998454| 10.98625196913336| 2.577909795896231| 1.5421161367097538|13.263132484566189|0.4532515283863241|                null|                null|
|    min|                  1|                 0|                 0|                 0|               0.0|              -0.8|               0.0|                0.0|              0.01|                 1|Allerton/Pelham G...|Allerton/Pelham G...|
|    max|                  2|                23|                23|                 3|             99.96|            4969.0|             700.0|             956.55|            4973.3|                 5|      Yorkville West|      Yorkville West|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+--------------------+--------------------+
'''

분석 및 시각화

1
2
3
4
5
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

일 별 운행 수

1
2
3
4
5
6
7
8
9
10
11
12
13
query = """
SELECT 
    c.pickup_date,
    COUNT(*) AS trips
FROM
    cleaned c
GROUP BY
    c.pickup_date
"""
pd_df = spark.sql(query).toPandas() # 그래프를 그려야하기 때문에 판다스로 보낸다.

fig, ax = plt.subplots(figsize=(16, 6))
sns.lineplot(x="pickup_date", y="trips", data=pd_df)

image

  • 코로나가 완화되며 택시 이용 수가 늘어나고 있는 것을 확인할 수 있다.
  • 중간 중간 하락하는 것을 보며 위클리 패턴(weekly pattern)이 있음을 볼 수 있다.

요일 별 운행 수

1
2
3
4
5
6
7
8
9
10
11
12
query = """
SELECT 
    c.pickup_date,
    DATE_FORMAT(c.pickup_date, 'EEEE') AS day_of_week,
    COUNT(*) AS trips
FROM
    cleaned c
GROUP BY
    c.pickup_date,
    day_of_week
"""
pd_df2 = spark.sql(query).toPandas()
  • 요일 별 정렬을 위해 컬럼을 하나 만든다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
data = pd_df2.groupby("day_of_week").trips.median().to_frame().reset_index()
data["sort_dow"] = data["day_of_week"].replace({
    "Sunday": 0,
    "Monday": 1,
    "Tuesday": 2,
    "Wednesday": 3,
    "Thursday": 4,
    "Friday": 5,
    "Saturday": 6,
})


data.sort_values(by="sort_dow", inplace=True)
data

image

1
2
3
4
5
6
fig, ax = plt.subplots(figsize=(12, 5))
sns.barplot(
    x="day_of_week",
    y="trips",
    data=data
)

image

  • 확실히 주말보다 평일이 택시 운행량이 많은 것을 확인할 수 있다.
  • 일요일이 가장 적고, 금요일이 가장 많다.

요일 생성 함수

  • 위에서 만든 쿼리 외에도 데이터 프레임에 함수를 이용하여 생성하는 방법도 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 요일 생성 함수 생성
def get_weekday(data):
    import calendar
    return calendar.day_name[date.weekday()]

spark.udf.register("get_weekday", get_weekday)


query = """
SELECT 
    c.pickup_date,
    get_weekday(c.pickup_date) AS day_of_week,
    COUNT(*) AS trips
FROM
    cleaned c
GROUP BY
    c.pickup_date,
    day_of_week
"""

결제 타입 패턴

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
payment_type_to_string = {
    1: "Credit Card",
    2: "Cash",
    3: "No Charge",   # 계산되지 않음
    4: "Dispute",     # 분쟁이 있었음
    5: "Unknown",
    6: "Voided Trip", # 취소된 운행
}
def parse_payment_type(payment_type):
    return payment_type_to_string[payment_type]

spark.udf.register("parse_payment_type", parse_payment_type)


query = """
SELECT 
    parse_payment_type(payment_type),
    count(*) AS trips,
    MEAN(fare_amount) AS mean_fare_amount,
    STD(fare_amount) AS stdev_fare_amount
FROM
    cleaned
GROUP BY
    payment_type
"""

spark.sql(query).show()
'''
+--------------------------------+--------+------------------+------------------+
|parse_payment_type(payment_type)|   trips|  mean_fare_amount| stdev_fare_amount|
+--------------------------------+--------+------------------+------------------+
|                     Credit Card|10117869|12.216744280836133|10.795080136078607|
|                       No Charge|   58552| 11.93100560185818|15.037849089253717|
|                         Dispute|   23536|12.653402447314743| 14.16347772653488|
|                            Cash| 3097149|11.981031761791233|11.474101590229493|
|                         Unknown|       1|              17.8|              null|
+--------------------------------+--------+------------------+------------------+
'''
  • 신용 카드가 가장 많고 그 다음이 현금이다.
  • 요금이 가장 많은 것은 Dispute이다. 아무래도 금액이 높으니 분쟁이 많았던 것 같다.
0%