5 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.

[Spark] 불리언, 수치, 문자, 정규 표현식 등의 데이터 타입 다루기에 이어서 진행됩니다.



6. 날짜와 타임스탬프 데이터 타입 다루기

  • 스파크는 달력 형태의 날짜(date)와 날짜와 시간 정보를 모두 가지는 타임스탬프(timestamp) 두 가지 종류의 시간 관련 정보만 집중적으로 관리한다.
  • 스파크는 inferSchema 옵션이 활성화된 경우 날짜와 타임스탬프를 포함해 컬럼의 데이터 타입을 최대한 정확하게 식별하려 노력한다.
  • 스파크는 특정 날짜 포맷을 명시하지 않아도 자체적으로 식별해 데이터를 읽을 수 있다.
  • 스파크는 날짜와 시간을 최대한 올바른 형태로 읽기 위해 노력한다.
    • 만약 특이한 포맷의 날짜와 시간 데이터를 어쩔 수 없이 다뤄야 한다면 각 단계별로 어떤 데이터 타입과 포맷을 유지하는지 정확히 알고 트랜스포메이션을 적용해야 한다.
  • 다음은 오늘 날짜와 현재 타임스탬프값을 구하는 예제이다.
import org.apache.spark.sql.functions.{current_date, current_timestamp}

val dateDF = spark.range(10
    ).withColumn("today", current_date()
    ).withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")


  • 위 코드를 통해 만들어진 DataFrame을 사용해 오늘을 기준으로 5일 전후의 날짜를 구해본다.
    • date_sub(date, num) : 컬럼과 뺄 날짜 수를 인수로 전달한다.
    • date_add(date, num) : 컬럼과 더할 날짜 수를 인수로 전달한다.
import org.apache.spark.sql.functions.{date_sub, date_add}

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1) // 지금은 9월 4일이다.
  • 실행결과

    image


  • 두 날짜의 차이를 구하는 작업을 살펴본다.
  • datediff(date1, date2) : 두 날짜 사이의 일 수를 반환해준다.
  • months_between(date1, date2) : 두 날짜 사이의 개월 수를 반환해준다.
  • to_date() : string 포맷을 DateType으로 변환해준다. 1
import org.apache.spark.sql.functions.{datediff, months_between, to_date}

dateDF.withColumn("week_ago", date_sub(col("today"), 7)
    ).select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
    to_date(lit("2022-01-01")).alias("start"),
    to_date(lit("2022-09-01")).alias("end")
    ).select(months_between(col("start"), col("end"))).show(1)
  • 실행결과

    image


  • 스파크는 날짜를 파싱할 수 없다면 에러 대신 null 값을 반환한다.
dateDF.select(to_date(lit("2022-20-12")), to_date(lit("2022-09-01"))).show(1)
  • 실행결과

    image


  • to_date 함수는 필요에 따라 날짜 포맷을 지정할 수 있지만 to_timestamp 함수는 반드시 날짜 포맷을 지정해야 한다.
import org.apache.spark.sql.functions.{to_date}

val dateFormat = "yyyy-dd-MM"
val cleanDateDF = spark.range(1).select(
    to_date(lit("2022-12-11"), dateFormat).alias("date"),
    to_date(lit("2022-20-12"), dateFormat).alias("date2")
)
cleanDateDF.createOrReplaceTempView("dateTable2")


  • 항상 날짜 포맷을 지정해야 하는 to_timestamp 함수의 예제를 살펴본다.
import org.apache.spark.sql.functions.{to_timestamp}

cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
  • 실행결과

    image



7. null 값 다루기

  • DataFrame에서 빠져 있거나 비어 있는 데이터를 표현할 때는 항상 null 값을 사용하는 것이 좋다.
  • 스파크에서는 빈 문자열이나 대체 값 대신 null 값을 사용해야 최적화를 수행할 수 있다.
  • DataFrame의 하위 패키지인 .na를 사용하는 것이 DataFrame에서 null 값을 다루는 기본 방식이다.


7.1 coalesce

  • coalesce 함수는 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값을 반환한다.
    • 모든 컬럼이 null이 아닌 값을 가지는 경우 첫 번째 컬럼의 값을 반환한다.
import org.apache.spark.sql.functions.coalesce

val df = spark.read.format("csv"
  ).option("header", "true"
  ).option("inferSchema", "true"
  ).load("./data/retail-data/by-day/2010-12-01.csv")
df.createOrReplaceTempView("dfTable")

df.select(coalesce(col("Description"), col("CustomerId"))).show(5)
  • 실행결과

    image


