4 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-shell 명령어를 실행하시면 됩니다.



1. DataFrame의 트랜스포메이션

image

  • DataFrame을 다루는 방법은 다음과 같은 작업으로 나눌 수 있다.
    1. 로우나 컬럼 추가
    2. 로우나 컬럼 제거
    3. 로우를 컬럼으로 변환하거나 그 반대로 변환
    4. 컬럼값을 기준으로 로우 순서 변경
  • 가장 일반적인 트랜스포메이션은 모든 로우의 특정 컬럼값을 변경하고 그 결과를 반환하는 것이다.



2. DataFrame 생성하기

val df = spark.read.format("json"
    ).load("./data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable") // dfTable은 객체명을 의미


  • Row 객체를 가진 Seq 타입을 직접 변환해 DataFrame을 생성할 수도 있다.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val myManualSchema = new StructType(Array(
    new StructField("some", StringType, true),
    new StructField("col", StringType, true),
    new StructField("names", LongType, false)
))

val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows) // 로컬 python 컬렉션을 배포하여 RDD를 형성
val myDf = spark.createDataFrame(myRDD, myManualSchema) // RDD(data)와 스키마부터 DataFrame 생성 

myDf.show()
  • 실행결과

    image



3. select와 selectExpr

  • selectselectExpr 메서드를 사용하면 데이터 테이블에 SQL을 실행하는 것처럼 DataFrame에서도 SQL을 사용할 수 있다.
df.select("DEST_COUNTRY_NAME").show(2)
-- SQL
SELECT DEST_COUNTRY_NAME FROM dfTAble LIMIT 2
  • 실행결과

    image


  • 같은 형태의 쿼리로 여러 컬럼을 선택할 수 있다. 여러 컬럼을 선택하려면 select 메서드에 원하는 컬럼명을 추가한다.
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
  • 실행결과

    image


  • 모든 같은 의미로 컬럼을 참조하는 다양한 방법들이 있다.
import org.apache.spark.sql.functions.{expr, col, column}

df.select(
    df.col("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
    'DEST_COUNTRY_NAME,
    $"DEST_COUNTRY_NAME",
    expr("DEST_COUNTRY_NAME")
).show(2)


  • select 메서드에 expr 함수를 사용하는 패턴을 자주 사용한다.
  • 스파크는 이런 작업을 간단하고 효율적으로 할 수 있는 selectExpr 메서드를 제공한다.
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
  • 실행결과

    image


  • select 표현식에는 DataFrame의 컬럼에 대한 집계 함수를 지정할 수 있다.
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
  • 실행결과

    image



4. 스파크 데이터 타입으로 변환하기

  • 때로는 새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야 한다.
  • 이때 리터럴(literal)을 사용하는데, 리터럴은 프로그래밍 언어의 리터럴값을 스파크가 이해할 수 있는 값으로 변환한다.
    • 어떤 상수나 프로그래밍으로 생성된 변숫값이 특정 컬럼의 값보다 큰지 확인할 때 리터럴을 사용한다.
import org.apache.spark.sql.functions.lit

df.select(expr("*"), lit(1).as("One")).show(2)
-- SQL
-- SQL에서 리터럴은 상숫값을 의미한다.
SELECT *, 1 as One FROM dfTable LIMIT 2
  • 실행결과

    image



5. 컬럼 추가하기

  • DataFrame에 신규 컬럼을 추가하는 방법은 DataFrame의 withColumn 메서드를 사용하는 것이다.
    • 메서드 : DataFrame.withColumn(colName, col) 1
df.withColumn("numberOne", lit(1)).show(2)
  • 실행결과

    image


  • withColumn 메서드로 컬럼명을 변경할 수도 있다.
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns // columns : DataFrame 컬럼에 접근
  • 실행결과

    image



6. 컬럼명 변경하기

  • withColumnRenamed 메서드로 컬럼명을 변경할 수 있다.
    • withColumnRenamed 메서드는 첫 번째 인수로 전달된 컬럼명을 두 번째 인수의 문자열로 변경한다.
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
  • 실행결과

    image



7. 예약 문자와 키워드

  • 공백(space)이나 하이픈(-, dash)같은 예약 문자(reserved characters)는 컬럼명에 사용할 수 없다.
  • 예약 문자를 컬럼명에 사용하려면 백틱(`) 문자를 이용해 이스케이핑해야 한다.
import org.apache.spark.sql.functions.expr

val dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME")
)

dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`"
).show(2)
  • 실행결과

    image



8. 대소문자 구분

  • 기본적으로 스파크는 대소문자를 가리지 않는다.



9. 컬럼 제거하기

  • drop 메서드를 사용해 DataFrame의 컬럼을 제거할 수 있다.
df.drop("ORIGIN_COUNTRY_NAME").columns
  • 실행결과

    image



10. 컬럼의 데이터 타입 변경하기

  • cast 메서드로 데이터 타입을 변환할 수 있다.
  • 다음은 count 컬럼을 Integer 데이터 타입에서 String 데이터 타입으로 형변환하는 예제이다.
df.withColumn("count2", col("count").cast("string")).printSchema
  • 실행결과

    image



11. 로우 필터링하기

  • 로우를 필터링하려면 참과 거짓을 판별하는 표현식을 만들어야 한다. 그러면 표현식의 결과가 false인 로우를 걸러낼 수 있다.
  • DataFrame의 where 메서드나 filter 메서드로 필터링할 수 있다.
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
  • 실행결과

    image



12. 고유한 로우 얻기

  • DataFrame의 모든 로우에서 중복 데이터를 제거할 수 있는 distinct 메서드를 사용해 고윳값을 찾을 수 있다.
    • distinct 메서드는 중복되지 않은 로우를 가진 신류 DataFrame을 반환한다.
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct.count()
// 결과 : 256



13. 무작위 샘플 만들기

  • DataFrame에서 무작위 샘플 데이터를 얻으려면 DataFrame의 sample 메서드를 사용한다.
  • DataFame에서 표본 데이터 추출 비율을 지정할 수 있으며, 보권 추출이나 비복원 추출의 사용 여부를 지정할 수도 있다.
val withReplacement = false
val fraction = 0.5
val seed = 5
df.sample(withReplacement, fraction, seed).count()
// 결과 : 138



14. 임의 분할하기

  • 임의 분할(random split)은 원본 DataFrame을 임의 크기로 분할할 때 유용하게 사용된다.
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)



