[Spark] DataFrame, 스키마, sql, 계보에 대한 정의 및 예제
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 파이썬으로 했습니다.
스칼라는 추후에 다루겠습니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/pyspark
명령어를 실행하시면 됩니다.
1. 개요
- 이전 글에서는 스파크의 전반적인 내용을 다뤘다.
- 해당 링크 : [Spark] 빅데이터와 아파치 스파크란
- 이번 글에서는 스파크 내부에서 어떤 일이 일어나는지 단계별로 살펴본다.
1.1 종합 예제
- 미국 교통통계국의 항공운항 데이터 중 일부를 스파크로 분석한 예제를 본다.
head data/flight-data/csv/2015-summary.csv
명령어를 통해 파일을 확인한다.-
실행결과
- 스파크는 다양한 데이터소스를 지원한다.
-
데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽는다.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName('DataFrameReader') \ .master("local") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() # spark 초기화
SparkSession을 생성하면 스파크 코드를 실행할 수 있다. 그리고 모든 저수준 API, 기존 컨텍스트 그리고 관련 설정 정보에 접근할 수 있다.
SparkContext를 이용하면 RDD같은 스파크의 저수준 API를 사용할 수 있다.
모든 스파크 코드는 RDD 명령으로 컴파일된다.
- 이때 특정 파일 포맷과 몇 가지 옵션을 함께 설정한다.
.option("inferSchema", "true")
: 스파크 DataFrame의 스키마 정보를 알아내는 스키마 추론(schema inference) 기능을 사용한다.- 스키마(schema) : 컬럼과 컬럼의 타입을 정의한 목록 1
.option("header", "true")
: 파일의 첫 로우를 헤더로 지정하는 옵션이다.-
csv 파일 경로를 불러올 때 경로를 찾지 못하겠다면 다음 코드를 실행해본다.
import os print(os.getcwd())
flightData2015 = spark \
.read \
.option("inferSchema", "true") \
.option("header", "true") \
.csv("./spark-3.3.0/data/flight-data/csv/2015-summary.csv")
- 스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 로우와 컬럼을 가진다.
- 로우의 수를 알 수 없는 이유는 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문이다.
- 스파크는 각 컬럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽는다.
- Figure 2.7은 DataFrame에서 CSV 파일을 읽어 로컬 배열이나 리스트 형태로 변환하는 과정이다.
- DataFrame의 take 액션을 호출하면 이전의 head 명령과 같은 결과를 확인할 수 있다.
-
실행결과
- 이제 트랜스포메이션을 추가로 지정한다. 정수 데이터 타입인 count 컬럼을 기준으로 데이터를 정렬한다.
- sort 메서드는 트랜스포메이션이기 때문에 데이터에 아무런 변화도 일어나지 않는다.
explain()
: DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.- 스파크는 실행 계획을 만들고 검토하여 클러스터에서 처리할 방법을 알아낸다.
- 실행 계획(plan)은 위에서 아래 방향으로 읽으며 최종 결과는 가장 위에, 데이터소스는 가장 아래에 있다.
flightData2015.sort("count").explain()
-
실행결과
- 이제 트랜스포메이션 실행 계획을 시작하기 위해 액션을 호출한다.
- 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성한다.
- 이 값을 5로 설정해 셔플의 출력 파티션 수를 줄인다.
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)
- 트랜스포메이션의 논리적 실행 계획은 DataFrame의 계보를 정의한다.
- 스파크는 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있다.
계보는 스파크의 프로그래밍 모델인 함수형 프로그래밍의 핵심이다.
함수형 프로그래밍은 데이터의 변환 규칙이 일정한 경우 같은 입력에 대해 항상 같은 출력을 생성한다는 의미이다.
- 사용자는 물리적 데이터를 직접 다루지 않는다. 대신 앞서 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성을 제어한다.
1.2 DataFrame과 SQL
- 스파크는 언어에 상관없이 같은 방식으로 트랜스포메이션을 실행할 수 있다.
- 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획(plan)으로 컴파일한다.
- 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰로 등록한 후 SQL 쿼리를 사용할 수 있다.
createOrReplaceTempView
메서드를 호출하면 모든 DataFrame을 테이블이나 뷰로 만들 수 있다.
flightData2015.createOrReplaceTempView("flight_data_2015")
- 이제 SQL로 데이터를 조회할 수 있다.
spark.sql
: 새로운 DataFrame을 반환하는 메서드로 SQL 쿼리를 실행한다.spark
는 SparkSession의 변수이다.
- sqlWay와 dataFrameWay 모두 동일한 기본 실행 계획으로 컴파일된다.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData2015 \
.groupBy("DEST_COUNTRY_NAME") \
.count()
sqlWay.explain()
dataFrameWay.explain()
-
실행결과
- 다음은 멀티 트랜스포메이션 쿼리에 대한 예제이다.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
-
실행결과
from pyspark.sql.functions import desc
flightData2015 \
.groupBy("DEST_COUNTRY_NAME") \
.sum("count") \
.withColumnRenamed("sum(count)", "destination_total") \
.sort(desc("destination_total")) \
.limit(5) \
.show()
-
실행결과
- DataFrame의 expalin 메서드로 확인해보면 총 7가지 단계가 있다.
- 실행 계획(plan)은 트랜스포메이션의 지향성 비순환 그래프(directed acyclic graph, DAG)이며 액션이 호출되면 결과를 만들어낸다.
- DAG의 각 단계는 불변성을 가진 신규 DataFrame을 생성한다.
flightData2015 \
.groupBy("DEST_COUNTRY_NAME") \
.sum("count") \
.withColumnRenamed("sum(count)", "destination_total") \
.sort(desc("destination_total")) \
.limit(5) \
.explain()
-
실행결과
댓글남기기