[에어플로우] Amazon EC2 ubuntu 환경에서 에어플로우 ETL 환경 구축

개요

  • 현재 데이터를 크롤링하여 적재하고, 적재된 데이터를 이용하여 서비스하고 있는 프로젝트가 세 가지가 되었다.
  • 이 모든 과정을 단순히 Amazon EC2의 크론잡(Cronjob)을 이용하여 진행하고 있다.
  • 모든 데이터 적재가 셀레니움(Sellenium)을 통한 동적 크롤링(Crawling)으로 진행되고 있는데, 생각보다 많은 에러를 직면하게 된다.
  • 앞으로도 많은 크롤러 프로그램을 제작하고 스케쥴링하게 될텐데, 점점 복잡도가 올라감에 따라 에어플로우(Airflow)를 도입하려 한다.

EC2 위에 에어플로우 구축

  • 24시간 돌아가야하는 작업이 많기 때문에 클라우드상에 구축을 진행한다.
  • AMI를 우분투(Ubuntu)로 사용하고, 인스턴스 사이즈는 기존 크롤링 서버와 동일하게 t3a.xlarge로 선택하여 여유롭게 진행한다.

파이썬 설치

1
2
$ sudo apt-get update
$ sudo apt-get install -y python3-pip
  • 위 명령어를 통해 최신 파이썬(Python) 버전을 설치한다.

에어플로우 및 기타 모듈 설치

1
2
3
4
5
6
7
8
9
10
11
12
13
# PostgreSQL
$ sudo apt-get install -y postgresql-server-dev-all
$ sudo apt-get install -y postgresql-common

# 에어플로우
$ sudo pip3 install apache-airflow
$ sudo pip3 install apache-airflow-providers-postgres[amazon]==2.0.0

# psycopg
$ sudo pip3 install cryptography psycopg2-binary boto3 botocore

# sqlalchemy
$ sudo pip3 install SQLAlchemy
  • sqlalchemy의 경우 로우 버전을 다운 받았더니 오류가 났다. 최신 버전을 다운로드 한다.

에어플로우 계정 생성

  • 우분투(Ubuntu) root 계정이 아닌 airflow user를 생성하여 작업을 진행한다.
1
2
3
$ sudo groupadd airflow

$ sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
  • 루트 디렉토리는 /var/lib/airflow/이다.

PostgreSQL 설치

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ sudo apt-get install -y postgresql postgresql-contrib

# Postgre user로 로그인하여 USER와 DATABASE 생성
# Postgre user 로그인
ubuntu $ sudo su postgres

# user, database 생성
postgres /home/ubuntu $ psql

= CREATE USER airflow PASSWORD 'airflow';
= CREATE DATABASE airflow;
= \q

postgres /home/ubuntu $ exit

# PostgreSQL 재실행

ubuntu $ sudo service postgresql restart

에어플로우 초기화

1
2
3
4
5
6
7
8
9
10
# airflow user 사용

ubuntu $ sudo su airflow
airflow /home/ubuntu $ cd /var/lib/airflow

# dags 폴더생성
airflow $ mkdir dags

# airflow 초기화
airflow $ AIRFLOW_HOME=/var/lib/airflow airflow db init

에어플로우 cfg 파일 수정

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
airflow $ sudo vi airflow.cfg
'''
.
.
executor = LocalExecutor
.
.
load_examples = False
.
.
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
.
.
'''

# 에어플로우 재설정
airflow $ AIRFLOW_HOME=/var/lib/airflow airflow db init

에어플로우 웹 서버, 스케쥴러 서비스 등록

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 우분투 계정으로 이동
airflow $ exit

# 에어플로우 웹 서버를 백그라운드 서비스로 등록
ubuntu $ sudo vi /etc/systemd/system/airflow-webserver.service
'''
[Unit]
Description=Airflow webserver
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver -p 8080
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
'''
 
# 에어플로우 스케쥴러를 백그라운드 서비스로 등록
ubuntu $ sudo vi /etc/systemd/system/airflow-scheduler.service

'''
[Unit]
Description=Airflow scheduler
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
'''

# 서비스 활성화
ubuntu $ sudo systemctl daemon-reload
ubuntu $ sudo systemctl enable airflow-webserver
ubuntu $ sudo systemctl enable airflow-scheduler

# 서비스 시작
ubuntu $ sudo systemctl start airflow-webserver
ubuntu $ sudo systemctl start airflow-scheduler

# 서비스 상태확인
ubuntu $ sudo systemctl status airflow-webserver
ubuntu $ sudo systemctl status airflow-scheduler

에어플로우 웹 서버 로그인 어카운트 생성

1
2
3
4
5
# 에어플로우 접속
ubuntu $ sudo su airflow

# 생성
airflow $ AIRFLOW_HOME=/var/lib/airflow airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

최종

  • [hostname]:8080으로 접속하여 확인한다.
  • EC2 보안 그룹에 8080 포트를 추가한다.

image

  • 에어플로우 실행까지 완료했다.

에어플로우 DAG 예제

  • 예제 코드를 작성한다.
  • 단순 ‘Hello World!’ 작업 흐름을 만들 것이다.
  • DAG 정의 파일이 AIRFLOW_HOME/dags에 저장되게 dags_folder를 만들고 이 디렉터리에 hello_world.py 파일을 만든다.
1
2
3
4
5
6
airflow_home
├── airflow.cfg
├── airflow.db
├── dags                <- Your DAGs directory
   └── hello_world.py  <- Your DAG definition file
└── unittests.cfg
  • dag는 간단하게 BashOperater를 이용하여 두가지로 구성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# hello_world.py
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
  
default_args = {
  'start_date': datetime(2021, 1, 1),
}

with DAG(dag_id='test',
         schedule_interval='@daily',
         default_args=default_args,
         tags=['test'],
         catchup=False) as dag:
  
  dag1 = BashOperator(
                task_id="dag1",
                bash_command='python3 /home/ubuntu/dag1.py'
  )
  
  dag2 = BashOperator(
                task_id="dag2",
                bash_command='python3 /home/ubuntu/dag2.py'
  )

  dag1 >> dag2
  • ubuntu 메인 디렉토리에 실행할 파일을 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# main_func.py

from slack_sdk import WebClient
import json

def say_hi():
    return '안녕!'

def say_hi_to(name):
    return f'{name}님 안녕하세요!'

def slack_warning(i):
    # 슬랙 클라이언트 설정
    client = WebClient(token='')
    
    message = [{
                    "type": "section",
                    "text": {
                                "type": "plain_text",
                                "text": i
                            }
                }]
    
    client.chat_postMessage(channel='U032BP8CV4Y', blocks=json.dumps(message))
  • 슬랙 API를 통해 메세지를 전송하는 프로세스를 작성한다.
1
2
3
4
5
6
7
8
9
10
# dag1.py
from main_func import *

slack_warning(say_hi())


# dag2.py
from main_func import *

slack_warning(say_hi_to('6mini'))
  • 모든 작성이 끝나고 조금의 시간이 흐르면 에어플로우 GUI에서 작업을 확인할 수 있다.

image

  • 버튼을 통해 작업을 살행하면 정상적으로 동작하는 것을 확인할 수 있다.

image

참조

0%