IT 정리/아파치 스파크

Spark 기초(2)

유정임 2026. 5. 1. 14:08
#lecture5_filter

# filter
# Return a new RDD containing only the elements that satisfy a predicate.

import pyspark

sc = pyspark.SparkContext.getOrCreate()
test_file = "file:///home/jovyan/work/sample/temperature.csv"

def get_data(line, header):
    if line != header:
        col = line.split(',')
        city = col[6].strip("\"")
        avg_temp_fahr = col[4]
        yield (city, avg_temp_fahr)

lines = sc.textFile(test_file)

# get header string
header = lines.first()

parsed_line = lines.flatMap(lambda line: get_data(line, header))

# filter NA values
filtered_line = parsed_line.filter(lambda x: "NA" not in x[1])

# finding min temperature
min_temp = filtered_line.reduceByKey(lambda x, y: min(float(x), float(y)))
final_list = min_temp.collect()
for city, temperature in final_list:
    print(f"{city}: {temperature}")

print("------------------")
# finding max temperature
min_temp = filtered_line.reduceByKey(lambda x, y: max(float(x), float(y)))
final_list = min_temp.collect()
for city, temperature in final_list:
    print(f"{city}: {temperature}")
  1. 첫번째 행은 행이름이 적혀 있기 때문에 추출해야 함
    • header = lines.first()
      parsed_line = lines.flatMap(lambda line: get_data(line, header))
      def get_data(line, header):
      if line != header:
              col = line.split(',')
              city = col[6].strip("\"")
              avg_temp_fahr = col[4]
              yield (city, avg_temp_fahr)
      • header가 line과 다르다면 get_data 함수를 통해 튜플을 만듦
      • lines 의 결과는
        ["record_id,month,day,year,AverageTemperatureFahr,Uncertainty,City,country_id,Country", "474376,01,01,1853,NA,NA,Auckland,NEW,New Zealand", "474381,06,01,1853,51.9062,36.9572,Auckland,NEW,New Zealand", ...]
      • col은 각 한 행을 split 하고
      • city는 각행의 6번째 값을 strip하는데 이때 양 옆 큰따옴표 제거하기 위해 strip("\"")
      • yield (city, avg_temp_fahr) : 데이터를 하나씩 생성해서 보냄
        • return은 결과값을 통째로 넘겨준 뒤 함수를 종료시킨다면 yield는 하나씩 결과를 내보내면서 함수의 상태로 잠깐 멈춘채 대기하고 호출이 오면 다시 실행
      • flatMap은 함수가 반환하는 값들을 하나의 리스트로 합쳐주는 역할임
  2. filter NA 
    • filtered_line = parsed_line.filter(lambda x: "NA" not in x[1])
      • 2번째 열(인덱스 1번)이 NA가 아닌 행만 필터링
  3. 최대 최소찾기
    • min_temp = filtered_line.reduceByKey(lambda x, y: min(float(x), float(y)))
      final_list = min_temp.collect()
      for city, temperature in final_list:
                print(f"{city}: {temperature}")
      • x는 이미 계산된 최소 값
      • y는 새로운 값으로
      • 두개를 비교해 min/max를 계속 남겨놓는 방식
#lecture6_map_flat

# map vs. flatMap

# map transformation applies a function to each row in a DataFrame/Dataset and returns the new transformed Dataset.
# 1 => 1
# flatMap transformation flattens the DataFrame/Dataset after applying the function on every element and returns a new transformed Dataset. 
# The returned Dataset will return more rows than the current DataFrame. It is also referred to as a one-to-many transformation function
# 1 => Many
# One of the use cases of flatMap() is to flatten column which contains arrays, list, or any nested collection

import pyspark

sc = pyspark.SparkContext.getOrCreate()
rdd = sc.parallelize([("name", "joe,sarah,tom"), ("car", "hyundai")])
result = rdd.map(lambda x: x[1].split(","))
# print(result.collect())
# [['joe', 'sarah', 'tom'], ['hyundai']]

rdd = sc.parallelize([("name", "joe,sarah,tom"), ("car", "hyundai")])
result = rdd.flatMap(lambda x: x[1].split(","))
# print(result.collect())
# ['joe', 'sarah', 'tom', 'hyundai']


test_file = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
lines = sc.textFile(test_file)
words = lines.flatMap(lambda x: x.split())
# word_count = words.countByValue()
# print(word_count)
# for word, count in word_count.items():
#     print(f"{word}: {count}")
    
    
# # How about sort by key?
word_count = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
sorted_word_count = word_count.map(lambda x: (x[1], x[0])).sortByKey()
for count, word in sorted_word_count.collect():
    print(f"{count}: {word}")


map과 flatmap의 차이

  • flatmap은 직관적으로 하면 평평하게 만든다 → 하나의 리스트로 만든다.
  • map은 1개가 들어가면 1개가 나오고, flatmap은 1개가 들어가면 N개가 나온다. 
  • map은 함수가 리스트를 반환하다면 결과물은 중첩리스트로 나옴
  • flatmap은 함수가 리스트를 반환하다면 내용물을 모두 꺼내서 하나의 거대한 단일 리스트로 나옴

 

'IT 정리 > 아파치 스파크' 카테고리의 다른 글

SparkSQL 기초(1)  (0) 2026.05.03
DF과 SparkSQL 소개  (0) 2026.05.01
Spark 기초(1)  (0) 2026.05.01
Spark 환경 설정  (0) 2026.04.30
Apache Spark 설치(window), jupyter 접속  (0) 2026.04.30