[스파크] MLlib이란?

MLlib이란?

image

  • 스파크(Spark)의 컴포넌트(Component) 중 하나이며, Machine Learining Library의 약자이며 머신 러닝(Machine Learning)을 쉽고 확장성 있게 적용하고 파이프 라인(Pipeline) 개발을 쉽게 하기 위해 만들어졌다.
  • 스파크에는 여러가지 컴포넌트가 있는데, 데이터 프레임(Dataframe) API 위에서 동작한다.
  • RDD API는 지원이 끊기는 추세여서 데이터 프레임 API를 이용한다.
  • 데이터 프레임을 쓰는 MLlib API를 스파크 ML로도 부른다.

MLlib으로 가능한 것

  • 피쳐 엔지니어링(Feature Engineering)
  • 통계적 연산
  • 흔히 쓰이는 ML 알고리즘
    • Regression(Linear, Logistic)
    • Support Vector Machines
    • Naive Bayes
    • Decision Tree
    • K-Means clustering
  • 추천(Alternating Least Squares)

MLlib의 컴포넌트

image

  • 파이프 라인을 만들기 위한 여러가지 컴포넌트가 MLlib에 존재한다.
    • 알고리즘
      • Classification
      • Regression
      • Clustering
      • Recommendation
    • 파이프라인
      • Training
      • Evaluating(평가)
      • Tuning
      • Persistence(저장)
    • Feature Engineering
      • Extraction
      • Transformation
    • Utils
      • Linear algebra(행렬 계산)
      • Statistics

트랜스포머(Transformer)

  • 피쳐 변환과 학습된 모델을 추상화한다.
  • 모든 트랜스포머 인스턴스는 transform() 함수를 가졌다.
  • 데이터를 학습 가능한 포맷으로 변환한다.
  • 데이터 프레임을 받아 새로운 데이터 프레임을 만드는데, 보통 하나 이상의 컬럼(column)을 더하게 된다.
    • Data Normalization
    • Tokenization
    • One-Hot Encoding

에스티메이터(Estimatior)

  • 모델의 학습 과정을 추상화한다.
  • 모든 에스티메이터 인스턴스는 fit() 함수를 가졌다.
  • fit()은 데이터 프레임을 받아 모델을 반환한다.
  • 모델은 하나의 트랜스포머이다.
    • lr = LinearRegression()
    • model = lr.fit(data)

이밸류에이터(Evaluator)

  • 매트릭(metric)을 기반으로 모델의 성능을 평가한다.
    • RMSE(Root mean squared error)
  • 모델을 여러개 만들어서 성능을 평가한 후 가장 좋은 모델을 뽑는 방식으로 튜닝을 자동화할 수 있다.
    • BinaryClassificationEvaluator
    • CrossValidator

파이프라인(Pipeline)

image

  • 머신 러닝의 워크 플로우이며, 여러 스테이지를 담고 있고 저장(persist)될 수 있다.

예제

스파크 머신 러닝

  • 간단한 데이터 프레임을 생성하여 머신 러닝을 구현해본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 세션과 인스턴스 생성
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("logistic-regression").getOrCreate()


# 필요한 패키지 불러오기
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression


# 간단한 데이터 프레임 생성
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])


training.show()
'''
+-----+--------------+
|label|      features|
+-----+--------------+
|  1.0| [0.0,1.1,0.1]|
|  0.0|[2.0,1.0,-1.0]|
|  0.0| [2.0,1.3,1.0]|
|  1.0|[0.0,1.2,-0.5]|
+-----+--------------+
'''


# 로지스틱 리그레션 인스턴스 생성
lr = LogisticRegression(maxIter=30, regParam=0.01)


# 모델 생성
model = lr.fit(training)


# 테스트 데이터 생성
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])


# 예측
prediction = model.transform(test)


# 예측 결과
prediction.show()
'''
+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  1.0|[-1.0,1.5,1.3]|[-6.2435550918400...|[0.00193916823498...|       1.0|
|  0.0|[3.0,2.0,-0.1]|[5.45228608726759...|[0.99573180142693...|       0.0|
|  1.0|[0.0,2.2,-1.5]|[-4.4104172202339...|[0.01200425500655...|       1.0|
+-----+--------------+--------------------+--------------------+----------+
'''

파이프 라인

  • 파이프 라인을 구성하는 데에 포커스를 맞춘다.
  • 파이프 라인은 데이터의 여러가지 처리 단계를 거칠 때 유용하게 쓰인다.
  • 대표적인 예가 텍스트 데이터로서, 토크나이징이나 글자의 수를 센다던 지 원핫 인코딩을 하는 등의 여러가지 처리 방법이 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("logistic-regression").getOrCreate()


from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer


# 예제 데이터 프레임 생성
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])


# 글자를 스플릿하기 위한 토크나이저와 해싱 TF 생성
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")


# 모델 생성
lr = LogisticRegression(maxIter=30, regParam=0.001)


# 파이프라인 생성
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


# 학습
model = pipeline.fit(training)


# 테스트 데이터 프레임 생성
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])


# 예측
prediction = model.transform(test)


# 예측 결과
prediction.select(["id", "text", "probability", "prediction"]).show()
'''
+---+------------------+--------------------+----------+
| id|              text|         probability|prediction|
+---+------------------+--------------------+----------+
|  4|       spark i j k|[0.63102699631690...|       0.0|
|  5|             l m n|[0.98489377609773...|       0.0|
|  6|spark hadoop spark|[0.13563147748816...|       1.0|
|  7|     apache hadoop|[0.99563405823116...|       0.0|
+---+------------------+--------------------+----------+
'''
0%