Spark: CSV 파일 쓰기

몇 주 전에 나 는 Spark 를 어떻게 사용 하여 시카고 시의 범죄 데이터 세트 를 탐색 하 는 지 썼 고 모든 범죄 의 수량 을 얻 었 다. 나 는 이 를 CSV 파일 에 쓰 고 싶다.
Spark 는 saveasTextFile 함 수 를 제공 합 니 다. 이 함 수 는 RDD 코드 를 저장 할 수 있 도록 해 줍 니 다. 그래서 저 는 코드 를 다음 과 같은 형식 으로 재 구성 하여 사용 할 수 있 도록 합 니 다.
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
 
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
 
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
 
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
 
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})
 
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)

Spark 셸 에서 이 코드 를 실행 하면 최종 적 으로 / tmp / primary Types. csv 라 는 폴 더 를 얻 을 수 있 습 니 다. 여러 개의 부품 파일 이 포함 되 어 있 습 니 다.
$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x  66 markneedham  wheel   2.2K 30 Nov 07:17 .
drwxrwxrwt  80 root         wheel   2.7K 30 Nov 07:16 ..
-rw-r--r--   1 markneedham  wheel     8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00000.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00001.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00002.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx   1 markneedham  wheel     0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx   1 markneedham  wheel    28B 30 Nov 07:16 part-00000
-rwxrwxrwx   1 markneedham  wheel    17B 30 Nov 07:16 part-00001
-rwxrwxrwx   1 markneedham  wheel    23B 30 Nov 07:16 part-00002
-rwxrwxrwx   1 markneedham  wheel    16B 30 Nov 07:16 part-00003
...

만약 에 우리 가 그 중의 일부 부품 파일 을 보면 우 리 는 그것 이 범죄 유형 과 예상 수량 을 기록 한 것 을 볼 수 있다.
$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
 
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310

만약 우리 가 이 CSV 파일 을 다른 Hadoop 기반 작업 에 전달 하려 고 한다 면, 이것 은 매우 좋 지만, 나 는 실제로 CSV 파일 만 을 원 하기 때문에 이것 은 내 가 원 하 는 것 이 아니다.
이 목적 을 실현 하 는 방법 은 모든 내용 을 한 구역 에 강제로 계산 하 는 것 이다. 이것 은 우리 가 하나의 부품 파일 만 생 성 한 다 는 것 을 의미한다.
val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
 
counts.saveAsTextFile(file)

part - 0000 지금 이렇게 보 여요.
$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

이것 은 일 을 할 수 있 지만 우리 가 분 구 를 넘 어 집합 할 때 보다 훨씬 느 려 서 이상 적 이지 않다.
반대로 우리 가 할 수 있 는 일 은 Hadoop 의 통합 기능 중 하 나 를 이용 하여 일부 파일 을 한 파일 에 압축 하 는 것 이다.
우선, Hadoop 을 SBT 파일 로 가 져 옵 니 다.
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

이제 통합 기능 을 Spark 셸 에 도입 합 니 다.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
 
def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

이제 우 리 는 그것 을 이용 합 시다.
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
 
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)
 
merge(file, destinationFile)

지금 우 리 는 둘 다 좋다.
$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

4. 567917. 완전한 코드 를 사용 하려 면 그 요점 에 따라 조작 할 수 있 습 니 다
번역https://www.javacodegeeks.com/2014/12/spark-write-to-csv-file.html

좋은 웹페이지 즐겨찾기