반응형
아파치 스파크
- 통합 컴퓨팅 엔진
- 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합
- 병렬 처리 오픈소스 엔진
- python, java, scala, R 지원
스파크 기능 구성 (스파크에서 제공하는 전체 컴포넌트와 라이브러리)
- 구조적 스트리밍
- 고급 분석
- 라이브러리 및 에코시스템
- 구조적 API
- dataset
- dataframe
- SQL
- 저수준 API
- RDD
- 분산형 변수
Unified
- 데이터 읽기와 데이터 분석을 통합
- 스파크에서는 SQL쿼리로 데이터를 읽고 ML 라이브러리로 머신러닝 모델 평가하는 두 단계를 병합하여 데이터를 한 번만 조회하면 되도록
컴퓨팅 엔진
- 스파크 기능의 범위를 컴퓨팅 엔진으로 제한
- 스파크는 저장소 시스템의 데이터를 연산하는 역할만 수행하고, 영구 저장소 역할을 수행하지 않음.
- 데이터의 저장 위치에 상관 없이 처리에만 집중.
(데이터 이동에 드는 비용이 크기 때문에 스파크 내부에 오랜 시간 저장하지 않음) - 대신, 클라우드 기반의 스토리지 지원 (hadoop, kafka, S3, cassandra, ...)
- 데이터의 저장 위치에 상관 없이 처리에만 집중.
- 스파크는 연산 기능에 초점
라이브러리
- 스파크 컴포넌트: 데이터 분석 작업에 필요한 통합 API 제공하는 통합 엔진 기반의 자체 라이브러리
- 스파크 표준 라이브러리는 여러 오픈소스 프로젝트의 집합체
스파크 기본 아키텍처
- 컴퓨터 클러스터 (여러 자원을 모아 하나의 컴퓨터처럼 사용) 에서의 작업을 조율할 수 있는 프레임 워크(: 스파크)
클러스터의 데이터 처리 작업을 관리하고 조율 - 스파크가 연산에 사용하는 클러스터
- 스탠드얼론 클러스터 매니저
- 하둡 YARN
- 메소스
- ..
스파크 애플리케이션
- 구성
- 드라이버 프로세스
- 클러스터 노드 중 하나에서 실행
- main() 함수를 실행
- 스파크 애플리케이션 정보의 유지 관리
- 사용자 입력에 대한 응답
- 전반적인 익스큐터 프로세스의 작업과 관련된 분석
- 배포, 스케줄링 역활 수행
- 드라이버 프로세스는 스파크 애플리케이션의 수명 주기 동안 관련 정보를 모두 유지
- (다수의) 익스큐터 프로세스
- 드라이버 프로세스가 할당한 작업 수행
- 코드 실행
- 진행 상황 드라이버 노드에 전달
- 드라이버 프로세스가 할당한 작업 수행
- 드라이버 프로세스
- 클러스터 메니저는 스파크 스탠드얼론 클러스터 매니저, 하둡 YARN, 메소스 중 하나를 선택할 수 있으며 하나의 클러스터에서 여러 개의 스파크 애플리케이션을 실행할 수 있음
스파크는 클러스터 모드/로컬 모드 지원
드라이버와 익스큐터는 같은 머신이나 다른 머신에서 실행 가능
로컬 모드에서는 드라이버와 익스큐터를 단일 머신에서 스레드 형태로 실행
- 스파크 애플리케이션 핵심
- 스파크는 사용 가능한 지원을 파악하기 위해 클러스터 매니저를 사용
- 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행
- 익스큐터는 대부분 스파크 코드를 실행하는 역할을 하지만, 드라이버는 스파크의 언어 API 를 통해 다양한 언어로 실행 가능
스파크 언어 APIs
- scala
- java
- python
- SQL
- R
스파크 APIs
- the low-level “unstructured” APIs
- higher-level structured APIs
스파크 시작하기~~
- SparkSession
- 스파크 애플리케이션에 사용자 명령과 데이터 전송하기 위해 SparkSession 생성
- 대화형 모드로 스파크를 시작하면 스파크 애플리케이션을 관리하는 SparkSession이 자동 생성됨
(스탠드얼론 애플리케이션으로 스파크를 시작하면 사용자 애플리케이션 코드에서 SparkSession 직접 생성해줘야 함)
SparkSession
- 스파크 애플리케이션은 SparkSession이라 불리는 드라이버 프로세스로 제어
- SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
- 하나의 SparkSession은 하나의 스파크 애플리케이션애 대응
(ㄴ 이 값들은 분상 컬렉션을 나타내며, 코드 실행 시 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당됨)
DataFrame
- 대표적인 구조적 API
- 테이블의 데이터를 로우와 컬럼으로 단순하게 표현
- 스키마: 컬럼과 컬럼 타입을 정의한 목록
- 스프레드시트와 비슷하지만, DataFrame은 수천 대의 컴퓨터에 분산되어 있다는 점에서 근본적으로 다름
파티션
- 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션(청크단위 데이터)으로 분할
- 파티션은 클러스터의 물리적 머신에 존재하는 로우의 집합을 의미
- DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타낸다.
- 파티션 수와 익스큐터의 수에 따라 병령성이 결정됨
- 파티션이 하나라면 스파크에 수천 개의 익스큐터가 있더라도 병령성은 1
- 수백 개의 파티션이 있더라도 익스큐터가 하나라면 병령성은 1
트랜스포메이션
- 스파크 핵심 데이터 구조는 immutable하며, 데이터 구조를 변경하려면 스파크에게 변경 방법을 알려줘야하며, 이 명령을 프랜스포메이션이라 함
- 실행 결과는 출력되지 않음
추상적인 트랜스포메이션만 지정한 상태이기 때문에 액션을 호출하지 않으면 실제 트랜스포메이션을 수행하지 않는다. - 트랜스포메이션 두 가지 유형
- narrow dependency
- 좁은 트랜스포메이션은 각 입력 파티션이 하나의 출력 파티션에만 영향을 미치는 경우'
- 스파크에서 파이프라이닝을 자동으로 수행
- 즉, DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 일어남
- narrow dependency
-
- wide dependency
- 하나의 입력 파티션이 여러 출력 파티션에 영향을 미치는 경우
- 스파크가 클러스터에서 파티션을 교환하는 셔플이 일어남
- 스파크는 셔플의 결과를 메모리가 아닌, 디스트에 저장
- wide dependency
지연연산
- 스파크는 연산 그래프를 처리하기 직전까지 동작을 기다린다 (Lazy Evaluation)
- 스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고, 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성하고
- 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일한다
- 이 과정을 거치며 전체 데이터 흐름을 최적화...
예> DataFrame의 predicate pushdown
아주 복잡한 스파크 잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있다면 필요한 레코드 하나만 읽는 것이 가장 효율적이다. 스파크는 이 필터를 데이터소스로 위임하는 최적화 작업을 자동으로 수행
액션
- 사용자가 트랜스포메이션을 사용해 세운 논리적 실행 계획을 실제 연산으로 수행하기 위한 명령
- 일련의 트랜스포메이션으로 부터 결과를 계산하도록 지시하는 명령
- 액션 유형
- 콘솔에서 데이터를 보는 액션
- 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
- 출력 데이터 소스에 저장하는 액션
- 액션을 지정하면 스파크 잡이 시작됨
- 스파크 잡은 필터(narrow 트랜스포메이션)를 수행한 후 파티션 별로 레코드 수를 카운트(wide 트랜스포메이션)하고, 각 언어에 적합한 네이티브 객체에 결과를 모은다.
- 이때 스파크가 제공하는 스파크 UI로 클러스터 실행 중인 스파크 잡을 모니터링할 수 있다
스파크 UI
- 스파트 잡은 개별 액션에 의해 트리거되는 다수의 트랜스포메이션으로 이루어져 있으며 스파트 UI로 잡을 모니터링할 수 있다
- 로컬 모드
- http://localhost:4040
- 스파크 UI에서 확인 가능한 정보
- 스파크 잡 상태
- 환경 설정
- 클러스터 상태
- 등..
예제
- 예제 데이터
- 미국 교통통계국의 항공운항 데이터
- 각 파일은 여러 로우를 가지고 이 파일들은 CSV파일이며 반정형 데이터 포맷이다. 이 파일의 각 로우는 DataFrame의 로우가 된다.
- 스파크는 다양한 데이터 소스를 지원
- 데이터는 Sparksession 의 DataFrameReader 클래스를 사용해서 얻는다
- 이때 특정 파일 포맷과 몇가지 옵션을 함께 설정
(예제에서는 스파크 DataFrame의 스키마 정보를 알아내는 스키마 추론 기능을 사용, 파일의 첫 로우를 헤더로 지정하는 옵션도 함께 설정)
- 이때 특정 파일 포맷과 몇가지 옵션을 함께 설정
- 데이터는 Sparksession 의 DataFrameReader 클래스를 사용해서 얻는다
- 스파크는 스키마 정보를 얻기 위해 데이터를 조금 읽어드리고 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석
- 하지만 운영 환경에서는 데이터를 읽는 시점에 스키마를 엄격하게 지정하는 옵션을 사용해야 한다.
- 스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 로우와 컬럼을 자긴다.
- 로우의 수를 알 수 없는 이유는 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문.
- 스파크는 각 컬럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽음
- DataFrame의 take() 액션을 호출하면 이전의 head 명령과 같은 결과를 얻을 수 있다.
- 트랜스포메이션 추가 지정
- 정수 데이터 타입인 count 컬럼을 기준으로 데이터를 정렬
- sort() 메서드는 DataFrame을 변경하지 않음
- 트랜스포메이션으로 sort메서드를 사용하면 이전의 Dataframe을 변환
- sort 메서드는 단지 트랜스포메이션이기 때문에 호출 시 데이터에 아무런 변화도 일어나지 않지만, 스파크는 실행 계획을 만들고 검토하여 클러스터에서 처리할 방법을 모색한다.
- 특정 DataFrame 객체에 explain 메서드를 호출하면 DataFrame의 게보나 스파크의 쿼리 실행 계획을 확인할 수 있다.
- sort() 메서드는 DataFrame을 변경하지 않음
- 정수 데이터 타입인 count 컬럼을 기준으로 데이터를 정렬
반응형