[Spark] 불리언, 수치, 문자, 정규 표현식 등의 데이터 타입 다루기
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-shell
명령어를 실행하시면 됩니다.
0. 다양한 데이터 타입 다루기
- 이번 글에서는 스파크의 정형 연산에서 가장 중요한 내용인 표현식을 만드는 방법과 다양한 데이터 타입을 다루는 방법을 함께 알아본다.
1. API 찾기
- 핵심은 데이터 변환용 함수를 찾는 것이다.
- 이를 위해 핵심적으로 보아야 할 부분은 다음과 같다.
- DataFrame(Dataset) 메서드
- Column 메서드
- 모든 함수는 데이터 로우의 특정 포맷이나 구조를 다른 형태로 변환하기 위해 존재한다.
- 다음은 분석에 사용할 DataFrame을 생성하는 예제이다.
val df = spark.read.format("csv"
).option("header", "true"
).option("inferSchema", "true"
).load("./data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
-
실행결과
2. 스파크 데이터 타입으로 변환하기
- 데이터 타입 변환은
lit
함수를 사용한다. - lit 함수는 다른 언어의 데이터 타입을 스파크 데이터 타입에 맞게 변환한다.
import org.apache.spark.sql.functions.lit
df.select(lit(5), lit("five"), lit(5.0))
-
실행결과
3. 불리언 데이터 타입 다루기
- 불리언(boolean)은 모든 필터링 작업의 기반이므로 데이터 분석에 필수적이다.
- 불리언 구문은
and
,or
,true
,false
로 구성된다. - 불리언 구문을 사용해 true 또는 false로 평가되는 논리 문법을 만든다.
- 스파크에서 동등 여부를 판별해 필터링하려면 일치를 나타내는
===
이나 불일치를 나타내는=!=
를 사용해야 한다.not
함수나equalTo
메서드를 사용한다.
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo").equalTo(536365)
).select("InvoiceNo", "Description"
).show(5, false)
-
실행결과
- 가장 명확한 방법은 문자열 표현식에 조건절을 명시하는 것이다.
- 다음과 같이 ‘일치하지 않음(
<>
)’을 표현할 수 있다.
df.where("InvoiceNo = 536365"
).show(3)
df.where("InvoiceNo <> 536365"
).show(3)
-
실행결과
- and 메서드나 or 메서드를 사용해서 불리언 표현식을 여러 부분에 지정할 수 있다.
- 불리언 표현식을 사용하는 경우 항상 모든 표현식을 and 메서드로 묶어 차례대로 필터를 적용해야 한다.
- 스파크는 내부적으로 and 구문을 필터 사이에 추가해 모든 필터를 하나의 문장으로 변환한다.
- 그런 다음 동시에 모든 필터를 처리한다.
- 반면 or 구문은 반드시 동일한 구문에 조건을 정의해야 한다.
- .contains() : 인수로 지정된 문자열이 DataFrame 컬럼에 포함되어 있으면 true를 반환하고 그렇지 않으면 false를 반환한다. 1
- .isin() : DataFrame의 컬럼 값이 문자열 값 목록에 존재하거나 포함되어 있는지 확인하기 위해 이 메서드를 사용한다. 2
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")
).where(priceFilter.or(descripFilter)
).show()
- 스파크 SQL을 사용해도 된다. 다음 두 문장은 동일하게 처리된다.
- .gt :
>
- .lt :
<
- .geq :
>=
- .leq :
<=
- .gt :
import org.apache.spark.sql.functions.{expr, not, col}
df.withColumn("isExpensive", not(col("UnitPrice").leq(250))
).filter("isExpensive"
).select("Description", "UnitPrice").show(5)
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250")
).filter("isExpensive"
).select("Description", "UnitPrice").show(5)
-
실행결과
4. 수치형 데이터 타입 다루기
- count : 전체 레코드 수
- 대부분은 수치형 데이터 타입을 사용해 연산 방식을 정의하기만 하면 된다.
- pow() : 표시된 지수만큼 컬럼의 값을 거듭제곱해준다.
- .alias() : 컬럼 이름 변경하기 3
import org.apache.spark.sql.functions.{expr, pow}
// (현재 수량 * 단위가격)^2 + 5
// 두 컬럼 모두 수치형이므로 곱셈 연산이 가능하다.
val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
// 아래는 SQL문
df.selectExpr(
"CustomerId",
"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
-
실행결과
- round 함수는 소수점 값이 중간값 이상이면 반올림한다.
- bround 함수는 내림할 때 사용한다.
import org.apache.spark.sql.functions.lit
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
-
실행결과
5. 문자열 데이터 타입 다루기
- 문자열을 다루는 작업은 거의 모든 데이터 처리 과정에서 발생한다.
- 로그 파일에 정규 표현식을 사용해 데이터 추출, 데이터 치환, 문자열 존재 여부, 대/소문자 변환 처리 등의 작업을 할 수 있다.
- initcap 함수는 주어진 문자열에서 공백으로 나뉘는 모든 단어의 첫 글자를 대문자로 변경한다.
import org.apache.spark.sql.functions.{initcap}
// show(2, false) : display 2 rows and full column contents
df.select(initcap(col("Description"))).show(2, false)
-
실행결과
- lower 함수를 사용해 문자열 전체를 소문자로 변경한다.
- upper 함수를 사용해 문자열 전체를 대문자로 변경한다.
import org.apache.spark.sql.functions.{lower, upper}
df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2)
-
실행결과
- 문자열 주변의 공백을 제거하거나 추가하는 작업이 가능하다.
- lpad(col, len, pad) : 해당 문자열을 len만큼 pad를 사용하여 왼쪽부터 채워넣는다.
- rpad(col, len, pad) : 해당 문자열을 len만큼 pad를 사용하여 오른쪽부터 채워넣는다.
- ltrim : 왼쪽 공백(blank) 제거한다.
- rtrim : 오른쪽 공백(blank) 제거한다.
- trim : 좌우 공백(blank) 제거한다.
import org.apache.spark.sql.functions.{lit, lpad, rpad, ltrim, rtrim, trim}
df.select(
ltrim(lit(" hello ")).as("ltrim"),
rtrim(lit(" hello ")).as("rtrim"),
trim(lit(" hello ")).as("trim"),
lpad(lit("HELLO"), 3, "#").as("lp"),
rpad(lit("HELLO"), 10, "#").as("rp")).show(2)
-
실행결과
5.1 정규 표현식
- 문자열의 존재 여부를 확인하거나 일치하는 모든 문자열을 치환할 때는 보통 정규 표현식(regular expression)을 사용한다.
- 정규 표현식을 사용해 문자열에서 값을 추출하거나 다른 값으로 치환하는 데 필요한 규칙 모음을 정의할 수 있다.
- 스파크는 정규 표현식을 위해 값을 추출하는 regexp_extract 함수와 값을 치환하는 regexp_replace 함수를 제공한다.
- 다음은
regexp_replace
함수를 사용해 description 컬럼의 값을 COLOR로 치환하는 예제이다.map(_.toUpperCase)
: 배열의 각 요소에 대문자로 치환을 적용한 값을 반환한다.mkString(sep)
: 구분자(separator)를 사용하여 문자 시퀀스의 요소를 모두 출력한다. 4|
는 정규 표현식에서 OR을 의미한다.
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
df.select(
regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
col("Description")
).show(2)
-
실행결과
- translate 함수는 교체 문자열에서 색인된 문자에 해당하는 모든 문자를 치환한다.
translate(col, matching, replace)
import org.apache.spark.sql.functions.translate
df.select(
translate(col("Description"), "LEET", "1337"),
col("Description")
).show(2)
-
실행결과
- 처음 나타난 색상 이름을 추출하는 것과 같은 작업을 수행할 수도 있다.
import org.apache.spark.sql.functions.regexp_extract
val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
df.select(
regexp_extract(col("Description"), regexString, 1).alias("color_clean"),
col("Description")
).show(2)
-
실행결과
- 동적으로 인수의 개수가 변하는 상황을 스파크는 어떻게 처리하는지 살펴본다.
- 값 목록을 인수로 변환해 함수에 전달할 때는 varargs(=variable arguments라 불리는 스칼라 고유 기능을 활용한다.
- varargs 기능을 사용해 임의 길이의 배열을 효율적으로 다룰 수 있다.
- _ : 임의의 문자를 대신한다. 6
- * : 해당 문자의 패턴이 0개 이상 일치할 경우
- $ : 문자열이 끝날 경우
- + : 해당 문자의 패턴이 1개 이상 일치할 경우
val simpleColors = Seq("black", "white", "red", "green", "blue")
val selectedColumns = simpleColors.map(color => {
col("Description").contains(color.toUpperCase).alias(s"is_$color")
}):+expr("*")
df.select(selectedColumns:_*).where(col("is_white").or(col("is_red"))
).select("Description").show(3, false)
-
실행결과
댓글남기기