DataX-TxtFileWriter 데이터 문 제 를 쓰 지 않 습 니 다.
저 는 datax 로 동기 화 도구 플러그 인 을 개발 하고 있 습 니 다.kafka 에서 데 이 터 를 소비 하여 HIVE 에 기록 해 야 합 니 다.테스트 도 구 는 먼저 TxtFileWriter 를 writer 로 사용 하여 중간 결 과 를 관찰 합 니 다.
문제 에 봉착 하 다
나 는 reader 에서 while(true)를 사용 하여 데 이 터 를 소비 하기 때문이다.다음 그림 에서 로 그 를 쳐 서 데 이 터 를 읽 었 고 sendToWriter 도 있 지만 생산 파일 크기 는 0 입 니 다.
public void startRead(RecordSender recordSender) {
LOG.info("[RowInKafkaTask] start to read here.");
Record record = recordSender.createRecord();
while (true) {
ConsumerRecords messages = consumer.poll(Constant.TIME_OUT);
for (ConsumerRecord message : messages) {
byte[] row = parseRowKeyFromKafkaMsg(message.value(), this.kafkaColumn);
try {
boolean result = putDataToRecord(record, row);
if (result) {
Log.info("[RowInKafkaTask] result is {}", result);
recordSender.sendToWriter(record);
recordSender.flush();
} else
LOG.error("[RowInKafkaTask] putDataToRecord false");
} catch (Exception e) {
LOG.error("[RowInKafkaTask] exception found.", e);
}
record = recordSender.createRecord();
}
recordSender.flush();
}
}
위치 지정:
1.Writer 가 데 이 터 를 받 지 못 했다 고 의심 하여 TxtFileWriter 의 절 차 를 읽 었 습 니 다.파일 을 쓰 는 것 이 FileOutputStream 을 통 해 파일 을 쓰 는 것 으로 밝 혀 졌 습 니 다.
@Override
public void startWrite(RecordReceiver lineReceiver) {
LOG.info("begin do write...");
String fileFullPath = this.buildFilePath();
LOG.info(String.format("write to file : [%s]", fileFullPath));
OutputStream outputStream = null;
try {
// FileOutPutStream
File newFile = new File(fileFullPath);
newFile.createNewFile();
outputStream = new FileOutputStream(newFile);
UnstructuredStorageWriterUtil.writeToStream(lineReceiver,
outputStream, this.writerSliceConfig, this.fileName,
this.getTaskPluginCollector());
} catch (SecurityException se) {
...
기록 을 못 받 았 다 고 의심 하 다.그래서 다음 과 같은 로 그 를 추 가 했 습 니 다.Unstructured Storage WriterUtil.java 에서 doWriteToStream 방법;
private static void doWriteToStream(RecordReceiver lineReceiver,
BufferedWriter writer, String contex, Configuration config,
TaskPluginCollector taskPluginCollector) throws IOException {
...
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
LOG.info("[Unstrctured..Util] write one record.");
UnstructuredStorageWriterUtil.transportOneRecord(record,
nullFormat, dateParse, taskPluginCollector,
unstructuredWriter);
}
// warn:
// IOUtils.closeQuietly(unstructuredWriter);
}
2.플러그 인 재 컴 파일,/opt/datax/plugin/writer/txtfilewriter/libs 경로 에서 원래 plugin-ustructured-storage-util-0.1-SNAPSHOT.jar 를 교체 합 니 다. 로 그 를 인쇄 할 수 있 는 것 을 발견 하면 데이터 가 이미 받 았 음 을 설명 합 니 다.
3.kafka 에 데 이 터 를 더 넣 었 는데 파일 이 드디어 기록 되 었 지만 항상 4K 크기 가 위로 증가 합 니 다.파트너 와 토론 하여"캐 시 영역 이 비어 있 지 않 은 지"의 심 스 러 운 점 을 얻 었 습 니 다.
4.transportOne Record 에 flush()를 추가 하 는 방법 은 다음 과 같다.
public static void transportOneRecord(Record record, String nullFormat,
DateFormat dateParse, TaskPluginCollector taskPluginCollector,
UnstructuredWriter unstructuredWriter) {
// warn: default is null
if (null == nullFormat) {
nullFormat = "null";
}
try {
//
}
unstructuredWriter.writeOneRecord(splitedRows);
//
unstructuredWriter.flush();
} catch (Exception e) {
// warn: dirty data
taskPluginCollector.collectDirtyRecord(record, e);
}
}
재 컴 파일,교체,실행.
마지막 에 쓰다
분명히 마지막 에 효력 이 발생 했다.
이것 은 datax 의 bug 입 니 다.reader 스 레 드 가 상주 할 때 만 존재 합 니 다.
물 좀 붙 여 주세요.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.