IT 정리/아파치 스파크

SparkSQL 기초(2)

유정임 2026. 5. 4. 16:38
# lecture12_사용자별 최대 구하기

from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_total").getOrCreate()
table_schema = t.StructType([
    t.StructField("customer_name", t.StringType(), True),
    t.StructField("product_id", t.IntegerType(), True),
    t.StructField("price", t.IntegerType(), True)])

csv_file_path = "file:///home/jovyan/work/sample/product.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)

customer_spent = df.groupBy("customer_name")\
                    .agg(
                        f.round(
                            f.sum("price"),
                            2
                        ).alias("cost"))
                    
# customer_spent.show()

sorted_customer_spent = customer_spent.orderBy(f.col("cost").desc())

sorted_customer_spent.show()

 

  1. df.groupBy("customer_name").agg(f.round(f.sum("price"),2).alias("cost"))
    • df.groupBy("customer_name"): "customer_name" 컬럼을 기준으로 데이터를 그룹화
    • agg() : 집계의 약자로, groupBy로 묶인 각 그룹에 대해 합계, 평균, 개수 등의 통계적 연산을 적용
    • f.round(..., 2) : 소수점 아래 둘째 자리까지 반올림 
      • price의 데이터 타입이 Integer인데 왜 필요하지???
        price가 정수 타입이지만, 향후 데이터 타입이 변경되거나 확장이 일어날때 소수점 처리를 정확히 하기 위한 안전장치
  2. sorted_customer_spent = customer_spent.orderBy(f.col("cost").desc())
    • orderBy() : sort() 메서드와 같이 기본적으로 오름차순으로 동작
    • desc() : 내림차순으로 정렬
#lecture13_브로드캐스트 조인(상대적으로 작은 데이터만)

from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_most_interviewed").getOrCreate()
table_schema = t.StructType([
    t.StructField("interviwer_id", t.StringType(), False),
    t.StructField("occupation_id", t.StringType(), False),
    t.StructField("rating", t.IntegerType(), False)])

csv_file_path = "file:///home/jovyan/work/sample/like.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)

interviewer_count = df.groupBy("occupation_id").count().orderBy(f.desc("count"))

for d in interviewer_count.select("occupation_id", f.col("count").alias("cnt")).collect():
    print(f"{d.occupation_id}: {d.cnt}")


# But, What if we want to know what occupation_id is?  
# 1100: engineer
# 2030: developer
# 3801: painter
# 3021: chemistry teacher
# 9382: priest

meta = {
    "1100": "engineer",
    "2030": "developer",
    "3801": "painter",
    "3021": "chemistry teacher",
    "9382": "priest"
}
occupation_dict = spark.sparkContext.broadcast(meta)

def get_occupation_name(occupation_id: str) -> str:
    return occupation_dict.value[occupation_id]

occupation_lookup_udf = f.udf(get_occupation_name)

occupation_with_name = interviewer_count.withColumn("occupation_name", occupation_lookup_udf(f.col("occupation_id")))

occupation_with_name.show(10)
  1. df.groupBy("occupation_id").count().orderBy(f.desc("count"))
    • 그룹화된 데이터의 개수를 계산하여 자동으로 "count"라는 이름의 새로운 컬럼 생성
  2. for d in interviewer_count.select("occupation_id", f.col("count").alias("cnt")).collect():
    • "occupation_id"와 f.col("count")의 차이 
      • f.col("occupation_id")라고 작성해도 동일하게 동작
      • f.col() 은 columns 객체로 지정해서 pyspark가 제어할 수 있는 columns 객체로 변환하는 방식으로 컬럼에 어떤 연산이나 변경을 가하고 싶을때 사용
      • f.col("count").alias("cnt") : alias()는 문자열 데이터 타입에는 존재하지 않는 pyspark column 객체만의 고유 기능으로 "count"컬럼을 f.col()로 감싸서 column 객체로 만든 후 alias 적용
  3. spark.sparkContext.broadcast(meta) : broadcast는 대규보 분산 처리를 수행할 때 성능을 최적화
    • 딕셔너리 meta를 스파크의 브로드캐스브 변수로 변환하는 역할
    • 드라이버 노드에 있는 meta 데이터를 각 Executor당 딱 한 번만 전송해 메모리에 캐싱
    • 이후 해당 Executor에 돌아가는 모든 Task들은 메모리에 이미 올라와 있는 이 변수를 공유해서 읽기 전용 참조
  4. get_occupation_name(occupation_id: str) -> str 
    • worker node에 복사된 occupation_dict는 occupation_dict.value를 통해 원본 딕셔너리에 접근 할 수 있음
    • get_occupation_name 함수 실행 시, 각 행의 occupation_id 값을 key로 사용해 value를가져옴 
    • occupation_id : str 는 함수가 입력받은 매개변수(occupatioon_id)의 데이터 타입이 문자열임을 명시
    • -> str : 함수가 최종적으로 반환하는 값의 데이터 타입은 문자열
  5. f.udf(get_occupation_name)
    • get_occupation_name을 PySpark가 이해하고 분산 처리할 수 있는 UDF 형태로 변환하는 과정
      • 스파크 DF는 JVM 위에서 동작하는 분산 데이터 구조이고 get_occupation_name은 순수 파이썬 함수
      • f.udf()는 이 파이썬 함수를 감싸서, 스파크가 데이터를 한 행씩 처리할 때 파이썬 프로세스를 호출해 함수를 적용할 수 있도록 만들어 줌
