SPARK-SQL 기초 응용 프로그램 입문 1-sparkSession, Dataset, DataFrame, select, groupBy 등

3770 단어 #spark
관련 테스트 데이터와 포조 클래스, 블로그 보기https://blog.csdn.net/qq_41712271/article/details/107812188  
//      
import static org.apache.spark.sql.functions.col;

public static void main(String[] args) {
        //0: spark sql    
        SparkSession spark = SparkSession
                .builder()
                .config("spark.driver.host", "localhost")
                .appName("SparkSqlDabbler")
                .master("local")
                .getOrCreate();

        String jsonDataPath = "file:///e:/people.json";

        //1: DataFrame Dataset   
        Dataset dataFrame = spark.read().json(jsonDataPath);

        Dataset dataset = spark.read().json(jsonDataPath).as(Encoders.bean(Person.class));

        //2: DataFrame Dataset   
        Dataset datasetFromDF = dataFrame.as(Encoders.bean(Person.class));

        //3: DataFrame Dataset schema       
        dataFrame.schema();
        dataFrame.printSchema();

        dataset.schema();
        dataset.printSchema();

        //4:  API     sql
        dataFrame.createOrReplaceTempView("people");
        dataset.createOrReplaceTempView("people");

        Dataset sqlDF = spark.sql("select age, count(*) from people where age > 21 group by age");
        sqlDF.show();


        //5: DataFrame api   
        // Displays the content of the DataFrame to stdout
        dataFrame.show();
        // +----+-------+
        // | age|   name|
        // +----+-------+
        // |29|Michael|
        // |  30|   Andy|
        // |  19| Justin|
        // +----+-------+

        //    name     
        dataFrame.select("name").show();
        // +-------+
        // |   name|
        // +-------+
        // |Michael|
        // |   Andy|
        // | Justin|
        // +-------+

        //        ,  age      1
        dataFrame.select(col("name"), col("age").plus(1)).show();
        // +-------+---------+
        // |   name|(age + 1)|
        // +-------+---------+
        // |Michael|     30|
        // |   Andy|       31|
        // | Justin|       20|
        // +-------+---------+

        //   age > 21  
        dataFrame.filter(col("age").gt(21)).show();
        // +---+----+
        // |age|name|
        // +---+----+
        // | 30|Andy|
        // +---+----+

        // Count people by age
        dataFrame.groupBy("age").count().show();
        // +----+-----+
        // | age|count|
        // +----+-----+
        // |  19|    1|
        // |29|    1|
        // |  30|    1|
        // +----+-----+


        //6: Dataset api   
        dataset.show();
        // +---+----+
        // |age|name|
        // +---+----+
        // | 32|Andy|
        // +---+----+

        //  DSL  
        Dataset nameDF = dataset.select("name");
        nameDF.show();

        Dataset nameDS = dataset.select(dataset.col("name")).as(Encoders.STRING());
        nameDS.show();

        //         
        KeyValueGroupedDataset kvgd = dataset.map(new MapFunction() {
            @Override
            public Person call(Person person) throws Exception {
                if (person.getAge() > 10) {
                    person.setAge(10);
                }
                return person;
            }
        }, Encoders.bean(Person.class)).groupByKey(new MapFunction() {
            @Override
            public String call(Person person) throws Exception {
                return person.getName();
            }
        }, Encoders.STRING());

        kvgd.count().show();

        spark.stop();
    }

좋은 웹페이지 즐겨찾기