Kinesis Analytics SQL 질의
Kinesis Analytics를 사용하는 포인트 
• 스트림 이름과 데이터 이름의 이름은 대소문자를 구분합니다.
→ 스트림 이름 또는 데이터 이름을 ""로 묶으면 소문자, 묶지 않으면 대문자
→ (개인) 흐름 문장의 흐름 이름과 데이터 이름을 만드는 것이 가장 좋다
・ Kinesis Streams demo 자주 중지
→잠시 삭제하고 다시 만드는 것이 좋다
• 참조 데이터 추가 시 비용 흐름 오류 발생
· SQL 코드를 저장하고 실행하려면 시간이 오래 걸릴 수 있음
  - Random_CUT 함수는 30분 정도 걸릴 수 있어요.
틀에 미묘한 오류가 있을 수 있습니다
느끼다 
구축이 수월하다
・ KPU는 자동으로 축소되지만 덮개 기능이 없기 때문에 돈을 주의해야 한다
좋아, 구린내가 나.코드 업데이트에서 런닝까지 시간이 걸릴 때가 있어요.
문서가 적다
/템플릿 오류, 주의하십시오.나 진짜 고급 분석 함수 몰라.
소개할 질의는 다음과 같습니다.
기본 조회 
・CREATE PUMP&STREAM
・SELECT
・WHERE
・MULTI STREAMS
・GROUP BY
・WINDOW
 1.TUMBLING WINDOW
 2.SLIDING WINDOW
・INNER JOIN
고급 분석 
・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※ 미완성
・TOP-K ※ 미완성
기본 문법 
CREATE PUMP & STREAM, SELECT
 
· [OR REPLACE]는 STREAM 이름이 이미 존재하면 바꾼다는 뜻입니다.
· Analytics는 PUMP에 데이터를 투입하여 STREAM에 데이터를 저장합니다.
· 함수 이름을 열 이름(avg,min,max 등)에 사용할 수 없음CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_STNBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001";
 
WHERE
 
・ 기본적으로 표준 SQL과 동일CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001"
   WHERE SECTOR LIKE '%ALTH%' 
;
 
MULTI STREAMS
 
· PUMP 이름은 함께 사용할 수 없습니다.
• MULTI STREAMS를 정의할 때 AWS 콘솔의 real-time analytics 탭에 여러 STREAM이 표시됩니다.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE SECTOR LIKE '%ALTH%' 
;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE PRICE > 0.0
;
 
GROUP BY(TUMBLING WINDOW)
 
· 문법은 일반적으로 SQL과 같지만 TIME WINDOW(TUMBLING WINDOW라고 함)를 설정해야 합니다.
· ROWTIME는 Kinesis Analytics에서 미리 준비한 시간에 수집된 시간을 저장한다CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER,
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM 
   ticker_symbol, 
   COUNT(*) AS ticker_symbol_count,
   AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
 
SLIDING WINDOW
 
· SLIDING WINDOW를 사용할 때 GROUP BY 자구가 아닌 집합 함수(avg,min,count 등) 다음에 WINDOW를 정의합니다.
・ WINDOW 너비의 설정은 TIME과 ROW 두 가지가 있습니다.CREATE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_SYMBOL_AVG_T REAL,
    TICKER_SYMBOL_COUNT_T INTEGER,
    TICKER_SYMBOL_AVG_R REAL,
    TICKER_SYMBOL_COUNT_R INTEGER);
CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
              avg(price) over TSW as ticker_symbol_avg1,
              count(*)  over TSW as cnt1,
              avg(price) over RSW as ticker_symbol_avg2,
              count(*)  over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol  ROWS 2 preceding);
 
INNER JOIN
 
· 서로 다른 STREAM 간의 조인 처리 가능
· 참조를 추가할 수 없음(2017/3월까지)CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
  TICKER_SYMBOL VARCHAR(4),
  TICKER_SYMBOL_COUNT INTEGER, 
  TICKER_SYMBOL_AVG REAL);
CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT 
   STREAM A.ticker_symbol,  
          A.ticker_symbol_count,
          B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol
 
