IT 정리/아파치 스파크

SparkSQL 기초(1)

유정임 2026. 5. 3. 18:01

 

#lecture8

from pyspark.sql import (
    Row,
    SparkSession)
from pyspark.sql.functions import col, asc, desc

def parse_line(line: str):
    fields = line.split('|') # |
    return Row(
        name=str(fields[0]),
        country=str(fields[1]),
        email=str(fields[2]),
        compensation=int(fields[3]))


spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
lines = spark.sparkContext.textFile("file:///home/jovyan/work/sample/income.txt")
income_data = lines.map(parse_line)

# Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
# SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)[source]
schema_income = spark.createDataFrame(data=income_data).cache()

# Creates or replaces a local temporary view with this DataFrame.
schema_income.createOrReplaceTempView("income")

# returns the dataframe
medium_income_df = spark.sql(
    "SELECT * FROM income WHERE compensation >= 70000 AND compensation <= 100000")
# medium_income_df.show()

# for income_data in medium_income_df.collect():
#     # print(income_data)
#     print(income_data.name)

# # use function instead of sql function
# schema_income.groupBy("country").count().orderBy(col("count").desc()).show()

Q : 이전에 사용했던  sc = pyspark.SparkContext.getOrCreate() 와의 차이점은??

A : SparkSession은 기존의 모든 컨텍스트(SparkContext, SQLContext, HiveContext 등) 하나로 묶은 통합 진입점

SparkSession 객체 하나만 생성하면 내부적으로 SparkContext가 자동으로 생성되므로, 별도의 컨텍스트들을 관리할 필요가 없음.

 

  1. spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
    • SparkSession.builder : 객체를 생성하기 위한 빌더 객체를 호출
    • appName("SparkSQL") : 스파크 애플리케이션의 이름을 "SparkSQL"로 지정
      • 이름은 스파크 웹 UI나 클러스터 매니저에게 현재 실행 중인 작업을 모니터링할 때 식별자로 사용
  2. schema_income = spark.createDataFrame(data=income_data).cache()
    • createDataFrame : RDD를 스파크 DF로 변환(데이터의 구조를 부여)
      • RDD는 데이터를 안전하게 분산 처리하지만, 데이터 안에 어떤 칼럼이 있고 어떤 타입인지 스파크가 명확히 알지 못하는 비정형 데이터에 가까움, 이를 정형 데이터로 변환하기 위한 과정
    • cache : 모든 데이터를 메모리(RAM)에 올림
      • 데이터 사이즈가 작기 때문에 메모리에 올려서 속도를 빠르게 함 : 한 번 읽어서 정형화한 DF 데이터를 메모리에 보관해서 다시 사용할 때 원본 파일을 또 읽을 필요가 없음
      • 스파크의 Lazy Evaluation 특징 : 데이터를 변환하는 작업을 즉시 실행하지 않고, 실제 결과가 필요할 때 과정 실행
  3. schema_income.createOrReplaceTempView("income")
    • createOrReplaceTempView : DF를 "income"이라는 이름의 임시 뷰로 생성하거나 교체

 

#lecture9_csv 파일 읽어오기1

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    round as rnd
)

spark = SparkSession.builder.appName("sql_import_csv").getOrCreate()
csv_file_path = "file:///home/jovyan/work/sample/age.csv"

# header option: either csv has header or not(default: header = false)
# inferSchema: either all columns are str or not
data = spark.read.option("header", "true")\
            .option("inferSchema", "true")\
            .csv(csv_file_path)
# data = spark.read.option("header", "true")\
#             .csv(csv_file_path)

# # show schema
# data.printSchema()

# # show column name with data
# data.select("name", "age").show()

# # filter the data for age of 20 above
# data.filter(data.age > 20).show()

# # group by age and aggregates for count
# data.groupBy("age").count().show()

# # custom arithmetic
# data.select(data.name, data.age, data.age - 10).show()

# # column alias
# data.select(data.name, col("age").alias("age1")).show()

# # average
# data.select(data.name, data.age, data.country)\
#         .groupBy("country")\
#         .avg("age").show()

# # average & sort
# data.select(data.name, data.age, data.country)\
#         .groupBy("country")\
#         .avg("age").sort("avg(age)").show()

# # average & round
# data.select(data.name, data.age, data.country)\
#         .groupBy("country")\
#         .agg(rnd(avg("age"), 2).alias("avg_age")).show()
  1. data = spark.read.option("header", "true").option("inferSchema", "true") .csv(csv_file_path)
    • spark.read : 스파크세션에서 파일을 읽기 위한 DataFrameReader 객체 가져옴
    • .option("header", "true") : csv파일의 첫번째 행을 컬럼의 이름으로 사용할 것인지 여부 결정
    • .option("inferSchema", "true") : 각 컬럼에 들어있는 데이터 타입을 스파크가 자동으로 추론
      • false라면 모든 컬럼을 기본 타입인 문자열로 인식
  2. data.select("name", "age").show()
    • .select("name") : 컬럼이 name인 열만 추출
  3. data.select(data.name, data.age, data.age - 10).show()
    • 이름, 나이, 나이-10 이 추출
  4. data.select(data.name, col("age").alias("age1")).show()
    • 컬럼명을 age에서 age1으로 수정
