3 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.

[Spark] 집계 연산, 함수, 그룹화, 롤업, 큐브 정리글에 이어서 진행됩니다.



초기 설정

val df = spark.read.format("csv"
    ).option("header", "true"
    ).option("inferSchema", "true"
    ).load("./data/retail-data/all/*.csv"
    ).coalesce(5)

df.cache()
df.createOrReplaceTempView("dfTable")

3. 그룹화

  • 데이터 그룹 기반의 집계는 단일 컬럼의 데이터를 그룹화하고 해당 그룹의 다른 여러 컬럼을 사용해서 계산하기 위해 카테고리형 데이터를 사용한다.
  • 데이터 그룹 기반의 집계를 설명하는 데 가장 좋은 방법은 그룹화를 해보는 것이다.
  • 그룹화 작업은 하나 이상의 컬럼을 그룹화하고 집계 연산을 수행하는 두 단계로 이뤄진다.
    • 첫 번째 단계에서는 RelationalGroupedDataset이 반환되고, 두 번째 단계에서는 DataFrame이 반환된다.
df.groupBy("InvoiceNo", "CustomerId").count().show(5)
  • 실행결과

    Screenshot from 2022-09-06 10-27-12


3.1 표현식을 이용한 그룹화

  • agg 메서드는 여러 집계 처리를 한 번에 지정할 수 있으며, 집계에 표현식을 사용할 수 있다.
  • 또한, 트랜스포메이션이 완료된 컬럼에 alias 메서드를 사용할 수 있다.
import org.apache.spark.sql.functions.count

df.groupBy("InvoiceNo").agg(
  count("Quantity").alias("quan"),
  expr("count(Quantity)")
).show(5)
  • 실행결과

    Screenshot from 2022-09-06 10-51-43


3.2 맵을 이용한 그룹화

  • 컬럼을 키로, 수행할 집계 함수의 문자열을 값으로 하는 맵(map) 타입을 사용해 트랜스포메이션을 정의할 수 있다.
df.groupBy("InvoiceNo").agg(
  "Quantity"->"avg",
  "Quantity"->"stddev_pop"
).show(5)
  • 실행결과

    Screenshot from 2022-09-06 10-56-03



4. 윈도우 함수

  • 윈도우(window) 함수는 데이터의 특정 윈도우를 대상으로 고유의 집계 연산을 수행한다.
    • 데이터의 윈도우는 현재 데이터에 대한 참조(reference)를 사용해 정의한다.
    • 윈도우 명세(window specification)는 함수에 전달된 로우를 결정한다.
    • 윈도우(window)는 로우 간 연산을 처리하기 위한 도구이다. 1
  • 윈도우 함수는 프레임(frame)에 입력되는 모든 로우에 대해 결괏값을 계산한다.
    • 프레임 : 로우 그룹 기반의 테이블을 의미한다.
  • 스파크는 다음 3가지 종류의 윈도우 함수를 지원한다.
    1. 랭크 함수(ranking function)
    2. 분석 함수(analytic function)
    3. 집계 함수(aggregate function)
  • 다음 그림은 로우가 어떻게 여러 프레임에 할당될 수 있는지 나타낸다.

Screenshot from 2022-09-06 11-01-58


  • 예제를 위해 주문 일자(Invoice Date) 컬럼을 변환해 date 컬럼을 만든다.
    • spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY") : spark 2.0 에서 3.0으로 오면서 다음 예제와 같은 포맷은 지원하지 않는 에러가 발생한다. 따라서 이 코드를 삽입해줘야 한다. 2
import org.apache.spark.sql.functions.{col, to_date}

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")


  • 윈도우 함수를 정의하기 위해 첫 번째 단계롤 윈도우 명세를 만든다.
    • partitionBy 메서드는 그룹을 어떻게 나눌지 결정하는 것과 유사한 개념이다.
    • orderBy 메서드는 파티션의 정렬 방식을 정의한다.
    • 프레임 명세(rowBetween 구문)는 입력된 로우의 참조를 기반으로 프레임에 로우가 포함될 수 있는지 결정한다.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val windowSpec = Window.partitionBy("CustomerId", "date"
  ).orderBy(col("Quantity").desc
  ).rowsBetween(Window.unboundedPreceding, Window.currentRow)


  • 이번에는 시간대별 최대 구매 개수를 구하는 예이다.
    • 위 예제에서 사용한 집계 함수에 컬럼명이나 표현식을 전달한다.
    • 이 함수를 적용할 데이터 프레임이 정의된 윈도우 명세도 함께 사용한다.
