DataPipeline/Spark

스파크완벽가이드 - 6장 다양한 데이터 타입 다루기

wave35 2024. 7. 27. 21:48

 

다양한 데이터 타입

- boolean
- 수치
- 문자열
- data와 timestamp
- null 값
- 복합 데이터
- 사용자 정의 함수

 

 

6.1 API는 어디서 찾을까

- DataFrame은 Row 타입을 가진 Dataset이므로 결국에는 Dataset 메서드를 참조함
- org.apache.spark.sql.functions 데이터 타입과 다양한 함수를 제공

- 분석에 사용할 DataFrame

df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load('/Users/myName/Test/Spark_ref/sparkGuide/data/2010-12-01.csv')
df.printSchema()

 

 

6.2 스파크 데이터 타입으로 변환하기

from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))

 

 

 

6.3 불리언 데이터 타입 다루기

- 중첩 조건
from pyspark.sql.functions import col, instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter|descripFilter).show()
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      NULL|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      NULL|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+

- 불리언을 컬럼에 사용
df.withColumn("isExpensive", (col("StockCode") == "DOT") & (priceFilter|descripFilter))\
    .where("isExpensive")\
    .select("unitPrice", "isExpensive").show(5)
+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+

 

 

 

6.4 수치형 데이터 타입 다루기

- 데이터 연산
from pyspark.sql.functions import expr, pow
fabQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5   # pow:제곱
df.select("Quantity", "UnitPrice", fabQuantity.alias("realQuantity")).show(2)
+--------+---------+------------------+
|Quantity|UnitPrice|      realQuantity|
+--------+---------+------------------+
|       6|     2.55|239.08999999999997|
|       6|     3.39|          418.7156|
+--------+---------+------------------+

- 수치형 요약 통계
df.describe().show()
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                NULL| 8.627413127413128| 4.151946589446603|15661.388719512195|          NULL|
| stddev|72.89447869788873|17407.897548583845|                NULL|26.371821677029203|15.638659854603892|1854.4496996893627|          NULL|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C536548|              POST|ZINC WILLIE WINKI...|               600|            607.49|           18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+

- StatFunctions 패키지를 통해 다양한 통계함수 제공
- 블룸 필터링이나 스케칭 알고리즘과 같은 고급 기법과 관련된 함수 제공
df.stat.crosstab("StockCode", "Quantity").show()

 

 

 

6.5 문자열 데이터 타입 다루기

- lower 함수를 사용해 소문자로, upper 함수를 사용해 대문자로 변경
from pyspark.sql.functions import lower, upper
df.select(col("Description")
    ,lower(col("Description"))
    ,upper(col("Description"))).show(2)
+--------------------+--------------------+--------------------+
|         Description|  lower(Description)|  upper(Description)|
+--------------------+--------------------+--------------------+
|WHITE HANGING HEA...|white hanging hea...|WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern| WHITE METAL LANTERN|
+--------------------+--------------------+--------------------+


- 문자열 주변의 공백을 제거하거나 추가
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
    ltrim(lit(" Hellow  ")).alias("ltrim"),
    rtrim(lit(" Hellow  ")).alias("rtrim"),
    trim(lit(" Hellow  ")).alias("trim"),
    lpad(lit("Hellow"),3, " ").alias("lpad"),
    rpad(lit("Hellow"),10, " ").alias("rpad")
).show(2)
|   ltrim|  rtrim|  trim|lpad|      rpad|
+--------+-------+------+----+----------+
|Hellow  | Hellow|Hellow| Hel|Hellow    |
|Hellow  | Hellow|Hellow| Hel|Hellow    |
+--------+-------+------+----+----------+


- 정규 표현식을 regex_replace 함수를 이용해 치환
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(
    col("Description"),
    regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean")
).show(2, False)
+----------------------------------+----------------------------------+
|Description                       |color_clean                       |
+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|COLOR HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |COLOR METAL LANTERN               |
+----------------------------------+----------------------------------+


- 문자열 치환을 translate 함수를 이용
from pyspark.sql.functions import translate
df.select(col("Description"), translate(col("Description"), "LEFT", 1337).show(2)
df.select(
    col("Description"), 
    translate(col("Description"), "LEFT", "1337")
).where(instr(col("Description"), "WHITE")>=1)
.show(2,False)
+----------------------------------+----------------------------------+
|Description                       |translate(Description, LEFT, 1337)|
+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|WHI73 HANGING H3AR7 7-1IGH7 HO1D3R|
|WHITE METAL LANTERN               |WHI73 M37A1 1AN73RN               |
+----------------------------------+----------------------------------+


- 동적으로 인수의 개수가 변하는 상황 ( locate )
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
    return locate(color_string.upper(), column).cast("boolean").alias("is_"+color_string)

selectedColumns = [color_locator(col("Description"), c) for c in simpleColors]
selectedColumns.append(expr("*"))
df.select(*selectedColumns).where(expr("is_white Or is_red")).show(3,False)
+--------+--------+------+--------+-------+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|is_black|is_white|is_red|is_green|is_blue|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+--------+--------+------+--------+-------+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|false   |true    |false |false   |false  |536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|false   |true    |false |false   |false  |536365   |71053    |WHITE METAL LANTERN               |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|false   |true    |true  |false   |false  |536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.    |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+--------+--------+------+--------+-------+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+

 

 

 

6.6 날짜와 타임스탬프 데이터 타입 다루기

- 스파크는 달력 형태의 날짜(Date), 날짜와 시간 모두를 가지는 타임스탬프(timestamp)로 날짜데이터 다룸
- 날짜나 시간을 문자열에 저장하고 런타임에 날짜타입으로 변환하는 경우가 많음 ( 파일로부터 데이터를 다룰 때 )

- 오늘 날짜와 현재 타임스탬프 값을 구하는 예제
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
    .withColumn("today",current_date())\
    .withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()
root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)
 