INNER JOIN에서 사용하는 DESTINATION_SQL_STREAM 및 DESTINATION_SQL_STREAM2 생성
 CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER
);
CREATE OR REPLACE  PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT 
  STREAM ticker_symbol, 
         count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
  STREAM ticker_symbol, 
         avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
참조 데이터 추가
 
· AWS CLI에서만 추가 가능
·describe-application을 통해 current-application-version-id 확인aws kinesisanalytics add-application-reference-data-source \
--region us-east-1   \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'
애플리케이션 보기
 
・aws configure에서 사전 설정aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]
참조 데이터 삭제
 
· describe-application을 통해 version-id와 reference-id 확인aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]
고급 분석 
· 고급 분석에서 Cursol 함수를 사용합니다.(표준 SQL과 많이 다르기 때문에 처음에는 곤혹스러웠다)
・ "SELECT STREAM*FROM(TABLE(xxxFUNCTION(COURSOR(in-application-stream],option)"
RANDOM_CUT_FOREST(이상 감지)
 
・ 기재된 정보는'in-application-stream','number OfTrees(default:100)','subSampleSize(default:256)','timeDecay(default:10000)','shingleSize(default:1)'5개
자세한 내용은 매뉴얼을 참조하세요CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM  ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    ) ;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
   STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND), 
         ANOMALY_SCORE DESC;
 
DISTINCT COUNT
 
·COUNT_DISTINCT_ITEMS_TUMBLING 함수에 기재된 정보는'in-application-stream','열명','TUMBLING WINDOW 시간'등 세 가지다.하나의 열 이름만 여러 열을 지정할 수 없습니다
・ Distinct Count의 정보만 내보내기CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'TICKER_SYMBOL', -- name of column in single quotes
  10 -- tumbling window size in seconds
  )
);
 
GROUP RANK
 
미안합니다. 지금 오류가 발생했습니다. 잘 모르겠습니다.(나는 문서를 읽었다...)
· 오류 내용은 "SQL error message: Fromline2, column55 to line9, column3: Nomatch found for function signature GROUP_RANK(,)"
· 사용된 조회는 다음과 같다.
• 정보가 있으면 공유했으면 좋겠어요.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4),
    PRICE real,
    RANK_NUM integer
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
  CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
  'PRICE', -- rankByColumnName
  'rank_num', -- rankOutColumnName
  'desc', -- sortOrder
  'asc', --outputOrder
  10, --maxIdle
  5 --outputMax
  )
);
TOP-K
 
・ 문서를 읽고 다음과 같이 기재하였습니다. SELECT STREAM 이후의 ticker_Symbol 오류 없음
· 단, 템플릿의 조회는 순환되며, TOP-K 함수에서 얻은 값은 ITEM, ITEM_COUNT라는 신비로운 두 줄 같은데.(문서에 대한 상세한 설명이 없기 때문에 많은 시도를 했지만 출력이 없었다)
• 정보가 있으면 공유했으면 좋겠어요.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
    TICKER_SYMBOL VARCHAR(4), 
    PRICE REAL)
