4 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 SQL과 스칼라(scala)로 했습니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.




1. 개요

  • 이번에는 스파크에서 코드를 실행할 때 어떤 일이 발생하는지 알아본다.
    • 스파크 애플리케이션의 아키텍처와 컴포넌트
    • 스파크 내/외부에서 실행되는 스파크 애플리케이션의 생애주기
    • 파이프라이닝과 같은 중요한 저수준 실행 속성
    • 스파크 애플리케이션을 실행하는 데 필요한 사항



2. 스파크 애플리케이션의 아키텍처

  • 스파크 애플리케이션과 관련된 고수준 컴포넌트는 다음과 같다.
  • 스파크 드라이버(driver)
    • 스파크 드라이버는 스파크 애플리케이션의 ‘운전자 역할’을 하는 프로세스이다.
    • 드라이버는 스파크 애플리케이션의 실행을 제어하고 스파크 클러스터의 모든 상태 정보를 유지한다.
    • 또한 물리적 컴퓨팅 자원 확보와 익스큐터 실행을 위해 클러스터 매니저와 통신할 수 있어야 한다.
  • 스파크 익스큐터(executors)
    • 스파크 익스큐터는 스파크 드라이버가 할당한 태스크를 수행하는 프로세스이다.
    • 익스큐터는 드라이버가 할당한 태스크를 받아 실행하고 태스크의 상태와 결과를 드라이버에 보고한다.
  • 클러스터 매니저
    • 클러스터 매니저는 스파크 애플리케이션을 실행할 클러스터 머신을 유지한다.
    • 클러스터 매니저는 드라이버(마스터)와 워커라는 개념을 가지고 있다.

image


  • 스파크 애플리케이션을 실제로 실행할 때가 되면 우리는 클러스터 매니저에 자원 할당을 요청한다.
  • 스파크 애플리케이션의 실행 과정에서 클러스터 매니저는 애플리케이션이 실행되는 머신을 관리한다.
  • 스파크가 지원하는 클러스터 매니저는 다음과 같다.
    • 스탠드얼론 클러스터 매니저(standalone cluster manager)
    • 아파치 메소스(Apache Mesos)
    • 하둡 얀(Hadoop YARN)


2.1 실행 모드

  • 실행 모드(mode)는 애플리케이션을 실행할 때 요청한 자원의 물리적인 위치를 결정한다.
  • 선택할 수 있는 실행 모드는 다음과 같다.
    • 클러스터 모드
    • 클라이언트 모드
    • 로컬 모드
  • 이어지는 그림들에서 실선으로 그려진 직사각형은 스파크 드라이버 프로세스를 나타내며, 점선으로 그려진 직사각형은 익스큐터 프로세스를 나타낸다.


클러스터 모드

image

  • 클러스터 모드를 사용하려면 컴파일된 JAR 파일이나 파이썬 스크립트를 클러스터 매니저에 전달해야 한다.
  • 클러스터 매니저는 파일을 받은 다음 워커 노드에 드라이버와 익스큐터 프로세스를 실행한다.
  • 클러스터 매니저는 모든 스파크 애플리케이션과 관련된 프로세스를 유지하는 역할을 한다.


클라이언트 모드

image

  • 클라이언트 모드는 애플리케이션을 제출한 클라이언트 머신에 스파크 드라이버가 위치한다.
  • 즉, 클라이언트 머신은 스파크 드라이버 프로세스를 유지하며 클러스터 매니저는 익스큐터 프로세스를 유지한다.


로컬 모드

  • 로컬 모드로 설정된 경우 모든 스파크 애플리케이션은 싱글 머신에서 실행된다.
  • 로컬 모드는 애플리케이션의 병렬 처리를 위해 싱글 머신의 스레드를 활용한다.
    • 이 모드는 스파크를 학습하거나 애플리케이션 테스트 그리고 개발 중인 애플리케이션을 반복적으로 실험하는 용도로 주로 사용된다.



3. 스파크 애플리케이션의 생애주기(스파크 외부)

  • spark-submit 명령을 사용해 애플리케이션을 실행하는 예제를 그림과 함께 설명해본다.
    • 하나의 드라이버 노드와 세 개의 워커 노드로 구성된 총 4대 규모의 클러스터가 이미 실행되고 있다고 가정한다.

그림에서 두꺼운 화살표 선은 스파크나 스파크 관련 프로세스가 수행하는 통신을 표현한다.
점선은 클러스터 매니저와의 통신 같은 일반적인 통신을 표현한다.


