From a091d161cb0150bbc325a7d3ac9ec23bf8de21cc Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Sun, 30 Jan 2022 17:05:24 -0700 Subject: [PATCH] Remove EzViz junk --- main.py | 24 +- pyezviz/__init__.py | 27 -- pyezviz/__main__.py | 309 ------------- pyezviz/camera.py | 234 ---------- pyezviz/cas.py | 160 ------- pyezviz/client.py | 919 --------------------------------------- pyezviz/constants.py | 109 ----- pyezviz/exceptions.py | 20 - pyezviz/mqtt.py | 234 ---------- pyezviz/test_cam_rtsp.py | 147 ------- 10 files changed, 2 insertions(+), 2181 deletions(-) delete mode 100644 pyezviz/__init__.py delete mode 100644 pyezviz/__main__.py delete mode 100644 pyezviz/camera.py delete mode 100644 pyezviz/cas.py delete mode 100644 pyezviz/client.py delete mode 100644 pyezviz/constants.py delete mode 100644 pyezviz/exceptions.py delete mode 100644 pyezviz/mqtt.py delete mode 100644 pyezviz/test_cam_rtsp.py diff --git a/main.py b/main.py index 6299c0a..dd9db3f 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,6 @@ import time import json import pygame -from pyezviz import EzvizClient, MQTTClient import secrets @@ -34,16 +33,6 @@ def backdoor(): play_sound(CHIME) play_sound(BACKDOOR) -def on_message(client, userdata, mqtt_message): - message = json.loads(mqtt_message.payload) - #print(json.dumps(mqtt_message, indent=4)) - - if message['alert'] == 'somebody there ring the door': # lmao - logging.info('Received door bell press alert') - if 'E80451501' in message['ext']: - logging.info('Backdoor pressed') - backdoor() - if __name__ == '__main__': logging.info('') logging.info('==========================') @@ -52,14 +41,5 @@ if __name__ == '__main__': pygame.mixer.pre_init(buffer=4096) pygame.mixer.init(buffer=4096) - client = EzvizClient(secrets.EZVIZ_EMAIL, secrets.EZVIZ_PASS, 'apiius.ezvizlife.com') - try: - logging.info('Logging into EZVIZ client...') - token = client.login() - mqtt = MQTTClient(token, on_message) - logging.info('Starting MQTT...') - mqtt.start() - except Exception as exp: - logging.exception(str(exp)) - finally: - client.close_session() + backdoor() + diff --git a/pyezviz/__init__.py b/pyezviz/__init__.py deleted file mode 100644 index d236101..0000000 --- a/pyezviz/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -"""init pyezviz.""" -from pyezviz.camera import EzvizCamera -from pyezviz.cas import EzvizCAS -from pyezviz.client import EzvizClient -from pyezviz.constants import (DefenseModeType, DeviceCatagories, - DeviceSwitchType, SoundMode) -from pyezviz.exceptions import (AuthTestResultFailed, HTTPError, InvalidHost, - InvalidURL, PyEzvizError) -from pyezviz.mqtt import MQTTClient -from pyezviz.test_cam_rtsp import TestRTSPAuth - -__all__ = [ - "EzvizCamera", - "EzvizClient", - "PyEzvizError", - "InvalidURL", - "HTTPError", - "InvalidHost", - "AuthTestResultFailed", - "EzvizCAS", - "MQTTClient", - "DefenseModeType", - "DeviceCatagories", - "DeviceSwitchType", - "SoundMode", - "TestRTSPAuth", -] diff --git a/pyezviz/__main__.py b/pyezviz/__main__.py deleted file mode 100644 index c2221e1..0000000 --- a/pyezviz/__main__.py +++ /dev/null @@ -1,309 +0,0 @@ -"""pyezviz command line.""" -import argparse -import http.client -import json -import logging -import sys - -import pandas -from pyezviz import EzvizCamera, EzvizClient, MQTTClient -from pyezviz.constants import DefenseModeType - - -def main(): - """Initiate arg parser.""" - parser = argparse.ArgumentParser(prog="pyezviz") - parser.add_argument("-u", "--username", required=True, help="Ezviz username") - parser.add_argument("-p", "--password", required=True, help="Ezviz Password") - parser.add_argument( - "-r", - "--region", - required=False, - default="apiieu.ezvizlife.com", - help="Ezviz API region", - ) - parser.add_argument( - "--debug", "-d", action="store_true", help="Print debug messages to stderr" - ) - - subparsers = parser.add_subparsers(dest="action") - - parser_device = subparsers.add_parser( - "devices", help="Play with all devices at once" - ) - parser_device.add_argument( - "device_action", - type=str, - default="status", - help="Device action to perform", - choices=["device", "status", "switch", "connection"], - ) - - parser_home_defence_mode = subparsers.add_parser( - "home_defence_mode", help="Set home defence mode" - ) - - parser_mqtt = subparsers.add_parser("mqtt", help="Set home defence mode") - - parser_home_defence_mode.add_argument( - "--mode", required=False, help="Choose mode", choices=["HOME_MODE", "AWAY_MODE"] - ) - - parser_camera = subparsers.add_parser("camera", help="Camera actions") - parser_camera.add_argument("--serial", required=True, help="camera SERIAL") - - subparsers_camera = parser_camera.add_subparsers(dest="camera_action") - - parser_camera_status = subparsers_camera.add_parser( - "status", help="Get the status of the camera" - ) - parser_camera_move = subparsers_camera.add_parser("move", help="Move the camera") - parser_camera_move.add_argument( - "--direction", - required=True, - help="Direction to move the camera to", - choices=["up", "down", "right", "left"], - ) - parser_camera_move.add_argument( - "--speed", - required=False, - help="Speed of the movement", - default=5, - type=int, - choices=range(1, 10), - ) - - parser_camera_switch = subparsers_camera.add_parser( - "switch", help="Change the status of a switch" - ) - parser_camera_switch.add_argument( - "--switch", - required=True, - help="Switch to switch", - choices=["audio", "ir", "state", "privacy", "sleep", "follow_move"], - ) - parser_camera_switch.add_argument( - "--enable", - required=False, - help="Enable (or not)", - default=1, - type=int, - choices=[0, 1], - ) - - parser_camera_alarm = subparsers_camera.add_parser( - "alarm", help="Configure the camera alarm" - ) - parser_camera_alarm.add_argument( - "--notify", required=False, help="Enable (or not)", type=int, choices=[0, 1] - ) - parser_camera_alarm.add_argument( - "--sound", - required=False, - help="Sound level (2 is silent, 1 intensive, 0 soft)", - type=int, - choices=[0, 1, 2], - ) - parser_camera_alarm.add_argument( - "--sensibility", - required=False, - help="Sensibility level (Non-Cameras = from 1 to 6) or (Cameras = 1 to 100)", - type=int, - choices=range(0, 100), - ) - parser_camera_alarm.add_argument( - "--schedule", required=False, help="Schedule in json format *test*", type=str - ) - - args = parser.parse_args() - - # print("--------------args") - # print("--------------args: %s",args) - # print("--------------args") - - client = EzvizClient(args.username, args.password, args.region) - - if args.debug: - - http.client.HTTPConnection.debuglevel = 5 - # You must initialize logging, otherwise you'll not see debug output. - logging.basicConfig() - logging.getLogger().setLevel(logging.DEBUG) - requests_log = logging.getLogger("requests.packages.urllib3") - requests_log.setLevel(logging.DEBUG) - requests_log.propagate = True - - if args.action == "devices": - - if args.device_action == "device": - try: - client.login() - print(json.dumps(client.get_device(), indent=2)) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.device_action == "status": - try: - client.login() - print( - pandas.DataFrame(client.load_cameras()).to_string( - columns=[ - "serial", - "name", - # version, - # upgrade_available, - "status", - "device_category", - "device_sub_category", - "sleep", - "privacy", - "audio", - "ir_led", - "state_led", - # follow_move, - # alarm_notify, - # alarm_schedules_enabled, - # alarm_sound_mod, - # encrypted, - "local_ip", - "local_rtsp_port", - "detection_sensibility", - "battery_level", - "alarm_schedules_enabled", - "alarm_notify", - "Motion_Trigger", - # last_alarm_time, - # last_alarm_pic - ] - ) - ) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.device_action == "switch": - try: - client.login() - print(json.dumps(client.get_switch(), indent=2)) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.device_action == "connection": - try: - client.login() - print(json.dumps(client.get_connection(), indent=2)) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - else: - print("Action not implemented: %s", args.device_action) - - elif args.action == "home_defence_mode": - - if args.mode: - try: - client.login() - print( - json.dumps( - client.api_set_defence_mode( - getattr(DefenseModeType, args.mode).value - ), - indent=2, - ) - ) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.action == "mqtt": - - try: - token = client.login() - mqtt = MQTTClient(token) - mqtt.start() - - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.action == "camera": - - # load camera object - try: - client.login() - camera = EzvizCamera(client, args.serial) - logging.debug("Camera loaded") - except Exception as exp: # pylint: disable=broad-except - print(exp) - client.close_session() - - if args.camera_action == "move": - try: - camera.move(args.direction, args.speed) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.camera_action == "status": - try: - print(json.dumps(camera.status(), indent=2)) - - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.camera_action == "switch": - try: - if args.switch == "ir": - camera.switch_device_ir_led(args.enable) - elif args.switch == "state": - print(args.enable) - camera.switch_device_state_led(args.enable) - elif args.switch == "audio": - camera.switch_device_audio(args.enable) - elif args.switch == "privacy": - camera.switch_privacy_mode(args.enable) - elif args.switch == "sleep": - camera.switch_sleep_mode(args.enable) - elif args.switch == "follow_move": - camera.switch_follow_move(args.enable) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - elif args.camera_action == "alarm": - try: - if args.sound is not None: - camera.alarm_sound(args.sound) - if args.notify is not None: - camera.alarm_notify(args.notify) - if args.sensibility is not None: - camera.alarm_detection_sensibility(args.sensibility) - if args.schedule is not None: - camera.change_defence_schedule(args.schedule) - except Exception as exp: # pylint: disable=broad-except - print(exp) - finally: - client.close_session() - - else: - print("Action not implemented, try running with -h switch for help") - - else: - print("Action not implemented: %s", args.action) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/pyezviz/camera.py b/pyezviz/camera.py deleted file mode 100644 index 2632314..0000000 --- a/pyezviz/camera.py +++ /dev/null @@ -1,234 +0,0 @@ -"""pyezviz camera api.""" -from __future__ import annotations - -import datetime -from typing import Any - -from pyezviz.constants import DeviceCatagories, DeviceSwitchType, SoundMode -from pyezviz.exceptions import PyEzvizError - - -class EzvizCamera: - """Initialize Ezviz camera object.""" - - def __init__(self, client, serial: str, device_obj: dict | None = None) -> None: - """Initialize the camera object.""" - self._client = client - self._serial = serial - self._switch: dict[int, bool] = {} - self._alarmmotiontrigger: dict[str, Any] = {} - self._device = device_obj or {} - self.alarmlist_time = None - self.alarmlist_pic = None - - def load(self) -> None: - """Update device info for camera serial.""" - - if self._device is None: - self._device = self._client.get_all_per_serial_infos(self._serial) - - self._alarm_list() - - self._switch_status() - - def _switch_status(self) -> None: - """load device switches""" - - if self._device.get("switchStatusInfos"): - for switch in self._device["switchStatusInfos"]: - self._switch.update({switch["type"]: switch["enable"]}) - - else: - self._switch = {0: False} - - def _detection_sensibility(self) -> Any: - """load detection sensibility""" - result = "Unknown" - - if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is not True: - if ( - self._device["deviceInfos"]["deviceCategory"] - == DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value - ): - result = self._client.get_detection_sensibility( - self._serial, - "3", - ) - else: - result = self._client.get_detection_sensibility(self._serial) - - if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is True: - result = "Hibernate" - - return result - - def _alarm_list(self) -> None: - """get last alarm info for this camera's self._serial""" - alarmlist = self._client.get_alarminfo(self._serial) - - if alarmlist.get("page").get("totalResults") > 0: - self.alarmlist_time = alarmlist.get("alarms")[0].get("alarmStartTimeStr") - self.alarmlist_pic = alarmlist.get("alarms")[0].get("picUrl") - - if self.alarmlist_time: - self._motion_trigger(self.alarmlist_time) - - def _local_ip(self) -> str: - """ "Fix empty ip value for certain cameras""" - if self._device.get("wifiInfos"): - return self._device["wifiInfos"].get("address") - - # Seems to return none or 0.0.0.0 on some. This should run 2nd. - if self._device.get("connectionInfos"): - if self._device["connectionInfos"].get("localIp"): - return self._device["connectionInfos"]["localIp"] - - return "0.0.0.0" - - def _motion_trigger(self, alarmlist_time: str) -> None: - """Create motion sensor based on last alarm time.""" - now = datetime.datetime.now().replace(microsecond=0) - alarm_trigger_active = 0 - today_date = datetime.date.today() - fix = datetime.datetime.now().replace(microsecond=0) - - # Need to handle error if time format different - fix = datetime.datetime.strptime( - alarmlist_time.replace("Today", str(today_date)), - "%Y-%m-%d %H:%M:%S", - ) - - # returns a timedelta object - timepassed = now - fix - - if timepassed < datetime.timedelta(seconds=60): - alarm_trigger_active = 1 - - self._alarmmotiontrigger = { - "alarm_trigger_active": alarm_trigger_active, - "timepassed": timepassed.total_seconds(), - } - - def _is_alarm_schedules_enabled(self) -> bool | None: - """Checks if alarm schedules enabled""" - time_plans = None - - if self._device.get("timePlanInfos"): - time_plans = [ - item for item in self._device["timePlanInfos"] if item.get("type") == 2 - ] - - if time_plans: - return bool(time_plans[0].get("enable")) - - return None - - def status(self) -> dict[Any, Any]: - """Return the status of the camera.""" - self.load() - - return { - "serial": self._serial, - "name": self._device["deviceInfos"].get("name"), - "version": self._device["deviceInfos"].get("version"), - "upgrade_available": self._device["statusInfos"].get("upgradeAvailable"), - "status": self._device["deviceInfos"].get("status"), - "device_category": self._device["deviceInfos"].get("deviceCategory"), - "device_sub_category": self._device["deviceInfos"].get("deviceSubCategory"), - "sleep": self._switch.get(DeviceSwitchType.SLEEP.value) - or self._switch.get(DeviceSwitchType.AUTO_SLEEP.value), - "privacy": self._switch.get(DeviceSwitchType.PRIVACY.value), - "audio": self._switch.get(DeviceSwitchType.SOUND.value), - "ir_led": self._switch.get(DeviceSwitchType.INFRARED_LIGHT.value), - "state_led": self._switch.get(DeviceSwitchType.LIGHT.value), - "follow_move": self._switch.get(DeviceSwitchType.MOBILE_TRACKING.value), - "alarm_notify": bool(self._device["statusInfos"].get("globalStatus")), - "alarm_schedules_enabled": self._is_alarm_schedules_enabled(), - "alarm_sound_mod": SoundMode( - self._device["statusInfos"].get("alarmSoundMode") - ).name, - "encrypted": bool(self._device["statusInfos"].get("isEncrypted")), - "local_ip": self._local_ip(), - "wan_ip": self._device.get("connectionInfos", {}).get("netIp", "0.0.0.0"), - "local_rtsp_port": self._device["connectionInfos"].get( - "localRtspPort", "554" - ), - "supported_channels": self._device["deviceInfos"].get("channelNumber"), - "detection_sensibility": self._detection_sensibility(), - "battery_level": self._device["statusInfos"] - .get("optionals", {}) - .get("powerRemaining"), - "PIR_Status": self._device["statusInfos"].get("pirStatus"), - "Motion_Trigger": self._alarmmotiontrigger.get("alarm_trigger_active"), - "Seconds_Last_Trigger": self._alarmmotiontrigger.get("timepassed"), - "last_alarm_time": self.alarmlist_time, - "last_alarm_pic": self.alarmlist_pic, - "wifiInfos": self._device.get("wifiInfos"), - "switches": self._switch, - } - - def move(self, direction: str, speed: int = 5) -> bool: - """Move camera.""" - if direction not in ["right", "left", "down", "up"]: - raise PyEzvizError(f"Invalid direction: {direction} ") - - # launch the start command - self._client.ptz_control(str(direction).upper(), self._serial, "START", speed) - # launch the stop command - self._client.ptz_control(str(direction).upper(), self._serial, "STOP", speed) - - return True - - def alarm_notify(self, enable: int) -> bool: - """Enable/Disable camera notification when movement is detected.""" - return self._client.set_camera_defence(self._serial, enable) - - def alarm_sound(self, sound_type: int) -> bool: - """Enable/Disable camera sound when movement is detected.""" - # we force enable = 1 , to make sound... - return self._client.alarm_sound(self._serial, sound_type, 1) - - def alarm_detection_sensibility(self, sensibility, type_value=0): - """Enable/Disable camera sound when movement is detected.""" - # we force enable = 1 , to make sound... - return self._client.detection_sensibility(self._serial, sensibility, type_value) - - def switch_device_audio(self, enable=0): - """Switch audio status on a device.""" - return self._client.switch_status( - self._serial, DeviceSwitchType.SOUND.value, enable - ) - - def switch_device_state_led(self, enable=0): - """Switch led status on a device.""" - return self._client.switch_status( - self._serial, DeviceSwitchType.LIGHT.value, enable - ) - - def switch_device_ir_led(self, enable=0): - """Switch ir status on a device.""" - return self._client.switch_status( - self._serial, DeviceSwitchType.INFRARED_LIGHT.value, enable - ) - - def switch_privacy_mode(self, enable=0): - """Switch privacy mode on a device.""" - return self._client.switch_status( - self._serial, DeviceSwitchType.PRIVACY.value, enable - ) - - def switch_sleep_mode(self, enable=0): - """Switch sleep mode on a device.""" - return self._client.switch_status( - self._serial, DeviceSwitchType.SLEEP.value, enable - ) - - def switch_follow_move(self, enable=0): - """Switch follow move.""" - return self._client.switch_status( - self._serial, DeviceSwitchType.MOBILE_TRACKING.value, enable - ) - - def change_defence_schedule(self, schedule, enable=0): - """Change defence schedule. Requires json formatted schedules.""" - return self._client.api_set_defence_schdule(self._serial, schedule, enable) diff --git a/pyezviz/cas.py b/pyezviz/cas.py deleted file mode 100644 index c48ed05..0000000 --- a/pyezviz/cas.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Ezviz CAS API Functions.""" - -import random -import socket -import ssl -from io import BytesIO -from itertools import cycle - -import xmltodict -from Crypto.Cipher import AES -from pyezviz.constants import FEATURE_CODE, XOR_KEY -from pyezviz.exceptions import InvalidHost - - -def xor_enc_dec(msg, xor_key=XOR_KEY): - """Xor encodes camera serial""" - with BytesIO(msg) as stream: - xor_msg = bytes(a ^ b for a, b in zip(stream.read(), cycle(xor_key))) - return xor_msg - - -class EzvizCAS: - """Ezviz CAS server client.""" - - def __init__(self, token): - """Initialize the client object.""" - self._session = None - self._token = token or { - "session_id": None, - "rf_session_id": None, - "username": None, - "api_url": "apiieu.ezvizlife.com", - } - self._service_urls = token["service_urls"] - - def cas_get_encryption(self, devserial): - """Fetch encryption code from ezviz cas server""" - - # Random hex 64 characters long. - rand_hex = random.randrange(10 ** 80) - rand_hex = "%064x" % rand_hex - rand_hex = rand_hex[:64] - - payload = ( - f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00" - f"\x00\x02" # Check or order? - f"\x00\x00\x00\x00\x00\x00 " - f"\x01" # Check or order? - f"\x00\x00\x00\x00\x00\x00\x02\t\x00\x00\x00\x00" - f'\n\n\t' - f'{self._token["session_id"]}' - f"\n\t{FEATURE_CODE}\n\t" - f"{devserial}" - f"\n\t0\n\n" - ).encode("latin1") - - payload_end_padding = rand_hex.encode("latin1") - - context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE) - - # Create a TCP/IP socket - my_socket = socket.create_connection( - (self._service_urls["sysConf"][15], self._service_urls["sysConf"][16]) - ) - my_socket = context.wrap_socket( - my_socket, server_hostname=self._service_urls["sysConf"][15] - ) - - # Get CAS Encryption Key - try: - my_socket.send(payload + payload_end_padding) - response = my_socket.recv(1024) - print(f"Get Encryption Key: {response}") - - except (socket.gaierror, ConnectionRefusedError) as err: - raise InvalidHost("Invalid IP or Hostname") from err - - finally: - my_socket.close() - - # Trim header, tail and convert xml to dict. - response = response[32::] - response = response[:-32:] - response = xmltodict.parse(response) - - return response - - def set_camera_defence_state(self, serial, enable=1): - """Enable alarm notifications.""" - - # Random hex 64 characters long. - rand_hex = random.randrange(10 ** 80) - rand_hex = "%064x" % rand_hex - rand_hex = rand_hex[:64] - - payload = ( - f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00" - f"\x00\x14" # Check or order? - f"\x00\x00\x00\x00\x00\x00 " - f"\x05" - f"\x00\x00\x00\x00\x00\x00\x02\xd0\x00\x00\x01\xe0" - f'\n\n\t' - f'\n\t' - f'\n\n' - f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00" - f"\x00\x13" - f"\x00\x00\x00\x00\x00\x000\x0f\xff\xff\xff\xff" - f"\x00\x00\x00\xb0\x00\x00\x00\x00" - ).encode("latin1") - - payload_end_padding = rand_hex.encode("latin1") - - # xor camera serial - xor_cam_serial = xor_enc_dec(serial.encode("latin1")) - - defence_msg_string = ( - f'{xor_cam_serial.decode()}2+,*xdv.0" ' - f'encoding="utf-8"?>\n' - f"\n" - f"\tABCDEFG\n" - f'\t\n' - f"\n" - f"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10" - ).encode("latin1") - - context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE) - - # Create a TCP/IP socket - my_socket = socket.create_connection( - (self._service_urls["sysConf"][15], self._service_urls["sysConf"][16]) - ) - my_socket = context.wrap_socket( - my_socket, server_hostname=self._service_urls["sysConf"][15] - ) - - cas_client = self.cas_get_encryption(serial) - - aes_key = cas_client["Response"]["Session"]["@Key"].encode("latin1") - iv_value = ( - f"{serial}{cas_client['Response']['Session']['@OperationCode']}".encode( - "latin1" - ) - ) - - # Message encryption - cipher = AES.new(aes_key, AES.MODE_CBC, iv_value) - - try: - defence_msg_string = cipher.encrypt(defence_msg_string) - my_socket.send(payload + defence_msg_string + payload_end_padding) - print(f"Set camera response: {my_socket.recv()}") - - except (socket.gaierror, ConnectionRefusedError) as err: - raise InvalidHost("Invalid IP or Hostname") from err - - finally: - my_socket.close() - - return True diff --git a/pyezviz/client.py b/pyezviz/client.py deleted file mode 100644 index 4b28988..0000000 --- a/pyezviz/client.py +++ /dev/null @@ -1,919 +0,0 @@ -"""Ezviz API.""" -from __future__ import annotations - -import hashlib -import logging -from typing import Any -from uuid import uuid4 - -import requests - -from pyezviz.camera import EzvizCamera -from pyezviz.cas import EzvizCAS -from pyezviz.constants import ( - DEFAULT_TIMEOUT, - FEATURE_CODE, - MAX_RETRIES, - DefenseModeType, - DeviceCatagories, -) -from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError - -API_ENDPOINT_CLOUDDEVICES = "/api/cloud/v2/cloudDevices/getAll" -API_ENDPOINT_PAGELIST = "/v3/userdevices/v1/devices/pagelist" -API_ENDPOINT_DEVICES = "/v3/devices/" -API_ENDPOINT_LOGIN = "/v3/users/login/v5" -API_ENDPOINT_REFRESH_SESSION_ID = "/v3/apigateway/login" -API_ENDPOINT_SWITCH_STATUS = "/switchStatus" -API_ENDPOINT_PTZCONTROL = "/ptzControl" -API_ENDPOINT_ALARM_SOUND = "/alarm/sound" -API_ENDPOINT_DETECTION_SENSIBILITY = "/api/device/configAlgorithm" -API_ENDPOINT_DETECTION_SENSIBILITY_GET = "/api/device/queryAlgorithmConfig" -API_ENDPOINT_ALARMINFO_GET = "/v3/alarms/v2/advanced" -API_ENDPOINT_SET_DEFENCE_SCHEDULE = "/api/device/defence/plan2" -API_ENDPOINT_SWITCH_DEFENCE_MODE = "/v3/userdevices/v1/group/switchDefenceMode" -API_ENDPOINT_SWITCH_SOUND_ALARM = "/sendAlarm" -API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info" - - -class EzvizClient: - """Initialize api client object.""" - - def __init__( - self, - account: str | None = None, - password: str | None = None, - url: str = "apiieu.ezvizlife.com", - timeout: int = DEFAULT_TIMEOUT, - token: dict | None = None, - ) -> None: - """Initialize the client object.""" - self.account = account - self.password = password - self._session = requests.session() - # Set Android generic user agent. - self._session.headers.update({"User-Agent": "okhttp/3.12.1"}) - self._token = token or { - "session_id": None, - "rf_session_id": None, - "username": None, - "api_url": url, - } - self._timeout = timeout - - def _login(self, account: str, password: str) -> dict[Any, Any]: - """Login to Ezviz API.""" - - # Region code to url. - if len(self._token["api_url"].split(".")) == 1: - self._token["api_url"] = "apii" + self._token["api_url"] + ".ezvizlife.com" - - # Ezviz API sends md5 of password - temp = hashlib.md5() - temp.update(password.encode("utf-8")) - md5pass = temp.hexdigest() - payload = { - "account": account, - "password": md5pass, - "featureCode": FEATURE_CODE, - "msgType": "0", - "cuName": "SFRDIDEw", - } - - try: - req = self._session.post( - "https://" + self._token["api_url"] + API_ENDPOINT_LOGIN, - allow_redirects=False, - headers={ - "clientType": "3", - "customno": "1000001", - "clientNo": "web_site", - "appId": "ys7", - "lang": "en", - }, - data=payload, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.ConnectionError as err: - raise InvalidURL("A Invalid URL or Proxy error occured") from err - - except requests.HTTPError as err: - raise HTTPError from err - - try: - json_result = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_result["meta"]["code"] == 1100: - self._token["api_url"] = json_result["loginArea"]["apiDomain"] - print("Region incorrect!") - print(f"Your region url: {self._token['api_url']}") - self.close_session() - return self.login() - - if json_result["meta"]["code"] == 1013: - raise PyEzvizError("Incorrect Username.") - - if json_result["meta"]["code"] == 1014: - raise PyEzvizError("Incorrect Password.") - - if json_result["meta"]["code"] == 1015: - raise PyEzvizError("The user is locked.") - - self._token["session_id"] = str(json_result["loginSession"]["sessionId"]) - self._token["rf_session_id"] = str(json_result["loginSession"]["rfSessionId"]) - self._token["username"] = str(json_result["loginUser"]["username"]) - self._token["api_url"] = str(json_result["loginArea"]["apiDomain"]) - if not self._token["session_id"]: - raise PyEzvizError( - f"Login error: Please check your username/password: {req.text}" - ) - - self._token["service_urls"] = self.get_service_urls() - - return self._token - - def get_service_urls(self) -> Any: - """Get Ezviz service urls.""" - - try: - req = self._session.get( - f"https://{self._token['api_url']}{API_ENDPOINT_SERVER_INFO}", - headers={ - "sessionId": self._token["session_id"], - "featureCode": FEATURE_CODE, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.ConnectionError as err: - raise InvalidURL("A Invalid URL or Proxy error occured") from err - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - - raise HTTPError from err - - if not req.text: - raise PyEzvizError("No data") - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_output.get("meta").get("code") != 200: - logging.info("Json request error") - - service_urls = json_output["systemConfigInfo"] - service_urls["sysConf"] = service_urls["sysConf"].split("|") - - return service_urls - - def _api_get_pagelist( - self, page_filter: str, json_key: str | None = None, max_retries: int = 0 - ) -> Any: - """Get data from pagelist API.""" - - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - if page_filter is None: - raise PyEzvizError("Trying to call get_pagelist without filter") - - try: - req = self._session.get( - "https://" + self._token["api_url"] + API_ENDPOINT_PAGELIST, - headers={"sessionId": self._token["session_id"]}, - params={"filter": page_filter}, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - return self._api_get_pagelist(page_filter, json_key, max_retries + 1) - - raise HTTPError from err - - if not req.text: - raise PyEzvizError("No data") - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_output.get("meta").get("code") != 200: - # session is wrong, need to relogin - self.login() - logging.info( - "Json request error, relogging (max retries: %s)", str(max_retries) - ) - return self._api_get_pagelist(page_filter, json_key, max_retries + 1) - - if json_key is None: - json_result = json_output - else: - json_result = json_output[json_key] - - if not json_result: - # session is wrong, need to relogin - self.login() - logging.info( - "Impossible to load the devices, here is the returned response: %s", - str(req.text), - ) - return self._api_get_pagelist(page_filter, json_key, max_retries + 1) - - return json_result - - def get_alarminfo(self, serial: str, max_retries: int = 0) -> Any: - """Get data from alarm info API.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - params: dict[str, int | str] = { - "deviceSerials": serial, - "queryType": -1, - "limit": 1, - "stype": -1, - } - - try: - req = self._session.get( - "https://" + self._token["api_url"] + API_ENDPOINT_ALARMINFO_GET, - headers={"sessionId": self._token["session_id"]}, - params=params, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - return self.get_alarminfo(serial, max_retries + 1) - - raise HTTPError from err - - if req.text == "": - raise PyEzvizError("No data") - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - return json_output - - def _switch_status( - self, serial: str, status_type: int, enable: int, max_retries: int = 0 - ) -> bool: - """Switch status on a device.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - try: - req = self._session.put( - "https://" - + self._token["api_url"] - + API_ENDPOINT_DEVICES - + serial - + "/1/1/" - + str(status_type) - + API_ENDPOINT_SWITCH_STATUS, - headers={"sessionId": self._token["session_id"]}, - data={ - "enable": enable, - "serial": serial, - "channelNo": "1", - "type": status_type, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - return self._switch_status(serial, status_type, enable, max_retries + 1) - - raise HTTPError from err - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_output.get("meta").get("code") != 200: - raise PyEzvizError( - f"Could not set the switch: Got {req.status_code} : {req.text})" - ) - - return True - - def sound_alarm(self, serial: str, enable: int = 1, max_retries: int = 0) -> bool: - """Sound alarm on a device.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - try: - req = self._session.put( - "https://" - + self._token["api_url"] - + API_ENDPOINT_DEVICES - + serial - + "/0" - + API_ENDPOINT_SWITCH_SOUND_ALARM, - headers={"sessionId": self._token["session_id"]}, - data={ - "enable": enable, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - return self.sound_alarm(serial, enable, max_retries + 1) - - raise HTTPError from err - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_output.get("meta").get("code") != 200: - raise PyEzvizError( - f"Could not set the alarm sound: Got {req.status_code} : {req.text})" - ) - - return True - - def load_cameras(self) -> dict[Any, Any]: - """Load and return all cameras objects.""" - - devices = self._get_all_device_infos() - cameras = {} - supported_categories = [ - DeviceCatagories.COMMON_DEVICE_CATEGORY.value, - DeviceCatagories.CAMERA_DEVICE_CATEGORY.value, - DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value, - DeviceCatagories.DOORBELL_DEVICE_CATEGORY.value, - DeviceCatagories.BASE_STATION_DEVICE_CATEGORY.value, - ] - - for device, data in devices.items(): - if data["deviceInfos"]["deviceCategory"] in supported_categories: - # Add support for connected HikVision cameras - if ( - data["deviceInfos"]["deviceCategory"] - == DeviceCatagories.COMMON_DEVICE_CATEGORY.value - and not data["deviceInfos"]["hik"] - ): - continue - - # Create camera object - - camera = EzvizCamera(self, device, data) - - camera.load() - cameras[device] = camera.status() - - return cameras - - def _get_all_device_infos(self) -> dict[Any, Any]: - """Load all devices and build dict per device serial.""" - - devices = self._get_page_list() - result: dict[Any, Any] = {} - - for device in devices["deviceInfos"]: - result[device["deviceSerial"]] = {} - result[device["deviceSerial"]]["deviceInfos"] = device - result[device["deviceSerial"]]["connectionInfos"] = devices.get( - "connectionInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["p2pInfos"] = devices.get("p2pInfos").get( - device["deviceSerial"] - ) - result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get( - "alarmNodisturbInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["kmsInfos"] = devices.get("kmsInfos").get( - device["deviceSerial"] - ) - result[device["deviceSerial"]]["timePlanInfos"] = devices.get( - "timePlanInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["statusInfos"] = devices.get( - "statusInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["wifiInfos"] = devices.get("wifiInfos").get( - device["deviceSerial"] - ) - result[device["deviceSerial"]]["switchStatusInfos"] = devices.get( - "switchStatusInfos" - ).get(device["deviceSerial"]) - for item in devices["cameraInfos"]: - if item["deviceSerial"] == device["deviceSerial"]: - result[device["deviceSerial"]]["cameraInfos"] = item - - return result - - def get_all_per_serial_infos(self, serial: str) -> dict[Any, Any] | None: - """Load all devices and build dict per device serial.""" - - if serial is None: - raise PyEzvizError("Need serial number for this query") - - devices = self._get_page_list() - result: dict[str, dict] = {serial: {}} - - for device in devices["deviceInfos"]: - if device["deviceSerial"] == serial: - result[device["deviceSerial"]]["deviceInfos"] = device - result[device["deviceSerial"]]["deviceInfos"] = device - result[device["deviceSerial"]]["connectionInfos"] = devices.get( - "connectionInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["p2pInfos"] = devices.get( - "p2pInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get( - "alarmNodisturbInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["kmsInfos"] = devices.get( - "kmsInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["timePlanInfos"] = devices.get( - "timePlanInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["statusInfos"] = devices.get( - "statusInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["wifiInfos"] = devices.get( - "wifiInfos" - ).get(device["deviceSerial"]) - result[device["deviceSerial"]]["switchStatusInfos"] = devices.get( - "switchStatusInfos" - ).get(device["deviceSerial"]) - for item in devices["cameraInfos"]: - if item["deviceSerial"] == device["deviceSerial"]: - result[device["deviceSerial"]]["cameraInfos"] = item - - return result.get(serial) - - def ptz_control( - self, command: str, serial: str, action: str, speed: int = 5 - ) -> Any: - """PTZ Control by API.""" - if command is None: - raise PyEzvizError("Trying to call ptzControl without command") - if action is None: - raise PyEzvizError("Trying to call ptzControl without action") - - try: - req = self._session.put( - "https://" - + self._token["api_url"] - + API_ENDPOINT_DEVICES - + serial - + API_ENDPOINT_PTZCONTROL, - data={ - "command": command, - "action": action, - "channelNo": "1", - "speed": speed, - "uuid": str(uuid4()), - "serial": serial, - }, - headers={"sessionId": self._token["session_id"], "clientType": "1"}, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - raise HTTPError from err - - return req.text - - def login(self) -> dict[Any, Any]: - """Get or refresh ezviz login token.""" - if self._token["session_id"] and self._token["rf_session_id"]: - try: - req = self._session.put( - "https://" - + self._token["api_url"] - + API_ENDPOINT_REFRESH_SESSION_ID, - data={ - "refreshSessionId": self._token["rf_session_id"], - "featureCode": FEATURE_CODE, - }, - headers={"sessionId": self._token["session_id"]}, - timeout=self._timeout, - ) - req.raise_for_status() - - except requests.HTTPError as err: - raise HTTPError from err - - try: - json_result = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - self._token["session_id"] = str(json_result["sessionInfo"]["sessionId"]) - self._token["rf_session_id"] = str( - json_result["sessionInfo"]["refreshSessionId"] - ) - if not self._token["session_id"]: - raise PyEzvizError(f"Relogin required: {req.text}") - - if not self._token.get("service_urls"): - self._token["service_urls"] = self.get_service_urls() - - return self._token - - if self.account and self.password: - return self._login(account=self.account, password=self.password) - - raise PyEzvizError("Login with account and password required") - - def set_camera_defence(self, serial: str, enable: int) -> bool: - """Enable/Disable motion detection on camera.""" - cas_client = EzvizCAS(self._token) - cas_client.set_camera_defence_state(serial, enable) - - return True - - def api_set_defence_schedule( - self, serial: str, schedule: str, enable: int, max_retries: int = 0 - ) -> bool: - """Set defence schedules.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - schedulestring = ( - '{"CN":0,"EL":' - + str(enable) - + ',"SS":"' - + serial - + '","WP":[' - + schedule - + "]}]}" - ) - try: - req = self._session.post( - "https://" + self._token["api_url"] + API_ENDPOINT_SET_DEFENCE_SCHEDULE, - headers={"sessionId": self._token["session_id"]}, - data={ - "devTimingPlan": schedulestring, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - return self.api_set_defence_schedule( - serial, schedule, enable, max_retries + 1 - ) - - raise HTTPError from err - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_output.get("resultCode") != 0: - raise PyEzvizError( - f"Could not set the schedule: Got {req.status_code} : {req.text})" - ) - - return True - - def api_set_defence_mode(self, mode: DefenseModeType, max_retries: int = 0) -> bool: - """Set defence mode.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - try: - req = self._session.post( - "https://" + self._token["api_url"] + API_ENDPOINT_SWITCH_DEFENCE_MODE, - headers={"sessionId": self._token["session_id"]}, - data={ - "groupId": -1, - "mode": mode, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to relogin - self.login() - return self.api_set_defence_mode(mode, max_retries + 1) - - raise HTTPError from err - - try: - json_output = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - if json_output.get("meta").get("code") != 200: - raise PyEzvizError( - f"Could not set defence mode: Got {req.status_code} : {req.text})" - ) - - return True - - def detection_sensibility( - self, - serial: str, - sensibility: int = 3, - type_value: int = 3, - max_retries: int = 0, - ) -> bool | str: - """Set detection sensibility.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - if sensibility not in [0, 1, 2, 3, 4, 5, 6] and type_value == 0: - raise PyEzvizError( - "Unproper sensibility for type 0 (should be within 1 to 6)." - ) - - try: - req = self._session.post( - "https://" - + self._token["api_url"] - + API_ENDPOINT_DETECTION_SENSIBILITY, - headers={"sessionId": self._token["session_id"]}, - data={ - "subSerial": serial, - "type": type_value, - "channelNo": "1", - "value": sensibility, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to re-log-in - self.login() - return self.detection_sensibility( - serial, sensibility, type_value, max_retries + 1 - ) - - raise HTTPError from err - - try: - response_json = req.json() - - except ValueError as err: - raise PyEzvizError("Could not decode response:" + str(err)) from err - - if response_json["resultCode"] and response_json["resultCode"] != "0": - return "Unknown value" - - return True - - def get_detection_sensibility( - self, serial: str, type_value: str = "0", max_retries: int = 0 - ) -> Any: - """Get detection sensibility notifications.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - try: - req = self._session.post( - "https://" - + self._token["api_url"] - + API_ENDPOINT_DETECTION_SENSIBILITY_GET, - headers={"sessionId": self._token["session_id"]}, - data={ - "subSerial": serial, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to re-log-in. - self.login() - return self.get_detection_sensibility( - serial, type_value, max_retries + 1 - ) - - raise HTTPError from err - - try: - response_json = req.json() - - except ValueError as err: - raise PyEzvizError("Could not decode response:" + str(err)) from err - - if response_json["resultCode"] != "0": - return "Unknown" - - if response_json["algorithmConfig"]["algorithmList"]: - for idx in response_json["algorithmConfig"]["algorithmList"]: - if idx["type"] == type_value: - return idx["value"] - - return "Unknown" - - # soundtype: 0 = normal, 1 = intensive, 2 = disabled ... don't ask me why... - def alarm_sound( - self, serial: str, sound_type: int, enable: int = 1, max_retries: int = 0 - ) -> bool: - """Enable alarm sound by API.""" - if max_retries > MAX_RETRIES: - raise PyEzvizError("Can't gather proper data. Max retries exceeded.") - - if sound_type not in [0, 1, 2]: - raise PyEzvizError( - "Invalid sound_type, should be 0,1,2: " + str(sound_type) - ) - - try: - req = self._session.put( - "https://" - + self._token["api_url"] - + API_ENDPOINT_DEVICES - + serial - + API_ENDPOINT_ALARM_SOUND, - headers={"sessionId": self._token["session_id"]}, - data={ - "enable": enable, - "soundType": sound_type, - "voiceId": "0", - "deviceSerial": serial, - }, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.HTTPError as err: - if err.response.status_code == 401: - # session is wrong, need to re-log-in - self.login() - return self.alarm_sound(serial, sound_type, enable, max_retries + 1) - - raise HTTPError from err - - return True - - def switch_status(self, serial: str, status_type: int, enable: int = 0) -> bool: - """Switch status of a device.""" - return self._switch_status(serial, status_type, enable) - - def _get_page_list(self) -> Any: - """Get ezviz device info broken down in sections.""" - return self._api_get_pagelist( - page_filter="CLOUD, TIME_PLAN, CONNECTION, SWITCH," - "STATUS, WIFI, NODISTURB, KMS, P2P," - "TIME_PLAN, CHANNEL, VTM, DETECTOR," - "FEATURE, UPGRADE, VIDEO_QUALITY, QOS", - json_key=None, - ) - - def get_device(self) -> Any: - """Get ezviz devices filter.""" - return self._api_get_pagelist(page_filter="CLOUD", json_key="deviceInfos") - - def get_connection(self) -> Any: - """Get ezviz connection infos filter.""" - return self._api_get_pagelist( - page_filter="CONNECTION", json_key="connectionInfos" - ) - - def _get_status(self) -> Any: - """Get ezviz status infos filter.""" - return self._api_get_pagelist(page_filter="STATUS", json_key="statusInfos") - - def get_switch(self) -> Any: - """Get ezviz switch infos filter.""" - return self._api_get_pagelist( - page_filter="SWITCH", json_key="switchStatusInfos" - ) - - def _get_wifi(self) -> Any: - """Get ezviz wifi infos filter.""" - return self._api_get_pagelist(page_filter="WIFI", json_key="wifiInfos") - - def _get_nodisturb(self) -> Any: - """Get ezviz nodisturb infos filter.""" - return self._api_get_pagelist( - page_filter="NODISTURB", json_key="alarmNodisturbInfos" - ) - - def _get_p2p(self) -> Any: - """Get ezviz P2P infos filter.""" - return self._api_get_pagelist(page_filter="P2P", json_key="p2pInfos") - - def _get_kms(self) -> Any: - """Get ezviz KMS infos filter.""" - return self._api_get_pagelist(page_filter="KMS", json_key="kmsInfos") - - def _get_time_plan(self) -> Any: - """Get ezviz TIME_PLAN infos filter.""" - return self._api_get_pagelist(page_filter="TIME_PLAN", json_key="timePlanInfos") - - def close_session(self) -> None: - """Clear current session.""" - if self._session: - self._session.close() - - self._session = requests.session() - self._session.headers.update( - {"User-Agent": "okhttp/3.12.1"} - ) # Android generic user agent. diff --git a/pyezviz/constants.py b/pyezviz/constants.py deleted file mode 100644 index 398195c..0000000 --- a/pyezviz/constants.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Device switch types relationship.""" -from enum import Enum, unique - -FEATURE_CODE = "c22cb01f8cb83351422d82fad59c8e4e" -XOR_KEY = b"\x0c\x0eJ^X\x15@Rr" -DEFAULT_TIMEOUT = 25 -MAX_RETRIES = 3 - -@unique -class DeviceSwitchType(Enum): - """Device switch name and number.""" - - ALARM_TONE = 1 - STREAM_ADAPTIVE = 2 - LIGHT = 3 - INTELLIGENT_ANALYSIS = 4 - LOG_UPLOAD = 5 - DEFENCE_PLAN = 6 - PRIVACY = 7 - SOUND_LOCALIZATION = 8 - CRUISE = 9 - INFRARED_LIGHT = 10 - WIFI = 11 - WIFI_MARKETING = 12 - WIFI_LIGHT = 13 - PLUG = 14 - SLEEP = 21 - SOUND = 22 - BABY_CARE = 23 - LOGO = 24 - MOBILE_TRACKING = 25 - CHANNELOFFLINE = 26 - ALL_DAY_VIDEO = 29 - AUTO_SLEEP = 32 - ROAMING_STATUS = 34 - DEVICE_4G = 35 - ALARM_REMIND_MODE = 37 - OUTDOOR_RINGING_SOUND = 39 - INTELLIGENT_PQ_SWITCH = 40 - DOORBELL_TALK = 101 - HUMAN_INTELLIGENT_DETECTION = 200 - LIGHT_FLICKER = 301 - ALARM_LIGHT = 303 - ALARM_LIGHT_RELEVANCE = 305 - DEVICE_HUMAN_RELATE_LIGHT = 41 - TAMPER_ALARM = 306 - DETECTION_TYPE = 451 - OUTLET_RECOVER = 600 - CHIME_INDICATOR_LIGHT = 611 - TRACKING = 650 - CRUISE_TRACKING = 651 - PARTIAL_IMAGE_OPTIMIZE = 700 - FEATURE_TRACKING = 701 - - -class SoundMode(Enum): - """Alarm sound level description.""" - - SILENT = 2 - SOFT = 0 - INTENSE = 1 - CUSTOM = 3 - PLAN = 4 - - -class DefenseModeType(Enum): - """Defense mode name and number.""" - - HOME_MODE = 1 - AWAY_MODE = 2 - SLEEP_MODE = 3 - UNSET_MODE = 0 - - -class DeviceCatagories(Enum): - """Supported device categories.""" - - COMMON_DEVICE_CATEGORY = "COMMON" - CAMERA_DEVICE_CATEGORY = "IPC" - BATTERY_CAMERA_DEVICE_CATEGORY = "BatteryCamera" - DOORBELL_DEVICE_CATEGORY = "BDoorBell" - BASE_STATION_DEVICE_CATEGORY = "XVR" - - -class SensorType(Enum): - """Sensors and their types to expose in HA.""" - - # pylint: disable=invalid-name - sw_version = "None" - alarm_sound_mod = "None" - battery_level = "battery" - detection_sensibility = "None" - last_alarm_time = "None" - Seconds_Last_Trigger = "None" - last_alarm_pic = "None" - supported_channels = "None" - local_ip = "None" - wan_ip = "None" - PIR_Status = "motion" - - -class BinarySensorType(Enum): - """Binary_sensors and their types to expose in HA.""" - - # pylint: disable=invalid-name - Motion_Trigger = "motion" - alarm_schedules_enabled = "None" - encrypted = "None" - upgrade_available = "None" diff --git a/pyezviz/exceptions.py b/pyezviz/exceptions.py deleted file mode 100644 index 465b73d..0000000 --- a/pyezviz/exceptions.py +++ /dev/null @@ -1,20 +0,0 @@ -"""PyEzviz Exceptions.""" - -class PyEzvizError(Exception): - """Ezviz api exception.""" - - -class InvalidURL(PyEzvizError): - """Invalid url exception.""" - - -class HTTPError(PyEzvizError): - """Invalid host exception.""" - - -class InvalidHost(PyEzvizError): - """Invalid host exception.""" - - -class AuthTestResultFailed(PyEzvizError): - """Authentication failed""" diff --git a/pyezviz/mqtt.py b/pyezviz/mqtt.py deleted file mode 100644 index c634657..0000000 --- a/pyezviz/mqtt.py +++ /dev/null @@ -1,234 +0,0 @@ -"""Ezviz cloud MQTT client for push messages.""" -import os -import logging -logging.basicConfig( - format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', - level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO) - -import base64 -import json -import threading -import time - -import paho.mqtt.client as mqtt -import requests -from pyezviz.constants import DEFAULT_TIMEOUT, FEATURE_CODE -from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError - -API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info" -API_ENDPOINT_REGISTER_MQTT = "/v1/getClientId" -API_ENDPOINT_START_MQTT = "/api/push/start" -API_ENDPOINT_STOP_MQTT = "/api/push/stop" - - -MQTT_APP_KEY = "4c6b3cc2-b5eb-4813-a592-612c1374c1fe" -APP_SECRET = "17454517-cc1c-42b3-a845-99b4a15dd3e6" - - -def on_subscribe(client, userdata, mid, granted_qos): - """On MQTT message subscribe.""" - # pylint: disable=unused-argument - logging.info("Subscribed: " + str(mid) + " " + str(granted_qos)) - - -def on_connect(client, userdata, flags, return_code): - """On MQTT connect.""" - # pylint: disable=unused-argument - if return_code == 0: - logging.debug("connected OK Returned code=%s", return_code) - else: - logging.info("Bad connection Returned code=%s", return_code) - client.reconnect() - - -#def on_message(client, userdata, msg): -# """On MQTT message receive.""" -# # pylint: disable=unused-argument -# mqtt_message = json.loads(msg.payload) -# mqtt_message["ext"] = mqtt_message["ext"].split(",") -# -# # Print payload message -# decoded_message = {mqtt_message['ext'][2]:{'id':mqtt_message['id'], 'alert':mqtt_message['alert'], 'time':mqtt_message['ext'][1], 'alert type':mqtt_message['ext'][4], 'image':mqtt_message['ext'][16]}} -# print(decoded_message) - - -class MQTTClient(threading.Thread): - """Open MQTT connection to ezviz cloud.""" - - def __init__( - self, - token, - callback, - timeout=DEFAULT_TIMEOUT, - ): - """Initialize the client object.""" - threading.Thread.__init__(self) - self._session = None - self._token = token or { - "session_id": None, - "rf_session_id": None, - "username": None, - "api_url": "apiieu.ezvizlife.com", - } - self._callback = callback - self._timeout = timeout - self._stop_event = threading.Event() - self._mqtt_data = { - "mqtt_clientid": None, - "ticket": None, - "push_url": token["service_urls"]["pushAddr"], - } - - def _mqtt(self): - """Receive MQTT messages from ezviz server""" - - ezviz_mqtt_client = mqtt.Client( - client_id=self._mqtt_data["mqtt_clientid"], protocol=4, transport="tcp" - ) - ezviz_mqtt_client.on_connect = on_connect - ezviz_mqtt_client.on_subscribe = on_subscribe - ezviz_mqtt_client.on_message = self._callback - ezviz_mqtt_client.username_pw_set(MQTT_APP_KEY, APP_SECRET) - - ezviz_mqtt_client.connect(self._mqtt_data["push_url"], 1882, 60) - ezviz_mqtt_client.subscribe( - f"{MQTT_APP_KEY}/ticket/{self._mqtt_data['ticket']}", qos=2 - ) - - ezviz_mqtt_client.loop_start() - return ezviz_mqtt_client - - def _register_ezviz_push(self): - """Register for push messages.""" - - auth_seq = base64.b64encode(f"{MQTT_APP_KEY}:{APP_SECRET}".encode("ascii")) - auth_seq = "Basic " + auth_seq.decode() - - payload = { - "appKey": MQTT_APP_KEY, - "clientType": "5", - "mac": FEATURE_CODE, - "token": "123456", - "version": "v1.3.0", - } - - try: - req = self._session.post( - f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_REGISTER_MQTT}", - allow_redirects=False, - headers={"Authorization": auth_seq}, - data=payload, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.ConnectionError as err: - raise InvalidURL("A Invalid URL or Proxy error occured") from err - - except requests.HTTPError as err: - raise HTTPError from err - - try: - json_result = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - self._mqtt_data["mqtt_clientid"] = json_result["data"]["clientId"] - - def run(self): - """Method representing the thread's activity which should not be used directly.""" - - if self._session is None: - self._session = requests.session() - self._session.headers.update( - {"User-Agent": "okhttp/3.12.1"} - ) # Android generic user agent. - - self._register_ezviz_push() - self._start_ezviz_push() - self._mqtt() - - while not self._stop_event.is_set(): - time.sleep(1) - - def start(self): - """Start mqtt. - Start mqtt thread - """ - super().start() - - def stop(self): - """Stop push notifications.""" - - payload = { - "appKey": MQTT_APP_KEY, - "clientId": self._mqtt_data["mqtt_clientid"], - "clientType": 5, - "sessionId": self._token["session_id"], - "username": self._token["username"], - } - - try: - req = self._session.post( - f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_STOP_MQTT}", - data=payload, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.ConnectionError as err: - raise InvalidURL("A Invalid URL or Proxy error occured") from err - - except requests.HTTPError as err: - raise HTTPError from err - - self._stop_event.set() - - def _start_ezviz_push(self): - """Send start for push messages to ezviz api.""" - - payload = { - "appKey": MQTT_APP_KEY, - "clientId": self._mqtt_data["mqtt_clientid"], - "clientType": 5, - "sessionId": self._token["session_id"], - "username": self._token["username"], - "token": "123456", - } - - try: - req = self._session.post( - f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_START_MQTT}", - allow_redirects=False, - data=payload, - timeout=self._timeout, - ) - - req.raise_for_status() - - except requests.ConnectionError as err: - raise InvalidURL("A Invalid URL or Proxy error occured") from err - - except requests.HTTPError as err: - raise HTTPError from err - - try: - json_result = req.json() - - except ValueError as err: - raise PyEzvizError( - "Impossible to decode response: " - + str(err) - + "\nResponse was: " - + str(req.text) - ) from err - - self._mqtt_data["ticket"] = json_result["ticket"] diff --git a/pyezviz/test_cam_rtsp.py b/pyezviz/test_cam_rtsp.py deleted file mode 100644 index 916048a..0000000 --- a/pyezviz/test_cam_rtsp.py +++ /dev/null @@ -1,147 +0,0 @@ -"""Test camera RTSP authentication""" -import base64 -import hashlib -import socket -from pyezviz.exceptions import AuthTestResultFailed, InvalidHost - - -def genmsg_describe(url, seq, user_agent, auth_seq): - """Generate RTSP describe message""" - msg_ret = "DESCRIBE " + url + " RTSP/1.0\r\n" - msg_ret += "CSeq: " + str(seq) + "\r\n" - msg_ret += "Authorization: " + auth_seq + "\r\n" - msg_ret += "User-Agent: " + user_agent + "\r\n" - msg_ret += "Accept: application/sdp\r\n" - msg_ret += "\r\n" - return msg_ret - - -class TestRTSPAuth: - """Initialize RTSP credential test""" - - def __init__( - self, - ip_addr, - username=None, - password=None, - test_uri="", - ): - self._rtsp_details = { - "bufLen": 1024, - "defaultServerIp": ip_addr, - "defaultServerPort": 554, - "defaultTestUri": test_uri, - "defaultUserAgent": "RTSP Client", - "defaultUsername": username, - "defaultPassword": password, - } - - def generate_auth_string(self, realm, method, uri, nonce): - """Generate digest auth string """ - map_return_info = {} - m_1 = hashlib.md5( - f"{self._rtsp_details['defaultUsername']}:" - f"{realm.decode()}:" - f"{self._rtsp_details['defaultPassword']}".encode() - ).hexdigest() - m_2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest() - response = hashlib.md5(f"{m_1}:{nonce}:{m_2}".encode()).hexdigest() - - map_return_info = ( - f"Digest " - f"username=\"{self._rtsp_details['defaultUsername']}\", " - f'realm="{realm.decode()}", ' - f'algorithm="MD5", ' - f'nonce="{nonce.decode()}", ' - f'uri="{uri}", ' - f'response="{response}"' - ) - return map_return_info - - def main(self): - """Main function """ - session = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - try: - session.connect( - ( - self._rtsp_details["defaultServerIp"], - self._rtsp_details["defaultServerPort"], - ) - ) - - except TimeoutError as err: - raise AuthTestResultFailed("Invalid ip or camera hibernating") from err - - except (socket.gaierror, ConnectionRefusedError) as err: - raise InvalidHost("Invalid IP or Hostname") from err - - seq = 1 - - url = ( - "rtsp://" - + self._rtsp_details["defaultServerIp"] - + self._rtsp_details["defaultTestUri"] - ) - - auth_seq = base64.b64encode( - f"{self._rtsp_details['defaultUsername']}:" - f"{self._rtsp_details['defaultPassword']}".encode("ascii") - ) - auth_seq = "Basic " + auth_seq.decode() - - print( - genmsg_describe(url, seq, self._rtsp_details["defaultUserAgent"], auth_seq) - ) - session.send( - genmsg_describe( - url, seq, self._rtsp_details["defaultUserAgent"], auth_seq - ).encode() - ) - msg1 = session.recv(self._rtsp_details["bufLen"]) - seq = seq + 1 - - if msg1.decode().find("200 OK") > 1: - print(f"Basic auth result: {msg1.decode()}") - return print("Basic Auth test passed. Credentials Valid!") - - if msg1.decode().find("Unauthorized") > 1: - # Basic failed, doing new DESCRIBE with digest authentication. - start = msg1.decode().find("realm") - begin = msg1.decode().find('"', start) - end = msg1.decode().find('"', begin + 1) - realm = msg1[begin + 1 : end] - - start = msg1.decode().find("nonce") - begin = msg1.decode().find('"', start) - end = msg1.decode().find('"', begin + 1) - nonce = msg1[begin + 1 : end] - - auth_seq = self.generate_auth_string( - realm, - "DESCRIBE", - self._rtsp_details["defaultTestUri"], - nonce, - ) - - print( - genmsg_describe( - url, seq, self._rtsp_details["defaultUserAgent"], auth_seq - ) - ) - - session.send( - genmsg_describe( - url, seq, self._rtsp_details["defaultUserAgent"], auth_seq - ).encode() - ) - msg1 = session.recv(self._rtsp_details["bufLen"]) - print(f"Digest auth result: {msg1.decode()}") - - if msg1.decode().find("200 OK") > 1: - return print("Digest Auth test Passed. Credentials Valid!") - - if msg1.decode().find("401 Unauthorized") > 1: - raise AuthTestResultFailed("Credentials not valid!!") - - return print("Basic Auth test passed. Credentials Valid!")