15. 로우 합치기와 추가하기

  • DataFrame은 불변성(immutable)을 가진다.
  • 그러므로 DataFrame에 레코드를 추가하는 작업은 DataFrame을 변경하는 작업이기 때문에 불가능하다.
  • DataFrame에 레코드를 추가하려면 원본 DataFrame을 새로운 DataFrame과 통합(union)해야 한다.
    • 통합(union)은 2개의 DataFrame을 단순히 결합하는 행위이다.
    • 통합하려는 2개의 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야 한다.
    • =!= : 컬럼 표현식과 문자열을 비교할 때 사용 (notEqual 개념)
    • === : 컬럼 표현식과 문자열을 비교할 때 사용 (Equal 개념)
import org.apache.spark.sql.Row

val schema = df.schema

val newRows = Seq(
    Row("New Country", "Other Country", 5L),
    Row("New Country 2", "Other Country 3", 1L)
)

val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF // df와 newDF를 합치는데
    ).where("count = 1" // 조건은 이러하고
    ).where($"ORIGIN_COUNTRY_NAME" =!= "United States" // =!= : notEqual
    ).show()
  • 실행결과

    image



16. 로우 정렬하기

  • sortorderBy 메서드를 사용해 DataFrame의 최댓값 혹은 최솟값이 상단에 위치하도록 정렬할 수 있다.
    • 두 메서드 모두 컬럼 표현식과 문자열을 사용할 수 있으며 다수의 컬럼을 지정할 수 있다.
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
  • 실행결과

    image


  • 정렬 기준을 명확히 지정하려면 ascdesc 함수를 사용한다.
  • 또한 asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드를 사용하여 정렬된 DataFrame에서 null 값이 표시되는 기준을 지정할 수 있다.
import org.apache.spark.sql.functions.{desc, asc}

df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(5)
  • 실행결과

    image


  • 트랜스포메이션을 처리하기 전에 성능을 최적화하기 위해 파티션별 정렬을 수행하기도 한다.
  • sortWithinPartitions 메서드로 파티션별 정렬을 한다.
spark.read.format("json").load("./data/flight-data/json/*-summary.json"
    ).sortWithinPartitions("count")



17. 로우 수 제한하기

  • limit 메서드를 사용해 추출할 로우 수를 제한할 수 있다.
df.orderBy(expr("count desc")).limit(6).show()
  • 실행결과

    image



18. repartition과 coalesce

  • repartition 메서드를 호출하면 무조건 전체 데이터를 셔플한다.
    • 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 한다.
df.rdd.getNumPartitions
// 결과 : 1

df.repartition(5)


  • 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배하는 것이 좋다.
df.repartition(col("DEST_COUNTRY_NAME"))


  • 선택적으로 파티션 수를 지정할 수도 있다.
df.repartition(5, col("DEST_COUNTRY_NAME"))


  • coalesce 메서드는 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우에 사용한다.
  • 다음은 목적지를 기준으로 셔플을 수행해 5개의 파티션으로 나누고, 전체 데이터를 셔플 없이 병합하는 예제이다.
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)



19. 드라이버로 로우 데이터 수집하기

  • 스파크는 드라이버에서 클러스터 상태 정보를 유지한다.
  • 로컬 환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야 한다.
  • collect 메서드는 전체 DataFrame의 모든 데이터를 수집하며, take 메서드는 상위 N개의 로우를 반환한다.
  • show 메서드는 여러 로우를 보기 좋게 출력한다.
val collectDF = df.limit(10)
collectDF.take(5)
collectDF.show()
collectDF.show(5, false)
collectDF.collect()
  • 실행결과

    image


  • toLocalIterator 메서드는 이터레이터(iterator)로 모든 파티션의 데이터를 드라이버에 전달한다.
collectDF.toLocalIterator()





References

댓글남기기