3 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.



0. 다양한 데이터 타입 다루기

  • 이번 글에서는 스파크의 정형 연산에서 가장 중요한 내용인 표현식을 만드는 방법과 다양한 데이터 타입을 다루는 방법을 함께 알아본다.


1. API 찾기

  • 핵심은 데이터 변환용 함수를 찾는 것이다.
  • 이를 위해 핵심적으로 보아야 할 부분은 다음과 같다.
    • DataFrame(Dataset) 메서드
    • Column 메서드
  • 모든 함수는 데이터 로우의 특정 포맷이나 구조를 다른 형태로 변환하기 위해 존재한다.
  • 다음은 분석에 사용할 DataFrame을 생성하는 예제이다.
val df = spark.read.format("csv"
  ).option("header", "true"
  ).option("inferSchema", "true"
  ).load("./data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
  • 실행결과

    image



2. 스파크 데이터 타입으로 변환하기

  • 데이터 타입 변환은 lit 함수를 사용한다.
  • lit 함수는 다른 언어의 데이터 타입을 스파크 데이터 타입에 맞게 변환한다.
import org.apache.spark.sql.functions.lit

df.select(lit(5), lit("five"), lit(5.0))
  • 실행결과

    image



3. 불리언 데이터 타입 다루기

  • 불리언(boolean)은 모든 필터링 작업의 기반이므로 데이터 분석에 필수적이다.
  • 불리언 구문은 and, or, true, false로 구성된다.
  • 불리언 구문을 사용해 true 또는 false로 평가되는 논리 문법을 만든다.
  • 스파크에서 동등 여부를 판별해 필터링하려면 일치를 나타내는 ===이나 불일치를 나타내는 =!=를 사용해야 한다.
    • not함수나 equalTo 메서드를 사용한다.
import org.apache.spark.sql.functions.col

df.where(col("InvoiceNo").equalTo(536365)
  ).select("InvoiceNo", "Description"
  ).show(5, false)
  • 실행결과

    image


  • 가장 명확한 방법은 문자열 표현식에 조건절을 명시하는 것이다.
  • 다음과 같이 ‘일치하지 않음(<>)’을 표현할 수 있다.
df.where("InvoiceNo = 536365"
  ).show(3)
df.where("InvoiceNo <> 536365"
  ).show(3)  
  • 실행결과

    image


  • and 메서드나 or 메서드를 사용해서 불리언 표현식을 여러 부분에 지정할 수 있다.
  • 불리언 표현식을 사용하는 경우 항상 모든 표현식을 and 메서드로 묶어 차례대로 필터를 적용해야 한다.
    • 스파크는 내부적으로 and 구문을 필터 사이에 추가해 모든 필터를 하나의 문장으로 변환한다.
    • 그런 다음 동시에 모든 필터를 처리한다.
    • 반면 or 구문은 반드시 동일한 구문에 조건을 정의해야 한다.
  • .contains() : 인수로 지정된 문자열이 DataFrame 컬럼에 포함되어 있으면 true를 반환하고 그렇지 않으면 false를 반환한다. 1
  • .isin() : DataFrame의 컬럼 값이 문자열 값 목록에 존재하거나 포함되어 있는지 확인하기 위해 이 메서드를 사용한다. 2
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")

df.where(col("StockCode").isin("DOT")
  ).where(priceFilter.or(descripFilter)
  ).show()


  • 스파크 SQL을 사용해도 된다. 다음 두 문장은 동일하게 처리된다.
    • .gt : >
    • .lt : <
    • .geq : >=
    • .leq : <=
import org.apache.spark.sql.functions.{expr, not, col}

df.withColumn("isExpensive", not(col("UnitPrice").leq(250))
  ).filter("isExpensive"
  ).select("Description", "UnitPrice").show(5)

df.withColumn("isExpensive", expr("NOT UnitPrice <= 250")
  ).filter("isExpensive"
  ).select("Description", "UnitPrice").show(5)
  • 실행결과

    image



4. 수치형 데이터 타입 다루기

  • count : 전체 레코드 수
    • 대부분은 수치형 데이터 타입을 사용해 연산 방식을 정의하기만 하면 된다.
  • pow() : 표시된 지수만큼 컬럼의 값을 거듭제곱해준다.
  • .alias() : 컬럼 이름 변경하기 3
import org.apache.spark.sql.functions.{expr, pow}

// (현재 수량 * 단위가격)^2 + 5
// 두 컬럼 모두 수치형이므로 곱셈 연산이 가능하다.
val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)

