Sqoop 내 보 내기 과정 에서 데 이 터 를 처리 하 는 방법
현재 테스트 용 빅 데이터 클 러 스 터 버 전: cdh 6.3.2, Sqoop 의존 패키지 버 전 은 1.4.7 - cdh 6.3.2 입 니 다.Sqoop API 를 호출 하 는 자바 코드 는 다음 과 같 습 니 다.
package blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
/**
* @author 0x3E6
* @date 2020/02/05 20:58 PM
*/
@Slf4j
public class App {
private static String[] getArgs() {
String tb = "tb1";
return new String[]{
"--connect", "jdbc:mysql://192.168.0.101:3306/test?useSSL=false",
"--username", "test",
"--password", "G00d!1uck",
"--table", tb,
"--hcatalog-database", "test",
"--hcatalog-table", tb
};
}
private static int execSqoop(String toolName, String[] args) throws Exception {
String[] expandArguments = OptionsFileUtil.expandArguments(args);
SqoopTool tool = SqoopTool.getTool(toolName);
Configuration conf = new Configuration();
// MapReduce , MapReduce , Mapper map
conf.set("mapreduce.framework.name", "local");
conf.set("custom.checkColumn", "id");
conf.set("custom.lastValue", "2");
Configuration loadPlugins = SqoopTool.loadPlugins(conf);
Sqoop sqoop = new Sqoop(tool, loadPlugins);
return Sqoop.runSqoop(sqoop, expandArguments);
}
static void exportData() throws Exception {
log.info("{}", execSqoop("export", getArgs()));
}
public static void main(String[] args) throws Exception {
Logger.getRootLogger().setLevel(Level.INFO);
BasicConfigurator.configure();
System.setProperty("HADOOP_USER_NAME", "hdfs");
exportData();
}
}
2. 부분 내 보 내기 프로 세 스 분석
ExportTool 중
private void exportTable(SqoopOptions options, String tableName) {
...
// INSERT-based export.
// MySQLManager.exportTable
manager.exportTable(context);
}
MySQL Manager 의 상속 관 계 는 다음 과 같다.
MySQLManager->InformationSchemaManager->CatalogQueryManager->GenericJdbcManager->SqlManager
또한 SqlManager 만 exportTable 방법 을 실 현 했 기 때문에 실제 호출 된 SqlManager 의 exportTable 방법:
/**
* Export data stored in HDFS into a table in a database.
*/
public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
exportJob. runExport 방법 에서 MapReduce 설정 을 실 행 했 습 니 다. jdbc ExportJob 은 ExportJobBase 를 계승 하고 runExport 방법 을 다시 쓰 지 않 았 기 때문에 실제 호출 된 ExportJobBase 의 runExport 방법:
public void runExport() throws ExportException, IOException {
......
Job job = createJob(conf);
try {
// Set the external jar to use for the job.
job.getConfiguration().set("mapred.jar", ormJarFile);
if (options.getMapreduceJobName() != null) {
job.setJobName(options.getMapreduceJobName());
}
propagateOptionsToJob(job);
if (isHCatJob) {
LOG.info("Configuring HCatalog for export job");
SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
hCatUtils.configureHCat(options, job, cmgr, tableName,
job.getConfiguration());
}
// hcatalog , InputFormat org.apache.sqoop.mapreduce.hcat.SqoopHCatExportFormat
configureInputFormat(job, tableName, tableClassName, null);
// ExportJobBase getOutputFormatClass , batch OutPutFormat org.apache.sqoop.mapreduce.ExportOutputFormat
configureOutputFormat(job, tableName, tableClassName);
// this JdbcExportJob, ExportJobBase configureMapper,
// JdbcExportJob getMapperClass , hcatalog
// SqoopHCatUtilities.getExportMapperClass() org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper
configureMapper(job, tableName, tableClassName);
configureNumTasks(job);
cacheJars(job, context.getConnManager());
jobSetup(job);
setJob(job);
boolean success = runJob(job);
if (!success) {
LOG.error("Export job failed!");
throw new ExportException("Export job failed!");
}
if (options.isValidationEnabled()) {
validateExport(tableName, conf, job);
}
......
}
}
3. Sqoop 내 보 내기 과정 에서 데 이 터 를 처리 하 는 방법
위 runExport 방법 에서 설명 한 바 와 같이 내 보 낼 때 설정 한 Mapper 는
SqoopHCatUtilities.getExportMapperClass()
을 통 해 얻 을 수 있 습 니 다. 실제 적 으로 이 도구 류 의 static 변수 exportMapperClass
를 얻 었 습 니 다. 이 구성원 변 수 는 SqoopHCatUtilities
의 static 코드 블록 에 클래스 org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper
를 부여 하고 전체 내 보 내기 과정 이 수정 되 지 않 았 습 니 다. SqoopHCatExportMapper
내용 은 SqoopHCatExportMapper. 자바 를 참고 합 니 다.따라서 Sqoop 명령 을 실행 하기 전에
SqoopHCatUtilities
의 exportMapperClass
값 을 수정 하고 사용자 정의 논 리 를 추가 하여 내 보 낸 데 이 터 를 처리 하거나 여과 할 수 있 습 니 다.예 를 들 어 인쇄 내 보 내기 과정 에서 모든 데이터 에 대한 수요 (물론 실제 적 으로 이렇게 지루 한 수요 가 없 을 것 입 니 다) 가 있 으 면
SqoopHCatExportMapper
류 를 복사 하여 다음 과 같은 내용 으로 수정 합 니 다.package blog;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;
import java.io.IOException;
/**
* A mapper that works on combined hcat splits.
*/
public class ModifiedSqoopHCatExportMapper
extends
AutoProgressMapper<WritableComparable, HCatRecord,
SqoopRecord, WritableComparable> {
public static final Log LOG = LogFactory
.getLog(ModifiedSqoopHCatExportMapper.class.getName());
private SqoopHCatExportHelper helper;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
helper = new SqoopHCatExportHelper(conf);
}
@Override
public void map(WritableComparable key, HCatRecord value,
Context context)
throws IOException, InterruptedException {
// value class org.apache.hive.hcatalog.data.DefaultHCatRecord
// :org.apache.hive.hcatalog.data.DefaultHCatRecord->org.apache.hive.hcatalog.data.HCatRecord->java.lang.Object
// Context , Configuration 。
SqoopRecord record = helper.convertToSqoopRecord(value);
LOG.info("===" + record.getFieldMap().toString() + "===");
context.write(record, NullWritable.get());
}
}
주로 맵 방법 으로 데 이 터 를 출력 합 니 다.
앞의 main 방법 에서 데 이 터 를 내 보 내 는 명령 을 호출 하기 전에 수 정 된 Mapper 를 설정 합 니 다.
public static void main(String[] args) throws Exception {
Logger.getRootLogger().setLevel(Level.INFO);
BasicConfigurator.configure();
System.setProperty("HADOOP_USER_NAME", "hdfs");
// Mapper
SqoopHCatUtilities.setExportMapperClass(ModifiedSqoopHCatExportMapper.class);
exportData();
}
로그 에 인쇄 된 데 이 터 는 다음 과 같 습 니 다.
...
14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper - ==={s=s3, id=3, ft=3.3}===
14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper - ==={s=s4, id=4, ft=4.4}===
...
참조 링크
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.