[Spark] 그룹화, 롤업, 큐브, 피벗 정리
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-shell
명령어를 실행하시면 됩니다.
[Spark] 집계 연산, 함수, 그룹화, 롤업, 큐브 정리글에 이어서 진행됩니다.
초기 설정
val df = spark.read.format("csv"
).option("header", "true"
).option("inferSchema", "true"
).load("./data/retail-data/all/*.csv"
).coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
3. 그룹화
- 데이터 그룹 기반의 집계는 단일 컬럼의 데이터를 그룹화하고 해당 그룹의 다른 여러 컬럼을 사용해서 계산하기 위해 카테고리형 데이터를 사용한다.
- 데이터 그룹 기반의 집계를 설명하는 데 가장 좋은 방법은 그룹화를 해보는 것이다.
- 그룹화 작업은 하나 이상의 컬럼을 그룹화하고 집계 연산을 수행하는 두 단계로 이뤄진다.
- 첫 번째 단계에서는
RelationalGroupedDataset
이 반환되고, 두 번째 단계에서는DataFrame
이 반환된다.
- 첫 번째 단계에서는
df.groupBy("InvoiceNo", "CustomerId").count().show(5)
-
실행결과
3.1 표현식을 이용한 그룹화
- agg 메서드는 여러 집계 처리를 한 번에 지정할 수 있으며, 집계에 표현식을 사용할 수 있다.
- 또한, 트랜스포메이션이 완료된 컬럼에 alias 메서드를 사용할 수 있다.
import org.apache.spark.sql.functions.count
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")
).show(5)
-
실행결과
3.2 맵을 이용한 그룹화
- 컬럼을 키로, 수행할 집계 함수의 문자열을 값으로 하는 맵(map) 타입을 사용해 트랜스포메이션을 정의할 수 있다.
df.groupBy("InvoiceNo").agg(
"Quantity"->"avg",
"Quantity"->"stddev_pop"
).show(5)
-
실행결과
4. 윈도우 함수
- 윈도우(window) 함수는 데이터의 특정 윈도우를 대상으로 고유의 집계 연산을 수행한다.
- 데이터의 윈도우는 현재 데이터에 대한 참조(reference)를 사용해 정의한다.
- 윈도우 명세(window specification)는 함수에 전달된 로우를 결정한다.
- 윈도우(window)는 로우 간 연산을 처리하기 위한 도구이다. 1
- 윈도우 함수는 프레임(frame)에 입력되는 모든 로우에 대해 결괏값을 계산한다.
- 프레임 : 로우 그룹 기반의 테이블을 의미한다.
- 스파크는 다음 3가지 종류의 윈도우 함수를 지원한다.
- 랭크 함수(ranking function)
- 분석 함수(analytic function)
- 집계 함수(aggregate function)
- 다음 그림은 로우가 어떻게 여러 프레임에 할당될 수 있는지 나타낸다.
- 예제를 위해 주문 일자(Invoice Date) 컬럼을 변환해 date 컬럼을 만든다.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
: spark 2.0 에서 3.0으로 오면서 다음 예제와 같은 포맷은 지원하지 않는 에러가 발생한다. 따라서 이 코드를 삽입해줘야 한다. 2
import org.apache.spark.sql.functions.{col, to_date}
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
- 윈도우 함수를 정의하기 위해 첫 번째 단계롤 윈도우 명세를 만든다.
- partitionBy 메서드는 그룹을 어떻게 나눌지 결정하는 것과 유사한 개념이다.
- orderBy 메서드는 파티션의 정렬 방식을 정의한다.
- 프레임 명세(rowBetween 구문)는 입력된 로우의 참조를 기반으로 프레임에 로우가 포함될 수 있는지 결정한다.
Window.unboundedPreceding
: 파티션의 첫 번째 행을 나타내는 값 3Window.currentRow
: 현재 행을 나타내는 값 3- Window 함수에 대해 더 알고 싶다면 org.apache.spark.sql.expressions - Class Window 를 클릭한다.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window.partitionBy("CustomerId", "date"
).orderBy(col("Quantity").desc
).rowsBetween(Window.unboundedPreceding, Window.currentRow)
- 이번에는 시간대별 최대 구매 개수를 구하는 예이다.
- 위 예제에서 사용한 집계 함수에 컬럼명이나 표현식을 전달한다.
- 이 함수를 적용할 데이터 프레임이 정의된 윈도우 명세도 함께 사용한다.
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")
).over(windowSpec)
-
실행결괴
- dense_rank 함수를 사용해 모든 고객에 대해 최대 구매 수량을 가진 날짜가 언제인지 알아본다.
- 동일한 값이 나오거나 중복 로우가 발생해 순위가 비어 있을 수 있으므로 rank 함수 대신 dense_rank 함수를 사용한다.
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
실행결과
- 위 예제를 통해 select 구문에서 사용할 수 있는 컬럼을 반환했다.
- 이제 select 메서드를 사용해 계산된 윈도우값을 확인해본다.
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL"
).orderBy("CustomerId"
).select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")
).show()
-
실행결과
5. 그룹화 셋
- 그룹화 셋(grouping set)은 여러 그룹에 걸쳐 여러 집계를 결합하는 저수준 기능이다.
- 그룹화 셋을 이용하면 group-by 구문에서 원하는 형태로 집계를 생성할 수 있다.
- 그룹화 셋은 SQL에서만 사용할 수 있다.
val dfNoNull = dfWithDate.na.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
spark.sql("""
SELECT CustomerID, stockCode, sum(Quantity) FROM dfNoNULL
GROUP BY customerID, stockCode
ORDER BY CustomerId DESC, stockCode DESC
""").show(5)
-
실행결과
5.1 롤업
- 다양한 컬럼을 그룹화 키로 설정하면 그룹화 키로 설정된 조합뿐만 아니라 데이터셋에서 볼 수 있는 실제 조합을 모두 살펴볼 수 있다.
- 롤업(rollup)은 group-by 스타일의 다양한 연산을 수행할 수 있는 다차원 집계 기능이다.
- 그룹별 결과 뿐만 아니라 총 집계도 해준다.
- 다음 예제에서는 시간과 공간을 축으로 하는 롤업을 생성한다.
val rolledUpDF = dfNoNull.rollup("Date", "Country"
).agg(sum(col("Quantity"))
).selectExpr("date", "Country", "'sum(Quantity)' as total_quantity"
).orderBy("Date")
rolledUpDF.show(5)
5.2 큐브
- 큐브(cube)는 롤업을 고차원적으로 사용할 수 있게 해준다.
- 큐브는 요소들을 계층적으로 다루는 대신 모든 차원에 대해 동일한 작업을 수행한다.
- 큐브를 사용해 테이블에 있는 모든 정보를 빠르고 쉽게 조회할 수 있는 요약 정보 테이블을 만들 수 있다.
dfNoNull.cube("Date", "Country"
).agg(sum(col("Quantity"))
).selectExpr("date", "Country", "'sum(Quantity)' as total_quantity"
).orderBy("Date"
).show()
5.3 그룹화 메타데이터
- 큐브와 롤업을 사용하다 보면 집계 수준에 따라 쉽게 필터링하기 위해 집계 수준을 조회하는 경우가 발생하는 데 이때 grouping_id를 사용한다.
- grouping_id는 결과 데이터셋의 집계 수준을 명시하는 컬럼을 제공한다.
import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode"
).agg(grouping_id(), sum("Quantity")
).orderBy(col("grouping_id()").desc
).show()
-
실행결과
5.4 피벗
- 피벗(pivot)을 사용해 로우를 컬럼으로 변환할 수 있다.
- USAA와 관련된 컬럼을 살펴보면
USA_sum(Quantity)
,USA_sum(UnitPrice)
,USA_sum(CustomerId)
가 있다.pivoted.printSchema()
코드를 실행해보면 된다.
- USAA와 관련된 컬럼을 살펴보면
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
pivoted.where("date > '2011-12-05'"
).select("date", "USA_sum(Quantity)"
).show()
-
실행결과
댓글남기기