본문 바로가기

도서/Spark 완벽 가이드

1-2장

반응형

아파치 스파크

  • 통합 컴퓨팅 엔진
  • 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합
  • 병렬  처리 오픈소스 엔진
  • python, java, scala, R 지원

스파크 기능 구성 (스파크에서 제공하는 전체 컴포넌트와 라이브러리)

  • 구조적 스트리밍
  • 고급 분석
  • 라이브러리 및 에코시스템
  • 구조적 API
    • dataset
    • dataframe
    • SQL
  • 저수준 API
    • RDD
    • 분산형 변수

Unified

  • 데이터 읽기와 데이터 분석을 통합
  • 스파크에서는 SQL쿼리로 데이터를 읽고 ML 라이브러리로 머신러닝 모델 평가하는 두 단계를 병합하여 데이터를 한 번만 조회하면 되도록

컴퓨팅 엔진

  • 스파크 기능의 범위를 컴퓨팅 엔진으로 제한
  • 스파크는 저장소 시스템의 데이터를 연산하는 역할만 수행하고, 영구 저장소 역할을 수행하지 않음.
    • 데이터의 저장 위치에 상관 없이 처리에만 집중.
      (데이터 이동에 드는 비용이 크기 때문에 스파크 내부에 오랜 시간 저장하지 않음)
    • 대신, 클라우드 기반의 스토리지 지원 (hadoop, kafka, S3, cassandra, ...)
  •  스파크는 연산 기능에 초점

라이브러리

  • 스파크 컴포넌트: 데이터 분석 작업에 필요한 통합 API 제공하는 통합 엔진 기반의 자체 라이브러리
  • 스파크 표준 라이브러리는 여러 오픈소스 프로젝트의 집합체

스파크 기본 아키텍처

  • 컴퓨터 클러스터 (여러 자원을 모아 하나의 컴퓨터처럼 사용) 에서의 작업을 조율할 수 있는 프레임 워크(: 스파크)
    클러스터의 데이터 처리 작업을 관리하고 조율
  • 스파크가 연산에 사용하는 클러스터
    • 스탠드얼론 클러스터 매니저
    • 하둡 YARN
    • 메소스
    • ..

스파크 애플리케이션

  • 구성
    • 드라이버 프로세스
      • 클러스터 노드 중 하나에서 실행
      • main() 함수를 실행
      • 스파크 애플리케이션 정보의 유지 관리
      • 사용자 입력에 대한 응답
      • 전반적인 익스큐터 프로세스의 작업과 관련된 분석
      • 배포, 스케줄링 역활 수행
      • 드라이버 프로세스는 스파크 애플리케이션의 수명 주기 동안 관련 정보를 모두 유지
    • (다수의) 익스큐터 프로세스
      • 드라이버 프로세스가 할당한 작업 수행
        • 코드 실행
        • 진행 상황 드라이버 노드에 전달

ㄴ 클러스터 매니저가 물리적 머신을 관리하고,  스파크 애플리케이션에 자원을 할당하는 방법

  • 클러스터 메니저는 스파크 스탠드얼론 클러스터 매니저, 하둡 YARN, 메소스 중 하나를 선택할 수 있으며 하나의 클러스터에서 여러 개의 스파크 애플리케이션을 실행할 수 있음
스파크는 클러스터 모드/로컬 모드 지원
드라이버와 익스큐터는 같은 머신이나 다른 머신에서 실행 가능
로컬 모드에서는 드라이버와 익스큐터를 단일 머신에서 스레드 형태로 실행
  •  스파크 애플리케이션 핵심
    • 스파크는 사용 가능한 지원을 파악하기 위해 클러스터 매니저를 사용
    • 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행
  •  익스큐터는 대부분 스파크 코드를 실행하는 역할을 하지만, 드라이버는 스파크의 언어 API 를 통해 다양한 언어로 실행 가능

스파크 언어 APIs

  • scala
  • java
  • python
  • SQL
  • R

스파크 APIs

  • the low-level “unstructured” APIs
  • higher-level structured APIs

스파크 시작하기~~

  • SparkSession
    • 스파크 애플리케이션에 사용자 명령과 데이터 전송하기 위해 SparkSession 생성
    • 대화형 모드로 스파크를 시작하면 스파크 애플리케이션을 관리하는 SparkSession이 자동 생성됨
      (스탠드얼론 애플리케이션으로 스파크를 시작하면 사용자 애플리케이션 코드에서 SparkSession 직접 생성해줘야 함)

SparkSession

  • 스파크 애플리케이션은 SparkSession이라 불리는 드라이버 프로세스로 제어
  • SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
  • 하나의 SparkSession은 하나의 스파크 애플리케이션애 대응

SparkSession 확인하기
0-999까지의 값이 할당되어 있는 1000개의 로우로 구성되어있는 DataFrame 생성하기

(ㄴ 이 값들은 분상 컬렉션을 나타내며, 코드 실행 시 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당됨)

DataFrame

  • 대표적인 구조적 API
  • 테이블의 데이터를 로우와 컬럼으로 단순하게 표현
    • 스키마: 컬럼과 컬럼 타입을 정의한 목록
  • 스프레드시트와 비슷하지만, DataFrame은 수천 대의 컴퓨터에 분산되어 있다는 점에서 근본적으로 다름

