DataX 학습 노트 - Writer 플러그인 개발
11960 단어 DataX
1. DataX 소스를 체크아웃합니다. (svn checkout http://code.taobao.org/svn/datax/trunk)
2. com.taobao.datax.plugins.writer 패키지 아래에 eswriter 패키지를 만들고, 그 안에 ESWriter.java와 ParamKey.java를 새로 만듭니다.
package com.taobao.datax.plugins.writer.eswriter;
import org.apache.log4j.Logger;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.taobao.datax.plugins.writer.eswriter.ParamKey;
public class ESWriter extends Writer {
private String singleCurl = "";
private String nullString = "";
private String columnNameString = "";
private String columnNameSplit = "";
private String[] columnNames = null;
private Logger logger = Logger.getLogger(ESWriter.class.getCanonicalName());
@Override
public int init() {
this.singleCurl = param.getValue(ParamKey.singleCurl,
"curl -XPOST 'http://192.168.0.108:9200/user/student/{id}' -d '{data}'");
this.nullString = param.getValue(ParamKey.nullChar, this.nullString);
this.columnNameString = param.getValue(ParamKey.columnNames, "userid,name,phone");
this.columnNameSplit = param.getValue(ParamKey.columnNameSplit, ",");
this.columnNames = columnNameString.split(columnNameSplit);
return PluginStatus.SUCCESS.value();
}
@Override
public int connect() {
return PluginStatus.SUCCESS.value();
}
private String makeCurl(Line line) {
if (line == null || line.getFieldNum() == 0) {
return this.singleCurl + "
";
}
String item = null;
int num = line.getFieldNum();
this.singleCurl = this.singleCurl.replace("{id}", line.getField(0));
StringBuilder sb = new StringBuilder().append("{");
for (int i = 1; i < num; i++) {
item = line.getField(i);
sb.append("\"").append(columnNames[i-1]).append("\":\"");
if (null == item) {
sb.append(nullString);
} else {
sb.append(item);
}
sb.append("\",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return new StringBuilder(this.singleCurl).toString().replace("{data}", sb.toString());
}
@Override
public int startWrite(LineReceiver receiver) {
Line line;
try {
while ((line = receiver.getFromReader()) != null) {
String curlCommand = makeCurl(line);
String[] commands = new String[3];
commands[0] = "/bin/sh";
commands[1] = "-c";
commands[2] = curlCommand;
Runtime.getRuntime().exec(commands);
}
return PluginStatus.SUCCESS.value();
} catch (Exception e) {
logger.error(ExceptionTracker.trace(e));
throw new DataExchangeException(e.getCause());
}
}
@Override
public int commit() {
return 0;
}
@Override
public int finish() {
return 0;
}
}
package com.taobao.datax.plugins.writer.eswriter;
/**
 * Names of the job parameters understood by the ES writer plugin.
 *
 * <p>All parameters are optional.
 */
public final class ParamKey {

    /**
     * Command template run for every record. ESWriter falls back to a built-in
     * curl POST template when this parameter is not set.
     */
    public static final String singleCurl = "single_curl";

    /**
     * Text written in place of a null field value. ESWriter falls back to the
     * empty string when this parameter is not set.
     */
    public static final String nullChar = "null_char";

    /**
     * Joined list of column names. ESWriter falls back to
     * {@code userid,name,phone} when this parameter is not set.
     */
    public static final String columnNames = "column_names";

    /**
     * Separator used to split the column name list. ESWriter falls back to a
     * comma when this parameter is not set.
     */
    public static final String columnNameSplit = "column_name_split";

    /**
     * Concurrency of the job; documented range and default are both 1.
     * NOTE(review): not read by ESWriter itself - presumably consumed by the
     * framework, confirm.
     */
    public static final String concurrency = "concurrency";
}
3. DataX 설치 디렉터리의 conf/plugins.xml 파일에 다음 내용을 추가합니다.
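<plugin>
    <version>1</version>
    <name>eswriter</name>
    <type>writer</type>
    <target>es</target>
    <jar>eswriter-1.0.0.jar</jar>
    <class>com.taobao.datax.plugins.writer.eswriter.ESWriter</class>
    <maxthreadnum>10</maxthreadnum>
</plugin>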
4. build.xml 파일의 일부 내용을 수정합니다.
5. 프로젝트 디렉터리에서 ant 명령을 실행합니다.
6. DataX 설치 디렉터리의 plugins/writer 디렉터리에 새 eswriter 디렉터리를 만들고, build/plugins 디렉터리에 있는 eswriter-1.0.0.jar, plugins-common-1.0.0.jar와 ParamKey.java를 새 eswriter 디렉터리에 넣습니다.
7. DataX 설치 디렉터리에서 bin/datax.py -e를 실행하고, 안내에 따라 순서대로 선택하여 mysqlreader_to_eswriter_1464169417703.xml을 생성합니다.
8. DataX 설치 디렉터리에서 jobs/mysqlreader_to_eswriter_1464169417703.xml 파일을 편집하여 부족한 내용을 채웁니다.
9. DataX 설치 디렉터리에서 bin/datax.py jobs/mysqlreader_to_eswriter_1464169417703.xml을 실행합니다.
위의 단계는 ElasticSearch의 CURL 방식을 바탕으로 하고 아래의 단계는 ElasticSearch의 JavaAPI 방식을 바탕으로 한다
코드는 다음과 같습니다.
package com.taobao.datax.plugins.writer.esbulkwriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import com.google.gson.Gson;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
/**
 * DataX writer plugin that converts every incoming record into a JSON
 * document with Gson, collects the documents in memory, and hands the whole
 * batch to {@code ESUtils.bulkSaveOrUpdate} when the job commits.
 *
 * <p>NOTE(review): all documents are held in memory until {@link #commit()};
 * confirm this is acceptable for the expected table sizes.
 *
 * <p>NOTE(review): the collections below are raw types; the generic
 * parameters look like they were lost when the article was extracted. Restore
 * them once the signatures of ESUtils and EsConfig are confirmed.
 *
 * <p>NOTE(review): EsServerAddress is declared under a different package in
 * this document and is not imported here - confirm the intended package.
 */
public class ESBulkWriter extends Writer {
    private ESUtils esUtils = null;
    /** JSON documents collected by startWrite and sent by commit. */
    private List pojos = null;
    private Logger logger = Logger.getLogger(ESBulkWriter.class.getCanonicalName());

    /**
     * Creates the empty document buffer.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int init() {
        this.pojos = new ArrayList();
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Builds the ElasticSearch helper from a fixed pair of server addresses
     * and the name "test".
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int connect() {
        List serverAddress = new ArrayList();
        serverAddress.add(new EsServerAddress("192.168.0.1", 9300));
        serverAddress.add(new EsServerAddress("192.168.0.2", 9300));
        this.esUtils = new ESUtils(new EsConfig("test", serverAddress));
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Reads records until the reader is exhausted and buffers one JSON
     * document per non-empty record.
     *
     * <p>Fields are mapped by position: 0 = id, 1 = userid, 2 = name,
     * 3 = phone. Blank or missing fields leave the property unset.
     *
     * @param receiver source of the records
     * @return the SUCCESS plugin status value
     * @throws DataExchangeException wrapping any failure, including a numeric
     *         field that cannot be parsed
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Line line;
        Gson gson = new Gson();
        try {
            while ((line = receiver.getFromReader()) != null) {
                if (line.getFieldNum() <= 0) {
                    continue;
                }
                Student student = new Student();
                String id = fieldOrNull(line, 0);
                if (StringUtils.isNotBlank(id)) {
                    // trim() so that padded numeric text still parses
                    student.setId(Integer.parseInt(id.trim()));
                }
                String userid = fieldOrNull(line, 1);
                if (StringUtils.isNotBlank(userid)) {
                    student.setUserid(Long.parseLong(userid.trim()));
                }
                String name = fieldOrNull(line, 2);
                if (StringUtils.isNotBlank(name)) {
                    student.setName(name);
                }
                String phone = fieldOrNull(line, 3);
                if (StringUtils.isNotBlank(phone)) {
                    student.setPhone(phone);
                }
                pojos.add(gson.toJson(student));
            }
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            // Wrap the exception itself: e.getCause() is null for failures such
            // as NumberFormatException, which used to discard the real error.
            throw new DataExchangeException(e);
        }
    }

    /**
     * Returns the field at the given position, or {@code null} when the record
     * has fewer fields, so short records are never read past their end.
     */
    private static String fieldOrNull(Line line, int index) {
        return index < line.getFieldNum() ? line.getField(index) : null;
    }

    /**
     * Sends all buffered documents with index "user" and type "student".
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int commit() {
        esUtils.bulkSaveOrUpdate(pojos, "user", "student");
        return PluginStatus.SUCCESS.value();
    }

    /** Nothing is released here. */
    @Override
    public int finish() {
        return 0;
    }

    /**
     * Document shape serialized to JSON for each record. Declared static so
     * that instances do not carry a hidden reference to the writer.
     */
    static class Student implements Serializable {
        private static final long serialVersionUID = 1L;
        private Integer id;
        private Long userid;
        private String name;
        private String phone;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Long getUserid() {
            return userid;
        }

        public void setUserid(Long userid) {
            this.userid = userid;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }
    }
}
package com.alibaba.datax.plugin.writer.eswriter;
import java.io.Serializable;
public class EsServerAddress implements Serializable {
private static final long serialVersionUID = 1L;
private String host;
private Integer port = 9300;
public EsServerAddress() {
super();
}
public EsServerAddress(String host) {
super();
this.host = host;
}
public EsServerAddress(String host, Integer port) {
super();
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + ((port == null) ? 0 : port.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
EsServerAddress other = (EsServerAddress) obj;
if (host == null) {
if (other.host != null)
return false;
} else if (!host.equals(other.host))
return false;
if (port == null) {
if (other.port != null)
return false;
} else if (!port.equals(other.port))
return false;
return true;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("EsServerAddress [host=");
builder.append(host);
builder.append(", port=");
builder.append(port);
builder.append("]");
return builder.toString();
}
}
나머지 절차는 위와 같습니다. 단, DataX 설치 디렉터리의 plugins/writer/eswriter 디렉터리에 의존하는 Jar 패키지도 함께 넣어야 합니다.