데이터 분석가
article thumbnail
Published 2023. 11. 8. 19:02
Spark - 정형화 API Spark
  • 이번 시간에는 Spark의 정형화 API에 대해 다루어보고 그 과정에서 여러가지 문법들도 알아보려고 한다.

교재구매

 

러닝 스파크 - 예스24

창시자와 커미터가 직접 저술한 스파크 입문서로 스파크를 만든 사람들이 쓴 이 책은 데이터 과학자들이나 엔지니어들이 곧바로 스파크를 쓸 수 있게 해 줄 것이다. 이 책을 통해 병렬 작업들을

www.yes24.com

사용환경

  • Google Colab에서 Spark를 사용할 것이고, 이에 대한 내용은 이전 블로그 글을 참고하기 바란다.
  • https://ls-alt.tistory.com/45

데이터프레임 API

  • 포맷과 몇몇 연산이 Pandas의 영향을 받음.
  • 아래 코드를 통해 칼럼과 스키마를 가진 분산 인메모리 테이블을 Spark상에서 만들 수 있으며, df.show()를 통해 보면 하나의 표처럼 보인다.
from pyspark.sql.functions import avg
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()

# 데이터 프레임 생성
data_df = spark.createDataFrame([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)], ['name', 'age'])
print(data_df)
print(type(data_df))

data_df.show()

  • 아래 코드로 Spark dataframe api에서의 그룹화와 집계함수를 활용할 수 있다.
# 이름으로 그룹화 하여 평균 나이 계산
avg_df = data_df.groupby('name').agg(avg('age'))
avg_df.show()

스키마 정의

  • 스키마는 칼럼이름과 연관된 데이터 타입을 정의한 것을 말한다. 데이터가 방대한 파일을 읽어와야 한다면 미리 스키마를 지정해두고 파일을 읽어오는 것이 다음의 이유때문에 장점이 될 수 있다.
    • Spark가 스키마를 추정하기 위한 비용과 시간을 줄여준다.
    • 데이터가 스키마와 맞지 않는 경우 조기에 문제를 발견가능하다.
  • 아래에서 스키마를 정의하는 방법을 두가지로 나누어 알아보고자 한다.

프로그래밍 스타일

  • Spark 데이터 프레임 API를 활용하는 방식
from pyspark.sql import SparkSession
from pyspark.sql.types import *  # * : types 클래스 내부의 메서드를 모두 호출하겠다!
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()

schema = StructType([
   StructField("Id", IntegerType(), False), # SQL에서 테이블 생성
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)])

# create our data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()

  • 아래 코드로 지정한 스키마를 확인해본다.
blogs_df.printSchema()

DDL

  • SQL에서 쓰이는 데이터 정의어(DDL)를 활용하는 방식
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaings` ARRAY<STRING>"

# create our data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()

여러가지 문법표현

  • 세가지 표현이 모두 같은 결과를 도출한다. 이때 사용된 expr 함수는 아래와 같은 기능을 가진다.
    • Spark 내부에서 SQL문법 사용가능
    • 산술 연산 가능
    • 타입 변환 가능
    • 조건문 자리에 사용되면 SQL 비교연산자를 사용가능(아래의 withColumn코드에서 확인)
blogs_df.select(expr('Hits') * 2).show(2)
blogs_df.select(col('Hits') * 2).show(2)
blogs_df.select(expr('Hits * 2')).show(2)

  • withColumn으로 조건절을 사용 가능하다.
blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show(5)

  • 컬럼간의 연산을 수행한다.
blogs_df.select(col("Id"), col("Hits"), expr("Hits") + expr("Id")).show()

blogs_df\
  .withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))\
  .select(expr("AuthorsId"))\
  .show()

  • 정렬을 수행한다.
blogs_df.sort(col("Id").desc()).show()

  • Spark에서 하나의 행은 하나 이상의 칼럼을 갖는 로우객체로 표현되고, 각 컬럼에 대해 인덱스로 접근하게 된다.
from pyspark.sql import Row
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "LinkedIn"])
blog_row[2]

profile

데이터 분석가

@이꾹꾹

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!