[DE 프로젝트: 실시간 빅데이터 처리 'SIXAT'] 2. 스파크(Spark) 환경 구축

기초 환경설정

필요한 환경 & 패키지

  • 파이썬
  • 주피터 노트북
  • 자바
  • 스파크
  • Pyspark

아나콘다

  • 위 패키지를 한번에 다운로드하기 위해 아나콘다를 설치한다.

image

  • 필자는 이미 아나콘다가 설치되어있기 때문에 터미널로 업데이트만 진행했다.
1
$ conda update anaconda
  • conda의 base에 진입하여 python의 위치를 확인한다.
1
$ which python # /Users/6mini/opt/anaconda3/bin/python
  • 위 처럼 anaconda3 내의 python경로가 나오면 된다.
  • 자바 설치를 확인한다.
1
2
3
4
5
6
1
2
3
4
5
6
$ java -version
'''
java version "16.0.2" 2021-07-20
Java(TM) SE Runtime Environment (build 16.0.2+7-67)
Java HotSpot(TM) 64-Bit Server VM (build 16.0.2+7-67, mixed mode, sharing)
'''
  • brew로 adoptopenjdk8, scala, apache-spark를 설치한다.
1
2
3
4
5
$ brew install --cask homebrew/cask-versions/adoptopenjdk8

$ brew install scala

$ brew install apache-spark
  • pip로 pyspark를 설치한다.
1
$ pip install pyspark

모빌리티 데이터 수집

뉴욕 TLC 트립 기록 데이터

image

  • 샘플 데이터로 20년 3월의 ‘High Volume For-Hire Vehicle Trip Records (CSV)’을 클릭하여 다운로드한다.
    • Yellow Taxi Trip Records: 뉴욕의 대표적인 택시
    • Green Taxi Trip Records: 브루클린에서 활동하는 택시
    • For-Hire Vehicle Trip Records: 정규 택시 서비스가 아닌 모빌리티로 운영하는 서비스
    • High Volume For-Hire Vehicle Trip Records: 우버나 리프트같이 큰 단위의 회사를 모아놓은 모빌리티 서비스

데이터셋 확인

1
2
3
4
import pandas as pd

df = pd.read_csv('fhvhv_tripdata_2020-03.csv')
df.head(5)

image

컬럼 정보

image

  • hvfhs_license_num: 회사 면허 번호
  • dispatching_base_num: 지역 라이센스 번호
  • pickup_datetime: 승차 시간
  • dropoff_datetime: 하차 시간
  • PULocationID: 승차 지역 아이디
  • DOLocationID: 하차 지역 아이디
  • SR_Flag: 합승 여부

우버 트립 수

  • 모든 환경 설정이 끝났으니 ‘20년 3월 데이터를 이용해 운행 수를 세보고 패턴과 인사이트를 스파크를 이용해 분석해볼 것이다.

파이썬 소스 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# count-trips.py
from pyspark import SparkConf, SparkContext
import pandas as pd

# Spark 설정
# SparkConf를 이용해 Master/Local 설정을 하고, 앱 이름을 정해준다.
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
# conf라는 객체를 SparkContext를 초기화 하는 데 사용한다.
sc = SparkContext(conf=conf)


# 데이터 파싱
lines = sc.textFile("fhvhv_tripdata_2020-03.csv")
header = lines.first()
filtered_lines = lines.filter(lambda row:row != header) 

# 필요한 부분만 골라내서 세는 부분
# countByValue로 같은 날짜가 등장하는 부분을 센다.
dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])
result = dates.countByValue()

# Spark코드가 아닌 일반적인 파이썬 코드
# CSV로 결과값 저장 
pd.Series(result, name="trips").to_csv("trips-date.csv")
  • 스파크 코드를 실행할 때, 일반적인 파이썬 프로그램을 실행하는 것과는 다르다.
1
1
$ spark-submit count-trips.py

트러블 슈팅

  • 위 명령어를 터미널에 입력하면 되는데, 두 가지 에러와 조우했다.

Spark Service ‘sparkDriver’ failed after 16 retries (on a random free port)!

  • 다음과 같은 에러가 발생했다.
1
Spark Service ‘sparkDriver’ failed after 16 retries (on a random free port)!
  • hosts 파일에 hostname을 터미널 상에서 실행했을 때 나오는 이름으로 추가하였다.
