[DE 프로젝트: 음악 추천 챗봇 'Sixpotify'] 4. AWS S3 데이터 레이크

데이터 레이크(Lake)

시대의 변화에 따른 저장방식의 진화

구분 Data Lake Data Warehouse
Data Structure Raw Processed
Purpose of Data Not Yet Determined In Use
Users Data Scientists Business Professionals
Accessibility High / Quick to update Complicated / Costly

스크린샷 2021-10-04 16 25 49

아키텍쳐

스크린샷 2021-10-04 16 28 07

데이터 파이프라인

스크린샷 2021-10-04 16 29 53

S3

버킷 생성

스크린샷 2021-10-04 16 37 25

S3 적재

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import sys
import os
import logging
import boto3
import requests
import base64
import json
import psycopg2
from datetime import datetime
import pandas as pd
import jsonpath  # pip3 install jsonpath --user

client_id = ""
client_secret = ""

host = ""
port = 5432
username = ""
database = ""
password = ""


def main():

    try:
        conn = psycopg2.connect(
            host=host,
            database=database,
            user=username,
            password=password)
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)

    headers = get_headers(client_id, client_secret)

    # RDS - 아티스트 ID를 가져오고
    cursor.execute("SELECT id FROM artists LIMIT 10")
    top_track_keys = {
        "id": "id",
        "name": "name",
        "popularity": "popularity",
        "external_url": "external_urls.spotify"
    }
    # Top Tracks Spotify 가져오고
    # Parquet화 : 스파크가 좋아하는 형태
    top_tracks = []
    for (id, ) in cursor.fetchall():
        URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
        params = {
            'country': 'US'
        }

        r = requests.get(URL, params=params, headers=headers)
        raw = json.loads(r.text)

        for i in raw['tracks']:
            top_track = {}
            for k, v in top_track_keys.items():
                value = jsonpath.jsonpath(i, v)
                if type(value) == bool:
                    continue
                top_track.update({k: value[0]})
                top_track.update({'artist_id': id})
                top_tracks.append(top_track)
    

    # track_ids

    track_ids = [i['id'] for i in top_tracks]

    top_tracks = pd.DataFrame(top_tracks)
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')

    dt = datetime.utcnow().strftime("%Y-%m-%d")

    s3 = boto3.resource('s3')
    object = s3.Object('6mini-spotify', 'top-tracks/dt={}/top-tracks.parquet'.format(dt)) # partition
    data = open('top-tracks.parquet', 'rb')
    object.put(Body=data)
    # S3 import

    tracks_batch = [track_ids[i: i+100] for i in range(0, len(track_ids), 100)]
    audio_features = []
    for i in tracks_batch:
        ids = ','.join(i)
        URL = "https://api.spotify.com/v1/audio-features/?ids={}".format(ids)

        r = requests.get(URL, headers=headers)
        raw = json.loads(r.text)

        audio_features.extend(raw['audio_features'])

    audio_features = pd.DataFrame(audio_features)
    audio_features.to_parquet('audio-features.parquet', engine='pyarrow', compression='snappy')

    s3 = boto3.resource('s3')
    object = s3.Object('6mini-spotify', 'audio-features/dt={}/top-tracks.parquet'.format(dt)) # dt : 파티션 -> 날짜로 만든다
    data = open('audio-features.parquet', 'rb')
    object.put(Body=data)


def get_headers(client_id, client_secret):

    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

    headers = {
        "Authorization": "Basic {}".format(encoded)
    }
    payload = {
        "grant_type": "client_credentials"
    }

    r = requests.post(endpoint, data=payload, headers=headers)

    access_token = json.loads(r.text)['access_token']

    headers = {
        "Authorization": "Bearer {}".format(access_token)
    }

    return headers


if __name__=='__main__':
    main()

AWS S3 확인

스크린샷 2021-10-05 15 04 23

프레스토(Presto)

  • 스파크(Spark)의 단점이라 하면, 물론 스파크 SQL도 있지만 어느 정도 스크립팅이 필요한 부분이 있다.
  • 다양한 멀티플(multiple) 데이터 소스를 싱글 쿼리를 통해서 진행 할 수 있다.
  • 하둡(Hadoop)의 경우는 퍼포먼스나 여러가지 데이터 분석을 할 때 여러가지 이슈들이 있으며 이전 방식이기 때문에 최근에는 스파크와 프레스토로 넘어오는 추세이다.
  • AWS는 프레스토 기반인 아테나(Athena)를 통해서 S3의 데이터를 작업할 수 있다.

서버리스(Serverless)

  • 어떠한 요청이 들어올 때 서버를 띄우는데, 지속적으로 요청이 들어온다면 계속적으로 병렬적인 서버를 띄운다.
  • 서버 안에서 용량을 정하는 것을 알아서 자동적으로 해결해 주므로 비용적인 문제를 보완한다.
  • AWS에서 EC2 같은 경우는 서버 하나를 띄우는 것이고, 람다(Lambda)가 서버리스의 개념을 갖는 서비스이다.
  • 아테나도 서버리스의 개념을 갖는 서비스이다.

AWS 아테나(Athena)

  • AWS 아테나에서도 데이터 레이크의 시스템 형태로 데이터를 작업하더라도 쿼리를 통해 작업을 하려면 데이터 웨어하우스(warehouse)처럼 테이블의 형식을 만들어야한다.
  • AWS 글루(Glue)를 쓰면 크롤러를 이용해서 저장되어있는 데이터의 로케이션에 들어가서 생성한다.
  • 데이터를 S3로 파이썬을 통해 옮기고 아테나를 통해 테이블을 만들고 쿼리를 통해 데이터를 뽑아오는 부분을 진행할 것이다.

쿼리 위치

스크린샷 2021-10-05 15 49 30

쿼리 실행

  • 테이블과 파티션을 생성하고 리페어를 진행한다.
  • 키와 벨류가 모두 리스트가 아닌 형태여야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CREATE EXTERNAL TABLE IF NOT EXISTS top_tracks(
    id string,
    artsist_id string,
    name string,
    popularity int,
    external_url string
) PARTITIONED BY (dt string)
STORED AS PARQUET LOCATION 's3://6mini-spotify/top-tracks/' tblproperties("parquet.compress"="SNAPPY")

MSCK REPAIR TABLE top_tracks

SELECT * FROM top_tracks LIMIT 10


CREATE EXTERNAL TABLE IF NOT EXISTS audio_features(
    id string,
    danceability double,
    energy double,
    key int,
    loudness double,
    mode int,
    speechiness double,
    acousticness double,
    instrumentalness double
) PARTITIONED BY (dt string)
STORED AS PARQUET LOCATION 's3://6mini-spotify/audio-features/' tblproperties("parquet.compress"="SNAPPY")

MSCK REPAIR TABLE audio_features

SELECT * FORM audio_features

스크린샷 2021-10-05 21 38 26

1
2
3
4
5
SELECT
AVG(danceability),
AVG(loudness)
FROM audio_features
WHERE CAST(dt AS date) = CURRENT_DATE 
  • Partition dt를 잘 선택해야한다.
_col0 _col1
0.6186399999999985 -7.51500000000001
0%