7.2 ifnull, nullif, nvl, nvl2

  • ifnull 함수는 첫 번째 값이 null이면 두 번째 값을 반환한다.
    • 첫 번째 값이 null이 아니면 첫 번째 값을 반환한다.
  • nullif 함수는 두 값이 같으면 null을 반환한다.
    • 두 값이 다르면 첫 번째 값을 반환한다.
  • nvl 함수는 첫 번째 값이 null이면 두 번째 값을 반환한다.
    • 첫 번째 값이 null이 아니면 첫 번째 값을 반환한다.
  • nvl2 함수는 첫 번째 값이 null이 아니면 두 번째 값을 반환한다.
    • 첫 번째 값이 null이면 세 번째 인수로 지정된 값을 반환한다.
spark.sql("""
    SELECT
        ifnull(null, 'return_value'),
        nullif('value', 'value'),
        nvl(null, 'return_value'),
        nvl2('not_null, 'return_value',"else_value")
    FROM dfTable LIMIT 1
""")


7.3 drop

  • drop 메서드는 기본적으로 null 값을 가진 모든 로우를 제거한다.
    • drop 메서드의 인수로 any를 지정한 경우 로우의 컬럼값 중 하나라도 null 값을 가지면 해당 로우를 제거한다.
    • all을 지정한 경우 모든 컬럼의 값이 null이거나 NaN인 경우에만 해당 로우를 제거한다.
df.na.drop()

df.na.drop("any")

df.na.drop("all")


  • drop 메서드에 배열 형태의 컬럼을 인수로 전달해 적용할 수도 있다.
df.na.drop("all", Seq("StockCode", "InvoiceNo"))


7.4 fill

  • fill 함수를 사용해 하나 이상의 컬럼을 특정 값으로 채울 수 있다.
  • 또한 스칼라 Map 타입을 사용해 다수의 컬럼에 fill 메서드를 적용할 수도 있다.
    • 키(key) : 컬럼명
    • 값(value) : null 값을 채우는 데 사용할 값
df.na.fill("All Null values become this string")

df.na.fill(5, Seq("StockCode", "InvoiceNo"))

val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")


7.5 replace

  • replace를 사용하면 조건에 따라 다른 값으로 대체할 수 있다.
df.na.replace("Description", Map("" -> "UNKNOWN"))



8. 정렬하기

  • asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 함수를 사용해 DataFrame을 정렬할 때 null 값이 표시되는 기준을 지정할 수 있다.


9. 복합 데이터 타입 다루기

  • 복합 데이터 타입에는 구조체(struct), 배열(array), 맵(map)이 있다.

9.1 구조체

  • 구조체는 DataFrame 내부의 DataFrame이다.
  • 쿼리문에서 다수의 컬럼을 괄호로 묶어 구조체를 만들 수 있다.
import org.apache.spark.sql.functions.struct

val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")


  • 위에서 복합 데이터 타입을 가진 DataFrame을 만들어 보았다.
  • 이를 다른 DataFrame을 조회하는 것과 동일하게 사용하면 되지만, 유일한 차이점으로 .을 사용하거나 getField 메서드를 사용한다는 것이다.
complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))


  • * 문자를 사용해 모든 값을 조회할 수 있으며, 모든 컬럼을 DataFrame의 최상위 수준으로 끌어올릴 수 있다.
complexDF.select("complex.*")


9.2 배열

split

  • 배열로 변환하려면 split 함수를 사용한다.
  • split 함수에 구분자(delimiter)를 인수로 전달해 배열로 변환한다.
import org.apache.spark.sql.functions.split

df.select(split(col("Description"), " ")).show(2)
  • 실행결과

    image


  • split 함수는 스파크에서 복합 데이터 타입을 마치 또 다른 컬럼처럼 다룰 수 있는 강력한 기능이다.
  • 파이썬과 유사한 문법을 사용해 배열값을 조회할 수 있다.
df.select(split(col("Description"), " ").alias("array_col")
    ).selectExpr("array_col[0]").show(5)
  • 실행결과

    image


배열의 길이

  • 배열의 크기(size)를 조회해 배열의 크기를 알 수 있다.
import org.apache.spark.sql.functions.size

df.select(size(split(col("Description"), " "))).show(5)
  • 실행결과

    image


array_contains

  • array_contains 함수를 사용해 배열에 특정 값이 존재하는지 확인할 수 있다.
import org.apache.spark.sql.functions.array_contains

df.select(array_contains(split(col("Description"), " "), "WHITE")).show(5)
  • 실행결과

    image


