diff --git a/pyezviz/__init__.py b/pyezviz/__init__.py
new file mode 100644
index 0000000..d236101
--- /dev/null
+++ b/pyezviz/__init__.py
@@ -0,0 +1,27 @@
+"""init pyezviz."""
+from pyezviz.camera import EzvizCamera
+from pyezviz.cas import EzvizCAS
+from pyezviz.client import EzvizClient
+from pyezviz.constants import (DefenseModeType, DeviceCatagories,
+ DeviceSwitchType, SoundMode)
+from pyezviz.exceptions import (AuthTestResultFailed, HTTPError, InvalidHost,
+ InvalidURL, PyEzvizError)
+from pyezviz.mqtt import MQTTClient
+from pyezviz.test_cam_rtsp import TestRTSPAuth
+
+__all__ = [
+ "EzvizCamera",
+ "EzvizClient",
+ "PyEzvizError",
+ "InvalidURL",
+ "HTTPError",
+ "InvalidHost",
+ "AuthTestResultFailed",
+ "EzvizCAS",
+ "MQTTClient",
+ "DefenseModeType",
+ "DeviceCatagories",
+ "DeviceSwitchType",
+ "SoundMode",
+ "TestRTSPAuth",
+]
diff --git a/pyezviz/__main__.py b/pyezviz/__main__.py
new file mode 100644
index 0000000..c2221e1
--- /dev/null
+++ b/pyezviz/__main__.py
@@ -0,0 +1,309 @@
+"""pyezviz command line."""
+import argparse
+import http.client
+import json
+import logging
+import sys
+
+import pandas
+from pyezviz import EzvizCamera, EzvizClient, MQTTClient
+from pyezviz.constants import DefenseModeType
+
+
+def main():
+ """Initiate arg parser."""
+ parser = argparse.ArgumentParser(prog="pyezviz")
+ parser.add_argument("-u", "--username", required=True, help="Ezviz username")
+ parser.add_argument("-p", "--password", required=True, help="Ezviz Password")
+ parser.add_argument(
+ "-r",
+ "--region",
+ required=False,
+ default="apiieu.ezvizlife.com",
+ help="Ezviz API region",
+ )
+ parser.add_argument(
+ "--debug", "-d", action="store_true", help="Print debug messages to stderr"
+ )
+
+ subparsers = parser.add_subparsers(dest="action")
+
+ parser_device = subparsers.add_parser(
+ "devices", help="Play with all devices at once"
+ )
+ parser_device.add_argument(
+ "device_action",
+ type=str,
+ default="status",
+ help="Device action to perform",
+ choices=["device", "status", "switch", "connection"],
+ )
+
+ parser_home_defence_mode = subparsers.add_parser(
+ "home_defence_mode", help="Set home defence mode"
+ )
+
+ parser_mqtt = subparsers.add_parser("mqtt", help="Set home defence mode")
+
+ parser_home_defence_mode.add_argument(
+ "--mode", required=False, help="Choose mode", choices=["HOME_MODE", "AWAY_MODE"]
+ )
+
+ parser_camera = subparsers.add_parser("camera", help="Camera actions")
+ parser_camera.add_argument("--serial", required=True, help="camera SERIAL")
+
+ subparsers_camera = parser_camera.add_subparsers(dest="camera_action")
+
+ parser_camera_status = subparsers_camera.add_parser(
+ "status", help="Get the status of the camera"
+ )
+ parser_camera_move = subparsers_camera.add_parser("move", help="Move the camera")
+ parser_camera_move.add_argument(
+ "--direction",
+ required=True,
+ help="Direction to move the camera to",
+ choices=["up", "down", "right", "left"],
+ )
+ parser_camera_move.add_argument(
+ "--speed",
+ required=False,
+ help="Speed of the movement",
+ default=5,
+ type=int,
+ choices=range(1, 10),
+ )
+
+ parser_camera_switch = subparsers_camera.add_parser(
+ "switch", help="Change the status of a switch"
+ )
+ parser_camera_switch.add_argument(
+ "--switch",
+ required=True,
+ help="Switch to switch",
+ choices=["audio", "ir", "state", "privacy", "sleep", "follow_move"],
+ )
+ parser_camera_switch.add_argument(
+ "--enable",
+ required=False,
+ help="Enable (or not)",
+ default=1,
+ type=int,
+ choices=[0, 1],
+ )
+
+ parser_camera_alarm = subparsers_camera.add_parser(
+ "alarm", help="Configure the camera alarm"
+ )
+ parser_camera_alarm.add_argument(
+ "--notify", required=False, help="Enable (or not)", type=int, choices=[0, 1]
+ )
+ parser_camera_alarm.add_argument(
+ "--sound",
+ required=False,
+ help="Sound level (2 is silent, 1 intensive, 0 soft)",
+ type=int,
+ choices=[0, 1, 2],
+ )
+ parser_camera_alarm.add_argument(
+ "--sensibility",
+ required=False,
+ help="Sensibility level (Non-Cameras = from 1 to 6) or (Cameras = 1 to 100)",
+ type=int,
+ choices=range(0, 100),
+ )
+ parser_camera_alarm.add_argument(
+ "--schedule", required=False, help="Schedule in json format *test*", type=str
+ )
+
+ args = parser.parse_args()
+
+ # print("--------------args")
+ # print("--------------args: %s",args)
+ # print("--------------args")
+
+ client = EzvizClient(args.username, args.password, args.region)
+
+ if args.debug:
+
+ http.client.HTTPConnection.debuglevel = 5
+ # You must initialize logging, otherwise you'll not see debug output.
+ logging.basicConfig()
+ logging.getLogger().setLevel(logging.DEBUG)
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.DEBUG)
+ requests_log.propagate = True
+
+ if args.action == "devices":
+
+ if args.device_action == "device":
+ try:
+ client.login()
+ print(json.dumps(client.get_device(), indent=2))
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.device_action == "status":
+ try:
+ client.login()
+ print(
+ pandas.DataFrame(client.load_cameras()).to_string(
+ columns=[
+ "serial",
+ "name",
+ # version,
+ # upgrade_available,
+ "status",
+ "device_category",
+ "device_sub_category",
+ "sleep",
+ "privacy",
+ "audio",
+ "ir_led",
+ "state_led",
+ # follow_move,
+ # alarm_notify,
+ # alarm_schedules_enabled,
+ # alarm_sound_mod,
+ # encrypted,
+ "local_ip",
+ "local_rtsp_port",
+ "detection_sensibility",
+ "battery_level",
+ "alarm_schedules_enabled",
+ "alarm_notify",
+ "Motion_Trigger",
+ # last_alarm_time,
+ # last_alarm_pic
+ ]
+ )
+ )
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.device_action == "switch":
+ try:
+ client.login()
+ print(json.dumps(client.get_switch(), indent=2))
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.device_action == "connection":
+ try:
+ client.login()
+ print(json.dumps(client.get_connection(), indent=2))
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ else:
+ print("Action not implemented: %s", args.device_action)
+
+ elif args.action == "home_defence_mode":
+
+ if args.mode:
+ try:
+ client.login()
+ print(
+ json.dumps(
+ client.api_set_defence_mode(
+ getattr(DefenseModeType, args.mode).value
+ ),
+ indent=2,
+ )
+ )
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.action == "mqtt":
+
+ try:
+ token = client.login()
+ mqtt = MQTTClient(token)
+ mqtt.start()
+
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.action == "camera":
+
+ # load camera object
+ try:
+ client.login()
+ camera = EzvizCamera(client, args.serial)
+ logging.debug("Camera loaded")
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ client.close_session()
+
+ if args.camera_action == "move":
+ try:
+ camera.move(args.direction, args.speed)
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.camera_action == "status":
+ try:
+ print(json.dumps(camera.status(), indent=2))
+
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.camera_action == "switch":
+ try:
+ if args.switch == "ir":
+ camera.switch_device_ir_led(args.enable)
+ elif args.switch == "state":
+ print(args.enable)
+ camera.switch_device_state_led(args.enable)
+ elif args.switch == "audio":
+ camera.switch_device_audio(args.enable)
+ elif args.switch == "privacy":
+ camera.switch_privacy_mode(args.enable)
+ elif args.switch == "sleep":
+ camera.switch_sleep_mode(args.enable)
+ elif args.switch == "follow_move":
+ camera.switch_follow_move(args.enable)
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ elif args.camera_action == "alarm":
+ try:
+ if args.sound is not None:
+ camera.alarm_sound(args.sound)
+ if args.notify is not None:
+ camera.alarm_notify(args.notify)
+ if args.sensibility is not None:
+ camera.alarm_detection_sensibility(args.sensibility)
+ if args.schedule is not None:
+ camera.change_defence_schedule(args.schedule)
+ except Exception as exp: # pylint: disable=broad-except
+ print(exp)
+ finally:
+ client.close_session()
+
+ else:
+ print("Action not implemented, try running with -h switch for help")
+
+ else:
+ print("Action not implemented: %s", args.action)
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/pyezviz/camera.py b/pyezviz/camera.py
new file mode 100644
index 0000000..2632314
--- /dev/null
+++ b/pyezviz/camera.py
@@ -0,0 +1,234 @@
+"""pyezviz camera api."""
+from __future__ import annotations
+
+import datetime
+from typing import Any
+
+from pyezviz.constants import DeviceCatagories, DeviceSwitchType, SoundMode
+from pyezviz.exceptions import PyEzvizError
+
+
+class EzvizCamera:
+ """Initialize Ezviz camera object."""
+
+ def __init__(self, client, serial: str, device_obj: dict | None = None) -> None:
+ """Initialize the camera object."""
+ self._client = client
+ self._serial = serial
+ self._switch: dict[int, bool] = {}
+ self._alarmmotiontrigger: dict[str, Any] = {}
+ self._device = device_obj or {}
+ self.alarmlist_time = None
+ self.alarmlist_pic = None
+
+ def load(self) -> None:
+ """Update device info for camera serial."""
+
+ if self._device is None:
+ self._device = self._client.get_all_per_serial_infos(self._serial)
+
+ self._alarm_list()
+
+ self._switch_status()
+
+ def _switch_status(self) -> None:
+ """load device switches"""
+
+ if self._device.get("switchStatusInfos"):
+ for switch in self._device["switchStatusInfos"]:
+ self._switch.update({switch["type"]: switch["enable"]})
+
+ else:
+ self._switch = {0: False}
+
+ def _detection_sensibility(self) -> Any:
+ """load detection sensibility"""
+ result = "Unknown"
+
+ if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is not True:
+ if (
+ self._device["deviceInfos"]["deviceCategory"]
+ == DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value
+ ):
+ result = self._client.get_detection_sensibility(
+ self._serial,
+ "3",
+ )
+ else:
+ result = self._client.get_detection_sensibility(self._serial)
+
+ if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is True:
+ result = "Hibernate"
+
+ return result
+
+ def _alarm_list(self) -> None:
+ """get last alarm info for this camera's self._serial"""
+ alarmlist = self._client.get_alarminfo(self._serial)
+
+ if alarmlist.get("page").get("totalResults") > 0:
+ self.alarmlist_time = alarmlist.get("alarms")[0].get("alarmStartTimeStr")
+ self.alarmlist_pic = alarmlist.get("alarms")[0].get("picUrl")
+
+ if self.alarmlist_time:
+ self._motion_trigger(self.alarmlist_time)
+
+ def _local_ip(self) -> str:
+ """ "Fix empty ip value for certain cameras"""
+ if self._device.get("wifiInfos"):
+ return self._device["wifiInfos"].get("address")
+
+ # Seems to return none or 0.0.0.0 on some. This should run 2nd.
+ if self._device.get("connectionInfos"):
+ if self._device["connectionInfos"].get("localIp"):
+ return self._device["connectionInfos"]["localIp"]
+
+ return "0.0.0.0"
+
+ def _motion_trigger(self, alarmlist_time: str) -> None:
+ """Create motion sensor based on last alarm time."""
+ now = datetime.datetime.now().replace(microsecond=0)
+ alarm_trigger_active = 0
+ today_date = datetime.date.today()
+ fix = datetime.datetime.now().replace(microsecond=0)
+
+ # Need to handle error if time format different
+ fix = datetime.datetime.strptime(
+ alarmlist_time.replace("Today", str(today_date)),
+ "%Y-%m-%d %H:%M:%S",
+ )
+
+ # returns a timedelta object
+ timepassed = now - fix
+
+ if timepassed < datetime.timedelta(seconds=60):
+ alarm_trigger_active = 1
+
+ self._alarmmotiontrigger = {
+ "alarm_trigger_active": alarm_trigger_active,
+ "timepassed": timepassed.total_seconds(),
+ }
+
+ def _is_alarm_schedules_enabled(self) -> bool | None:
+ """Checks if alarm schedules enabled"""
+ time_plans = None
+
+ if self._device.get("timePlanInfos"):
+ time_plans = [
+ item for item in self._device["timePlanInfos"] if item.get("type") == 2
+ ]
+
+ if time_plans:
+ return bool(time_plans[0].get("enable"))
+
+ return None
+
+ def status(self) -> dict[Any, Any]:
+ """Return the status of the camera."""
+ self.load()
+
+ return {
+ "serial": self._serial,
+ "name": self._device["deviceInfos"].get("name"),
+ "version": self._device["deviceInfos"].get("version"),
+ "upgrade_available": self._device["statusInfos"].get("upgradeAvailable"),
+ "status": self._device["deviceInfos"].get("status"),
+ "device_category": self._device["deviceInfos"].get("deviceCategory"),
+ "device_sub_category": self._device["deviceInfos"].get("deviceSubCategory"),
+ "sleep": self._switch.get(DeviceSwitchType.SLEEP.value)
+ or self._switch.get(DeviceSwitchType.AUTO_SLEEP.value),
+ "privacy": self._switch.get(DeviceSwitchType.PRIVACY.value),
+ "audio": self._switch.get(DeviceSwitchType.SOUND.value),
+ "ir_led": self._switch.get(DeviceSwitchType.INFRARED_LIGHT.value),
+ "state_led": self._switch.get(DeviceSwitchType.LIGHT.value),
+ "follow_move": self._switch.get(DeviceSwitchType.MOBILE_TRACKING.value),
+ "alarm_notify": bool(self._device["statusInfos"].get("globalStatus")),
+ "alarm_schedules_enabled": self._is_alarm_schedules_enabled(),
+ "alarm_sound_mod": SoundMode(
+ self._device["statusInfos"].get("alarmSoundMode")
+ ).name,
+ "encrypted": bool(self._device["statusInfos"].get("isEncrypted")),
+ "local_ip": self._local_ip(),
+ "wan_ip": self._device.get("connectionInfos", {}).get("netIp", "0.0.0.0"),
+ "local_rtsp_port": self._device["connectionInfos"].get(
+ "localRtspPort", "554"
+ ),
+ "supported_channels": self._device["deviceInfos"].get("channelNumber"),
+ "detection_sensibility": self._detection_sensibility(),
+ "battery_level": self._device["statusInfos"]
+ .get("optionals", {})
+ .get("powerRemaining"),
+ "PIR_Status": self._device["statusInfos"].get("pirStatus"),
+ "Motion_Trigger": self._alarmmotiontrigger.get("alarm_trigger_active"),
+ "Seconds_Last_Trigger": self._alarmmotiontrigger.get("timepassed"),
+ "last_alarm_time": self.alarmlist_time,
+ "last_alarm_pic": self.alarmlist_pic,
+ "wifiInfos": self._device.get("wifiInfos"),
+ "switches": self._switch,
+ }
+
+ def move(self, direction: str, speed: int = 5) -> bool:
+ """Move camera."""
+ if direction not in ["right", "left", "down", "up"]:
+ raise PyEzvizError(f"Invalid direction: {direction} ")
+
+ # launch the start command
+ self._client.ptz_control(str(direction).upper(), self._serial, "START", speed)
+ # launch the stop command
+ self._client.ptz_control(str(direction).upper(), self._serial, "STOP", speed)
+
+ return True
+
+ def alarm_notify(self, enable: int) -> bool:
+ """Enable/Disable camera notification when movement is detected."""
+ return self._client.set_camera_defence(self._serial, enable)
+
+ def alarm_sound(self, sound_type: int) -> bool:
+ """Enable/Disable camera sound when movement is detected."""
+ # we force enable = 1 , to make sound...
+ return self._client.alarm_sound(self._serial, sound_type, 1)
+
+ def alarm_detection_sensibility(self, sensibility, type_value=0):
+ """Enable/Disable camera sound when movement is detected."""
+ # we force enable = 1 , to make sound...
+ return self._client.detection_sensibility(self._serial, sensibility, type_value)
+
+ def switch_device_audio(self, enable=0):
+ """Switch audio status on a device."""
+ return self._client.switch_status(
+ self._serial, DeviceSwitchType.SOUND.value, enable
+ )
+
+ def switch_device_state_led(self, enable=0):
+ """Switch led status on a device."""
+ return self._client.switch_status(
+ self._serial, DeviceSwitchType.LIGHT.value, enable
+ )
+
+ def switch_device_ir_led(self, enable=0):
+ """Switch ir status on a device."""
+ return self._client.switch_status(
+ self._serial, DeviceSwitchType.INFRARED_LIGHT.value, enable
+ )
+
+ def switch_privacy_mode(self, enable=0):
+ """Switch privacy mode on a device."""
+ return self._client.switch_status(
+ self._serial, DeviceSwitchType.PRIVACY.value, enable
+ )
+
+ def switch_sleep_mode(self, enable=0):
+ """Switch sleep mode on a device."""
+ return self._client.switch_status(
+ self._serial, DeviceSwitchType.SLEEP.value, enable
+ )
+
+ def switch_follow_move(self, enable=0):
+ """Switch follow move."""
+ return self._client.switch_status(
+ self._serial, DeviceSwitchType.MOBILE_TRACKING.value, enable
+ )
+
+ def change_defence_schedule(self, schedule, enable=0):
+ """Change defence schedule. Requires json formatted schedules."""
+ return self._client.api_set_defence_schdule(self._serial, schedule, enable)
diff --git a/pyezviz/cas.py b/pyezviz/cas.py
new file mode 100644
index 0000000..c48ed05
--- /dev/null
+++ b/pyezviz/cas.py
@@ -0,0 +1,160 @@
+"""Ezviz CAS API Functions."""
+
+import random
+import socket
+import ssl
+from io import BytesIO
+from itertools import cycle
+
+import xmltodict
+from Crypto.Cipher import AES
+from pyezviz.constants import FEATURE_CODE, XOR_KEY
+from pyezviz.exceptions import InvalidHost
+
+
+def xor_enc_dec(msg, xor_key=XOR_KEY):
+ """Xor encodes camera serial"""
+ with BytesIO(msg) as stream:
+ xor_msg = bytes(a ^ b for a, b in zip(stream.read(), cycle(xor_key)))
+ return xor_msg
+
+
+class EzvizCAS:
+ """Ezviz CAS server client."""
+
+ def __init__(self, token):
+ """Initialize the client object."""
+ self._session = None
+ self._token = token or {
+ "session_id": None,
+ "rf_session_id": None,
+ "username": None,
+ "api_url": "apiieu.ezvizlife.com",
+ }
+ self._service_urls = token["service_urls"]
+
+ def cas_get_encryption(self, devserial):
+ """Fetch encryption code from ezviz cas server"""
+
+ # Random hex 64 characters long.
+ rand_hex = random.randrange(10 ** 80)
+ rand_hex = "%064x" % rand_hex
+ rand_hex = rand_hex[:64]
+
+ payload = (
+ f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
+ f"\x00\x02" # Check or order?
+ f"\x00\x00\x00\x00\x00\x00 "
+ f"\x01" # Check or order?
+ f"\x00\x00\x00\x00\x00\x00\x02\t\x00\x00\x00\x00"
+ f'\n\n\t'
+ f'{self._token["session_id"]}'
+ f"\n\t{FEATURE_CODE}\n\t"
+ f"{devserial}"
+ f"\n\t0\n\n"
+ ).encode("latin1")
+
+ payload_end_padding = rand_hex.encode("latin1")
+
+ context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE)
+
+ # Create a TCP/IP socket
+ my_socket = socket.create_connection(
+ (self._service_urls["sysConf"][15], self._service_urls["sysConf"][16])
+ )
+ my_socket = context.wrap_socket(
+ my_socket, server_hostname=self._service_urls["sysConf"][15]
+ )
+
+ # Get CAS Encryption Key
+ try:
+ my_socket.send(payload + payload_end_padding)
+ response = my_socket.recv(1024)
+ print(f"Get Encryption Key: {response}")
+
+ except (socket.gaierror, ConnectionRefusedError) as err:
+ raise InvalidHost("Invalid IP or Hostname") from err
+
+ finally:
+ my_socket.close()
+
+ # Trim header, tail and convert xml to dict.
+ response = response[32::]
+ response = response[:-32:]
+ response = xmltodict.parse(response)
+
+ return response
+
+ def set_camera_defence_state(self, serial, enable=1):
+ """Enable alarm notifications."""
+
+ # Random hex 64 characters long.
+ rand_hex = random.randrange(10 ** 80)
+ rand_hex = "%064x" % rand_hex
+ rand_hex = rand_hex[:64]
+
+ payload = (
+ f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
+ f"\x00\x14" # Check or order?
+ f"\x00\x00\x00\x00\x00\x00 "
+ f"\x05"
+ f"\x00\x00\x00\x00\x00\x00\x02\xd0\x00\x00\x01\xe0"
+ f'\n\n\t'
+ f'\n\t'
+ f'\n\n'
+ f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
+ f"\x00\x13"
+ f"\x00\x00\x00\x00\x00\x000\x0f\xff\xff\xff\xff"
+ f"\x00\x00\x00\xb0\x00\x00\x00\x00"
+ ).encode("latin1")
+
+ payload_end_padding = rand_hex.encode("latin1")
+
+ # xor camera serial
+ xor_cam_serial = xor_enc_dec(serial.encode("latin1"))
+
+ defence_msg_string = (
+ f'{xor_cam_serial.decode()}2+,*xdv.0" '
+ f'encoding="utf-8"?>\n'
+ f"\n"
+ f"\tABCDEFG\n"
+ f'\t\n'
+ f"\n"
+ f"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10"
+ ).encode("latin1")
+
+ context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE)
+
+ # Create a TCP/IP socket
+ my_socket = socket.create_connection(
+ (self._service_urls["sysConf"][15], self._service_urls["sysConf"][16])
+ )
+ my_socket = context.wrap_socket(
+ my_socket, server_hostname=self._service_urls["sysConf"][15]
+ )
+
+ cas_client = self.cas_get_encryption(serial)
+
+ aes_key = cas_client["Response"]["Session"]["@Key"].encode("latin1")
+ iv_value = (
+ f"{serial}{cas_client['Response']['Session']['@OperationCode']}".encode(
+ "latin1"
+ )
+ )
+
+ # Message encryption
+ cipher = AES.new(aes_key, AES.MODE_CBC, iv_value)
+
+ try:
+ defence_msg_string = cipher.encrypt(defence_msg_string)
+ my_socket.send(payload + defence_msg_string + payload_end_padding)
+ print(f"Set camera response: {my_socket.recv()}")
+
+ except (socket.gaierror, ConnectionRefusedError) as err:
+ raise InvalidHost("Invalid IP or Hostname") from err
+
+ finally:
+ my_socket.close()
+
+ return True
diff --git a/pyezviz/client.py b/pyezviz/client.py
new file mode 100644
index 0000000..4b28988
--- /dev/null
+++ b/pyezviz/client.py
@@ -0,0 +1,919 @@
+"""Ezviz API."""
+from __future__ import annotations
+
+import hashlib
+import logging
+from typing import Any
+from uuid import uuid4
+
+import requests
+
+from pyezviz.camera import EzvizCamera
+from pyezviz.cas import EzvizCAS
+from pyezviz.constants import (
+ DEFAULT_TIMEOUT,
+ FEATURE_CODE,
+ MAX_RETRIES,
+ DefenseModeType,
+ DeviceCatagories,
+)
+from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError
+
+API_ENDPOINT_CLOUDDEVICES = "/api/cloud/v2/cloudDevices/getAll"
+API_ENDPOINT_PAGELIST = "/v3/userdevices/v1/devices/pagelist"
+API_ENDPOINT_DEVICES = "/v3/devices/"
+API_ENDPOINT_LOGIN = "/v3/users/login/v5"
+API_ENDPOINT_REFRESH_SESSION_ID = "/v3/apigateway/login"
+API_ENDPOINT_SWITCH_STATUS = "/switchStatus"
+API_ENDPOINT_PTZCONTROL = "/ptzControl"
+API_ENDPOINT_ALARM_SOUND = "/alarm/sound"
+API_ENDPOINT_DETECTION_SENSIBILITY = "/api/device/configAlgorithm"
+API_ENDPOINT_DETECTION_SENSIBILITY_GET = "/api/device/queryAlgorithmConfig"
+API_ENDPOINT_ALARMINFO_GET = "/v3/alarms/v2/advanced"
+API_ENDPOINT_SET_DEFENCE_SCHEDULE = "/api/device/defence/plan2"
+API_ENDPOINT_SWITCH_DEFENCE_MODE = "/v3/userdevices/v1/group/switchDefenceMode"
+API_ENDPOINT_SWITCH_SOUND_ALARM = "/sendAlarm"
+API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info"
+
+
+class EzvizClient:
+ """Initialize api client object."""
+
+ def __init__(
+ self,
+ account: str | None = None,
+ password: str | None = None,
+ url: str = "apiieu.ezvizlife.com",
+ timeout: int = DEFAULT_TIMEOUT,
+ token: dict | None = None,
+ ) -> None:
+ """Initialize the client object."""
+ self.account = account
+ self.password = password
+ self._session = requests.session()
+ # Set Android generic user agent.
+ self._session.headers.update({"User-Agent": "okhttp/3.12.1"})
+ self._token = token or {
+ "session_id": None,
+ "rf_session_id": None,
+ "username": None,
+ "api_url": url,
+ }
+ self._timeout = timeout
+
+ def _login(self, account: str, password: str) -> dict[Any, Any]:
+ """Login to Ezviz API."""
+
+ # Region code to url.
+ if len(self._token["api_url"].split(".")) == 1:
+ self._token["api_url"] = "apii" + self._token["api_url"] + ".ezvizlife.com"
+
+ # Ezviz API sends md5 of password
+ temp = hashlib.md5()
+ temp.update(password.encode("utf-8"))
+ md5pass = temp.hexdigest()
+ payload = {
+ "account": account,
+ "password": md5pass,
+ "featureCode": FEATURE_CODE,
+ "msgType": "0",
+ "cuName": "SFRDIDEw",
+ }
+
+ try:
+ req = self._session.post(
+ "https://" + self._token["api_url"] + API_ENDPOINT_LOGIN,
+ allow_redirects=False,
+ headers={
+ "clientType": "3",
+ "customno": "1000001",
+ "clientNo": "web_site",
+ "appId": "ys7",
+ "lang": "en",
+ },
+ data=payload,
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.ConnectionError as err:
+ raise InvalidURL("A Invalid URL or Proxy error occured") from err
+
+ except requests.HTTPError as err:
+ raise HTTPError from err
+
+ try:
+ json_result = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_result["meta"]["code"] == 1100:
+ self._token["api_url"] = json_result["loginArea"]["apiDomain"]
+ print("Region incorrect!")
+ print(f"Your region url: {self._token['api_url']}")
+ self.close_session()
+ return self.login()
+
+ if json_result["meta"]["code"] == 1013:
+ raise PyEzvizError("Incorrect Username.")
+
+ if json_result["meta"]["code"] == 1014:
+ raise PyEzvizError("Incorrect Password.")
+
+ if json_result["meta"]["code"] == 1015:
+ raise PyEzvizError("The user is locked.")
+
+ self._token["session_id"] = str(json_result["loginSession"]["sessionId"])
+ self._token["rf_session_id"] = str(json_result["loginSession"]["rfSessionId"])
+ self._token["username"] = str(json_result["loginUser"]["username"])
+ self._token["api_url"] = str(json_result["loginArea"]["apiDomain"])
+ if not self._token["session_id"]:
+ raise PyEzvizError(
+ f"Login error: Please check your username/password: {req.text}"
+ )
+
+ self._token["service_urls"] = self.get_service_urls()
+
+ return self._token
+
+ def get_service_urls(self) -> Any:
+ """Get Ezviz service urls."""
+
+ try:
+ req = self._session.get(
+ f"https://{self._token['api_url']}{API_ENDPOINT_SERVER_INFO}",
+ headers={
+ "sessionId": self._token["session_id"],
+ "featureCode": FEATURE_CODE,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.ConnectionError as err:
+ raise InvalidURL("A Invalid URL or Proxy error occured") from err
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+
+ raise HTTPError from err
+
+ if not req.text:
+ raise PyEzvizError("No data")
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_output.get("meta").get("code") != 200:
+ logging.info("Json request error")
+
+ service_urls = json_output["systemConfigInfo"]
+ service_urls["sysConf"] = service_urls["sysConf"].split("|")
+
+ return service_urls
+
+ def _api_get_pagelist(
+ self, page_filter: str, json_key: str | None = None, max_retries: int = 0
+ ) -> Any:
+ """Get data from pagelist API."""
+
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ if page_filter is None:
+ raise PyEzvizError("Trying to call get_pagelist without filter")
+
+ try:
+ req = self._session.get(
+ "https://" + self._token["api_url"] + API_ENDPOINT_PAGELIST,
+ headers={"sessionId": self._token["session_id"]},
+ params={"filter": page_filter},
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+ return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
+
+ raise HTTPError from err
+
+ if not req.text:
+ raise PyEzvizError("No data")
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_output.get("meta").get("code") != 200:
+ # session is wrong, need to relogin
+ self.login()
+ logging.info(
+ "Json request error, relogging (max retries: %s)", str(max_retries)
+ )
+ return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
+
+ if json_key is None:
+ json_result = json_output
+ else:
+ json_result = json_output[json_key]
+
+ if not json_result:
+ # session is wrong, need to relogin
+ self.login()
+ logging.info(
+ "Impossible to load the devices, here is the returned response: %s",
+ str(req.text),
+ )
+ return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
+
+ return json_result
+
+ def get_alarminfo(self, serial: str, max_retries: int = 0) -> Any:
+ """Get data from alarm info API."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ params: dict[str, int | str] = {
+ "deviceSerials": serial,
+ "queryType": -1,
+ "limit": 1,
+ "stype": -1,
+ }
+
+ try:
+ req = self._session.get(
+ "https://" + self._token["api_url"] + API_ENDPOINT_ALARMINFO_GET,
+ headers={"sessionId": self._token["session_id"]},
+ params=params,
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+ return self.get_alarminfo(serial, max_retries + 1)
+
+ raise HTTPError from err
+
+ if req.text == "":
+ raise PyEzvizError("No data")
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ return json_output
+
+ def _switch_status(
+ self, serial: str, status_type: int, enable: int, max_retries: int = 0
+ ) -> bool:
+ """Switch status on a device."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ try:
+ req = self._session.put(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_DEVICES
+ + serial
+ + "/1/1/"
+ + str(status_type)
+ + API_ENDPOINT_SWITCH_STATUS,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "enable": enable,
+ "serial": serial,
+ "channelNo": "1",
+ "type": status_type,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+ return self._switch_status(serial, status_type, enable, max_retries + 1)
+
+ raise HTTPError from err
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_output.get("meta").get("code") != 200:
+ raise PyEzvizError(
+ f"Could not set the switch: Got {req.status_code} : {req.text})"
+ )
+
+ return True
+
+ def sound_alarm(self, serial: str, enable: int = 1, max_retries: int = 0) -> bool:
+ """Sound alarm on a device."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ try:
+ req = self._session.put(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_DEVICES
+ + serial
+ + "/0"
+ + API_ENDPOINT_SWITCH_SOUND_ALARM,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "enable": enable,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+ return self.sound_alarm(serial, enable, max_retries + 1)
+
+ raise HTTPError from err
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_output.get("meta").get("code") != 200:
+ raise PyEzvizError(
+ f"Could not set the alarm sound: Got {req.status_code} : {req.text})"
+ )
+
+ return True
+
+ def load_cameras(self) -> dict[Any, Any]:
+ """Load and return all cameras objects."""
+
+ devices = self._get_all_device_infos()
+ cameras = {}
+ supported_categories = [
+ DeviceCatagories.COMMON_DEVICE_CATEGORY.value,
+ DeviceCatagories.CAMERA_DEVICE_CATEGORY.value,
+ DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value,
+ DeviceCatagories.DOORBELL_DEVICE_CATEGORY.value,
+ DeviceCatagories.BASE_STATION_DEVICE_CATEGORY.value,
+ ]
+
+ for device, data in devices.items():
+ if data["deviceInfos"]["deviceCategory"] in supported_categories:
+ # Add support for connected HikVision cameras
+ if (
+ data["deviceInfos"]["deviceCategory"]
+ == DeviceCatagories.COMMON_DEVICE_CATEGORY.value
+ and not data["deviceInfos"]["hik"]
+ ):
+ continue
+
+ # Create camera object
+
+ camera = EzvizCamera(self, device, data)
+
+ camera.load()
+ cameras[device] = camera.status()
+
+ return cameras
+
+ def _get_all_device_infos(self) -> dict[Any, Any]:
+ """Load all devices and build dict per device serial."""
+
+ devices = self._get_page_list()
+ result: dict[Any, Any] = {}
+
+ for device in devices["deviceInfos"]:
+ result[device["deviceSerial"]] = {}
+ result[device["deviceSerial"]]["deviceInfos"] = device
+ result[device["deviceSerial"]]["connectionInfos"] = devices.get(
+ "connectionInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["p2pInfos"] = devices.get("p2pInfos").get(
+ device["deviceSerial"]
+ )
+ result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get(
+ "alarmNodisturbInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["kmsInfos"] = devices.get("kmsInfos").get(
+ device["deviceSerial"]
+ )
+ result[device["deviceSerial"]]["timePlanInfos"] = devices.get(
+ "timePlanInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["statusInfos"] = devices.get(
+ "statusInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["wifiInfos"] = devices.get("wifiInfos").get(
+ device["deviceSerial"]
+ )
+ result[device["deviceSerial"]]["switchStatusInfos"] = devices.get(
+ "switchStatusInfos"
+ ).get(device["deviceSerial"])
+ for item in devices["cameraInfos"]:
+ if item["deviceSerial"] == device["deviceSerial"]:
+ result[device["deviceSerial"]]["cameraInfos"] = item
+
+ return result
+
+ def get_all_per_serial_infos(self, serial: str) -> dict[Any, Any] | None:
+ """Load all devices and build dict per device serial."""
+
+ if serial is None:
+ raise PyEzvizError("Need serial number for this query")
+
+ devices = self._get_page_list()
+ result: dict[str, dict] = {serial: {}}
+
+ for device in devices["deviceInfos"]:
+ if device["deviceSerial"] == serial:
+ result[device["deviceSerial"]]["deviceInfos"] = device
+ result[device["deviceSerial"]]["deviceInfos"] = device
+ result[device["deviceSerial"]]["connectionInfos"] = devices.get(
+ "connectionInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["p2pInfos"] = devices.get(
+ "p2pInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get(
+ "alarmNodisturbInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["kmsInfos"] = devices.get(
+ "kmsInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["timePlanInfos"] = devices.get(
+ "timePlanInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["statusInfos"] = devices.get(
+ "statusInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["wifiInfos"] = devices.get(
+ "wifiInfos"
+ ).get(device["deviceSerial"])
+ result[device["deviceSerial"]]["switchStatusInfos"] = devices.get(
+ "switchStatusInfos"
+ ).get(device["deviceSerial"])
+ for item in devices["cameraInfos"]:
+ if item["deviceSerial"] == device["deviceSerial"]:
+ result[device["deviceSerial"]]["cameraInfos"] = item
+
+ return result.get(serial)
+
+ def ptz_control(
+ self, command: str, serial: str, action: str, speed: int = 5
+ ) -> Any:
+ """PTZ Control by API."""
+ if command is None:
+ raise PyEzvizError("Trying to call ptzControl without command")
+ if action is None:
+ raise PyEzvizError("Trying to call ptzControl without action")
+
+ try:
+ req = self._session.put(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_DEVICES
+ + serial
+ + API_ENDPOINT_PTZCONTROL,
+ data={
+ "command": command,
+ "action": action,
+ "channelNo": "1",
+ "speed": speed,
+ "uuid": str(uuid4()),
+ "serial": serial,
+ },
+ headers={"sessionId": self._token["session_id"], "clientType": "1"},
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ raise HTTPError from err
+
+ return req.text
+
+ def login(self) -> dict[Any, Any]:
+ """Get or refresh ezviz login token."""
+ if self._token["session_id"] and self._token["rf_session_id"]:
+ try:
+ req = self._session.put(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_REFRESH_SESSION_ID,
+ data={
+ "refreshSessionId": self._token["rf_session_id"],
+ "featureCode": FEATURE_CODE,
+ },
+ headers={"sessionId": self._token["session_id"]},
+ timeout=self._timeout,
+ )
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ raise HTTPError from err
+
+ try:
+ json_result = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ self._token["session_id"] = str(json_result["sessionInfo"]["sessionId"])
+ self._token["rf_session_id"] = str(
+ json_result["sessionInfo"]["refreshSessionId"]
+ )
+ if not self._token["session_id"]:
+ raise PyEzvizError(f"Relogin required: {req.text}")
+
+ if not self._token.get("service_urls"):
+ self._token["service_urls"] = self.get_service_urls()
+
+ return self._token
+
+ if self.account and self.password:
+ return self._login(account=self.account, password=self.password)
+
+ raise PyEzvizError("Login with account and password required")
+
+ def set_camera_defence(self, serial: str, enable: int) -> bool:
+ """Enable/Disable motion detection on camera."""
+ cas_client = EzvizCAS(self._token)
+ cas_client.set_camera_defence_state(serial, enable)
+
+ return True
+
+ def api_set_defence_schedule(
+ self, serial: str, schedule: str, enable: int, max_retries: int = 0
+ ) -> bool:
+ """Set defence schedules."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ schedulestring = (
+ '{"CN":0,"EL":'
+ + str(enable)
+ + ',"SS":"'
+ + serial
+ + '","WP":['
+ + schedule
+ + "]}]}"
+ )
+ try:
+ req = self._session.post(
+ "https://" + self._token["api_url"] + API_ENDPOINT_SET_DEFENCE_SCHEDULE,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "devTimingPlan": schedulestring,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+ return self.api_set_defence_schedule(
+ serial, schedule, enable, max_retries + 1
+ )
+
+ raise HTTPError from err
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_output.get("resultCode") != 0:
+ raise PyEzvizError(
+ f"Could not set the schedule: Got {req.status_code} : {req.text})"
+ )
+
+ return True
+
+ def api_set_defence_mode(self, mode: DefenseModeType, max_retries: int = 0) -> bool:
+ """Set defence mode."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ try:
+ req = self._session.post(
+ "https://" + self._token["api_url"] + API_ENDPOINT_SWITCH_DEFENCE_MODE,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "groupId": -1,
+ "mode": mode,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to relogin
+ self.login()
+ return self.api_set_defence_mode(mode, max_retries + 1)
+
+ raise HTTPError from err
+
+ try:
+ json_output = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ if json_output.get("meta").get("code") != 200:
+ raise PyEzvizError(
+ f"Could not set defence mode: Got {req.status_code} : {req.text})"
+ )
+
+ return True
+
+ def detection_sensibility(
+ self,
+ serial: str,
+ sensibility: int = 3,
+ type_value: int = 3,
+ max_retries: int = 0,
+ ) -> bool | str:
+ """Set detection sensibility."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ if sensibility not in [0, 1, 2, 3, 4, 5, 6] and type_value == 0:
+ raise PyEzvizError(
+ "Unproper sensibility for type 0 (should be within 1 to 6)."
+ )
+
+ try:
+ req = self._session.post(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_DETECTION_SENSIBILITY,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "subSerial": serial,
+ "type": type_value,
+ "channelNo": "1",
+ "value": sensibility,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to re-log-in
+ self.login()
+ return self.detection_sensibility(
+ serial, sensibility, type_value, max_retries + 1
+ )
+
+ raise HTTPError from err
+
+ try:
+ response_json = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError("Could not decode response:" + str(err)) from err
+
+ if response_json["resultCode"] and response_json["resultCode"] != "0":
+ return "Unknown value"
+
+ return True
+
+ def get_detection_sensibility(
+ self, serial: str, type_value: str = "0", max_retries: int = 0
+ ) -> Any:
+ """Get detection sensibility notifications."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ try:
+ req = self._session.post(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_DETECTION_SENSIBILITY_GET,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "subSerial": serial,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to re-log-in.
+ self.login()
+ return self.get_detection_sensibility(
+ serial, type_value, max_retries + 1
+ )
+
+ raise HTTPError from err
+
+ try:
+ response_json = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError("Could not decode response:" + str(err)) from err
+
+ if response_json["resultCode"] != "0":
+ return "Unknown"
+
+ if response_json["algorithmConfig"]["algorithmList"]:
+ for idx in response_json["algorithmConfig"]["algorithmList"]:
+ if idx["type"] == type_value:
+ return idx["value"]
+
+ return "Unknown"
+
+ # soundtype: 0 = normal, 1 = intensive, 2 = disabled ... don't ask me why...
+ def alarm_sound(
+ self, serial: str, sound_type: int, enable: int = 1, max_retries: int = 0
+ ) -> bool:
+ """Enable alarm sound by API."""
+ if max_retries > MAX_RETRIES:
+ raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
+
+ if sound_type not in [0, 1, 2]:
+ raise PyEzvizError(
+ "Invalid sound_type, should be 0,1,2: " + str(sound_type)
+ )
+
+ try:
+ req = self._session.put(
+ "https://"
+ + self._token["api_url"]
+ + API_ENDPOINT_DEVICES
+ + serial
+ + API_ENDPOINT_ALARM_SOUND,
+ headers={"sessionId": self._token["session_id"]},
+ data={
+ "enable": enable,
+ "soundType": sound_type,
+ "voiceId": "0",
+ "deviceSerial": serial,
+ },
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.HTTPError as err:
+ if err.response.status_code == 401:
+ # session is wrong, need to re-log-in
+ self.login()
+ return self.alarm_sound(serial, sound_type, enable, max_retries + 1)
+
+ raise HTTPError from err
+
+ return True
+
+ def switch_status(self, serial: str, status_type: int, enable: int = 0) -> bool:
+ """Switch status of a device."""
+ return self._switch_status(serial, status_type, enable)
+
+ def _get_page_list(self) -> Any:
+ """Get ezviz device info broken down in sections."""
+ return self._api_get_pagelist(
+ page_filter="CLOUD, TIME_PLAN, CONNECTION, SWITCH,"
+ "STATUS, WIFI, NODISTURB, KMS, P2P,"
+ "TIME_PLAN, CHANNEL, VTM, DETECTOR,"
+ "FEATURE, UPGRADE, VIDEO_QUALITY, QOS",
+ json_key=None,
+ )
+
+ def get_device(self) -> Any:
+ """Get ezviz devices filter."""
+ return self._api_get_pagelist(page_filter="CLOUD", json_key="deviceInfos")
+
+ def get_connection(self) -> Any:
+ """Get ezviz connection infos filter."""
+ return self._api_get_pagelist(
+ page_filter="CONNECTION", json_key="connectionInfos"
+ )
+
+ def _get_status(self) -> Any:
+ """Get ezviz status infos filter."""
+ return self._api_get_pagelist(page_filter="STATUS", json_key="statusInfos")
+
+ def get_switch(self) -> Any:
+ """Get ezviz switch infos filter."""
+ return self._api_get_pagelist(
+ page_filter="SWITCH", json_key="switchStatusInfos"
+ )
+
+ def _get_wifi(self) -> Any:
+ """Get ezviz wifi infos filter."""
+ return self._api_get_pagelist(page_filter="WIFI", json_key="wifiInfos")
+
+ def _get_nodisturb(self) -> Any:
+ """Get ezviz nodisturb infos filter."""
+ return self._api_get_pagelist(
+ page_filter="NODISTURB", json_key="alarmNodisturbInfos"
+ )
+
+ def _get_p2p(self) -> Any:
+ """Get ezviz P2P infos filter."""
+ return self._api_get_pagelist(page_filter="P2P", json_key="p2pInfos")
+
+ def _get_kms(self) -> Any:
+ """Get ezviz KMS infos filter."""
+ return self._api_get_pagelist(page_filter="KMS", json_key="kmsInfos")
+
+ def _get_time_plan(self) -> Any:
+ """Get ezviz TIME_PLAN infos filter."""
+ return self._api_get_pagelist(page_filter="TIME_PLAN", json_key="timePlanInfos")
+
+ def close_session(self) -> None:
+ """Clear current session."""
+ if self._session:
+ self._session.close()
+
+ self._session = requests.session()
+ self._session.headers.update(
+ {"User-Agent": "okhttp/3.12.1"}
+ ) # Android generic user agent.
diff --git a/pyezviz/constants.py b/pyezviz/constants.py
new file mode 100644
index 0000000..398195c
--- /dev/null
+++ b/pyezviz/constants.py
@@ -0,0 +1,109 @@
+"""Device switch types relationship."""
+from enum import Enum, unique
+
+FEATURE_CODE = "c22cb01f8cb83351422d82fad59c8e4e"
+XOR_KEY = b"\x0c\x0eJ^X\x15@Rr"
+DEFAULT_TIMEOUT = 25
+MAX_RETRIES = 3
+
+@unique
+class DeviceSwitchType(Enum):
+ """Device switch name and number."""
+
+ ALARM_TONE = 1
+ STREAM_ADAPTIVE = 2
+ LIGHT = 3
+ INTELLIGENT_ANALYSIS = 4
+ LOG_UPLOAD = 5
+ DEFENCE_PLAN = 6
+ PRIVACY = 7
+ SOUND_LOCALIZATION = 8
+ CRUISE = 9
+ INFRARED_LIGHT = 10
+ WIFI = 11
+ WIFI_MARKETING = 12
+ WIFI_LIGHT = 13
+ PLUG = 14
+ SLEEP = 21
+ SOUND = 22
+ BABY_CARE = 23
+ LOGO = 24
+ MOBILE_TRACKING = 25
+ CHANNELOFFLINE = 26
+ ALL_DAY_VIDEO = 29
+ AUTO_SLEEP = 32
+ ROAMING_STATUS = 34
+ DEVICE_4G = 35
+ ALARM_REMIND_MODE = 37
+ OUTDOOR_RINGING_SOUND = 39
+ INTELLIGENT_PQ_SWITCH = 40
+ DOORBELL_TALK = 101
+ HUMAN_INTELLIGENT_DETECTION = 200
+ LIGHT_FLICKER = 301
+ ALARM_LIGHT = 303
+ ALARM_LIGHT_RELEVANCE = 305
+ DEVICE_HUMAN_RELATE_LIGHT = 41
+ TAMPER_ALARM = 306
+ DETECTION_TYPE = 451
+ OUTLET_RECOVER = 600
+ CHIME_INDICATOR_LIGHT = 611
+ TRACKING = 650
+ CRUISE_TRACKING = 651
+ PARTIAL_IMAGE_OPTIMIZE = 700
+ FEATURE_TRACKING = 701
+
+
+class SoundMode(Enum):
+ """Alarm sound level description."""
+
+ SILENT = 2
+ SOFT = 0
+ INTENSE = 1
+ CUSTOM = 3
+ PLAN = 4
+
+
+class DefenseModeType(Enum):
+ """Defense mode name and number."""
+
+ HOME_MODE = 1
+ AWAY_MODE = 2
+ SLEEP_MODE = 3
+ UNSET_MODE = 0
+
+
+class DeviceCatagories(Enum):
+ """Supported device categories."""
+
+ COMMON_DEVICE_CATEGORY = "COMMON"
+ CAMERA_DEVICE_CATEGORY = "IPC"
+ BATTERY_CAMERA_DEVICE_CATEGORY = "BatteryCamera"
+ DOORBELL_DEVICE_CATEGORY = "BDoorBell"
+ BASE_STATION_DEVICE_CATEGORY = "XVR"
+
+
+class SensorType(Enum):
+ """Sensors and their types to expose in HA."""
+
+ # pylint: disable=invalid-name
+ sw_version = "None"
+ alarm_sound_mod = "None"
+ battery_level = "battery"
+ detection_sensibility = "None"
+ last_alarm_time = "None"
+ Seconds_Last_Trigger = "None"
+ last_alarm_pic = "None"
+ supported_channels = "None"
+ local_ip = "None"
+ wan_ip = "None"
+ PIR_Status = "motion"
+
+
+class BinarySensorType(Enum):
+ """Binary_sensors and their types to expose in HA."""
+
+ # pylint: disable=invalid-name
+ Motion_Trigger = "motion"
+ alarm_schedules_enabled = "None"
+ encrypted = "None"
+ upgrade_available = "None"
diff --git a/pyezviz/exceptions.py b/pyezviz/exceptions.py
new file mode 100644
index 0000000..465b73d
--- /dev/null
+++ b/pyezviz/exceptions.py
@@ -0,0 +1,20 @@
+"""PyEzviz Exceptions."""
+
+class PyEzvizError(Exception):
+ """Ezviz api exception."""
+
+
+class InvalidURL(PyEzvizError):
+ """Invalid url exception."""
+
+
+class HTTPError(PyEzvizError):
+ """Invalid host exception."""
+
+
+class InvalidHost(PyEzvizError):
+ """Invalid host exception."""
+
+
+class AuthTestResultFailed(PyEzvizError):
+ """Authentication failed"""
diff --git a/pyezviz/mqtt.py b/pyezviz/mqtt.py
new file mode 100644
index 0000000..59039ba
--- /dev/null
+++ b/pyezviz/mqtt.py
@@ -0,0 +1,227 @@
+"""Ezviz cloud MQTT client for push messages."""
+
+import base64
+import json
+import threading
+import time
+
+import paho.mqtt.client as mqtt
+import requests
+from pyezviz.constants import DEFAULT_TIMEOUT, FEATURE_CODE
+from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError
+
+API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info"
+API_ENDPOINT_REGISTER_MQTT = "/v1/getClientId"
+API_ENDPOINT_START_MQTT = "/api/push/start"
+API_ENDPOINT_STOP_MQTT = "/api/push/stop"
+
+
+MQTT_APP_KEY = "4c6b3cc2-b5eb-4813-a592-612c1374c1fe"
+APP_SECRET = "17454517-cc1c-42b3-a845-99b4a15dd3e6"
+
+
+def on_subscribe(client, userdata, mid, granted_qos):
+ """On MQTT message subscribe."""
+ # pylint: disable=unused-argument
+ print("Subscribed: " + str(mid) + " " + str(granted_qos))
+
+
+def on_connect(client, userdata, flags, return_code):
+ """On MQTT connect."""
+ # pylint: disable=unused-argument
+ if return_code == 0:
+ print("connected OK Returned code=", return_code)
+ else:
+ print("Bad connection Returned code=", return_code)
+ client.reconnect()
+
+
+def on_message(client, userdata, msg):
+ """On MQTT message receive."""
+ # pylint: disable=unused-argument
+ mqtt_message = json.loads(msg.payload)
+ mqtt_message["ext"] = mqtt_message["ext"].split(",")
+
+ # Print payload message
+ decoded_message = {mqtt_message['ext'][2]:{'id':mqtt_message['id'], 'alert':mqtt_message['alert'], 'time':mqtt_message['ext'][1], 'alert type':mqtt_message['ext'][4], 'image':mqtt_message['ext'][16]}}
+ print(decoded_message)
+
+
+class MQTTClient(threading.Thread):
+ """Open MQTT connection to ezviz cloud."""
+
+ def __init__(
+ self,
+ token,
+ timeout=DEFAULT_TIMEOUT,
+ ):
+ """Initialize the client object."""
+ threading.Thread.__init__(self)
+ self._session = None
+ self._token = token or {
+ "session_id": None,
+ "rf_session_id": None,
+ "username": None,
+ "api_url": "apiieu.ezvizlife.com",
+ }
+ self._timeout = timeout
+ self._stop_event = threading.Event()
+ self._mqtt_data = {
+ "mqtt_clientid": None,
+ "ticket": None,
+ "push_url": token["service_urls"]["pushAddr"],
+ }
+
+ def _mqtt(self):
+ """Receive MQTT messages from ezviz server"""
+
+ ezviz_mqtt_client = mqtt.Client(
+ client_id=self._mqtt_data["mqtt_clientid"], protocol=4, transport="tcp"
+ )
+ ezviz_mqtt_client.on_connect = on_connect
+ ezviz_mqtt_client.on_subscribe = on_subscribe
+ ezviz_mqtt_client.on_message = on_message
+ ezviz_mqtt_client.username_pw_set(MQTT_APP_KEY, APP_SECRET)
+
+ ezviz_mqtt_client.connect(self._mqtt_data["push_url"], 1882, 60)
+ ezviz_mqtt_client.subscribe(
+ f"{MQTT_APP_KEY}/ticket/{self._mqtt_data['ticket']}", qos=2
+ )
+
+ ezviz_mqtt_client.loop_start()
+ return ezviz_mqtt_client
+
+ def _register_ezviz_push(self):
+ """Register for push messages."""
+
+ auth_seq = base64.b64encode(f"{MQTT_APP_KEY}:{APP_SECRET}".encode("ascii"))
+ auth_seq = "Basic " + auth_seq.decode()
+
+ payload = {
+ "appKey": MQTT_APP_KEY,
+ "clientType": "5",
+ "mac": FEATURE_CODE,
+ "token": "123456",
+ "version": "v1.3.0",
+ }
+
+ try:
+ req = self._session.post(
+ f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_REGISTER_MQTT}",
+ allow_redirects=False,
+ headers={"Authorization": auth_seq},
+ data=payload,
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.ConnectionError as err:
+ raise InvalidURL("A Invalid URL or Proxy error occured") from err
+
+ except requests.HTTPError as err:
+ raise HTTPError from err
+
+ try:
+ json_result = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ self._mqtt_data["mqtt_clientid"] = json_result["data"]["clientId"]
+
+ def run(self):
+ """Method representing the thread's activity which should not be used directly."""
+
+ if self._session is None:
+ self._session = requests.session()
+ self._session.headers.update(
+ {"User-Agent": "okhttp/3.12.1"}
+ ) # Android generic user agent.
+
+ self._register_ezviz_push()
+ self._start_ezviz_push()
+ self._mqtt()
+
+ while not self._stop_event.is_set():
+ time.sleep(1)
+
+ def start(self):
+ """Start mqtt.
+ Start mqtt thread
+ """
+ super().start()
+
+ def stop(self):
+ """Stop push notifications."""
+
+ payload = {
+ "appKey": MQTT_APP_KEY,
+ "clientId": self._mqtt_data["mqtt_clientid"],
+ "clientType": 5,
+ "sessionId": self._token["session_id"],
+ "username": self._token["username"],
+ }
+
+ try:
+ req = self._session.post(
+ f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_STOP_MQTT}",
+ data=payload,
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.ConnectionError as err:
+ raise InvalidURL("A Invalid URL or Proxy error occured") from err
+
+ except requests.HTTPError as err:
+ raise HTTPError from err
+
+ self._stop_event.set()
+
+ def _start_ezviz_push(self):
+ """Send start for push messages to ezviz api."""
+
+ payload = {
+ "appKey": MQTT_APP_KEY,
+ "clientId": self._mqtt_data["mqtt_clientid"],
+ "clientType": 5,
+ "sessionId": self._token["session_id"],
+ "username": self._token["username"],
+ "token": "123456",
+ }
+
+ try:
+ req = self._session.post(
+ f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_START_MQTT}",
+ allow_redirects=False,
+ data=payload,
+ timeout=self._timeout,
+ )
+
+ req.raise_for_status()
+
+ except requests.ConnectionError as err:
+ raise InvalidURL("A Invalid URL or Proxy error occured") from err
+
+ except requests.HTTPError as err:
+ raise HTTPError from err
+
+ try:
+ json_result = req.json()
+
+ except ValueError as err:
+ raise PyEzvizError(
+ "Impossible to decode response: "
+ + str(err)
+ + "\nResponse was: "
+ + str(req.text)
+ ) from err
+
+ self._mqtt_data["ticket"] = json_result["ticket"]
diff --git a/pyezviz/test_cam_rtsp.py b/pyezviz/test_cam_rtsp.py
new file mode 100644
index 0000000..916048a
--- /dev/null
+++ b/pyezviz/test_cam_rtsp.py
@@ -0,0 +1,147 @@
+"""Test camera RTSP authentication"""
+import base64
+import hashlib
+import socket
+from pyezviz.exceptions import AuthTestResultFailed, InvalidHost
+
+
+def genmsg_describe(url, seq, user_agent, auth_seq):
+ """Generate RTSP describe message"""
+ msg_ret = "DESCRIBE " + url + " RTSP/1.0\r\n"
+ msg_ret += "CSeq: " + str(seq) + "\r\n"
+ msg_ret += "Authorization: " + auth_seq + "\r\n"
+ msg_ret += "User-Agent: " + user_agent + "\r\n"
+ msg_ret += "Accept: application/sdp\r\n"
+ msg_ret += "\r\n"
+ return msg_ret
+
+
+class TestRTSPAuth:
+ """Initialize RTSP credential test"""
+
+ def __init__(
+ self,
+ ip_addr,
+ username=None,
+ password=None,
+ test_uri="",
+ ):
+ self._rtsp_details = {
+ "bufLen": 1024,
+ "defaultServerIp": ip_addr,
+ "defaultServerPort": 554,
+ "defaultTestUri": test_uri,
+ "defaultUserAgent": "RTSP Client",
+ "defaultUsername": username,
+ "defaultPassword": password,
+ }
+
+ def generate_auth_string(self, realm, method, uri, nonce):
+ """Generate digest auth string """
+ map_return_info = {}
+ m_1 = hashlib.md5(
+ f"{self._rtsp_details['defaultUsername']}:"
+ f"{realm.decode()}:"
+ f"{self._rtsp_details['defaultPassword']}".encode()
+ ).hexdigest()
+ m_2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
+ response = hashlib.md5(f"{m_1}:{nonce}:{m_2}".encode()).hexdigest()
+
+ map_return_info = (
+ f"Digest "
+ f"username=\"{self._rtsp_details['defaultUsername']}\", "
+ f'realm="{realm.decode()}", '
+ f'algorithm="MD5", '
+ f'nonce="{nonce.decode()}", '
+ f'uri="{uri}", '
+ f'response="{response}"'
+ )
+ return map_return_info
+
+ def main(self):
+ """Main function """
+ session = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ try:
+ session.connect(
+ (
+ self._rtsp_details["defaultServerIp"],
+ self._rtsp_details["defaultServerPort"],
+ )
+ )
+
+ except TimeoutError as err:
+ raise AuthTestResultFailed("Invalid ip or camera hibernating") from err
+
+ except (socket.gaierror, ConnectionRefusedError) as err:
+ raise InvalidHost("Invalid IP or Hostname") from err
+
+ seq = 1
+
+ url = (
+ "rtsp://"
+ + self._rtsp_details["defaultServerIp"]
+ + self._rtsp_details["defaultTestUri"]
+ )
+
+ auth_seq = base64.b64encode(
+ f"{self._rtsp_details['defaultUsername']}:"
+ f"{self._rtsp_details['defaultPassword']}".encode("ascii")
+ )
+ auth_seq = "Basic " + auth_seq.decode()
+
+ print(
+ genmsg_describe(url, seq, self._rtsp_details["defaultUserAgent"], auth_seq)
+ )
+ session.send(
+ genmsg_describe(
+ url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
+ ).encode()
+ )
+ msg1 = session.recv(self._rtsp_details["bufLen"])
+ seq = seq + 1
+
+ if msg1.decode().find("200 OK") > 1:
+ print(f"Basic auth result: {msg1.decode()}")
+ return print("Basic Auth test passed. Credentials Valid!")
+
+ if msg1.decode().find("Unauthorized") > 1:
+ # Basic failed, doing new DESCRIBE with digest authentication.
+ start = msg1.decode().find("realm")
+ begin = msg1.decode().find('"', start)
+ end = msg1.decode().find('"', begin + 1)
+ realm = msg1[begin + 1 : end]
+
+ start = msg1.decode().find("nonce")
+ begin = msg1.decode().find('"', start)
+ end = msg1.decode().find('"', begin + 1)
+ nonce = msg1[begin + 1 : end]
+
+ auth_seq = self.generate_auth_string(
+ realm,
+ "DESCRIBE",
+ self._rtsp_details["defaultTestUri"],
+ nonce,
+ )
+
+ print(
+ genmsg_describe(
+ url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
+ )
+ )
+
+ session.send(
+ genmsg_describe(
+ url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
+ ).encode()
+ )
+ msg1 = session.recv(self._rtsp_details["bufLen"])
+ print(f"Digest auth result: {msg1.decode()}")
+
+ if msg1.decode().find("200 OK") > 1:
+ return print("Digest Auth test Passed. Credentials Valid!")
+
+ if msg1.decode().find("401 Unauthorized") > 1:
+ raise AuthTestResultFailed("Credentials not valid!!")
+
+ return print("Basic Auth test passed. Credentials Valid!")