가을기 Workspace

Apache Spark 소개 및 기본 설명 본문

개발/Data Engineering

Apache Spark 소개 및 기본 설명

가을기_ 2021. 7. 17. 13:31

스파크는

  • 대용량 데이터 분산 처리 소프트웨어
  • 별도의 app 없이 대용량 쿼리 수행
  • 애플리케이션 구현을 위한 API 제공
  • 인기있는 빅데이터 솔루션, 가장 활동적인 아파치 프로젝트

 

공식 정의: Apache Spark is a unified analytics engine for large-scale data processing.

For large-scale: 데이터 크기와 상관 없이

Unified Analytics engine: 어떤 요구 사항도 처리할 수 있는 데이터 분석 엔진.

 

데이터를 처리할 일이 있을때 사용하면 빠르고, 안정적이고, 편리하다.

 

원본 데이터, 데이터 수집 → 데이터 처리 → 분석 결과 → 응용 프로그램

위의 과정에서 데이터 처리를 담당.

 

그렇다고 빅데이터라 해서 크게 다를 것은 없다.

데이터를 읽어와서 원하는 작업을 수행하고, 그 결과를 저장하거나 외부 프로세스에 전달.

// Create Database connection
conn = ..

// Execute Query
rs = sqlRunner.excute(“SELECT * FROM …”)

// Extract data from result set
process(..)

 

하지만 Spark를 굳이 사용해야하는 이유

  • 특정 광고가 하루에 몇 번 노출됐는지 알아보기
  • 특정 광고를 본 사용자가 몇명인지 알아보기
  • 최근 한달간 당일 첫 방문 시 A 업종의 광고에 노출된 횟수가 5회 이상인 사용자 중에 최근 1주일 사이 B 업종 광고에 3회 이상 연속 노출 되면서 최초 클릭을 수행한 사용자 수
    • 사용자 * 시간(ms) * 광고

스파크는 조금 더 특화된 기능들을 제공한다.

다양한 라이브러리 지원 및 통합 런타임 제공

비즈니스 로직 구현에 집중할 수 있는 API: RDD, DataFrame

빅데이터 처리 App 을 위한 프레임워크

 

간단한 예제

 

