Sqoop 내 보 내기 과정 에서 데 이 터 를 처리 하 는 방법

25756 단어 빅 데이터Sqoop
글 목록
  • 1. 자바 코드 호출 Sqoop API 내 보 내기 데이터
  • 2. 부분 도 출 과정 분석
  • 3. Sqoop 내 보 내기 과정 에서 데 이 터 를 처리 하 는 방법
  • 참조 링크
  • 1. 자바 코드 호출 Sqoop API 내 보 내기 데이터
    현재 테스트 용 빅 데이터 클 러 스 터 버 전: cdh 6.3.2, Sqoop 의존 패키지 버 전 은 1.4.7 - cdh 6.3.2 입 니 다.Sqoop API 를 호출 하 는 자바 코드 는 다음 과 같 습 니 다.
    package blog;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.log4j.BasicConfigurator;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.sqoop.Sqoop;
    import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
    import org.apache.sqoop.tool.SqoopTool;
    import org.apache.sqoop.util.OptionsFileUtil;
    
    
    /**
     * @author 0x3E6
     * @date 2020/02/05 20:58 PM
     */
    @Slf4j
    public class App {
    
        private static String[] getArgs() {
            String tb = "tb1";
            return new String[]{
                    "--connect", "jdbc:mysql://192.168.0.101:3306/test?useSSL=false",
                    "--username", "test",
                    "--password", "G00d!1uck",
                    "--table", tb,
                    "--hcatalog-database", "test",
                    "--hcatalog-table", tb
            };
        }
    
        private static int execSqoop(String toolName, String[] args) throws Exception {
            String[] expandArguments = OptionsFileUtil.expandArguments(args);
            SqoopTool tool = SqoopTool.getTool(toolName);
            Configuration conf = new Configuration();
            //        MapReduce  ,      MapReduce     , Mapper  map      
            conf.set("mapreduce.framework.name", "local");
            conf.set("custom.checkColumn", "id");
            conf.set("custom.lastValue", "2");
            Configuration loadPlugins = SqoopTool.loadPlugins(conf);
            Sqoop sqoop = new Sqoop(tool, loadPlugins);
            return Sqoop.runSqoop(sqoop, expandArguments);
        }
    
        static void exportData() throws Exception {
            log.info("{}", execSqoop("export", getArgs()));
        }
    
        public static void main(String[] args) throws Exception {
            Logger.getRootLogger().setLevel(Level.INFO);
            BasicConfigurator.configure();
            System.setProperty("HADOOP_USER_NAME", "hdfs");
            exportData();
        }
    }
    

    2. 부분 내 보 내기 프로 세 스 분석
    ExportTool 중
    private void exportTable(SqoopOptions options, String tableName) {
        ...
        // INSERT-based export.
        //   MySQLManager.exportTable
          manager.exportTable(context);
    }
    

    MySQL Manager 의 상속 관 계 는 다음 과 같다.
    MySQLManager->InformationSchemaManager->CatalogQueryManager->GenericJdbcManager->SqlManager
    

    또한 SqlManager 만 exportTable 방법 을 실 현 했 기 때문에 실제 호출 된 SqlManager 의 exportTable 방법:
      /**
       * Export data stored in HDFS into a table in a database.
       */
      public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
          throws IOException, ExportException {
        context.setConnManager(this);
        JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
        exportJob.runExport();
      }
    

    exportJob. runExport 방법 에서 MapReduce 설정 을 실 행 했 습 니 다. jdbc ExportJob 은 ExportJobBase 를 계승 하고 runExport 방법 을 다시 쓰 지 않 았 기 때문에 실제 호출 된 ExportJobBase 의 runExport 방법:
    public void runExport() throws ExportException, IOException {
    
        ......
    
        Job job = createJob(conf);
        try {
          // Set the external jar to use for the job.
          job.getConfiguration().set("mapred.jar", ormJarFile);
          if (options.getMapreduceJobName() != null) {
            job.setJobName(options.getMapreduceJobName());
          }
    
          propagateOptionsToJob(job);
          if (isHCatJob) {
            LOG.info("Configuring HCatalog for export job");
            SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
            hCatUtils.configureHCat(options, job, cmgr, tableName,
              job.getConfiguration());
          }
          //   hcatalog     ,   InputFormat  org.apache.sqoop.mapreduce.hcat.SqoopHCatExportFormat
          configureInputFormat(job, tableName, tableClassName, null);
          //  ExportJobBase getOutputFormatClass    ,   batch      OutPutFormat  org.apache.sqoop.mapreduce.ExportOutputFormat
          configureOutputFormat(job, tableName, tableClassName);
          //   this   JdbcExportJob,   ExportJobBase configureMapper,
          //     JdbcExportJob  getMapperClass  ,        hcatalog     
          // SqoopHCatUtilities.getExportMapperClass() org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper
          configureMapper(job, tableName, tableClassName);
          configureNumTasks(job);
          cacheJars(job, context.getConnManager());
    
          jobSetup(job);
          setJob(job);
          boolean success = runJob(job);
          if (!success) {
            LOG.error("Export job failed!");
            throw new ExportException("Export job failed!");
          }
    
          if (options.isValidationEnabled()) {
            validateExport(tableName, conf, job);
          }
            ......
      }
    }
    

    3. Sqoop 내 보 내기 과정 에서 데 이 터 를 처리 하 는 방법
    위 runExport 방법 에서 설명 한 바 와 같이 내 보 낼 때 설정 한 Mapper 는 SqoopHCatUtilities.getExportMapperClass() 을 통 해 얻 을 수 있 습 니 다. 실제 적 으로 이 도구 류 의 static 변수 exportMapperClass 를 얻 었 습 니 다. 이 구성원 변 수 는 SqoopHCatUtilities 의 static 코드 블록 에 클래스 org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper 를 부여 하고 전체 내 보 내기 과정 이 수정 되 지 않 았 습 니 다. SqoopHCatExportMapper 내용 은 SqoopHCatExportMapper. 자바 를 참고 합 니 다.
    따라서 Sqoop 명령 을 실행 하기 전에 SqoopHCatUtilitiesexportMapperClass 값 을 수정 하고 사용자 정의 논 리 를 추가 하여 내 보 낸 데 이 터 를 처리 하거나 여과 할 수 있 습 니 다.
    예 를 들 어 인쇄 내 보 내기 과정 에서 모든 데이터 에 대한 수요 (물론 실제 적 으로 이렇게 지루 한 수요 가 없 을 것 입 니 다) 가 있 으 면 SqoopHCatExportMapper 류 를 복사 하여 다음 과 같은 내용 으로 수정 합 니 다.
    package blog;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.sqoop.lib.SqoopRecord;
    import org.apache.sqoop.mapreduce.AutoProgressMapper;
    import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;
    
    import java.io.IOException;
    
    /**
     * A mapper that works on combined hcat splits.
     */
    public class ModifiedSqoopHCatExportMapper
            extends
            AutoProgressMapper<WritableComparable, HCatRecord,
                    SqoopRecord, WritableComparable> {
        public static final Log LOG = LogFactory
                .getLog(ModifiedSqoopHCatExportMapper.class.getName());
        private SqoopHCatExportHelper helper;
    
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            super.setup(context);
    
            Configuration conf = context.getConfiguration();
            helper = new SqoopHCatExportHelper(conf);
        }
    
        @Override
        public void map(WritableComparable key, HCatRecord value,
                        Context context)
                throws IOException, InterruptedException {
            // value class org.apache.hive.hcatalog.data.DefaultHCatRecord
            //    :org.apache.hive.hcatalog.data.DefaultHCatRecord->org.apache.hive.hcatalog.data.HCatRecord->java.lang.Object
            // Context         , Configuration 。
            SqoopRecord record = helper.convertToSqoopRecord(value);
            LOG.info("===" + record.getFieldMap().toString() + "===");
            context.write(record, NullWritable.get());
        }
    
    }
    

    주로 맵 방법 으로 데 이 터 를 출력 합 니 다.
    앞의 main 방법 에서 데 이 터 를 내 보 내 는 명령 을 호출 하기 전에 수 정 된 Mapper 를 설정 합 니 다.
        public static void main(String[] args) throws Exception {
            Logger.getRootLogger().setLevel(Level.INFO);
            BasicConfigurator.configure();
            System.setProperty("HADOOP_USER_NAME", "hdfs");
            //   Mapper      
            SqoopHCatUtilities.setExportMapperClass(ModifiedSqoopHCatExportMapper.class);
            exportData();
        }
    

    로그 에 인쇄 된 데 이 터 는 다음 과 같 습 니 다.
    ...
    14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper  - ==={s=s3, id=3, ft=3.3}===
    14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper  - ==={s=s4, id=4, ft=4.4}===
    ...
    

    참조 링크
  • Cloudera Sqoop 창고
  • Sqoop Developer’s Guide v1.4.7
  • 좋은 웹페이지 즐겨찾기