[에어플로우] NFT 파이프라인 미니 프로젝트

NFT 파이프라인

  • 에어플로우를 익히기 위한 미니 프로젝트를 진행할 것이다.

DAG

image

  • DAG는 위 이미지로 요약할 수있다.
  • 비순환되는 방향성과 의존성이 있는 테스크이다.
  • 앞으로 파이프라인을 DAG 형식으로 만들게 된다.

미니 프로젝트 소개

  • OpenSea 사이트의 NFT 데이터를 추출하여 데이터 테이블에 저장할 것이다.
    • NFT: 블록체인 상에서 소유권을 증명할 수 있게 해주는 프로덕트이다.
    • OpenSea: 소유권을 사고 팔 수 있는 이베이같은 사이트이다.

image

  • OpenSea에서 인기 있는 프로젝트인 Doodles의 작품에 대한 정보를 가공할 것이다.
  • 제공되는 API를 통해 테이블에 저장하는 파이프라인을 만들 것이다.

DAG Skeleton

1
2
$ airflow webserver
$ airflow scheduler
  • 에어플로우의 웹 서버를 열고 스케쥴러를 실행한다.

image

  • user 디렉토리에 airflow라는 폴더가 생겼을 텐데, 만든 DAG를 넣어 줄 폴더를 하나더 만들어준다.
  • 만든 폴더를 vsc를 통해 핸들링한다.

image

1
2
3
4
5
6
7
8
9
10
11
12
13
from datetime import datetime
from airflow import DAG

default_args = {
    'start_date': datetime(2021, 1, 1), # DAG를 시작하는 기준
}

with DAG(dag_id='nft-pipeline', # 이름을 지어준다.
         schedule_interval='@daily', # 매일 돌아가도록 지정
         default_args=default_args, 
         tags=['nft'],
         catchup=False) as dag:
    pass
  • 새로운 파이썬 파일을 만들어 스켈레톤을 만든다.
  • 저장 뒤 조금만 기다리면 UI 상에 표시된다.

image

오퍼레이터(Operators)

  • 데이터 파이프라인을 오퍼레이터(Operators)를 이용하여 코딩을 할 것이다.

에어플로우 내장 오퍼레이터

  • 배시 오퍼레이터(BashOperator)
  • 파이썬 오퍼레이터(PythonOperator)
  • 이메일 오퍼레이터(EmailOperator)

외부 오퍼레이터로

  • 액션 오퍼레이터(Action Operator)는 액션을 실행한다.
  • 트랜스퍼 오퍼레이터(Transfer Operator)는 데이터를 옮길 때 사용한다.
  • 센서(Sensors)는 조건이 맞을 때 까지 기다렸다가 충족됐을 때 실행한다.

외부 프로바이더(provider)

  • 외부에 존재하는 프로젝트와 에어플로우를 연결하는 브릿지(bridge)이다.

image

테이블 생성

  • SQLite를 이용하여 간단히 만들어본다.

image

  • 새로운 커넥션(connection)을 생성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from datetime import datetime
from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator

default_args = {
    'start_date': datetime(2021, 1, 1),
}

with DAG(dag_id='nft-pipeline',
         schedule_interval='@daily',
         default_args=default_args, 
         tags=['nft'],
         catchup=False) as dag:
    
    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite', # 위에서 만든 id
        sql='''
        CREATE TABLE IF NOT EXISTS nfts (
            token_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            image_url TEXT NOT NULL
        )
        '''
    ) # 중복 방지
  • 위에서 만든 스켈레톤 위에 테스크를 추가한다.

image

  • 저장하면 UI 상에도 반영되어 있는 것을 볼 수 있다.
1
$  airflow tasks test nft-pipeline creating_table 2021-01-01
  • 위와 같은 명령어 폼으로 테스트를 진행할 수 있다.

image

  • sqlite3 상에서도 nfts 테이블이 생긴 것을 볼 수 있다.

Sensor

  • 센서 오퍼레이션을 사용하여 외부 API가 존재하는지 확인해볼 것이다.
  • 새로운 커넥션을 만들어준다.

