# lecture12_사용자별 최대 구하기
from pyspark.sql import (
functions as f,
SparkSession,
types as t
)
spark = SparkSession.builder.appName("df_total").getOrCreate()
table_schema = t.StructType([
t.StructField("customer_name", t.StringType(), True),
t.StructField("product_id", t.IntegerType(), True),
t.StructField("price", t.IntegerType(), True)])
csv_file_path = "file:///home/jovyan/work/sample/product.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)
customer_spent = df.groupBy("customer_name")\
.agg(
f.round(
f.sum("price"),
2
).alias("cost"))
# customer_spent.show()
sorted_customer_spent = customer_spent.orderBy(f.col("cost").desc())
sorted_customer_spent.show()
- df.groupBy("customer_name").agg(f.round(f.sum("price"),2).alias("cost"))
- df.groupBy("customer_name"): "customer_name" 컬럼을 기준으로 데이터를 그룹화
- agg() : 집계의 약자로, groupBy로 묶인 각 그룹에 대해 합계, 평균, 개수 등의 통계적 연산을 적용
- f.round(..., 2) : 소수점 아래 둘째 자리까지 반올림
- price의 데이터 타입이 Integer인데 왜 필요하지???
price가 정수 타입이지만, 향후 데이터 타입이 변경되거나 확장이 일어날때 소수점 처리를 정확히 하기 위한 안전장치
- sorted_customer_spent = customer_spent.orderBy(f.col("cost").desc())
- orderBy() : sort() 메서드와 같이 기본적으로 오름차순으로 동작
- desc() : 내림차순으로 정렬
#lecture13_브로드캐스트 조인(상대적으로 작은 데이터만)
from pyspark.sql import (
functions as f,
SparkSession,
types as t
)
spark = SparkSession.builder.appName("df_most_interviewed").getOrCreate()
table_schema = t.StructType([
t.StructField("interviwer_id", t.StringType(), False),
t.StructField("occupation_id", t.StringType(), False),
t.StructField("rating", t.IntegerType(), False)])
csv_file_path = "file:///home/jovyan/work/sample/like.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)
interviewer_count = df.groupBy("occupation_id").count().orderBy(f.desc("count"))
for d in interviewer_count.select("occupation_id", f.col("count").alias("cnt")).collect():
print(f"{d.occupation_id}: {d.cnt}")
# But, What if we want to know what occupation_id is?
# 1100: engineer
# 2030: developer
# 3801: painter
# 3021: chemistry teacher
# 9382: priest
meta = {
"1100": "engineer",
"2030": "developer",
"3801": "painter",
"3021": "chemistry teacher",
"9382": "priest"
}
occupation_dict = spark.sparkContext.broadcast(meta)
def get_occupation_name(occupation_id: str) -> str:
return occupation_dict.value[occupation_id]
occupation_lookup_udf = f.udf(get_occupation_name)
occupation_with_name = interviewer_count.withColumn("occupation_name", occupation_lookup_udf(f.col("occupation_id")))
occupation_with_name.show(10)
- df.groupBy("occupation_id").count().orderBy(f.desc("count"))
- 그룹화된 데이터의 개수를 계산하여 자동으로 "count"라는 이름의 새로운 컬럼 생성
- for d in interviewer_count.select("occupation_id", f.col("count").alias("cnt")).collect():
- "occupation_id"와 f.col("count")의 차이
- f.col("occupation_id")라고 작성해도 동일하게 동작
- f.col() 은 columns 객체로 지정해서 pyspark가 제어할 수 있는 columns 객체로 변환하는 방식으로 컬럼에 어떤 연산이나 변경을 가하고 싶을때 사용
- f.col("count").alias("cnt") : alias()는 문자열 데이터 타입에는 존재하지 않는 pyspark column 객체만의 고유 기능으로 "count"컬럼을 f.col()로 감싸서 column 객체로 만든 후 alias 적용
- spark.sparkContext.broadcast(meta) : broadcast는 대규보 분산 처리를 수행할 때 성능을 최적화
- 딕셔너리 meta를 스파크의 브로드캐스브 변수로 변환하는 역할
- 드라이버 노드에 있는 meta 데이터를 각 Executor당 딱 한 번만 전송해 메모리에 캐싱
- 이후 해당 Executor에 돌아가는 모든 Task들은 메모리에 이미 올라와 있는 이 변수를 공유해서 읽기 전용 참조
- get_occupation_name(occupation_id: str) -> str
- worker node에 복사된 occupation_dict는 occupation_dict.value를 통해 원본 딕셔너리에 접근 할 수 있음
- get_occupation_name 함수 실행 시, 각 행의 occupation_id 값을 key로 사용해 value를가져옴
- occupation_id : str 는 함수가 입력받은 매개변수(occupatioon_id)의 데이터 타입이 문자열임을 명시
- -> str : 함수가 최종적으로 반환하는 값의 데이터 타입은 문자열
- f.udf(get_occupation_name)
- get_occupation_name을 PySpark가 이해하고 분산 처리할 수 있는 UDF 형태로 변환하는 과정
- 스파크 DF는 JVM 위에서 동작하는 분산 데이터 구조이고 get_occupation_name은 순수 파이썬 함수
- f.udf()는 이 파이썬 함수를 감싸서, 스파크가 데이터를 한 행씩 처리할 때 파이썬 프로세스를 호출해 함수를 적용할 수 있도록 만들어 줌
#lecture14_csv로 추출하기
from pyspark.sql import (
functions as f,
SparkSession,
types as t
)
# Attribution 3.0 Unported (CC BY 3.0)
# https://www.kaggle.com/datasets/csanhueza/the-marvel-universe-social-network
spark = SparkSession.builder.appName("df_most_popular").getOrCreate()
# csv_file_path = "file:///home/jovyan/work/sample/hero-network.csv"
# # read file
# df = spark.read\
# .option("header", "true")\
# .option("inferSchema", "true").csv(csv_file_path)
# # pyspark.sql.functions.collect_set(col): Aggregate function: returns a set of objects with duplicate elements eliminated.
# data = df.groupBy("hero1")\
# .agg(
# f.collect_set("hero2").alias("connection"))\
# .withColumnRenamed("hero1", "hero")
# # data.show()
# # pyspark.sql.functions.concat_ws(sep, *cols): Concatenates multiple input string columns together into a single string column, using the given separator.
# data = data.withColumn("connection", f.concat_ws(",", f.col("connection")))
# data.show()
# # DataFrame.coalesce(numPartitions): Returns a new DataFrame that has exactly numPartitions partitions.
# data.coalesce(1).write.option("header", True).csv("output")
# # load the file
csv_file_path = "file:///home/jovyan/work/output"
df = spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv(csv_file_path)
# df.show()
# pyspark.sql.functions.size(col): Collection function: returns the length of the array or map stored in the column.
df = df.withColumn(
"connection_size",
f.size(
f.split(f.col("connection"), ",")))\
.orderBy(f.desc("connection_size"))
df.show()
most_popular_hero = df.select("hero").first()
print(most_popular_hero.hero)
- df.groupBy("hero1").agg(f.collect_set("hero2").alias("connection")).withColumnRenamed("hero1", "hero")
- collect_set() : 데이터를 하나로 묶어주되, 중복을 알아서 제거해줌
- collect_list()는 중복을 허용하고 만난 순서대로 모두 담고 collect_set()는 중복을 완전히 제거하고 고유한 값만 담음
- .withColumnRenamed() : 컬렴의 이름을 바꾸는 함수
- data.withColumn("connection", f.concat_ws(",", f.col("connection"))) : 하나의 깔끔한 문자열로 변환
- 이전 단계까지 connection 컬럼안에 배열 형태로 들어가 있는 데이터를 ','를 통해 하나의 평범한 텍스트로 변환
- concat_ws: 구분자를 사용해 합치다
- withColumn() : 기존 컬럼의 값을 변경하거나 새로운 컬럼 추가할때 사용
- 기존 배열 형태의 "connection"컬럼 내용을 가공한 문자열 데이터로 덮어 쓰기
- data.coalesce(1).write.option("header", True).csv("output")
- coalesce() : 여러개로 쪼개진 데이터 조각들 다시 합치는 역할
- write: 스파크에게 데이터를 저장하겠다고 명령
- csv("output") : output이라는 파일로 저장