diff --git a/main.py b/main.py
index 6299c0a..dd9db3f 100644
--- a/main.py
+++ b/main.py
@@ -8,7 +8,6 @@ import time
import json
import pygame
-from pyezviz import EzvizClient, MQTTClient
import secrets
@@ -34,16 +33,6 @@ def backdoor():
-def on_message(client, userdata, mqtt_message):
- message = json.loads(mqtt_message.payload)
- #print(json.dumps(mqtt_message, indent=4))
- if message['alert'] == 'somebody there ring the door': # lmao
- logging.info('Received door bell press alert')
- if 'E80451501' in message['ext']:
- logging.info('Backdoor pressed')
- backdoor()
if __name__ == '__main__':
@@ -52,14 +41,5 @@ if __name__ == '__main__':
- client = EzvizClient(secrets.EZVIZ_EMAIL, secrets.EZVIZ_PASS, 'apiius.ezvizlife.com')
- try:
- logging.info('Logging into EZVIZ client...')
- token = client.login()
- mqtt = MQTTClient(token, on_message)
- logging.info('Starting MQTT...')
- mqtt.start()
- except Exception as exp:
- logging.exception(str(exp))
- finally:
- client.close_session()
+ backdoor()
diff --git a/pyezviz/__init__.py b/pyezviz/__init__.py
deleted file mode 100644
index d236101..0000000
--- a/pyezviz/__init__.py
+++ /dev/null
@@ -1,27 +0,0 @@
-"""init pyezviz."""
-from pyezviz.camera import EzvizCamera
-from pyezviz.cas import EzvizCAS
-from pyezviz.client import EzvizClient
-from pyezviz.constants import (DefenseModeType, DeviceCatagories,
- DeviceSwitchType, SoundMode)
-from pyezviz.exceptions import (AuthTestResultFailed, HTTPError, InvalidHost,
- InvalidURL, PyEzvizError)
-from pyezviz.mqtt import MQTTClient
-from pyezviz.test_cam_rtsp import TestRTSPAuth
-__all__ = [
- "EzvizCamera",
- "EzvizClient",
- "PyEzvizError",
- "InvalidURL",
- "HTTPError",
- "InvalidHost",
- "AuthTestResultFailed",
- "EzvizCAS",
- "MQTTClient",
- "DefenseModeType",
- "DeviceCatagories",
- "DeviceSwitchType",
- "SoundMode",
- "TestRTSPAuth",
diff --git a/pyezviz/__main__.py b/pyezviz/__main__.py
deleted file mode 100644
index c2221e1..0000000
--- a/pyezviz/__main__.py
+++ /dev/null
@@ -1,309 +0,0 @@
-"""pyezviz command line."""
-import argparse
-import http.client
-import json
-import logging
-import sys
-import pandas
-from pyezviz import EzvizCamera, EzvizClient, MQTTClient
-from pyezviz.constants import DefenseModeType
-def main():
- """Initiate arg parser."""
- parser = argparse.ArgumentParser(prog="pyezviz")
- parser.add_argument("-u", "--username", required=True, help="Ezviz username")
- parser.add_argument("-p", "--password", required=True, help="Ezviz Password")
- parser.add_argument(
- "-r",
- "--region",
- required=False,
- default="apiieu.ezvizlife.com",
- help="Ezviz API region",
- )
- parser.add_argument(
- "--debug", "-d", action="store_true", help="Print debug messages to stderr"
- )
- subparsers = parser.add_subparsers(dest="action")
- parser_device = subparsers.add_parser(
- "devices", help="Play with all devices at once"
- )
- parser_device.add_argument(
- "device_action",
- type=str,
- default="status",
- help="Device action to perform",
- choices=["device", "status", "switch", "connection"],
- )
- parser_home_defence_mode = subparsers.add_parser(
- "home_defence_mode", help="Set home defence mode"
- )
- parser_mqtt = subparsers.add_parser("mqtt", help="Set home defence mode")
- parser_home_defence_mode.add_argument(
- "--mode", required=False, help="Choose mode", choices=["HOME_MODE", "AWAY_MODE"]
- )
- parser_camera = subparsers.add_parser("camera", help="Camera actions")
- parser_camera.add_argument("--serial", required=True, help="camera SERIAL")
- subparsers_camera = parser_camera.add_subparsers(dest="camera_action")
- parser_camera_status = subparsers_camera.add_parser(
- "status", help="Get the status of the camera"
- )
- parser_camera_move = subparsers_camera.add_parser("move", help="Move the camera")
- parser_camera_move.add_argument(
- "--direction",
- required=True,
- help="Direction to move the camera to",
- choices=["up", "down", "right", "left"],
- )
- parser_camera_move.add_argument(
- "--speed",
- required=False,
- help="Speed of the movement",
- default=5,
- type=int,
- choices=range(1, 10),
- )
- parser_camera_switch = subparsers_camera.add_parser(
- "switch", help="Change the status of a switch"
- )
- parser_camera_switch.add_argument(
- "--switch",
- required=True,
- help="Switch to switch",
- choices=["audio", "ir", "state", "privacy", "sleep", "follow_move"],
- )
- parser_camera_switch.add_argument(
- "--enable",
- required=False,
- help="Enable (or not)",
- default=1,
- type=int,
- choices=[0, 1],
- )
- parser_camera_alarm = subparsers_camera.add_parser(
- "alarm", help="Configure the camera alarm"
- )
- parser_camera_alarm.add_argument(
- "--notify", required=False, help="Enable (or not)", type=int, choices=[0, 1]
- )
- parser_camera_alarm.add_argument(
- "--sound",
- required=False,
- help="Sound level (2 is silent, 1 intensive, 0 soft)",
- type=int,
- choices=[0, 1, 2],
- )
- parser_camera_alarm.add_argument(
- "--sensibility",
- required=False,
- help="Sensibility level (Non-Cameras = from 1 to 6) or (Cameras = 1 to 100)",
- type=int,
- choices=range(0, 100),
- )
- parser_camera_alarm.add_argument(
- "--schedule", required=False, help="Schedule in json format *test*", type=str
- )
- args = parser.parse_args()
- # print("--------------args")
- # print("--------------args: %s",args)
- # print("--------------args")
- client = EzvizClient(args.username, args.password, args.region)
- if args.debug:
- http.client.HTTPConnection.debuglevel = 5
- # You must initialize logging, otherwise you'll not see debug output.
- logging.basicConfig()
- logging.getLogger().setLevel(logging.DEBUG)
- requests_log = logging.getLogger("requests.packages.urllib3")
- requests_log.setLevel(logging.DEBUG)
- requests_log.propagate = True
- if args.action == "devices":
- if args.device_action == "device":
- try:
- client.login()
- print(json.dumps(client.get_device(), indent=2))
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.device_action == "status":
- try:
- client.login()
- print(
- pandas.DataFrame(client.load_cameras()).to_string(
- columns=[
- "serial",
- "name",
- # version,
- # upgrade_available,
- "status",
- "device_category",
- "device_sub_category",
- "sleep",
- "privacy",
- "audio",
- "ir_led",
- "state_led",
- # follow_move,
- # alarm_notify,
- # alarm_schedules_enabled,
- # alarm_sound_mod,
- # encrypted,
- "local_ip",
- "local_rtsp_port",
- "detection_sensibility",
- "battery_level",
- "alarm_schedules_enabled",
- "alarm_notify",
- "Motion_Trigger",
- # last_alarm_time,
- # last_alarm_pic
- ]
- )
- )
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.device_action == "switch":
- try:
- client.login()
- print(json.dumps(client.get_switch(), indent=2))
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.device_action == "connection":
- try:
- client.login()
- print(json.dumps(client.get_connection(), indent=2))
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- else:
- print("Action not implemented: %s", args.device_action)
- elif args.action == "home_defence_mode":
- if args.mode:
- try:
- client.login()
- print(
- json.dumps(
- client.api_set_defence_mode(
- getattr(DefenseModeType, args.mode).value
- ),
- indent=2,
- )
- )
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.action == "mqtt":
- try:
- token = client.login()
- mqtt = MQTTClient(token)
- mqtt.start()
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.action == "camera":
- # load camera object
- try:
- client.login()
- camera = EzvizCamera(client, args.serial)
- logging.debug("Camera loaded")
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- client.close_session()
- if args.camera_action == "move":
- try:
- camera.move(args.direction, args.speed)
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.camera_action == "status":
- try:
- print(json.dumps(camera.status(), indent=2))
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.camera_action == "switch":
- try:
- if args.switch == "ir":
- camera.switch_device_ir_led(args.enable)
- elif args.switch == "state":
- print(args.enable)
- camera.switch_device_state_led(args.enable)
- elif args.switch == "audio":
- camera.switch_device_audio(args.enable)
- elif args.switch == "privacy":
- camera.switch_privacy_mode(args.enable)
- elif args.switch == "sleep":
- camera.switch_sleep_mode(args.enable)
- elif args.switch == "follow_move":
- camera.switch_follow_move(args.enable)
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- elif args.camera_action == "alarm":
- try:
- if args.sound is not None:
- camera.alarm_sound(args.sound)
- if args.notify is not None:
- camera.alarm_notify(args.notify)
- if args.sensibility is not None:
- camera.alarm_detection_sensibility(args.sensibility)
- if args.schedule is not None:
- camera.change_defence_schedule(args.schedule)
- except Exception as exp: # pylint: disable=broad-except
- print(exp)
- finally:
- client.close_session()
- else:
- print("Action not implemented, try running with -h switch for help")
- else:
- print("Action not implemented: %s", args.action)
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/pyezviz/camera.py b/pyezviz/camera.py
deleted file mode 100644
index 2632314..0000000
--- a/pyezviz/camera.py
+++ /dev/null
@@ -1,234 +0,0 @@
-"""pyezviz camera api."""
-from __future__ import annotations
-import datetime
-from typing import Any
-from pyezviz.constants import DeviceCatagories, DeviceSwitchType, SoundMode
-from pyezviz.exceptions import PyEzvizError
-class EzvizCamera:
- """Initialize Ezviz camera object."""
- def __init__(self, client, serial: str, device_obj: dict | None = None) -> None:
- """Initialize the camera object."""
- self._client = client
- self._serial = serial
- self._switch: dict[int, bool] = {}
- self._alarmmotiontrigger: dict[str, Any] = {}
- self._device = device_obj or {}
- self.alarmlist_time = None
- self.alarmlist_pic = None
- def load(self) -> None:
- """Update device info for camera serial."""
- if self._device is None:
- self._device = self._client.get_all_per_serial_infos(self._serial)
- self._alarm_list()
- self._switch_status()
- def _switch_status(self) -> None:
- """load device switches"""
- if self._device.get("switchStatusInfos"):
- for switch in self._device["switchStatusInfos"]:
- self._switch.update({switch["type"]: switch["enable"]})
- else:
- self._switch = {0: False}
- def _detection_sensibility(self) -> Any:
- """load detection sensibility"""
- result = "Unknown"
- if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is not True:
- if (
- self._device["deviceInfos"]["deviceCategory"]
- == DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value
- ):
- result = self._client.get_detection_sensibility(
- self._serial,
- "3",
- )
- else:
- result = self._client.get_detection_sensibility(self._serial)
- if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is True:
- result = "Hibernate"
- return result
- def _alarm_list(self) -> None:
- """get last alarm info for this camera's self._serial"""
- alarmlist = self._client.get_alarminfo(self._serial)
- if alarmlist.get("page").get("totalResults") > 0:
- self.alarmlist_time = alarmlist.get("alarms")[0].get("alarmStartTimeStr")
- self.alarmlist_pic = alarmlist.get("alarms")[0].get("picUrl")
- if self.alarmlist_time:
- self._motion_trigger(self.alarmlist_time)
- def _local_ip(self) -> str:
- """ "Fix empty ip value for certain cameras"""
- if self._device.get("wifiInfos"):
- return self._device["wifiInfos"].get("address")
- # Seems to return none or on some. This should run 2nd.
- if self._device.get("connectionInfos"):
- if self._device["connectionInfos"].get("localIp"):
- return self._device["connectionInfos"]["localIp"]
- return ""
- def _motion_trigger(self, alarmlist_time: str) -> None:
- """Create motion sensor based on last alarm time."""
- now = datetime.datetime.now().replace(microsecond=0)
- alarm_trigger_active = 0
- today_date = datetime.date.today()
- fix = datetime.datetime.now().replace(microsecond=0)
- # Need to handle error if time format different
- fix = datetime.datetime.strptime(
- alarmlist_time.replace("Today", str(today_date)),
- "%Y-%m-%d %H:%M:%S",
- )
- # returns a timedelta object
- timepassed = now - fix
- if timepassed < datetime.timedelta(seconds=60):
- alarm_trigger_active = 1
- self._alarmmotiontrigger = {
- "alarm_trigger_active": alarm_trigger_active,
- "timepassed": timepassed.total_seconds(),
- }
- def _is_alarm_schedules_enabled(self) -> bool | None:
- """Checks if alarm schedules enabled"""
- time_plans = None
- if self._device.get("timePlanInfos"):
- time_plans = [
- item for item in self._device["timePlanInfos"] if item.get("type") == 2
- ]
- if time_plans:
- return bool(time_plans[0].get("enable"))
- return None
- def status(self) -> dict[Any, Any]:
- """Return the status of the camera."""
- self.load()
- return {
- "serial": self._serial,
- "name": self._device["deviceInfos"].get("name"),
- "version": self._device["deviceInfos"].get("version"),
- "upgrade_available": self._device["statusInfos"].get("upgradeAvailable"),
- "status": self._device["deviceInfos"].get("status"),
- "device_category": self._device["deviceInfos"].get("deviceCategory"),
- "device_sub_category": self._device["deviceInfos"].get("deviceSubCategory"),
- "sleep": self._switch.get(DeviceSwitchType.SLEEP.value)
- or self._switch.get(DeviceSwitchType.AUTO_SLEEP.value),
- "privacy": self._switch.get(DeviceSwitchType.PRIVACY.value),
- "audio": self._switch.get(DeviceSwitchType.SOUND.value),
- "ir_led": self._switch.get(DeviceSwitchType.INFRARED_LIGHT.value),
- "state_led": self._switch.get(DeviceSwitchType.LIGHT.value),
- "follow_move": self._switch.get(DeviceSwitchType.MOBILE_TRACKING.value),
- "alarm_notify": bool(self._device["statusInfos"].get("globalStatus")),
- "alarm_schedules_enabled": self._is_alarm_schedules_enabled(),
- "alarm_sound_mod": SoundMode(
- self._device["statusInfos"].get("alarmSoundMode")
- ).name,
- "encrypted": bool(self._device["statusInfos"].get("isEncrypted")),
- "local_ip": self._local_ip(),
- "wan_ip": self._device.get("connectionInfos", {}).get("netIp", ""),
- "local_rtsp_port": self._device["connectionInfos"].get(
- "localRtspPort", "554"
- ),
- "supported_channels": self._device["deviceInfos"].get("channelNumber"),
- "detection_sensibility": self._detection_sensibility(),
- "battery_level": self._device["statusInfos"]
- .get("optionals", {})
- .get("powerRemaining"),
- "PIR_Status": self._device["statusInfos"].get("pirStatus"),
- "Motion_Trigger": self._alarmmotiontrigger.get("alarm_trigger_active"),
- "Seconds_Last_Trigger": self._alarmmotiontrigger.get("timepassed"),
- "last_alarm_time": self.alarmlist_time,
- "last_alarm_pic": self.alarmlist_pic,
- "wifiInfos": self._device.get("wifiInfos"),
- "switches": self._switch,
- }
- def move(self, direction: str, speed: int = 5) -> bool:
- """Move camera."""
- if direction not in ["right", "left", "down", "up"]:
- raise PyEzvizError(f"Invalid direction: {direction} ")
- # launch the start command
- self._client.ptz_control(str(direction).upper(), self._serial, "START", speed)
- # launch the stop command
- self._client.ptz_control(str(direction).upper(), self._serial, "STOP", speed)
- return True
- def alarm_notify(self, enable: int) -> bool:
- """Enable/Disable camera notification when movement is detected."""
- return self._client.set_camera_defence(self._serial, enable)
- def alarm_sound(self, sound_type: int) -> bool:
- """Enable/Disable camera sound when movement is detected."""
- # we force enable = 1 , to make sound...
- return self._client.alarm_sound(self._serial, sound_type, 1)
- def alarm_detection_sensibility(self, sensibility, type_value=0):
- """Enable/Disable camera sound when movement is detected."""
- # we force enable = 1 , to make sound...
- return self._client.detection_sensibility(self._serial, sensibility, type_value)
- def switch_device_audio(self, enable=0):
- """Switch audio status on a device."""
- return self._client.switch_status(
- self._serial, DeviceSwitchType.SOUND.value, enable
- )
- def switch_device_state_led(self, enable=0):
- """Switch led status on a device."""
- return self._client.switch_status(
- self._serial, DeviceSwitchType.LIGHT.value, enable
- )
- def switch_device_ir_led(self, enable=0):
- """Switch ir status on a device."""
- return self._client.switch_status(
- self._serial, DeviceSwitchType.INFRARED_LIGHT.value, enable
- )
- def switch_privacy_mode(self, enable=0):
- """Switch privacy mode on a device."""
- return self._client.switch_status(
- self._serial, DeviceSwitchType.PRIVACY.value, enable
- )
- def switch_sleep_mode(self, enable=0):
- """Switch sleep mode on a device."""
- return self._client.switch_status(
- self._serial, DeviceSwitchType.SLEEP.value, enable
- )
- def switch_follow_move(self, enable=0):
- """Switch follow move."""
- return self._client.switch_status(
- self._serial, DeviceSwitchType.MOBILE_TRACKING.value, enable
- )
- def change_defence_schedule(self, schedule, enable=0):
- """Change defence schedule. Requires json formatted schedules."""
- return self._client.api_set_defence_schdule(self._serial, schedule, enable)
diff --git a/pyezviz/cas.py b/pyezviz/cas.py
deleted file mode 100644
index c48ed05..0000000
--- a/pyezviz/cas.py
+++ /dev/null
@@ -1,160 +0,0 @@
-"""Ezviz CAS API Functions."""
-import random
-import socket
-import ssl
-from io import BytesIO
-from itertools import cycle
-import xmltodict
-from Crypto.Cipher import AES
-from pyezviz.constants import FEATURE_CODE, XOR_KEY
-from pyezviz.exceptions import InvalidHost
-def xor_enc_dec(msg, xor_key=XOR_KEY):
- """Xor encodes camera serial"""
- with BytesIO(msg) as stream:
- xor_msg = bytes(a ^ b for a, b in zip(stream.read(), cycle(xor_key)))
- return xor_msg
-class EzvizCAS:
- """Ezviz CAS server client."""
- def __init__(self, token):
- """Initialize the client object."""
- self._session = None
- self._token = token or {
- "session_id": None,
- "rf_session_id": None,
- "username": None,
- "api_url": "apiieu.ezvizlife.com",
- }
- self._service_urls = token["service_urls"]
- def cas_get_encryption(self, devserial):
- """Fetch encryption code from ezviz cas server"""
- # Random hex 64 characters long.
- rand_hex = random.randrange(10 ** 80)
- rand_hex = "%064x" % rand_hex
- rand_hex = rand_hex[:64]
- payload = (
- f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
- f"\x00\x02" # Check or order?
- f"\x00\x00\x00\x00\x00\x00 "
- f"\x01" # Check or order?
- f"\x00\x00\x00\x00\x00\x00\x02\t\x00\x00\x00\x00"
- f'\n\n\t'
- f'{self._token["session_id"]}'
- f"\n\t{FEATURE_CODE}\n\t"
- f"{devserial}"
- f"\n\t0\n\n"
- ).encode("latin1")
- payload_end_padding = rand_hex.encode("latin1")
- context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE)
- # Create a TCP/IP socket
- my_socket = socket.create_connection(
- (self._service_urls["sysConf"][15], self._service_urls["sysConf"][16])
- )
- my_socket = context.wrap_socket(
- my_socket, server_hostname=self._service_urls["sysConf"][15]
- )
- # Get CAS Encryption Key
- try:
- my_socket.send(payload + payload_end_padding)
- response = my_socket.recv(1024)
- print(f"Get Encryption Key: {response}")
- except (socket.gaierror, ConnectionRefusedError) as err:
- raise InvalidHost("Invalid IP or Hostname") from err
- finally:
- my_socket.close()
- # Trim header, tail and convert xml to dict.
- response = response[32::]
- response = response[:-32:]
- response = xmltodict.parse(response)
- return response
- def set_camera_defence_state(self, serial, enable=1):
- """Enable alarm notifications."""
- # Random hex 64 characters long.
- rand_hex = random.randrange(10 ** 80)
- rand_hex = "%064x" % rand_hex
- rand_hex = rand_hex[:64]
- payload = (
- f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
- f"\x00\x14" # Check or order?
- f"\x00\x00\x00\x00\x00\x00 "
- f"\x05"
- f"\x00\x00\x00\x00\x00\x00\x02\xd0\x00\x00\x01\xe0"
- f'\n\n\t'
- f'\n\t'
- f'\n\n'
- f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
- f"\x00\x13"
- f"\x00\x00\x00\x00\x00\x000\x0f\xff\xff\xff\xff"
- f"\x00\x00\x00\xb0\x00\x00\x00\x00"
- ).encode("latin1")
- payload_end_padding = rand_hex.encode("latin1")
- # xor camera serial
- xor_cam_serial = xor_enc_dec(serial.encode("latin1"))
- defence_msg_string = (
- f'{xor_cam_serial.decode()}2+,*xdv.0" '
- f'encoding="utf-8"?>\n'
- f"\n"
- f"\tABCDEFG\n"
- f'\t\n'
- f"\n"
- f"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10"
- ).encode("latin1")
- context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE)
- # Create a TCP/IP socket
- my_socket = socket.create_connection(
- (self._service_urls["sysConf"][15], self._service_urls["sysConf"][16])
- )
- my_socket = context.wrap_socket(
- my_socket, server_hostname=self._service_urls["sysConf"][15]
- )
- cas_client = self.cas_get_encryption(serial)
- aes_key = cas_client["Response"]["Session"]["@Key"].encode("latin1")
- iv_value = (
- f"{serial}{cas_client['Response']['Session']['@OperationCode']}".encode(
- "latin1"
- )
- )
- # Message encryption
- cipher = AES.new(aes_key, AES.MODE_CBC, iv_value)
- try:
- defence_msg_string = cipher.encrypt(defence_msg_string)
- my_socket.send(payload + defence_msg_string + payload_end_padding)
- print(f"Set camera response: {my_socket.recv()}")
- except (socket.gaierror, ConnectionRefusedError) as err:
- raise InvalidHost("Invalid IP or Hostname") from err
- finally:
- my_socket.close()
- return True
diff --git a/pyezviz/client.py b/pyezviz/client.py
deleted file mode 100644
index 4b28988..0000000
--- a/pyezviz/client.py
+++ /dev/null
@@ -1,919 +0,0 @@
-"""Ezviz API."""
-from __future__ import annotations
-import hashlib
-import logging
-from typing import Any
-from uuid import uuid4
-import requests
-from pyezviz.camera import EzvizCamera
-from pyezviz.cas import EzvizCAS
-from pyezviz.constants import (
- DefenseModeType,
- DeviceCatagories,
-from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError
-API_ENDPOINT_CLOUDDEVICES = "/api/cloud/v2/cloudDevices/getAll"
-API_ENDPOINT_PAGELIST = "/v3/userdevices/v1/devices/pagelist"
-API_ENDPOINT_DEVICES = "/v3/devices/"
-API_ENDPOINT_LOGIN = "/v3/users/login/v5"
-API_ENDPOINT_REFRESH_SESSION_ID = "/v3/apigateway/login"
-API_ENDPOINT_ALARM_SOUND = "/alarm/sound"
-API_ENDPOINT_DETECTION_SENSIBILITY = "/api/device/configAlgorithm"
-API_ENDPOINT_DETECTION_SENSIBILITY_GET = "/api/device/queryAlgorithmConfig"
-API_ENDPOINT_ALARMINFO_GET = "/v3/alarms/v2/advanced"
-API_ENDPOINT_SET_DEFENCE_SCHEDULE = "/api/device/defence/plan2"
-API_ENDPOINT_SWITCH_DEFENCE_MODE = "/v3/userdevices/v1/group/switchDefenceMode"
-API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info"
-class EzvizClient:
- """Initialize api client object."""
- def __init__(
- self,
- account: str | None = None,
- password: str | None = None,
- url: str = "apiieu.ezvizlife.com",
- timeout: int = DEFAULT_TIMEOUT,
- token: dict | None = None,
- ) -> None:
- """Initialize the client object."""
- self.account = account
- self.password = password
- self._session = requests.session()
- # Set Android generic user agent.
- self._session.headers.update({"User-Agent": "okhttp/3.12.1"})
- self._token = token or {
- "session_id": None,
- "rf_session_id": None,
- "username": None,
- "api_url": url,
- }
- self._timeout = timeout
- def _login(self, account: str, password: str) -> dict[Any, Any]:
- """Login to Ezviz API."""
- # Region code to url.
- if len(self._token["api_url"].split(".")) == 1:
- self._token["api_url"] = "apii" + self._token["api_url"] + ".ezvizlife.com"
- # Ezviz API sends md5 of password
- temp = hashlib.md5()
- temp.update(password.encode("utf-8"))
- md5pass = temp.hexdigest()
- payload = {
- "account": account,
- "password": md5pass,
- "featureCode": FEATURE_CODE,
- "msgType": "0",
- "cuName": "SFRDIDEw",
- }
- try:
- req = self._session.post(
- "https://" + self._token["api_url"] + API_ENDPOINT_LOGIN,
- allow_redirects=False,
- headers={
- "clientType": "3",
- "customno": "1000001",
- "clientNo": "web_site",
- "appId": "ys7",
- "lang": "en",
- },
- data=payload,
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.ConnectionError as err:
- raise InvalidURL("A Invalid URL or Proxy error occured") from err
- except requests.HTTPError as err:
- raise HTTPError from err
- try:
- json_result = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_result["meta"]["code"] == 1100:
- self._token["api_url"] = json_result["loginArea"]["apiDomain"]
- print("Region incorrect!")
- print(f"Your region url: {self._token['api_url']}")
- self.close_session()
- return self.login()
- if json_result["meta"]["code"] == 1013:
- raise PyEzvizError("Incorrect Username.")
- if json_result["meta"]["code"] == 1014:
- raise PyEzvizError("Incorrect Password.")
- if json_result["meta"]["code"] == 1015:
- raise PyEzvizError("The user is locked.")
- self._token["session_id"] = str(json_result["loginSession"]["sessionId"])
- self._token["rf_session_id"] = str(json_result["loginSession"]["rfSessionId"])
- self._token["username"] = str(json_result["loginUser"]["username"])
- self._token["api_url"] = str(json_result["loginArea"]["apiDomain"])
- if not self._token["session_id"]:
- raise PyEzvizError(
- f"Login error: Please check your username/password: {req.text}"
- )
- self._token["service_urls"] = self.get_service_urls()
- return self._token
- def get_service_urls(self) -> Any:
- """Get Ezviz service urls."""
- try:
- req = self._session.get(
- f"https://{self._token['api_url']}{API_ENDPOINT_SERVER_INFO}",
- headers={
- "sessionId": self._token["session_id"],
- "featureCode": FEATURE_CODE,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.ConnectionError as err:
- raise InvalidURL("A Invalid URL or Proxy error occured") from err
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- raise HTTPError from err
- if not req.text:
- raise PyEzvizError("No data")
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_output.get("meta").get("code") != 200:
- logging.info("Json request error")
- service_urls = json_output["systemConfigInfo"]
- service_urls["sysConf"] = service_urls["sysConf"].split("|")
- return service_urls
- def _api_get_pagelist(
- self, page_filter: str, json_key: str | None = None, max_retries: int = 0
- ) -> Any:
- """Get data from pagelist API."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- if page_filter is None:
- raise PyEzvizError("Trying to call get_pagelist without filter")
- try:
- req = self._session.get(
- "https://" + self._token["api_url"] + API_ENDPOINT_PAGELIST,
- headers={"sessionId": self._token["session_id"]},
- params={"filter": page_filter},
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
- raise HTTPError from err
- if not req.text:
- raise PyEzvizError("No data")
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_output.get("meta").get("code") != 200:
- # session is wrong, need to relogin
- self.login()
- logging.info(
- "Json request error, relogging (max retries: %s)", str(max_retries)
- )
- return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
- if json_key is None:
- json_result = json_output
- else:
- json_result = json_output[json_key]
- if not json_result:
- # session is wrong, need to relogin
- self.login()
- logging.info(
- "Impossible to load the devices, here is the returned response: %s",
- str(req.text),
- )
- return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
- return json_result
- def get_alarminfo(self, serial: str, max_retries: int = 0) -> Any:
- """Get data from alarm info API."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- params: dict[str, int | str] = {
- "deviceSerials": serial,
- "queryType": -1,
- "limit": 1,
- "stype": -1,
- }
- try:
- req = self._session.get(
- "https://" + self._token["api_url"] + API_ENDPOINT_ALARMINFO_GET,
- headers={"sessionId": self._token["session_id"]},
- params=params,
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- return self.get_alarminfo(serial, max_retries + 1)
- raise HTTPError from err
- if req.text == "":
- raise PyEzvizError("No data")
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- return json_output
- def _switch_status(
- self, serial: str, status_type: int, enable: int, max_retries: int = 0
- ) -> bool:
- """Switch status on a device."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- try:
- req = self._session.put(
- "https://"
- + self._token["api_url"]
- + serial
- + "/1/1/"
- + str(status_type)
- headers={"sessionId": self._token["session_id"]},
- data={
- "enable": enable,
- "serial": serial,
- "channelNo": "1",
- "type": status_type,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- return self._switch_status(serial, status_type, enable, max_retries + 1)
- raise HTTPError from err
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_output.get("meta").get("code") != 200:
- raise PyEzvizError(
- f"Could not set the switch: Got {req.status_code} : {req.text})"
- )
- return True
- def sound_alarm(self, serial: str, enable: int = 1, max_retries: int = 0) -> bool:
- """Sound alarm on a device."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- try:
- req = self._session.put(
- "https://"
- + self._token["api_url"]
- + serial
- + "/0"
- headers={"sessionId": self._token["session_id"]},
- data={
- "enable": enable,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- return self.sound_alarm(serial, enable, max_retries + 1)
- raise HTTPError from err
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_output.get("meta").get("code") != 200:
- raise PyEzvizError(
- f"Could not set the alarm sound: Got {req.status_code} : {req.text})"
- )
- return True
- def load_cameras(self) -> dict[Any, Any]:
- """Load and return all cameras objects."""
- devices = self._get_all_device_infos()
- cameras = {}
- supported_categories = [
- DeviceCatagories.COMMON_DEVICE_CATEGORY.value,
- DeviceCatagories.CAMERA_DEVICE_CATEGORY.value,
- DeviceCatagories.DOORBELL_DEVICE_CATEGORY.value,
- DeviceCatagories.BASE_STATION_DEVICE_CATEGORY.value,
- ]
- for device, data in devices.items():
- if data["deviceInfos"]["deviceCategory"] in supported_categories:
- # Add support for connected HikVision cameras
- if (
- data["deviceInfos"]["deviceCategory"]
- == DeviceCatagories.COMMON_DEVICE_CATEGORY.value
- and not data["deviceInfos"]["hik"]
- ):
- continue
- # Create camera object
- camera = EzvizCamera(self, device, data)
- camera.load()
- cameras[device] = camera.status()
- return cameras
- def _get_all_device_infos(self) -> dict[Any, Any]:
- """Load all devices and build dict per device serial."""
- devices = self._get_page_list()
- result: dict[Any, Any] = {}
- for device in devices["deviceInfos"]:
- result[device["deviceSerial"]] = {}
- result[device["deviceSerial"]]["deviceInfos"] = device
- result[device["deviceSerial"]]["connectionInfos"] = devices.get(
- "connectionInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["p2pInfos"] = devices.get("p2pInfos").get(
- device["deviceSerial"]
- )
- result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get(
- "alarmNodisturbInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["kmsInfos"] = devices.get("kmsInfos").get(
- device["deviceSerial"]
- )
- result[device["deviceSerial"]]["timePlanInfos"] = devices.get(
- "timePlanInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["statusInfos"] = devices.get(
- "statusInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["wifiInfos"] = devices.get("wifiInfos").get(
- device["deviceSerial"]
- )
- result[device["deviceSerial"]]["switchStatusInfos"] = devices.get(
- "switchStatusInfos"
- ).get(device["deviceSerial"])
- for item in devices["cameraInfos"]:
- if item["deviceSerial"] == device["deviceSerial"]:
- result[device["deviceSerial"]]["cameraInfos"] = item
- return result
- def get_all_per_serial_infos(self, serial: str) -> dict[Any, Any] | None:
- """Load all devices and build dict per device serial."""
- if serial is None:
- raise PyEzvizError("Need serial number for this query")
- devices = self._get_page_list()
- result: dict[str, dict] = {serial: {}}
- for device in devices["deviceInfos"]:
- if device["deviceSerial"] == serial:
- result[device["deviceSerial"]]["deviceInfos"] = device
- result[device["deviceSerial"]]["deviceInfos"] = device
- result[device["deviceSerial"]]["connectionInfos"] = devices.get(
- "connectionInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["p2pInfos"] = devices.get(
- "p2pInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get(
- "alarmNodisturbInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["kmsInfos"] = devices.get(
- "kmsInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["timePlanInfos"] = devices.get(
- "timePlanInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["statusInfos"] = devices.get(
- "statusInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["wifiInfos"] = devices.get(
- "wifiInfos"
- ).get(device["deviceSerial"])
- result[device["deviceSerial"]]["switchStatusInfos"] = devices.get(
- "switchStatusInfos"
- ).get(device["deviceSerial"])
- for item in devices["cameraInfos"]:
- if item["deviceSerial"] == device["deviceSerial"]:
- result[device["deviceSerial"]]["cameraInfos"] = item
- return result.get(serial)
- def ptz_control(
- self, command: str, serial: str, action: str, speed: int = 5
- ) -> Any:
- """PTZ Control by API."""
- if command is None:
- raise PyEzvizError("Trying to call ptzControl without command")
- if action is None:
- raise PyEzvizError("Trying to call ptzControl without action")
- try:
- req = self._session.put(
- "https://"
- + self._token["api_url"]
- + serial
- data={
- "command": command,
- "action": action,
- "channelNo": "1",
- "speed": speed,
- "uuid": str(uuid4()),
- "serial": serial,
- },
- headers={"sessionId": self._token["session_id"], "clientType": "1"},
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- raise HTTPError from err
- return req.text
- def login(self) -> dict[Any, Any]:
- """Get or refresh ezviz login token."""
- if self._token["session_id"] and self._token["rf_session_id"]:
- try:
- req = self._session.put(
- "https://"
- + self._token["api_url"]
- data={
- "refreshSessionId": self._token["rf_session_id"],
- "featureCode": FEATURE_CODE,
- },
- headers={"sessionId": self._token["session_id"]},
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- raise HTTPError from err
- try:
- json_result = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- self._token["session_id"] = str(json_result["sessionInfo"]["sessionId"])
- self._token["rf_session_id"] = str(
- json_result["sessionInfo"]["refreshSessionId"]
- )
- if not self._token["session_id"]:
- raise PyEzvizError(f"Relogin required: {req.text}")
- if not self._token.get("service_urls"):
- self._token["service_urls"] = self.get_service_urls()
- return self._token
- if self.account and self.password:
- return self._login(account=self.account, password=self.password)
- raise PyEzvizError("Login with account and password required")
- def set_camera_defence(self, serial: str, enable: int) -> bool:
- """Enable/Disable motion detection on camera."""
- cas_client = EzvizCAS(self._token)
- cas_client.set_camera_defence_state(serial, enable)
- return True
- def api_set_defence_schedule(
- self, serial: str, schedule: str, enable: int, max_retries: int = 0
- ) -> bool:
- """Set defence schedules."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- schedulestring = (
- '{"CN":0,"EL":'
- + str(enable)
- + ',"SS":"'
- + serial
- + '","WP":['
- + schedule
- + "]}]}"
- )
- try:
- req = self._session.post(
- "https://" + self._token["api_url"] + API_ENDPOINT_SET_DEFENCE_SCHEDULE,
- headers={"sessionId": self._token["session_id"]},
- data={
- "devTimingPlan": schedulestring,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- return self.api_set_defence_schedule(
- serial, schedule, enable, max_retries + 1
- )
- raise HTTPError from err
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_output.get("resultCode") != 0:
- raise PyEzvizError(
- f"Could not set the schedule: Got {req.status_code} : {req.text})"
- )
- return True
- def api_set_defence_mode(self, mode: DefenseModeType, max_retries: int = 0) -> bool:
- """Set defence mode."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- try:
- req = self._session.post(
- "https://" + self._token["api_url"] + API_ENDPOINT_SWITCH_DEFENCE_MODE,
- headers={"sessionId": self._token["session_id"]},
- data={
- "groupId": -1,
- "mode": mode,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to relogin
- self.login()
- return self.api_set_defence_mode(mode, max_retries + 1)
- raise HTTPError from err
- try:
- json_output = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- if json_output.get("meta").get("code") != 200:
- raise PyEzvizError(
- f"Could not set defence mode: Got {req.status_code} : {req.text})"
- )
- return True
- def detection_sensibility(
- self,
- serial: str,
- sensibility: int = 3,
- type_value: int = 3,
- max_retries: int = 0,
- ) -> bool | str:
- """Set detection sensibility."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- if sensibility not in [0, 1, 2, 3, 4, 5, 6] and type_value == 0:
- raise PyEzvizError(
- "Unproper sensibility for type 0 (should be within 1 to 6)."
- )
- try:
- req = self._session.post(
- "https://"
- + self._token["api_url"]
- headers={"sessionId": self._token["session_id"]},
- data={
- "subSerial": serial,
- "type": type_value,
- "channelNo": "1",
- "value": sensibility,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to re-log-in
- self.login()
- return self.detection_sensibility(
- serial, sensibility, type_value, max_retries + 1
- )
- raise HTTPError from err
- try:
- response_json = req.json()
- except ValueError as err:
- raise PyEzvizError("Could not decode response:" + str(err)) from err
- if response_json["resultCode"] and response_json["resultCode"] != "0":
- return "Unknown value"
- return True
- def get_detection_sensibility(
- self, serial: str, type_value: str = "0", max_retries: int = 0
- ) -> Any:
- """Get detection sensibility notifications."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- try:
- req = self._session.post(
- "https://"
- + self._token["api_url"]
- headers={"sessionId": self._token["session_id"]},
- data={
- "subSerial": serial,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to re-log-in.
- self.login()
- return self.get_detection_sensibility(
- serial, type_value, max_retries + 1
- )
- raise HTTPError from err
- try:
- response_json = req.json()
- except ValueError as err:
- raise PyEzvizError("Could not decode response:" + str(err)) from err
- if response_json["resultCode"] != "0":
- return "Unknown"
- if response_json["algorithmConfig"]["algorithmList"]:
- for idx in response_json["algorithmConfig"]["algorithmList"]:
- if idx["type"] == type_value:
- return idx["value"]
- return "Unknown"
- # soundtype: 0 = normal, 1 = intensive, 2 = disabled ... don't ask me why...
- def alarm_sound(
- self, serial: str, sound_type: int, enable: int = 1, max_retries: int = 0
- ) -> bool:
- """Enable alarm sound by API."""
- if max_retries > MAX_RETRIES:
- raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
- if sound_type not in [0, 1, 2]:
- raise PyEzvizError(
- "Invalid sound_type, should be 0,1,2: " + str(sound_type)
- )
- try:
- req = self._session.put(
- "https://"
- + self._token["api_url"]
- + serial
- headers={"sessionId": self._token["session_id"]},
- data={
- "enable": enable,
- "soundType": sound_type,
- "voiceId": "0",
- "deviceSerial": serial,
- },
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.HTTPError as err:
- if err.response.status_code == 401:
- # session is wrong, need to re-log-in
- self.login()
- return self.alarm_sound(serial, sound_type, enable, max_retries + 1)
- raise HTTPError from err
- return True
- def switch_status(self, serial: str, status_type: int, enable: int = 0) -> bool:
- """Switch status of a device."""
- return self._switch_status(serial, status_type, enable)
- def _get_page_list(self) -> Any:
- """Get ezviz device info broken down in sections."""
- return self._api_get_pagelist(
- json_key=None,
- )
- def get_device(self) -> Any:
- """Get ezviz devices filter."""
- return self._api_get_pagelist(page_filter="CLOUD", json_key="deviceInfos")
- def get_connection(self) -> Any:
- """Get ezviz connection infos filter."""
- return self._api_get_pagelist(
- page_filter="CONNECTION", json_key="connectionInfos"
- )
- def _get_status(self) -> Any:
- """Get ezviz status infos filter."""
- return self._api_get_pagelist(page_filter="STATUS", json_key="statusInfos")
- def get_switch(self) -> Any:
- """Get ezviz switch infos filter."""
- return self._api_get_pagelist(
- page_filter="SWITCH", json_key="switchStatusInfos"
- )
- def _get_wifi(self) -> Any:
- """Get ezviz wifi infos filter."""
- return self._api_get_pagelist(page_filter="WIFI", json_key="wifiInfos")
- def _get_nodisturb(self) -> Any:
- """Get ezviz nodisturb infos filter."""
- return self._api_get_pagelist(
- page_filter="NODISTURB", json_key="alarmNodisturbInfos"
- )
- def _get_p2p(self) -> Any:
- """Get ezviz P2P infos filter."""
- return self._api_get_pagelist(page_filter="P2P", json_key="p2pInfos")
- def _get_kms(self) -> Any:
- """Get ezviz KMS infos filter."""
- return self._api_get_pagelist(page_filter="KMS", json_key="kmsInfos")
- def _get_time_plan(self) -> Any:
- """Get ezviz TIME_PLAN infos filter."""
- return self._api_get_pagelist(page_filter="TIME_PLAN", json_key="timePlanInfos")
- def close_session(self) -> None:
- """Clear current session."""
- if self._session:
- self._session.close()
- self._session = requests.session()
- self._session.headers.update(
- {"User-Agent": "okhttp/3.12.1"}
- ) # Android generic user agent.
diff --git a/pyezviz/constants.py b/pyezviz/constants.py
deleted file mode 100644
index 398195c..0000000
--- a/pyezviz/constants.py
+++ /dev/null
@@ -1,109 +0,0 @@
-"""Device switch types relationship."""
-from enum import Enum, unique
-FEATURE_CODE = "c22cb01f8cb83351422d82fad59c8e4e"
-XOR_KEY = b"\x0c\x0eJ^X\x15@Rr"
-class DeviceSwitchType(Enum):
- """Device switch name and number."""
- LIGHT = 3
- CRUISE = 9
- WIFI = 11
- PLUG = 14
- SLEEP = 21
- SOUND = 22
- BABY_CARE = 23
- LOGO = 24
- DEVICE_4G = 35
- TRACKING = 650
-class SoundMode(Enum):
- """Alarm sound level description."""
- SILENT = 2
- SOFT = 0
- CUSTOM = 3
- PLAN = 4
-class DefenseModeType(Enum):
- """Defense mode name and number."""
-class DeviceCatagories(Enum):
- """Supported device categories."""
-class SensorType(Enum):
- """Sensors and their types to expose in HA."""
- # pylint: disable=invalid-name
- sw_version = "None"
- alarm_sound_mod = "None"
- battery_level = "battery"
- detection_sensibility = "None"
- last_alarm_time = "None"
- Seconds_Last_Trigger = "None"
- last_alarm_pic = "None"
- supported_channels = "None"
- local_ip = "None"
- wan_ip = "None"
- PIR_Status = "motion"
-class BinarySensorType(Enum):
- """Binary_sensors and their types to expose in HA."""
- # pylint: disable=invalid-name
- Motion_Trigger = "motion"
- alarm_schedules_enabled = "None"
- encrypted = "None"
- upgrade_available = "None"
diff --git a/pyezviz/exceptions.py b/pyezviz/exceptions.py
deleted file mode 100644
index 465b73d..0000000
--- a/pyezviz/exceptions.py
+++ /dev/null
@@ -1,20 +0,0 @@
-"""PyEzviz Exceptions."""
-class PyEzvizError(Exception):
- """Ezviz api exception."""
-class InvalidURL(PyEzvizError):
- """Invalid url exception."""
-class HTTPError(PyEzvizError):
- """Invalid host exception."""
-class InvalidHost(PyEzvizError):
- """Invalid host exception."""
-class AuthTestResultFailed(PyEzvizError):
- """Authentication failed"""
diff --git a/pyezviz/mqtt.py b/pyezviz/mqtt.py
deleted file mode 100644
index c634657..0000000
--- a/pyezviz/mqtt.py
+++ /dev/null
@@ -1,234 +0,0 @@
-"""Ezviz cloud MQTT client for push messages."""
-import os
-import logging
- format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
- level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO)
-import base64
-import json
-import threading
-import time
-import paho.mqtt.client as mqtt
-import requests
-from pyezviz.constants import DEFAULT_TIMEOUT, FEATURE_CODE
-from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError
-API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info"
-API_ENDPOINT_START_MQTT = "/api/push/start"
-API_ENDPOINT_STOP_MQTT = "/api/push/stop"
-MQTT_APP_KEY = "4c6b3cc2-b5eb-4813-a592-612c1374c1fe"
-APP_SECRET = "17454517-cc1c-42b3-a845-99b4a15dd3e6"
-def on_subscribe(client, userdata, mid, granted_qos):
- """On MQTT message subscribe."""
- # pylint: disable=unused-argument
- logging.info("Subscribed: " + str(mid) + " " + str(granted_qos))
-def on_connect(client, userdata, flags, return_code):
- """On MQTT connect."""
- # pylint: disable=unused-argument
- if return_code == 0:
- logging.debug("connected OK Returned code=%s", return_code)
- else:
- logging.info("Bad connection Returned code=%s", return_code)
- client.reconnect()
-#def on_message(client, userdata, msg):
-# """On MQTT message receive."""
-# # pylint: disable=unused-argument
-# mqtt_message = json.loads(msg.payload)
-# mqtt_message["ext"] = mqtt_message["ext"].split(",")
-# # Print payload message
-# decoded_message = {mqtt_message['ext'][2]:{'id':mqtt_message['id'], 'alert':mqtt_message['alert'], 'time':mqtt_message['ext'][1], 'alert type':mqtt_message['ext'][4], 'image':mqtt_message['ext'][16]}}
-# print(decoded_message)
-class MQTTClient(threading.Thread):
- """Open MQTT connection to ezviz cloud."""
- def __init__(
- self,
- token,
- callback,
- ):
- """Initialize the client object."""
- threading.Thread.__init__(self)
- self._session = None
- self._token = token or {
- "session_id": None,
- "rf_session_id": None,
- "username": None,
- "api_url": "apiieu.ezvizlife.com",
- }
- self._callback = callback
- self._timeout = timeout
- self._stop_event = threading.Event()
- self._mqtt_data = {
- "mqtt_clientid": None,
- "ticket": None,
- "push_url": token["service_urls"]["pushAddr"],
- }
- def _mqtt(self):
- """Receive MQTT messages from ezviz server"""
- ezviz_mqtt_client = mqtt.Client(
- client_id=self._mqtt_data["mqtt_clientid"], protocol=4, transport="tcp"
- )
- ezviz_mqtt_client.on_connect = on_connect
- ezviz_mqtt_client.on_subscribe = on_subscribe
- ezviz_mqtt_client.on_message = self._callback
- ezviz_mqtt_client.username_pw_set(MQTT_APP_KEY, APP_SECRET)
- ezviz_mqtt_client.connect(self._mqtt_data["push_url"], 1882, 60)
- ezviz_mqtt_client.subscribe(
- f"{MQTT_APP_KEY}/ticket/{self._mqtt_data['ticket']}", qos=2
- )
- ezviz_mqtt_client.loop_start()
- return ezviz_mqtt_client
- def _register_ezviz_push(self):
- """Register for push messages."""
- auth_seq = base64.b64encode(f"{MQTT_APP_KEY}:{APP_SECRET}".encode("ascii"))
- auth_seq = "Basic " + auth_seq.decode()
- payload = {
- "appKey": MQTT_APP_KEY,
- "clientType": "5",
- "mac": FEATURE_CODE,
- "token": "123456",
- "version": "v1.3.0",
- }
- try:
- req = self._session.post(
- f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_REGISTER_MQTT}",
- allow_redirects=False,
- headers={"Authorization": auth_seq},
- data=payload,
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.ConnectionError as err:
- raise InvalidURL("A Invalid URL or Proxy error occured") from err
- except requests.HTTPError as err:
- raise HTTPError from err
- try:
- json_result = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- self._mqtt_data["mqtt_clientid"] = json_result["data"]["clientId"]
- def run(self):
- """Method representing the thread's activity which should not be used directly."""
- if self._session is None:
- self._session = requests.session()
- self._session.headers.update(
- {"User-Agent": "okhttp/3.12.1"}
- ) # Android generic user agent.
- self._register_ezviz_push()
- self._start_ezviz_push()
- self._mqtt()
- while not self._stop_event.is_set():
- time.sleep(1)
- def start(self):
- """Start mqtt.
- Start mqtt thread
- """
- super().start()
- def stop(self):
- """Stop push notifications."""
- payload = {
- "appKey": MQTT_APP_KEY,
- "clientId": self._mqtt_data["mqtt_clientid"],
- "clientType": 5,
- "sessionId": self._token["session_id"],
- "username": self._token["username"],
- }
- try:
- req = self._session.post(
- f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_STOP_MQTT}",
- data=payload,
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.ConnectionError as err:
- raise InvalidURL("A Invalid URL or Proxy error occured") from err
- except requests.HTTPError as err:
- raise HTTPError from err
- self._stop_event.set()
- def _start_ezviz_push(self):
- """Send start for push messages to ezviz api."""
- payload = {
- "appKey": MQTT_APP_KEY,
- "clientId": self._mqtt_data["mqtt_clientid"],
- "clientType": 5,
- "sessionId": self._token["session_id"],
- "username": self._token["username"],
- "token": "123456",
- }
- try:
- req = self._session.post(
- f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_START_MQTT}",
- allow_redirects=False,
- data=payload,
- timeout=self._timeout,
- )
- req.raise_for_status()
- except requests.ConnectionError as err:
- raise InvalidURL("A Invalid URL or Proxy error occured") from err
- except requests.HTTPError as err:
- raise HTTPError from err
- try:
- json_result = req.json()
- except ValueError as err:
- raise PyEzvizError(
- "Impossible to decode response: "
- + str(err)
- + "\nResponse was: "
- + str(req.text)
- ) from err
- self._mqtt_data["ticket"] = json_result["ticket"]
diff --git a/pyezviz/test_cam_rtsp.py b/pyezviz/test_cam_rtsp.py
deleted file mode 100644
index 916048a..0000000
--- a/pyezviz/test_cam_rtsp.py
+++ /dev/null
@@ -1,147 +0,0 @@
-"""Test camera RTSP authentication"""
-import base64
-import hashlib
-import socket
-from pyezviz.exceptions import AuthTestResultFailed, InvalidHost
-def genmsg_describe(url, seq, user_agent, auth_seq):
- """Generate RTSP describe message"""
- msg_ret = "DESCRIBE " + url + " RTSP/1.0\r\n"
- msg_ret += "CSeq: " + str(seq) + "\r\n"
- msg_ret += "Authorization: " + auth_seq + "\r\n"
- msg_ret += "User-Agent: " + user_agent + "\r\n"
- msg_ret += "Accept: application/sdp\r\n"
- msg_ret += "\r\n"
- return msg_ret
-class TestRTSPAuth:
- """Initialize RTSP credential test"""
- def __init__(
- self,
- ip_addr,
- username=None,
- password=None,
- test_uri="",
- ):
- self._rtsp_details = {
- "bufLen": 1024,
- "defaultServerIp": ip_addr,
- "defaultServerPort": 554,
- "defaultTestUri": test_uri,
- "defaultUserAgent": "RTSP Client",
- "defaultUsername": username,
- "defaultPassword": password,
- }
- def generate_auth_string(self, realm, method, uri, nonce):
- """Generate digest auth string """
- map_return_info = {}
- m_1 = hashlib.md5(
- f"{self._rtsp_details['defaultUsername']}:"
- f"{realm.decode()}:"
- f"{self._rtsp_details['defaultPassword']}".encode()
- ).hexdigest()
- m_2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
- response = hashlib.md5(f"{m_1}:{nonce}:{m_2}".encode()).hexdigest()
- map_return_info = (
- f"Digest "
- f"username=\"{self._rtsp_details['defaultUsername']}\", "
- f'realm="{realm.decode()}", '
- f'algorithm="MD5", '
- f'nonce="{nonce.decode()}", '
- f'uri="{uri}", '
- f'response="{response}"'
- )
- return map_return_info
- def main(self):
- """Main function """
- session = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- session.connect(
- (
- self._rtsp_details["defaultServerIp"],
- self._rtsp_details["defaultServerPort"],
- )
- )
- except TimeoutError as err:
- raise AuthTestResultFailed("Invalid ip or camera hibernating") from err
- except (socket.gaierror, ConnectionRefusedError) as err:
- raise InvalidHost("Invalid IP or Hostname") from err
- seq = 1
- url = (
- "rtsp://"
- + self._rtsp_details["defaultServerIp"]
- + self._rtsp_details["defaultTestUri"]
- )
- auth_seq = base64.b64encode(
- f"{self._rtsp_details['defaultUsername']}:"
- f"{self._rtsp_details['defaultPassword']}".encode("ascii")
- )
- auth_seq = "Basic " + auth_seq.decode()
- print(
- genmsg_describe(url, seq, self._rtsp_details["defaultUserAgent"], auth_seq)
- )
- session.send(
- genmsg_describe(
- url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
- ).encode()
- )
- msg1 = session.recv(self._rtsp_details["bufLen"])
- seq = seq + 1
- if msg1.decode().find("200 OK") > 1:
- print(f"Basic auth result: {msg1.decode()}")
- return print("Basic Auth test passed. Credentials Valid!")
- if msg1.decode().find("Unauthorized") > 1:
- # Basic failed, doing new DESCRIBE with digest authentication.
- start = msg1.decode().find("realm")
- begin = msg1.decode().find('"', start)
- end = msg1.decode().find('"', begin + 1)
- realm = msg1[begin + 1 : end]
- start = msg1.decode().find("nonce")
- begin = msg1.decode().find('"', start)
- end = msg1.decode().find('"', begin + 1)
- nonce = msg1[begin + 1 : end]
- auth_seq = self.generate_auth_string(
- realm,
- self._rtsp_details["defaultTestUri"],
- nonce,
- )
- print(
- genmsg_describe(
- url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
- )
- )
- session.send(
- genmsg_describe(
- url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
- ).encode()
- )
- msg1 = session.recv(self._rtsp_details["bufLen"])
- print(f"Digest auth result: {msg1.decode()}")
- if msg1.decode().find("200 OK") > 1:
- return print("Digest Auth test Passed. Credentials Valid!")
- if msg1.decode().find("401 Unauthorized") > 1:
- raise AuthTestResultFailed("Credentials not valid!!")
- return print("Basic Auth test passed. Credentials Valid!")