[DE Project: Real-Time Big Data Processing 'SIXAT'] 5. Airflow

Installing Airflow

$ pip install apache-airflow
  • Airflow can be installed simply with the command above.
$ pip install apache-airflow-providers-apache-spark
  • Install the Spark provider for Airflow with the command above so that Spark jobs can be handled from Airflow.
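  • Before the pipeline can run, Airflow's metadata database has to be initialized and an admin account created. A minimal sketch, assuming Airflow 2.x with the default SQLite backend (the user details below are placeholders):

$ airflow db init
$ airflow users create --username admin --password admin \
    --firstname six --lastname mini --role Admin --email admin@example.com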

Writing the Tasks

  • Before composing the DAG, write the tasks that will make up the flow.
  • The machine learning steps from the previous posts will be recreated as simple Python files.

Preprocessing

# preprocess.py
from pyspark.sql import SparkSession

MAX_MEMORY="5g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()

trips_df = spark.read.csv("/Users/6mini/trip/", inferSchema=True, header=True) # create the DataFrame

trips_df.createOrReplaceTempView("trips")
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query) # preprocess: filter and select features via SQL

train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1) # split the data
data_dir = "/Users/6mini/spark/taxi"
train_df.write.format("parquet").mode('overwrite').save(f"{data_dir}/train/")
test_df.write.format("parquet").mode('overwrite').save(f"{data_dir}/test/") # use overwrite mode, since the pipeline will run repeatedly
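
  • As an optional sanity check (not part of the pipeline itself), the saved splits can be read back to confirm the write succeeded. A short sketch using the same session and data_dir as above:

# Optional: read the saved splits back and compare row counts.
saved_train = spark.read.parquet(f"{data_dir}/train/")
saved_test = spark.read.parquet(f"{data_dir}/test/")
print(saved_train.count(), saved_test.count())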

Hyperparameter Tuning

# tune_hyperparameter.py
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np
import pandas as pd

MAX_MEMORY="5g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()

data_dir = "/Users/6mini/spark/taxi"

train_df = spark.read.parquet(f"{data_dir}/train/")

toy_df = train_df.sample(False, 0.1, seed=1)

cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

lr = LinearRegression(
    maxIter=30,
    solver="normal",
    labelCol='total_amount',
    featuresCol='feature_vector'
)

cv_stages = stages + [lr]

cv_pipeline = Pipeline(stages=cv_stages)
param_grid = ParamGridBuilder()\
                .addGrid(lr.elasticNetParam, [0.3, 0.5])\
                .addGrid(lr.regParam, [0.03, 0.05])\
                .build()

cross_val = CrossValidator(estimator=cv_pipeline,
                           estimatorParamMaps=param_grid,
                           evaluator=RegressionEvaluator(labelCol="total_amount"),
                           numFolds=5)

cv_model = cross_val.fit(toy_df)
alpha = cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam()
reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam()

hyperparam = {
    'alpha': [alpha],
    'reg_param': [reg_param]
}
hyper_df = pd.DataFrame(hyperparam)
hyper_df.to_csv(f"{data_dir}/hyperparameter.csv")
print(hyper_df)
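
  • Reaching into _java_obj works, but it relies on a private attribute. A sketch of the same extraction through public getters, assuming PySpark 3.x:

# Assuming PySpark 3.x, the fitted LinearRegressionModel exposes public getters.
best_lr = cv_model.bestModel.stages[-1]
alpha = best_lr.getElasticNetParam()
reg_param = best_lr.getRegParam()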

Model Training

# train_model.py
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

import pandas as pd



MAX_MEMORY="5g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()

data_dir = "/Users/6mini/spark/taxi"
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")


hyper_df = pd.read_csv(f"{data_dir}/hyperparameter.csv")
alpha = float(hyper_df.iloc[0]['alpha'])
reg_param = float(hyper_df.iloc[0]['reg_param'])


cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]


# Training
transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

vtrain_df = fitted_transformer.transform(train_df)
lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector",
    elasticNetParam=alpha,
    regParam=reg_param,
)

model = lr.fit(vtrain_df)
vtest_df = fitted_transformer.transform(test_df)
predictions = model.transform(vtest_df)
predictions.cache()
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

model_dir = "/Users/6mini/spark/taxi/airflow"
model.write().overwrite().save(model_dir)
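
  • The script imports RegressionEvaluator but never uses it; a short sketch of scoring the predictions and reloading the persisted model (same names and paths as above):

# Score the held-out predictions with RMSE and R-squared.
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction")
print("RMSE:", evaluator.setMetricName("rmse").evaluate(predictions))
print("R2:", evaluator.setMetricName("r2").evaluate(predictions))

# The saved model can later be restored with the matching model class.
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load(model_dir)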

Writing the DAG

  • Now that the tasks are written, create a DAG file to build the pipeline.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
  'start_date': datetime(2021, 1, 1),
}

with DAG(dag_id='taxi-price-pipeline',
         schedule_interval='@daily',
         default_args=default_args,
         tags=['spark'],
         catchup=False) as dag:
  
  preprocess = SparkSubmitOperator(
      application="/Users/keon/fastcampus/data-engineering/02-airflow/preprocess.py", task_id="preprocess", conn_id="spark_local"
  )

  tune_hyperparameter = SparkSubmitOperator(
      application="/Users/keon/fastcampus/data-engineering/02-airflow/tune_hyperparameter.py", task_id="tune_hyperparameter", conn_id="spark_local"
  )

  train_model = SparkSubmitOperator(
      application="/Users/keon/fastcampus/data-engineering/02-airflow/train_model.py", task_id="train_model", conn_id="spark_local"
  )

  preprocess >> tune_hyperparameter >> train_model
  • Once the dependencies are configured, verify the pipeline through the UI; see the note below on where the DAG file has to live.
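  • Note that the scheduler only picks up DAG files inside the configured dags_folder (by default ~/airflow/dags in Airflow 2.x; this path is an assumption, check airflow.cfg). Whether the file was parsed can be verified from the CLI:

$ airflow dags list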

Testing

$ airflow webserver
$ airflow scheduler
  • Open the Airflow web server and run the scheduler.
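  • A single task can also be run in isolation, without the scheduler, which is handy while debugging (the date argument is just an example execution date):

$ airflow tasks test taxi-price-pipeline preprocess 2021-01-01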

Troubleshooting

Cannot execute: spark-submit --master yarn --name arrow-spark

Cannot execute: spark-submit --master yarn --name arrow-spark
  • The error above was displayed, so the Spark connection was reconfigured.

[Image: editing the Spark connection in the Airflow UI]

  • After modifying the connection as in the image above, it worked correctly.
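  • The spark_local connection used by the DAG can also be created from the CLI instead of the UI; a sketch, assuming the provider's spark connection type and a local master (adjust the host to your environment):

$ airflow connections add spark_local --conn-type spark --conn-host 'local[*]'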

Task received SIGTERM signal

Task received SIGTERM signal
  • The error above was displayed; looking into it, it occurs when a single task runs for too long.
  • Hyperparameter tuning was indeed the long-running step; since the goal here is building the workflow, the issue was resolved by reducing the number of parameter candidates (a sketch follows below).
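  • For example, the grid in tune_hyperparameter.py can be shrunk to a single candidate per parameter so the CrossValidator finishes quickly (the values below are placeholders, not necessarily the ones actually used):

param_grid = ParamGridBuilder()\
                .addGrid(lr.elasticNetParam, [0.3])\
                .addGrid(lr.regParam, [0.03])\
                .build()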

[Image: Airflow graph view with all tasks completed successfully]

  • After all the troubleshooting, the image above confirms that every task runs successfully.