InflexDB에 데이터를 배치할 구성 요소 만들기

◆이른바 인플렉스DB


얼마 전에 유행했던 거?시간 시퀀스 데이터베이스에서 v1.7.4가 최신이다.
지금 보통 쓰는 거예요?
동향은 잘 모르지만 며칠 전에 조금 접할 기회가 있어서 조립품을 만들어 보려고 합니다.

◆테스트 환경


우선 환경, AWS 시동 centos 실례, 1.7.4 추가.
기본적으로 사용자 인증이나 설정이 없어서 직접 접근할 수 있습니다.
8086은 사내에서 접근할 수 있다.
데이터베이스는'asteria'의 이름으로 만들어졌다.
정책을 유지하는 데도'1일'이나 몇 가지를 잘해야 한다.

구체적인 구성 요소의 사용 방안은 없지만 전용 연결을 만들어 데이터를 넣는 구성 요소를 만들고 싶습니다.

◆ 라이브러리


inflexdb-java를 사용합니다.
https://github.com/influxdata/influxdb-java#influxdb-java

◆ InflexDB 연결


전용 연결이 있으면 비슷해 보이니까 연결을 만들자.

속성이 URL, 사용자 이름, 비밀번호 정도입니까?
이번에는 URL만 사용합니다.

접속 테스트에서 SHOW DATABASES 명령을 실행할 수 있는지 확인합니다.
뭘 받으면 오케이!
@Override
public TestResult test() {
    InfluxDB influxDB = null;
    InfluxDBConnectionEntry entry = (InfluxDBConnectionEntry)getEntry();
    influxDB = createInfluxDB(entry.getUrl(), entry.getUserName(), entry.getPassword());
    try {
        Query query = new Query("SHOW DATABASES");
        QueryResult result = influxDB.query(query);
        List<Result> results = result.getResults();
        for (Result res : results) {
            List<Series> series = res.getSeries();
            for (Series se : series) {
                List<List<Object>> values = se.getValues();
                for (List<Object> value : values) {
                    for (Object obj : value) {
                            return new TestResult(true);
                    }
                }
            }
        }
    } finally {
        influxDB.close();
    }
    return new TestResult(false);
}

◆구성 요소


데이터를 먼저 입력하고 싶으므로 InflexDBPut 구성 요소를 만듭니다.
하고 싶은 일
1、레코드 흐름에서 받은 레코드를 InflexDB에 넣습니다!
2, 데이터베이스, Measurement 및 보존 정책 지정!
3, 배치 포장!
4、타임이라는 필드가 없으면 타임을 설정합니다!
5,time는 MilliSeconds와 NanoSeconds를 지정하여 반드시 들어가야 한다고 조절할 수 있습니다!
6, 분류 속성 지정 태그 허용!
차이가 많지 않다.

◆속성



속성 이름
액션
연결 사용
연결을 사용하지 않을 경우 숨겨진 URL, 사용자 이름 및 암호 속성으로 지정합니다.
연결 이름
전용 연결을 지정합니다.
데이터베이스
이번에는 이미 제작된'asteria'를 지목한다.
시간 단위
MilliSeconds 및 NanoSeconds를 선택할 수 있습니다.
Measurement
Measurement의 이름을 지정합니다.
보존 정책
보존 정책을 지정합니다.
배치 처리 건수
일괄 처리의 기록 수를 지정합니다.

태그: 입력 흐름의 필드에 사용할 태그를 지정합니다.

◆ 입력 흐름


입력 스트림의 필드를 InflexDB의 필드로 직접 설정합니다.
타임 필드가 입력 흐름에 있으면 이 값을 타임으로 설정합니다
입력 흐름에 시간이 없으면 실행 시간을 사용합니다.
※ 시간이 겹치면 데이터가 손실되기 때문에 다른 값을 얻기 위해 조금만 조정해야 한다.
System.current TimeMillis 및 System나노타임()에서 같이.
private long getFirstTime(TimeUnit timeUnit) {
    long time = System.currentTimeMillis();
    if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
        return time * 1000000;//nano秒に設定してやらないといけない
    } else {
        return time;
    }
}
private long getTime(long time, TimeUnit timeUnit) {
    long thisTime;
    if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
        long nowNano = System.nanoTime();
        thisTime = time + (nowNano - _startNano);
    } else {
        thisTime = System.currentTimeMillis();
    }
    while (_timeList.contains(thisTime)) {
        thisTime = thisTime + 1;//必ず入れる!
    }
    _timeList.add(thisTime);//つかった時刻は保持しておく
    return thisTime;
}
레이블은 String만 있고 필드는 Value 유형별로 Boolean, Doube, Number, Long, String으로 설정됩니다.
Warp의 Value형
InflexDB 유형
Value.TYPE_BOOLEAN 
Boolean
Value.TYPE_DOUBLE 
Double
Value.TYPE_DECIMAL 
Number
Value.TYPE_INTEGER 
Long
Value.TYPE_STRING 
String
Value.TYPE_DATETIME 
Long