;
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'ticker_symbol', -- name of column in single quotes
  10, -- number of top items
  10 -- tumbling window size in seconds
  )
);
이제 시도한 검색을 정리하고 소개합니다.
                
                    
        
    
    
    
    
    
                
                
                
                
                    
                        
                            
                            
                            Reference
                            
                            이 문제에 관하여(Kinesis Analytics SQL 질의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
                                
                                https://qiita.com/takada_tf/items/f03c36eed9e22eb74744
                            
                            
                            
                                텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
                            
                            
                                
                                
                                
                                
                                
                                우수한 개발자 콘텐츠 발견에 전념
                                (Collection and Share based on the CC Protocol.)
                            
                            
                        
                    
                
                
                
            
구축이 수월하다
・ KPU는 자동으로 축소되지만 덮개 기능이 없기 때문에 돈을 주의해야 한다
좋아, 구린내가 나.코드 업데이트에서 런닝까지 시간이 걸릴 때가 있어요.
문서가 적다
/템플릿 오류, 주의하십시오.나 진짜 고급 분석 함수 몰라.
소개할 질의는 다음과 같습니다.
기본 조회 
・CREATE PUMP&STREAM
・SELECT
・WHERE
・MULTI STREAMS
・GROUP BY
・WINDOW
 1.TUMBLING WINDOW
 2.SLIDING WINDOW
・INNER JOIN
고급 분석 
・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※ 미완성
・TOP-K ※ 미완성
기본 문법 
CREATE PUMP & STREAM, SELECT
 
· [OR REPLACE]는 STREAM 이름이 이미 존재하면 바꾼다는 뜻입니다.
· Analytics는 PUMP에 데이터를 투입하여 STREAM에 데이터를 저장합니다.
· 함수 이름을 열 이름(avg,min,max 등)에 사용할 수 없음CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_STNBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001";
 
WHERE
 
・ 기본적으로 표준 SQL과 동일CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001"
   WHERE SECTOR LIKE '%ALTH%' 
;
 
MULTI STREAMS
 
· PUMP 이름은 함께 사용할 수 없습니다.
• MULTI STREAMS를 정의할 때 AWS 콘솔의 real-time analytics 탭에 여러 STREAM이 표시됩니다.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE SECTOR LIKE '%ALTH%' 
;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE PRICE > 0.0
;
 
GROUP BY(TUMBLING WINDOW)
 
· 문법은 일반적으로 SQL과 같지만 TIME WINDOW(TUMBLING WINDOW라고 함)를 설정해야 합니다.
· ROWTIME는 Kinesis Analytics에서 미리 준비한 시간에 수집된 시간을 저장한다CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER,
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM 
   ticker_symbol, 
   COUNT(*) AS ticker_symbol_count,
   AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
 
SLIDING WINDOW
 
· SLIDING WINDOW를 사용할 때 GROUP BY 자구가 아닌 집합 함수(avg,min,count 등) 다음에 WINDOW를 정의합니다.
・ WINDOW 너비의 설정은 TIME과 ROW 두 가지가 있습니다.CREATE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_SYMBOL_AVG_T REAL,
    TICKER_SYMBOL_COUNT_T INTEGER,
    TICKER_SYMBOL_AVG_R REAL,
    TICKER_SYMBOL_COUNT_R INTEGER);
CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
              avg(price) over TSW as ticker_symbol_avg1,
              count(*)  over TSW as cnt1,
              avg(price) over RSW as ticker_symbol_avg2,
              count(*)  over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol  ROWS 2 preceding);
 
INNER JOIN
 
· 서로 다른 STREAM 간의 조인 처리 가능
· 참조를 추가할 수 없음(2017/3월까지)CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
  TICKER_SYMBOL VARCHAR(4),
  TICKER_SYMBOL_COUNT INTEGER, 
  TICKER_SYMBOL_AVG REAL);
CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT 
   STREAM A.ticker_symbol,  
          A.ticker_symbol_count,
          B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol
 
INNER JOIN에서 사용하는 DESTINATION_SQL_STREAM 및 DESTINATION_SQL_STREAM2 생성
 CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER
);
CREATE OR REPLACE  PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT 
  STREAM ticker_symbol, 
         count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
  STREAM ticker_symbol, 
         avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
참조 데이터 추가
 
· AWS CLI에서만 추가 가능
·describe-application을 통해 current-application-version-id 확인aws kinesisanalytics add-application-reference-data-source \
--region us-east-1   \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'
애플리케이션 보기
 
・aws configure에서 사전 설정aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]
참조 데이터 삭제
 
· describe-application을 통해 version-id와 reference-id 확인aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]
고급 분석 
· 고급 분석에서 Cursol 함수를 사용합니다.(표준 SQL과 많이 다르기 때문에 처음에는 곤혹스러웠다)
・ "SELECT STREAM*FROM(TABLE(xxxFUNCTION(COURSOR(in-application-stream],option)"
RANDOM_CUT_FOREST(이상 감지)
 
・ 기재된 정보는'in-application-stream','number OfTrees(default:100)','subSampleSize(default:256)','timeDecay(default:10000)','shingleSize(default:1)'5개
자세한 내용은 매뉴얼을 참조하세요CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM  ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    ) ;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
   STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND), 
         ANOMALY_SCORE DESC;
 
DISTINCT COUNT
 
