Kinesis Analytics SQL Queries
Points to note when using Kinesis Analytics
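• Stream names and column (data) names are case-sensitive.
→ A stream or column name wrapped in double quotes ("") keeps its case exactly as written (so lowercase stays lowercase); an unquoted name is treated as uppercase.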
→ (Personal opinion) It is best to create stream names and column names in uppercase.
・ The Kinesis Streams demo stream stops frequently.
→ When that happens, it is best to delete it and create it again.
• An error occurred in the stream when I added reference data.
· Saving and running SQL code can take a long time.
- RANDOM_CUT_FOREST in particular can take around 30 minutes.
・ The templates may contain subtle errors.
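The case rule in the first point can be modeled with a tiny helper. This is a hypothetical sketch (`resolve_identifier` is not part of any AWS API) that assumes the usual SQL behaviour described above: unquoted identifiers are folded to uppercase, quoted identifiers keep their case exactly.

```python
def resolve_identifier(name: str) -> str:
    """Return the name an identifier resolves to under the assumed rules."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        # Quoted identifier: drop the quotes and keep the case as written.
        return name[1:-1]
    # Unquoted identifier: folded to uppercase.
    return name.upper()


# Unquoted, so this refers to the column TICKER_SYMBOL.
print(resolve_identifier("ticker_symbol"))    # TICKER_SYMBOL
# Quoted, so this refers to a different, lowercase column.
print(resolve_identifier('"ticker_symbol"'))  # ticker_symbol
```

This is why columns declared in uppercase can still be referenced as `ticker_symbol` without quotes, while `"ticker_symbol"` in quotes would not match them.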
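Impressions
・ Easy to build.
・ KPUs scale automatically, but there is no way to cap them, so keep an eye on cost.
・ A minor annoyance: after updating the code, it sometimes takes a while before the application is Running.
・ There is little documentation.
・ The templates contain errors, so be careful. Honestly, I don't understand the advanced analytics functions.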
The queries introduced here are as follows.
Basic queries
・CREATE PUMP&STREAM
・SELECT
・WHERE
・MULTI STREAMS
・GROUP BY
・WINDOW
1.TUMBLING WINDOW
2.SLIDING WINDOW
・INNER JOIN
Advanced analytics
・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※ incomplete
・TOP-K ※ incomplete
Basic syntax
CREATE PUMP & STREAM, SELECT
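· [OR REPLACE] means that if a STREAM with the same name already exists, it is replaced.
· Analytics stores data in a STREAM by feeding it through a PUMP (a PUMP continuously runs INSERT INTO ... SELECT).
· Function names (avg, min, max, etc.) cannot be used as column names.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (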
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, price, change -- same order as the columns of DESTINATION_SQL_STREAM
FROM "SOURCE_SQL_STREAM_001";
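A PUMP behaves like a continuously running INSERT INTO ... SELECT. Assuming standard SQL semantics, values are assigned to the destination columns by position, not by name, so the SELECT list has to follow the column order of the STREAM definition (here PRICE comes before CHANGE). The following is a local Python simulation, not Kinesis code; `pump` and the sample row are hypothetical.

```python
from typing import Dict, List, Sequence

DEST_COLUMNS = ["TICKER_SYMBOL", "SECTOR", "PRICE", "CHANGE"]


def pump(rows: List[Dict[str, object]], select_list: Sequence[str],
         dest_columns: Sequence[str] = DEST_COLUMNS) -> List[Dict[str, object]]:
    """Simulate INSERT INTO dest SELECT STREAM <select_list> FROM source.

    Values are matched to destination columns by position, not by name.
    """
    if len(select_list) != len(dest_columns):
        raise ValueError("SELECT list and destination column count differ")
    out = []
    for row in rows:
        values = [row[col] for col in select_list]
        out.append(dict(zip(dest_columns, values)))
    return out


source = [{"ticker_symbol": "ABC", "sector": "HEALTHCARE", "price": 50.0, "change": -1.5}]

# Order matches the destination: PRICE gets price, CHANGE gets change.
print(pump(source, ["ticker_symbol", "sector", "price", "change"]))
# Order does not match: PRICE silently receives the change value.
print(pump(source, ["ticker_symbol", "sector", "change", "price"]))
```

No error is raised in the second case, which is why a swapped column order is easy to miss.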
WHERE
・ Basically the same as standard SQL.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, price, change
FROM "SOURCE_SQL_STREAM_001"
WHERE SECTOR LIKE '%ALTH%'
;
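The pattern '%ALTH%' matches any SECTOR value containing ALTH (for example HEALTHCARE). The sketch below translates a LIKE pattern into a regular expression to show how % and _ behave; `sql_like` is a hypothetical helper, and it assumes case-sensitive matching and no ESCAPE clause.

```python
import re


def sql_like(value: str, pattern: str) -> bool:
    """Return True if value matches a SQL LIKE pattern (no ESCAPE clause)."""
    # % -> any sequence of characters, _ -> exactly one character, rest literal.
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # fullmatch: LIKE must match the whole value, not just part of it.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


sectors = ["HEALTHCARE", "ENERGY", "TECHNOLOGY", "RETAIL"]  # sample values
print([s for s in sectors if sql_like(s, "%ALTH%")])  # ['HEALTHCARE']
```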
MULTI STREAMS
· The same PUMP name cannot be used twice; give each PUMP its own name.
• When multiple STREAMs are defined, each STREAM appears in the real-time analytics tab of the AWS console.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, price, change
FROM SOURCE_SQL_STREAM_001
WHERE SECTOR LIKE '%ALTH%'
;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT STREAM ticker_symbol, sector, price, change
FROM SOURCE_SQL_STREAM_001
WHERE PRICE > 0.0
;
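Each PUMP reads the source independently, so one source row can land in both destination streams. A minimal local simulation, assuming each PUMP acts as an independent filter over SOURCE_SQL_STREAM_001; `run_pumps` and the sample rows are hypothetical.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def run_pumps(source: List[Row],
              pumps: Dict[str, Callable[[Row], bool]]) -> Dict[str, List[Row]]:
    """Send every source row to each destination whose WHERE predicate holds."""
    destinations: Dict[str, List[Row]] = {name: [] for name in pumps}
    for row in source:
        for name, where in pumps.items():
            if where(row):
                destinations[name].append(row)
    return destinations


source = [
    {"ticker_symbol": "ABC", "sector": "HEALTHCARE", "price": 10.0},
    {"ticker_symbol": "XYZ", "sector": "ENERGY", "price": 5.0},
    {"ticker_symbol": "DEF", "sector": "HEALTHCARE", "price": 0.0},
]
result = run_pumps(source, {
    # STREAM_PUMP: WHERE SECTOR LIKE '%ALTH%'
    "DESTINATION_SQL_STREAM": lambda r: "ALTH" in str(r["sector"]),
    # STREAM_PUMP2: WHERE PRICE > 0.0
    "DESTINATION_SQL_STREAM2": lambda r: float(r["price"]) > 0.0,
})
for name, rows in result.items():
    print(name, [r["ticker_symbol"] for r in rows])
```

ABC satisfies both predicates, so it appears in both destination streams.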
GROUP BY(TUMBLING WINDOW)
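· The syntax is the same as ordinary SQL, except that a time window (called a TUMBLING WINDOW) must be specified in the GROUP BY clause.
· ROWTIME is a column provided by Kinesis Analytics that holds the time at which each row was ingested.
· In the query below, FLOOR((ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND) puts rows into 10-second buckets, so each group is one ticker_symbol within one 10-second window.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (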
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER,
TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM
ticker_symbol,
COUNT(*) AS ticker_symbol_count,
AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
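The FLOOR(...) expression turns ROWTIME into a 10-second bucket key, and GROUP BY then yields one row per ticker_symbol per bucket. Below is a local Python sketch of that grouping. `window_key` and `tumbling_count_avg` are hypothetical helpers, ROWTIME is assumed to be a UTC timestamp, and Kinesis itself emits each group when its window closes rather than all at once.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW_SECONDS = 10


def window_key(rowtime: datetime) -> int:
    """Bucket key for a 10-second tumbling window, like the FLOOR(...) expression."""
    return int((rowtime - EPOCH).total_seconds()) // WINDOW_SECONDS


def tumbling_count_avg(
        rows: List[Tuple[datetime, str, float]]) -> Dict[Tuple[str, int], Tuple[int, float]]:
    """GROUP BY ticker_symbol and window: (COUNT(*), AVG(price)) per group."""
    groups: Dict[Tuple[str, int], List[float]] = defaultdict(list)
    for rowtime, ticker, price in rows:
        groups[(ticker, window_key(rowtime))].append(price)
    return {key: (len(prices), sum(prices) / len(prices)) for key, prices in groups.items()}


t0 = datetime(2017, 3, 1, 0, 0, 1, tzinfo=timezone.utc)
rows = [
    (t0, "ABC", 10.0),
    (t0 + timedelta(seconds=4), "ABC", 20.0),   # same window as the first row
    (t0 + timedelta(seconds=2), "XYZ", 5.0),
    (t0 + timedelta(seconds=12), "ABC", 30.0),  # falls into the next window
]
for (ticker, window), (count, avg) in sorted(tumbling_count_avg(rows).items()):
    print(ticker, window, count, avg)
```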
SLIDING WINDOW
· With a SLIDING WINDOW, the window is attached to the aggregate function (avg, min, count, etc.) with OVER, not defined in the GROUP BY clause.
・ The window width can be set in two ways: by TIME (RANGE) or by ROWS.
CREATE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_AVG_T REAL,
TICKER_SYMBOL_COUNT_T INTEGER,
TICKER_SYMBOL_AVG_R REAL,
TICKER_SYMBOL_COUNT_R INTEGER);
CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
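avg(price) over TSW as ticker_symbol_avg_t,
count(*) over TSW as ticker_symbol_count_t,
avg(price) over RSW as ticker_symbol_avg_r,
count(*) over RSW as ticker_symbol_count_r
FROM SOURCE_SQL_STREAM_001
-- TSW: time-based window (rows from the last 10 seconds); RSW: row-based window (current row + 2 preceding rows)
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),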
RSW as (partition by ticker_symbol ROWS 2 preceding);
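Unlike a tumbling window, a sliding window produces an output row for every input row, computed over that row's recent history within its partition. The sketch below simulates both window kinds locally with one deque per ticker. It assumes rows arrive in ROWTIME order and that RANGE INTERVAL '10' SECOND PRECEDING includes rows at exactly t - 10 seconds; `sliding_windows` and the sample data are hypothetical.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple

Row = Tuple[float, str, float]  # (rowtime in seconds, ticker_symbol, price)


def sliding_windows(rows: List[Row], range_seconds: float = 10.0,
                    preceding_rows: int = 2) -> List[Tuple[str, float, int, float, int]]:
    """Per input row: (ticker, avg/count over time window, avg/count over row window)."""
    time_win: Dict[str, Deque[Row]] = defaultdict(deque)
    row_win: Dict[str, Deque[Row]] = defaultdict(deque)
    out = []
    for row in rows:  # assumed to arrive in ROWTIME order
        t, ticker, _ = row
        tw = time_win[ticker]  # PARTITION BY ticker_symbol
        tw.append(row)
        # RANGE INTERVAL '10' SECOND PRECEDING: keep rows with rowtime >= t - 10
        while tw[0][0] < t - range_seconds:
            tw.popleft()
        rw = row_win[ticker]
        rw.append(row)
        # ROWS 2 PRECEDING: the current row plus at most 2 earlier rows
        if len(rw) > preceding_rows + 1:
            rw.popleft()
        out.append((ticker,
                    sum(r[2] for r in tw) / len(tw), len(tw),
                    sum(r[2] for r in rw) / len(rw), len(rw)))
    return out


rows = [(0.0, "ABC", 10.0), (3.0, "ABC", 20.0), (4.0, "XYZ", 100.0),
        (6.0, "ABC", 30.0), (9.0, "ABC", 40.0), (15.0, "ABC", 50.0)]
for result in sliding_windows(rows):
    print(result)
```

At t = 9 the time window still holds all four ABC rows while the row window holds only the last three; at t = 15 the rows at t = 0 and t = 3 have aged out of the time window.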
INNER JOIN
· Joins between different STREAMs are possible.
· Reference data could not be added from the console (as of March 2017).
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER,
TICKER_SYMBOL_AVG REAL);
CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT
STREAM A.ticker_symbol,
A.ticker_symbol_count,
B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol;
Creating DESTINATION_SQL_STREAM and DESTINATION_SQL_STREAM2, which the INNER JOIN reads from:
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER
);
CREATE OR REPLACE PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT
STREAM ticker_symbol,
count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT
STREAM ticker_symbol,
avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
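Conceptually, the join pairs rows of DESTINATION_SQL_STREAM and DESTINATION_SQL_STREAM2 that share a ticker_symbol, and drops rows with no partner. Because streams never end, a streaming engine has to bound which rows may match; this sketch assumes rows match only within the same 10-second window, which is an illustrative assumption rather than documented Kinesis behaviour. It is a plain hash join with hypothetical sample data.

```python
from typing import Dict, List, Tuple

# Hypothetical outputs of the two helper pumps: (ticker_symbol, window, value).
counts: List[Tuple[str, int, int]] = [("ABC", 0, 2), ("XYZ", 0, 1), ("ABC", 1, 1)]
avgs: List[Tuple[str, int, float]] = [("ABC", 0, 15.0), ("ABC", 1, 30.0)]


def inner_join(a_rows: List[Tuple[str, int, int]],
               b_rows: List[Tuple[str, int, float]]) -> List[Tuple[str, int, float]]:
    """Inner join on ticker_symbol within the same window; unmatched rows are dropped."""
    # Build a hash index over B keyed by (ticker_symbol, window).
    index: Dict[Tuple[str, int], List[float]] = {}
    for ticker, window, avg in b_rows:
        index.setdefault((ticker, window), []).append(avg)
    # Probe the index with every row of A.
    joined = []
    for ticker, window, count in a_rows:
        for avg in index.get((ticker, window), []):
            joined.append((ticker, count, avg))
    return joined


print(inner_join(counts, avgs))  # [('ABC', 2, 15.0), ('ABC', 1, 30.0)]
```

XYZ has a count but no average in the same window, so an inner join leaves it out.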
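Adding reference data
· Can only be added from the AWS CLI.
· Check the current-application-version-id with describe-application.
· RecordFormatType takes "CSV" or "JSON"; this example uses CSV.
aws kinesisanalytics add-application-reference-data-source \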
--region us-east-1 \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[bucket-name]","FileKey":"[file-name]","ReferenceRoleARN":"arn:aws:iam::[account-id]:role/service-role/[role-name]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":"CSV", "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'
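Hand-writing nested JSON inside shell quotes is easy to get wrong (one missing quote or brace and the CLI rejects it). One option is to generate the --reference-data-source argument with Python's json module, which always emits valid JSON. This is a sketch: the structure mirrors the command above, while the bucket, file key and role ARN are hypothetical placeholders.

```python
import json


def reference_data_source(bucket: str, file_key: str, role_arn: str) -> str:
    """Build the JSON for --reference-data-source (same structure as the CLI example)."""
    source = {
        "TableName": "companyname",
        "S3ReferenceDataSource": {
            "BucketARN": f"arn:aws:s3:::{bucket}",
            "FileKey": file_key,
            "ReferenceRoleARN": role_arn,
        },
        "ReferenceSchema": {
            "RecordFormat": {
                "RecordFormatType": "CSV",
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordRowDelimiter": "\n",
                        "RecordColumnDelimiter": ",",
                    }
                },
            },
            "RecordEncoding": "UTF-8",
            "RecordColumns": [
                {"Name": "type", "SqlType": "varchar(5)"},
                {"Name": "company", "SqlType": "varchar(10)"},
            ],
        },
    }
    return json.dumps(source)


arg = reference_data_source("my-bucket", "companies.csv",
                            "arn:aws:iam::123456789012:role/service-role/my-role")
print(arg)
```

The printed string can then be pasted, inside single quotes, as the value of --reference-data-source.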
Viewing the application
・ Set up credentials and region beforehand with aws configure.
aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]
Deleting reference data
· Check the version-id and reference-id with describe-application.
aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]
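describe-application prints JSON, and both IDs needed by the commands above sit under ApplicationDetail. The sketch below extracts them with the standard json module. The field names are those I understand the DescribeApplication response to use (ApplicationVersionId, ReferenceDataSourceDescriptions[].ReferenceId); the sample output is trimmed and hypothetical.

```python
import json
from typing import List, Tuple


def ids_from_describe(output: str) -> Tuple[int, List[Tuple[str, str]]]:
    """Return ApplicationVersionId and (ReferenceId, TableName) pairs."""
    detail = json.loads(output)["ApplicationDetail"]
    refs = [(r["ReferenceId"], r["TableName"])
            for r in detail.get("ReferenceDataSourceDescriptions", [])]
    return detail["ApplicationVersionId"], refs


# Trimmed, hypothetical example of describe-application output.
sample = '''{"ApplicationDetail": {
  "ApplicationName": "my-app",
  "ApplicationVersionId": 5,
  "ReferenceDataSourceDescriptions": [
    {"ReferenceId": "2.1", "TableName": "companyname"}
  ]}}'''

version_id, refs = ids_from_describe(sample)
print(version_id, refs)  # 5 [('2.1', 'companyname')]
```

Alternatively, the AWS CLI's global --query option can pick out a single field with a JMESPath expression, for example --query 'ApplicationDetail.ApplicationVersionId'.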
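Advanced analytics
· Advanced analytics uses the CURSOR function. (It is quite different from standard SQL, so I was confused at first.)
・ General form: SELECT STREAM * FROM TABLE(xxxFUNCTION(CURSOR(SELECT STREAM * FROM in-application-stream), options))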
RANDOM_CUT_FOREST (anomaly detection)
・ It takes five arguments: in-application-stream, numberOfTrees (default: 100), subSampleSize (default: 256), timeDecay (default: 100000), and shingleSize (default: 1).
See the manual for details.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(10),
PRICE DOUBLE,
CHANGE DOUBLE,
ANOMALY_SCORE DOUBLE);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(10),
PRICE DOUBLE,
CHANGE DOUBLE,
ANOMALY_SCORE DOUBLE);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
TABLE(RANDOM_CUT_FOREST(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
)
) ;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT
STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND),
ANOMALY_SCORE DESC;
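The second pump sorts rows so that, within each one-second group, the highest ANOMALY_SCORE comes first, which puts the most suspicious records of each second at the top. This is a local sketch of that ordering only (it does not implement Random Cut Forest itself); `order_within_second` and the scores are hypothetical.

```python
import math
from typing import List, Tuple

Row = Tuple[float, str, float]  # (rowtime in epoch seconds, ticker_symbol, anomaly_score)


def order_within_second(rows: List[Row]) -> List[Row]:
    """Sort like ORDER BY FLOOR(ROWTIME TO SECOND), ANOMALY_SCORE DESC."""
    return sorted(rows, key=lambda r: (math.floor(r[0]), -r[2]))


rows = [(100.2, "ABC", 0.8), (100.7, "XYZ", 2.5), (101.1, "ABC", 1.0), (100.9, "DEF", 1.7)]
print([ticker for _, ticker, _ in order_within_second(rows)])  # ['XYZ', 'DEF', 'ABC', 'ABC']
```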
DISTINCT COUNT
· COUNT_DISTINCT_ITEMS_TUMBLING takes three arguments: the in-application stream, a column name, and the TUMBLING WINDOW duration. Only one column name can be given; multiple columns cannot be specified.
・ Only the distinct count is output.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
'TICKER_SYMBOL', -- name of column in single quotes
10 -- tumbling window size in seconds
)
);
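COUNT_DISTINCT_ITEMS_TUMBLING emits, for each tumbling window, how many different values the chosen column took. The sketch below computes that count exactly with one set per window; as far as I know the AWS function is documented as an approximation (HyperLogLog), so its numbers may differ slightly on large inputs. `count_distinct_tumbling` and the sample rows are hypothetical.

```python
import math
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def count_distinct_tumbling(rows: List[Tuple[float, str]],
                            window_seconds: int = 10) -> Dict[int, int]:
    """Exact distinct count of one column per tumbling window (window start -> count)."""
    seen: Dict[int, Set[str]] = defaultdict(set)
    for rowtime, value in rows:
        start = math.floor(rowtime / window_seconds) * window_seconds
        seen[start].add(value)
    return {start: len(values) for start, values in sorted(seen.items())}


# (rowtime in seconds, TICKER_SYMBOL)
rows = [(0.5, "ABC"), (3.0, "XYZ"), (7.2, "ABC"),
        (11.0, "ABC"), (12.0, "XYZ"), (19.9, "DEF")]
print(count_distinct_tumbling(rows))  # {0: 2, 10: 3}
```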
GROUP RANK
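Sorry: this currently produces an error, and I haven't figured out why. (I did read the documentation...)
· The error message is "SQL error message: From line 2, column 55 to line 9, column 3: No match found for function signature GROUP_RANK(,)"
· The query I used is below.
• If you know anything about this, I would appreciate it if you shared it.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (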
TICKER_SYMBOL VARCHAR(4),
PRICE real,
RANK_NUM integer
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
'PRICE', -- rankByColumnName
'rank_num', -- rankOutColumnName
'desc', -- sortOrder
'asc', --outputOrder
10, --maxIdle
5 --outputMax
)
);
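Although the query above fails, the idea behind its arguments can be illustrated: rank the rows of a group by rankByColumnName in sortOrder, write the rank into rankOutColumnName, and emit at most outputMax rows in outputOrder. This plain-Python sketch shows only that ranking idea under those assumptions; it does not reproduce GROUP_RANK's exact semantics (for example how maxIdle or ties are handled), and `group_rank` is a hypothetical name.

```python
from typing import Dict, List

Row = Dict[str, object]


def group_rank(rows: List[Row], rank_by: str, rank_out: str,
               sort_order: str = "desc", output_order: str = "asc",
               output_max: int = 5) -> List[Row]:
    """Rank one group of rows and return at most output_max of them."""
    ranked = sorted(rows, key=lambda r: float(r[rank_by]), reverse=(sort_order == "desc"))
    # Attach rank 1, 2, 3, ... in sort order (ties simply get consecutive ranks here).
    with_rank = [dict(row, **{rank_out: i}) for i, row in enumerate(ranked, start=1)]
    top = with_rank[:output_max]
    return sorted(top, key=lambda r: int(r[rank_out]), reverse=(output_order == "desc"))


rows = [{"ticker_symbol": "ABC", "price": 10.0},
        {"ticker_symbol": "XYZ", "price": 30.0},
        {"ticker_symbol": "DEF", "price": 20.0}]
for r in group_rank(rows, "price", "rank_num", output_max=2):
    print(r)
```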
TOP-K
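・ I wrote the following after reading the documentation. No error is reported for ticker_symbol after SELECT STREAM.
· However, the template query does run, and the values returned by the TOP-K function appear to be two mysterious columns, ITEM and ITEM_COUNT. (The documentation has no detailed explanation, so I tried many things, but got no output.)
• If you know anything about this, I would appreciate it if you shared it.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(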
TICKER_SYMBOL VARCHAR(4),
PRICE REAL)
;
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
'ticker_symbol', -- name of column in single quotes
10, -- number of top items
10 -- tumbling window size in seconds
)
);
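If, as described above, TOP_K_ITEMS_TUMBLING returns the columns ITEM and ITEM_COUNT, then each window yields up to k (value, count) pairs for the most frequent values of the named column, and a SELECT list would need to refer to those columns rather than ticker_symbol and price. The sketch below is a local, exact simulation with collections.Counter under that assumption; the AWS function may use an approximate algorithm, and `top_k_items_tumbling` and the sample rows are hypothetical.

```python
import math
from collections import Counter, defaultdict
from typing import Dict, List, Tuple


def top_k_items_tumbling(rows: List[Tuple[float, str]], k: int = 10,
                         window_seconds: int = 10) -> Dict[int, List[Tuple[str, int]]]:
    """Return {window_start: [(ITEM, ITEM_COUNT), ...]} with the k most frequent values."""
    counters: Dict[int, Counter] = defaultdict(Counter)
    for rowtime, value in rows:
        start = math.floor(rowtime / window_seconds) * window_seconds
        counters[start][value] += 1
    return {start: c.most_common(k) for start, c in sorted(counters.items())}


# (rowtime in seconds, TICKER_SYMBOL)
rows = [(1, "ABC"), (2, "XYZ"), (3, "ABC"), (4, "DEF"),
        (5, "ABC"), (6, "XYZ"), (12, "DEF")]
print(top_k_items_tumbling(rows, k=2))  # {0: [('ABC', 3), ('XYZ', 2)], 10: [('DEF', 1)]}
```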
I plan to organize and share more of the queries I try.
Reference
For more on this topic (Kinesis Analytics SQL queries), see the original article:
https://qiita.com/takada_tf/items/f03c36eed9e22eb74744
You may freely share or copy this text, but please keep this document's URL as the reference URL.
Dedicated to discovering great developer content
(Collection and Share based on the CC Protocol.)