#lecture15_na 처리
from pyspark.sql import (
functions as f,
SparkSession,
types as t
)
spark = SparkSession.builder.appName("df_missing_data").getOrCreate()
df = spark.read.csv(
"file:///home/jovyan/work/sample/null_data.csv", header=True, inferSchema=True)
# df.show()
# DataFrame.na: Returns a DataFrameNaFunctions for handling missing values.
# DataFrame.dropna(how='any', thresh=None, subset=None)[source]: Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.
# how: 'any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
# thresh: default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
# subset: optional list of column names to consider.
# df.na.drop(how="any").show()
# df.na.drop(thresh=2).show()
# df.na.drop(subset=["salary"]).show()
df.printSchema()
# # fill string
# df.na.fill("engineer").show()
# # fill integer
# df.na.fill(0).show()
# # fill the subset
# df.na.fill("NA", subset=["occupation"]).show()
# # fill the mean value
# mean_value = df.select(f.mean(df['salary'])).collect()
# # print(mean_value[0][0])
# df.na.fill(mean_value[0][0], subset=["salary"]).show()
# Date parsing
spark = SparkSession.builder.appName("df_manage_date").getOrCreate()
df = spark.read.csv(
"file:///home/jovyan/work/sample/date_parsing.csv", header=True, inferSchema=True)
# # show year
# df.select(f.year('date')).show()
# # show month
# df.select(f.month('date')).show()
# # show day
# df.select(f.dayofmonth('date').alias('day')).show()
# df.select(f.dayofyear('date').alias('day')).show()
df = df.withColumn("year", f.year('date')).groupBy("year").mean("number").withColumnRenamed("avg(number)", "avg")
# df.show()
df.select("year", f.format_number("avg", 2).alias("avg")).show()
- df.na.drop() = df.dropna
- drop(how="any") : any일 경우 row에서 어떤 값이라도 비어있으면 drop, all인 경우 row에서 모든 값이 비어있으면 drop
- drop(thresh=2) : 기존 컬럼에서 빈칸이 두개일경우에만 drop, 이건 열 기준임!
- drop(subset=["salary"]) : 해당 컬럼에 빈칸이 있을 경우 drop
- df.na.fill()
- fill("fill") : 빈칸 타입이 문자열인 경우, "fill"로 채움
- fill(0) : 빈칸 타입이 정수인 경우, 0으로 채움
- fill("NA", subset=["occupation"]) : "occupation" 컬럼에 빈칸이 있는 경우 "NA" 문자열로 채움
- df.select(f.mean(df['salary'])).collect() : 'salary' 컬럼의 평균값
- collect() : 클러스터의 워커 노드에 분산되어 있는 데이터를 드라이버 프로그램의 메모리로 수집해 파이썬의 리스트 형태로 변환
- mean_value의 결과는 collect()로 인해 DF의 각 행이 PySpark의 Row 객체로 변환되어 파이썬 리스트에 담김
- print(mean_value[0][0])
- 첫번째 인덱스의 의미 : 리스트에서 행 추출
- mean_value는 Row 객체들을 담고 있는 리스트
- 해당 리스트의 첫번째 요소에 접근하기 위해 [0]을 사용
- 두번째 인덱스의 의미 : Row 객체에서 값 추출
- Row 객체는 파이썬의 튜플처럼 인덱스를 통해 내부 값에 접근 가능
- Row 객체 안의 첫번째 열값에 접근하기 위해 다시 [0]을 사용
- df.select
- select(f.year()): 연도 추출
- select(f.month()): 월 추출
- select(f.dayofmonth()) : 날짜 추출
- select(f.dayofyear()) : 현재 날짜가 해당 연도에서 몇 일째인지
#lecture16_join
from pyspark.sql import (
functions as f,
SparkSession,
types as t
)
spark = SparkSession.builder.appName("df_join").getOrCreate()
# user data
user_data = [
["1000", "Neville Hardy", "Apple"],
["2000", "Dacia Cohen", "Alphabet"],
["3000", "Elois Cox", "Neflix"],
["4000", "Junita Meyer", "Meta"],
["5000", "Cleora Banks", "Amazon"]]
user_col = ['id', 'name', 'company']
df_user = spark.createDataFrame(data=user_data, schema=user_col)
df_user.show()
# salary data
salary_data = [
["1000", "150000", "engineer"],
["2000", "240000", "manager"],
["3000", "120000", "human resource"],
["6000", "100000", "sales"]]
salary_col = ['id', 'salary', 'department']
df_salary = spark.createDataFrame(data=salary_data, schema=salary_col)
df_salary.show()
# # inner join: join the two dataframes on common key columns.
# # dataframe1.join(dataframe2,dataframe1.column_name == dataframe2.column_name,”inner”)
# print("== inner join ==")
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "inner").show()
# # inner join, then filter
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "inner").filter(df_user.id == 1000).show()
# # inner join, then where
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "inner").where(df_user.id == 1000).show()
# # multiple join with &
# df_user.join(df_salary,
# (df_user.id == df_salary.id) & (df_user.id == 1000)
# ).show()
# # full outer join: join the two dataframes with all matching and non-matching rows
# print("== full outer join ==")
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "fullouter").show()
# # left join: joins by returning all rows from the first dataframe and only matched rows from the second one
# print("== left join ==")
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "left").show()
# # right join: joins by returning all rows from the second dataframe and only matched rows from the first one
# print("== right join ==")
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "right").show()
# # left semi join: join all rows from the first dataframe and return only matched rows from the second one
# print("== left semi join ==")
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "leftsemi").show()
# # left anti join: join returns only columns from the first dataframe for non-matched records of the second dataframe
# print("== left anti join ==")
# df_user.join(df_salary,
# df_user.id == df_salary.id,
# "leftanti").show()
# # SQL join
# df_user.createOrReplaceTempView("user")
# df_salary.createOrReplaceTempView("salary")
# spark.sql("SELECT * FROM user, salary WHERE user.id == salary.id").show()
# spark.sql("SELECT * FROM user INNER JOIN salary ON user.id == salary.id").show()