·COUNT_DISTINCT_ITEMS_TUMBLING 함수에 기재된 정보는'in-application-stream','열명','TUMBLING WINDOW 시간'등 세 가지다.하나의 열 이름만 여러 열을 지정할 수 없습니다
・ Distinct Count의 정보만 내보내기CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'TICKER_SYMBOL', -- name of column in single quotes
  10 -- tumbling window size in seconds
  )
);
 
GROUP RANK
 
미안합니다. 지금 오류가 발생했습니다. 잘 모르겠습니다.(나는 문서를 읽었다...)
· 오류 내용은 "SQL error message: Fromline2, column55 to line9, column3: Nomatch found for function signature GROUP_RANK(,)"
· 사용된 조회는 다음과 같다.
• 정보가 있으면 공유했으면 좋겠어요.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4),
    PRICE real,
    RANK_NUM integer
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
  CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
  'PRICE', -- rankByColumnName
  'rank_num', -- rankOutColumnName
  'desc', -- sortOrder
  'asc', --outputOrder
  10, --maxIdle
  5 --outputMax
  )
);
TOP-K
 
・ 문서를 읽고 다음과 같이 기재하였습니다. SELECT STREAM 이후의 ticker_Symbol 오류 없음
· 단, 템플릿의 조회는 순환되며, TOP-K 함수에서 얻은 값은 ITEM, ITEM_COUNT라는 신비로운 두 줄 같은데.(문서에 대한 상세한 설명이 없기 때문에 많은 시도를 했지만 출력이 없었다)
• 정보가 있으면 공유했으면 좋겠어요.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
    TICKER_SYMBOL VARCHAR(4), 
    PRICE REAL)
;
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'ticker_symbol', -- name of column in single quotes
  10, -- number of top items
  10 -- tumbling window size in seconds
  )
);
이제 시도한 검색을 정리하고 소개합니다.
                
                    
        
    
    
    
    
    
                
                
                
                
                    
                        
                            
                            
                            Reference
                            
                            이 문제에 관하여(Kinesis Analytics SQL 질의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
                                
                                https://qiita.com/takada_tf/items/f03c36eed9e22eb74744
                            
                            
                            
                                텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
                            
                            
                                
                                
                                
                                
                                
                                우수한 개발자 콘텐츠 발견에 전념
                                (Collection and Share based on the CC Protocol.)
                            
                            
                        
                    
                
                
                
            
・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※ 미완성
・TOP-K ※ 미완성
기본 문법 
CREATE PUMP & STREAM, SELECT
 
· [OR REPLACE]는 STREAM 이름이 이미 존재하면 바꾼다는 뜻입니다.
· Analytics는 PUMP에 데이터를 투입하여 STREAM에 데이터를 저장합니다.
· 함수 이름을 열 이름(avg,min,max 등)에 사용할 수 없음CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_STNBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001";
 
WHERE
 
・ 기본적으로 표준 SQL과 동일CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001"
   WHERE SECTOR LIKE '%ALTH%' 
;
 
MULTI STREAMS
 
· PUMP 이름은 함께 사용할 수 없습니다.
• MULTI STREAMS를 정의할 때 AWS 콘솔의 real-time analytics 탭에 여러 STREAM이 표시됩니다.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE SECTOR LIKE '%ALTH%' 
;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE PRICE > 0.0
;
 
GROUP BY(TUMBLING WINDOW)
 
· 문법은 일반적으로 SQL과 같지만 TIME WINDOW(TUMBLING WINDOW라고 함)를 설정해야 합니다.
· ROWTIME는 Kinesis Analytics에서 미리 준비한 시간에 수집된 시간을 저장한다CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER,
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM 
   ticker_symbol, 
   COUNT(*) AS ticker_symbol_count,
   AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
 
SLIDING WINDOW
 
· SLIDING WINDOW를 사용할 때 GROUP BY 자구가 아닌 집합 함수(avg,min,count 등) 다음에 WINDOW를 정의합니다.
・ WINDOW 너비의 설정은 TIME과 ROW 두 가지가 있습니다.CREATE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_SYMBOL_AVG_T REAL,
    TICKER_SYMBOL_COUNT_T INTEGER,
    TICKER_SYMBOL_AVG_R REAL,
    TICKER_SYMBOL_COUNT_R INTEGER);
CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
              avg(price) over TSW as ticker_symbol_avg1,
              count(*)  over TSW as cnt1,
              avg(price) over RSW as ticker_symbol_avg2,
              count(*)  over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol  ROWS 2 preceding);
 
