[DE 프로젝트: 음악 추천 챗봇 'Sixpotify'] 6. AWS EC2 크론탭(Crontab)

데이터 파이프라인

데이터 워크플로우

  • 아래와 같은 서비스를 S3에 모아 아테나(Athena) 같은 서비스로 분석해준 뒤 그 결과를 저장해놓는 일련의 데이터 작업의 흐름이다.

image

DAG

  • 하나의 잡이 시작되거나 어떠한 이벤트에 트리거가 됐을 때, 또 다른 잡으로 연결이 되는 이런 정보들을 DAG(Directed Acyclic Graphs)라고 부른다.

image

ETL

  • 보통은 Extract⇨Transform⇨Load 순으로 작업을 해 왔지만, 최근에는 Extract⇨Load⇨Transform 순으로 작업을 하기도 한다.
  • 데이터 파이프라인의 연장선이다.
  • 하나의 예시를 들자면, 하루의 정해진 시간에 의한 스케쥴링인 Amazon CloudWatch Event와 Automation Script를 통해서 머신이 시작하면, AWS안에 AWS Step Functions은 각 과정에서 그 다음 과정으로의 연결에 대한 여러가지 경우의 수의 룰을 정해 놓는 서비스로, 쉽게 말하면 임의의 단계에서 fail 발생 시 어떤 이벤트를 발생시켜야 하고 success를 하면 어떤 이벤트를 발생시켜야 하는지 관리할 수 있도록 도와주는 서비스이다.
  • 이런 Step Function안의 ETL 플로우 머신이 시작하고, 이후에는 다양한 잡들이 작동하게 된다.
  • 이러한 ETL 잡들의 로그를 CloudWatch에 저장을 하고, 아래와 같은 플로우를 갖게된다.

image

image

AWS 글루(Glue)

  • AWS의 Step function에 관해 조금 더 말하자면, 아래 그림과 같이 사용할 수 있다.
  • 시작되면 잡이 제출 되고, 잡이 끝날 때 까지 기다려 줄 수 있게끔 Wait Seconds를 사용할 수도 있다.
  • 아테나는 어느 정도 빅데이터를 처리하는 시스템이기 때문에 MySQL이나 PostgreSQL보다는 느린 부분이 있다.
  • 이런 경우 위와 같이 time sleep을 통해 파이썬 스크립트를 잠깐 멈춰두고 그 다음에 해당 시간이 지났을때 그 쿼리에 대한 결과들을 가져올 수 있다.
  • 이후에는 다시 잡 상태를 받고 잡이 끝났는지 아닌지에 따라 작업을 진행하는 플로우를 볼 수 있다.
  • 이런 서비스들이 없었을 때는 하나하나 모니터링을 통해서 수동으로 관리를 해야 했다.

image

  • 좋은 부분은 SQL 같은 경우, 만들어놓은 스키마에 맞춰 데이터를 넣었는데, 이젠 데이터가 방대해지고 형식도 다른데, 이런것을 클루 한다는 의미의 서비스이다.
  • 많이 쓰여지는 부분 중 하나가 크롤러인데 크롤러를 사용하면 자동으로 해당 데이터를 크롤링해서 데이터가 어떤 형식인지에 대해 지속적으로 스키마 관리가 들어간다.
  • 그러므로 데이터와 컬럼이 많을 때 사용하면 좋다.
  • AWS 글루 페이지를 보면 아래 그림과 같이 테이블과 ETL, 트리거 등 다양한 작업을 할 수 있다.
  • 한가지 예시로 S3에 저장해놓은 파이썬 스크립트를 Jobs 탭에서 바로 수정 가능하고, 트리거도 등록해서 관리할 수 있다.

image

  • 해당 잡은 step function이나 글루를 통해 관리를 하거나, EC2에서 크론탭(Crontab)으로 스케쥴링의 변화를 통해서 관리를 하는 등 다양한 방법으로 관리를 하지만 아래와 같이 서비스들의 지속적인 모니터링을 통해 코스트를 효율적으로 사용할 선택과 집중을 해야 할 것이다.
  • 어떤 부분까지 모니터링할 것인지에 대해 선택하여 집중한다.