explode

  • explode 함수는 배열 타입의 컬럼을 입력받은 후 입력된 컬럼의 배열값에 포함된 모든 값을 로우로 반환한다. 나머지 컬럼값은 중복되어 표시된다.

image

import org.apache.spark.sql.functions.{split, explode}

df.withColumn("splitted", split(col("Description"), " ")
    ).withColumn("exploded", explode(col("splitted"))
    ).select("Description", "InvoiceNo", "exploded").show(5)
  • 실행결과

    image


9.3 맵

  • 맵(map)은 map 함수와 컬럼의 키-값 쌍을 이용해 생성한다. 그리고 배열과 동일한 방법으로 값을 선택할 수 있다.
import org.apache.spark.sql.functions.map

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(5)
  • 실행결과

    image


  • 적합한 키를 사용해 데이터를 조회할 수 있으며, 해당 키가 없다면 null 값을 반환한다.
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")
    ).selectExpr("complex_map['WHITE METAL LANTERN']").show(5)
  • 실행결과

    image



10. JSON 다루기

  • 스파크에서는 문자열 형태의 JSON을 직접 조작할 수 있으며, JSON을 파싱하거나 JSON 객체로 만들 수 있다.
    • JSON : 속성-값 쌍, 배열 자료형 또는 기타 모든 시리얼화 가능한 값 또는 “키-값 쌍”으로 이루어진 데이터 오브젝트를 전달하기 위해 인간이 읽을 수 있는 텍스트를 사용하는 개방형 표준 포맷이다. 2
  • 다음은 JSON 컬럼을 생성하는 예제이다.
val jsonDF = spark.range(1).selectExpr("""
    '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString
""")


  • get_json_object 함수로 JSON 객체를 인라인(inline) 쿼리로 조회할 수 있다.
  • 중첩이 없는 단일 수준의 JSON 객체라면 json_tuple을 사용할 수도 있다.
import org.apache.spark.sql.functions.{get_json_object, json_tuple}

jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
    json_tuple(col("jsonString"), "myJSONKey")
).show(2, false)
  • 실행결과

    image


  • to_json 함수를 사용해 StructType을 JSON 문자열로 변경할 수 있다.
    • to_json 함수에 JSON 데이터소스와 동일한 형태의 딕셔너리(맵)를 파라미터로 사용할 수 있다.
import org.apache.spark.sql.functions.to_json

df.selectExpr("(InvoiceNo, Description) as myStruct"
    ).select(to_json(col("myStruct")))


  • from_json 함수를 사용해 JSON 문자열을 다시 객체로 변환할 수 있다.
    • 파라미터로 반드시 스키마를 지정해야 한다.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val parseSchema = new StructType(Array(
    new StructField("InvoiceNo", StringType, true),
    new StructField("Description", StringType, true)
))
df.selectExpr("(InvoiceNo, Description) as myStruct"
    ).select(to_json(col("myStruct")).alias("newJSON")
    ).select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2, false)
  • 실행결과

    image



11. 사용자 정의 함수

  • 사용자 정의 함수(UDF, user defined function)는 파이썬이나 스칼라 그리고 외부 라이브러리를 사용해 사용자가 원하는 형태로 트랜스포메이션을 만들 수 있게 한다.
  • UDF는 기본적으로 특정 SparkSession이나 Context에서 사용할 수 있도록 임시 함수 형태로 등록된다.


  • 먼저 UDF를 생성해서 스파크에 등록한다.
  • 그리고 생성된 UDf를 사용해 코드를 실행하는 과정에서 어떤 일이 발생하는지 알아본다.
  • 첫 번째로 실제 함수가 필요하다.
  • 즉, 함수를 정의해 입력값을 원하는 결과로 만들어 낸다.
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
  • 실행결과

    Screenshot from 2022-09-05 01-44-14


  • 이제 함수를 만들고 테스트를 완료했으므로 모든 워커 노드에서 생성된 함수를 사용할 수 있도록 스파크에 등록할 차례이다.
  • 스파크는 드라이버에서 함수를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세스로 전달한다.


  • UDF를 실행해본다. 먼저 DataFrame에서 사용할 수 있도록 함수를 등록한다.
  • 이제 power3 함수를 DataFrame의 다른 함수와 동일한 방법으로 사용할 수 있다.
import org.apache.spark.sql.functions.udf

val power3udf = udf(power3(_:Double):Double)


  • 스칼라를 사용해 사용자 정의 함수를 등록해본다.
spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(5)
  • 실행결과

    Screenshot from 2022-09-05 01-51-59





References

댓글남기기