INNER JOIN
 
· 서로 다른 STREAM 간의 조인 처리 가능
· 참조를 추가할 수 없음(2017/3월까지)CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
  TICKER_SYMBOL VARCHAR(4),
  TICKER_SYMBOL_COUNT INTEGER, 
  TICKER_SYMBOL_AVG REAL);
CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT 
   STREAM A.ticker_symbol,  
          A.ticker_symbol_count,
          B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol
 
INNER JOIN에서 사용하는 DESTINATION_SQL_STREAM 및 DESTINATION_SQL_STREAM2 생성
 CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER
);
CREATE OR REPLACE  PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT 
  STREAM ticker_symbol, 
         count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
  STREAM ticker_symbol, 
         avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
참조 데이터 추가
 
· AWS CLI에서만 추가 가능
·describe-application을 통해 current-application-version-id 확인aws kinesisanalytics add-application-reference-data-source \
--region us-east-1   \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'
애플리케이션 보기
 
・aws configure에서 사전 설정aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]
참조 데이터 삭제
 
· describe-application을 통해 version-id와 reference-id 확인aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]
고급 분석 
· 고급 분석에서 Cursol 함수를 사용합니다.(표준 SQL과 많이 다르기 때문에 처음에는 곤혹스러웠다)
・ "SELECT STREAM*FROM(TABLE(xxxFUNCTION(COURSOR(in-application-stream],option)"
RANDOM_CUT_FOREST(이상 감지)
 
・ 기재된 정보는'in-application-stream','number OfTrees(default:100)','subSampleSize(default:256)','timeDecay(default:10000)','shingleSize(default:1)'5개
자세한 내용은 매뉴얼을 참조하세요CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM  ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    ) ;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
   STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND), 
         ANOMALY_SCORE DESC;
 
DISTINCT COUNT
 
·COUNT_DISTINCT_ITEMS_TUMBLING 함수에 기재된 정보는'in-application-stream','열명','TUMBLING WINDOW 시간'등 세 가지다.하나의 열 이름만 여러 열을 지정할 수 없습니다
・ Distinct Count의 정보만 내보내기CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'TICKER_SYMBOL', -- name of column in single quotes
  10 -- tumbling window size in seconds
  )
);
 
GROUP RANK
 
미안합니다. 지금 오류가 발생했습니다. 잘 모르겠습니다.(나는 문서를 읽었다...)
· 오류 내용은 "SQL error message: Fromline2, column55 to line9, column3: Nomatch found for function signature GROUP_RANK(,)"
· 사용된 조회는 다음과 같다.
• 정보가 있으면 공유했으면 좋겠어요.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4),
    PRICE real,
    RANK_NUM integer
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
  CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
  'PRICE', -- rankByColumnName
  'rank_num', -- rankOutColumnName
  'desc', -- sortOrder
  'asc', --outputOrder
  10, --maxIdle
  5 --outputMax
  )
);
TOP-K
 
・ 문서를 읽고 다음과 같이 기재하였습니다. SELECT STREAM 이후의 ticker_Symbol 오류 없음
· 단, 템플릿의 조회는 순환되며, TOP-K 함수에서 얻은 값은 ITEM, ITEM_COUNT라는 신비로운 두 줄 같은데.(문서에 대한 상세한 설명이 없기 때문에 많은 시도를 했지만 출력이 없었다)
• 정보가 있으면 공유했으면 좋겠어요.CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
    TICKER_SYMBOL VARCHAR(4), 
    PRICE REAL)