3.1 클라이언트 요청

  • 첫 단계는 스파크 애플리케이션을 제출하는 것이다.
    • 스파크 애플리케이션은 컴파일된 JAR나 라이브러리 파일을 의미한다.
  • 스파크 애플리케이션을 제출하는 시점에 로컬 머신에서 코드가 실행되어 클러스터 드라이버 노드에 요청한다.
    • 이 과정에서 스파크 드라이버 프로세스의 자원을 함께 요청한다.
  • 클러스터 매니저는 이 요청을 받아들이고 클러스터 노드 중 하나에 드라이버 프로세스를 실행한다.
  • 스파크 잡을 제출한 클라이언트 프로세스는 종료되고 애플리케이션은 클러스터에서 실행된다.

image


  • 스파크 애플리케이션을 제출하기 위해 터미널에서 다음과 같은 형태의 명령을 실행한다.
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode cluster \
--conf <key>=<value> \
... # 다른 옵션
<application-jar> \
[application-arguments]


3.2 시작

  • 드라이버 프로세스가 클러스터에 배치되었으므로 사용자 코드를 실행할 차례이다.
    • 사용자 코드에는 반드시 스파크 클러스터(ex. 드라이버와 익스큐터)를 초기화하는 SparkSession이 포함되어야 한다.
    • SparkSession은 클러스터 매니저와 통신해(검은색) 스파크 익스큐터 프로세스의 실행을 요청한다(빨간색, 노란색).

image

  • 클러스터 매니저는 익스큐터 프로세스를 시작하고 결과를 응답받아 익스큐터의 위치와 관련 정보를 드라이버 프로세스로 전송한다.
  • 모든 작업이 정상적으로 완료되면 ‘스파크 클러스터’가 완성된다.


3.3 실행

  • 드라이버와 워커는 코드를 실행하고 데이터를 이동하는 과정에서 서로 통신한다.

image

  • 드라이버는 각 워커에 태스크를 할당한다.
  • 태스크를 할당받은 워커는 태스크의 상태와 성공/실패 여부를 드라이버에 전송한다.


3.4 완료

  • 스파크 애플리케이션의 실행이 완료되면 드라이버 프로세스가 성공이나 실패 중 하나의 상태로 종료된다.

image

  • 그런 다음 클러스터 매니저는 드라이버가 속한 스파크 클러스터의 모든 익스큐터를 종료시킨다.
    • 이 시점에 스파크 애플리케이션의 성공/실패 여부를 클러스터 매니저에 요청해 확인할 수 있다.



4. 스파크 애플리케이션의 생애주기(스파크 내부)

  • 지금까지 클러스터 관점에서 스파크 애플리케이션의 생애주기를 알아보았다.
  • 그러나 애플리케이션을 실행하면 스파크 내부에서 어떤 일이 발생하는지 알아야 한다.
  • 여기서는 스파크 애플리케이션을 정의하는 실제 ‘사용자 코드’와 관련된 내용을 다룬다.
  • 스파크 애플리케이션은 하나 이상의 스파크 잡(job)으로 구성된다.


4.1 SparkSession

  • 모든 스파크 애플리케이션은 가장 먼저 SparkSession을 생성한다.
  • SparkSession의 빌더(builder) 메서드를 사용하면 스파크 애플리케이션에서 다수의 라이브러리가 세션을 생성하려는 상황에서 컨텍스트 충돌을 방지할 수 있다.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark Example"
    ).config("spark.sql.warehouse.dir", "./spark-warehouse"
    ).getOrCreate()
  • SparkSession을 생성하면 스파크 코드를 실행할 수 있다.
  • SparkSession을 사용해 모든 저수준 API, 기존 컨텍스트 그리고 관련 설정 정보에 접근할 수 있다.


4.2 논리적 명령

  • 논리적 명령을 물리적 실행 계획으로 변환해본다.
  • 간단한 DataFrame을 이용해 파티션을 재분배하는 잡, 값을 트랜스포메이션하는 잡, 집계 및 최종 결과를 얻어내는 잡 이렇게 세 단계의 잡을 수행해본다.
    • collect 같은 액션을 호출하면 개별 스테이지와 태스크로 이루어진 스파크 잡이 실행된다.