image

크론탭(Crontab)

마이크로 서비스

수집 프로세스

  • Unknown Artist가 챗봇 메세지로 들어왔을 경우 AWS 람다(Lambda) 서비스 통해 Spotify API에 접근한다.
  • 해당 데이터가 Top Tracks, Artist Table에 가야되는지 S3에 가야되는 지를 관리한다.
  • Ad Hoc 데이터 잡을 통해 하루에 한 번이라던지, 직접 로컬에서 커맨드 라인을 통해 데이터를 가져올 수도 있게 된다.
  • 람다가 필요한 이유는 우리가 Unknown Artist가 챗봇 메세지로 들어왔을때 내용을 업데이트하기 위함이다.
  • 사람들이 기대하는 챗봇은 업데이트를 바로 해줘서 원하는 정보를 얻게끔 해줘야 하기 때문에 람다라는 서비스를 통해 해결한다.
  • 람다는 마이크로 서비스의 개념이다.

  • 챗봇을 람다로 구현하는 이유는 서버리스가 하나의 함수이기 때문에 스테이트리스(Stateless)라고도 하는데 지금 상태가 어떤지 모르겠다는 의미이다.
  • 어떤 메세지를 보내면 람다 함수에는 이전에 어떤 메세지를 갖고 있었는지 담을 수 없다.
  • 그러므로 스테이트를 관리할 데이터베이스가 필요한데 DynamoDB는 메시지에 특화되어 있다.
  • 람다의 경우 해당 서비스의 유저가 기하급수적으로 늘어났을 때 병렬로 늘어나기 때문에 제한점이 서버로 구현하는 것보다 덜하다.
  • 지속적으로 띄워져 있는 것이 아니라 필요할 때 띄워서 사용한 만큼만 비용을 지불한다.

image

람다(Lambda) 스크립팅

함수 생성

  • 람다 함수는 이전에 DynamoDB에 top track 정보를 저장했었는데, Artist가 추가된다면 DynamoDB에도 저장되어야 하므로 이 작업을 작성한다.

image

S3 생성

image

top_tracks 구조

1
2
3
4
5
top_tracks
├── deploy.sh
├── lambda_function.py
├── requirements.txt
└── setup.cfg

deploy.sh

1
2
3
4
5
6
7
8
9
10
11
#!/bin/bash

rm -rf ./libs
pip3 install -r requirements.txt -t ./libs

rm *.zip
zip top_tracks.zip -r *

aws s3 rm s3://6mini-top-tracks/top_tracks.zip
aws s3 cp ./top_tracks.zip s3://6mini-top-tracks/top_tracks.zip
aws lambda update-function-code --function-name top-tracks --s3-bucket 6mini-top-tracks --s3-key top_tracks.zip

lambda_function.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import sys
sys.path.append('./libs')
import os
import boto3
import requests
import base64
import json
import logging


client_id = ""
client_secret = ""

try:
    dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
    logging.error('could not connect to dynamodb')
    sys.exit(1)


def lambda_handler(event, context):

    headers = get_headers(client_id, client_secret)

    table = dynamodb.Table('top_tracks')

    artist_id = event['artist_id']

    URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
    params = {
        'country': 'US'
    }
    r = requests.get(URL, params=params, headers=headers)

    raw = json.loads(r.text)

    for track in raw['tracks']:

        data = {
            'artist_id': artist_id
        }

        data.update(track)

        table.put_item(
            Item=data
        )

    return "SUCCESS"



def get_headers(client_id, client_secret):

    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

    headers = {
        "Authorization": "Basic {}".format(encoded)
    }

    payload = {
        "grant_type": "client_credentials"
    }

    r = requests.post(endpoint, data=payload, headers=headers)

    access_token = json.loads(r.text)['access_token']

    headers = {
        "Authorization": "Bearer {}".format(access_token)
    }

    return headers


if __name__=='__main__':
    main()

requirements.txt

1
requests

setup.cfg

1
2
[install]
prefix=

실행

1
2
3
4
5
$ brew install awscli

$ chmod +x deploy.sh

$ ./deploy.sh

IAM 역할 설정

image

image

테스트

image

0%