Atlas에서 Spark의 Lineage 보기

12105 단어 · 태그: 스파크, atlas
    // Build the input endpoint whose qualifiedName has the form `<path>@<cluster>`.
    // NOTE(review): presumably chosen so it matches the hdfs_path entities Atlas
    // already holds for the same files — confirm against the Atlas type model.
    val path = paths.map(_.path).mkString(", ")
    // Both values are supplied as -D JVM options on the Spark driver. A missing
    // property used to yield an NPE in String.replace (base path) or a bogus
    // "...@null" qualifiedName (cluster name); fall back to safe defaults instead.
    // "primary" is the default atlas.cluster.name used by Atlas hooks — confirm it
    // matches this deployment.
    val clustername = sys.props.getOrElse("cluster.name", "primary")
    val absolutebasepath = sys.props.getOrElse("absolute.base.path", "")
    // Strip the filesystem URI prefix (e.g. hdfs://host:8020) from every path in the
    // comma-separated list; when no prefix is configured, keep the paths unchanged.
    // NOTE(review): with several input paths only the last one receives "@cluster".
    val upath = if (absolutebasepath.isEmpty) path else path.replace(absolutebasepath, "")
    new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(upath, s"$upath@$clustername"), EndpointType.file, EndpointDirection.input, st)
    # Launch spark-shell on YARN with Spline configured to persist lineage through its
    # Atlas persistence factory, using the atlas.kafka.* settings passed below.
    # -Dcluster.name and -Dabsolute.base.path are read via System.getProperty by the
    # modified endpoint code to build `<path>@<cluster>` names with the hdfs:// prefix removed.
    # NOTE(review): spline.persistence.factory is given both in --driver-java-options and in
    # --conf spark.driver.extraJavaOptions; these likely target the same driver setting, so
    # one may override the other — consider keeping every -D flag in a single option.
    spark-shell  --master yarn  --driver-java-options='-Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory' --files /usr/hdp/2.6.4.0-91/kafka/conf/producer.properties --conf 'spark.driver.extraJavaOptions=-Datlas.kafka.bootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dbootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory -Datlas.kafka.auto.commit.enable=false -Datlas.kafka.hook.group.id=atlas -Datlas.kafka.zookeeper.connect=hdp264-0.field.hortonworks.com:2181 -Datlas.kafka.zookeeper.connection.timeout.ms=30000 -Datlas.kafka.zookeeper.session.timeout.ms=60000 -Datlas.kafka.zookeeper.sync.time.ms=20 -Dcluster.name=hdp264 -Dabsolute.base.path=hdfs://hdp264-0.field.hortonworks.com:8020'
    // Turn on Spline lineage capture for this session, then run a small
    // read -> filter -> join -> write job whose lineage is recorded.
    import org.apache.spark.sql.SaveMode
    import za.co.absa.spline.core.SparkLineageInitializer._
    spark.enableLineageTracking()
    // Both inputs are CSV files with a header row; Spark infers the column types.
    def readCsv(location: String) = spark.read.option("header", "true").option("inferSchema", "true").csv(location)
    // Keep rows with total_response_size > 1000 and count_views > 10 (two filter steps, as in the lineage plan).
    val sourceDS = readCsv("/user/nifi/data/wikidata.csv").as("source").filter($"total_response_size" > 1000).filter($"count_views" > 10)
    val domainMappingDS = readCsv("/user/nifi/data/domain.csv").as("mapping")
    // Left outer join so source rows without a matching d_code are kept (d_name is then null).
    val onDomainCode = $"domain_code" === $"d_code"
    val joinedDS = sourceDS.join(domainMappingDS, onDomainCode, "left_outer").select($"page_title".as("page"), $"d_name".as("domain"), $"count_views")
    // Replace any previous output with ORC files.
    val outputLocation = "/user/nifi/sparkoutput"
    joinedDS.write.mode(SaveMode.Overwrite).format("orc").save(outputLocation)



좋은 웹페이지 즐겨찾기