Storm[실천 시리즈 - 파충류를 어떻게 쓰는가] - ParserBolt.
코드 전제: 당신은 본 ID가 쓴 앞의 두 편의 블로그를 참조해야 합니다. Storm[실천 시리즈-파충류를 어떻게 쓰는가]-Fetcher
이 장의 주제:ParserBolt에서 어떻게 해석을 완성하고 앞의 구성 요소에서 데이터를 얻어 emit로 나가는지
블로그 프로세스: 블로그는 파충류 시리즈 전체를 공개하는데 그 과정은 다음과 같다.
1: 코드 구현
2: 코드의 세부 사항을 확인합니다.
3. 실제 디자인을 회고하고 정리한다.
이 ID의 블로그를 참조하는 동안 프로세스 1만 존재합니다.그럼 계속 기다리세요.일단 회사 업무가 포화 단계에 처하지 않으면
이 ID는 매일 한 편씩 제공됩니다.
package com.digitalpebble.storm.crawler.bolt.parser;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.tika.Tika;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.BodyContentHandler;
import org.apache.tika.sax.Link;
import org.apache.tika.sax.LinkContentHandler;
import org.apache.tika.sax.TeeContentHandler;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.codahale.metrics.Timer;
import com.digitalpebble.storm.crawler.StormConfiguration;
import com.digitalpebble.storm.crawler.filtering.URLFilters;
import com.digitalpebble.storm.crawler.util.Configuration;
import com.digitalpebble.storm.crawler.util.HistogramMetric;
import com.digitalpebble.storm.crawler.util.MeterMetric;
import com.digitalpebble.storm.crawler.util.TimerMetric;
import com.digitalpebble.storm.crawler.util.URLUtil;
/**
* Uses Tika to parse the output of a fetch and extract text + metadata
***/
@SuppressWarnings("serial")
public class ParserBolt extends BaseRichBolt {
private Configuration config;
private Tika tika;
private URLFilters filters = null;
private OutputCollector collector;
private static final org.slf4j.Logger LOG = LoggerFactory
.getLogger(ParserBolt.class);
private MeterMetric eventMeters;
private HistogramMetric eventHistograms;
private TimerMetric eventTimers;
private boolean ignoreOutsideHost = false;
private boolean ignoreOutsideDomain = false;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
config = StormConfiguration.create();
String urlconfigfile = config.get("urlfilters.config.file",
"urlfilters.json");
if (urlconfigfile != null)
try {
filters = new URLFilters(urlconfigfile);
} catch (IOException e) {
LOG.error("Exception caught while loading the URLFilters");
}
ignoreOutsideHost = config.getBoolean(
"parser.ignore.outlinks.outside.host", false);
ignoreOutsideDomain = config.getBoolean(
"parser.ignore.outlinks.outside.domain", false);
// instanciate Tika
long start = System.currentTimeMillis();
tika = new Tika();
long end = System.currentTimeMillis();
LOG.debug("Tika loaded in " + (end - start) + " msec");
this.collector = collector;
this.eventMeters = context.registerMetric("parser-meter",
new MeterMetric(), 5);
this.eventTimers = context.registerMetric("parser-timer",
new TimerMetric(), 5);
this.eventHistograms = context.registerMetric("parser-histograms",
new HistogramMetric(), 5);
}
public void execute(Tuple tuple) {
eventMeters.scope("tuple_in").mark();
byte[] content = tuple.getBinaryByField("content");
eventHistograms.scope("content_bytes").update(content.length);
String url = tuple.getStringByField("url");
HashMap<String, String[]> metadata = (HashMap<String, String[]>) tuple
.getValueByField("metadata");
// TODO check status etc...
Timer.Context timer = eventTimers.scope("parsing").time();
// rely on mime-type provided by server or guess?
ByteArrayInputStream bais = new ByteArrayInputStream(content);
Metadata md = new Metadata();
String text = null;
LinkContentHandler linkHandler = new LinkContentHandler();
ContentHandler textHandler = new BodyContentHandler();
TeeContentHandler teeHandler = new TeeContentHandler(linkHandler,
textHandler);
ParseContext parseContext = new ParseContext();
// parse
try {
tika.getParser().parse(bais, teeHandler, md, parseContext);
text = textHandler.toString();
} catch (Exception e) {
LOG.error("Exception while parsing " + url, e.getMessage());
eventMeters.scope(
"error_content_parsing_" + e.getClass().getSimpleName())
.mark();
collector.fail(tuple);
eventMeters.scope("tuple_fail").mark();
return;
} finally {
try {
bais.close();
} catch (IOException e) {
LOG.error("Exception while closing stream", e);
}
}
long duration = timer.stop();
LOG.info("Parsed " + url + " in " + duration + " msec");
// get the outlinks and convert them to strings (for now)
String fromHost;
URL url_;
try {
url_ = new URL(url);
fromHost = url_.getHost().toLowerCase();
} catch (MalformedURLException e1) {
// we would have known by now as previous
// components check whether the URL is valid
LOG.error("MalformedURLException on " + url);
eventMeters.scope(
"error_outlinks_parsing_" + e1.getClass().getSimpleName())
.mark();
collector.fail(tuple);
eventMeters.scope("tuple_fail").mark();
return;
}
List<Link> links = linkHandler.getLinks();
Set<String> slinks = new HashSet<String>(links.size());
for (Link l : links) {
if (StringUtils.isBlank(l.getUri()))
continue;
String urlOL = null;
try {
URL tmpURL = URLUtil.resolveURL(url_, l.getUri());
urlOL = tmpURL.toExternalForm();
} catch (MalformedURLException e) {
LOG.debug("MalformedURLException on " + l.getUri());
eventMeters.scope(
"error_out_link_parsing_"
+ e.getClass().getSimpleName()).mark();
continue;
}
// filter the urls
if (filters != null) {
urlOL = filters.filter(urlOL);
if (urlOL == null) {
eventMeters.scope("outlink_filtered").mark();
continue;
}
}
if (urlOL != null && ignoreOutsideHost) {
String toHost;
try {
toHost = new URL(urlOL).getHost().toLowerCase();
} catch (MalformedURLException e) {
toHost = null;
}
if (toHost == null || !toHost.equals(fromHost)) {
urlOL = null; // skip it
eventMeters.scope("outlink_outsideHost").mark();
continue;
}
}
if (urlOL != null) {
slinks.add(urlOL);
eventMeters.scope("outlink_kept").mark();
}
}
// add parse md to metadata
for (String k : md.names()) {
// TODO handle mutliple values
String[] values = md.getValues(k);
metadata.put("parse." + k, values);
}
collector.emit(tuple, new Values(url, content, metadata, text.trim(), slinks));
collector.ack(tuple);
eventMeters.scope("tuple_success").mark();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// output of this module is the list of fields to index
// with at least the URL, text content
declarer.declare(new Fields("url", "content", "metadata", "text",
"outlinks"));
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.