RDD와 DF 차이점 짚고 넘어가기
1. RDD
- 탄력적 분산 데이터 세트로 스파크의 가장 기본적인 추상화 개념
- 탄력적 : 데이터 처리 중 노드에 장애가 발생해도 리니지를 통해 데이터를 복구
- 분산된 : 클러스터의 여러 노드에 데이터가 나누어져 저장
- 데이터 세트 : 텍스트 파일, JSON, 객체 리스트 등의 다양한 형태의 데이터를 담을 수 있음
- 특징
- 데이터가 어떻게 처리되어야 하는지 개발자가 직접 제어
- 데이터 내부에 구조에 대한 정보가 없음 → 스파크 입장에서는 그저 객체의 묶음
- 컴파일 타임에 타입 체크가 가능하여 런타임 오류를 줄일 수 있음
2. DF
- RDD 위에 구축된 고수준의 추상화 모델로, 테이블과 유사
- 스키마 기반 : 데이터가 어떤 컬럼으로 구성되어 있고, 각 타입이 무엇인지 정의
- Named Columns : 각 열에 이름이 붙어 있어 SQL 쿼리처럼 데이터를 다룰 수 있음
- 특징
- select, filter, groupBy 같은 직관적인 함수를 사용
- 카탈리스트 옵티마이저가 실행 계획을 자동으로 최적화해 RDD보다 훨씬 빠른 성능을 냄
Spark 간단히 짚고 넘어가기
- 이전에 배운 도커가 애플리케이션을 담는 표준화된 컨테이너라면
- 스파크는 그 컨테이너들이 여러 대의 컴퓨터에서 동시에 빅데이터를 처리할 수 있게 해주는 거대한 계산기
#lecture2
import pyspark
test_file = "file:///home/jovyan/work/sample/word.txt"
# sc = pyspark.SparkContext('local[*]')
sc = pyspark.SparkContext.getOrCreate()
text_file = sc.textFile(test_file)
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
# ln1: hello,world
# (hello, 1), (world, 1), (hello, 1) => [(hello, 1), (hello, 1)], [(world, 1)]
print(counts.collect())
- sc = pyspark.SparkContext('local[*]') ]
- 스파크의 환경 설정을 직접 정의하며 컨텍스트를 생성하는 명령
- local : 내 컴퓨터 - 보통 스파크는 여러 대의 컴퓨터(클러스터)를 묶어서 사용. 지금은 내 컴퓨터만 사용
- [*] : 모든 코어 사용 - CPU 안의 코어 갯수를 이용해 작업 수행
- sc = pyspark.SparkContext.getOrCreate()
- SparkContext(SC) : 파이썬 코드와 스파크 엔진(클러스터)사이의 다리 역할
- 자원관리 : 클러스터 매니저와 통신해 작업을 수행할 일꾼을 할당
- 작업감독 : 사용자가 작성한 코드를 보고 어떻게 나누어 계산할지 계획을 세우고 지시
- RDD 생성 : 텍스트 파일이나 리스트 데이터를 스파크가 이해할 수 있는 분산 데이터로 변환
- getOrCreate() : 가져오거나 없으면 만들어라
- 중복 생성 방지: 스파크 내에서 하나의 SparkContext만 가질 수 있음
- 안전 장치
- sc : pyspark.context.SparkContext 클래스의 객체
- 마스터 노드 주소, 애플리케이션 이름, 할당된 메모리 크기 등 현재 실행 중인 스파크의 모든 환경 정보가 담김
- SparkContext(SC) : 파이썬 코드와 스파크 엔진(클러스터)사이의 다리 역할
- text_file=sc.textFile(test_file)
- textFile : pyspark.context.SparkContext 클래스에 정의된 메서드
- 이전에 만든 sc를 통해 호출
- 외부 저장소(로컬 파일, HDFS, S3)에 있는 텍스트 데이터를 읽어와서 스파크의 기본 데이터 단위인 RDD로 변환
- 데이터 단위 : 텍스트 파일의 한줄을 하나의요소로 취급
- 분산 처리 : 파일 읽을 때 설정된 파티션 수에 따라 데이터를 쪼개서 클러스터의 여러 노드에 나누어 로드
- 결과물 타입 : pyspark.rdd.RDD 타입의 객체 반환
- textFile : pyspark.context.SparkContext 클래스에 정의된 메서드
- counts = text_file.flatMap(...)
- flatMap : [1:N 변환] 으로 각 줄을 공백으로 나누어 리스트를 만든 후, 리스트들을 모두 풀어헤쳐서 하나의 거대한 단어 뭉치(RDD)로 만듦
- 1개의 입력을 받아 0개 이상의 출력으로 확장
- flatMap(lambda line: line.split(" ")) : textFile로 읽어온 RDD의 한 요소, 즉 텍스트 파일의 '문장 한 줄'
- 일반 map이면 [["hello", "world"]]처럼 리스트를 포함한 리스트가 되지만, "hello"와 "world"자체가 개별 요소로 RDD로 재구성
- map : [1:1 변환]으로 단어 하나를 받아서 (단어,1)이라는 튜플로 변환
- reduceByKey 연산을 쓰기 위해서 반드시 (키-값) 구조가 필요하기 때문에 단어가 '키', 1은 '값'
- 단어의 개수만큼 키-값 쌍이 생성
- reduceByKey : 같은 키(단어)를 가진 값들을 하나로 합침
- 먼저 각 파티션 내에서 같은 단어끼리 합침 → 그 다음 네트워크를 통해 다른 노드에 있는 같은 단어들을 한 곳으로 모음 → 최종적으로 모든 숫자를 더해 단어별 총합 구함
- (a, b)에서 a와 b는 단어가 아니라 같은 키를 가진 요소들의 값
- a : 지금까지 누적된 합계
- b : 새로 입력된 값
- flatMap : [1:N 변환] 으로 각 줄을 공백으로 나누어 리스트를 만든 후, 리스트들을 모두 풀어헤쳐서 하나의 거대한 단어 뭉치(RDD)로 만듦
#lecture3
import collections
import pyspark
test_file = "file:///home/jovyan/work/sample/grade.txt"
# sc = pyspark.SparkContext('local[*]')
sc = pyspark.SparkContext.getOrCreate()
text_file = sc.textFile(test_file)
grade = text_file.map(lambda line: line.split(" ")[1])
# Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
grade_count = grade.countByValue()
for grade, count in sorted(grade_count.items(), key=lambda item: item[1], reverse=True):
print(f"{grade}: {count}")
- grade = text_file.map(lambda line: line.split(" ")[1]
- 1:1 변환으로 text_file의 문장 한줄을 입력 받아 공백 기준으로 자름 → 결과는 리스트
- 리스트의 두번째 요소(인덱스 1)만 선택
- 원본 문장 RDD가 성적들로만 구성된 RDD로 변환되어 grade 변수에 담김
- grade_count = grade.countByValue()
- countByValue() : RDD내의 각 고유한 값들이 몇 번 등장하는지를 계산
- 앞서 있었던 map+reduceByKey 조합한 거라고 생각하면 됨
- 파이썬의 딕셔너리 형태로 저장
- for grade, count in sorted(...)
- grade_count.items() : 딕셔너리에 담긴 성과 개수를 (키, 값) 쌍의 리스트 형태로 꺼냄
- sorted(..., key=lambda item: item[1], reverse=True)
- 정렬 기준 : 튜플의 두번째 요소인 빈도수를 기준으로 정렬
- reverse=True : 내림차순
#lecture4_exp
import pyspark
sc = pyspark.SparkContext.getOrCreate()
# Key / Value RDD
# creating Key / Value RDD
total_by_brand = rdd.map(lambda brand: (brand, 1))
# # reduceByKey(): Merge the values for each key using an associative and commutative reduce function.
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
# groupByKey(): Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
# sortByKey(): Sorts this RDD, which is assumed to consist of (key, value) pairs.
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first()
('1', 3)
# keys(), values(): Create a RDD of keys or just values
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.keys()
['a', 'b', 'a']
# join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]
# Efficiency is the key for performance!!!
# if you only need values, use mapValues() or flatMapValues()
- map : brand를 받았을 때 brand+value를 1이라고 놓고 tuple 반환
- 결과 : (brand1,1), (brand2,1), (brand1,1) ...
- reduceByKey
- sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) : 파이썬의 일반적인 리스트가 RDD로 변환
- 리스트 안의 튜플이 들어있는 형식으로 각 튜플은 두개의 요소를 가짐 →
스파크는 자동으로 각각의 값을 key-value로 인식
- 리스트 안의 튜플이 들어있는 형식으로 각 튜플은 두개의 요소를 가짐 →
- sorted(rdd.reduceByKey(add).collect())
- reduceByKey로 인해 같은 키값을 가진 것들을 묶는다 어떻게? → add
- sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) : 파이썬의 일반적인 리스트가 RDD로 변환
- groupByKey() : 키값을 기준으로 그룹화
- mapValues(len) : 키는 그대로 두고, 값에만 특정 함수를 적용해 변환
- groupByKey() 직후에는 ("a", [1,1]), ("b", [1]) 이렇게 묶임 이걸 len을 적용하면 각 2,1이 나옴
- sortByKey() : 키 중심으로 정렬
- join
#lecture4_avg
import pyspark
sc = pyspark.SparkContext.getOrCreate()
test_file = "file:///home/jovyan/work/sample/house_price.csv"
def parse_line(line: str):
city, price, count = line.split(',')
return (int(price), int(count))
lines = sc.textFile(test_file)
price_count = lines.map(parse_line)
# [(10000, 3), (10000, 5), (40000, 7), (5000, 7), (4000, 2), (9000, 4), (5000, 7), (4000, 2), (8000, 9)]
sum_of_count = price_count.mapValues(lambda count: (count, 1))\
.reduceByKey(lambda a, b: (int(a[0]) + int(b[0]), int(a[1]) + int(b[1])))
# ('10000', (3, 1)), ('10000', (5, 1)) ...
# [('10000', (8, 2)), ('4000', (4, 2)), ('9000', ('4', 1)), ('8000', ('9', 1)), ('40000', ('7', 1)), ('5000', (14, 2))]
avg_by_count = sum_of_count.mapValues(lambda total_count: int(total_count[0]) / total_count[1])
results = avg_by_count.collect()
print(results)
- mapValues() : Key는 건드리지 않은채 Value만 작업 수행
- (10000,3)을 mapValues(lambda count: (count, 1))에 입력시
- 키인 10000은 따로 보관
- 값인 3만 lambda count : (count,1)에 집어넣음
- 값은 (3,1)인 튜플로 만들어짐
- 따로 보관했던 키와 함쳐서 ('10000',(3,1)) 반환
- (10000,3)을 mapValues(lambda count: (count, 1))에 입력시
'IT 정리 > 아파치 스파크' 카테고리의 다른 글
| DF과 SparkSQL 소개 (0) | 2026.05.01 |
|---|---|
| Spark 기초(2) (0) | 2026.05.01 |
| Spark 환경 설정 (0) | 2026.04.30 |
| Apache Spark 설치(window), jupyter 접속 (0) | 2026.04.30 |
| RDD, Docker 소개 (0) | 2026.04.30 |