[Spark] 저수준 API: RDD 이해하기
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 SQL과 스칼라(scala)로 했습니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-shell
명령어를 실행하시면 됩니다.
1. 저수준 API란
- 스파크에는 2종류의 저수준 API가 있다.
- 하나는 분산 데이터 처리를 위한 RDD이고, 다른 하나는 브로드캐스트(broadcast) 변수와 accumulator처럼 분산형 공유 변수(distributed shared variables)를 배포하고 다루기 위한 API이다.
1.2 저수준 API는 언제 사용할까
- 다음과 같은 상황에서 저수준 API를 사용한다.
- 고수준 API에서 제공하지 않는 기능이 필요한 경우
- ex) 클러스터의 물리적 데이터의 배치를 아주 세미할게 제어해야 하는 상황
- RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
- 사용자가 정의한 공유 변수를 다뤄야 하는 경우
- 스파크의 모든 워크로드는 저수준 기능을 사용하는 형태로 컴파일되므로 이를 이해하는 것은 많은 도움이 될 수 있다.
- DataFrame 트랜스포메이션을 호출하면 실제로 다수의 RDD 트랜스포메이션으로 변환된다.
1.2 저수준 API는 어떻게 사용할까
SparkContext
는 저수준 API 기능을 사용하기 위한 진입 지점이다.- 스파크 클러스터에서 연산을 수행하는 데 필요한 도구인 SparkSession을 이용해 SparkContext에 접근할 수 있다.
spark.sparkContext
2. RDD 개요
- 사용자가 실행한 모든 DataFrame이나 Dataset 코드는 RDD로 컴파일되기 때문에 적어도 RDD가 무엇인지, 어떻게 사용하는지 기본적으로 이해하고 있어야 한다.
- RDD는 불변성(immutable)을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드(record)의 모음이다.
- DataFrame의 각 레코드는 스키마를 알고 있는 필드로 구성된 구조화된 로우인 반면, RDD의 레코드는 그저 프로그래머가 선택하는 자바, 스칼라, 파이썬의 객체일 뿐이다.
- RDD의 모든 레코드는 자바나 파이썬의 객체이므로 완벽하게 제어할 수 있다. 하지만, 모든 값을 다루거나, 값 사이의 상호작용 과정을 반드시 수동으로 정의해야 한다.
- 정형 API는 자동으로 데이터를 최적화하고 압축된 바이너리 포맷으로 저장한다.
- 반면, 저수준 API에서 동일한 공간 효율성과 성능을 얻으려면 객체에 이런 포맷 타입을 구현해 모든 저수준 연산 과정에서 사용해야 한다.
2.1 RDD 유형
- RDD는 DataFrame API에서 최적화된 물리적 실행 계획을 만드는 데 대부분 사용된다.
- 사용자는 2가지 타입의 RDD를 만들 수 있다.
- 하나는 제네릭(generic) RDD 타입이고, 다른 하나는 키 기반의 집계가 가능한 키-값 RDD이다.
- 또한 내부적으로 각 RDD는 다음 5가지 주요 속성으로 구분된다.
- 파티션의 목록
- 각 조각을 연산하는 함수
- 다른 RDD와의 의존성 목록
- 부가적으로(optionally) 키-값 RDD를 위한 파티셔너(partitioner)
- ex) RDD는 해시 파티셔닝되어 있다고 말할 수 있다.
- 부가적으로 각 조각(split)을 연산하기 위한 기본 위치 목록
- ex) 하둡 분산 파일 시스템 파일의 블록 위치
- 위와 같은 속성은 사용자 프로그램을 스케줄링하고 실행하는 스파크의 모든 처리 방식을 결정한다.
- 각 RDD 유형은 각 속성에 대한 구현체를 가지고 있다.
- 사용자는 각 속성을 구현하여 새로운 데이터소스를 정의할 수 있다.
- 또한 RDD 역시 분산 환경에서 데이터를 다루는 데 필요한 지연(lazily) 처리 방식의 트랜스포메이션(transformation)과 즉시(eagarly) 실행 방식의 액션(action)을 제공한다.
2.2 RDD는 언제 사용할까
- RDD는 많은 강점이 있지만 정형 API가 제공하는 여러 최적화 기법을 사용할 수 없다.
- DataFrame은 RDD보다 더 효율적이고 안정적이며 표현력이 좋다.
- 물리적으로 분산된 데이터에 세부적인 제어가 필요할 때 RDD를 사용하는 것이 가장 적합하다.
3. RDD 생성하기
3.1 DataFrame, Dataset으로 RDD 생성하기
- RDD를 얻을 수 있는 가장 쉬운 방법은 기존에 사용하던 DataFrame이나 Dataset을 이용하는 것이다.
- rdd 메서드를 호출하면 쉽게 RDD로 변환할 수 있다.
- rdd 메서드 : Row 타입을 가진 RDD를 생성한다.
- Row 타입 : 스파크가 정형 API에서 데이터를 표현하는 데 사용하는 내부 카탈리스트 포맷이다.
// 스칼라 코드: Dataset[Long]을 RDD[Long]으로 변환
spark.range(10).rdd
- RDD를 사용해 DataFrame이나 Dataset을 생성할 때는 toDF 메서드를 호출하면 된다.
spark.range(10).rdd.toDF()
3.2 로컬 컬렉션으로 RDD 생성하기
- 컬렉션 객체를 RDD로 만들려면 (SparkSession 안에 있는) sparkContext의
parallelize
메서드를 호출해야 한다.- parallelize 메서드는 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환한다.
- 또한 파티션 수를 명시적으로 지정할 수 있다.
- parallelize 메서드는 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환한다.
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
-
실행결과
- RDD에 이름을 지정하면 스파크 UI에 지정한 이름으로 RDD가 표시된다.
words.setName("myWords")
words.name
-
실행결과
3.3 데이터소스로 RDD 생성하기
- RDD는 주로 RDD 간의 종속성(dependency) 구조와 파티션 목록을 정의한다.
- DataSource API는 데이터를 읽는 가장 좋은 방법이다.
- 또한 sparkContext를 사용해 데이터를 RDD로 읽을 수 있다.
- 다음은 라인(line) 단위로 텍스트 파일을 읽는 예제이다.
- 여러 텍스트 파일의 각 라인을 레코드로 가진 RDD를 생성한다.
spark.sparkContext.textFile("./some/path/withTextFiles")
- 다음은 텍스트 파일 하나를 레코드로 읽어야 할 경우 쓰는 예제이다.
- 생성된 RDD에서 파일명은 첫 번째 객체인 RDD의 키가 되며, 텍스트 파일의 두 번째 문자열 객체인 RDD의 값이 된다.
spark.sparkContext.wholeTextFiles("./some/path/withTextFiles")
4. 트랜스포메이션
- 대부분의 RDD 트랜스포메이션은 정형 API에서 사용할 수 있는 기능을 가지고 있다.
- DataFrame이나 Dataset과 동일하게 RDD에 트랜스포메이션을 지정해 새로운 RDD를 생성할 수 있다.
- 이때 RDD에 포함된 데이터를 다루는 함수에 따라 다른 RDD에 대한 종속성도 함께 정의한다.
4.1 distinct
- RDD의 distinct 메서드를 호출하면 RDD에서 중복된 데이터를 제거한다.
words.distinct().count()
-
실행결과
4.2 filter
- 필터링은 SQL의 where 조건절을 생성하는 것과 비슷하다.
- RDD의 레코드를 모두 확인하고 조건 함수를 만족하는 레코드만 반환한다.
- 모든 로우는 어떤 경우라도 입력값을 가지고 있어야 한다.
=>
는 함수 lambda를 의미한다.
def startsWithS(individual:String) = {
individual.startsWith("S")
}
words.filter(word => startsWithS(word)).collect()
-
실행결과
4.3 map
- map 메서드는 주어진 입력을 원하는 값으로 반환하는 함수를 명시하고 레코드별로 적용한다.
- 예제에서는 현재 단어를 ‘단어’, ‘단어의 시작 문자’, ‘첫 문자가 S인지 아닌지’ 순서로 매핑한다.
- _3 : 튜플의 3번째 반환값인 불리언값으로 필터링한다. 1
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
words2.filter(record => record._3).take(5)
-
실행결과
flatMap
- flatMap 메서드는 단일 로우를 여러 로우로 변환해주는 map 함수의 확장 버전이다.
- ex)
flatMap
메서드를 사용해 단어(word)를 문자(character) 집합으로 변환할 수 있다.- 각 단어는 여러 문자로 구성되어 있으므로
flatMap
메서드를 사용해 다수의 로우로 변환할 수 있다.
- 각 단어는 여러 문자로 구성되어 있으므로
flatMap
은 확장 가능한map
함수의 출력을 반복 처리할수 있는 형태로 반환한다.
words.flatMap(word => word.toSeq).take(5)
-
실행결과
4.4 sortBy
- RDD를 정렬하려면 sortBy 메서드를 사용한다.
- 함수를 지정해 RDD의 데이터 객체에서 값을 추출한 다음 값을 기준으로 정렬한다.
- 다음 예제는 단어 길이가 가장 긴 것부터 짧은 순으로 정렬한다.
words.sortBy(word => word.length() * -1).take(2)
-
실행결과
4.5 randomSplit
- randomSplit 메서드는 RDD를 임의로 분할해 RDD 배열을 만들 때 사용하며, 가중치와 난수 시드(random seed)로 구성된 배열을 파라미터로 사용한다.
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))
5. 액션
- 지정된 트랜스포메이션 연산을 시작하려면 액션을 사용한다.
- 액션(action)은 데이터를 드라이버로 모으거나 외부 데이터소스로 내보낼 수 있다.
5.1 reduce
- reduce 메서드를 사용해 RDD의 모든 값을 하나의 값으로 만든다.
- a to b는 a부터 b까지의 이터레이터를 반환한다. 2
spark.sparkContext.parallelize(1 to 20).reduce(_ + _)
-
실행결과
- 단어 집합에서 가장 긴 단어를 찾는 예제는
reduce
메서드를 사용해 처리할 수 있다.
def wordLengthReducer(leftWord:String, rightWord:String): String = {
if (leftWord.length > rightWord.length)
return leftWord
else
return rightWord
}
words.reduce(wordLengthReducer)
-
실행결과
5.2 count
- count 함수를 사용해 RDD의 전체 로우 수를 알 수 있다.
words.count()
-
실행결과
countApprox
- countApprox 함수는 count 함수의 근사치를 제한 시간 내에 계산한다.
- 제한 시간을 초과하면 불완전한 결과를 반환할 수 있다.
- 신뢰도(confidence)는 실제로 연산한 결과와의 오차율을 의미한다.
val confidence = 0.95
val timeoutMilliseconds = 400
words.countApprox(timeoutMilliseconds, confidence)
-
실행결과
5.3 first
- first 메서드는 데이터셋의 첫 번째 값을 반환한다.
words.first()
5.4 max와 min
- max와 min 메서드는 각각 최댓값과 최솟값을 반환한다.
spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()
-
실행결과
5.5 take
- take와 이것의 파생 메서드는 RDD에서 가져올 값의 개수를 파라미터로 사용한다.
- 이 메서드는 먼저 하나의 파티션을 스캔한다.
- 그 다음에 해당 파티션의 결과 수를 이용해 파라미터로 지정된 값을 만족하는 데 필요한 추가 파티션 수를 예측한다.
- top 함수는 암시적(implicit) 순서에 따라 최상위값을 선택한다.
- takeOrdered 함수는 top 함수와 반대되는 개념이다.
words.take(5)
words.takeOrdered(5)
words.top(5)
-
실행결과
6. 파일 저장하기
- 파일 저장은 데이터 처리 결과를 일반 텍스트 파일로 쓰는 것을 의미한다.
- RDD를 사용하면 일반적인 의미의 데이터소스에 저장할 수 없다.
- 각 파티션의 내용을 저장하려면 전체 파티션을 순회하면서 외부 데이터베이스에 저장해야 한다.
- 스파크는 각 파티션의 데이터를 파일로 저장한다.
6.1 saveAsTextFile
- 텍스트 파일로 저장하려면 경로를 지정해야 한다.
words.saveAsTextFile("file:/tmp/bookTitle")
- 압축 코덱을 설정하려면 하둡에서 사용 가능한 코덱을 import 해야 한다.
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])
6.2 시퀀스 파일
- 시퀀스 파일(sequenceFile)은 바이너리 키-값 쌍으로 구성된 플랫 파일이며, 맵리듀스의 입출력 포맷으로 널리 사용된다.
words.saveAsObjectFile("./tmp/my/sequenceFilePath")
7. 캐싱
- RDD를 캐시하거나(cache) 저장(persist)할 수 있다.
- 기본적으로 캐시와 저장은 메모리에 있는 데이터만을 대상으로 한다.
- setName 함수를 사용하면 캐시된 RDD에 이름을 저장할 수 있다.
words.cache()
8. 체크포인팅
- 체크포인팅(checkpointing)은 RDD를 디스크에 저장하는 방식이다.
- 나중에 저장된 RDD를 참조할 때는 원본 데이터소스를 다시 계산해 RDD를 생성하지 않고 디스크에 저장된 중간 결과 파티션을 참조한다.
- 이 기능은 반복적인 연산 수행 시 매우 유용하다.
spark.sparkContext.setCheckpointDir("./some/path/for/checkpointing")
댓글남기기