- 날짜 더하고 빼기
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"),5), date_add(col("today"),5) ).show(1)
## 오늘 2024-05-04 
+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2024-04-29|        2024-05-09|
+------------------+------------------+

- 두 날짜의 차이를 구하는 작업 ( datediff )
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn("week_ago", date_sub(col("today"), 7)).select(datediff(col("week_ago"),col("today"))).show(1)
+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+


- 두 날짜 사이의 개월 수를 반환하는 ( month_between )
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.select(to_date(lit("2024-01-01")).alias("start"), to_date(lit("2024-05-31")).alias("end"))\
    .select(months_between(col("start"), col("end")))\
    .show(1)
+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                     -4.96774194|
+--------------------------------+
- to_date() 는 문자열을 날짜로 변환할 수 있으며, 포맷 지정도 가능


- 파싱할 수 없는 날짜는 null 값으로 반환 Ex) to_date(2020-20-12) >> null
- 좀 더 견고한 날짜 처리를 위해 date format을 같이 설정
    - select(to_date(lit("2024-05-01"), "yyyy-MM-dd"))
    - select(to_timestamp(col("data"), "yyyy-MM-dd"))

 

 

 

6.7 null 값 다루기

- DataFrame의 하위 패키지인 .na를 사용하여 null 값을 다룸
- null 값을 명시적으로 사용하는 것이 항상 좋음
- null 값을 다루는 두 가지 방법으로 명시적으로 null을 제거하거나, 전역 또는 컬럼 단위로 null 값을 특정 값으로 채워넣는 방법

- ifnull, nullif
SELECT
    ifnull(null, 'return_value'), # 첫번째 값이 null이면 두번째 값을 반환
    nullif('value', 'value') # 두 값이 같으면 null을 반환, 다르면 첫번째 값을 반환
FROM dfTable LIMIT 1

- drop 
- null 값을 가진 로우를 제거하는 함수
- df.na.drop("any") # 컬럼값 중 하나라도 null이면 해당 row 제거
- df.na.drop("all") # 컬럼값 모두 null, NaN 인 경우 해당 row 제거

- replace
- df.na.replace([""], ["Unknown"], "Description")

 

 

6.9 복합데이터 다루기

- 구조체는 쿼리문에서 다수의 컬럼을 괄호로 묶어 만들 수 있음
from pyspark.sql.functions import struct

complexDF = df.select(struct("Description","InvoiceNo").alias("complex") )
complexDF.createOrReplaceTempView("complexDF")
spark.sql("SELECT * FROM complexDF").show(2,False)
+--------------------------------------------+
|complex                                     |
+--------------------------------------------+
|{WHITE HANGING HEART T-LIGHT HOLDER, 536365}|
|{WHITE METAL LANTERN, 536365}               |
+--------------------------------------------+

- 구조체화된 컬럼을 확인
spark.sql("SELECT complex.Description FROM complexDF").show(2,False)
complexDF.select(col("complex").getField("Description")).show(2,False)

- 배열
from pyspark.sql.functions import split
df.select(split(col("Description"), ' ')).show(2,False)
+----------------------------------------+
|split(Description,  , -1)               |
+----------------------------------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|[WHITE, METAL, LANTERN]                 |
+----------------------------------------+


- split 함수는 복합 데이터를 또 다른 컬럼처럼 다를 수 있게 함
df.select(split(col("Description"), ' ').alias("arr_col"))\
    .selectExpr("arr_col[0]").show(2,False)
+----------+
|arr_col[0]|
+----------+
|WHITE     |
|WHITE     |
+----------+


- array_contains
- 배열에 특정 값이 존재하는지 확인
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
>>> 결과 True / False

 

 

 

6.10 JSON 다루기

- Spark는 문자열 형태의 Json을 직접 조작할 수 있으며, Json을 파싱하거나 Json 객체로 만들 수 있음
jsonDF = spark.range(1).selectExpr("""'{"myKey":{"myValue":[1,2,3]}}' as jsonString""")
jsonDF.show(2, False)
+-----------------------------+
|jsonString                   |
+-----------------------------+
|{"myKey":{"myValue":[1,2,3]}}|
+-----------------------------+


- get_json_object 함수로 Json객체를 인라인 쿼리로 조회 가능
jsonDF.select(get_json_object(col("jsonString"),"$.myKey.myValue[1]").alias("column"), json_tuple(col("jsonString"), "myKey") ).show(2)
+------+-------------------+
|column|                 c0|
+------+-------------------+
|     2|{"myValue":[1,2,3]}|
+------+-------------------+


- to_json 함수를 사용해 StructType을 Json 문자열로 변경
from pysaprk.sql.functions import to_json
df.selectExpr("(InvoiceNo, Description) as myStrunct").select(to_json(col("myStruct")))