// range 명령을 사용해 DataFrame을 생성하면 기본적으로 8개의 파티션을 생성한다.
val df1 = spark.range(2, 10000, 2)
val df2 = spark.range(2, 10000, 4)
// 이 단계에서는 데이터 셔플링으로 파티션 수를 변경한다.
val step1 = df1.repartition(5)
val step12 = df1.repartition(6) 
val step2 = step1.selectExpr("id * 5 as id")
// spark.sql.shuffle.partitions 속성의 기본값은 200이다.
// 따라서 스파크 잡이 실행되는 도중에 셔플을 수행하면 기본적으로 200개의 셔플 파티션을 생성한다.
val step3 = step2.join(step12, "id")
val step4 = step3.selectExpr("sum(id)")

step4.collect()

step4.explain()
  • 실행결과

    image


4.3 스테이지

  • 스파크의 스테이지(stage)는 다수의 머신에서 동일한 연산을 수행하는 태스크의 그룹을 나타낸다.
  • 스파크는 가능한 한 많은 태스크(잡의 트랜스포메이션)을 동일한 스테이지로 묶으려 노력한다.
  • 셔플 작업이 일어난 다음에는 반드시 새로운 스테이지를 시작한다.
    • 셔플(shuffle)은 데이터의 물리적 재분배(repartition) 과정이다.
    • 예를 들어, DataFrame 정렬이나 키별로 적재된 파일 데이터를 그룹화하는 작업과 같다.
  • 파티션을 재분배하는 과정은 데이터를 이동시키는 작업이므로 익스큐터 간의 조정이 필요하다.
  • 스파크는 셔플이 끝난 다음 새로운 스테이지를 시작하며 최종 결과를 계산하기 위해 스테이지 실행 순서를 계속 추적한다.


4.4 태스크

  • 스파크의 스테이지는 태스크(task)로 구성된다.
  • 각 태스크는 싱글 익스큐터에서 실행할 데이터의 블록과 다수의 트랜스포메이션 조합으로 볼 수 있다.
    • 태스크(task)는 데이터 단위(파티션)에 적용되는 연산 단위를 의미한다.



5. 세부 실행 과정

  • 스파크의 스테이지와 태스크는 중요한 특성을 가지고 있다.
  • 첫째, 스파크는 map 연산 후 다른 map 연산이 이어진다면 함께 실행할 수 있도록 스테이지와 태스크를 자동으로 연결한다.
  • 둘째, 스파크는 모든 셔플을 작업할 때 데이터를 안정적인 저장소에 저장하므로 여러 잡에서 재사용할 수 있다.


5.1 파이프라이닝

  • 스파크는 메모리나 디스크에 데이터를 쓰기 전에 최대한 많은 단계를 수행한다는 점은 스파크를 ‘인메모리 컴퓨팅 도구’로 만들어주는 핵심 요소 중 하나이다.
  • 스파크가 수행하는 주요 최적화 기법 중 하나는 RDD나 RDD보다 더 아래에서 발생하는 파이프라이닝 기법이다.
    • 파이프라이닝(pipelining) 기법은 노드 간의 데이터 이동 없이 각 노드가 데이터를 직접 공급할 수 있는 연산만 모아 태스크의 단일 스테이지로 만든다.
  • 파이프라인으로 구성된 연산 작업은 단계별로 메모리나 디스크에 중간 결과를 기록하는 방식보다 훨씬 더 처리 속도가 빠르다.


5.2 셔플 결과 저장

  • 두 번째 특성은 셔플 결과 저장(shuffle persistence)이다.
  • 스파크가 reduce-by-key 연산 같이 노드 간 복제를 유발하는 연산을 실행하면 엔진에서 파이프라이닝을 수행하지 못하므로 네트워크 셔플이 발생한다.
  • 노드 간 복제를 유발하는 연산은 각 키에 대한 입력 데이터를 먼저 여러 노드로부터 복사한다.
  • 그리고 소스 태스크의 스테이지가 실행되는 동안 셔플 파일을 로컬 디스크에 기록한다.
  • 그런 다음 그룹화나 리듀스를 수행하는 스테이지가 시작된다.
  • 이 스테이지에서는 셔플 파일에서 레코드를 읽어 들인 다음 연산을 수행한다.
  • 만약 잡이 실패한 경우 셔플 파일을 디스크에 저장했기 때문에 ‘소스’ 스테이지가 아닌 해당 스테이지부터 처리할 수 있다.





References

댓글남기기