#lecture14_csv로 추출하기

from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

# Attribution 3.0 Unported (CC BY 3.0)
# https://www.kaggle.com/datasets/csanhueza/the-marvel-universe-social-network

spark = SparkSession.builder.appName("df_most_popular").getOrCreate()
# csv_file_path = "file:///home/jovyan/work/sample/hero-network.csv"
# # read file
# df = spark.read\
#             .option("header", "true")\
#             .option("inferSchema", "true").csv(csv_file_path)

# # pyspark.sql.functions.collect_set(col): Aggregate function: returns a set of objects with duplicate elements eliminated.
# data = df.groupBy("hero1")\
#             .agg(
#                 f.collect_set("hero2").alias("connection"))\
#             .withColumnRenamed("hero1", "hero")
# # data.show()
# # pyspark.sql.functions.concat_ws(sep, *cols): Concatenates multiple input string columns together into a single string column, using the given separator.
# data = data.withColumn("connection", f.concat_ws(",", f.col("connection")))
# data.show()

# # DataFrame.coalesce(numPartitions): Returns a new DataFrame that has exactly numPartitions partitions.
# data.coalesce(1).write.option("header", True).csv("output")

# # load the file
csv_file_path = "file:///home/jovyan/work/output"
df = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .csv(csv_file_path)
# df.show()

# pyspark.sql.functions.size(col): Collection function: returns the length of the array or map stored in the column.
df = df.withColumn(
        "connection_size",
        f.size(
            f.split(f.col("connection"), ",")))\
        .orderBy(f.desc("connection_size"))
df.show()

most_popular_hero = df.select("hero").first()
print(most_popular_hero.hero)
  1. df.groupBy("hero1").agg(f.collect_set("hero2").alias("connection")).withColumnRenamed("hero1", "hero")
    • collect_set() : 데이터를 하나로 묶어주되, 중복을 알아서 제거해줌
      • collect_list()는 중복을 허용하고 만난 순서대로 모두 담고 collect_set()는 중복을 완전히 제거하고 고유한 값만 담음
    • .withColumnRenamed() : 컬렴의 이름을 바꾸는 함수
  2. data.withColumn("connection", f.concat_ws(",", f.col("connection"))) : 하나의 깔끔한 문자열로 변환
    • 이전 단계까지 connection 컬럼안에 배열 형태로 들어가 있는 데이터를 ','를 통해 하나의 평범한 텍스트로 변환
    • concat_ws: 구분자를 사용해 합치다
    • withColumn() : 기존 컬럼의 값을 변경하거나 새로운 컬럼 추가할때 사용
      • 기존 배열 형태의 "connection"컬럼 내용을 가공한 문자열 데이터로 덮어 쓰기
  3. data.coalesce(1).write.option("header", True).csv("output")
    • coalesce() : 여러개로 쪼개진 데이터 조각들 다시 합치는 역할
    • write: 스파크에게 데이터를 저장하겠다고 명령
    • csv("output") : output이라는 파일로 저장

 

'IT 정리 > 아파치 스파크' 카테고리의 다른 글

SparkSQL 기초(1)  (0) 2026.05.03
DF과 SparkSQL 소개  (0) 2026.05.01
Spark 기초(2)  (0) 2026.05.01
Spark 기초(1)  (0) 2026.05.01
Spark 환경 설정  (0) 2026.04.30