S3에 Dataframe을 CSV으로 저장하는 방법
val peopleDfFile = spark.read.json("people.json") peopleDfFile.createOrReplaceTempView("people") val teenagersDf = spark.sql("SELECT name, age, address.city FROM people WHERE age >= 13 AND age <= 19") teenagersDf.coalesce(1).write.option("header", "true").csv("s3n://my-s3-bucket/teenagers.csv")
위 처럼 해주면 된다 ㅎㅎ
참고한 API 문서 자료
* Spark API 문서에서 확인해보니, dataframe writer 클래스의 csv 함수는 format("csv").save("path")와 같다고 한다.
* csv 파일을 저장할 때 옵션 등을 추가하고 싶으면 아래의 자료를 참고하면 된다.
출처 : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
고려할 점
1. sql을 사용해서 nested된 컬럼들은 하나의 컬럼으로 쫙 펴준다.
2. Array 타입의 컬럼은 CSV으로 저장할 수 없다.
그래서 나같은 경우는, Array 타입의 컬럼은 String으로 변환해줘서 추출했다.
예를 들어 interest (관심사) 컬럼에 관심사 종류의 아이디가 배열로 들어가 있는 경저우, ( ex. [1,2,3] )
"1,2,3" 이라는 값으로 데이터를 convert 해줬다.
array 타입을 String의 원하는 포맷으로 변환해주는 함수는 Spark UDF(User-Defined Function)를 사용해서 별도 함수를 생성해서 사용했다.
참고한 스택오버플로우 정보 링크는 다음과 같다.
http://stackoverflow.com/questions/39634284/how-to-convert-json-arraystring-to-csv-in-spark-sql
3. coalesce을 사용하는 이유
coalesce 옵션을 사용하지 않는 경우, 돌려보면 아마 CSV파일이 여러개가 생성될 것이다.
분산처리가 되면서, 파일이 여러개가 생성되는 것 같다.
Coalesce는 shuffle을 발생하지 않고, 파티션을 통합(consolidate)하여 파티션의 개수를 감소시키는 역할을 한다.
파티션의 개수가 줄어들기 때문에 parallelism은 감소하게 된다.
용도는 HDFS, 외부시스템으로 데이터를 저장하기전에 사용한다.
출처: http://ourcstory.tistory.com/147 [쌍쌍바나나의 블로그]
'소프트웨어-이야기 > 데이터 저장소 + 시각화 ' 카테고리의 다른 글
[리서치]람다 아키텍처 (1) | 2017.07.22 |
---|---|
[Spark]User Define Function (0) | 2017.03.11 |
[Spark] 여러개의 로그 파일 한번에 읽어오기 (0) | 2017.02.21 |
[Spark] S3에 파일이 존재하는지 확인하기 (0) | 2017.02.21 |
[Spark]DataFrame을 Parquet으로 저장하기 (0) | 2016.08.28 |