일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 티스토리챌린지
- docker
- 계정 관리
- 물리삭제
- redis
- Django
- 로그 백업
- grafana
- Hadoop
- node exporter
- elasticsearch
- JWT
- NoSQL
- 오블완
- ci/cd
- hive
- aws ec2
- Locust
- logstash
- prometheus
- DAG
- AWS
- unique constraint
- soft delete
- slack
- Next.js
- 논리삭제
- hard delete
- nginx
- Airflow
- Today
- Total
먹수의 개발일지
[Spark] Spark 기초 본문
Spark 구성요소와 아키텍쳐
SparkSession의 생성
- master과 appName을 필수로 지정해서 사용해보자
- local[1]에 숫자는 core 개수이며, 로컬에서 띄울때는 local로 지정해야한다. 그 외에 리소스 매니저(yarn, mesos, kubernetes 등)를 지정할 수 있다.
val spark = SparkSession.builder()
.master("local[1]") // yarn, mesos available
.appName("SparkExamples") //UI에서 이름을 찾을 수 있다.
.config("spark.sql.warehouse.dir", "<path>/spark-warehouse")
.enableHiveSupport()
.getOrCreate();
spark 실행하기
spark-shell
Dataframe 생성하기
- spakr 세션하에서 데이터프레임을 만들때 아래와 같이 만들 수 있다.
val df = spark.createDataFrame(List(("apple", 2000), ("banana", 1400), ("grape", 1700)))
df.show()
Temporary Table 생성하기
- 위의 SparkSession에서 진행한 createDataFrame으로 생성된 df로 temporary table을 생성한다.
- session 안에서만 유효한 테이블로 spark session을 내리면 지워질 테이블이다.
- spark sql로 생성한 table에 쿼리를 날려 새로운 df를 만들 수 있다.
df.createOrReplaceTempView("fruit_table")
val df2 = spark.sql("SELECT _1,_2 FROM fruit_table")
df2.show()
catalog로 현재 세션 내에서 접근 가능한 database와 table들을 확인할 수 있다.
spark.catalog.listTables.show()
아래와 같이 Temporary table 타입의 fruit_tabel이 생성된 것을 확인할 수 있다.
SparkContext
하나의 JVM으로 내가 실행하는 모든 spark job(worker)와 메타데이터를 관리한다. spark의 health정보도 sparkContext에서 관리한다.
SparkSession에서 SparkContext 얻기
spark.sparkContext
spark.sparkContext로 SparkContext 정보를 확인했을때, local로 되어있는 것을 볼 수 있다. yarn 등 다른것으로 설정해본 후 다시 확인해보자.
spark.sparkContext.master
RDD (Resilient Distributed Datasets)
Spark에서 사용하는 가장 기본적인 데이터 구조이다. 모든 자료의 구현체이다.
RDD를 사용성 있게 추상화한 것들로는 Dataframe / Dataset / SQL이 있다. API가 제공되어 편하게 사용하는 형태지만 실제 데이터는 다 RDD 형태이다.
MapReduce 작업을 빠르고 효과적으로 하는 것을 목표로 설계했다.
MapReduce의 단점
- MapReduce 작업시 외부 스토리지에 데이터를 쓰고 읽어야 하기 때문에, 생산성이 떨어진다.
- 데이터 공유 작업은 복제, serialization, disk I/O 때문에 느려진다. Hadoop에서 데이터를 사용하는 겨웅 맵리듀스는 스토리지와의 read-write에 90% 시간을 사용한다.
RDD 특징
in-memory processing을 통해 메모리 상태를 object로 저장하고, 작업들 사이에서 공유하면 Disk I/O보다 빠를 뿐만 아니라 부하도 줄어들게 된다.
메모리 상에서의 데이터 공유는 network I/O나 disk I/O 보다 10배에서 100배 빠르다고 한다.
→ RDD persist
RDD를 persist하면 RDD를 메모리에 유지할 수 있다. 여러 operation에서 반복적으로 사용하게 될 데이터의 snapshot에 대해서는 메모리에 persist 시키면 다음 작업에서는 인메모리에서 더 빠르게 변화된 데이터를 사용할 수 있다. RDD는 메모리, disk, 분산된 노드에도 persist 할 수 있다.
RDD 실습하기
데이터 경로 path로 지정하기
val path = "hdfs://nn:9000/root/data/2008.csv"
sparkContext가 hadoop 경로를 알고 있기 때문에 아래와 같이 path를 읽어서 rdd를 저장한다.
한 줄씩 읽어서 저장한다.
val rdd = spark.sparkContext.textFile(path)
//첫번째 줄 읽기
rdd.first
//1~5번째줄 까지 읽기
rdd.take(5).foreach(println)
csv 파일을 array로 한 줄씩 저장하기
val rddCsv = rdd.map(f => {
f.split(",")
})
Create RDD
RDD는 다음과 같은 방식으로 생성할 수 있다.
- 파일, 외부스토리지, hive로부터 읽어서 생성하기
- 다른 RDD의 연산 결과
- Dataframe과 Dataset에서 .rdd로 생성 가능
RDD로 수행하는 동작은 Transformation & Action으로 나뉜다.
Transformation
RDD의 데이터에 변화를 주는 spark operation들을 통칭한다. Transformation의 결과는 새로운 하나 또는 복수개의 RDD가 된다.
Spark UI 서버에서 spark history 확인하기
RDD operation 할때마다 작업들에 대해 DAG visualization을 보는 습관을 들이면 스파크 프로그래밍을 잘하는데 도움이 된다.
RDD transformations은 Lazy operation이다. rdd.first와 같이 action이 호출될대, 앞서 선언된 transformation(val rdd CSV = rdd.map(f ⇒ { f.split(”,”) }) 이 모두 수행된다.