NFT 파이프라인
- 에어플로우를 익히기 위한 미니 프로젝트를 진행할 것이다.
DAG
- DAG는 위 이미지로 요약할 수있다.
- 비순환되는 방향성과 의존성이 있는 테스크이다.
- 앞으로 파이프라인을 DAG 형식으로 만들게 된다.
미니 프로젝트 소개
- OpenSea 사이트의 NFT 데이터를 추출하여 데이터 테이블에 저장할 것이다.
- NFT: 블록체인 상에서 소유권을 증명할 수 있게 해주는 프로덕트이다.
- OpenSea: 소유권을 사고 팔 수 있는 이베이같은 사이트이다.
DAG Skeleton
1 |
|
- 에어플로우의 웹 서버를 열고 스케쥴러를 실행한다.
- user 디렉토리에 airflow라는 폴더가 생겼을 텐데, 만든 DAG를 넣어 줄 폴더를 하나더 만들어준다.
- 만든 폴더를 vsc를 통해 핸들링한다.
1 |
|
- 새로운 파이썬 파일을 만들어 스켈레톤을 만든다.
- 저장 뒤 조금만 기다리면 UI 상에 표시된다.
오퍼레이터(Operators)
- 데이터 파이프라인을 오퍼레이터(Operators)를 이용하여 코딩을 할 것이다.
에어플로우 내장 오퍼레이터
- 배시 오퍼레이터(BashOperator)
- 파이썬 오퍼레이터(PythonOperator)
- 이메일 오퍼레이터(EmailOperator)
외부 오퍼레이터로
- 액션 오퍼레이터(Action Operator)는 액션을 실행한다.
- 트랜스퍼 오퍼레이터(Transfer Operator)는 데이터를 옮길 때 사용한다.
- 센서(Sensors)는 조건이 맞을 때 까지 기다렸다가 충족됐을 때 실행한다.
외부 프로바이더(provider)
- 외부에 존재하는 프로젝트와 에어플로우를 연결하는 브릿지(bridge)이다.
테이블 생성
- SQLite를 이용하여 간단히 만들어본다.
- 새로운 커넥션(connection)을 생성한다.
1 |
|
- 위에서 만든 스켈레톤 위에 테스크를 추가한다.
- 저장하면 UI 상에도 반영되어 있는 것을 볼 수 있다.
1 |
|
- 위와 같은 명령어 폼으로 테스트를 진행할 수 있다.
- sqlite3 상에서도 nfts 테이블이 생긴 것을 볼 수 있다.
Sensor
- 센서 오퍼레이션을 사용하여 외부 API가 존재하는지 확인해볼 것이다.
- 새로운 커넥션을 만들어준다.
- 새로운 테스크를 추가한다.
1 |
|
- 아래의 커맨드로 테스트해본다.
1 |
|
- 성공했음을 확인할 수 있다.
HttpOperator
- Http에서 데이터를 가져와 추출하는 구현을 할 것이다.
SimpleHttpOperator
라는 것을 사용할 것이다.- 새로운 테스크를 만들어 준다.
1 |
|
- 아래의 커맨드로 테스트해본다.
1 |
|
- 성공적으로 테스크가 실행이 되며, API를 이용하여 추출한 데이터가 전시된다.
- UI 상에도 만들었던 테스크들이 전시되는 것을 확인할 수 있다.
PythonOperator
- OpenSea API로 가져온 데이터를 가공할 것이다.
- 가공하기 위해
PythonOperator
를 사용할 것인데, 에어플로우의 내장 오퍼레이터 중 하나이다. - 새로운 테스크와 함수를 생성한다.
xcom_pull
을 통해 테스크를 가져올 수 있다.
1 |
|
- 아래 커맨드로 테스트해본다.
1 |
|
- 성공하였고, 생성한 CSV 파일도 확인해본다.
1 |
|
- CSV 파일도 잘 만들어진 것을 확인할 수 있다.
BashOperator
- 가공한 nft의 데이터를 저장해볼 것이다.
1 |
|
- 위와 같이 추가한 후 테스트를 해본다.
1 |
|
- sqlite3에서도 확인해본다.
1 |
|
- 정상적으로 전시된다.
의존성 생성
- 지금까지 파이프라인 내 여러가지의 테스크를 생성했다.
xcom_pull
로 테스크 간 커뮤니케이션도 만들었다.
- 하지만, 그래프 뷰를 보면 의존성이 없음을 볼 수 있다.
- 기본적인 일자형 의존성을 만들어 본다.
1 |
|
- 위와 같이 간단한 코드로 구현할 수 있다.
- UI 상에서도 반영된 것을 확인할 수 있다.
Backfill
- 데이터 파이프라인을 운영하며 맞닥드리게 되는 백필 문제에 대해 알아 볼 것이다.
- 백필은 어떤 파이프라인이 망가졌을 때 망가지기 전 시점으로 돌아가서 처음부터 돌리는 것을 의미한다.
- 매일 주기적으로 돌아가는 파이프라인을 멈췄다가 몇 일 뒤 실행시키는 상황을 가정해본다.
1 |
|
- 위에서 작성했던
catchup
이라는 필드로 컨트롤 할 수 있다.