sparkstreaming 대상을 통해 json 분석
4497 단어 spark
"Name": "소명유한공사", "id": "1233467", "company": {"KeyNo": "0o0o0asdsd", "Org": 2, "Name": "소홍"}, "Partners": [{"ooo":-1, "def":false, "kk": "97.58%"}, {"oooo":-1, "def": 0, abc": 269.65]
pom 의존
org.json
json
20160810
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.json.JSONObject
/**
* Created by zx on 2019/9/26.
*/
object demo3 {
def main(args: Array[String]): Unit = {
val group = "g001"
val conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")
val ssc = new StreamingContext(conf, Duration(5000))
val topic = "json1"
val brokerList = "hd-3:9092"
val zkQuorum = "hd-2:2181,hd-3:2181,hd-4:2181"
val topics: Set[String] = Set(topic)
val topicDirs = new ZKGroupTopicDirs(group, topic)
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hd-2,hd-3,hd-4")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
val tableName = "circle"
val jobConf = new JobConf(hbaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val kafkaParams = Map(
"metadata.broker.list" -> brokerList,
"group.id" -> group,
"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
)
val zkClient = new ZkClient(zkQuorum)
val children = zkClient.countChildren(zkTopicPath)
var kafkaStream: InputDStream[(String, String)] = null
var fromOffsets: Map[TopicAndPartition, Long] = Map()
if (children > 0) {
for (i partitionOffset.toLong)
}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
} else {
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}
var offsetRanges = Array[OffsetRange]()
kafkaStream.foreachRDD{ kafkaRDD =>
if(!kafkaRDD.isEmpty()) {
offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
val lines: RDD[String] = kafkaRDD.map(_._2)
val value: RDD[dataModel] = kafkaRDD.map(x => {
val a = x._2.toString
val obj: JSONObject = new JSONObject(a)
val keyno = obj.getString("_id")
val name = obj.getString("Name")
val oper = obj.getJSONObject("Oper").toString
val Partners = obj.getJSONArray("Partners").toString
dataModel(keyno, name,oper,Partners)
})
value.map(x =>{
val put = new Put(Bytes.toBytes(x.keyno))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("companyname"), Bytes.toBytes(x.name))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oper"), Bytes.toBytes(x.oper))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("Partners"), Bytes.toBytes(x.Partners))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
for (o
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.