1
$ hostname
  • 아래 코드를 통해 hosts 파일을 열고,
1
$ sudo vi /etc/hosts
  • 위에서 출력된 이름을 다음과 같이 추가해준다.
  • 아이피와의 구분은 탭으로 한다.
1
2
127.0.0.1    HostName.local
127.0.0.1    localhost
참조

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext

  • 위 문제는 해결하였지만, 또 하나의 에러에 봉착했다.
1
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext
  • doc에, 아래와 같은 문구가 있다.

Spark runs on Java 8/11, Scala 2.12, Python 3.6+ and R 3.5+. Python 3.6 support is deprecated as of Spark 3.2.0. Java 8 prior to version 8u201 support is deprecated as of Spark 3.2.0. For the Scala API, Spark 3.2.0 uses Scala 2.12. You will need to use a compatible Scala version (2.12.x).

  • JAVA(jdk)의 버전이 8 또는 11이어야 하는데 필자의 버전을 확인 해보니 16이었다.
  • 위에서 다운받은 jdk8로 변환하지 않아 생기는 간단한 문제였다.
1
2
3
4
5
6
1
2
3
4
5
6
$ java -version
'''
java version "16.0.2" 2021-07-20
Java(TM) SE Runtime Environment (build 16.0.2+7-67)
Java HotSpot(TM) 64-Bit Server VM (build 16.0.2+7-67, mixed mode, sharing)
'''
  • 설치되어있는 자바의 목록을 확인하고,
1
2
3
4
5
6
7
8
9
$ /usr/libexec/java_home -V
'''
16.0.2 (x86_64) "Oracle Corporation" - "Java SE 16.0.2" /Library/Java/JavaVirtualMachines/jdk-16.0.2.jdk/Contents/Home
12 (x86_64) "Oracle Corporation" - "Java SE 12" /Library/Java/JavaVirtualMachines/jdk-12.jdk/Contents/Home
12 (x86_64) "Oracle Corporation" - "OpenJDK 12" /Library/Java/JavaVirtualMachines/openJDK/Contents/Home
1.8.281.09 (x86_64) "Oracle Corporation" - "Java" /Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home
1.8.0_292 (x86_64) "AdoptOpenJDK" - "AdoptOpenJDK 8" /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
1.8.0_281 (x86_64) "Oracle Corporation" - "Java SE 8" /Library/Java/JavaVirtualMachines/jdk1.8.0_281.jdk/Contents/Home
'''
  • 버전을 변경해준다.
1
$ export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
  • 다시 자바의 버전을 확인해보면 성공적으로 변경되어있다.
1
2
3
4
5
6
$ java -version
'''
java version "1.8.0_281"
Java(TM) SE Runtime Environment (build 1.8.0_281-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.281-b09, mixed mode)
'''
참조

  • 다시 돌아와서 명령어를 실행한다.
1
1
$ spark-submit count-trips.py
  • spark-submit은 클러스터 매니저에게 작업을 제출하는 코드이다.
  • 명령어를 입력하는 순간, 전체 파일이 드라이브 프로그램에서 클러스터 매니저에게 전달이 되고 연산을 진행한다.
  • 연산을 하는 도중 localhost:4040/jabs로 접속하게 되면, 아래 그림과 같이 현재 실행하는 스파크 잡이 전시되어 있는 것을 확인할 수 있다.

image

  • 연산이 마치면 아래와 같이 trips-data.csv가 생성된 것을 볼 수 있다.

image

  • 날짜와 트립 수를 결과값으로 볼 수 있다.

시각화

  • 위에서 만든 csv 파일을 이용하여 간단히 시각화 해본다.
1
2
3
4
5
6
import pandas as pd
import matplotlib.pyplot as plt

trips = pd.read_csv("trips-date.csv")
trips.plot()
plt.show()

image

  • 3월 5일 경 부터 급격히 트립 수가 하락하는 것을 볼 수 있는데, 이는 코로나로 인한 뉴욕의 락 다운 때문이라는 것을 확인할 수 있다.
  • 지금까지 간단한 사용을 다뤘고, 다음 포스팅 부터 제대로 스파크를 다룰 것이다.
0%