PySimpleGui로 감시 카메라 뷰어 만들기

나는 인터넷 와이파이 카메라-Tpo C200을 샀다.
뷰어 앱을 만들었으니 소개해 드리죠.

💡만들 응용 프로그램


Image from Gyazo
다음은 개요.
  • 접속 정보 입력
  • Connect 키를 누른 후 연결 정보에 따라 연결
  • 카메라 화면 표시
  • 화살표 단추로 천공기 조작
  • "Disconnect"를 누르고 끊기
  • ⚙사전 준비


    계정 설정


    구매한 카메라는↓.
    https://www.amazon.co.jp/gp/product/B07YG7RNF2/ref=ppx_yo_dt_b_asin_title_o04_s00?ie=UTF8&psc=1
    이 카메라는 스마트폰 전용 앱으로 영상을 보거나 천공기를 조작할 수 있다.
    카메라의 영상 전송에는 사용자 이름과 비밀번호가 필요하다.
    아이디와 비밀번호의 계정 설정은 아래 공식 사이트 FAQ의
    https://www.tapo.com/jp/faq/34/
    단계 1: Tapo 응용 프로그램을 통해 계정 만들기
    참조해주세요.

    💻컨디션


    개발 환경

  • windows10 64bit Home
  • Python 3.8.10
  • Python 3.10.2
  • 파이썬 모듈


    Python에 따라 달라지는 모듈은 다음과 같습니다.
    > python -m venv env
    (env) > pip install rtsp
    (env) > pip install pysimplegui
    (env) > pip install --upgrade onvif_zeep
    

    📝절차.

  • 카메라 이미지 수신
  • 가로 흔들기 세로 흔들기 대응
  • 카메라 이미지 수신


    RTSP 라이브 스트리밍 URL


    Tapo 카메라의 RTSP 라이브 스트리밍 URL은 다음과 같습니다.
  • 고화질 1080P(1920*1080) 스트리밍의 경우
  • rtsp://username:password@IP Address:554/stream1
  • 저화질의 360P(640*360) 흐름의 경우
  • rtsp://username:password@IP Address:554/stream2
  • username,password는 미리 준비한 계정 설정입니다.

    코드


    코드는 다음과 같습니다.
    view.py
    import PySimpleGUI as sg
    import rtsp
    from PIL import Image, ImageTk
    
    sg.theme('Dark Brown')
    
    USER = "username"    # <=自分の環境に合わせる
    PASS = "password"    # <=自分の環境に合わせる
    IPADDR = "192.168.11.xx"    # <=自分の環境に合わせる
    PORT = "554"
    STREAM = "stream2"
    ONVIF_PORT = "2020"
    
    layout = [
            # カメラ接続情報
            [   sg.Text('IPADDR: ', size=(12, 1)), sg.InputText(default_text=IPADDR,  size=(20, 1),key='-ipaddr-'),
                sg.Text('PORT: ', size=(12, 1)), sg.InputText(default_text=PORT,  size=(20, 1),key='-port-'),
                sg.Text('STREAM: ', size=(12, 1)), sg.InputText(default_text=STREAM,  size=(20, 1),key='-stream-')],
            [   sg.Text('ONVIF_PORT: ', size=(12, 1)), sg.InputText(default_text=ONVIF_PORT,  size=(20, 1),key='-onvif_port-')],
            [   sg.Text('USER: ', size=(12, 1)), sg.InputText(default_text=USER,  size=(20, 1),key='-user-'),
                sg.Text('PASS: ', size=(12, 1)), sg.InputText(default_text=PASS,  size=(20, 1),key='-pass-')],
    
            # 画面表示
            [   sg.Image(filename='', key='image')],
    
            # 接続, 切断
            [   sg.Button('Connect', size=(10, 1), key ='-start-'),
                sg.Button('Disconnect', size=(10, 1), key = '-stop-')],
    ]
    
    is_streaming = False
    window = sg.Window('cam viewer', layout, location=(32, 32))
    
    while True:
        event, values = window.read(timeout=20)
        if event in (None, '-exit-'):
            break
    
        elif event == '-start-':
            rtsp_url = f"rtsp://{values['-user-']}:{values['-pass-']}@{values['-ipaddr-']}:{values['-port-']}/{values['-stream-']}"
            client = rtsp.Client(rtsp_server_uri=rtsp_url, verbose=True)
            is_streaming = True
    
        elif event == '-stop-':
            is_streaming = False
            client.close()
            img = Image.new("RGB", (640, 360), color=0)
            window['image'].update(data=ImageTk.PhotoImage(img))
    
        if is_streaming:
            frame = client.read()
            if frame is not None:
                window['image'].update(data=ImageTk.PhotoImage(frame))
            else:
                print("none")
    
    window.close()
    

    실행 프로그램


    (env) > python view.py
    
    Connect 버튼을 사용하여 영상을 재생할 수 있습니다.

    로킹


    로케이션 조작


    프레스는 ONVIF-PTZ 서비스를 사용합니다.
    ONVIF는 인터넷 카메라의 인터페이스를 호환시키기 위해 설립된 표준화 포럼이다
    파이썬 모듈의 onvifzep을 사용합니다.

    코드


    UI 섹션과 카메라 섹션을 분리하여 표시합니다.
  • UI 섹션
  • ptz_viewer.py
    import PySimpleGUI as sg
    from camera import CamPtz
    from PIL import Image, ImageTk
    
    sg.theme('Dark Brown')
    
    USER = "username"    # <=自分の環境に合わせる
    PASS = "password"    # <=自分の環境に合わせる
    IPADDR = "192.168.11.xx"    # <=自分の環境に合わせる
    PORT = "554"
    STREAM = "stream2"
    ONVIF_PORT = "2020"
    
    layout = [
            # カメラ接続情報
            [   sg.Text('IPADDR: ', size=(12, 1)), sg.InputText(default_text=IPADDR,  size=(20, 1),key='-ipaddr-'),
                sg.Text('PORT: ', size=(12, 1)), sg.InputText(default_text=PORT,  size=(20, 1),key='-port-'),
                sg.Text('STREAM: ', size=(12, 1)), sg.InputText(default_text=STREAM,  size=(20, 1),key='-stream-')],
            [   sg.Text('ONVIF_PORT: ', size=(12, 1)), sg.InputText(default_text=ONVIF_PORT,  size=(20, 1),key='-onvif_port-')],
            [   sg.Text('USER: ', size=(12, 1)), sg.InputText(default_text=USER,  size=(20, 1),key='-user-'),
                sg.Text('PASS: ', size=(12, 1)), sg.InputText(default_text=PASS,  size=(20, 1),key='-pass-')],
    
            # 画面表示
            [   sg.Image(filename='', key='image')],
    
            # 接続, 切断
            [   sg.Button('Connect', size=(10, 1), key ='-start-'),
                sg.Button('Disconnect', size=(10, 1), key = '-stop-')],
    
            # パンチルト制御
            [   sg.Button('↖', size=(4, 1), font='Helvetica 14',key ='-pt_topleft-'),
                sg.Button('↑', size=(4, 1), font='Helvetica 14',key = '-pt_topcenter-'),
                sg.Button('↗', size=(4, 1), font='Helvetica 14', key='-pt_topright-'), ],
            [   sg.Button('←', size=(4, 1), font='Helvetica 14',key ='-pt_left-'),
                sg.Button('〇', size=(4, 1), font='Helvetica 14',key = '-pt_center-'),
                sg.Button('→', size=(4, 1), font='Helvetica 14', key='-pt_right-'), ],
            [   sg.Button('↙', size=(4, 1), font='Helvetica 14',key ='-pt_btmleft-'),
                sg.Button('↓', size=(4, 1), font='Helvetica 14',key = '-pt_btmcenter-'),
                sg.Button('↘', size=(4, 1), font='Helvetica 14', key='-pt_btmright-'), ],
    ]
    
    is_streaming = False
    window = sg.Window('cam viewer', layout, location=(32, 32))
    
    while True:
        event, values = window.read(timeout=20)
        if event in (None, '-exit-'):
            break
    
        elif event == '-start-':
            cam_ptz = CamPtz(
                user = values['-user-'],
                pwd = values['-pass-'],
                ipaddr = values['-ipaddr-'],
                port = values['-port-'],
                stream = values['-stream-'],
                onvif_port = values['-onvif_port-']
            )
            # streaming
            cam_ptz.open()
            # onvif - move
            cam_ptz.setup_ptz()
            is_streaming = True
    
        elif event == '-stop-':
            is_streaming = False
            cam_ptz.close()
            img = Image.new("RGB", (640, 360), color=0)
            window['image'].update(data=ImageTk.PhotoImage(img))
    
        elif "-pt_" in event:
            x, y = 0, 0
            x = 1 if "right" in event else x
            x = -1 if "left" in event else x
            y = 1 if "top" in event else y
            y = -1 if "btm" in event else y
            print(event, "x:", x, "y:", y)
            if is_streaming:
                cam_ptz.move(x, y)
    
        if is_streaming:
            frame = cam_ptz.read()
            if frame is not None:
                window['image'].update(data=ImageTk.PhotoImage(frame))
    
    window.close()
    
  • 카메라 부분
  • camera.py
    import rtsp
    from onvif import ONVIFCamera
    import time
    
    class CamStream(object):
        def __init__(self, **kwargs):
            self.user = kwargs["user"]
            self.pwd = kwargs["pwd"]
            self.ipaddr = kwargs["ipaddr"]
            self.port = kwargs["port"]
            self.stream = kwargs["stream"]
            self.onvif_port = kwargs["onvif_port"]
            self.is_connect = False
    
    
        def open(self):
            rtsp_url = f"rtsp://{self.user}:{self.pwd}@{self.ipaddr}:{self.port}/{self.stream}"
            self.client = rtsp.Client(rtsp_server_uri=rtsp_url, verbose=True)
            self.is_connect = True
    
    
        def read(self):
            if self.is_connect is False:
                return None
            return self.client.read()
    
    
        def close(self):
            if self.is_connect:
                self.client.close()
                self.is_connect = False
            return
    
    
    
    class CamPtz(CamStream):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            pass
            self.XMAX = 1
            self.XMIN = -1
            self.YMAX = 1
            self.YMIN = -1
            self.moverequest = None
            self.ptz = None
            self.is_ptz_active = False
    
    
        def setup_ptz(self):
            mycam = ONVIFCamera(self.ipaddr, self.onvif_port, self.user, self.pwd)
            # Create media service object
            media = mycam.create_media_service()
            # Create ptz service object
            self.ptz = mycam.create_ptz_service()
            # Get target profile
            media_profile = media.GetProfiles()[0]
            # Get PTZ configuration options for getting continuous move range
            request = self.ptz.create_type('GetConfigurationOptions')
            request.ConfigurationToken = media_profile.PTZConfiguration.token
            ptz_configuration_options = self.ptz.GetConfigurationOptions(request)
    
            self.moverequest = self.ptz.create_type('ContinuousMove')
            self.moverequest.ProfileToken = media_profile.token
            if self.moverequest.Velocity is None:
                self.moverequest.Velocity = self.ptz.GetStatus({'ProfileToken': media_profile.token}).Position
    
            # Get range of pan and tilt
            # NOTE: X and Y are velocity vector
            self.XMAX = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].XRange.Max
            self.XMIN = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].XRange.Min
            self.YMAX = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].YRange.Max
            self.YMIN = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].YRange.Min
            return
    
    
        def move(self, x, y):
            self.moverequest.Velocity.PanTilt.x = x
            self.moverequest.Velocity.PanTilt.y = y
            if self.is_ptz_active:
                self.ptz.Stop({'ProfileToken': self.moverequest.ProfileToken})
            active = True
            self.ptz.ContinuousMove(self.moverequest)
    

    실행 프로그램


    (env) > python ptz_viewer.py
    
    Connect 버튼을 사용하여 영상을 재생할 수 있습니다.
    화살표 버튼을 사용하여 천공 작업을 수행할 수 있습니다.

    최후


    카메라의 영상과 가로세로 흔들기를 얻을 수 있다면 많이 할 수 있을 것 같다.

    좋은 웹페이지 즐겨찾기