image

  • 새로운 테스크를 추가한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from datetime import datetime
from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor

default_args = {
    'start_date': datetime(2021, 1, 1),
}

with DAG(dag_id='nft-pipeline',
         schedule_interval='@daily',
         default_args=default_args, 
         tags=['nft'],
         catchup=False) as dag:
    
    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
        CREATE TABLE IF NOT EXISTS nfts (
            token_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            image_url TEXT NOT NULL
        )
        '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1'
    )
  • 아래의 커맨드로 테스트해본다.
1
2
3
$ airflow tasks test nft-pipeline is_api_available 2021-01-01

[2021-12-19 16:15:27,972] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=nft-pipeline, task_id=is_api_available, execution_date=20210101T000000, start_date=20211219T071527, end_date=20211219T071527
  • 성공했음을 확인할 수 있다.

HttpOperator

  • Http에서 데이터를 가져와 추출하는 구현을 할 것이다.
  • SimpleHttpOperator라는 것을 사용할 것이다.
  • 새로운 테스크를 만들어 준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from datetime import datetime
import json

from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator

default_args = {
    'start_date': datetime(2021, 1, 1),
}

with DAG(dag_id='nft-pipeline',
         schedule_interval='@daily',
         default_args=default_args, 
         tags=['nft'],
         catchup=False) as dag:
    
    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
        CREATE TABLE IF NOT EXISTS nfts (
            token_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            image_url TEXT NOT NULL
        )
        '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1'
    )

    extract_nft = SimpleHttpOperator(
        task_id='extract_nft',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1',
        method='GET',
        response_filter=lambda res: json.loads(res.text),
        log_response=True
    )
  • 아래의 커맨드로 테스트해본다.
1
2
3
4
5
6
7
$ airflow tasks test nft-pipeline extract_nft 2021-01-01

[2021-12-19 16:31:05,774] {http.py:115} INFO - {"assets":[{"id":74417323,"token_id":"9999","num_sales":1,"background_color":null,"image_url":"https://lh3.googleusercontent.com/CrSXeD3t60EdSZqBPSdzU82aA9zd5n5W5ap0Feg1efE7dB4NHjFU2sHTLAhem22Hezt9PSIPWFQUGoG_TJBzccwPGpzwyXoGbOHJtQ","image_preview_url":"https://lh3.googleusercontent.com/
.
.
."transaction_hash":"0x8730bed59d2310b4aef2587000e804cecb6aaa4e8c8ad3d139fe815352c89f7e","transaction_index":"303"},"created_date":"2021-10-20T00:00:45.597187","quantity":"1"},"top_bid":null,"listing_date":null,"is_presale":false,"transfer_fee_payment_token":null,"transfer_fee":null}]}
[2021-12-19 16:31:05,782] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=nft-pipeline, task_id=extract_nft, execution_date=20210101T000000, start_date=20211219T073104, end_date=20211219T073105
  • 성공적으로 테스크가 실행이 되며, API를 이용하여 추출한 데이터가 전시된다.

image

  • UI 상에도 만들었던 테스크들이 전시되는 것을 확인할 수 있다.

PythonOperator

  • OpenSea API로 가져온 데이터를 가공할 것이다.
  • 가공하기 위해 PythonOperator를 사용할 것인데, 에어플로우의 내장 오퍼레이터 중 하나이다.
  • 새로운 테스크와 함수를 생성한다.
  • xcom_pull을 통해 테스크를 가져올 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from datetime import datetime
import json

from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from pandas import json_normalize

default_args = {
    'start_date': datetime(2021, 1, 1),
}

def _processing_nft(ti): # 테스크 인스턴스를 넘겨준다.
    assets = ti.xcom_pull(task_ids=['extract_nft'])  # 테스크를 가져온다.
    if not len(assets):
        raise ValueError("assets is empty")
    nft = assets[0]['assets'][0]

    processed_nft = json_normalize({ # 제이슨을 판다스로 변환
        'token_id': nft['token_id'],
        'name': nft['name'],
        'image_url': nft['image_url'],
    })
    processed_nft.to_csv('/tmp/processed_nft.csv', index=None, header=False) 

