스파크완벽가이드 - 6장 다양한 데이터 타입 다루기
다양한 데이터 타입
- boolean
- 수치
- 문자열
- data와 timestamp
- null 값
- 복합 데이터
- 사용자 정의 함수
6.1 API는 어디서 찾을까
- DataFrame은 Row 타입을 가진 Dataset이므로 결국에는 Dataset 메서드를 참조함
- org.apache.spark.sql.functions 데이터 타입과 다양한 함수를 제공
- 분석에 사용할 DataFrame
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load('/Users/myName/Test/Spark_ref/sparkGuide/data/2010-12-01.csv')
df.printSchema()
6.2 스파크 데이터 타입으로 변환하기
from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))
6.3 불리언 데이터 타입 다루기
- 중첩 조건
from pyspark.sql.functions import col, instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter|descripFilter).show()
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
| 536544| DOT|DOTCOM POSTAGE| 1|2010-12-01 14:32:00| 569.77| NULL|United Kingdom|
| 536592| DOT|DOTCOM POSTAGE| 1|2010-12-01 17:06:00| 607.49| NULL|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
- 불리언을 컬럼에 사용
df.withColumn("isExpensive", (col("StockCode") == "DOT") & (priceFilter|descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)
+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
| 569.77| true|
| 607.49| true|
+---------+-----------+
6.4 수치형 데이터 타입 다루기
- 데이터 연산
from pyspark.sql.functions import expr, pow
fabQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5 # pow:제곱
df.select("Quantity", "UnitPrice", fabQuantity.alias("realQuantity")).show(2)
+--------+---------+------------------+
|Quantity|UnitPrice| realQuantity|
+--------+---------+------------------+
| 6| 2.55|239.08999999999997|
| 6| 3.39| 418.7156|
+--------+---------+------------------+
- 수치형 요약 통계
df.describe().show()
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary| InvoiceNo| StockCode| Description| Quantity| UnitPrice| CustomerID| Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
| count| 3108| 3108| 3098| 3108| 3108| 1968| 3108|
| mean| 536516.684944841|27834.304044117645| NULL| 8.627413127413128| 4.151946589446603|15661.388719512195| NULL|
| stddev|72.89447869788873|17407.897548583845| NULL|26.371821677029203|15.638659854603892|1854.4496996893627| NULL|
| min| 536365| 10002| 4 PURPLE FLOCK D...| -24| 0.0| 12431.0| Australia|
| max| C536548| POST|ZINC WILLIE WINKI...| 600| 607.49| 18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
- StatFunctions 패키지를 통해 다양한 통계함수 제공
- 블룸 필터링이나 스케칭 알고리즘과 같은 고급 기법과 관련된 함수 제공
df.stat.crosstab("StockCode", "Quantity").show()
6.5 문자열 데이터 타입 다루기
- lower 함수를 사용해 소문자로, upper 함수를 사용해 대문자로 변경
from pyspark.sql.functions import lower, upper
df.select(col("Description")
,lower(col("Description"))
,upper(col("Description"))).show(2)
+--------------------+--------------------+--------------------+
| Description| lower(Description)| upper(Description)|
+--------------------+--------------------+--------------------+
|WHITE HANGING HEA...|white hanging hea...|WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern| WHITE METAL LANTERN|
+--------------------+--------------------+--------------------+
- 문자열 주변의 공백을 제거하거나 추가
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" Hellow ")).alias("ltrim"),
rtrim(lit(" Hellow ")).alias("rtrim"),
trim(lit(" Hellow ")).alias("trim"),
lpad(lit("Hellow"),3, " ").alias("lpad"),
rpad(lit("Hellow"),10, " ").alias("rpad")
).show(2)
| ltrim| rtrim| trim|lpad| rpad|
+--------+-------+------+----+----------+
|Hellow | Hellow|Hellow| Hel|Hellow |
|Hellow | Hellow|Hellow| Hel|Hellow |
+--------+-------+------+----+----------+
- 정규 표현식을 regex_replace 함수를 이용해 치환
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(
col("Description"),
regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean")
).show(2, False)
+----------------------------------+----------------------------------+
|Description |color_clean |
+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|COLOR HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN |COLOR METAL LANTERN |
+----------------------------------+----------------------------------+
- 문자열 치환을 translate 함수를 이용
from pyspark.sql.functions import translate
df.select(col("Description"), translate(col("Description"), "LEFT", 1337).show(2)
df.select(
col("Description"),
translate(col("Description"), "LEFT", "1337")
).where(instr(col("Description"), "WHITE")>=1)
.show(2,False)
+----------------------------------+----------------------------------+
|Description |translate(Description, LEFT, 1337)|
+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|WHI73 HANGING H3AR7 7-1IGH7 HO1D3R|
|WHITE METAL LANTERN |WHI73 M37A1 1AN73RN |
+----------------------------------+----------------------------------+
- 동적으로 인수의 개수가 변하는 상황 ( locate )
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
return locate(color_string.upper(), column).cast("boolean").alias("is_"+color_string)
selectedColumns = [color_locator(col("Description"), c) for c in simpleColors]
selectedColumns.append(expr("*"))
df.select(*selectedColumns).where(expr("is_white Or is_red")).show(3,False)
+--------+--------+------+--------+-------+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|is_black|is_white|is_red|is_green|is_blue|InvoiceNo|StockCode|Description |Quantity|InvoiceDate |UnitPrice|CustomerID|Country |
+--------+--------+------+--------+-------+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|false |true |false |false |false |536365 |85123A |WHITE HANGING HEART T-LIGHT HOLDER|6 |2010-12-01 08:26:00|2.55 |17850.0 |United Kingdom|
|false |true |false |false |false |536365 |71053 |WHITE METAL LANTERN |6 |2010-12-01 08:26:00|3.39 |17850.0 |United Kingdom|
|false |true |true |false |false |536365 |84029E |RED WOOLLY HOTTIE WHITE HEART. |6 |2010-12-01 08:26:00|3.39 |17850.0 |United Kingdom|
+--------+--------+------+--------+-------+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
6.6 날짜와 타임스탬프 데이터 타입 다루기
- 스파크는 달력 형태의 날짜(Date), 날짜와 시간 모두를 가지는 타임스탬프(timestamp)로 날짜데이터 다룸
- 날짜나 시간을 문자열에 저장하고 런타임에 날짜타입으로 변환하는 경우가 많음 ( 파일로부터 데이터를 다룰 때 )
- 오늘 날짜와 현재 타임스탬프 값을 구하는 예제
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today",current_date())\
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()
root
|-- id: long (nullable = false)
|-- today: date (nullable = false)
|-- now: timestamp (nullable = false)
- 날짜 더하고 빼기
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"),5), date_add(col("today"),5) ).show(1)
## 오늘 2024-05-04
+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
| 2024-04-29| 2024-05-09|
+------------------+------------------+
- 두 날짜의 차이를 구하는 작업 ( datediff )
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn("week_ago", date_sub(col("today"), 7)).select(datediff(col("week_ago"),col("today"))).show(1)
+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
| -7|
+-------------------------+
- 두 날짜 사이의 개월 수를 반환하는 ( month_between )
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.select(to_date(lit("2024-01-01")).alias("start"), to_date(lit("2024-05-31")).alias("end"))\
.select(months_between(col("start"), col("end")))\
.show(1)
+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
| -4.96774194|
+--------------------------------+
- to_date() 는 문자열을 날짜로 변환할 수 있으며, 포맷 지정도 가능
- 파싱할 수 없는 날짜는 null 값으로 반환 Ex) to_date(2020-20-12) >> null
- 좀 더 견고한 날짜 처리를 위해 date format을 같이 설정
- select(to_date(lit("2024-05-01"), "yyyy-MM-dd"))
- select(to_timestamp(col("data"), "yyyy-MM-dd"))
6.7 null 값 다루기
- DataFrame의 하위 패키지인 .na를 사용하여 null 값을 다룸
- null 값을 명시적으로 사용하는 것이 항상 좋음
- null 값을 다루는 두 가지 방법으로 명시적으로 null을 제거하거나, 전역 또는 컬럼 단위로 null 값을 특정 값으로 채워넣는 방법
- ifnull, nullif
SELECT
ifnull(null, 'return_value'), # 첫번째 값이 null이면 두번째 값을 반환
nullif('value', 'value') # 두 값이 같으면 null을 반환, 다르면 첫번째 값을 반환
FROM dfTable LIMIT 1
- drop
- null 값을 가진 로우를 제거하는 함수
- df.na.drop("any") # 컬럼값 중 하나라도 null이면 해당 row 제거
- df.na.drop("all") # 컬럼값 모두 null, NaN 인 경우 해당 row 제거
- replace
- df.na.replace([""], ["Unknown"], "Description")
6.9 복합데이터 다루기
- 구조체는 쿼리문에서 다수의 컬럼을 괄호로 묶어 만들 수 있음
from pyspark.sql.functions import struct
complexDF = df.select(struct("Description","InvoiceNo").alias("complex") )
complexDF.createOrReplaceTempView("complexDF")
spark.sql("SELECT * FROM complexDF").show(2,False)
+--------------------------------------------+
|complex |
+--------------------------------------------+
|{WHITE HANGING HEART T-LIGHT HOLDER, 536365}|
|{WHITE METAL LANTERN, 536365} |
+--------------------------------------------+
- 구조체화된 컬럼을 확인
spark.sql("SELECT complex.Description FROM complexDF").show(2,False)
complexDF.select(col("complex").getField("Description")).show(2,False)
- 배열
from pyspark.sql.functions import split
df.select(split(col("Description"), ' ')).show(2,False)
+----------------------------------------+
|split(Description, , -1) |
+----------------------------------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|[WHITE, METAL, LANTERN] |
+----------------------------------------+
- split 함수는 복합 데이터를 또 다른 컬럼처럼 다를 수 있게 함
df.select(split(col("Description"), ' ').alias("arr_col"))\
.selectExpr("arr_col[0]").show(2,False)
+----------+
|arr_col[0]|
+----------+
|WHITE |
|WHITE |
+----------+
- array_contains
- 배열에 특정 값이 존재하는지 확인
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
>>> 결과 True / False
6.10 JSON 다루기
- Spark는 문자열 형태의 Json을 직접 조작할 수 있으며, Json을 파싱하거나 Json 객체로 만들 수 있음
jsonDF = spark.range(1).selectExpr("""'{"myKey":{"myValue":[1,2,3]}}' as jsonString""")
jsonDF.show(2, False)
+-----------------------------+
|jsonString |
+-----------------------------+
|{"myKey":{"myValue":[1,2,3]}}|
+-----------------------------+
- get_json_object 함수로 Json객체를 인라인 쿼리로 조회 가능
jsonDF.select(get_json_object(col("jsonString"),"$.myKey.myValue[1]").alias("column"), json_tuple(col("jsonString"), "myKey") ).show(2)
+------+-------------------+
|column| c0|
+------+-------------------+
| 2|{"myValue":[1,2,3]}|
+------+-------------------+
- to_json 함수를 사용해 StructType을 Json 문자열로 변경
from pysaprk.sql.functions import to_json
df.selectExpr("(InvoiceNo, Description) as myStrunct").select(to_json(col("myStruct")))