- 이번 시간에는 Google Colab상에서 Spark를 실행하는 방법을 알아보려고 한다.
교재구매
- 블로그 내용은 해당 교재의 내용을 기반으로 한다.
- 사용될 데이터셋은 해당 교재의 깃허브 저장소에서 다운 받는다. 이번 시간에는 chapter 02장을 살펴볼 것이다.
- https://github.com/databricks/LearningSparkV2
- https://www.yes24.com/Product/Goods/21667835
구글 드라이브 연동
- 먼저 구글 드라이브를 연동해준다.
from google.colab import drive
drive.mount('/content/drive')
JAVA 설치
- JVM의 실행을 위해서 JAVA를 설치해준다.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
Spark 설치
- https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz를 웹에서 입력하여 파일을 다운받은 후, 본인의 구글 드라이브에 업로드 해준다.
- 업로드 한 폴더 경로를 복사하여 해당 경로로 이동해준다.
%cd [복사한 경로]
- tar 압축파일을 복사하여 /content로 옮겨준다.
!cp -r spark-3.1.1-bin-hadoop2.7.tgz /content
- content 폴더 아래로 이동한 후 옮겨준 압축파일의 내용을 추출합니다.
%cd /content/
!tar xf spark-3.1.1-bin-hadoop2.7.tgz > /dev/null
환경변수 설정
- 일반적으로는 vi 편집기를 열어 환경변수를 설정하지만 구글코랩에서는 os라이브러리를 사용하여 환경변수를 설정해줍니다.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"
- Spark를 설치합니다.
!pip install -q pyspark==3.1.1
Ngrok 설정
- 일반적인 방식은 아니지만 Ngrok에서 Token을 받아 구글 코랩상에서 Spark UI를 보여줄 수 있다.
- 사이트로 이동해 회원가입 후 Your Authtoken에서 토큰값을 카피해온 뒤 [Yourtokenkey]에 집어넣고 코드를 실행시켜준다.
- https://ngrok.com/
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
!pip install pyngrok
!./ngrok authtoken [Yourtokenkey]
Spark 실행
- Colab에서 Spark를 활용해보자.
SparkSession 객체 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()
text파일 가져오기
strings = spark.read.text('/content/spark-3.1.1-bin-hadoop2.7/README.md')
print(strings)
print(type(strings))
- .filter로 text 파일에 'Spark'라는 글자가 얼마나 들어있는지 확인 가능하다.
filtered = strings.filter(strings.value.contains('Spark'))
print(filtered)
print(type(filtered))
print(filtered.count())
csv파일 가져오기
- github에서 다운받은 mnm_dataset 파일을 구글드라이브에 업로드 한 후 파일 경로를 가져온다.
mnm_file = '[파일 경로]'
- .read.format으로 csv파일을 mnm_df에 할당한 후 .show()로 df을 확인한다.
mnm_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(mnm_file)
mnm_df.show(n=5, truncate=False) # pandas의 head()와 유사
- .select로 df에서 원하는 컬럼만 선택하여 살펴본다.
count_mnm_df = mnm_df.select("State", "Color", "Count").groupBy("State", "Color").sum("Count").orderBy("sum(Count)", ascending=False)
count_mnm_df.show(n=30, truncate=False)
Spark UI 확인
- 아래 코드 실행 시 url이 출력되는데 이를 클릭하면 Spark UI를 확인 가능하다.
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels
- 아래와 같이 UI를 통해 작업한 내용들을 확인할 수 있다.
Spark 종료
- 아래 코드로 Spark Session을 종료한다.
spark.stop()