DataX 학습 노트 - Writer 플러그인 개발

11960 단어 DataX
이 문서는 주로 ElasticSearch에 데이터를 쓰기 위해 개발된 Writer 플러그인을 바탕으로 한다
 
1. DataX 소스를 체크아웃합니다 (svn checkout http://code.taobao.org/svn/datax/trunk)
 
2. com.taobao.datax.plugins.writer 패키지 아래에 eswriter 패키지를 만들고, 그 안에 ESWriter.java와 ParamKey.java를 새로 만듭니다.
 
package com.taobao.datax.plugins.writer.eswriter;

import org.apache.log4j.Logger;

import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.taobao.datax.plugins.writer.eswriter.ParamKey;

public class ESWriter extends Writer {

    private String singleCurl = "";

    private String nullString = "";

    private String columnNameString = "";

    private String columnNameSplit = "";

    private String[] columnNames = null;

    private Logger logger = Logger.getLogger(ESWriter.class.getCanonicalName());

    @Override
    public int init() {
        this.singleCurl = param.getValue(ParamKey.singleCurl, 
                "curl -XPOST 'http://192.168.0.108:9200/user/student/{id}' -d '{data}'");
        this.nullString = param.getValue(ParamKey.nullChar, this.nullString);
        this.columnNameString = param.getValue(ParamKey.columnNames, "userid,name,phone");
        this.columnNameSplit = param.getValue(ParamKey.columnNameSplit, ",");
        this.columnNames = columnNameString.split(columnNameSplit);
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int connect() {
        return PluginStatus.SUCCESS.value();
    }

    private String makeCurl(Line line) {
        if (line == null || line.getFieldNum() == 0) {
            return this.singleCurl + "
"; } String item = null; int num = line.getFieldNum(); this.singleCurl = this.singleCurl.replace("{id}", line.getField(0)); StringBuilder sb = new StringBuilder().append("{"); for (int i = 1; i < num; i++) { item = line.getField(i); sb.append("\"").append(columnNames[i-1]).append("\":\""); if (null == item) { sb.append(nullString); } else { sb.append(item); } sb.append("\","); } sb.deleteCharAt(sb.length() - 1); sb.append("}"); return new StringBuilder(this.singleCurl).toString().replace("{data}", sb.toString()); } @Override public int startWrite(LineReceiver receiver) { Line line; try { while ((line = receiver.getFromReader()) != null) { String curlCommand = makeCurl(line); String[] commands = new String[3]; commands[0] = "/bin/sh"; commands[1] = "-c"; commands[2] = curlCommand; Runtime.getRuntime().exec(commands); } return PluginStatus.SUCCESS.value(); } catch (Exception e) { logger.error(ExceptionTracker.trace(e)); throw new DataExchangeException(e.getCause()); } } @Override public int commit() { return 0; } @Override public int finish() { return 0; } }
package com.taobao.datax.plugins.writer.eswriter;

/**
 * Names of the job parameters read by ESWriter.
 *
 * The tag comments above each constant keep their original layout on purpose.
 * NOTE(review): ParamKey.java is shipped alongside the plugin jar, so external
 * tooling may read these comments - confirm before reformatting them.
 */
public final class ParamKey {

    /*
     * @name: singleCurl 
     * @description:  single curl
     * @range: 
     * @mandatory: false
     * @default:
     */
    public final static String singleCurl = "single_curl";

    /*
     * @name: nullChar
     * @description:  replace null with the nullchar
     * @range: 
     * @mandatory: false
     * @default: 
     */
    public final static String nullChar = "null_char";

    /*
     * @name: columnNames
     * @description:  column name list 
     * @range: 
     * @mandatory: false
     * @default: 
     */
    public final static String columnNames = "column_names";

    /*
     * @name: columnNameSplit
     * @description: separator to split column names
     * @range:
     * @mandatory: false
     * @default:,
     */
    public final static String columnNameSplit = "column_name_split";

     /*
       * @name:concurrency
       * @description:concurrency of the job
       * @range:1
       * @mandatory: false
       * @default:1
       */
    public final static String concurrency = "concurrency";

    // Constants holder: not meant to be instantiated.
    private ParamKey() {
    }

}

 
3. DataX 설치 디렉터리의 conf/plugins.xml 파일에 다음 내용을 추가합니다.
 
        
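        <plugin>
            <version>1</version>
            <name>eswriter</name>
            <type>writer</type>
            <target>es</target>
            <jar>eswriter-1.0.0.jar</jar>
            <class>com.taobao.datax.plugins.writer.eswriter.ESWriter</class>
            <maxthreadnum>10</maxthreadnum>
        </plugin>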
 
 
4. build.xml 파일의 일부 내용을 수정합니다.
        
            
                
                
                           
                
                
                
                
                
                
                
                
                
                
                
                
            
        
        
            
                
            
        
 
5. 프로젝트 디렉터리에서 ant 명령을 실행합니다.
 
6. DataX 설치 디렉터리의 plugins/writer 디렉터리에 새 eswriter 디렉터리를 만들고, build/plugins 디렉터리에 있는 eswriter-1.0.0.jar, plugins-common-1.0.0.jar와 ParamKey.java를 새 eswriter 디렉터리에 넣습니다.
 
7. DataX 설치 디렉터리에서 bin/datax.py -e 를 실행하고, 안내에 따라 순서대로 선택하여 mysqlreader_to_eswriter_1464169417703.xml 을 생성합니다.
 
8. DataX 설치 디렉터리에서 jobs/mysqlreader_to_eswriter_1464169417703.xml 파일을 편집하여 부족한 내용을 채웁니다.
 
9. DataX 설치 디렉터리에서 bin/datax.py jobs/mysqlreader_to_eswriter_1464169417703.xml 을 실행합니다.
 
위의 단계는 ElasticSearch의 CURL 방식을 바탕으로 하고 아래의 단계는 ElasticSearch의 JavaAPI 방식을 바탕으로 한다
코드는 다음과 같습니다.
package com.taobao.datax.plugins.writer.esbulkwriter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.google.gson.Gson;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;

/**
 * DataX writer plugin that turns every received line into a {@link Student}
 * JSON document and hands the collected documents to
 * {@code ESUtils.bulkSaveOrUpdate} when {@link #commit()} is called.
 *
 * <p>NOTE(review): {@code ESUtils}, {@code EsConfig} and {@code EsServerAddress}
 * are neither imported nor declared in this class; they must be available in
 * this package (or imported) for it to compile.
 */
public class ESBulkWriter extends Writer {

    private ESUtils esUtils = null;

    /** JSON documents collected by {@link #startWrite} and sent by {@link #commit()}. */
    private List<String> pojos = null;

    private Logger logger = Logger.getLogger(ESBulkWriter.class.getCanonicalName());

    /**
     * Creates the empty document buffer.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int init() {
        this.pojos = new ArrayList<String>();
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Creates the {@code ESUtils} helper for the two hard-coded server addresses.
     *
     * <p>NOTE(review): hosts, port and the "test" configuration name are fixed
     * in code; consider reading them from the plugin parameters.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int connect() {
        List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
        serverAddress.add(new EsServerAddress("192.168.0.1", 9300));
        serverAddress.add(new EsServerAddress("192.168.0.2", 9300));
        this.esUtils = new ESUtils(new EsConfig("test", serverAddress));
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Reads lines until the receiver is exhausted and buffers one JSON
     * document per non-empty line.
     *
     * @param receiver source of the lines to write
     * @return the SUCCESS plugin status value
     * @throws DataExchangeException wrapping any failure, e.g. a non-numeric
     *         id or userid field
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Gson gson = new Gson();
        Line line;
        try {
            while ((line = receiver.getFromReader()) != null) {
                if (line.getFieldNum() <= 0) {
                    continue; // nothing to index for an empty line
                }
                pojos.add(gson.toJson(toStudent(line)));
            }
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            // wrap the exception itself; its cause is usually null
            throw new DataExchangeException(e);
        }
    }

    /**
     * Maps fields 0..3 of a line to id, userid, name and phone.
     * Blank or missing fields leave the corresponding property unset.
     *
     * @param line a line with at least one field
     * @return the populated student
     * @throws NumberFormatException if the id or userid field is not numeric
     */
    private Student toStudent(Line line) {
        Student student = new Student();

        String idField = fieldOrNull(line, 0);
        if (StringUtils.isNotBlank(idField)) {
            student.setId(Integer.parseInt(idField));
        }
        String userIdField = fieldOrNull(line, 1);
        if (StringUtils.isNotBlank(userIdField)) {
            student.setUserid(Long.parseLong(userIdField));
        }
        String nameField = fieldOrNull(line, 2);
        if (StringUtils.isNotBlank(nameField)) {
            student.setName(nameField);
        }
        String phoneField = fieldOrNull(line, 3);
        if (StringUtils.isNotBlank(phoneField)) {
            student.setPhone(phoneField);
        }
        return student;
    }

    /**
     * Returns the field at {@code index}, or {@code null} when the line has
     * fewer fields, so short lines are not read past their end.
     */
    private static String fieldOrNull(Line line, int index) {
        return index < line.getFieldNum() ? line.getField(index) : null;
    }

    /**
     * Sends all buffered documents with index name "user" and type name "student".
     * Does nothing when no document was collected.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int commit() {
        if (!pojos.isEmpty()) {
            esUtils.bulkSaveOrUpdate(pojos, "user", "student");
        }
        return PluginStatus.SUCCESS.value();
    }

    /**
     * No clean-up is performed here.
     *
     * @return 0
     */
    @Override
    public int finish() {
        return 0;
    }

    /**
     * Document written for each line. Declared static because it never
     * touches the enclosing writer instance.
     */
    static class Student implements Serializable {

        private static final long serialVersionUID = 1L;

        private Integer id;

        private Long userid;

        private String name;

        private String phone;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Long getUserid() {
            return userid;
        }

        public void setUserid(Long userid) {
            this.userid = userid;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }

    }
}
package com.alibaba.datax.plugin.writer.eswriter;

import java.io.Serializable;

public class EsServerAddress implements Serializable {

	private static final long serialVersionUID = 1L;

	private String host;
	private Integer port = 9300;

	public EsServerAddress() {
		super();
	}

	public EsServerAddress(String host) {
		super();
		this.host = host;
	}

	public EsServerAddress(String host, Integer port) {
		super();
		this.host = host;
		this.port = port;
	}

	public String getHost() {
		return host;
	}

	public void setHost(String host) {
		this.host = host;
	}

	public Integer getPort() {
		return port;
	}

	public void setPort(Integer port) {
		this.port = port;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((host == null) ? 0 : host.hashCode());
		result = prime * result + ((port == null) ? 0 : port.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		EsServerAddress other = (EsServerAddress) obj;
		if (host == null) {
			if (other.host != null)
				return false;
		} else if (!host.equals(other.host))
			return false;
		if (port == null) {
			if (other.port != null)
				return false;
		} else if (!port.equals(other.port))
			return false;
		return true;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("EsServerAddress [host=");
		builder.append(host);
		builder.append(", port=");
		builder.append(port);
		builder.append("]");
		return builder.toString();
	}

}

다른 절차는 위 절차와 같지만, DataX 설치 디렉터리의 plugins/writer/eswriter 디렉터리에 의존하는 Jar 패키지를 함께 넣어야 합니다.
 
 
 
 

좋은 웹페이지 즐겨찾기