[Spark] 날짜, 타임스탬프, null, 정렬, JSON 등의 데이터 타입 다루기
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-shell
명령어를 실행하시면 됩니다.
[Spark] 불리언, 수치, 문자, 정규 표현식 등의 데이터 타입 다루기에 이어서 진행됩니다.
6. 날짜와 타임스탬프 데이터 타입 다루기
- 스파크는 달력 형태의 날짜(date)와 날짜와 시간 정보를 모두 가지는 타임스탬프(timestamp) 두 가지 종류의 시간 관련 정보만 집중적으로 관리한다.
- 스파크는 inferSchema 옵션이 활성화된 경우 날짜와 타임스탬프를 포함해 컬럼의 데이터 타입을 최대한 정확하게 식별하려 노력한다.
- 스파크는 특정 날짜 포맷을 명시하지 않아도 자체적으로 식별해 데이터를 읽을 수 있다.
- 스파크는 날짜와 시간을 최대한 올바른 형태로 읽기 위해 노력한다.
- 만약 특이한 포맷의 날짜와 시간 데이터를 어쩔 수 없이 다뤄야 한다면 각 단계별로 어떤 데이터 타입과 포맷을 유지하는지 정확히 알고 트랜스포메이션을 적용해야 한다.
- 다음은 오늘 날짜와 현재 타임스탬프값을 구하는 예제이다.
import org.apache.spark.sql.functions.{current_date, current_timestamp}
val dateDF = spark.range(10
).withColumn("today", current_date()
).withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
- 위 코드를 통해 만들어진 DataFrame을 사용해 오늘을 기준으로 5일 전후의 날짜를 구해본다.
- date_sub(date, num) : 컬럼과 뺄 날짜 수를 인수로 전달한다.
- date_add(date, num) : 컬럼과 더할 날짜 수를 인수로 전달한다.
import org.apache.spark.sql.functions.{date_sub, date_add}
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1) // 지금은 9월 4일이다.
-
실행결과
- 두 날짜의 차이를 구하는 작업을 살펴본다.
- datediff(date1, date2) : 두 날짜 사이의 일 수를 반환해준다.
- months_between(date1, date2) : 두 날짜 사이의 개월 수를 반환해준다.
- to_date() : string 포맷을 DateType으로 변환해준다. 1
import org.apache.spark.sql.functions.{datediff, months_between, to_date}
dateDF.withColumn("week_ago", date_sub(col("today"), 7)
).select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
to_date(lit("2022-01-01")).alias("start"),
to_date(lit("2022-09-01")).alias("end")
).select(months_between(col("start"), col("end"))).show(1)
-
실행결과
- 스파크는 날짜를 파싱할 수 없다면 에러 대신 null 값을 반환한다.
dateDF.select(to_date(lit("2022-20-12")), to_date(lit("2022-09-01"))).show(1)
-
실행결과
- to_date 함수는 필요에 따라 날짜 포맷을 지정할 수 있지만 to_timestamp 함수는 반드시 날짜 포맷을 지정해야 한다.
import org.apache.spark.sql.functions.{to_date}
val dateFormat = "yyyy-dd-MM"
val cleanDateDF = spark.range(1).select(
to_date(lit("2022-12-11"), dateFormat).alias("date"),
to_date(lit("2022-20-12"), dateFormat).alias("date2")
)
cleanDateDF.createOrReplaceTempView("dateTable2")
- 항상 날짜 포맷을 지정해야 하는
to_timestamp
함수의 예제를 살펴본다.
import org.apache.spark.sql.functions.{to_timestamp}
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
-
실행결과
7. null 값 다루기
- DataFrame에서 빠져 있거나 비어 있는 데이터를 표현할 때는 항상 null 값을 사용하는 것이 좋다.
- 스파크에서는 빈 문자열이나 대체 값 대신 null 값을 사용해야 최적화를 수행할 수 있다.
- DataFrame의 하위 패키지인 .na를 사용하는 것이 DataFrame에서 null 값을 다루는 기본 방식이다.
7.1 coalesce
- coalesce 함수는 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값을 반환한다.
- 모든 컬럼이 null이 아닌 값을 가지는 경우 첫 번째 컬럼의 값을 반환한다.
import org.apache.spark.sql.functions.coalesce
val df = spark.read.format("csv"
).option("header", "true"
).option("inferSchema", "true"
).load("./data/retail-data/by-day/2010-12-01.csv")
df.createOrReplaceTempView("dfTable")
df.select(coalesce(col("Description"), col("CustomerId"))).show(5)
-
실행결과
7.2 ifnull, nullif, nvl, nvl2
- ifnull 함수는 첫 번째 값이 null이면 두 번째 값을 반환한다.
- 첫 번째 값이 null이 아니면 첫 번째 값을 반환한다.
- nullif 함수는 두 값이 같으면 null을 반환한다.
- 두 값이 다르면 첫 번째 값을 반환한다.
- nvl 함수는 첫 번째 값이 null이면 두 번째 값을 반환한다.
- 첫 번째 값이 null이 아니면 첫 번째 값을 반환한다.
- nvl2 함수는 첫 번째 값이 null이 아니면 두 번째 값을 반환한다.
- 첫 번째 값이 null이면 세 번째 인수로 지정된 값을 반환한다.
spark.sql("""
SELECT
ifnull(null, 'return_value'),
nullif('value', 'value'),
nvl(null, 'return_value'),
nvl2('not_null, 'return_value',"else_value")
FROM dfTable LIMIT 1
""")
7.3 drop
- drop 메서드는 기본적으로 null 값을 가진 모든 로우를 제거한다.
- drop 메서드의 인수로 any를 지정한 경우 로우의 컬럼값 중 하나라도 null 값을 가지면 해당 로우를 제거한다.
- all을 지정한 경우 모든 컬럼의 값이 null이거나 NaN인 경우에만 해당 로우를 제거한다.
df.na.drop()
df.na.drop("any")
df.na.drop("all")
- drop 메서드에 배열 형태의 컬럼을 인수로 전달해 적용할 수도 있다.
df.na.drop("all", Seq("StockCode", "InvoiceNo"))
7.4 fill
- fill 함수를 사용해 하나 이상의 컬럼을 특정 값으로 채울 수 있다.
- 또한 스칼라 Map 타입을 사용해 다수의 컬럼에 fill 메서드를 적용할 수도 있다.
키(key)
: 컬럼명값(value)
: null 값을 채우는 데 사용할 값
df.na.fill("All Null values become this string")
df.na.fill(5, Seq("StockCode", "InvoiceNo"))
val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
7.5 replace
- replace를 사용하면 조건에 따라 다른 값으로 대체할 수 있다.
df.na.replace("Description", Map("" -> "UNKNOWN"))
8. 정렬하기
- asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 함수를 사용해 DataFrame을 정렬할 때 null 값이 표시되는 기준을 지정할 수 있다.
9. 복합 데이터 타입 다루기
- 복합 데이터 타입에는 구조체(struct), 배열(array), 맵(map)이 있다.
9.1 구조체
- 구조체는 DataFrame 내부의 DataFrame이다.
- 쿼리문에서 다수의 컬럼을 괄호로 묶어 구조체를 만들 수 있다.
import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
- 위에서 복합 데이터 타입을 가진 DataFrame을 만들어 보았다.
- 이를 다른 DataFrame을 조회하는 것과 동일하게 사용하면 되지만, 유일한 차이점으로
.
을 사용하거나getField
메서드를 사용한다는 것이다.
complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))
*
문자를 사용해 모든 값을 조회할 수 있으며, 모든 컬럼을 DataFrame의 최상위 수준으로 끌어올릴 수 있다.
complexDF.select("complex.*")
9.2 배열
split
- 배열로 변환하려면 split 함수를 사용한다.
- split 함수에 구분자(delimiter)를 인수로 전달해 배열로 변환한다.
import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ")).show(2)
-
실행결과
- split 함수는 스파크에서 복합 데이터 타입을 마치 또 다른 컬럼처럼 다룰 수 있는 강력한 기능이다.
- 파이썬과 유사한 문법을 사용해 배열값을 조회할 수 있다.
df.select(split(col("Description"), " ").alias("array_col")
).selectExpr("array_col[0]").show(5)
-
실행결과
배열의 길이
- 배열의 크기(size)를 조회해 배열의 크기를 알 수 있다.
import org.apache.spark.sql.functions.size
df.select(size(split(col("Description"), " "))).show(5)
-
실행결과
array_contains
- array_contains 함수를 사용해 배열에 특정 값이 존재하는지 확인할 수 있다.
import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(5)
-
실행결과
explode
- explode 함수는 배열 타입의 컬럼을 입력받은 후 입력된 컬럼의 배열값에 포함된 모든 값을 로우로 반환한다. 나머지 컬럼값은 중복되어 표시된다.
import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " ")
).withColumn("exploded", explode(col("splitted"))
).select("Description", "InvoiceNo", "exploded").show(5)
-
실행결과
9.3 맵
- 맵(map)은 map 함수와 컬럼의 키-값 쌍을 이용해 생성한다. 그리고 배열과 동일한 방법으로 값을 선택할 수 있다.
import org.apache.spark.sql.functions.map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(5)
-
실행결과
- 적합한 키를 사용해 데이터를 조회할 수 있으며, 해당 키가 없다면 null 값을 반환한다.
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")
).selectExpr("complex_map['WHITE METAL LANTERN']").show(5)
-
실행결과
10. JSON 다루기
- 스파크에서는 문자열 형태의 JSON을 직접 조작할 수 있으며, JSON을 파싱하거나 JSON 객체로 만들 수 있다.
- JSON : 속성-값 쌍, 배열 자료형 또는 기타 모든 시리얼화 가능한 값 또는 “키-값 쌍”으로 이루어진 데이터 오브젝트를 전달하기 위해 인간이 읽을 수 있는 텍스트를 사용하는 개방형 표준 포맷이다. 2
- 다음은 JSON 컬럼을 생성하는 예제이다.
val jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString
""")
- get_json_object 함수로 JSON 객체를 인라인(inline) 쿼리로 조회할 수 있다.
- 중첩이 없는 단일 수준의 JSON 객체라면 json_tuple을 사용할 수도 있다.
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
json_tuple(col("jsonString"), "myJSONKey")
).show(2, false)
-
실행결과
- to_json 함수를 사용해 StructType을 JSON 문자열로 변경할 수 있다.
to_json
함수에 JSON 데이터소스와 동일한 형태의 딕셔너리(맵)를 파라미터로 사용할 수 있다.
import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct"
).select(to_json(col("myStruct")))
- from_json 함수를 사용해 JSON 문자열을 다시 객체로 변환할 수 있다.
- 파라미터로 반드시 스키마를 지정해야 한다.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
new StructField("InvoiceNo", StringType, true),
new StructField("Description", StringType, true)
))
df.selectExpr("(InvoiceNo, Description) as myStruct"
).select(to_json(col("myStruct")).alias("newJSON")
).select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2, false)
-
실행결과
11. 사용자 정의 함수
- 사용자 정의 함수(UDF, user defined function)는 파이썬이나 스칼라 그리고 외부 라이브러리를 사용해 사용자가 원하는 형태로 트랜스포메이션을 만들 수 있게 한다.
- UDF는 기본적으로 특정 SparkSession이나 Context에서 사용할 수 있도록 임시 함수 형태로 등록된다.
- 먼저 UDF를 생성해서 스파크에 등록한다.
- 그리고 생성된 UDf를 사용해 코드를 실행하는 과정에서 어떤 일이 발생하는지 알아본다.
- 첫 번째로 실제 함수가 필요하다.
- 즉, 함수를 정의해 입력값을 원하는 결과로 만들어 낸다.
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
-
실행결과
- 이제 함수를 만들고 테스트를 완료했으므로 모든 워커 노드에서 생성된 함수를 사용할 수 있도록 스파크에 등록할 차례이다.
- 스파크는 드라이버에서 함수를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세스로 전달한다.
- UDF를 실행해본다. 먼저 DataFrame에서 사용할 수 있도록 함수를 등록한다.
- 이제 power3 함수를 DataFrame의 다른 함수와 동일한 방법으로 사용할 수 있다.
import org.apache.spark.sql.functions.udf
val power3udf = udf(power3(_:Double):Double)
- 스칼라를 사용해 사용자 정의 함수를 등록해본다.
spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(5)
-
실행결과
댓글남기기