DataX-TxtFileWriter 데이터 문 제 를 쓰 지 않 습 니 다.

4382 단어 빅 데이터JAVA
앞 에 쓰다
저 는 datax 로 동기 화 도구 플러그 인 을 개발 하고 있 습 니 다.kafka 에서 데 이 터 를 소비 하여 HIVE 에 기록 해 야 합 니 다.테스트 도 구 는 먼저 TxtFileWriter 를 writer 로 사용 하여 중간 결 과 를 관찰 합 니 다.
 
문제 에 봉착 하 다
나 는 reader 에서 while(true)를 사용 하여 데 이 터 를 소비 하기 때문이다.다음 그림 에서 로 그 를 쳐 서 데 이 터 를 읽 었 고 sendToWriter 도 있 지만 생산 파일 크기 는 0 입 니 다.
  public void startRead(RecordSender recordSender) {
        LOG.info("[RowInKafkaTask] start to read here.");
        Record record = recordSender.createRecord();
        while (true) {
            ConsumerRecords messages = consumer.poll(Constant.TIME_OUT);
            for (ConsumerRecord message : messages) {
                byte[] row = parseRowKeyFromKafkaMsg(message.value(), this.kafkaColumn);
                try {
                    boolean result = putDataToRecord(record, row);
                    if (result) {
                        Log.info("[RowInKafkaTask] result is {}", result);
                        recordSender.sendToWriter(record);
                        recordSender.flush();
                    } else
                        LOG.error("[RowInKafkaTask] putDataToRecord false");
                } catch (Exception e) {
                    LOG.error("[RowInKafkaTask] exception found.", e);
                }
                record = recordSender.createRecord();
            }
            recordSender.flush();
        }

    }

위치 지정:
1.Writer 가 데 이 터 를 받 지 못 했다 고 의심 하여 TxtFileWriter 의 절 차 를 읽 었 습 니 다.파일 을 쓰 는 것 이 FileOutputStream 을 통 해 파일 을 쓰 는 것 으로 밝 혀 졌 습 니 다.
   @Override
        public void startWrite(RecordReceiver lineReceiver) {
            LOG.info("begin do write...");
            String fileFullPath = this.buildFilePath();
            LOG.info(String.format("write to file : [%s]", fileFullPath));

            OutputStream outputStream = null;
            try {
//   FileOutPutStream  
                File newFile = new File(fileFullPath);
                newFile.createNewFile();
                outputStream = new FileOutputStream(newFile);
                UnstructuredStorageWriterUtil.writeToStream(lineReceiver,
                        outputStream, this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            } catch (SecurityException se) {
...

기록 을 못 받 았 다 고 의심 하 다.그래서 다음 과 같은 로 그 를 추 가 했 습 니 다.Unstructured Storage WriterUtil.java 에서 doWriteToStream 방법;
private static void doWriteToStream(RecordReceiver lineReceiver,
            BufferedWriter writer, String contex, Configuration config,
            TaskPluginCollector taskPluginCollector) throws IOException {

        ...  

        Record record = null;
        while ((record = lineReceiver.getFromReader()) != null) {
            LOG.info("[Unstrctured..Util] write one record.");
            UnstructuredStorageWriterUtil.transportOneRecord(record,
                    nullFormat, dateParse, taskPluginCollector,
                    unstructuredWriter);
        }

        // warn:          
        // IOUtils.closeQuietly(unstructuredWriter);
    }

2.플러그 인 재 컴 파일,/opt/datax/plugin/writer/txtfilewriter/libs 경로 에서 원래 plugin-ustructured-storage-util-0.1-SNAPSHOT.jar 를 교체 합 니 다. 로 그 를 인쇄 할 수 있 는 것 을 발견 하면 데이터 가 이미 받 았 음 을 설명 합 니 다.
3.kafka 에 데 이 터 를 더 넣 었 는데 파일 이 드디어 기록 되 었 지만 항상 4K 크기 가 위로 증가 합 니 다.파트너 와 토론 하여"캐 시 영역 이 비어 있 지 않 은 지"의 심 스 러 운 점 을 얻 었 습 니 다.
4.transportOne Record 에 flush()를 추가 하 는 방법 은 다음 과 같다.
 public static void transportOneRecord(Record record, String nullFormat,
            DateFormat dateParse, TaskPluginCollector taskPluginCollector,
            UnstructuredWriter unstructuredWriter) {
        // warn: default is null
        if (null == nullFormat) {
            nullFormat = "null";
        }
        try {
           //  
            }
            unstructuredWriter.writeOneRecord(splitedRows);
//    
            unstructuredWriter.flush();
        } catch (Exception e) {
            // warn: dirty data
            taskPluginCollector.collectDirtyRecord(record, e);
        }
    }

재 컴 파일,교체,실행.
마지막 에 쓰다
분명히 마지막 에 효력 이 발생 했다.
이것 은 datax 의 bug 입 니 다.reader 스 레 드 가 상주 할 때 만 존재 합 니 다.
물 좀 붙 여 주세요.

좋은 웹페이지 즐겨찾기