Kinesis Analytics SQL 질의

12218 단어 SQLanalyticsKinesis
SQL 쿼리 Kinesis Analytics에 대해 AWS 측은 템플릿을 준비했지만 미묘한 오류가 있어 요약했습니다.또 푹 빠진 곳도 많아서 정리해 봤습니다.

Kinesis Analytics를 사용하는 포인트


• 스트림 이름과 데이터 이름의 이름은 대소문자를 구분합니다.
→ 스트림 이름 또는 데이터 이름을 ""로 묶으면 소문자, 묶지 않으면 대문자
→ (개인) 흐름 문장의 흐름 이름과 데이터 이름을 만드는 것이 가장 좋다
・ Kinesis Streams demo 자주 중지
→잠시 삭제하고 다시 만드는 것이 좋다
• 참조 데이터 추가 시 비용 흐름 오류 발생
· SQL 코드를 저장하고 실행하려면 시간이 오래 걸릴 수 있음
  - Random_CUT 함수는 30분 정도 걸릴 수 있어요.
틀에 미묘한 오류가 있을 수 있습니다

느끼다


구축이 수월하다
・ KPU는 자동으로 축소되지만 덮개 기능이 없기 때문에 돈을 주의해야 한다
좋아, 구린내가 나.코드 업데이트에서 런닝까지 시간이 걸릴 때가 있어요.
문서가 적다
/템플릿 오류, 주의하십시오.나 진짜 고급 분석 함수 몰라.
소개할 질의는 다음과 같습니다.

기본 조회


・CREATE PUMP&STREAM
・SELECT
・WHERE
・MULTI STREAMS
・GROUP BY
・WINDOW
 1.TUMBLING WINDOW
 2.SLIDING WINDOW
・INNER JOIN

고급 분석


・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※ 미완성
・TOP-K ※ 미완성

기본 문법


CREATE PUMP & STREAM, SELECT


· [OR REPLACE]는 STREAM 이름이 이미 존재하면 바꾼다는 뜻입니다.
· Analytics는 PUMP에 데이터를 투입하여 STREAM에 데이터를 저장합니다.
· 함수 이름을 열 이름(avg,min,max 등)에 사용할 수 없음
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_STNBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001";

WHERE


・ 기본적으로 표준 SQL과 동일
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001"
   WHERE SECTOR LIKE '%ALTH%' 
;

MULTI STREAMS


· PUMP 이름은 함께 사용할 수 없습니다.
• MULTI STREAMS를 정의할 때 AWS 콘솔의 real-time analytics 탭에 여러 STREAM이 표시됩니다.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE SECTOR LIKE '%ALTH%' 
;


CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE PRICE > 0.0
;

GROUP BY(TUMBLING WINDOW)


· 문법은 일반적으로 SQL과 같지만 TIME WINDOW(TUMBLING WINDOW라고 함)를 설정해야 합니다.
· ROWTIME는 Kinesis Analytics에서 미리 준비한 시간에 수집된 시간을 저장한다
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER,
  TICKER_SYMBOL_AVG REAL
);

CREATE OR REPLACE  PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM 
   ticker_symbol, 
   COUNT(*) AS ticker_symbol_count,
   AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

SLIDING WINDOW


· SLIDING WINDOW를 사용할 때 GROUP BY 자구가 아닌 집합 함수(avg,min,count 등) 다음에 WINDOW를 정의합니다.
・ WINDOW 너비의 설정은 TIME과 ROW 두 가지가 있습니다.
CREATE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_SYMBOL_AVG_T REAL,
    TICKER_SYMBOL_COUNT_T INTEGER,
    TICKER_SYMBOL_AVG_R REAL,
    TICKER_SYMBOL_COUNT_R INTEGER);

CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
              avg(price) over TSW as ticker_symbol_avg1,
              count(*)  over TSW as cnt1,
              avg(price) over RSW as ticker_symbol_avg2,
              count(*)  over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol  ROWS 2 preceding);

INNER JOIN


· 서로 다른 STREAM 간의 조인 처리 가능
· 참조를 추가할 수 없음(2017/3월까지)
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
  TICKER_SYMBOL VARCHAR(4),
  TICKER_SYMBOL_COUNT INTEGER, 
  TICKER_SYMBOL_AVG REAL);

CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT 
   STREAM A.ticker_symbol,  
          A.ticker_symbol_count,
          B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol

INNER JOIN에서 사용하는 DESTINATION_SQL_STREAM 및 DESTINATION_SQL_STREAM2 생성

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER
);

CREATE OR REPLACE  PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT 
  STREAM ticker_symbol, 
         count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_AVG REAL
);

CREATE OR REPLACE  PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
  STREAM ticker_symbol, 
         avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

참조 데이터 추가


· AWS CLI에서만 추가 가능
·describe-application을 통해 current-application-version-id 확인
aws kinesisanalytics add-application-reference-data-source \
--region us-east-1   \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'

애플리케이션 보기


・aws configure에서 사전 설정
aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]

참조 데이터 삭제


· describe-application을 통해 version-id와 reference-id 확인
aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]

고급 분석


· 고급 분석에서 Cursol 함수를 사용합니다.(표준 SQL과 많이 다르기 때문에 처음에는 곤혹스러웠다)
・ "SELECT STREAM*FROM(TABLE(xxxFUNCTION(COURSOR(in-application-stream],option)"

RANDOM_CUT_FOREST(이상 감지)


・ 기재된 정보는'in-application-stream','number OfTrees(default:100)','subSampleSize(default:256)','timeDecay(default:10000)','shingleSize(default:1)'5개
자세한 내용은 매뉴얼을 참조하세요
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM  ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    ) ;

CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
   STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND), 
         ANOMALY_SCORE DESC;

DISTINCT COUNT


·COUNT_DISTINCT_ITEMS_TUMBLING 함수에 기재된 정보는'in-application-stream','열명','TUMBLING WINDOW 시간'등 세 가지다.하나의 열 이름만 여러 열을 지정할 수 없습니다
・ Distinct Count의 정보만 내보내기
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'TICKER_SYMBOL', -- name of column in single quotes
  10 -- tumbling window size in seconds
  )
);

GROUP RANK


미안합니다. 지금 오류가 발생했습니다. 잘 모르겠습니다.(나는 문서를 읽었다...)
· 오류 내용은 "SQL error message: Fromline2, column55 to line9, column3: Nomatch found for function signature GROUP_RANK(,)"
· 사용된 조회는 다음과 같다.
• 정보가 있으면 공유했으면 좋겠어요.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4),
    PRICE real,
    RANK_NUM integer
);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
  CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
  'PRICE', -- rankByColumnName
  'rank_num', -- rankOutColumnName
  'desc', -- sortOrder
  'asc', --outputOrder
  10, --maxIdle
  5 --outputMax
  )
);

TOP-K


・ 문서를 읽고 다음과 같이 기재하였습니다. SELECT STREAM 이후의 ticker_Symbol 오류 없음
· 단, 템플릿의 조회는 순환되며, TOP-K 함수에서 얻은 값은 ITEM, ITEM_COUNT라는 신비로운 두 줄 같은데.(문서에 대한 상세한 설명이 없기 때문에 많은 시도를 했지만 출력이 없었다)
• 정보가 있으면 공유했으면 좋겠어요.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
    TICKER_SYMBOL VARCHAR(4), 
    PRICE REAL)
;

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'ticker_symbol', -- name of column in single quotes
  10, -- number of top items
  10 -- tumbling window size in seconds
  )
);
이제 시도한 검색을 정리하고 소개합니다.

좋은 웹페이지 즐겨찾기