InflexDB에 데이터를 배치할 구성 요소 만들기
9374 단어 ASTERIAinfluxdbAsteriaWARP
◆이른바 인플렉스DB
얼마 전에 유행했던 거?시간 시퀀스 데이터베이스에서 v1.7.4가 최신이다.
지금 보통 쓰는 거예요?
동향은 잘 모르지만 며칠 전에 조금 접할 기회가 있어서 조립품을 만들어 보려고 합니다.
◆테스트 환경
우선 환경, AWS 시동 centos 실례, 1.7.4 추가.
기본적으로 사용자 인증이나 설정이 없어서 직접 접근할 수 있습니다.
8086은 사내에서 접근할 수 있다.
데이터베이스는'asteria'의 이름으로 만들어졌다.
정책을 유지하는 데도'1일'이나 몇 가지를 잘해야 한다.
구체적인 구성 요소의 사용 방안은 없지만 전용 연결을 만들어 데이터를 넣는 구성 요소를 만들고 싶습니다.
◆ 라이브러리
inflexdb-java를 사용합니다.
https://github.com/influxdata/influxdb-java#influxdb-java
◆ InflexDB 연결
전용 연결이 있으면 비슷해 보이니까 연결을 만들자.
속성이 URL, 사용자 이름, 비밀번호 정도입니까?
이번에는 URL만 사용합니다.
접속 테스트에서 SHOW DATABASES 명령을 실행할 수 있는지 확인합니다.
뭘 받으면 오케이!@Override
public TestResult test() {
InfluxDB influxDB = null;
InfluxDBConnectionEntry entry = (InfluxDBConnectionEntry)getEntry();
influxDB = createInfluxDB(entry.getUrl(), entry.getUserName(), entry.getPassword());
try {
Query query = new Query("SHOW DATABASES");
QueryResult result = influxDB.query(query);
List<Result> results = result.getResults();
for (Result res : results) {
List<Series> series = res.getSeries();
for (Series se : series) {
List<List<Object>> values = se.getValues();
for (List<Object> value : values) {
for (Object obj : value) {
return new TestResult(true);
}
}
}
}
} finally {
influxDB.close();
}
return new TestResult(false);
}
◆구성 요소
데이터를 먼저 입력하고 싶으므로 InflexDBPut 구성 요소를 만듭니다.
하고 싶은 일
1、레코드 흐름에서 받은 레코드를 InflexDB에 넣습니다!
2, 데이터베이스, Measurement 및 보존 정책 지정!
3, 배치 포장!
4、타임이라는 필드가 없으면 타임을 설정합니다!
5,time는 MilliSeconds와 NanoSeconds를 지정하여 반드시 들어가야 한다고 조절할 수 있습니다!
6, 분류 속성 지정 태그 허용!
차이가 많지 않다.
◆속성
속성 이름
액션
연결 사용
연결을 사용하지 않을 경우 숨겨진 URL, 사용자 이름 및 암호 속성으로 지정합니다.
연결 이름
전용 연결을 지정합니다.
데이터베이스
이번에는 이미 제작된'asteria'를 지목한다.
시간 단위
MilliSeconds 및 NanoSeconds를 선택할 수 있습니다.
Measurement
Measurement의 이름을 지정합니다.
보존 정책
보존 정책을 지정합니다.
배치 처리 건수
일괄 처리의 기록 수를 지정합니다.
태그: 입력 흐름의 필드에 사용할 태그를 지정합니다.
◆ 입력 흐름
입력 스트림의 필드를 InflexDB의 필드로 직접 설정합니다.
타임 필드가 입력 흐름에 있으면 이 값을 타임으로 설정합니다
입력 흐름에 시간이 없으면 실행 시간을 사용합니다.
※ 시간이 겹치면 데이터가 손실되기 때문에 다른 값을 얻기 위해 조금만 조정해야 한다.
System.current TimeMillis 및 System나노타임()에서 같이.private long getFirstTime(TimeUnit timeUnit) {
long time = System.currentTimeMillis();
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
return time * 1000000;//nano秒に設定してやらないといけない
} else {
return time;
}
}
private long getTime(long time, TimeUnit timeUnit) {
long thisTime;
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
long nowNano = System.nanoTime();
thisTime = time + (nowNano - _startNano);
} else {
thisTime = System.currentTimeMillis();
}
while (_timeList.contains(thisTime)) {
thisTime = thisTime + 1;//必ず入れる!
}
_timeList.add(thisTime);//つかった時刻は保持しておく
return thisTime;
}
레이블은 String만 있고 필드는 Value 유형별로 Boolean, Doube, Number, Long, String으로 설정됩니다.
Warp의 Value형
InflexDB 유형
Value.TYPE_BOOLEAN
Boolean
Value.TYPE_DOUBLE
Double
Value.TYPE_DECIMAL
Number
Value.TYPE_INTEGER
Long
Value.TYPE_STRING
String
Value.TYPE_DATETIME
Long
◆execute
처리가 간단하여 기록 정보에서 Point를 생성하여 대량으로 지정한 기록 수에 도달한 후 InflumxDB에 기록합니다.
음반이 끝날 때까지 그걸 진행할 거야.private BatchPoints createBatchPoints(String database, String retentionPolicy, TimeUnit timeunit) {
//BatchPointsを作成します
Builder builder = BatchPoints.database(database).precision(timeunit);
if (!StringUtil.isEmpty(retentionPolicy)) {
builder = builder.retentionPolicy(retentionPolicy);
}
return builder.build();
}
public Point createPoint(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags, Map<String, Value> fields) {
//Pointのビルダーを作成
org.influxdb.dto.Point.Builder builder = Point.measurement(measurement).time(time, timeUnit);
//タグを設定します
for (Entry<String, String> entry : tags.entrySet()) {
builder = builder.tag(entry.getKey(), entry.getValue());
}
//フィールドを型を考えながら設定します
for (Entry<String, Value> field : fields.entrySet()) {
Value value = field.getValue();
if (value.getType() == Value.TYPE_BOOLEAN) {
builder = builder.addField(field.getKey(), value.booleanValue());
} else if (value.getType() == Value.TYPE_DOUBLE) {
builder = builder.addField(field.getKey(), value.doubleValue());
} else if (value.getType() == Value.TYPE_DECIMAL) {
builder = builder.addField(field.getKey(), value.decimalValue());
} else if (value.getType() == Value.TYPE_INTEGER) {
builder = builder.addField(field.getKey(), value.intValue());
} else if (value.getType() == Value.TYPE_STRING) {
builder = builder.addField(field.getKey(), value.strValue());
} else if (value.getType() == Value.TYPE_DATETIME) {
builder = builder.addField(field.getKey(), value.longValue());
}
}
//Pointを作成します
Point point = builder.build();
return point;
}
@Override
public boolean execute(ExecuteContext context) throws FlowException {
_timeList.clear();
TimeUnit timeUnit = getTimeUnit();
BatchPoints batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
long time = getFirstTime(timeUnit);
long fieldTime = -1;
int batchCount = 0;
int batchMax = _batch.intValue();
//ストリームからレコードを取得してループします
StreamDataObject[] streams = getInputConnector().getStreamArray();
for (int i=0; i < streams.length; i++) {
//まだ複数のストリームはありませんが・・
FieldDefinition fd = streams[i].getFieldDefinition();
Record record = streams[i].getRecord();
int len = fd.getFieldCount();
Map<String, String> tags = new HashMap<>();
Map<String, Value> fields = new HashMap<>();
while (record != null) {
tags.clear();
fields.clear();
fieldTime = -1;
for (int j = 0; j < len; j++) {
//レコードのフィールドの名前
String name = fd.getField(j).getName();
Value fieldValue = record.getValue(name);
//そのフィールドがtime、タグでないか確認
if (FIELD_NAME_TIME.equals(name)) {
fieldTime = fieldValue.longValue();
} else if (isTag(name)) {
tags.put(name, fieldValue.strValue());
} else {
fields.put(name, fieldValue);
}
}
time = fieldTime == -1 ? getTime(time, timeUnit) : fieldTime;
Point point = createPoint(_measurement.strValue(), time, timeUnit, tags, fields);
//Pointをためて
batchPoints.point(point);
batchCount++;
if (batchCount >= batchMax) {
//バッチで書き込みます
_influxDB.write(batchPoints);
batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
}
record = record.nextRecord();
context.notifyRunning();
}
}
if (batchPoints.getPoints().size() > 0) {
//余りも書き込み
_influxDB.write(batchPoints);
}
passStream();
return true;
}
◆실행 결과
Flow Service의 로그를 넣고 명령에서 보십시오.
◆정리
Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.
Reference
이 문제에 관하여(InflexDB에 데이터를 배치할 구성 요소 만들기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/ArimitsuIshii/items/b9790b368f1411fba443
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
우선 환경, AWS 시동 centos 실례, 1.7.4 추가.
기본적으로 사용자 인증이나 설정이 없어서 직접 접근할 수 있습니다.
8086은 사내에서 접근할 수 있다.
데이터베이스는'asteria'의 이름으로 만들어졌다.
정책을 유지하는 데도'1일'이나 몇 가지를 잘해야 한다.
구체적인 구성 요소의 사용 방안은 없지만 전용 연결을 만들어 데이터를 넣는 구성 요소를 만들고 싶습니다.
◆ 라이브러리
inflexdb-java를 사용합니다.
https://github.com/influxdata/influxdb-java#influxdb-java
◆ InflexDB 연결
전용 연결이 있으면 비슷해 보이니까 연결을 만들자.
속성이 URL, 사용자 이름, 비밀번호 정도입니까?
이번에는 URL만 사용합니다.
접속 테스트에서 SHOW DATABASES 명령을 실행할 수 있는지 확인합니다.
뭘 받으면 오케이!@Override
public TestResult test() {
InfluxDB influxDB = null;
InfluxDBConnectionEntry entry = (InfluxDBConnectionEntry)getEntry();
influxDB = createInfluxDB(entry.getUrl(), entry.getUserName(), entry.getPassword());
try {
Query query = new Query("SHOW DATABASES");
QueryResult result = influxDB.query(query);
List<Result> results = result.getResults();
for (Result res : results) {
List<Series> series = res.getSeries();
for (Series se : series) {
List<List<Object>> values = se.getValues();
for (List<Object> value : values) {
for (Object obj : value) {
return new TestResult(true);
}
}
}
}
} finally {
influxDB.close();
}
return new TestResult(false);
}
◆구성 요소
데이터를 먼저 입력하고 싶으므로 InflexDBPut 구성 요소를 만듭니다.
하고 싶은 일
1、레코드 흐름에서 받은 레코드를 InflexDB에 넣습니다!
2, 데이터베이스, Measurement 및 보존 정책 지정!
3, 배치 포장!
4、타임이라는 필드가 없으면 타임을 설정합니다!
5,time는 MilliSeconds와 NanoSeconds를 지정하여 반드시 들어가야 한다고 조절할 수 있습니다!
6, 분류 속성 지정 태그 허용!
차이가 많지 않다.
◆속성
속성 이름
액션
연결 사용
연결을 사용하지 않을 경우 숨겨진 URL, 사용자 이름 및 암호 속성으로 지정합니다.
연결 이름
전용 연결을 지정합니다.
데이터베이스
이번에는 이미 제작된'asteria'를 지목한다.
시간 단위
MilliSeconds 및 NanoSeconds를 선택할 수 있습니다.
Measurement
Measurement의 이름을 지정합니다.
보존 정책
보존 정책을 지정합니다.
배치 처리 건수
일괄 처리의 기록 수를 지정합니다.
태그: 입력 흐름의 필드에 사용할 태그를 지정합니다.
◆ 입력 흐름
입력 스트림의 필드를 InflexDB의 필드로 직접 설정합니다.
타임 필드가 입력 흐름에 있으면 이 값을 타임으로 설정합니다
입력 흐름에 시간이 없으면 실행 시간을 사용합니다.
※ 시간이 겹치면 데이터가 손실되기 때문에 다른 값을 얻기 위해 조금만 조정해야 한다.
System.current TimeMillis 및 System나노타임()에서 같이.private long getFirstTime(TimeUnit timeUnit) {
long time = System.currentTimeMillis();
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
return time * 1000000;//nano秒に設定してやらないといけない
} else {
return time;
}
}
private long getTime(long time, TimeUnit timeUnit) {
long thisTime;
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
long nowNano = System.nanoTime();
thisTime = time + (nowNano - _startNano);
} else {
thisTime = System.currentTimeMillis();
}
while (_timeList.contains(thisTime)) {
thisTime = thisTime + 1;//必ず入れる!
}
_timeList.add(thisTime);//つかった時刻は保持しておく
return thisTime;
}
레이블은 String만 있고 필드는 Value 유형별로 Boolean, Doube, Number, Long, String으로 설정됩니다.
Warp의 Value형
InflexDB 유형
Value.TYPE_BOOLEAN
Boolean
Value.TYPE_DOUBLE
Double
Value.TYPE_DECIMAL
Number
Value.TYPE_INTEGER
Long
Value.TYPE_STRING
String
Value.TYPE_DATETIME
Long
◆execute
처리가 간단하여 기록 정보에서 Point를 생성하여 대량으로 지정한 기록 수에 도달한 후 InflumxDB에 기록합니다.
음반이 끝날 때까지 그걸 진행할 거야.private BatchPoints createBatchPoints(String database, String retentionPolicy, TimeUnit timeunit) {
//BatchPointsを作成します
Builder builder = BatchPoints.database(database).precision(timeunit);
if (!StringUtil.isEmpty(retentionPolicy)) {
builder = builder.retentionPolicy(retentionPolicy);
}
return builder.build();
}
public Point createPoint(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags, Map<String, Value> fields) {
//Pointのビルダーを作成
org.influxdb.dto.Point.Builder builder = Point.measurement(measurement).time(time, timeUnit);
//タグを設定します
for (Entry<String, String> entry : tags.entrySet()) {
builder = builder.tag(entry.getKey(), entry.getValue());
}
//フィールドを型を考えながら設定します
for (Entry<String, Value> field : fields.entrySet()) {
Value value = field.getValue();
if (value.getType() == Value.TYPE_BOOLEAN) {
builder = builder.addField(field.getKey(), value.booleanValue());
} else if (value.getType() == Value.TYPE_DOUBLE) {
builder = builder.addField(field.getKey(), value.doubleValue());
} else if (value.getType() == Value.TYPE_DECIMAL) {
builder = builder.addField(field.getKey(), value.decimalValue());
} else if (value.getType() == Value.TYPE_INTEGER) {
builder = builder.addField(field.getKey(), value.intValue());
} else if (value.getType() == Value.TYPE_STRING) {
builder = builder.addField(field.getKey(), value.strValue());
} else if (value.getType() == Value.TYPE_DATETIME) {
builder = builder.addField(field.getKey(), value.longValue());
}
}
//Pointを作成します
Point point = builder.build();
return point;
}
@Override
public boolean execute(ExecuteContext context) throws FlowException {
_timeList.clear();
TimeUnit timeUnit = getTimeUnit();
BatchPoints batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
long time = getFirstTime(timeUnit);
long fieldTime = -1;
int batchCount = 0;
int batchMax = _batch.intValue();
//ストリームからレコードを取得してループします
StreamDataObject[] streams = getInputConnector().getStreamArray();
for (int i=0; i < streams.length; i++) {
//まだ複数のストリームはありませんが・・
FieldDefinition fd = streams[i].getFieldDefinition();
Record record = streams[i].getRecord();
int len = fd.getFieldCount();
Map<String, String> tags = new HashMap<>();
Map<String, Value> fields = new HashMap<>();
while (record != null) {
tags.clear();
fields.clear();
fieldTime = -1;
for (int j = 0; j < len; j++) {
//レコードのフィールドの名前
String name = fd.getField(j).getName();
Value fieldValue = record.getValue(name);
//そのフィールドがtime、タグでないか確認
if (FIELD_NAME_TIME.equals(name)) {
fieldTime = fieldValue.longValue();
} else if (isTag(name)) {
tags.put(name, fieldValue.strValue());
} else {
fields.put(name, fieldValue);
}
}
time = fieldTime == -1 ? getTime(time, timeUnit) : fieldTime;
Point point = createPoint(_measurement.strValue(), time, timeUnit, tags, fields);
//Pointをためて
batchPoints.point(point);
batchCount++;
if (batchCount >= batchMax) {
//バッチで書き込みます
_influxDB.write(batchPoints);
batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
}
record = record.nextRecord();
context.notifyRunning();
}
}
if (batchPoints.getPoints().size() > 0) {
//余りも書き込み
_influxDB.write(batchPoints);
}
passStream();
return true;
}
◆실행 결과
Flow Service의 로그를 넣고 명령에서 보십시오.
◆정리
Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.
Reference
이 문제에 관하여(InflexDB에 데이터를 배치할 구성 요소 만들기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/ArimitsuIshii/items/b9790b368f1411fba443
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
전용 연결이 있으면 비슷해 보이니까 연결을 만들자.
속성이 URL, 사용자 이름, 비밀번호 정도입니까?
이번에는 URL만 사용합니다.
접속 테스트에서 SHOW DATABASES 명령을 실행할 수 있는지 확인합니다.
뭘 받으면 오케이!
@Override
public TestResult test() {
InfluxDB influxDB = null;
InfluxDBConnectionEntry entry = (InfluxDBConnectionEntry)getEntry();
influxDB = createInfluxDB(entry.getUrl(), entry.getUserName(), entry.getPassword());
try {
Query query = new Query("SHOW DATABASES");
QueryResult result = influxDB.query(query);
List<Result> results = result.getResults();
for (Result res : results) {
List<Series> series = res.getSeries();
for (Series se : series) {
List<List<Object>> values = se.getValues();
for (List<Object> value : values) {
for (Object obj : value) {
return new TestResult(true);
}
}
}
}
} finally {
influxDB.close();
}
return new TestResult(false);
}
◆구성 요소
데이터를 먼저 입력하고 싶으므로 InflexDBPut 구성 요소를 만듭니다.
하고 싶은 일
1、레코드 흐름에서 받은 레코드를 InflexDB에 넣습니다!
2, 데이터베이스, Measurement 및 보존 정책 지정!
3, 배치 포장!
4、타임이라는 필드가 없으면 타임을 설정합니다!
5,time는 MilliSeconds와 NanoSeconds를 지정하여 반드시 들어가야 한다고 조절할 수 있습니다!
6, 분류 속성 지정 태그 허용!
차이가 많지 않다.
◆속성
속성 이름
액션
연결 사용
연결을 사용하지 않을 경우 숨겨진 URL, 사용자 이름 및 암호 속성으로 지정합니다.
연결 이름
전용 연결을 지정합니다.
데이터베이스
이번에는 이미 제작된'asteria'를 지목한다.
시간 단위
MilliSeconds 및 NanoSeconds를 선택할 수 있습니다.
Measurement
Measurement의 이름을 지정합니다.
보존 정책
보존 정책을 지정합니다.
배치 처리 건수
일괄 처리의 기록 수를 지정합니다.
태그: 입력 흐름의 필드에 사용할 태그를 지정합니다.
◆ 입력 흐름
입력 스트림의 필드를 InflexDB의 필드로 직접 설정합니다.
타임 필드가 입력 흐름에 있으면 이 값을 타임으로 설정합니다
입력 흐름에 시간이 없으면 실행 시간을 사용합니다.
※ 시간이 겹치면 데이터가 손실되기 때문에 다른 값을 얻기 위해 조금만 조정해야 한다.
System.current TimeMillis 및 System나노타임()에서 같이.private long getFirstTime(TimeUnit timeUnit) {
long time = System.currentTimeMillis();
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
return time * 1000000;//nano秒に設定してやらないといけない
} else {
return time;
}
}
private long getTime(long time, TimeUnit timeUnit) {
long thisTime;
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
long nowNano = System.nanoTime();
thisTime = time + (nowNano - _startNano);
} else {
thisTime = System.currentTimeMillis();
}
while (_timeList.contains(thisTime)) {
thisTime = thisTime + 1;//必ず入れる!
}
_timeList.add(thisTime);//つかった時刻は保持しておく
return thisTime;
}
레이블은 String만 있고 필드는 Value 유형별로 Boolean, Doube, Number, Long, String으로 설정됩니다.
Warp의 Value형
InflexDB 유형
Value.TYPE_BOOLEAN
Boolean
Value.TYPE_DOUBLE
Double
Value.TYPE_DECIMAL
Number
Value.TYPE_INTEGER
Long
Value.TYPE_STRING
String
Value.TYPE_DATETIME
Long
◆execute
처리가 간단하여 기록 정보에서 Point를 생성하여 대량으로 지정한 기록 수에 도달한 후 InflumxDB에 기록합니다.
음반이 끝날 때까지 그걸 진행할 거야.private BatchPoints createBatchPoints(String database, String retentionPolicy, TimeUnit timeunit) {
//BatchPointsを作成します
Builder builder = BatchPoints.database(database).precision(timeunit);
if (!StringUtil.isEmpty(retentionPolicy)) {
builder = builder.retentionPolicy(retentionPolicy);
}
return builder.build();
}
public Point createPoint(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags, Map<String, Value> fields) {
//Pointのビルダーを作成
org.influxdb.dto.Point.Builder builder = Point.measurement(measurement).time(time, timeUnit);
//タグを設定します
for (Entry<String, String> entry : tags.entrySet()) {
builder = builder.tag(entry.getKey(), entry.getValue());
}
//フィールドを型を考えながら設定します
for (Entry<String, Value> field : fields.entrySet()) {
Value value = field.getValue();
if (value.getType() == Value.TYPE_BOOLEAN) {
builder = builder.addField(field.getKey(), value.booleanValue());
} else if (value.getType() == Value.TYPE_DOUBLE) {
builder = builder.addField(field.getKey(), value.doubleValue());
} else if (value.getType() == Value.TYPE_DECIMAL) {
builder = builder.addField(field.getKey(), value.decimalValue());
} else if (value.getType() == Value.TYPE_INTEGER) {
builder = builder.addField(field.getKey(), value.intValue());
} else if (value.getType() == Value.TYPE_STRING) {
builder = builder.addField(field.getKey(), value.strValue());
} else if (value.getType() == Value.TYPE_DATETIME) {
builder = builder.addField(field.getKey(), value.longValue());
}
}
//Pointを作成します
Point point = builder.build();
return point;
}
@Override
public boolean execute(ExecuteContext context) throws FlowException {
_timeList.clear();
TimeUnit timeUnit = getTimeUnit();
BatchPoints batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
long time = getFirstTime(timeUnit);
long fieldTime = -1;
int batchCount = 0;
int batchMax = _batch.intValue();
//ストリームからレコードを取得してループします
StreamDataObject[] streams = getInputConnector().getStreamArray();
for (int i=0; i < streams.length; i++) {
//まだ複数のストリームはありませんが・・
FieldDefinition fd = streams[i].getFieldDefinition();
Record record = streams[i].getRecord();
int len = fd.getFieldCount();
Map<String, String> tags = new HashMap<>();
Map<String, Value> fields = new HashMap<>();
while (record != null) {
tags.clear();
fields.clear();
fieldTime = -1;
for (int j = 0; j < len; j++) {
//レコードのフィールドの名前
String name = fd.getField(j).getName();
Value fieldValue = record.getValue(name);
//そのフィールドがtime、タグでないか確認
if (FIELD_NAME_TIME.equals(name)) {
fieldTime = fieldValue.longValue();
} else if (isTag(name)) {
tags.put(name, fieldValue.strValue());
} else {
fields.put(name, fieldValue);
}
}
time = fieldTime == -1 ? getTime(time, timeUnit) : fieldTime;
Point point = createPoint(_measurement.strValue(), time, timeUnit, tags, fields);
//Pointをためて
batchPoints.point(point);
batchCount++;
if (batchCount >= batchMax) {
//バッチで書き込みます
_influxDB.write(batchPoints);
batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
}
record = record.nextRecord();
context.notifyRunning();
}
}
if (batchPoints.getPoints().size() > 0) {
//余りも書き込み
_influxDB.write(batchPoints);
}
passStream();
return true;
}
◆실행 결과
Flow Service의 로그를 넣고 명령에서 보십시오.
◆정리
Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.
Reference
이 문제에 관하여(InflexDB에 데이터를 배치할 구성 요소 만들기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/ArimitsuIshii/items/b9790b368f1411fba443
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
속성 이름
액션
연결 사용
연결을 사용하지 않을 경우 숨겨진 URL, 사용자 이름 및 암호 속성으로 지정합니다.
연결 이름
전용 연결을 지정합니다.
데이터베이스
이번에는 이미 제작된'asteria'를 지목한다.
시간 단위
MilliSeconds 및 NanoSeconds를 선택할 수 있습니다.
Measurement
Measurement의 이름을 지정합니다.
보존 정책
보존 정책을 지정합니다.
배치 처리 건수
일괄 처리의 기록 수를 지정합니다.
태그: 입력 흐름의 필드에 사용할 태그를 지정합니다.
◆ 입력 흐름
입력 스트림의 필드를 InflexDB의 필드로 직접 설정합니다.
타임 필드가 입력 흐름에 있으면 이 값을 타임으로 설정합니다
입력 흐름에 시간이 없으면 실행 시간을 사용합니다.
※ 시간이 겹치면 데이터가 손실되기 때문에 다른 값을 얻기 위해 조금만 조정해야 한다.
System.current TimeMillis 및 System나노타임()에서 같이.private long getFirstTime(TimeUnit timeUnit) {
long time = System.currentTimeMillis();
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
return time * 1000000;//nano秒に設定してやらないといけない
} else {
return time;
}
}
private long getTime(long time, TimeUnit timeUnit) {
long thisTime;
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
long nowNano = System.nanoTime();
thisTime = time + (nowNano - _startNano);
} else {
thisTime = System.currentTimeMillis();
}
while (_timeList.contains(thisTime)) {
thisTime = thisTime + 1;//必ず入れる!
}
_timeList.add(thisTime);//つかった時刻は保持しておく
return thisTime;
}
레이블은 String만 있고 필드는 Value 유형별로 Boolean, Doube, Number, Long, String으로 설정됩니다.
Warp의 Value형
InflexDB 유형
Value.TYPE_BOOLEAN
Boolean
Value.TYPE_DOUBLE
Double
Value.TYPE_DECIMAL
Number
Value.TYPE_INTEGER
Long
Value.TYPE_STRING
String
Value.TYPE_DATETIME
Long
◆execute
처리가 간단하여 기록 정보에서 Point를 생성하여 대량으로 지정한 기록 수에 도달한 후 InflumxDB에 기록합니다.
음반이 끝날 때까지 그걸 진행할 거야.private BatchPoints createBatchPoints(String database, String retentionPolicy, TimeUnit timeunit) {
//BatchPointsを作成します
Builder builder = BatchPoints.database(database).precision(timeunit);
if (!StringUtil.isEmpty(retentionPolicy)) {
builder = builder.retentionPolicy(retentionPolicy);
}
return builder.build();
}
public Point createPoint(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags, Map<String, Value> fields) {
//Pointのビルダーを作成
org.influxdb.dto.Point.Builder builder = Point.measurement(measurement).time(time, timeUnit);
//タグを設定します
for (Entry<String, String> entry : tags.entrySet()) {
builder = builder.tag(entry.getKey(), entry.getValue());
}
//フィールドを型を考えながら設定します
for (Entry<String, Value> field : fields.entrySet()) {
Value value = field.getValue();
if (value.getType() == Value.TYPE_BOOLEAN) {
builder = builder.addField(field.getKey(), value.booleanValue());
} else if (value.getType() == Value.TYPE_DOUBLE) {
builder = builder.addField(field.getKey(), value.doubleValue());
} else if (value.getType() == Value.TYPE_DECIMAL) {
builder = builder.addField(field.getKey(), value.decimalValue());
} else if (value.getType() == Value.TYPE_INTEGER) {
builder = builder.addField(field.getKey(), value.intValue());
} else if (value.getType() == Value.TYPE_STRING) {
builder = builder.addField(field.getKey(), value.strValue());
} else if (value.getType() == Value.TYPE_DATETIME) {
builder = builder.addField(field.getKey(), value.longValue());
}
}
//Pointを作成します
Point point = builder.build();
return point;
}
@Override
public boolean execute(ExecuteContext context) throws FlowException {
_timeList.clear();
TimeUnit timeUnit = getTimeUnit();
BatchPoints batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
long time = getFirstTime(timeUnit);
long fieldTime = -1;
int batchCount = 0;
int batchMax = _batch.intValue();
//ストリームからレコードを取得してループします
StreamDataObject[] streams = getInputConnector().getStreamArray();
for (int i=0; i < streams.length; i++) {
//まだ複数のストリームはありませんが・・
FieldDefinition fd = streams[i].getFieldDefinition();
Record record = streams[i].getRecord();
int len = fd.getFieldCount();
Map<String, String> tags = new HashMap<>();
Map<String, Value> fields = new HashMap<>();
while (record != null) {
tags.clear();
fields.clear();
fieldTime = -1;
for (int j = 0; j < len; j++) {
//レコードのフィールドの名前
String name = fd.getField(j).getName();
Value fieldValue = record.getValue(name);
//そのフィールドがtime、タグでないか確認
if (FIELD_NAME_TIME.equals(name)) {
fieldTime = fieldValue.longValue();
} else if (isTag(name)) {
tags.put(name, fieldValue.strValue());
} else {
fields.put(name, fieldValue);
}
}
time = fieldTime == -1 ? getTime(time, timeUnit) : fieldTime;
Point point = createPoint(_measurement.strValue(), time, timeUnit, tags, fields);
//Pointをためて
batchPoints.point(point);
batchCount++;
if (batchCount >= batchMax) {
//バッチで書き込みます
_influxDB.write(batchPoints);
batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
}
record = record.nextRecord();
context.notifyRunning();
}
}
if (batchPoints.getPoints().size() > 0) {
//余りも書き込み
_influxDB.write(batchPoints);
}
passStream();
return true;
}
◆실행 결과
Flow Service의 로그를 넣고 명령에서 보십시오.
◆정리
Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.
Reference
이 문제에 관하여(InflexDB에 데이터를 배치할 구성 요소 만들기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/ArimitsuIshii/items/b9790b368f1411fba443
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
private long getFirstTime(TimeUnit timeUnit) {
long time = System.currentTimeMillis();
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
return time * 1000000;//nano秒に設定してやらないといけない
} else {
return time;
}
}
private long getTime(long time, TimeUnit timeUnit) {
long thisTime;
if (timeUnit.equals(TimeUnit.NANOSECONDS)) {
long nowNano = System.nanoTime();
thisTime = time + (nowNano - _startNano);
} else {
thisTime = System.currentTimeMillis();
}
while (_timeList.contains(thisTime)) {
thisTime = thisTime + 1;//必ず入れる!
}
_timeList.add(thisTime);//つかった時刻は保持しておく
return thisTime;
}
처리가 간단하여 기록 정보에서 Point를 생성하여 대량으로 지정한 기록 수에 도달한 후 InflumxDB에 기록합니다.
음반이 끝날 때까지 그걸 진행할 거야.
private BatchPoints createBatchPoints(String database, String retentionPolicy, TimeUnit timeunit) {
//BatchPointsを作成します
Builder builder = BatchPoints.database(database).precision(timeunit);
if (!StringUtil.isEmpty(retentionPolicy)) {
builder = builder.retentionPolicy(retentionPolicy);
}
return builder.build();
}
public Point createPoint(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags, Map<String, Value> fields) {
//Pointのビルダーを作成
org.influxdb.dto.Point.Builder builder = Point.measurement(measurement).time(time, timeUnit);
//タグを設定します
for (Entry<String, String> entry : tags.entrySet()) {
builder = builder.tag(entry.getKey(), entry.getValue());
}
//フィールドを型を考えながら設定します
for (Entry<String, Value> field : fields.entrySet()) {
Value value = field.getValue();
if (value.getType() == Value.TYPE_BOOLEAN) {
builder = builder.addField(field.getKey(), value.booleanValue());
} else if (value.getType() == Value.TYPE_DOUBLE) {
builder = builder.addField(field.getKey(), value.doubleValue());
} else if (value.getType() == Value.TYPE_DECIMAL) {
builder = builder.addField(field.getKey(), value.decimalValue());
} else if (value.getType() == Value.TYPE_INTEGER) {
builder = builder.addField(field.getKey(), value.intValue());
} else if (value.getType() == Value.TYPE_STRING) {
builder = builder.addField(field.getKey(), value.strValue());
} else if (value.getType() == Value.TYPE_DATETIME) {
builder = builder.addField(field.getKey(), value.longValue());
}
}
//Pointを作成します
Point point = builder.build();
return point;
}
@Override
public boolean execute(ExecuteContext context) throws FlowException {
_timeList.clear();
TimeUnit timeUnit = getTimeUnit();
BatchPoints batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
long time = getFirstTime(timeUnit);
long fieldTime = -1;
int batchCount = 0;
int batchMax = _batch.intValue();
//ストリームからレコードを取得してループします
StreamDataObject[] streams = getInputConnector().getStreamArray();
for (int i=0; i < streams.length; i++) {
//まだ複数のストリームはありませんが・・
FieldDefinition fd = streams[i].getFieldDefinition();
Record record = streams[i].getRecord();
int len = fd.getFieldCount();
Map<String, String> tags = new HashMap<>();
Map<String, Value> fields = new HashMap<>();
while (record != null) {
tags.clear();
fields.clear();
fieldTime = -1;
for (int j = 0; j < len; j++) {
//レコードのフィールドの名前
String name = fd.getField(j).getName();
Value fieldValue = record.getValue(name);
//そのフィールドがtime、タグでないか確認
if (FIELD_NAME_TIME.equals(name)) {
fieldTime = fieldValue.longValue();
} else if (isTag(name)) {
tags.put(name, fieldValue.strValue());
} else {
fields.put(name, fieldValue);
}
}
time = fieldTime == -1 ? getTime(time, timeUnit) : fieldTime;
Point point = createPoint(_measurement.strValue(), time, timeUnit, tags, fields);
//Pointをためて
batchPoints.point(point);
batchCount++;
if (batchCount >= batchMax) {
//バッチで書き込みます
_influxDB.write(batchPoints);
batchPoints = createBatchPoints(_database.strValue(),_retentionPolicy.strValue(), timeUnit);
}
record = record.nextRecord();
context.notifyRunning();
}
}
if (batchPoints.getPoints().size() > 0) {
//余りも書き込み
_influxDB.write(batchPoints);
}
passStream();
return true;
}
◆실행 결과
Flow Service의 로그를 넣고 명령에서 보십시오.
◆정리
Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.
Reference
이 문제에 관하여(InflexDB에 데이터를 배치할 구성 요소 만들기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/ArimitsuIshii/items/b9790b368f1411fba443
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Exception 프로세싱 같은 거 안 넣었는데... InflumxDB가 재밌네.
ASTERRIA Warp와 같은 환경에서 InflumxDB를 추가하고 플로우 서비스에 로그를 추가하는 것도 있나요?내 생각엔
지정한 기간 동안 자동으로 사라집니다. 검색을 통해 로그를 검색할 수 있다면 기쁠 것입니다.
Reference
이 문제에 관하여(InflexDB에 데이터를 배치할 구성 요소 만들기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/ArimitsuIshii/items/b9790b368f1411fba443텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)