#lecture8
from pyspark.sql import (
Row,
SparkSession)
from pyspark.sql.functions import col, asc, desc
def parse_line(line: str):
fields = line.split('|') # |
return Row(
name=str(fields[0]),
country=str(fields[1]),
email=str(fields[2]),
compensation=int(fields[3]))
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
lines = spark.sparkContext.textFile("file:///home/jovyan/work/sample/income.txt")
income_data = lines.map(parse_line)
# Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
# SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)[source]
schema_income = spark.createDataFrame(data=income_data).cache()
# Creates or replaces a local temporary view with this DataFrame.
schema_income.createOrReplaceTempView("income")
# returns the dataframe
medium_income_df = spark.sql(
"SELECT * FROM income WHERE compensation >= 70000 AND compensation <= 100000")
# medium_income_df.show()
# for income_data in medium_income_df.collect():
# # print(income_data)
# print(income_data.name)
# # use function instead of sql function
# schema_income.groupBy("country").count().orderBy(col("count").desc()).show()
Q : 이전에 사용했던 sc = pyspark.SparkContext.getOrCreate() 와의 차이점은??
A : SparkSession은 기존의 모든 컨텍스트(SparkContext, SQLContext, HiveContext 등) 하나로 묶은 통합 진입점
SparkSession 객체 하나만 생성하면 내부적으로 SparkContext가 자동으로 생성되므로, 별도의 컨텍스트들을 관리할 필요가 없음.
- spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
- SparkSession.builder : 객체를 생성하기 위한 빌더 객체를 호출
- appName("SparkSQL") : 스파크 애플리케이션의 이름을 "SparkSQL"로 지정
- 이름은 스파크 웹 UI나 클러스터 매니저에게 현재 실행 중인 작업을 모니터링할 때 식별자로 사용
- schema_income = spark.createDataFrame(data=income_data).cache()
- createDataFrame : RDD를 스파크 DF로 변환(데이터의 구조를 부여)
- RDD는 데이터를 안전하게 분산 처리하지만, 데이터 안에 어떤 칼럼이 있고 어떤 타입인지 스파크가 명확히 알지 못하는 비정형 데이터에 가까움, 이를 정형 데이터로 변환하기 위한 과정
- cache : 모든 데이터를 메모리(RAM)에 올림
- 데이터 사이즈가 작기 때문에 메모리에 올려서 속도를 빠르게 함 : 한 번 읽어서 정형화한 DF 데이터를 메모리에 보관해서 다시 사용할 때 원본 파일을 또 읽을 필요가 없음
- 스파크의 Lazy Evaluation 특징 : 데이터를 변환하는 작업을 즉시 실행하지 않고, 실제 결과가 필요할 때 과정 실행
- createDataFrame : RDD를 스파크 DF로 변환(데이터의 구조를 부여)
- schema_income.createOrReplaceTempView("income")
- createOrReplaceTempView : DF를 "income"이라는 이름의 임시 뷰로 생성하거나 교체
#lecture9_csv 파일 읽어오기1
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
avg,
col,
round as rnd
)
spark = SparkSession.builder.appName("sql_import_csv").getOrCreate()
csv_file_path = "file:///home/jovyan/work/sample/age.csv"
# header option: either csv has header or not(default: header = false)
# inferSchema: either all columns are str or not
data = spark.read.option("header", "true")\
.option("inferSchema", "true")\
.csv(csv_file_path)
# data = spark.read.option("header", "true")\
# .csv(csv_file_path)
# # show schema
# data.printSchema()
# # show column name with data
# data.select("name", "age").show()
# # filter the data for age of 20 above
# data.filter(data.age > 20).show()
# # group by age and aggregates for count
# data.groupBy("age").count().show()
# # custom arithmetic
# data.select(data.name, data.age, data.age - 10).show()
# # column alias
# data.select(data.name, col("age").alias("age1")).show()
# # average
# data.select(data.name, data.age, data.country)\
# .groupBy("country")\
# .avg("age").show()
# # average & sort
# data.select(data.name, data.age, data.country)\
# .groupBy("country")\
# .avg("age").sort("avg(age)").show()
# # average & round
# data.select(data.name, data.age, data.country)\
# .groupBy("country")\
# .agg(rnd(avg("age"), 2).alias("avg_age")).show()
- data = spark.read.option("header", "true").option("inferSchema", "true") .csv(csv_file_path)
- spark.read : 스파크세션에서 파일을 읽기 위한 DataFrameReader 객체 가져옴
- .option("header", "true") : csv파일의 첫번째 행을 컬럼의 이름으로 사용할 것인지 여부 결정
- .option("inferSchema", "true") : 각 컬럼에 들어있는 데이터 타입을 스파크가 자동으로 추론
- false라면 모든 컬럼을 기본 타입인 문자열로 인식
- data.select("name", "age").show()
- .select("name") : 컬럼이 name인 열만 추출
- data.select(data.name, data.age, data.age - 10).show()
- 이름, 나이, 나이-10 이 추출
- data.select(data.name, col("age").alias("age1")).show()
- 컬럼명을 age에서 age1으로 수정
# lecture10 _ 단어 갯수 세어보기
from pyspark.sql import (
functions,
Row,
SparkSession
)
spark = SparkSession.builder.appName("df_wordcount").getOrCreate()
# # functions.explode(col)
# # Returns a new row for each element in the given array or map
# df = spark.createDataFrame([
# Row(a=1,
# intlist=[1,2,3],
# mapfield={"a": "b"}
# )])
# df.select(functions.explode(df.intlist).alias("anInt")).collect()
# # output: [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
# # functions.split(str, pattern, limit=-1)
# # Splits str around matches of the given pattern.
# df = spark.createDataFrame([
# Row(word="hello world and pyspark")])
# df.select(functions.split(df.word, ' ').alias("word")).collect()
csv_file_path = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
df = spark.read.text(csv_file_path)
# # save as "value"
# df.show()
words = df.select(
functions.explode(
functions.split(df.value, ' ')).alias("word"))
# words.show()
word_counts = words.groupBy("word").count().orderBy(functions.col("count").desc())
#
word_counts.show()
- df = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
- Row() : 1개 행을 정의
- a = 1 : a라는 이름의 컬럼값을 1이라고 생성
- intlist=[1,2,3] : intlist라는 이름의 리스트 값을 [1,2,3]이라고 생성
- mapfield={"a": "b"} : mapfield라는 이름의 딕셔너리 생성
- Row() : 1개 행을 정의
- df.select(functions.explode(df.intlist).alias("anInt")).collect()
- functions.explode(df.intlist) : 배열이나 map 형태의 컬럼을 입력받아, 그 안에 담긴 각 요소들을 개별적인 행으로 쪼개어줌
- df.select(functions.split(df.word, ' ').alias("word")).collect()
- functions.split(df.word, ' ') : df의 word컬럼을 공백으로 나누고 해당 컬럼을 word라고 별칭
- functions.split(df.word, ' ', 2) : 이 경우에는 배열의 크기를 최대 2개로 제한해서 "hello", "world and pyspark" 두개로 나눔. 기본값은 -1
- functions.split(df.word, ' ') : df의 word컬럼을 공백으로 나누고 해당 컬럼을 word라고 별칭
- words = df.select(
functions.explode(
functions.split(df.value, ' ')).alias("word"))- df.value : 이전에 spark.read.text()에서 별도의 컬럼 이름이 없기 때문에, 자동으로 'value'라는 이름의 문자열 타입 컬럼을 생성해 모든 데이터를 집어넣음 = 파일 확장자가 .txt이든 .csv이든 spark.read.text()로 읽는 순간, df 내부에 이미 'value'라는 이름의 컬럼이 자동 생성
- word_counts = words.groupBy("word").count().orderBy(functions.col("count").desc())
- .orderBy(functions.col("count").desc()) : 특정 컬럼을 기준으로 데이터를 정렬(sort와 동일)
- functions.col() : 특정 컬럼을 객체 형태로 지정하고 제어
- .orderBy(functions.col("count").desc()) : 특정 컬럼을 기준으로 데이터를 정렬(sort와 동일)
#lecture11 _csv 파일 읽어오기2
from pyspark.sql import (
functions as f,
Row,
SparkSession,
types as t
)
spark = SparkSession.builder.appName("df_struct").getOrCreate()
# types.StructField(name, dataType, nullable=True, metadata=None)
table_schema = t.StructType([
t.StructField("country", t.StringType(), True),
t.StructField("temperature", t.FloatType(), True),
t.StructField("observed_date", t.StringType(), True)])
csv_file_path = "file:///home/jovyan/work/sample/temp_with_date.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)
# df.printSchema()
data = df.select("country", "temperature", "observed_date")
min_temperature = data.groupBy("country").min("temperature")
# min_temperature.show()
# # celsius to fahrenheit: (0°C × 9/5) + 32
f_temperature = data.withColumn(
"temperature",
(f.col("temperature") * 9 / 5) + 32)\
.select("country", "temperature")
f_temperature.show()
※ csv 파일을 읽어올 때 헤더가 없을때 하는 방법
- table_schema = t.StructType([
t.StructField("country", t.StringType(), True),
t.StructField("temperature", t.FloatType(), True),
t.StructField("observed_date", t.StringType(), True)])- t.StructType : 데이너프레임 전체의 스키마를 정의하는 컨테이너 역할
- t : pyspark.sql,types를 축약한 표현
- StructField(name, dataType, nullable = True, metadata = None)
- name : 칼럼 이름
- datatype : 데이터 타입
- nullable = True : null 값 허용
- metadata : 해당 컬럼에 추가적인 메타데이터를 저장할때 사용하며 생략 가능
- t.StructType : 데이너프레임 전체의 스키마를 정의하는 컨테이너 역할
- spark.read.schema(table_schema).csv(csv_file_path)
- 이전에 헤더 있을 때 csv 읽어오는 법(spark.read.option("header", "true").csv(path))과 차이
- read.schema는 개발자가 직접 스키마를 지정하여 읽기
- read.option는 첫번째 줄을 컬럼의 헤더로 인식하여 읽기
- 이전에 헤더 있을 때 csv 읽어오는 법(spark.read.option("header", "true").csv(path))과 차이
'IT 정리 > 아파치 스파크' 카테고리의 다른 글
| SparkSQL 기초(2) (0) | 2026.05.04 |
|---|---|
| DF과 SparkSQL 소개 (0) | 2026.05.01 |
| Spark 기초(2) (0) | 2026.05.01 |
| Spark 기초(1) (0) | 2026.05.01 |
| Spark 환경 설정 (0) | 2026.04.30 |