◆execute


처리가 간단하여 기록 정보에서 Point를 생성하여 대량으로 지정한 기록 수에 도달한 후 InflumxDB에 기록합니다.
음반이 끝날 때까지 그걸 진행할 거야.
private BatchPoints createBatchPoints(String database, String retentionPolicy, TimeUnit timeunit) {
    //BatchPointsを作成します
    Builder builder = BatchPoints.database(database).precision(timeunit);
    if (!StringUtil.isEmpty(retentionPolicy)) {
        builder = builder.retentionPolicy(retentionPolicy);
    }
    return builder.build();
}
public Point createPoint(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags, Map<String, Value> fields) {
    //Pointのビルダーを作成
    org.influxdb.dto.Point.Builder builder = Point.measurement(measurement).time(time, timeUnit);
  //タグを設定します
    for (Entry<String, String> entry : tags.entrySet()) {
        builder = builder.tag(entry.getKey(), entry.getValue());
    }
  //フィールドを型を考えながら設定します
    for (Entry<String, Value> field : fields.entrySet()) {
        Value value = field.getValue();
        if (value.getType() == Value.TYPE_BOOLEAN) {
            builder = builder.addField(field.getKey(), value.booleanValue());
        } else if (value.getType() == Value.TYPE_DOUBLE) {
            builder = builder.addField(field.getKey(), value.doubleValue());
        } else if (value.getType() == Value.TYPE_DECIMAL) {
            builder = builder.addField(field.getKey(), value.decimalValue());
        } else if (value.getType() == Value.TYPE_INTEGER) {
            builder = builder.addField(field.getKey(), value.intValue());
        } else if (value.getType() == Value.TYPE_STRING) {
            builder = builder.addField(field.getKey(), value.strValue());
        } else if (value.getType() == Value.TYPE_DATETIME) {
            builder = builder.addField(field.getKey(), value.longValue());
        }
    }
  //Pointを作成します
    Point point = builder.build();
    return point;
}
@Override
public boolean execute(ExecuteContext context) throws FlowException {
    _timeList.clear();
    TimeUnit timeUnit = getTimeUnit();
    BatchPoints batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);

    long time = getFirstTime(timeUnit);
    long fieldTime = -1;
    int batchCount = 0;
    int batchMax = _batch.intValue();

  //ストリームからレコードを取得してループします
    StreamDataObject[] streams = getInputConnector().getStreamArray();
    for (int i=0; i < streams.length; i++) {
     //まだ複数のストリームはありませんが・・
        FieldDefinition fd = streams[i].getFieldDefinition();
        Record record = streams[i].getRecord();
        int len = fd.getFieldCount();
        Map<String, String> tags = new HashMap<>();
        Map<String, Value> fields = new HashMap<>();
        while (record != null) {
            tags.clear();
            fields.clear();
            fieldTime = -1;
            for (int j = 0; j < len; j++) {
                //レコードのフィールドの名前
                String name = fd.getField(j).getName();
                Value fieldValue = record.getValue(name);
                //そのフィールドがtime、タグでないか確認
                if (FIELD_NAME_TIME.equals(name)) {
                    fieldTime = fieldValue.longValue();
                } else if (isTag(name)) {
                    tags.put(name, fieldValue.strValue());
                } else {
                    fields.put(name, fieldValue);
                }
            }
            time = fieldTime == -1 ? getTime(time, timeUnit) : fieldTime;
            Point point = createPoint(_measurement.strValue(), time, timeUnit, tags, fields);
      //Pointをためて
            batchPoints.point(point);
            batchCount++;
            if (batchCount >= batchMax) {
         //バッチで書き込みます
                _influxDB.write(batchPoints);
                batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
            }
            record = record.nextRecord();
            context.notifyRunning();
        }
    }
    if (batchPoints.getPoints().size() > 0) {
    //余りも書き込み
        _influxDB.write(batchPoints);
    }
    passStream();
    return true;
}

◆실행 결과


Flow Service의 로그를 넣고 명령에서 보십시오.

◆정리


Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.

좋은 웹페이지 즐겨찾기