3 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.



1. 조인 표현식

  • 스파크는 왼쪽과 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교하고 왼쪽 데이터셋과 오른쪽 데이터셋의 결합 여부를 결정하는 조인 표현식(join expression)의 평가 결과에 따라 2개의 데이터셋을 조인한다.



2. 조인 타입

  • 조건 표현식은 두 로우의 조인 여부를 결정하는 반면, 조인 타입은 결과 데이터셋에 어떤 데이터가 있어야 하는지 결정한다.
  • 스파크에서 사용할 수 있는 조인 타입은 다음과 같다.
    • 내부 조인(inner join) : 왼쪽과 오른쪽 데이터셋에 키가 있는 로우를 유지
    • 외부 조인(outer join) : 왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유지
    • 왼쪽 외부 조인(left outer join) : 왼쪽 데이터셋에 키가 있는 로우를 유지
    • 오른쪽 외부 조인(right outer join) : 오른쪽 데이터셋에 키가 있는 로우를 유지
    • 왼쪽 세미 조인(left semi join) : 왼쪽 데이터셋의 키가 일치하는 왼쪽 데이터셋만 유지
    • 왼쪽 안티 조인(left semi join) : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지
    • 자연 조인(natural join) : 두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적(implicit)으로 결합하는 조인을 수행
    • 교차 조인(cross join) : 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 로우르 조합


  • 우선 예제에서 사용할 몇 가지 간단한 데이터셋을 만든다.
val person = Seq(
    (0, "Bill Chambers", 0, Seq(100)),
    (1, "Matei Zaharia", 1, Seq(500, 250, 100)),
    (2, "Michael Armbrust", 1, Seq(250, 100))
).toDF("id", "name", "graduate_program", "spark_status")

val graduateProgram = Seq(
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "PH.D", "EECS", "UC Berkeley")
).toDF("id", "degree", "department", "school")

val sparkStatus = Seq(
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")
).toDF("id", "status")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")



2.1 내부 조인

  • 내부 조인(inner join)은 DataFrame이나 테이블에 존재하는 키를 평가한다.
  • 그리고 true로 평가되는 로우만 결합한다.
  • 내부 조인은 기본 조인 방식으로 JOIN 표현식에 왼쪽 DataFrame과 오른쪽 DataFrame을 지정하기만 하면 된다.
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")

person.join(graduateProgram, joinExpression).show()


  • 실행결과

    Screenshot from 2022-09-06 15-41-15


  • join 메서드의 세 번째 파라미터(joinType)로 조인 타입을 명확하게 지정할 수도 있다.
var joinType = "inner"

person.join(graduateProgram, joinExpression, joinType).show()
  • 실행결과

    Screenshot from 2022-09-06 15-46-24



2.2 외부 조인

  • 외부 조인(outer join)은 DataFrame이나 테이블에 존재하는 키를 평가하여 true나 false으로 평가한 로우를 포함하여 조인한다.
  • 왼쪽이나 오른쪽 DataFrame에 일치하는 로우가 없다면 스파크는 해당 위치에 null을 삽입한다.
var joinType = "outer"

person.join(graduateProgram, joinExpression, joinType).show()
  • 실행결과

    Screenshot from 2022-09-06 15-50-52



2.3 왼쪽 외부 조인

  • 왼쪽 외부 조인(left outer join)은 DataFrame이나 테이블에 존재하는 키를 평가한다.
  • 그리고 왼쪽 DataFrame의 모든 로우와 왼쪽 DataFrame과 일치하는 오른쪽 DataFrame의 로우를 함께 포함한다.
var joinType = "left_outer"

graduateProgram.join(person, joinExpression, joinType).show()
  • 실행결과

    Screenshot from 2022-09-06 15-59-35



2.4 오른쪽 외부 조인

  • 오른쪽 외부 조인(right outer join)은 DataFrame이나 테이블에 존재하는 키를 평가한다.
  • 그리고 오른쪽 DataFrame의 모든 로우와 오른쪽 DataFrame과 일치하는 왼쪽 DataFrame의 로우를 함께 포함한다.
var joinType = "right_outer"

person.join(graduateProgram, joinExpression, joinType).show()
  • 실행결과

    Screenshot from 2022-09-06 16-11-27



2.5 왼쪽 세미 조인

  • 왼쪽 세미 조인(left semi join)은 기존 조인 기능과는 달리 DataFrame의 필터 정도로 볼 수 있다.
    • 두 번째 DataFrame은 값이 존재하는지 확인하기 위해 값만 비교하는 용도로 사용한다.
