[Spark] 정형 API 기본 연산에 대해 알아보기
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-shell
명령어를 실행하시면 됩니다.
- 스파크 실행은 다음과 같습니다.
$ cd spark-3.3.0/
$ ./bin/spark-shell
1. 정형 API 기본 연산
- 이번 글에서는 DataFrame과 DataFrame의 데이터를 다루는 기능을 다뤄본다.
- DataFrame은 ROw 타입의 레코드(record)와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼(column)으로 구성된다.
- 스키마(schema)는 각 컬럼명과 데이터 타입을 정의한다.
- DataFrame의 파티셔닝(partitioning)은 DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의한다.
- 파티셔닝 스키마(partitioning schema)는 파티션을 배치하는 방법을 정의한다.
- 파티셔닝의 분할 기준은 특정 컬럼이나 비결정론적 값을 기반으로 설정할 수 있다.
- 비결정론적(nondeterministically) : 매번 변하는 정도
val df = spark.read.format("json"
).load("./data/flight-data/json/2015-summary.json")
2. 스키마
- 스키마는 DataFrame의 컬럼명과 데이터 타입을 정의한다. 데이터소스에서 스키마를 얻거나 직접 정의할 수 있다.
- 스키마는 여러 개의
StructField
타입 필드로 구성된StructType
객체이다.-
StructField는 이름, 데이터 타입, 컬럼의 값이 없거나 null일 수 있는지 지정하는 boolean 값이다. 1
public StructField(String name, DataType dataType, boolean nullable, Metadata metadata)
- 필요한 경우 컬럼과 관련된 메타데이터를 지정할 수도 있다.
- 메타데이터 : 해당 컬럼과 관련된 정보
-
spark.read.format("json").load("./data/flight-data/json/2015-summary.json").schema
-
실행결과
- 스키마는 복합 데이터 타입인
StructType
을 가질 수 있다.- 스파크는 런타임에 데이터 타입이 스키마의 데이터 타입과 일치하지 않으면 오류를 발생시킨다.
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(
StructField("DEST_COUNTRY_NAME", StringType, true),
StructField("ORIGIN_COUNTRY_NAME", StringType, true),
StructField("count", LongType, false,
Metadata.fromJson("{\"hello\":\"world\"}"))
))
val df = spark.read.format("json").schema(myManualSchema
).load("./data/flight-data/json/2015-summary.json")
-
실행결과
3. 컬럼과 표현식
- 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있다.
- 표현식(expression) : 값을 반환하는 식 또는 코드 2
- 컬럼 내용을 수정하려면 반드시 DataFrame의 스파크 트랜스포메이션을 사용해야 한다.
3.1 컬럼
- 컬럼을 생성하고 참조할 때 col 함수나 column 함수를 사용하는 것이 가장 간단하다.
- 이들 함수는 컬럼명을 인수로 받는다.
import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")
-
실행결과
$
을 사용하면 컬럼을 참조하는 특수한 문자열 표기를 만들 수 있다.'
은 심벌이라고도 불리는데, 이는 특정 식별자를 참조할 때 사용하는 스칼라 고유의 기능이다.- 위 2개는 모두 컬럼명으로 컬럼을 참조한다.
$"myColumn"
'myColumn
-
실행결과
명시적 컬럼 참조
- DataFrame의 컬럼은 col 메서드로 참조한다.
- col 메서드는 조인(join) 시 유용하다.
3.2 표현식
- 표현식(Expression)은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미한다.
- 여러 컬럼명을 입력으로 받아 식별하고, ‘단일 값’을 만들기 위해 다양한 표현식을 각 레코드에 적응하는 함수이다.
- 표현식은
expr
함수로 가장 간단히 사용할 수 있다. 이 함수를 사용해 DataFrame의 컬럼을 참조할 수 있다.
표현식으로 컬럼 표현
expr("someCol - 5")
col("someCol") - 5
expr("someCol") - 5
- 위 세 코드는 모두 같은 트랜스포메이션 과정을 거친다.
- 스파크가 연산 순서를 지정하는 논리적 트리로 컴파일하기 때문이다.
- 컬럼은 단지 표현식일 뿐이다.
- 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일된다.
- 위 DAG 그래프를 코드로 표현하면 다음과 같다.
import org.apache.spark.sql.functions.expr
expr("(((someCol + 5) * 200) - 6) < otherCol")
DataFrame 컬럼에 접근하기
printSchema
메서드로 DataFrame의 전체 컬럼 정보를 확인할 수 있다.- 프로그래밍 방식으로 컬럼에 접근할 때는 DataFrame의
columns
속성을 사용한다.
spark.read.format("json").load("./data/flight-data/json/2015-summary.json"
).columns
-
실행결과
4. 레코드와 로우
- 스파크는 레코드를 Row 객체로 표현한다.
- Row 객체는 내부에 바이트 배열을 가지는데, 이 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있으므로 사용자에게 절대 노출되지 않는다.
- 다음은 DataFrame의
first
메서드로 로우를 확인하는 예제이다.
df.first()
-
실행결과
4.1 로우 생성하기
- Row 객체는 스키마 정보를 가지고 있지 않는다.
- DataFrame만 유일하게 스키마를 갖는다.
- Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다.
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)
myRow(0) // Any 타입
myRow(0).asInstanceOf[String] // String 타입
myRow.getString(0) // String 타입
myRow.getInt(2)
-
실행결과
댓글남기기