Word count

  • 파일의 경로를 입력으로 전달받는다 (path = "/mydir/20191225/path1")
  • 결과를 저장할 컬렉션 준비 (key: word, value: count)
  • 파일을 한 줄 씩 읽으면서 (Spark is a fast and general cluster... Spark is built using [Apache Maven ...)
  • 각 줄을 공백 문자를 기준으로 단어로 분리한다. ([Spark, is, a, fast, and, general, cluster ...], [Spark, is, built, using, [Apache, Maven, ...])
  • 분리된 모든 단어에 대해, 아직 등록되지 않은 단어면 count 1로 등록하고 이미 등록된 단어면 1을 증가시킨다.

 

데이터 처리계의 Hello World.

  1. 가장 간단하고 기본적이면서도 자주 활용되는 문법을 사용.
  1. 프로그램 및 관련 개발, 런타임 환경의 정상 여부를 알 수 있다.

 

단어수 세기: 읽고, 처리하고 (파싱 → 단어 수 세기), 결과를 전달 (출력)

한 서버에서 처리한다고 가정하고 로직을 짜보면 아래와 같다.

  1. 파일 경로 전달
  1. 결과 저장 컬렉션 준비
  1. 파일을 한 줄 씩 읽으면서
    1. 각 줄을 공백 문자를 기준으로 여러 개 단어로 분리
    1. 분리된 각 단어 순회
      1. 이미 등록: count 증가
      1. Or: count를 1로 등록
  1. 결과 출력

 

파일의 크기가 매우 커지면? 동작하지 않을 수 있다.

파일을 나눠 여러 서버에서 나눠서 처리해야 한다.

어떤 일들을 해야할까?

  • 서버 준비, 실행 환경 구성
  • 파일 분할 (전용 유틸리티 사용)
  • 분할한 파일 결과물을 여러 서버에 복사
  • 프로그램 빌드
  • 빌드된 프로그램 여러 서버에 복사
  • 여러 서버에서 프로그램 실행
  • 각 서버별로 실행 결과를 기록
  • 모두 더해서 최종 결과 계산

 

범용적으로 발전시킨다.

  • 변하지 않는 것: 재사용 가능한 공통 모듈
  • 변하는 것: 구현을 통해 확장 가능하도록 추상화,

 

파일 분할 문제

  • 파일 크기: 엄청 크면, 서버 한대에서 split이 안된다.
  • 파일 복사 시간: 긴 시간이 걸린다.
  • 서버 다운: 파일 복사했다 치고, 서버가 죽으면?

⇒ 분산 파일 시스템이 필요. 그런데 쉽게 구현하기는 어렵고, 스파크가 해야하는 영역인가?

 

그래서 스파크는 hdfs api를 사용한다.

  • 장점: 호환성, 안정성, 편의성
  • 단점: 하둡 라이브러리 의존성. (하둡을 띄워놔야하는건 아님, 그래서 로컬에서 실행 가능하다.)

 

프로그램 빌드 및 실행

Input ⇒ Output

map(프로그램), transform(수학 용어)

map(): 입력을 출력으로 변환하는 함수

map(파일) {
  return 단어 수 맵
}

map(이미지) {
   return JSON_OBJECT
}

 

병합

서버가 많아지면서, 나눠서 처리한 부분을 합쳐야한다. (단어수 세기의 경우, 단어수 결과의 합)

누가 하는가: "셔플"

// 각 서버에서 실행
map(input)

// 최종 결과 처리
map(input) 
//...?

둘 다 map이면 헷갈린다.

 

  • 함수 분리
    • map (String input)
    • reduce (List<String> input)
    • anotherFuncttion
  • 타입 분리 (인터페이스, 클래스)
    • class Mapper
    • class Reducer
    • class AwesomeCalculator
  • 둘 다 분리
    • class Mapper { Result map(String input) }
    • class Reducer { reduce(List<Result> input) }
    • class AwesomeCalculator { Result run(String input) }
  • 스파크의 경우: 둘 다. RDD: CheckPointRDD, ShuffleRDD, JdbcRDD ...

 

RDD ?

RDD: Resilient Distributed Dataset.

분산 데이터 모델.

Hadoop의 MapReduce API와 비슷한 열할.

Map/Reduce → How: Workflow.

Dataset → What: 이 데이터가 뭐냐.

 

val rdd1 = sc.textFile("myFile")
val rdd2 = rdd1.map()
val rdd3 = rdd2.reduceByKey()

// 추신. 위의 코드는 data chain을 만드는 일 
// 실제로 show(), write() 가 해야지만 Workflow가 정의된다. lazy 처리

 

요약하자면 데이터 처리 프로그램 작성을 위한 스파크의 프로그래밍 모델.

RDD 를 이용해서 프로그램을 작성한다.

 

 

클러스터 매니저 (CM)

분산 데이터 처리를 위한 인프라.

  1. 여러 서버들을 제어, 어떤 프로그램이 죽었을때 살려야한다.
  1. 프로그램 실행도 제어 해야한다.
  1. reduce 결과 데이터를 shuffle도 해야한다.

하둡 클러스터를 쓸 것인가? 데이터 분산을 위해 hdfs를 썼지만, spark가 클러스터 제어를 포기할수는 없다.

Hadoop, Kubernetes ...

 

  • 기존 클러스터 활용 → hadoop - yarn, kubernetes 등, 기존 프로젝트와 통합시킬 수 있게.
  • 자체 클러스터 제공 → Mesos
  • GCP, AWS 클라우드 환경 제공
  • 로컬 클러스터 매니저도 있다.

⇒ 클러스터 매니저가 제공하는 함수를 상속 및 사용 해야한다. classpath 지정 등.

⇒ 매니저마다 작성, 배포, 실행이 달라질 수 있다.

Yarn, Mesos, k8s 마다 자체 실행환경이 존재.

어떤식으로 구동해야하는지는 spark-submit

 

  • Executor

(Scheduler로 부터) Task를 할당받아 실행하는 역할 수행.

  • Driver

애플리케이션 실행에 필요한 다양한 백그라운드 프로세스들과 정보를 관리. (마스터 역할을 하는 수많은 백그라운드 프로세스들의 집합)

  • 실행
    • 클러스터별 익스큐터 실행코드
    • spark-submit
    • —master = yarn | mesos:// | k8s://
    • 실행 스크립트 생성,호출, 의존성처리
    • 단일 프로그래밍 모델
    • CM 에 자유롭다. → Unit Test, IDE Debugging

 

  • 특정 노드에는 Driver 백그라운드 프로세스, Executor 백그라운드 프로세스가 통신하면서 실행된다.
  • Cluster Manager의 동작을 spark 의 런타임 모델이 덮어씌웠다. → yarn의 application master를 고민하지 않아도 된다.
    • SparkContext생성 == Initialize Env.
    // 스파크 컨텍스트 생성
    val sc = new SparkContext(conf)
    
    // RDD 생성
    val rdd1 = sc.textFile("myfile")
    val rdd2 = rdd1.map((_, 1L))

 

 

sc.textFile("path")
	.flatMap(_.split(" "))
	.map((_, 1L))
	.reduceBykey(_ + _)
	.saveAsTextFile(<path>)

 

 

 

  1. rdd.show() 호출
  1. DAG Scheduler - workflow로 변환을 시켜주는 모듈, 각각의 작업을 task로 나눠준다.
  1. POOL → 프로그램이 여러개 동시에 자원을 점유해야한다. FIFO 스케쥴러, Fair 스케쥴러. Task 우선순위에 따라 정렬을 해주고 Scheduler에 전달하면,
  1. Scheduler가 Executer에 전달한다.
  1. rdd.show() 부터 Scheduler까지가 Driver다.

 

 

 

스파크 특징

  • Scala로 작성
  • Scala, Java, Python, R 지원.
  • Lazy Optimization
  • Memory. 중간 데이터 관리할때, 파일에 쓰기도 한다.
  • Shell REPL 제공
  • Unit Test, Source 코드 관리

 

 

Spark-Shell (REPL)

scala 스크립트 실행 가능

 

 

Spark Submit

$ spark-submit /apps/spark/bin/spark-submit \
--class a.b.c.MyClass \
--name example \
--master yarn \
--deploy-mode client \
--conf spark.driver.memoryOverhead=8g ¥
/mydeploy/my-program.jar

yarn 용 driver , executor 실행

 

Spark UI

실행하고 localhost:4040으로 접속 가능.

현재 클러스터에서 돌아가고있는 Job, Task를 확인할 수 있다.

 

 

Programming API

  • Dataset: Generic, ex) Dataset[String], Dataset[Int]
  • DataFrame
// Read
val df1 = spark.read.text("myfile")

// Process
val df2 = df1.map((e) => ...)
val df3 = df2.selectExpr("concat(... ")

// Write
df3.write.save("output.txt")
  • RDD
// Read
val rdd1 = sparkContext.textFile("myfile")

// Process
val rdd2 = rdd1.map(v => v + ",")

// Write
rdd2.saveAsTextFile("output.txt")

 

 

RDD

.map()은 transform, .show()는 action

 

absract class RD {
  final def iterator(...

  def compute(...
  ...
}

RDD가 안에 실제로 들어있는 것이 아닌, 데이터에 접근하는 함수가 존재.

 

 

 

DataFarme

  • rdd의 한계.
    • ex) rdd.map(_ + "a"), 10억row면 10억번의 연산, GC같은 문제가 발생
    • BlackBox 라는 것.
  • 최적화를 하자.
    • BlackBox를 제거
    • 제한된 API
    • 선언적 프로그래밍 : SQL, Built-in functions org.apache.spark.sql.functions
    • ex) rdd.map(v ⇒ v + "a") → df.selectExpr("concat(COL1, 'a')"), df.select(concat(col"a"), col("b")), df.createOrReplaceTempView("myTable")
    • df에서 map이 불가능하진 않다.
    df.map {
        case Row(a:String, b:String) => Row.fromSeq(...
    }

 

Dataset

  • DataFrame === Dataset[Row]
  • 컴파일 타임에서 타입 안정성을 위해. ⇒ 테스트 등지에서. 데이터를 꺼내보기 위함
  • User Defiend Function 성능에서 serialization 시 spark encoder를 쓰므로 RDD 보다 조금이러도 더 최적화된다.
  • 성능: RDD < Dataset < DataFrame

 

보통은 DataFrame에서 해결이 되는데, Internal을 보면서 Customize를 하기 위해선 RDD를 알아야한다.

 

 

Streaming Processing

  • Spark Streaming
    • RDD 기반
    • 제한적
    • Microbatch
  • Structured Streaming
    • DataFrame 기반
    • Event Time, Watermark
    • Microbatch / Continuous

 

Spark Streaming

Spark Streaming에서 보는 Streaming은

data를 조그맣게 잘라서 1개씩 RDD로 만들고 처리

 

문제점: 실제 발생시간보다 처리하는 시점이 늦어졌을때 Event Time으로 처리해야하는데 Spark Streaming은 모두 Processing Time 기준이다.

 

 

Structured Streaming

영원히 이어지는 데이터로 간주한다.

실제로는 얘도 batch를 돈다. 메모리에 담았다가 hdfs에 체크아웃,

한번 처리할때마다 .delta로 저장, 일정 시간이 지나면 .snapshot으로 모은다.

 

참고: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

 

MLLib

지원하는 알고리즘은 RDD쪽에 더 많다.

  • spark.mllib
    • RDD
  • spark.ml
    • DataFrame
    • Pipeline
    • Feature Engineering
    • ML Algorithms

 

Comments