카테고리 없음

SparkSQL 기초(3)

유정임 2026. 5. 4. 16:41
#lecture15_na 처리

from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_missing_data").getOrCreate()
df = spark.read.csv(
    "file:///home/jovyan/work/sample/null_data.csv", header=True, inferSchema=True)
# df.show()

# DataFrame.na: Returns a DataFrameNaFunctions for handling missing values.
# DataFrame.dropna(how='any', thresh=None, subset=None)[source]: Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.
#   how: 'any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
#   thresh: default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
#   subset: optional list of column names to consider.

# df.na.drop(how="any").show()
# df.na.drop(thresh=2).show()
# df.na.drop(subset=["salary"]).show()

df.printSchema()

# # fill string
# df.na.fill("engineer").show()

# # fill integer
# df.na.fill(0).show()

# # fill the subset
# df.na.fill("NA", subset=["occupation"]).show()

# # fill the mean value
# mean_value = df.select(f.mean(df['salary'])).collect()

# # print(mean_value[0][0])

# df.na.fill(mean_value[0][0], subset=["salary"]).show()



# Date parsing
spark = SparkSession.builder.appName("df_manage_date").getOrCreate()
df = spark.read.csv(
    "file:///home/jovyan/work/sample/date_parsing.csv", header=True, inferSchema=True)

# # show year
# df.select(f.year('date')).show()

# # show month
# df.select(f.month('date')).show()

# # show day
# df.select(f.dayofmonth('date').alias('day')).show()
# df.select(f.dayofyear('date').alias('day')).show()

df = df.withColumn("year", f.year('date')).groupBy("year").mean("number").withColumnRenamed("avg(number)", "avg")
# df.show()
df.select("year", f.format_number("avg", 2).alias("avg")).show()
  1. df.na.drop() = df.dropna
    • drop(how="any") : any일 경우 row에서 어떤 값이라도 비어있으면 drop, all인 경우 row에서 모든 값이 비어있으면 drop
    • drop(thresh=2) : 기존 컬럼에서 빈칸이 두개일경우에만 drop, 이건 열 기준임!
    • drop(subset=["salary"]) : 해당 컬럼에 빈칸이 있을 경우 drop
  2. df.na.fill() 
    • fill("fill") : 빈칸 타입이 문자열인 경우, "fill"로 채움
    • fill(0) : 빈칸 타입이 정수인 경우, 0으로 채움
    • fill("NA", subset=["occupation"]) : "occupation" 컬럼에 빈칸이 있는 경우 "NA" 문자열로 채움
  3. df.select(f.mean(df['salary'])).collect() : 'salary' 컬럼의 평균값
    • collect() : 클러스터의 워커 노드에 분산되어 있는 데이터를 드라이버 프로그램의 메모리로 수집해 파이썬의 리스트 형태로 변환
    • mean_value의 결과는 collect()로 인해 DF의 각 행이 PySpark의 Row 객체로 변환되어 파이썬 리스트에 담김
  4.  print(mean_value[0][0])
    • 첫번째 인덱스의 의미 : 리스트에서 행 추출
      • mean_value는 Row 객체들을 담고 있는 리스트
      • 해당 리스트의 첫번째 요소에 접근하기 위해 [0]을 사용
    • 두번째 인덱스의 의미 : Row 객체에서 값 추출
      • Row 객체는 파이썬의 튜플처럼 인덱스를 통해 내부 값에 접근 가능
      • Row 객체 안의 첫번째 열값에 접근하기 위해 다시 [0]을 사용
  5. df.select
    • select(f.year()): 연도 추출
    • select(f.month()): 월 추출
    • select(f.dayofmonth()) : 날짜 추출
    • select(f.dayofyear()) : 현재 날짜가 해당 연도에서 몇 일째인지
#lecture16_join

from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_join").getOrCreate()

# user data
user_data = [
    ["1000", "Neville Hardy", "Apple"],
    ["2000", "Dacia Cohen", "Alphabet"],
    ["3000", "Elois Cox", "Neflix"],
    ["4000", "Junita Meyer", "Meta"],
    ["5000", "Cleora Banks", "Amazon"]]

user_col = ['id', 'name', 'company']
df_user = spark.createDataFrame(data=user_data, schema=user_col)
df_user.show()

# salary data
salary_data = [
    ["1000", "150000", "engineer"],
    ["2000", "240000", "manager"],
    ["3000", "120000", "human resource"],
    ["6000", "100000", "sales"]]

salary_col = ['id', 'salary', 'department']
df_salary = spark.createDataFrame(data=salary_data, schema=salary_col)
df_salary.show()

# # inner join: join the two dataframes on common key columns.
# # dataframe1.join(dataframe2,dataframe1.column_name ==  dataframe2.column_name,”inner”)
# print("== inner join ==")
# df_user.join(df_salary,
#                df_user.id == df_salary.id,
#                "inner").show()

# # inner join, then filter
# df_user.join(df_salary,
#                df_user.id == df_salary.id,
#                "inner").filter(df_user.id == 1000).show()

# # inner join, then where
# df_user.join(df_salary,
#                df_user.id == df_salary.id,
#                "inner").where(df_user.id == 1000).show()

# # multiple join with &
# df_user.join(df_salary,
#                (df_user.id == df_salary.id) & (df_user.id == 1000)
#             ).show()

# # full outer join: join the two dataframes with all matching and non-matching rows
# print("== full outer join ==")
# df_user.join(df_salary, 
#                df_user.id == df_salary.id, 
#                "fullouter").show()

# # left join:  joins by returning all rows from the first dataframe and only matched rows from the second one
# print("== left join ==")
# df_user.join(df_salary, 
#                df_user.id == df_salary.id, 
#                "left").show()

# # right join: joins by returning all rows from the second dataframe and only matched rows from the first one
# print("== right join ==")
# df_user.join(df_salary, 
#                df_user.id == df_salary.id, 
#                "right").show()

# # left semi join: join all rows from the first dataframe and return only matched rows from the second one
# print("== left semi join ==")
# df_user.join(df_salary, 
#                df_user.id == df_salary.id, 
#                "leftsemi").show()

# # left anti join: join returns only columns from the first dataframe for non-matched records of the second dataframe
# print("== left anti join ==")
# df_user.join(df_salary, 
#                df_user.id == df_salary.id, 
#                "leftanti").show()

# # SQL join
# df_user.createOrReplaceTempView("user")
# df_salary.createOrReplaceTempView("salary")

# spark.sql("SELECT * FROM user, salary WHERE user.id == salary.id").show()

# spark.sql("SELECT * FROM user INNER JOIN salary ON user.id == salary.id").show()