# lecture10 _ 단어 갯수 세어보기

from pyspark.sql import (
    functions,
    Row,
    SparkSession
)

spark = SparkSession.builder.appName("df_wordcount").getOrCreate()

# # functions.explode(col)
# # Returns a new row for each element in the given array or map
# df = spark.createDataFrame([
#         Row(a=1,
#             intlist=[1,2,3],
#             mapfield={"a": "b"}
#            )])

# df.select(functions.explode(df.intlist).alias("anInt")).collect()
# # output: [Row(anInt=1), Row(anInt=2), Row(anInt=3)]


# # functions.split(str, pattern, limit=-1)
# # Splits str around matches of the given pattern.
# df = spark.createDataFrame([
#         Row(word="hello world and pyspark")])
# df.select(functions.split(df.word, ' ').alias("word")).collect()


csv_file_path = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
df = spark.read.text(csv_file_path)

# # save as "value"
# df.show()

words = df.select(
    functions.explode(
        functions.split(df.value, ' ')).alias("word"))
# words.show()
word_counts = words.groupBy("word").count().orderBy(functions.col("count").desc())
# 
word_counts.show()
  1. df = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    • Row() : 1개 행을 정의
      • a = 1 : a라는 이름의 컬럼값을 1이라고 생성
      • intlist=[1,2,3] : intlist라는 이름의 리스트 값을 [1,2,3]이라고 생성
      • mapfield={"a": "b"} : mapfield라는 이름의 딕셔너리 생성
  2. df.select(functions.explode(df.intlist).alias("anInt")).collect()
    • functions.explode(df.intlist) : 배열이나 map 형태의 컬럼을 입력받아, 그 안에 담긴 각 요소들을 개별적인 행으로 쪼개어줌
  3. df.select(functions.split(df.word, ' ').alias("word")).collect()
    • functions.split(df.word, ' ') : df의 word컬럼을 공백으로 나누고 해당 컬럼을 word라고 별칭
      • functions.split(df.word, ' ', 2) : 이 경우에는 배열의 크기를 최대 2개로 제한해서 "hello", "world and pyspark" 두개로 나눔. 기본값은 -1
  4. words = df.select(
        functions.explode(
            functions.split(df.value, ' ')).alias("word"))
    • df.value : 이전에 spark.read.text()에서 별도의 컬럼 이름이 없기 때문에, 자동으로 'value'라는 이름의 문자열 타입 컬럼을 생성해 모든 데이터를 집어넣음 = 파일 확장자가 .txt이든 .csv이든 spark.read.text()로 읽는 순간, df 내부에 이미 'value'라는 이름의 컬럼이 자동 생성
  5. word_counts = words.groupBy("word").count().orderBy(functions.col("count").desc())
    • .orderBy(functions.col("count").desc()) : 특정 컬럼을 기준으로 데이터를 정렬(sort와 동일)
      • functions.col() : 특정 컬럼을 객체 형태로 지정하고 제어
#lecture11 _csv 파일 읽어오기2

from pyspark.sql import (
    functions as f,
    Row,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_struct").getOrCreate()

# types.StructField(name, dataType, nullable=True, metadata=None)
table_schema = t.StructType([
    t.StructField("country", t.StringType(), True),
    t.StructField("temperature", t.FloatType(), True),
    t.StructField("observed_date", t.StringType(), True)])

csv_file_path = "file:///home/jovyan/work/sample/temp_with_date.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)
# df.printSchema()

data = df.select("country", "temperature", "observed_date")

min_temperature = data.groupBy("country").min("temperature")

# min_temperature.show()

# # celsius to fahrenheit: (0°C × 9/5) + 32 
f_temperature = data.withColumn(
                    "temperature",
                    (f.col("temperature") * 9 / 5) + 32)\
                .select("country", "temperature")
f_temperature.show()

※ csv 파일을 읽어올 때 헤더가 없을때 하는 방법 

  1. table_schema = t.StructType([
        t.StructField("country", t.StringType(), True),
        t.StructField("temperature", t.FloatType(), True),
        t.StructField("observed_date", t.StringType(), True)])
    • t.StructType : 데이너프레임 전체의 스키마를 정의하는 컨테이너 역할
      • t : pyspark.sql,types를 축약한 표현
      • StructField(name, dataType, nullable = True, metadata = None)
        • name : 칼럼 이름
        • datatype : 데이터 타입
        • nullable = True : null 값 허용
        • metadata : 해당 컬럼에 추가적인 메타데이터를 저장할때 사용하며 생략 가능
  2. spark.read.schema(table_schema).csv(csv_file_path)
    • 이전에 헤더 있을 때 csv 읽어오는 법(spark.read.option("header", "true").csv(path))과 차이
      • read.schema는 개발자가 직접 스키마를 지정하여 읽기
      • read.option는 첫번째 줄을 컬럼의 헤더로 인식하여 읽기

'IT 정리 > 아파치 스파크' 카테고리의 다른 글

SparkSQL 기초(2)  (0) 2026.05.04
DF과 SparkSQL 소개  (0) 2026.05.01
Spark 기초(2)  (0) 2026.05.01
Spark 기초(1)  (0) 2026.05.01
Spark 환경 설정  (0) 2026.04.30