【처음의 Databricks】금융거래 데이터로부터 이상 검지 #3 Anomaly Detector
14357 단어 AzureDatabricksAnomalyDetector
소개
Databricks Notebook에서 Azure Anomaly Detector API를 두드려 결과를 시각화할 수 있습니다.
연락목차
Anomaly Detector를 데이터 탐색 도구로 사용해보기 →이 논문
Anomaly Detector란?
Anomaly Detector는 관리형 AI 서비스로 제공되는 Azure Cognitive Services 중 하나입니다.
예측 결과의 시각화 예
X축이 시간, Y축이 수치. 파란색 실선이 투입한 데이터.
해석 결과 이상이 아니라고 판정된 영역이 하늘색, 이상치라고 판정된 데이터는 적색으로 플로팅되어 있다.
본 연재의 주제인 이상 검지 모델에는 이용하고 있지 않습니다만, 시계열 데이터의 특징을 살짝 파악하고 싶을 때에 사용할 수 있을지도, 입력/출력 모두 심플하므로 Detabricks 의 가시화 기능과 궁합 좋을지도, 라고 하는 것으로 시도해 보았습니다.
테스트 데이터 생성
Anomaly Detector에 입력하는 Body 데이터는 다음을 충족해야 합니다.
이번은 상기 일로 주기성을 명확하게 보여진 구간을 발췌해, Step 가 전자, 트랜잭션수가 후자가 되도록(듯이) 데이터를 성형, 테스트 데이터로 했습니다.
# step から時系列のカラム作成 1/6-17 のみ抜粋。タイプはTRANSFER
query1 = '''
SELECT
CONCAT('2020-01-', LPAD(ceil(step / 24), 2, '0'), ' ', LPAD((step - 1) % 24, 2, '0'), ':00:00') as date_time_str
, count(1) as value
FROM
financial
WHERE
type='TRANSFER' AND step < 409 AND step > 120
GROUP BY step
ORDER BY step
'''
agged_df = spark.sql(query1)
agged_df.createOrReplaceTempView("agged_view")
# タイムスタンプ作成
query2 = '''
SELECT
CAST(date_time_str as timestamp) as timestamp
, value
FROM
agged_view
'''
agged_df2 = spark.sql(query2)
agged_df2.createOrReplaceTempView("agged_view2")
# リクエスト用データフレーム作成
agged_pd = agged_df2.select("*").toPandas()
agged_pd.loc[:, ["timestamp"]] = agged_pd["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
테스트 데이터를 막대 그래프로 사용해 봅니다. 수치의 풍선이 있는 곳에서 거래가 적은 것은 곧 알 수 있습니다만, 그 밖에도 특필해야 할 포인트가 있을지도 모릅니다.
SELECT * FROM agged_view2
결과
환경 변수 설정
먼저 Anomaly Detector 리소스를 만듭니다. ( 이 기사 가 참고가 됩니다)
엔드포인트 URL 및 API Key를 검색한 후 클러스터의 환경 변수에 추가하십시오. (노트북에 평평하게하는 것은 안전하지 않습니다)
함수 정의
매개 변수와 실행 함수를 정의합니다.
import pandas as pd
import json
import requests
url = "/anomalydetector/v1.0/timeseries/entire/detect"
endpoint = os.environ["ANOMALY_DETECTOR_ENDPOINT"]
subscription_key = os.environ["ANOMALY_DETECTOR_KEY"]
# 作成したデータフレームを json に変換
series = json.loads(agged_pd.to_json(orient="records"))
# 時系列データの粒度と異常値の感度を Bodyに入れる
body = {
"series": series
,"granularity": "hourly"
, "sensitivity": 50
}
def send_request(endpoint, url, subscription_key, request_data):
headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': subscription_key}
response = requests.post(endpoint+url, data=json.dumps(request_data), headers=headers)
return json.loads(response.content.decode("utf-8"))
def detect_batch(request_data):
print("Detecting anomalies as a batch")
# リクエスト送信、返り値を出力
result = send_request(endpoint, url, subscription_key, request_data)
print(json.dumps(result, indent=4))
return result
if result.get('code') is not None:
print("Detection failed. ErrorCode:{}, ErrorMessage:{}".format(result['code'], result['message']))
else:
# データセット内の異常値を表示
anomalies = result["isAnomaly"]
print("Anomalies detected in the following data positions:")
for x in range(len(anomalies)):
if anomalies[x]:
print (x, request_data['series'][x]['value'])
실행 및 결과 시각화
API에 데이터를 던지고 원래 데이터 프레임과 결합합니다.
results = detect_batch(body)
mgd_df = pd.concat([agged_pd, pd.DataFrame(results)], axis=1)
isAnomaly
(예측 결과. 1=이상, 0=일반)으로 좁혀 그래프화합니다. 연설 거품이 나오는 부분은 육안으로는 눈치 채지 못했지만, 확실히 다른 날의 같은 시간에 비하면 트랜잭션 수가 낮을 것 같습니다.display(mgd_df)
결과
plot option
요약
우리는 그래프로 하면 어쩐지 알게 된 것 같은 신경이 쓰이는 경향이 있습니다. Anonaly Detector 는 선입관 없이 시계열 데이터를 보고 싶을 때 유용할지도 모르겠네요. 다음에는 실제로 결정 트리를 사용하여 모델을 구축합니다. 기대하세요!
참고 링크
Anomaly Detector 공식 톱 페이지
Anomaly Detector API 사용에 대한 모범 사례
Cognitive Service: Anomaly Detector API 사용
빠른 시작 : Anomaly Detector REST API 및 Python을 사용하여 시계열 데이터의 이상을 감지합니다.
Reference
이 문제에 관하여(【처음의 Databricks】금융거래 데이터로부터 이상 검지 #3 Anomaly Detector), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/Catetin0310/items/d29b5fb00e047a7c107e텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)