본문 바로가기

소프트웨어-이야기/데이터 저장소 + 시각화

[Spark]DataFrame을 S3에 CSV으로 저장하기

S3에 Dataframe을 CSV으로 저장하는 방법


val peopleDfFile = spark.read.json("people.json") peopleDfFile.createOrReplaceTempView("people") val teenagersDf = spark.sql("SELECT name, age, address.city FROM people WHERE age >= 13 AND age <= 19") teenagersDf.coalesce(1).write.option("header", "true").csv("s3n://my-s3-bucket/teenagers.csv")


위 처럼 해주면 된다 ㅎㅎ 


참고한  API  문서 자료


* Spark API 문서에서 확인해보니, dataframe writer 클래스의 csv 함수는 format("csv").save("path")와 같다고 한다.

* csv 파일을 저장할 때 옵션 등을 추가하고 싶으면 아래의 자료를 참고하면 된다.


출처 : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter



고려할 점


1. sql을 사용해서 nested된 컬럼들은 하나의 컬럼으로 쫙 펴준다.


2. Array 타입의 컬럼은 CSV으로 저장할 수 없다.


그래서 나같은 경우는, Array 타입의 컬럼은 String으로 변환해줘서 추출했다.

예를 들어 interest (관심사) 컬럼에 관심사 종류의 아이디가 배열로 들어가 있는 경저우, ( ex. [1,2,3] )

"1,2,3" 이라는 값으로 데이터를 convert 해줬다.

array 타입을 String의 원하는 포맷으로 변환해주는 함수는 Spark UDF(User-Defined Function)를 사용해서 별도 함수를 생성해서 사용했다.

참고한 스택오버플로우 정보 링크는 다음과 같다.

http://stackoverflow.com/questions/39634284/how-to-convert-json-arraystring-to-csv-in-spark-sql


3. coalesce을 사용하는 이유


coalesce 옵션을 사용하지 않는 경우, 돌려보면 아마 CSV파일이 여러개가 생성될 것이다.

분산처리가 되면서, 파일이 여러개가 생성되는 것 같다.

Coalesce는 shuffle을 발생하지 않고, 파티션을 통합(consolidate)하여 파티션의 개수를 감소시키는 역할을 한다.

파티션의 개수가 줄어들기 때문에 parallelism은 감소하게 된다. 

용도는 HDFS, 외부시스템으로 데이터를 저장하기전에 사용한다. 

출처: http://ourcstory.tistory.com/147 [쌍쌍바나나의 블로그]