MongoDB에 사용자 정의 동기화 OutputFormat 개발
5265 단어 mongodboutputformat
데이터 유형은 문자열을 맵과 유사한 객체로 변환하여 데이터베이스에 삽입하는 것입니다.원래의 단선 인터페이스를 바꾸다.
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
public class MongoOutputFormat implements OutputFormat<Text, Text> {
@Override
public void checkOutputSpecs(FileSystem arg0, JobConf arg1) throws IOException {
// TODO Auto-generated method stub
System.out.println("OutputFormat CheckOutpuSpecs() function is not supported~!");
}
// OutputFormat , RecordWriter 。
// JDBC , RecordWriter 。
// , JobConf 。
@Override
public RecordWriter<Text, Text> getRecordWriter(FileSystem arg0, JobConf conf, String arg2, Progressable arg3) throws IOException {
// Configuration conf = jobconf.g.getConfiguration() ;
String ip = conf.get("mongoIp");
String port = conf.get("mongoPort");
Mongo mongo = new Mongo(ip,Integer.parseInt(port));
String username = conf.get("muser");
String password = conf.get("mpwd");
String dbname = conf.get("mongoDb");
String collectionName = conf.get("mongoCollection");
try {
return new MongoDBRecordWriter(mongo,dbname,collectionName,username,password);
}
catch (Exception ex) {
throw new IOException(ex);
}
}
/**
* A RecordWriter that writes the reduce output to a SQL table or MongoDB Collection!
*/
public static class MongoDBRecordWriter implements RecordWriter<Text, Text> {
private DBCollection coll;
private Mongo mongo;
public MongoDBRecordWriter() throws SQLException {
}
//
public MongoDBRecordWriter(DBCollection coll) {
this.coll = coll;
}
public MongoDBRecordWriter(Mongo mongo, String dbname, String collectionName, String username, String password) {
this.mongo = mongo;
DB d = this.mongo.getDB(dbname);
d.authenticate(username, password.toCharArray());
this.coll = d.getCollection(collectionName);
}
public DBCollection getCollection() {
return coll;
}
// public PreparedStatement getStatement() {
// return statement;
// }
@Override
/** Close , OutputFormat */
public void close(Reporter arg0) throws IOException {
try {
this.mongo.close();
}
catch (Exception e) {
try {
System.out.println("Close() is not supported here...");
}
catch (Exception ex) {
ex.printStackTrace();
}
throw new IOException(e);
} finally {
try {
System.out.println("Close() is not supported here...");
}
catch (Exception ex) {
ex.printStackTrace();
}
}
}
//RecordWriter , 。
@Override
public void write(Text key, Text value) throws IOException {
try {
String line = value.toString();
String[] rs = line.split("\001");
Map m = new HashMap();
m.put("created_by", rs[7]);
m.put("created_date", rs[8]);
m.put("updated_by", rs[9]);
m.put("updated_date", rs[10]);
DBObject dbObj = new BasicDBObject();
dbObj.putAll(m);
coll.save(dbObj);
}
catch (Exception e) {
// LoggingUtils.logAll(LOG, "Exception encountered", e);.
System.err.print(e);
e.printStackTrace();
}
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
레코드를 업데이트하고 업데이트 전에 동일한 레코드를 삭제하는 방법(nest js & mongoDB)ID로 레코드를 업데이트하고 싶지만 업데이트 전에 동일한 레코드에 이전에 저장된 데이터를 삭제하고 싶습니다. 프로세스는 무엇입니까? 컨트롤러.ts 서비스.ts 나는 이것을 해결하기 위해 이런 식으로 노력하고 있습니다...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.