;
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'ticker_symbol', -- name of column in single quotes
  10, -- number of top items
  10 -- tumbling window size in seconds
  )
);
이제 시도한 검색을 정리하고 소개합니다.
                
                    
        
    
    
    
    
    
                
                
                
                
                    
                        
                            
                            
                            Reference
                            
                            이 문제에 관하여(Kinesis Analytics SQL 질의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
                                
                                https://qiita.com/takada_tf/items/f03c36eed9e22eb74744
                            
                            
                            
                                텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
                            
                            
                                
                                
                                
                                
                                
                                우수한 개발자 콘텐츠 발견에 전념
                                (Collection and Share based on the CC Protocol.)
                            
                            
                        
                    
                
                
                
            
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_STNBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001";
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001"
   WHERE SECTOR LIKE '%ALTH%' 
;
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE SECTOR LIKE '%ALTH%' 
;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE PRICE > 0.0
;
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER,
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM 
   ticker_symbol, 
   COUNT(*) AS ticker_symbol_count,
   AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_SYMBOL_AVG_T REAL,
    TICKER_SYMBOL_COUNT_T INTEGER,
    TICKER_SYMBOL_AVG_R REAL,
    TICKER_SYMBOL_COUNT_R INTEGER);
CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
              avg(price) over TSW as ticker_symbol_avg1,
              count(*)  over TSW as cnt1,
              avg(price) over RSW as ticker_symbol_avg2,
              count(*)  over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol  ROWS 2 preceding);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
  TICKER_SYMBOL VARCHAR(4),
  TICKER_SYMBOL_COUNT INTEGER, 
  TICKER_SYMBOL_AVG REAL);
CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT 
   STREAM A.ticker_symbol,  
          A.ticker_symbol_count,
          B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER
);
CREATE OR REPLACE  PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT 
  STREAM ticker_symbol, 
         count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE  PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
  STREAM ticker_symbol, 
         avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
aws kinesisanalytics add-application-reference-data-source \
--region us-east-1   \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'
aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]
aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]
· 고급 분석에서 Cursol 함수를 사용합니다.(표준 SQL과 많이 다르기 때문에 처음에는 곤혹스러웠다)
・ "SELECT STREAM*FROM(TABLE(xxxFUNCTION(COURSOR(in-application-stream],option)"
RANDOM_CUT_FOREST(이상 감지)
・ 기재된 정보는'in-application-stream','number OfTrees(default:100)','subSampleSize(default:256)','timeDecay(default:10000)','shingleSize(default:1)'5개
자세한 내용은 매뉴얼을 참조하세요
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM  ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    ) ;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
   STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND), 
         ANOMALY_SCORE DESC;
 DISTINCT COUNT
·COUNT_DISTINCT_ITEMS_TUMBLING 함수에 기재된 정보는'in-application-stream','열명','TUMBLING WINDOW 시간'등 세 가지다.하나의 열 이름만 여러 열을 지정할 수 없습니다
・ Distinct Count의 정보만 내보내기
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'TICKER_SYMBOL', -- name of column in single quotes
  10 -- tumbling window size in seconds
  )
);
 GROUP RANK
미안합니다. 지금 오류가 발생했습니다. 잘 모르겠습니다.(나는 문서를 읽었다...)
· 오류 내용은 "SQL error message: Fromline2, column55 to line9, column3: Nomatch found for function signature GROUP_RANK(,)"
· 사용된 조회는 다음과 같다.
• 정보가 있으면 공유했으면 좋겠어요.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4),
    PRICE real,
    RANK_NUM integer
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
  CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
  'PRICE', -- rankByColumnName
  'rank_num', -- rankOutColumnName
  'desc', -- sortOrder
  'asc', --outputOrder
  10, --maxIdle
  5 --outputMax
  )
);
TOP-K
・ 문서를 읽고 다음과 같이 기재하였습니다. SELECT STREAM 이후의 ticker_Symbol 오류 없음
· 단, 템플릿의 조회는 순환되며, TOP-K 함수에서 얻은 값은 ITEM, ITEM_COUNT라는 신비로운 두 줄 같은데.(문서에 대한 상세한 설명이 없기 때문에 많은 시도를 했지만 출력이 없었다)
• 정보가 있으면 공유했으면 좋겠어요.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
    TICKER_SYMBOL VARCHAR(4), 
    PRICE REAL)
;
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'ticker_symbol', -- name of column in single quotes
  10, -- number of top items
  10 -- tumbling window size in seconds
  )
);
이제 시도한 검색을 정리하고 소개합니다.
                Reference
이 문제에 관하여(Kinesis Analytics SQL 질의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/takada_tf/items/f03c36eed9e22eb74744텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
                                
                                
                                
                                
                                
                                우수한 개발자 콘텐츠 발견에 전념
                                (Collection and Share based on the CC Protocol.)