2 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 파이썬으로 했습니다.
스칼라는 추후에 다루겠습니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/pyspark 명령어를 실행하시면 됩니다.



1. 개요

  • 이전 글에서는 스파크의 전반적인 내용을 다뤘다.
  • 이번 글에서는 스파크 내부에서 어떤 일이 일어나는지 단계별로 살펴본다.


1.1 종합 예제

  • 미국 교통통계국의 항공운항 데이터 중 일부를 스파크로 분석한 예제를 본다.
  • head data/flight-data/csv/2015-summary.csv 명령어를 통해 파일을 확인한다.
  • 실행결과

    image


  • 스파크는 다양한 데이터소스를 지원한다.
  • 데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽는다.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
      .appName('DataFrameReader') \
      .master("local") \
      .config("spark.some.config.option", "some-value") \
      .getOrCreate() # spark 초기화
    

SparkSession을 생성하면 스파크 코드를 실행할 수 있다. 그리고 모든 저수준 API, 기존 컨텍스트 그리고 관련 설정 정보에 접근할 수 있다.
SparkContext를 이용하면 RDD같은 스파크의 저수준 API를 사용할 수 있다.
모든 스파크 코드는 RDD 명령으로 컴파일된다.

  • 이때 특정 파일 포맷과 몇 가지 옵션을 함께 설정한다.
    • .option("inferSchema", "true") : 스파크 DataFrame의 스키마 정보를 알아내는 스키마 추론(schema inference) 기능을 사용한다.
      • 스키마(schema) : 컬럼과 컬럼의 타입을 정의한 목록 1
    • .option("header", "true") : 파일의 첫 로우를 헤더로 지정하는 옵션이다.
    • csv 파일 경로를 불러올 때 경로를 찾지 못하겠다면 다음 코드를 실행해본다.

        import os
      
        print(os.getcwd())
      
flightData2015 = spark \
    .read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .csv("./spark-3.3.0/data/flight-data/csv/2015-summary.csv")


  • 스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 로우와 컬럼을 가진다.
  • 로우의 수를 알 수 없는 이유는 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문이다.
  • 스파크는 각 컬럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽는다.
  • Figure 2.7은 DataFrame에서 CSV 파일을 읽어 로컬 배열이나 리스트 형태로 변환하는 과정이다.

image

  • DataFrame의 take 액션을 호출하면 이전의 head 명령과 같은 결과를 확인할 수 있다.
  • 실행결과

    image


  • 이제 트랜스포메이션을 추가로 지정한다. 정수 데이터 타입인 count 컬럼을 기준으로 데이터를 정렬한다.
    • sort 메서드는 트랜스포메이션이기 때문에 데이터에 아무런 변화도 일어나지 않는다.
    • explain() : DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.
      • 스파크는 실행 계획을 만들고 검토하여 클러스터에서 처리할 방법을 알아낸다.
    • 실행 계획(plan)은 위에서 아래 방향으로 읽으며 최종 결과는 가장 위에, 데이터소스는 가장 아래에 있다.

image

flightData2015.sort("count").explain()
  • 실행결과

    image


  • 이제 트랜스포메이션 실행 계획을 시작하기 위해 액션을 호출한다.
    • 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성한다.
    • 이 값을 5로 설정해 셔플의 출력 파티션 수를 줄인다.

image

spark.conf.set("spark.sql.shuffle.partitions", "5")

flightData2015.sort("count").take(2)


  • 트랜스포메이션의 논리적 실행 계획은 DataFrame의 계보를 정의한다.
  • 스파크는 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있다.

계보는 스파크의 프로그래밍 모델인 함수형 프로그래밍의 핵심이다.
함수형 프로그래밍은 데이터의 변환 규칙이 일정한 경우 같은 입력에 대해 항상 같은 출력을 생성한다는 의미이다.

  • 사용자는 물리적 데이터를 직접 다루지 않는다. 대신 앞서 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성을 제어한다.



1.2 DataFrame과 SQL

  • 스파크는 언어에 상관없이 같은 방식으로 트랜스포메이션을 실행할 수 있다.
  • 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획(plan)으로 컴파일한다.
  • 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰로 등록한 후 SQL 쿼리를 사용할 수 있다.
    • createOrReplaceTempView 메서드를 호출하면 모든 DataFrame을 테이블이나 뷰로 만들 수 있다.
flightData2015.createOrReplaceTempView("flight_data_2015")
  • 이제 SQL로 데이터를 조회할 수 있다.
    • spark.sql : 새로운 DataFrame을 반환하는 메서드로 SQL 쿼리를 실행한다.
      • spark는 SparkSession의 변수이다.
    • sqlWay와 dataFrameWay 모두 동일한 기본 실행 계획으로 컴파일된다.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015 \
    .groupBy("DEST_COUNTRY_NAME") \
    .count()

sqlWay.explain()
dataFrameWay.explain()
  • 실행결과

    image


  • 다음은 멀티 트랜스포메이션 쿼리에 대한 예제이다.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()
  • 실행결과

    image


from pyspark.sql.functions import desc

flightData2015 \
    .groupBy("DEST_COUNTRY_NAME") \
    .sum("count") \
    .withColumnRenamed("sum(count)", "destination_total") \
    .sort(desc("destination_total")) \
    .limit(5) \
    .show()
  • 실행결과

    image


  • DataFrame의 expalin 메서드로 확인해보면 총 7가지 단계가 있다.
  • 실행 계획(plan)은 트랜스포메이션의 지향성 비순환 그래프(directed acyclic graph, DAG)이며 액션이 호출되면 결과를 만들어낸다.
    • DAG의 각 단계는 불변성을 가진 신규 DataFrame을 생성한다.

image

flightData2015 \
    .groupBy("DEST_COUNTRY_NAME") \
    .sum("count") \
    .withColumnRenamed("sum(count)", "destination_total") \
    .sort(desc("destination_total")) \
    .limit(5) \
    .explain()
  • 실행결과

    image





References

댓글남기기