Spark: CSV 파일 쓰기
Spark 는 saveasTextFile 함 수 를 제공 합 니 다. 이 함 수 는 RDD 코드 를 저장 할 수 있 도록 해 줍 니 다. 그래서 저 는 코드 를 다음 과 같은 형식 으로 재 구성 하여 사용 할 수 있 도록 합 니 다.
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
def dropHeader(data: RDD[String]): RDD[String] = {
data.mapPartitionsWithIndex((idx, lines) => {
if (idx == 0) {
lines.drop(1)
}
lines
})
}
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
val parser = new CSVParser(',')
lines.map(line => {
val columns = parser.parseLine(line)
(columns(5), 1)
})
})
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file) Spark 셸 에서 이 코드 를 실행 하면 최종 적 으로 / tmp / primary Types. csv 라 는 폴 더 를 얻 을 수 있 습 니 다. 여러 개의 부품 파일 이 포함 되 어 있 습 니 다.
$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x 66 markneedham wheel 2.2K 30 Nov 07:17 .
drwxrwxrwt 80 root wheel 2.7K 30 Nov 07:16 ..
-rw-r--r-- 1 markneedham wheel 8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00000.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00001.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00002.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx 1 markneedham wheel 0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx 1 markneedham wheel 28B 30 Nov 07:16 part-00000
-rwxrwxrwx 1 markneedham wheel 17B 30 Nov 07:16 part-00001
-rwxrwxrwx 1 markneedham wheel 23B 30 Nov 07:16 part-00002
-rwxrwxrwx 1 markneedham wheel 16B 30 Nov 07:16 part-00003
... 만약 에 우리 가 그 중의 일부 부품 파일 을 보면 우 리 는 그것 이 범죄 유형 과 예상 수량 을 기록 한 것 을 볼 수 있다.
$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310 만약 우리 가 이 CSV 파일 을 다른 Hadoop 기반 작업 에 전달 하려 고 한다 면, 이것 은 매우 좋 지만, 나 는 실제로 CSV 파일 만 을 원 하기 때문에 이것 은 내 가 원 하 는 것 이 아니다.
이 목적 을 실현 하 는 방법 은 모든 내용 을 한 구역 에 강제로 계산 하 는 것 이다. 이것 은 우리 가 하나의 부품 파일 만 생 성 한 다 는 것 을 의미한다.
val counts = partitions.repartition(1).
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file) part - 0000 지금 이렇게 보 여요.
$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2 이것 은 일 을 할 수 있 지만 우리 가 분 구 를 넘 어 집합 할 때 보다 훨씬 느 려 서 이상 적 이지 않다.
반대로 우리 가 할 수 있 는 일 은 Hadoop 의 통합 기능 중 하 나 를 이용 하여 일부 파일 을 한 파일 에 압축 하 는 것 이다.
우선, Hadoop 을 SBT 파일 로 가 져 옵 니 다.
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2" 이제 통합 기능 을 Spark 셸 에 도입 합 니 다.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
} 이제 우 리 는 그것 을 이용 합 시다.
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
merge(file, destinationFile) 지금 우 리 는 둘 다 좋다.
$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2 4. 567917. 완전한 코드 를 사용 하려 면 그 요점 에 따라 조작 할 수 있 습 니 다
번역https://www.javacodegeeks.com/2014/12/spark-write-to-csv-file.html
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.