17, Writing files to S3 from Spark (running locally)
1, Writing files to S3 from Spark:
1, pom.xml:
The dependencies are the same as those used when reading (downloading) from S3.
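The original article does not reproduce the pom.xml here. As a rough sketch, local S3A access usually needs Spark itself plus hadoop-aws, which provides the S3AFileSystem behind the s3a:// scheme; the versions below are only assumptions and should be aligned with your own Spark and Hadoop build:

<dependencies>
    <!-- Versions are illustrative; match them to your Spark/Hadoop installation -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <!-- Provides org.apache.hadoop.fs.s3a.S3AFileSystem for the s3a:// scheme -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.2.0</version>
    </dependency>
</dependencies>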
2, Scala Spark code:
package com.scalaDemo

import org.apache.spark.sql.SparkSession

object SparkScalaTest {
  def main(args: Array[String]): Unit = {
    // Run Spark locally, using all available cores.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    // S3A credentials and endpoint; "zhanghao" / "mima" are placeholders for the real access key and secret key.
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "zhanghao")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "mima")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

    val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))
    val rdd1 = spark.sparkContext.parallelize(seq)

    // Each partition is written as a separate part file under the given S3 prefix.
    rdd1.saveAsTextFile("s3a://demo02/test/mysparkRes.txt")

    spark.close()
  }
}
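Note that saveAsTextFile treats s3a://demo02/test/mysparkRes.txt as a directory name: the bucket ends up with a mysparkRes.txt/ prefix containing a _SUCCESS marker and one part-xxxxx file per partition. To verify the write, a minimal read-back sketch that can be placed in the same main method before spark.close() (same bucket and path as above):

// Read the text output back and print it; each line is one tuple's toString.
val written = spark.sparkContext.textFile("s3a://demo02/test/mysparkRes.txt")
written.collect().foreach(println)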
3, A second example:
package com.scalaDemo

import org.apache.spark.sql.SparkSession

object SparkScalaTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    // Same S3A settings as above; "zh" / "mm" are credential placeholders.
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "zh")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "mm")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

    val seq = List(("a", 1), ("b", 2))
    // parallelize(seq, 1) forces a single partition, so the output is a single part-00000 file.
    val rdd1 = spark.sparkContext.parallelize(seq, 1)
    rdd1.saveAsTextFile("s3a://demo02/test/mysparkRes02.txt")

    spark.close()
  }
}
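The same data can also be written through the DataFrame API, which takes care of the output format (CSV, Parquet, and so on). This variant is not from the original article; it is a minimal sketch assuming the same SparkSession, credentials, and bucket as above, and the mysparkRes03 path is only an illustrative name:

// Turn the sequence into a two-column DataFrame and write it to S3 as CSV.
val df = spark.createDataFrame(seq).toDF("key", "value")
df.coalesce(1)                 // one output file, like parallelize(seq, 1)
  .write
  .mode("overwrite")
  .csv("s3a://demo02/test/mysparkRes03")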