Parsing JSON with Spark Streaming

Data format

{"Name": "소명유한공사", "_id": "1233467", "Oper": {"KeyNo": "0o0o0asdsd", "Org": 2, "Name": "소홍"}, "Partners": [{"ooo": -1, "def": false, "kk": "97.58%"}, {"oooo": -1, "def": 0, "abc": 269.65}]}
pom dependency

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20160810</version>
</dependency>
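
With that dependency on the classpath, the payload can be sanity-checked with org.json before any streaming is involved. A minimal standalone sketch (the object name JsonQuickCheck is just for illustration):

import org.json.JSONObject

object JsonQuickCheck {
  def main(args: Array[String]): Unit = {
    // Same shape as the "Data format" sample above
    val raw = """{"Name": "소명유한공사", "_id": "1233467", "Oper": {"KeyNo": "0o0o0asdsd", "Org": 2, "Name": "소홍"}, "Partners": [{"ooo": -1, "def": false, "kk": "97.58%"}, {"oooo": -1, "def": 0, "abc": 269.65}]}"""
    val obj = new JSONObject(raw)
    println(obj.getString("_id"))                        // 1233467
    println(obj.getString("Name"))                       // 소명유한공사
    println(obj.getJSONObject("Oper").getString("Name")) // 소홍
    println(obj.getJSONArray("Partners").length())       // 2
  }
}

The full streaming job below reads these documents from Kafka, parses them the same way, and writes the fields to HBase: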

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.json.JSONObject


/**
  * Created by zx on 2019/9/26.
  */
object demo3 {
  def main(args: Array[String]): Unit = {

    val group = "g001"
    val conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Duration(5000))
    val topic = "json1"
    val brokerList = "hd-3:9092"
    val zkQuorum = "hd-2:2181,hd-3:2181,hd-4:2181"
    val topics: Set[String] = Set(topic)
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
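    // consumerOffsetDir resolves to /consumers/g001/offsets/json1; each partition's
    // last consumed offset is kept in one child znode under that path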

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hd-2,hd-3,hd-4")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

    val tableName = "circle"
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
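
    // The HBaseAdmin / HTableDescriptor / HColumnDescriptor imports above suggest the
    // table is created up front. A minimal sketch, assuming "circle" should have the
    // single column family "cf" that the Puts below write to:
    val admin = new HBaseAdmin(hbaseConf)
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      desc.addFamily(new HColumnDescriptor("cf"))
      admin.createTable(desc)
    }
    admin.close()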


    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )
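    // Note: OffsetRequest.SmallestTimeString is the string "smallest"; a group with no
    // saved offsets starts from the earliest available message rather than the latest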

    val zkClient = new ZkClient(zkQuorum)
    val children = zkClient.countChildren(zkTopicPath)
    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    if (children > 0) {
      // Offsets for this group/topic were saved on a previous run: read the last
      // offset of every partition back from ZooKeeper and resume from there
      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/$i")
        val tp = TopicAndPartition(topic, i)
        fromOffsets += (tp -> partitionOffset.toLong)
      }

      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    var offsetRanges = Array[OffsetRange]()

    kafkaStream.foreachRDD{ kafkaRDD =>
      if(!kafkaRDD.isEmpty()) {
        offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        // Parse each message value (the JSON string) into a dataModel record
        val value: RDD[dataModel] = kafkaRDD.map(x => {
          val a = x._2.toString
          val obj: JSONObject = new JSONObject(a)
          val keyno = obj.getString("_id")
          val name = obj.getString("Name")
          val oper = obj.getJSONObject("Oper").toString
          val Partners = obj.getJSONArray("Partners").toString
          dataModel(keyno, name, oper, Partners)
        })

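        // Convert each record into an HBase Put keyed by keyno and write the batch
        // to the "circle" table through the TableOutputFormat configured above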
        value.map(x => {
          val put = new Put(Bytes.toBytes(x.keyno))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("companyname"), Bytes.toBytes(x.name))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oper"), Bytes.toBytes(x.oper))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("Partners"), Bytes.toBytes(x.Partners))
          (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)

        // Finally, write the consumed offsets back to ZooKeeper so the next run
        // resumes where this batch ended
        for (o <- offsetRanges) {
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // One parsed record; the nested Oper object and Partners array are kept as raw JSON strings
  case class dataModel(keyno: String, name: String, oper: String, Partners: String)
}
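
With the job running, publishing the sample document above to the json1 topic should land one row in the circle table: the rowkey is the _id value, with companyname, oper, and Partners columns under the cf family.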