with DAG(dag_id='nft-pipeline',
         schedule_interval='@daily',
         default_args=default_args, 
         tags=['nft'],
         catchup=False) as dag:
    
    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
        CREATE TABLE IF NOT EXISTS nfts (
            token_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            image_url TEXT NOT NULL
        )
        '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1'
    )

    extract_nft = SimpleHttpOperator(
        task_id='extract_nft',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1',
        method='GET',
        response_filter=lambda res: json.loads(res.text),
        log_response=True
    )

    process_nft = PythonOperator( # 파이썬 함수를 사용할 수 있다.
        task_id='process_nft',
        python_callable=_processing_nft
    )
  • 아래 커맨드로 테스트해본다.
1
2
3
$ airflow tasks test nft-pipeline process_nft 2021-01-01

[2021-12-19 17:07:05,646] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=nft-pipeline, task_id=process_nft, execution_date=20210101T000000, start_date=20211219T080705, end_date=20211219T080705
  • 성공하였고, 생성한 CSV 파일도 확인해본다.
1
2
3
$ cat /tmp/processed_nft.csvexemple.py

9999,Doodle #9999,https://lh3.googleusercontent.com/CrSXeD3t60EdSZqBPSdzU82aA9zd5n5W5ap0Feg1efE7dB4NHjFU2sHTLAhem22Hezt9PSIPWFQUGoG_TJBzccwPGpzwyXoGbOHJtQ
  • CSV 파일도 잘 만들어진 것을 확인할 수 있다.

BashOperator

  • 가공한 nft의 데이터를 저장해볼 것이다.
1
2
3
4
5
6
from airflow.operators.bash import BashOperator

    store_nft = BashOperator(
        task_id='store_nft',
        bash_command='echo -e ".separator ","\n.import /tmp/processed_nft.csv nfts" | sqlite3 /Users/6mini/airflow/airflow.db'
    )
  • 위와 같이 추가한 후 테스트를 해본다.
1
2
3
$ airflow tasks test nft-pipeline store_nft 2021-01-01

[2021-12-19 17:20:25,769] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=nft-pipeline, task_id=store_nft, execution_date=20210101T000000, start_date=20211219T082025, end_date=20211219T082025
  • sqlite3에서도 확인해본다.
1
2
3
$ select * from nfts;

9999|Doodle #9999|https://lh3.googleusercontent.com/CrSXeD3t60EdSZqBPSdzU82aA9zd5n5W5ap0Feg1efE7dB4NHjFU2sHTLAhem22Hezt9PSIPWFQUGoG_TJBzccwPGpzwyXoGbOHJtQ
  • 정상적으로 전시된다.

의존성 생성

  • 지금까지 파이프라인 내 여러가지의 테스크를 생성했다.
  • xcom_pull로 테스크 간 커뮤니케이션도 만들었다.

image

  • 하지만, 그래프 뷰를 보면 의존성이 없음을 볼 수 있다.
  • 기본적인 일자형 의존성을 만들어 본다.
1
creating_table >> is_api_available >> extract_nft >> process_nft >> store_nft
  • 위와 같이 간단한 코드로 구현할 수 있다.

image

  • UI 상에서도 반영된 것을 확인할 수 있다.

Backfill

  • 데이터 파이프라인을 운영하며 맞닥드리게 되는 백필 문제에 대해 알아 볼 것이다.
  • 백필은 어떤 파이프라인이 망가졌을 때 망가지기 전 시점으로 돌아가서 처음부터 돌리는 것을 의미한다.

image

  • 매일 주기적으로 돌아가는 파이프라인을 멈췄다가 몇 일 뒤 실행시키는 상황을 가정해본다.
1
2
3
4
5
with DAG(dag_id='nft-pipeline',
         schedule_interval='@daily',
         default_args=default_args, 
         tags=['nft'],
         catchup=True) as dag:
  • 위에서 작성했던 catchup이라는 필드로 컨트롤 할 수 있다.
0%