import org.apache.spark.sql.functions.max

val maxPurchaseQuantity = max(col("Quantity")
  ).over(windowSpec)
  • 실행결괴

    Screenshot from 2022-09-06 11-44-29


  • dense_rank 함수를 사용해 모든 고객에 대해 최대 구매 수량을 가진 날짜가 언제인지 알아본다.
    • 동일한 값이 나오거나 중복 로우가 발생해 순위가 비어 있을 수 있으므로 rank 함수 대신 dense_rank 함수를 사용한다.
import org.apache.spark.sql.functions.{dense_rank, rank}

val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

실행결과

Screenshot from 2022-09-06 11-52-43


  • 위 예제를 통해 select 구문에서 사용할 수 있는 컬럼을 반환했다.
  • 이제 select 메서드를 사용해 계산된 윈도우값을 확인해본다.
import org.apache.spark.sql.functions.col

dfWithDate.where("CustomerId IS NOT NULL"
  ).orderBy("CustomerId"
  ).select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")
  ).show()
  • 실행결과

    Screenshot from 2022-09-06 12-09-13



5. 그룹화 셋

  • 그룹화 셋(grouping set)은 여러 그룹에 걸쳐 여러 집계를 결합하는 저수준 기능이다.
  • 그룹화 셋을 이용하면 group-by 구문에서 원하는 형태로 집계를 생성할 수 있다.
  • 그룹화 셋은 SQL에서만 사용할 수 있다.
val dfNoNull = dfWithDate.na.drop()

dfNoNull.createOrReplaceTempView("dfNoNull")


spark.sql("""
  SELECT CustomerID, stockCode, sum(Quantity) FROM dfNoNULL
  GROUP BY customerID, stockCode
  ORDER BY CustomerId DESC, stockCode DESC
""").show(5)
  • 실행결과

    Screenshot from 2022-09-06 12-27-06


5.1 롤업

  • 다양한 컬럼을 그룹화 키로 설정하면 그룹화 키로 설정된 조합뿐만 아니라 데이터셋에서 볼 수 있는 실제 조합을 모두 살펴볼 수 있다.
  • 롤업(rollup)은 group-by 스타일의 다양한 연산을 수행할 수 있는 다차원 집계 기능이다.
    • 그룹별 결과 뿐만 아니라 총 집계도 해준다.
  • 다음 예제에서는 시간과 공간을 축으로 하는 롤업을 생성한다.
val rolledUpDF = dfNoNull.rollup("Date", "Country"
  ).agg(sum(col("Quantity"))
  ).selectExpr("date", "Country", "'sum(Quantity)' as total_quantity"
  ).orderBy("Date")
rolledUpDF.show(5)


5.2 큐브

  • 큐브(cube)는 롤업을 고차원적으로 사용할 수 있게 해준다.
  • 큐브는 요소들을 계층적으로 다루는 대신 모든 차원에 대해 동일한 작업을 수행한다.
  • 큐브를 사용해 테이블에 있는 모든 정보를 빠르고 쉽게 조회할 수 있는 요약 정보 테이블을 만들 수 있다.
dfNoNull.cube("Date", "Country"
  ).agg(sum(col("Quantity"))
  ).selectExpr("date", "Country", "'sum(Quantity)' as total_quantity"
  ).orderBy("Date"
  ).show()


5.3 그룹화 메타데이터

  • 큐브와 롤업을 사용하다 보면 집계 수준에 따라 쉽게 필터링하기 위해 집계 수준을 조회하는 경우가 발생하는 데 이때 grouping_id를 사용한다.
    • grouping_id는 결과 데이터셋의 집계 수준을 명시하는 컬럼을 제공한다.
import org.apache.spark.sql.functions.{grouping_id, sum, expr}

dfNoNull.cube("customerId", "stockCode"
    ).agg(grouping_id(), sum("Quantity")
    ).orderBy(col("grouping_id()").desc
    ).show()
  • 실행결과

    Screenshot from 2022-09-06 12-42-55


5.4 피벗

  • 피벗(pivot)을 사용해 로우를 컬럼으로 변환할 수 있다.
    • USAA와 관련된 컬럼을 살펴보면 USA_sum(Quantity), USA_sum(UnitPrice), USA_sum(CustomerId)가 있다.
      • pivoted.printSchema() 코드를 실행해보면 된다.
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

pivoted.where("date > '2011-12-05'"
    ).select("date", "USA_sum(Quantity)"
    ).show()
  • 실행결과

    Screenshot from 2022-09-06 12-48-58





References

댓글남기기