파티션

  • 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션(청크단위 데이터)으로 분할
  • 파티션은 클러스터의 물리적 머신에 존재하는 로우의 집합을 의미
  • DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타낸다.
  • 파티션 수와 익스큐터의 수에 따라 병령성이 결정됨
    • 파티션이 하나라면 스파크에 수천 개의  익스큐터가 있더라도 병령성은 1
    • 수백 개의 파티션이 있더라도 익스큐터가 하나라면 병령성은 1

트랜스포메이션

  • 스파크 핵심 데이터 구조는 immutable하며, 데이터 구조를 변경하려면 스파크에게 변경 방법을 알려줘야하며, 이 명령을 프랜스포메이션이라 함

(0-999까지의 수를 가지고 있는) myRange 에서 짝수 찾기

  • 실행 결과는  출력되지 않음
    추상적인 트랜스포메이션만 지정한 상태이기 때문에 액션을 호출하지 않으면 실제 트랜스포메이션을 수행하지 않는다.
  • 트랜스포메이션 두 가지 유형
    • narrow dependency
      • 좁은 트랜스포메이션은 각 입력 파티션이 하나의 출력 파티션에만 영향을 미치는 경우'
      • 스파크에서 파이프라이닝을 자동으로 수행
        • 즉, DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 일어남

    • wide dependency
      • 하나의 입력 파티션이 여러 출력 파티션에 영향을 미치는 경우
      • 스파크가 클러스터에서 파티션을 교환하는 셔플이 일어남
        • 스파크는 셔플의 결과를 메모리가 아닌, 디스트에 저장

지연연산

  • 스파크는 연산 그래프를 처리하기 직전까지 동작을 기다린다 (Lazy Evaluation)
    • 스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고, 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성하고
    • 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일한다
  • 이 과정을 거치며 전체 데이터 흐름을 최적화...
    예> DataFrame의 predicate pushdown
    아주 복잡한 스파크 잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있다면 필요한  레코드 하나만 읽는 것이 가장 효율적이다. 스파크는 이 필터를 데이터소스로 위임하는 최적화 작업을 자동으로 수행

액션

  • 사용자가 트랜스포메이션을 사용해 세운 논리적 실행 계획을 실제 연산으로 수행하기 위한 명령
  • 일련의 트랜스포메이션으로 부터 결과를 계산하도록 지시하는 명령 

가장 단순한 액션 명령인 count 메서드

  • 액션 유형
    • 콘솔에서 데이터를 보는 액션
    • 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
    • 출력 데이터 소스에 저장하는 액션
  • 액션을 지정하면 스파크 잡이 시작됨
  • 스파크 잡은 필터(narrow 트랜스포메이션)를 수행한 후 파티션 별로 레코드 수를 카운트(wide 트랜스포메이션)하고, 각 언어에 적합한 네이티브 객체에 결과를 모은다.
    • 이때 스파크가 제공하는 스파크 UI로 클러스터 실행 중인 스파크 잡을 모니터링할 수 있다

스파크 UI

  • 스파트 잡은 개별 액션에 의해 트리거되는 다수의 트랜스포메이션으로 이루어져 있으며 스파트 UI로 잡을 모니터링할 수 있다
  • 로컬 모드
    • http://localhost:4040
  • 스파크 UI에서 확인 가능한 정보
    • 스파크 잡 상태
    • 환경 설정
    • 클러스터 상태
    • 등.. 

예제

  • 예제 데이터
  • 각 파일은 여러 로우를 가지고 이 파일들은 CSV파일이며 반정형 데이터 포맷이다. 이 파일의 각 로우는 DataFrame의 로우가 된다. 
  • 스파크는 다양한 데이터 소스를 지원
    • 데이터는 Sparksession 의 DataFrameReader 클래스를 사용해서 얻는다
      • 이때 특정 파일 포맷과 몇가지 옵션을 함께 설정
        (예제에서는 스파크 DataFrame의 스키마 정보를 알아내는 스키마 추론 기능을 사용, 파일의 첫 로우를 헤더로 지정하는 옵션도 함께 설정)
  • 스파크는 스키마 정보를 얻기 위해 데이터를 조금 읽어드리고 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석
    • 하지만 운영 환경에서는 데이터를 읽는 시점에 스키마를 엄격하게 지정하는 옵션을 사용해야 한다.
  • 스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 로우와 컬럼을 자긴다.
    • 로우의 수를 알 수 없는 이유는 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문.
    • 스파크는 각 컬럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽음

DataFrame에서 CSV 파일을 읽어 로컬 배열이나 리스트 형태로 변황하는 과정

  • DataFrame의 take() 액션을 호출하면 이전의 head 명령과 같은 결과를 얻을 수 있다.
  • 트랜스포메이션 추가 지정
    • 정수 데이터 타입인 count 컬럼을 기준으로 데이터를 정렬
      • sort() 메서드는 DataFrame을 변경하지 않음
        • 트랜스포메이션으로 sort메서드를 사용하면 이전의 Dataframe을 변환
        • sort 메서드는 단지 트랜스포메이션이기 때문에 호출 시 데이터에 아무런 변화도 일어나지 않지만, 스파크는 실행 계획을 만들고 검토하여 클러스터에서 처리할 방법을 모색한다.
          • 특정 DataFrame 객체에 explain 메서드를 호출하면 DataFrame의 게보나 스파크의 쿼리 실행 계획을 확인할 수 있다.

결과 DataFrame에 take 메서드를 호출하면

 

반응형