Spark Streaming pulls data from Flume
This example is an experiment in which Spark Streaming pulls data from Flume (the pull-based approach).
1. Configure Flume
First, download the required jars (see the link above) and copy them into the /usr/local/flume/apache-flume-1.6.0-bin/lib directory. The jars can be downloaded from the official site, or you can use the ones in the attachment. (According to the Spark 1.6 Flume integration guide, the pull-based approach needs spark-streaming-flume-sink_2.10, scala-library, and commons-lang3 on Flume's classpath.)
Flume configuration on master1
On master1, edit the configuration file: root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1
#set source
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir =/usr/local/flume/tmp/TestDir
agent1.sources.r1.channels = c1
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = timestamp
# set sink to hdfs
#agent1.sinks.k1.type=hdfs
#agent1.sinks.k1.hdfs.path=hdfs://master1:9000/library/flume
#agent1.sinks.k1.hdfs.fileType=DataStream
#agent1.sinks.k1.hdfs.writeFormat=TEXT
#agent1.sinks.k1.hdfs.rollInterval=1
#agent1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
#agent1.sinks.k1.channel=c1
#set sink to Spark Streaming
#agent1.sinks.k1.type = avro
#agent1.sinks.k1.channel = c1
#agent1.sinks.k1.hostname = master1
#agent1.sinks.k1.port = 9999
#set sink Spark Streaming pull data from flume
agent1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.k1.hostname = master1
agent1.sinks.k1.port = 9999
agent1.sinks.k1.channel = c1
#set channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir=/usr/local/flume/tmp/checkpointDir
agent1.channels.c1.dataDirs=/usr/local/flume/tmp/dataDirs
2. Write the source code
package com.imf.spark.SparkApps.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

/**
 * @Description: Spark Streaming pulls data from Flume
 * @Author: lujinyong168
 * @Date: 2016-06-19 3:37:01
 */
public class SparkStreamingPullDataFromFlume {
    public static void main(String[] args) {
        // Note: setMaster() in the code takes precedence over the --master option of spark-submit,
        // so this application actually runs in local mode with 4 threads.
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingPullDataFromFlume for Java");
        // 30-second batch interval
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));

        // Push-based alternative: Flume pushes data to Spark Streaming.
        // JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jsc, "master1", 9999);

        // Pull-based approach: Spark Streaming pulls data from the Flume SparkSink.
        JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createPollingStream(jsc, "master1", 9999);

        // Split the body of each Flume event into words.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(SparkFlumeEvent event) throws Exception {
                String line = new String(event.event().getBody().array());
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per key (a local, map-side combine runs before the shuffle).
        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordsCount.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
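If more than one Flume agent exposes a SparkSink, the same polling receiver can read from all of them by passing an array of addresses and an explicit storage level to FlumeUtils.createPollingStream. The sketch below only illustrates that overload; "master2" is a hypothetical second host assumed to run the same sink configuration as master1, and the rest of the word-count pipeline would stay unchanged.

import java.net.InetSocketAddress;
import org.apache.spark.storage.StorageLevel;

// Poll two Flume SparkSinks instead of one (master2 is an assumed second agent).
InetSocketAddress[] addresses = new InetSocketAddress[] {
        new InetSocketAddress("master1", 9999),
        new InetSocketAddress("master2", 9999)
};
JavaReceiverInputDStream<SparkFlumeEvent> lines =
        FlumeUtils.createPollingStream(jsc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());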
3. Write the startup script
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class com.imf.spark.SparkApps.sparkstreaming.SparkStreamingPullDataFromFlume \
--master spark://master1:7077 \
/usr/local/sparkApps/SparkStreamingPullDataFromFlume/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar
4. Startup
Start the Hadoop cluster first; the Spark cluster does not need to be started here. (Although the script passes --master spark://master1:7077, the code calls setMaster("local[4]"), which takes precedence, so this run actually executes in local mode; the driver log below shows the executor starting on localhost.)
Start Flume
Command: root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf# flume-ng agent -n agent1 -c conf -f flume-conf.properties -Dflume.root.logger=DEBUG,console
Console output (partial capture):
16/06/19 16:52:27 INFO node.Application: Starting Sink k1
16/06/19 16:52:27 INFO sink.SparkSink: Starting Spark Sink: k1 on port: 9999 and interface: master1 with pool size: 10 and transaction timeout: 60.
16/06/19 16:52:27 INFO node.Application: Starting Source r1
16/06/19 16:52:27 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /usr/local/flume/tmp/TestDir
16/06/19 16:52:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
16/06/19 16:52:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Start the Spark Streaming job script and watch the console; a batch job is scheduled every 30 seconds.
root@master1:/usr/local/sparkApps/SparkStreamingPullDataFromFlume# ./run.sh
16/06/19 16:59:11 INFO spark.SparkContext: Running Spark version 1.6.0
16/06/19 16:59:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/19 16:59:12 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:').
This is deprecated in Spark 1.0+.
Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath
16/06/19 16:59:12 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:' as a work-around.
16/06/19 16:59:12 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:' as a work-around.
16/06/19 16:59:13 INFO spark.SecurityManager: Changing view acls to: root
16/06/19 16:59:13 INFO spark.SecurityManager: Changing modify acls to: root
16/06/19 16:59:13 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/06/19 16:59:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 32969.
16/06/19 16:59:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/06/19 16:59:14 INFO Remoting: Starting remoting
16/06/19 16:59:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:46574]
16/06/19 16:59:15 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 46574.
16/06/19 16:59:15 INFO spark.SparkEnv: Registering MapOutputTracker
16/06/19 16:59:15 INFO spark.SparkEnv: Registering BlockManagerMaster
16/06/19 16:59:15 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-5b79546a-c1c2-466c-b72f-cef9cae03ffb
16/06/19 16:59:15 INFO storage.MemoryStore: MemoryStore started with capacity 517.4 MB
16/06/19 16:59:15 INFO spark.SparkEnv: Registering OutputCommitCoordinator
16/06/19 16:59:15 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/06/19 16:59:15 INFO server.AbstractConnector: Started [email protected]:4040
16/06/19 16:59:15 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
16/06/19 16:59:15 INFO ui.SparkUI: Started SparkUI at http://192.168.112.130:4040
16/06/19 16:59:15 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/httpd-2322a15e-b95e-4b4d-8bdb-fbb37e7d6c16
16/06/19 16:59:15 INFO spark.HttpServer: Starting HTTP Server
16/06/19 16:59:16 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/06/19 16:59:16 INFO server.AbstractConnector: Started [email protected]:34178
16/06/19 16:59:16 INFO util.Utils: Successfully started service 'HTTP file server' on port 34178.
16/06/19 16:59:17 INFO spark.SparkContext: Added JAR file:/usr/local/sparkApps/SparkStreamingPullDataFromFlume/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1466326757806
16/06/19 16:59:18 INFO executor.Executor: Starting executor ID driver on host localhost
16/06/19 16:59:18 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40258.
16/06/19 16:59:18 INFO netty.NettyBlockTransferService: Server created on 40258
16/06/19 16:59:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/06/19 16:59:18 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:40258 with 517.4 MB RAM, BlockManagerId(driver, localhost, 40258)
16/06/19 16:59:18 INFO storage.BlockManagerMaster: Registered BlockManager
16/06/19 16:59:19 INFO scheduler.EventLoggingListener: Logging events to hdfs://master1:9000/historyserverforSpark/local-1466326757943
16/06/19 16:59:20 INFO scheduler.ReceiverTracker: Starting 1 receivers
16/06/19 16:59:20 INFO scheduler.ReceiverTracker: ReceiverTracker started
16/06/19 16:59:20 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO dstream.MappedDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Initialized and validated org.apache.spark.streaming.flume.FlumePollingInputDStream@538a1d89
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1e794193
16/06/19 16:59:20 INFO dstream.MappedDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.MappedDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.MappedDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2647c258
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@f9cdfc5
16/06/19 16:59:20 INFO dstream.ForEachDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.ForEachDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.ForEachDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@cf9fdea
16/06/19 16:59:20 INFO scheduler.ReceiverTracker: Receiver 0 started
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Got job 0 (start at SparkStreamingPullDataFromFlume.java:63) with 1 output partitions
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (start at SparkStreamingPullDataFromFlume.java:63)
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 16:59:20 INFO util.RecurringTimer: Started timer for JobGenerator at time 1466326770000
16/06/19 16:59:20 INFO scheduler.JobGenerator: Started JobGenerator at 1466326770000 ms
16/06/19 16:59:20 INFO scheduler.JobScheduler: Started JobScheduler
16/06/19 16:59:20 INFO streaming.StreamingContext: StreamingContext started
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588), which has no missing parents
16/06/19 16:59:21 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.0 KB, free 61.0 KB)
16/06/19 16:59:21 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.5 KB, free 81.6 KB)
16/06/19 16:59:21 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40258 (size: 20.5 KB, free: 517.4 MB)
16/06/19 16:59:21 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/19 16:59:21 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588)
16/06/19 16:59:21 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/06/19 16:59:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 3106 bytes)
16/06/19 16:59:21 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/19 16:59:21 INFO executor.Executor: Fetching http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1466326757806
16/06/19 16:59:21 INFO util.Utils: Fetching http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar to /tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/userFiles-fcba890a-719c-4351-bad0-38118b66a90c/fetchFileTemp3397272035245474294.tmp
16/06/19 16:59:23 INFO executor.Executor: Adding file:/tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/userFiles-fcba890a-719c-4351-bad0-38118b66a90c/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar to class loader
16/06/19 16:59:23 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1466326763400
16/06/19 16:59:23 INFO receiver.BlockGenerator: Started BlockGenerator
16/06/19 16:59:23 INFO receiver.BlockGenerator: Started block pushing thread
16/06/19 16:59:23 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 192.168.112.130:32969
16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Starting receiver
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
16/06/19 16:59:30 INFO scheduler.JobScheduler: Added jobs for time 1466326770000 ms
16/06/19 16:59:30 INFO scheduler.JobScheduler: Starting job streaming job 1466326770000 ms.0 from job set of time 1466326770000 ms
16/06/19 16:59:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at SparkStreamingPullDataFromFlume.java:41)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Got job 1 (print at SparkStreamingPullDataFromFlume.java:61) with 1 output partitions
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 84.5 KB)
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1805.0 B, free 86.3 KB)
16/06/19 16:59:30 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40258 (size: 1805.0 B, free: 517.4 MB)
16/06/19 16:59:30 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 1)
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
16/06/19 16:59:30 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 1). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO scheduler.DAGScheduler: ResultStage 2 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.086 s
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 1) in 71 ms on localhost (1/1)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Job 1 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.159471 s
16/06/19 16:59:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 16:59:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 82 bytes
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Got job 2 (print at SparkStreamingPullDataFromFlume.java:61) with 3 output partitions
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 89.2 KB)
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1805.0 B, free 91.0 KB)
16/06/19 16:59:30 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:40258 (size: 1805.0 B, free: 517.4 MB)
16/06/19 16:59:30 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 4 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 3 tasks
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 2, localhost, partition 1,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 3, localhost, partition 2,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 4, localhost, partition 3,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 2)
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/06/19 16:59:30 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 2). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO executor.Executor: Running task 2.0 in stage 4.0 (TID 4)
16/06/19 16:59:30 INFO executor.Executor: Running task 1.0 in stage 4.0 (TID 3)
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 16 ms on localhost (1/3)
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
16/06/19 16:59:30 INFO executor.Executor: Finished task 1.0 in stage 4.0 (TID 3). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO executor.Executor: Finished task 2.0 in stage 4.0 (TID 4). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 41 ms on localhost (2/3)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: ResultStage 4 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.041 s
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Job 2 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.113834 s
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 42 ms on localhost (3/3)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
-------------------------------------------
Time: 1466326770000 ms
-------------------------------------------
16/06/19 16:59:30 INFO scheduler.JobScheduler: Finished job streaming job 1466326770000 ms.0 from job set of time 1466326770000 ms
16/06/19 16:59:30 INFO scheduler.JobScheduler: Total delay: 0.480 s for time 1466326770000 ms (execution: 0.373 s)
16/06/19 16:59:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/06/19 16:59:30 INFO scheduler.InputInfoTracker: remove old batch metadata:
Once the Spark Streaming job starts, the following log appears on the Flume console, which shows that Spark Streaming and Flume are communicating successfully.
16/06/19 16:52:27 INFO sink.SparkSink: Starting Avro server for sink: k1
16/06/19 16:52:27 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run..
16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] OPEN
16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] BOUND: /192.168.112.130:9999
16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] CONNECTED: /192.168.112.130:51610
Test data
Create the file test_7.log and copy it into the TestDir directory.
root@master1:/usr/local/flume/tmp# cat test_7.log
Hello Java Java
Hello Hadoop
Hello Spark Spark Spark
root@master1:/usr/local/flume/tmp# cp test_7.log TestDir/
Flume console output:
16/06/19 17:08:11 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
16/06/19 17:08:11 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/test_7.log to /usr/local/flume/tmp/TestDir/test_7.log.COMPLETED
16/06/19 17:08:26 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/tmp/checkpointDir/checkpoint, elements to sync = 3
16/06/19 17:08:26 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1466326346643, queueSize: 0, queueHead: 7
16/06/19 17:08:26 INFO file.Log: Updated checkpoint for file: /root/.flume/file-channel/data/log-9 position: 1060 logWriteOrderID: 1466326346643
Job console output:
16/06/19 17:08:30 INFO scheduler.JobScheduler: Starting job streaming job 1466327310000 ms.0 from job set of time 1466327310000 ms
16/06/19 17:08:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Registering RDD 75 (mapToPair at SparkStreamingPullDataFromFlume.java:41)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Got job 37 (print at SparkStreamingPullDataFromFlume.java:61) with 1 output partitions
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 74 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 73)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 73)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 73 (MapPartitionsRDD[75] at mapToPair at SparkStreamingPullDataFromFlume.java:41), which has no missing parents
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_39 stored as values in memory (estimated size 3.5 KB, free 99.4 KB)
16/06/19 17:08:30 INFO scheduler.JobScheduler: Added jobs for time 1466327310000 ms
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_39_piece0 stored as bytes in memory (estimated size 1984.0 B, free 101.3 KB)
16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_39_piece0 in memory on localhost:40258 (size: 1984.0 B, free: 517.4 MB)
16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 39 from broadcast at DAGScheduler.scala:1006
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 73 (MapPartitionsRDD[75] at mapToPair at SparkStreamingPullDataFromFlume.java:41)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 73.0 with 1 tasks
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 73.0 (TID 75, localhost, partition 0,NODE_LOCAL, 2100 bytes)
16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 73.0 (TID 75)
16/06/19 17:08:30 INFO storage.BlockManager: Found block input-0-1466326763359 locally
16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 73.0 (TID 75). 1161 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 73.0 (TID 75) in 18 ms on localhost (1/1)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 73.0, whose tasks have all completed, from pool
16/06/19 17:08:30 INFO scheduler.DAGScheduler: ShuffleMapStage 73 (mapToPair at SparkStreamingPullDataFromFlume.java:41) finished in 0.014 s
16/06/19 17:08:30 INFO scheduler.DAGScheduler: looking for newly runnable stages
16/06/19 17:08:30 INFO scheduler.DAGScheduler: running: Set(ResultStage 0)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 74)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: failed: Set()
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ResultStage 74 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_40 stored as values in memory (estimated size 2.9 KB, free 104.3 KB)
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 1807.0 B, free 106.1 KB)
16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_40_piece0 in memory on localhost:40258 (size: 1807.0 B, free: 517.4 MB)
16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 40 from broadcast at DAGScheduler.scala:1006
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 74 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 74.0 with 1 tasks
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 74.0 (TID 76, localhost, partition 0,PROCESS_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 74.0 (TID 76)
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 74.0 (TID 76). 1161 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 74.0 (TID 76) in 3 ms on localhost (1/1)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 74.0, whose tasks have all completed, from pool
16/06/19 17:08:30 INFO scheduler.DAGScheduler: ResultStage 74 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.001 s
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Job 37 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.054738 s
16/06/19 17:08:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 17:08:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 18 is 147 bytes
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Got job 38 (print at SparkStreamingPullDataFromFlume.java:61) with 3 output partitions
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 76 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 75)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ResultStage 76 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_41 stored as values in memory (estimated size 2.9 KB, free 109.0 KB)
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_41_piece0 stored as bytes in memory (estimated size 1807.0 B, free 110.8 KB)
16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_41_piece0 in memory on localhost:40258 (size: 1807.0 B, free: 517.4 MB)
16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 41 from broadcast at DAGScheduler.scala:1006
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 76 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 76.0 with 3 tasks
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 76.0 (TID 77, localhost, partition 1,NODE_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 76.0 (TID 78, localhost, partition 2,NODE_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 76.0 (TID 79, localhost, partition 3,PROCESS_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 76.0 (TID 77)
16/06/19 17:08:30 INFO executor.Executor: Running task 2.0 in stage 76.0 (TID 79)
16/06/19 17:08:30 INFO executor.Executor: Running task 1.0 in stage 76.0 (TID 78)
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
16/06/19 17:08:30 INFO executor.Executor: Finished task 2.0 in stage 76.0 (TID 79). 1161 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 76.0 (TID 79) in 7 ms on localhost (1/3)
16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 76.0 (TID 77). 1336 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 76.0 (TID 77) in 18 ms on localhost (2/3)
16/06/19 17:08:30 INFO executor.Executor: Finished task 1.0 in stage 76.0 (TID 78). 1334 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 76.0 (TID 78) in 18 ms on localhost (3/3)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 76.0, whose tasks have all completed, from pool
16/06/19 17:08:30 INFO scheduler.DAGScheduler: ResultStage 76 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.003 s
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Job 38 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.046156 s
-------------------------------------------
Time: 1466327310000 ms
-------------------------------------------
(Spark,3)
(Hadoop,1)
(Hello,3)
(Java,2)
16/06/19 17:08:30 INFO scheduler.JobScheduler: Finished job streaming job 1466327310000 ms.0 from job set of time 1466327310000 ms
16/06/19 17:08:30 INFO scheduler.JobScheduler: Total delay: 0.123 s for time 1466327310000 ms (execution: 0.114 s)
16/06/19 17:08:30 INFO rdd.ShuffledRDD: Removing RDD 72 from persistence list
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 72
16/06/19 17:08:30 INFO rdd.MapPartitionsRDD: Removing RDD 71 from persistence list
16/06/19 17:08:30 INFO rdd.MapPartitionsRDD: Removing RDD 70 from persistence list
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 71
16/06/19 17:08:30 INFO rdd.BlockRDD: Removing RDD 69 from persistence list
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 70
16/06/19 17:08:30 INFO flume.FlumePollingInputDStream: Removing blocks of RDD BlockRDD[69] at createPollingStream at SparkStreamingPullDataFromFlume.java:30 of time 1466327310000 ms
16/06/19 17:08:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1466327250000 ms)
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 69
16/06/19 17:08:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 1466327250000 ms
From the output printed on the Flume console and the job console, you can see that when test_7.log is copied into the TestDir directory, Flume processes the file and writes a checkpoint, and when the next batch runs, the job pulls this data and processes it.
Source: Sina Weibo: http://weibo.com.ilovepains/