// 아래는 SQL문
df.selectExpr(
  "CustomerId",
  "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
  • 실행결과

    image


  • round 함수는 소수점 값이 중간값 이상이면 반올림한다.
  • bround 함수는 내림할 때 사용한다.
import org.apache.spark.sql.functions.lit

df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
  • 실행결과

    image



5. 문자열 데이터 타입 다루기

  • 문자열을 다루는 작업은 거의 모든 데이터 처리 과정에서 발생한다.
  • 로그 파일에 정규 표현식을 사용해 데이터 추출, 데이터 치환, 문자열 존재 여부, 대/소문자 변환 처리 등의 작업을 할 수 있다.
  • initcap 함수는 주어진 문자열에서 공백으로 나뉘는 모든 단어의 첫 글자를 대문자로 변경한다.
import org.apache.spark.sql.functions.{initcap}

// show(2, false) : display 2 rows and full column contents
df.select(initcap(col("Description"))).show(2, false)
  • 실행결과

    image


  • lower 함수를 사용해 문자열 전체를 소문자로 변경한다.
  • upper 함수를 사용해 문자열 전체를 대문자로 변경한다.
import org.apache.spark.sql.functions.{lower, upper}

df.select(col("Description"),
  lower(col("Description")),
  upper(lower(col("Description")))).show(2)
  • 실행결과

    image


  • 문자열 주변의 공백을 제거하거나 추가하는 작업이 가능하다.
  • lpad(col, len, pad) : 해당 문자열을 len만큼 pad를 사용하여 왼쪽부터 채워넣는다.
  • rpad(col, len, pad) : 해당 문자열을 len만큼 pad를 사용하여 오른쪽부터 채워넣는다.
  • ltrim : 왼쪽 공백(blank) 제거한다.
  • rtrim : 오른쪽 공백(blank) 제거한다.
  • trim : 좌우 공백(blank) 제거한다.
import org.apache.spark.sql.functions.{lit, lpad, rpad, ltrim, rtrim, trim}

df.select(
  ltrim(lit("   hello   ")).as("ltrim"),
  rtrim(lit("   hello   ")).as("rtrim"),
  trim(lit("   hello   ")).as("trim"),
  lpad(lit("HELLO"), 3, "#").as("lp"),
  rpad(lit("HELLO"), 10, "#").as("rp")).show(2)
  • 실행결과

    image


5.1 정규 표현식

  • 문자열의 존재 여부를 확인하거나 일치하는 모든 문자열을 치환할 때는 보통 정규 표현식(regular expression)을 사용한다.
  • 정규 표현식을 사용해 문자열에서 값을 추출하거나 다른 값으로 치환하는 데 필요한 규칙 모음을 정의할 수 있다.
  • 스파크는 정규 표현식을 위해 값을 추출하는 regexp_extract 함수와 값을 치환하는 regexp_replace 함수를 제공한다.
  • 다음은 regexp_replace 함수를 사용해 description 컬럼의 값을 COLOR로 치환하는 예제이다.
    • map(_.toUpperCase) : 배열의 각 요소에 대문자로 치환을 적용한 값을 반환한다.
    • mkString(sep) : 구분자(separator)를 사용하여 문자 시퀀스의 요소를 모두 출력한다. 4
    • |는 정규 표현식에서 OR을 의미한다.
import org.apache.spark.sql.functions.regexp_replace

val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
df.select(
  regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
  col("Description")
).show(2)
  • 실행결과

    image


  • translate 함수는 교체 문자열에서 색인된 문자에 해당하는 모든 문자를 치환한다.
    • translate(col, matching, replace)
import org.apache.spark.sql.functions.translate

df.select(
  translate(col("Description"), "LEET", "1337"),
  col("Description")
).show(2)
  • 실행결과

    image


  • 처음 나타난 색상 이름을 추출하는 것과 같은 작업을 수행할 수도 있다.
    • regexp_extract(str, regexp [, idx] ) : str에서 regexp 식과 일치하고 regex 그룹 인덱스에 해당하는 첫 번째 문자열을 추출한다. 5
    • mkString(String start, String sep, String end) 4
import org.apache.spark.sql.functions.regexp_extract

val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
df.select(
  regexp_extract(col("Description"), regexString, 1).alias("color_clean"),
  col("Description")
).show(2)
  • 실행결과

    image


  • 동적으로 인수의 개수가 변하는 상황을 스파크는 어떻게 처리하는지 살펴본다.
  • 값 목록을 인수로 변환해 함수에 전달할 때는 varargs(=variable arguments라 불리는 스칼라 고유 기능을 활용한다.
  • varargs 기능을 사용해 임의 길이의 배열을 효율적으로 다룰 수 있다.
    • _ : 임의의 문자를 대신한다. 6
    • * : 해당 문자의 패턴이 0개 이상 일치할 경우
    • $ : 문자열이 끝날 경우
    • + : 해당 문자의 패턴이 1개 이상 일치할 경우
val simpleColors = Seq("black", "white", "red", "green", "blue")

val selectedColumns = simpleColors.map(color => {
  col("Description").contains(color.toUpperCase).alias(s"is_$color")
}):+expr("*")

df.select(selectedColumns:_*).where(col("is_white").or(col("is_red"))
  ).select("Description").show(3, false)
  • 실행결과

    image





References

댓글남기기