【처음의 Databricks】금융거래 데이터로부터 이상 검지 #3 Anomaly Detector

소개



Databricks Notebook에서 Azure Anomaly Detector API를 두드려 결과를 시각화할 수 있습니다.

연락목차


  • 배포/환경 설정
  • Collaborative Notebook에서 데이터 시각화

  • Anomaly Detector를 데이터 탐색 도구로 사용해보기 →이 논문
  • 첫 번째 모델 구축 (데이터 편향 미 고려)
  • 두 번째 모델 구축 (데이터 편향 고려)

  • Anomaly Detector란?



    Anomaly Detector는 관리형 AI 서비스로 제공되는 Azure Cognitive Services 중 하나입니다.
  • 시계열 데이터 세트에서 이상 감지
  • 교사 없음 학습
  • 이상에 대한 감도를 미세 조정 가능
  • API로 응용 프로그램에 내장 가능
  • 데이터 유형 및 누락 값에 대한 제약이 조금 엄격합니다

  • 예측 결과의 시각화 예

    X축이 시간, Y축이 수치. 파란색 실선이 투입한 데이터.
    해석 결과 이상이 아니라고 판정된 영역이 하늘색, 이상치라고 판정된 데이터는 적색으로 플로팅되어 있다.

    본 연재의 주제인 이상 검지 모델에는 이용하고 있지 않습니다만, 시계열 데이터의 특징을 살짝 파악하고 싶을 때에 사용할 수 있을지도, 입력/출력 모두 심플하므로 Detabricks 의 가시화 기능과 궁합 좋을지도, 라고 하는 것으로 시도해 보았습니다.

    테스트 데이터 생성



    Anomaly Detector에 입력하는 Body 데이터는 다음을 충족해야 합니다.
  • UTC 형식의 타임 스탬프 (컬럼 이름 = timestamp)
  • 수치 (이상 검출 대상. 컬럼명 = value)

  • 이번은 상기 일로 주기성을 명확하게 보여진 구간을 발췌해, Step 가 전자, 트랜잭션수가 후자가 되도록(듯이) 데이터를 성형, 테스트 데이터로 했습니다.
    # step から時系列のカラム作成 1/6-17 のみ抜粋。タイプはTRANSFER
    query1 = '''
    SELECT
      CONCAT('2020-01-', LPAD(ceil(step / 24), 2, '0'), ' ', LPAD((step - 1) % 24, 2, '0'), ':00:00') as date_time_str
      , count(1) as value
    FROM
      financial
    WHERE
      type='TRANSFER' AND step < 409 AND step > 120
    GROUP BY step
    ORDER BY step
    '''
    
    agged_df = spark.sql(query1)
    agged_df.createOrReplaceTempView("agged_view")
    
    # タイムスタンプ作成
    query2 = '''
    SELECT
      CAST(date_time_str as timestamp) as timestamp
      , value 
    FROM
      agged_view
    '''
    
    agged_df2 = spark.sql(query2)
    agged_df2.createOrReplaceTempView("agged_view2")
    
    # リクエスト用データフレーム作成
    agged_pd = agged_df2.select("*").toPandas()
    agged_pd.loc[:, ["timestamp"]] = agged_pd["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    

    테스트 데이터를 막대 그래프로 사용해 봅니다. 수치의 풍선이 있는 곳에서 거래가 적은 것은 곧 알 수 있습니다만, 그 밖에도 특필해야 할 포인트가 있을지도 모릅니다.
    SELECT * FROM agged_view2
    

    결과

    환경 변수 설정



    먼저 Anomaly Detector 리소스를 만듭니다. ( 이 기사 가 참고가 됩니다)

    엔드포인트 URL 및 API Key를 검색한 후 클러스터의 환경 변수에 추가하십시오. (노트북에 평평하게하는 것은 안전하지 않습니다)




    함수 정의



    매개 변수와 실행 함수를 정의합니다.
    import pandas as pd
    import json
    import requests
    
    url = "/anomalydetector/v1.0/timeseries/entire/detect"
    endpoint = os.environ["ANOMALY_DETECTOR_ENDPOINT"]
    subscription_key = os.environ["ANOMALY_DETECTOR_KEY"]
    
    # 作成したデータフレームを json に変換
    series = json.loads(agged_pd.to_json(orient="records"))
    
    # 時系列データの粒度と異常値の感度を Bodyに入れる
    body = {
      "series": series
      ,"granularity": "hourly"
      , "sensitivity": 50
    }
    
    def send_request(endpoint, url, subscription_key, request_data):
        headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': subscription_key}
        response = requests.post(endpoint+url, data=json.dumps(request_data), headers=headers)
        return json.loads(response.content.decode("utf-8"))
    
    def detect_batch(request_data):
        print("Detecting anomalies as a batch")
        # リクエスト送信、返り値を出力
        result = send_request(endpoint, url, subscription_key, request_data)
        print(json.dumps(result, indent=4))
        return result
    
        if result.get('code') is not None:
            print("Detection failed. ErrorCode:{}, ErrorMessage:{}".format(result['code'], result['message']))
        else:
            # データセット内の異常値を表示
            anomalies = result["isAnomaly"]
            print("Anomalies detected in the following data positions:")
    
            for x in range(len(anomalies)):
                if anomalies[x]:
                    print (x, request_data['series'][x]['value'])
    

    실행 및 결과 시각화



    API에 데이터를 던지고 원래 데이터 프레임과 결합합니다.
    results = detect_batch(body)
    mgd_df = pd.concat([agged_pd, pd.DataFrame(results)], axis=1)
    
    isAnomaly (예측 결과. 1=이상, 0=일반)으로 좁혀 그래프화합니다. 연설 거품이 나오는 부분은 육안으로는 눈치 채지 못했지만, 확실히 다른 날의 같은 시간에 비하면 트랜잭션 수가 낮을 것 같습니다.
    display(mgd_df)
    

    결과

    plot option


    요약



    우리는 그래프로 하면 어쩐지 알게 된 것 같은 신경이 쓰이는 경향이 있습니다. Anonaly Detector 는 선입관 없이 시계열 데이터를 보고 싶을 때 유용할지도 모르겠네요. 다음에는 실제로 결정 트리를 사용하여 모델을 구축합니다. 기대하세요!

    참고 링크



    Anomaly Detector 공식 톱 페이지
    Anomaly Detector API 사용에 대한 모범 사례
    Cognitive Service: Anomaly Detector API 사용
    빠른 시작 : Anomaly Detector REST API 및 Python을 사용하여 시계열 데이터의 이상을 감지합니다.

    좋은 웹페이지 즐겨찾기