3 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 SQL과 스칼라(scala)로 했습니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.



1. Dataset

  • Dataset은 정형 API의 기본 데이터 타입이다.
    • DataFrame은 Row 타입의 Dataset이다.
    • Dataset은 스칼라와 자바에서만 사용할 수 있다.
  • Dataset을 사용해 데이터셋의 각 로우를 구성하는 객체를 정의한다.
  • 스칼라에서는 스키마가 정의된 케이스 클래스 객체를 사용해 Dataset을 정의한다.


  • 스파크는 StringType, BigIntType, StructType과 같은 다양한 데이터 타입을 제공한다.
  • 또한 스파크가 지원하는 다양한 언어의 String, Integer, 그리고 Double과 같은 데이터 타입을 스파크의 특정 데이터 타입으로 매핑할 수 있다.
  • 스칼라나 자바를 사용할 때 모든 DataFrame은 Row 타입의 Dataset을 의미하는데, 도메인별 특정 객체를 효과적으로 지원하기 위해 인코더(encoder)라 부르는 특수 개념이 필요하다.
    • 인코더(encoder)는 도베인별 특정 객체 T를 스파크의 내부 데이터 타입으로 매핑하는 시스템을 의미한다.


  • Dataset API를 사용한다면 스파크는 데이터셋에 접근할 때마다 Row 포맷이 아닌 사용자 정의 데이터 타입으로 변환한다.
    • 하지만 사용자 정의 데이터 타입을 사용하면 성능이 나빠지게 된다.



2. Dataset을 사용할 시기

  • 하지만 그럼에도 불구하고 Dataset을 사용해야 하는 이유가 있다.

    1. DataFrame 기능만으로는 수행할 연산을 표현할 수 없는 경우
    2. 성능 저하를 감수하더라도 타입 안정성(type-safe)을 가진 데이터 타입을 사용하고 싶은 경우
  • 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려면 Dataset을 사용하는 것이 적합하다.
  • 케이스 클래스로 구현된 데이터 타입을 사용해 모든 데이터와 트랜스포메이션을 정의하면 재사용할 수 있다.



3. Dataset 생성

  • Dataset을 생성하는 것은 수동 작업이므로 정의할 스키마를 미리 알고 있어야 한다.
  • 스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야 한다.
  • 케이스 클래스(case class)는 다음과 같은 특징을 가진 정규 클래스(regular class)이다.
    • 불변성(Immutable)
      • 객체들이 언제 어디서 변경되었는지 추적할 필요가 없다.
    • 패턴 매칭으로 분해 가능
      • 패턴 매칭은 로직 분기를 단순화해 버그를 줄이고 가독성을 좋게 만든다.
    • 참조값 대신 클래스 구조를 기반으로 비교
    • 사용하기 쉽고 다루기 편함
case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String,
                  count: BigInt)

val flightsDF = spark.read.parquet("./data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]



4. 액션

  • Dataset과 DataFrame에 collect, take, count 같은 액션을 적용할 수 있다.
flights.show(2)
  • 실행결과

    image


5. 트랜스포메이션

  • DataFrame의 모든 트랜스포메이션은 Dataset에서 사용할 수 있다.

5.1 필터링

  • Flight 클래스를 파라미터로 사용해 불리언값을 반환하는 함수를 만들어본다.
    • 스칼라에서 함수는 다음과 같이 정의한다. 1

      def 함수명:타입 = 표현식
      
      def 함수명(매개변수:타입) = 표현식
      
def originIsDestination(flight_row: Flight): Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}


  • 위에서 정의한 함수를 filter 메서드에 적용해 각 행이 true를 반환하는지 평가하고 데이터셋을 필터링할 수 있다.
    • =>을 기준으로 왼쪽에는 매개변수 목록이고 오른쪽은 매개변수를 포함한 표현식이다. 2
flights.filter(flight_row => originIsDestination(flight_row)).first()
  • 실행결과

    image


5.2 매핑

  • 필터링은 단순한 트랜스포메이션이지만 때로는 특정 값을 다른 값으로 매핑(mapping)해야 한다.
  • 다음은 목적지 컬럼을 추출하는 예제이다.
val destinations = flights.map(f => f.DEST_COUNTRY_NAME)


  • 드라이버는 결괏값을 모아 문자열 타입의 배열로 반환한다.
val localDestinations = destinations.take(5)
  • 실행결과

    image



6. 조인

  • 조인은 DataFrame에서와 마찬가지로 Dataset에도 동일하게 적용된다.
  • 하지만 Dataset은 joinWith처럼 정교한 메서드를 제공한다.
  • joinWith 메서드는 co-group과 거의 유사하며 Dataset 안쪽에 다른 2개의 중첩된 Dataset으로 구성된다.
  • 각 컬럼은 단일 Dataset이므로 Dataset 객체를 컬럼처럼 다룰 수 있다.
  • 그러므로 조인 수행 시 더 많은 정보를 유지할 수 있으며 고급 맵이나 필터처럼 정교하게 데이터를 다룰 수 있다.
case class FlightMetadata(count: BigInt, randomData: BigInt)

// 스칼라에서 _ 의미는 모든 것을 포함하는 의미이다. 
val flightsMeta = spark.range(500
  ).map(x => (x, scala.util.Random.nextLong)
  ).withColumnRenamed("_1", "count"
  ).withColumnRenamed("_2", "randomData"
  ).as[FlightMetadata]

val flights2 = flights.joinWith(
  flightsMeta, flights.col("count") === flightsMeta.col("count")
)
  • 실행결과

    image


  • 최종적으로 로우는 Fight와 FlightMetadata로 이루어진 키값 형태의 Dataset을 반환한다.
  • Dataset이나 복합 데이터 타입의 DataFrame으로 데이터를 조회할 수 있다.
flights2.selectExpr("_1.DEST_COUNTRY_NAME")


  • 드라이버로 데이터를 모은 다음 결과를 반환한다.
flights2.take(2)
  • 실행결과

    image


  • 일반 조인 역시 잘 동작한다.
val flights2 = flights.join(flightsMeta, Seq("count"))



7. 그룹화와 집계

  • 그룹화와 집계는 동일한 기본 표준을 따른다.
  • 하지만 Dataset 대신 DataFrame을 반환하기 때문에 데이터 타입 정보를 잃게 된다.
flights.groupBy("DEST_COUNTRY_NAME").count()


  • 데이터 타입 정보를 유지할 수 있는 그룹화와 집계 방법이 있다.
  • 한 가지 예로 groupByKey 메서드는 Dataset의 특정 키를 기준으로 그룹화하고 형식화된 Dataset을 반환한다.
    • 하지만 이 함수는 컬럼명 대신 함수를 파라미터로 사용해야 한다.
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()
  • 실행결과(이와 비교)

    image


  • Dataset의 키를 이용해 그룹화를 수행한 다음 결과를 키-값 형태로 함수에 전달해 원시 객체 형태로 그룹화된 데이터를 다룰 수 있다.
def grpSum(countryName:String, values: Iterator[Flight]) = {
  values.dropWhile(_.count < 5).map(x => (countryName, x))
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)
  • 실행결과

    image


def grpSum2(f:Flight):Integer = {
  1
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)
  • 실행결과

    image


  • 다음 예제처럼 새로운 처리 방법을 생성해 그룹을 축소(reduce)하는 방법을 정의할 수 있다.
def sum2(left:Flight, right:Flight) = {
  Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r)).take(5)
  • 실행결과

    image





References

댓글남기기