var joinType = "left_semi"

graduateProgram.join(person, joinExpression, joinType).show()
  • 실행결과

    Screenshot from 2022-09-06 16-15-51



2.6 왼쪽 안티 조인

  • 안티 조인(anti join)은 SQL의 NOT IN과 같은 스타일의 필터로 볼 수 있다.
    • 두 번째 DataFrame의 어떤 값도 포함하지 않고, 단지 값이 존재하는지 확인하기 위해 값만 비교하는 용도로 사용한다.
    • 하지만, 두 번째 DataFrame에 존재하는 값을 유지하는 대신 두 번째 DataFrame에서 관련된 키를 찾을 수 없는 로우만 결과에 포함한다.
var joinType = "left_anti"

graduateProgram.join(person, joinExpression, joinType).show()
  • 실행결과

    Screenshot from 2022-09-06 16-21-45



2.7 자연 조인

  • 자연 조인(natural join)은 조인하려는 컬럼을 암시적으로(implicitly)으로 추정한다.
  • 즉, 일치하는 컬럼을 찾고 그 결과를 반환한다.
  • 왼쪽과 오른쪽, 그리고 외부 자연 조인을 사용할 수 있다.



2.8 교차 조인

  • 교차 조인(cross join)은 조건절을 기술하지 않은 내부 조인을 의미한다.
  • 교차 조인은 왼쪽 DataFrame의 모든 로우를 오른쪽 DataFrame의 모든 로우와 결합한다.
person.crossJoin(graduateProgram).show()
  • 실행결과

    Screenshot from 2022-09-06 16-28-42



3. 스파크의 조인 수행 방식

  • 스파크가 조인을 수행하는 방식을 이해하기 위해서는 실행에 필요한 2가지 핵심 전략을 이해해야 한다.
    • 노드간 네트워크 통신 전략
    • 노드별 연산 전략


3.1 네트워크 통신 전략

  • 스파크는 조인 시 2가지 클러스터 통신 방식을 활용한다.
    • 전체 노드 간 통신을 유발하는 셔플 조인(shuffle join)과 그렇지 않은 브로드캐스트 조인(broadcast join)이다.
  • 이제부터는 사용자가 스파크에서 사용하는 테이블의 크기가 아주 크거나 아주 작다고 가정한다.

큰 테이블과 큰 테이블 조인

  • 하나의 큰 테이블을 다른 큰 테이블과 조인하면 Figure 8.1과 같이 셔플 조인이 발생한다.

Screenshot from 2022-09-06 16-41-39

  • 셔플 조인은 전체 노드 간 통신이 발생한다. 그리고 조인에 사용한 특정 키나 키 집합을 어떤 노드가 가졌는지에 따라 해당 노드와 데이터를 공유한다.
    • 이런 통신 방식 때문에 네트워크는 복잡해지고 많은 자원을 사용한다.
  • 즉, 전체 조인 프로세스가 진행되는 동안 데이터 파티셔닝 없이 모든 워커 노드에서 통신이 발생함을 의미한다.


큰 테이블과 작은 테이블 조인

  • 테이블이 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작은 경우 조인 연산을 최적화할 수 있다.
  • 브로드캐스트 조인 방법은 작은 DataFrame을 클러스터의 전체 워커 노드에 복제하는 것을 의미한다.
    • 이렇게 하면 조인 프로세스 내내 전체 노드가 통신하는 현상을 방지할 수 있다.
  • 다음 Figure 8.2같이 시작 시 단 한 번만 복제가 수행되며 그 이후로는 개별 워커가 다른 워커 노드를 기다리거나 통신할 필요 없이 작업을 수행할 수 있다.

Screenshot from 2022-09-06 16-45-43


  • 브로드캐스트 조인은 이전 조인 방식과 마찬가지로 대규모 노드 간 통신이 발생한다.
  • 하지만 그 이후로는 노드 사이에 추가적인 통신이 발생하지 않는다.
  • 따라서 모든 단일 노드에서 개별적으로 조인이 수행되므로 CPU가 가장 큰 병목 구간이 된다.
  • 다음 코드에서 볼 수 있듯이 스파크가 자동으로 데이터셋을 브로드캐스트 조인으로 설정한 것을 알 수 있다.
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")

person.join(graduateProgram, joinExpr).explain()
  • 실행결과

    Screenshot from 2022-09-06 16-49-10


아주 작은 테이블 사이의 조인

  • 아주 작은 테이블 사이의 조인을 할 때는 스파크가 조인을 결정하도록 내버려두는 것이 제일 좋다.





References

댓글남기기