Remove EzViz junk
This commit is contained in:
parent
6e669608bf
commit
a091d161cb
24
main.py
24
main.py
|
@ -8,7 +8,6 @@ import time
|
|||
import json
|
||||
|
||||
import pygame
|
||||
from pyezviz import EzvizClient, MQTTClient
|
||||
|
||||
import secrets
|
||||
|
||||
|
@ -34,16 +33,6 @@ def backdoor():
|
|||
play_sound(CHIME)
|
||||
play_sound(BACKDOOR)
|
||||
|
||||
def on_message(client, userdata, mqtt_message):
|
||||
message = json.loads(mqtt_message.payload)
|
||||
#print(json.dumps(mqtt_message, indent=4))
|
||||
|
||||
if message['alert'] == 'somebody there ring the door': # lmao
|
||||
logging.info('Received door bell press alert')
|
||||
if 'E80451501' in message['ext']:
|
||||
logging.info('Backdoor pressed')
|
||||
backdoor()
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.info('')
|
||||
logging.info('==========================')
|
||||
|
@ -52,14 +41,5 @@ if __name__ == '__main__':
|
|||
pygame.mixer.pre_init(buffer=4096)
|
||||
pygame.mixer.init(buffer=4096)
|
||||
|
||||
client = EzvizClient(secrets.EZVIZ_EMAIL, secrets.EZVIZ_PASS, 'apiius.ezvizlife.com')
|
||||
try:
|
||||
logging.info('Logging into EZVIZ client...')
|
||||
token = client.login()
|
||||
mqtt = MQTTClient(token, on_message)
|
||||
logging.info('Starting MQTT...')
|
||||
mqtt.start()
|
||||
except Exception as exp:
|
||||
logging.exception(str(exp))
|
||||
finally:
|
||||
client.close_session()
|
||||
backdoor()
|
||||
|
||||
|
|
|
@ -1,27 +0,0 @@
|
|||
"""init pyezviz."""
|
||||
from pyezviz.camera import EzvizCamera
|
||||
from pyezviz.cas import EzvizCAS
|
||||
from pyezviz.client import EzvizClient
|
||||
from pyezviz.constants import (DefenseModeType, DeviceCatagories,
|
||||
DeviceSwitchType, SoundMode)
|
||||
from pyezviz.exceptions import (AuthTestResultFailed, HTTPError, InvalidHost,
|
||||
InvalidURL, PyEzvizError)
|
||||
from pyezviz.mqtt import MQTTClient
|
||||
from pyezviz.test_cam_rtsp import TestRTSPAuth
|
||||
|
||||
__all__ = [
|
||||
"EzvizCamera",
|
||||
"EzvizClient",
|
||||
"PyEzvizError",
|
||||
"InvalidURL",
|
||||
"HTTPError",
|
||||
"InvalidHost",
|
||||
"AuthTestResultFailed",
|
||||
"EzvizCAS",
|
||||
"MQTTClient",
|
||||
"DefenseModeType",
|
||||
"DeviceCatagories",
|
||||
"DeviceSwitchType",
|
||||
"SoundMode",
|
||||
"TestRTSPAuth",
|
||||
]
|
|
@ -1,309 +0,0 @@
|
|||
"""pyezviz command line."""
|
||||
import argparse
|
||||
import http.client
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import pandas
|
||||
from pyezviz import EzvizCamera, EzvizClient, MQTTClient
|
||||
from pyezviz.constants import DefenseModeType
|
||||
|
||||
|
||||
def main():
|
||||
"""Initiate arg parser."""
|
||||
parser = argparse.ArgumentParser(prog="pyezviz")
|
||||
parser.add_argument("-u", "--username", required=True, help="Ezviz username")
|
||||
parser.add_argument("-p", "--password", required=True, help="Ezviz Password")
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
"--region",
|
||||
required=False,
|
||||
default="apiieu.ezvizlife.com",
|
||||
help="Ezviz API region",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--debug", "-d", action="store_true", help="Print debug messages to stderr"
|
||||
)
|
||||
|
||||
subparsers = parser.add_subparsers(dest="action")
|
||||
|
||||
parser_device = subparsers.add_parser(
|
||||
"devices", help="Play with all devices at once"
|
||||
)
|
||||
parser_device.add_argument(
|
||||
"device_action",
|
||||
type=str,
|
||||
default="status",
|
||||
help="Device action to perform",
|
||||
choices=["device", "status", "switch", "connection"],
|
||||
)
|
||||
|
||||
parser_home_defence_mode = subparsers.add_parser(
|
||||
"home_defence_mode", help="Set home defence mode"
|
||||
)
|
||||
|
||||
parser_mqtt = subparsers.add_parser("mqtt", help="Set home defence mode")
|
||||
|
||||
parser_home_defence_mode.add_argument(
|
||||
"--mode", required=False, help="Choose mode", choices=["HOME_MODE", "AWAY_MODE"]
|
||||
)
|
||||
|
||||
parser_camera = subparsers.add_parser("camera", help="Camera actions")
|
||||
parser_camera.add_argument("--serial", required=True, help="camera SERIAL")
|
||||
|
||||
subparsers_camera = parser_camera.add_subparsers(dest="camera_action")
|
||||
|
||||
parser_camera_status = subparsers_camera.add_parser(
|
||||
"status", help="Get the status of the camera"
|
||||
)
|
||||
parser_camera_move = subparsers_camera.add_parser("move", help="Move the camera")
|
||||
parser_camera_move.add_argument(
|
||||
"--direction",
|
||||
required=True,
|
||||
help="Direction to move the camera to",
|
||||
choices=["up", "down", "right", "left"],
|
||||
)
|
||||
parser_camera_move.add_argument(
|
||||
"--speed",
|
||||
required=False,
|
||||
help="Speed of the movement",
|
||||
default=5,
|
||||
type=int,
|
||||
choices=range(1, 10),
|
||||
)
|
||||
|
||||
parser_camera_switch = subparsers_camera.add_parser(
|
||||
"switch", help="Change the status of a switch"
|
||||
)
|
||||
parser_camera_switch.add_argument(
|
||||
"--switch",
|
||||
required=True,
|
||||
help="Switch to switch",
|
||||
choices=["audio", "ir", "state", "privacy", "sleep", "follow_move"],
|
||||
)
|
||||
parser_camera_switch.add_argument(
|
||||
"--enable",
|
||||
required=False,
|
||||
help="Enable (or not)",
|
||||
default=1,
|
||||
type=int,
|
||||
choices=[0, 1],
|
||||
)
|
||||
|
||||
parser_camera_alarm = subparsers_camera.add_parser(
|
||||
"alarm", help="Configure the camera alarm"
|
||||
)
|
||||
parser_camera_alarm.add_argument(
|
||||
"--notify", required=False, help="Enable (or not)", type=int, choices=[0, 1]
|
||||
)
|
||||
parser_camera_alarm.add_argument(
|
||||
"--sound",
|
||||
required=False,
|
||||
help="Sound level (2 is silent, 1 intensive, 0 soft)",
|
||||
type=int,
|
||||
choices=[0, 1, 2],
|
||||
)
|
||||
parser_camera_alarm.add_argument(
|
||||
"--sensibility",
|
||||
required=False,
|
||||
help="Sensibility level (Non-Cameras = from 1 to 6) or (Cameras = 1 to 100)",
|
||||
type=int,
|
||||
choices=range(0, 100),
|
||||
)
|
||||
parser_camera_alarm.add_argument(
|
||||
"--schedule", required=False, help="Schedule in json format *test*", type=str
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# print("--------------args")
|
||||
# print("--------------args: %s",args)
|
||||
# print("--------------args")
|
||||
|
||||
client = EzvizClient(args.username, args.password, args.region)
|
||||
|
||||
if args.debug:
|
||||
|
||||
http.client.HTTPConnection.debuglevel = 5
|
||||
# You must initialize logging, otherwise you'll not see debug output.
|
||||
logging.basicConfig()
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
requests_log = logging.getLogger("requests.packages.urllib3")
|
||||
requests_log.setLevel(logging.DEBUG)
|
||||
requests_log.propagate = True
|
||||
|
||||
if args.action == "devices":
|
||||
|
||||
if args.device_action == "device":
|
||||
try:
|
||||
client.login()
|
||||
print(json.dumps(client.get_device(), indent=2))
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.device_action == "status":
|
||||
try:
|
||||
client.login()
|
||||
print(
|
||||
pandas.DataFrame(client.load_cameras()).to_string(
|
||||
columns=[
|
||||
"serial",
|
||||
"name",
|
||||
# version,
|
||||
# upgrade_available,
|
||||
"status",
|
||||
"device_category",
|
||||
"device_sub_category",
|
||||
"sleep",
|
||||
"privacy",
|
||||
"audio",
|
||||
"ir_led",
|
||||
"state_led",
|
||||
# follow_move,
|
||||
# alarm_notify,
|
||||
# alarm_schedules_enabled,
|
||||
# alarm_sound_mod,
|
||||
# encrypted,
|
||||
"local_ip",
|
||||
"local_rtsp_port",
|
||||
"detection_sensibility",
|
||||
"battery_level",
|
||||
"alarm_schedules_enabled",
|
||||
"alarm_notify",
|
||||
"Motion_Trigger",
|
||||
# last_alarm_time,
|
||||
# last_alarm_pic
|
||||
]
|
||||
)
|
||||
)
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.device_action == "switch":
|
||||
try:
|
||||
client.login()
|
||||
print(json.dumps(client.get_switch(), indent=2))
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.device_action == "connection":
|
||||
try:
|
||||
client.login()
|
||||
print(json.dumps(client.get_connection(), indent=2))
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
else:
|
||||
print("Action not implemented: %s", args.device_action)
|
||||
|
||||
elif args.action == "home_defence_mode":
|
||||
|
||||
if args.mode:
|
||||
try:
|
||||
client.login()
|
||||
print(
|
||||
json.dumps(
|
||||
client.api_set_defence_mode(
|
||||
getattr(DefenseModeType, args.mode).value
|
||||
),
|
||||
indent=2,
|
||||
)
|
||||
)
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.action == "mqtt":
|
||||
|
||||
try:
|
||||
token = client.login()
|
||||
mqtt = MQTTClient(token)
|
||||
mqtt.start()
|
||||
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.action == "camera":
|
||||
|
||||
# load camera object
|
||||
try:
|
||||
client.login()
|
||||
camera = EzvizCamera(client, args.serial)
|
||||
logging.debug("Camera loaded")
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
client.close_session()
|
||||
|
||||
if args.camera_action == "move":
|
||||
try:
|
||||
camera.move(args.direction, args.speed)
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.camera_action == "status":
|
||||
try:
|
||||
print(json.dumps(camera.status(), indent=2))
|
||||
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.camera_action == "switch":
|
||||
try:
|
||||
if args.switch == "ir":
|
||||
camera.switch_device_ir_led(args.enable)
|
||||
elif args.switch == "state":
|
||||
print(args.enable)
|
||||
camera.switch_device_state_led(args.enable)
|
||||
elif args.switch == "audio":
|
||||
camera.switch_device_audio(args.enable)
|
||||
elif args.switch == "privacy":
|
||||
camera.switch_privacy_mode(args.enable)
|
||||
elif args.switch == "sleep":
|
||||
camera.switch_sleep_mode(args.enable)
|
||||
elif args.switch == "follow_move":
|
||||
camera.switch_follow_move(args.enable)
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
elif args.camera_action == "alarm":
|
||||
try:
|
||||
if args.sound is not None:
|
||||
camera.alarm_sound(args.sound)
|
||||
if args.notify is not None:
|
||||
camera.alarm_notify(args.notify)
|
||||
if args.sensibility is not None:
|
||||
camera.alarm_detection_sensibility(args.sensibility)
|
||||
if args.schedule is not None:
|
||||
camera.change_defence_schedule(args.schedule)
|
||||
except Exception as exp: # pylint: disable=broad-except
|
||||
print(exp)
|
||||
finally:
|
||||
client.close_session()
|
||||
|
||||
else:
|
||||
print("Action not implemented, try running with -h switch for help")
|
||||
|
||||
else:
|
||||
print("Action not implemented: %s", args.action)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
|
@ -1,234 +0,0 @@
|
|||
"""pyezviz camera api."""
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
from typing import Any
|
||||
|
||||
from pyezviz.constants import DeviceCatagories, DeviceSwitchType, SoundMode
|
||||
from pyezviz.exceptions import PyEzvizError
|
||||
|
||||
|
||||
class EzvizCamera:
|
||||
"""Initialize Ezviz camera object."""
|
||||
|
||||
def __init__(self, client, serial: str, device_obj: dict | None = None) -> None:
|
||||
"""Initialize the camera object."""
|
||||
self._client = client
|
||||
self._serial = serial
|
||||
self._switch: dict[int, bool] = {}
|
||||
self._alarmmotiontrigger: dict[str, Any] = {}
|
||||
self._device = device_obj or {}
|
||||
self.alarmlist_time = None
|
||||
self.alarmlist_pic = None
|
||||
|
||||
def load(self) -> None:
|
||||
"""Update device info for camera serial."""
|
||||
|
||||
if self._device is None:
|
||||
self._device = self._client.get_all_per_serial_infos(self._serial)
|
||||
|
||||
self._alarm_list()
|
||||
|
||||
self._switch_status()
|
||||
|
||||
def _switch_status(self) -> None:
|
||||
"""load device switches"""
|
||||
|
||||
if self._device.get("switchStatusInfos"):
|
||||
for switch in self._device["switchStatusInfos"]:
|
||||
self._switch.update({switch["type"]: switch["enable"]})
|
||||
|
||||
else:
|
||||
self._switch = {0: False}
|
||||
|
||||
def _detection_sensibility(self) -> Any:
|
||||
"""load detection sensibility"""
|
||||
result = "Unknown"
|
||||
|
||||
if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is not True:
|
||||
if (
|
||||
self._device["deviceInfos"]["deviceCategory"]
|
||||
== DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value
|
||||
):
|
||||
result = self._client.get_detection_sensibility(
|
||||
self._serial,
|
||||
"3",
|
||||
)
|
||||
else:
|
||||
result = self._client.get_detection_sensibility(self._serial)
|
||||
|
||||
if self._switch.get(DeviceSwitchType.AUTO_SLEEP.value) is True:
|
||||
result = "Hibernate"
|
||||
|
||||
return result
|
||||
|
||||
def _alarm_list(self) -> None:
|
||||
"""get last alarm info for this camera's self._serial"""
|
||||
alarmlist = self._client.get_alarminfo(self._serial)
|
||||
|
||||
if alarmlist.get("page").get("totalResults") > 0:
|
||||
self.alarmlist_time = alarmlist.get("alarms")[0].get("alarmStartTimeStr")
|
||||
self.alarmlist_pic = alarmlist.get("alarms")[0].get("picUrl")
|
||||
|
||||
if self.alarmlist_time:
|
||||
self._motion_trigger(self.alarmlist_time)
|
||||
|
||||
def _local_ip(self) -> str:
|
||||
""" "Fix empty ip value for certain cameras"""
|
||||
if self._device.get("wifiInfos"):
|
||||
return self._device["wifiInfos"].get("address")
|
||||
|
||||
# Seems to return none or 0.0.0.0 on some. This should run 2nd.
|
||||
if self._device.get("connectionInfos"):
|
||||
if self._device["connectionInfos"].get("localIp"):
|
||||
return self._device["connectionInfos"]["localIp"]
|
||||
|
||||
return "0.0.0.0"
|
||||
|
||||
def _motion_trigger(self, alarmlist_time: str) -> None:
|
||||
"""Create motion sensor based on last alarm time."""
|
||||
now = datetime.datetime.now().replace(microsecond=0)
|
||||
alarm_trigger_active = 0
|
||||
today_date = datetime.date.today()
|
||||
fix = datetime.datetime.now().replace(microsecond=0)
|
||||
|
||||
# Need to handle error if time format different
|
||||
fix = datetime.datetime.strptime(
|
||||
alarmlist_time.replace("Today", str(today_date)),
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
|
||||
# returns a timedelta object
|
||||
timepassed = now - fix
|
||||
|
||||
if timepassed < datetime.timedelta(seconds=60):
|
||||
alarm_trigger_active = 1
|
||||
|
||||
self._alarmmotiontrigger = {
|
||||
"alarm_trigger_active": alarm_trigger_active,
|
||||
"timepassed": timepassed.total_seconds(),
|
||||
}
|
||||
|
||||
def _is_alarm_schedules_enabled(self) -> bool | None:
|
||||
"""Checks if alarm schedules enabled"""
|
||||
time_plans = None
|
||||
|
||||
if self._device.get("timePlanInfos"):
|
||||
time_plans = [
|
||||
item for item in self._device["timePlanInfos"] if item.get("type") == 2
|
||||
]
|
||||
|
||||
if time_plans:
|
||||
return bool(time_plans[0].get("enable"))
|
||||
|
||||
return None
|
||||
|
||||
def status(self) -> dict[Any, Any]:
|
||||
"""Return the status of the camera."""
|
||||
self.load()
|
||||
|
||||
return {
|
||||
"serial": self._serial,
|
||||
"name": self._device["deviceInfos"].get("name"),
|
||||
"version": self._device["deviceInfos"].get("version"),
|
||||
"upgrade_available": self._device["statusInfos"].get("upgradeAvailable"),
|
||||
"status": self._device["deviceInfos"].get("status"),
|
||||
"device_category": self._device["deviceInfos"].get("deviceCategory"),
|
||||
"device_sub_category": self._device["deviceInfos"].get("deviceSubCategory"),
|
||||
"sleep": self._switch.get(DeviceSwitchType.SLEEP.value)
|
||||
or self._switch.get(DeviceSwitchType.AUTO_SLEEP.value),
|
||||
"privacy": self._switch.get(DeviceSwitchType.PRIVACY.value),
|
||||
"audio": self._switch.get(DeviceSwitchType.SOUND.value),
|
||||
"ir_led": self._switch.get(DeviceSwitchType.INFRARED_LIGHT.value),
|
||||
"state_led": self._switch.get(DeviceSwitchType.LIGHT.value),
|
||||
"follow_move": self._switch.get(DeviceSwitchType.MOBILE_TRACKING.value),
|
||||
"alarm_notify": bool(self._device["statusInfos"].get("globalStatus")),
|
||||
"alarm_schedules_enabled": self._is_alarm_schedules_enabled(),
|
||||
"alarm_sound_mod": SoundMode(
|
||||
self._device["statusInfos"].get("alarmSoundMode")
|
||||
).name,
|
||||
"encrypted": bool(self._device["statusInfos"].get("isEncrypted")),
|
||||
"local_ip": self._local_ip(),
|
||||
"wan_ip": self._device.get("connectionInfos", {}).get("netIp", "0.0.0.0"),
|
||||
"local_rtsp_port": self._device["connectionInfos"].get(
|
||||
"localRtspPort", "554"
|
||||
),
|
||||
"supported_channels": self._device["deviceInfos"].get("channelNumber"),
|
||||
"detection_sensibility": self._detection_sensibility(),
|
||||
"battery_level": self._device["statusInfos"]
|
||||
.get("optionals", {})
|
||||
.get("powerRemaining"),
|
||||
"PIR_Status": self._device["statusInfos"].get("pirStatus"),
|
||||
"Motion_Trigger": self._alarmmotiontrigger.get("alarm_trigger_active"),
|
||||
"Seconds_Last_Trigger": self._alarmmotiontrigger.get("timepassed"),
|
||||
"last_alarm_time": self.alarmlist_time,
|
||||
"last_alarm_pic": self.alarmlist_pic,
|
||||
"wifiInfos": self._device.get("wifiInfos"),
|
||||
"switches": self._switch,
|
||||
}
|
||||
|
||||
def move(self, direction: str, speed: int = 5) -> bool:
|
||||
"""Move camera."""
|
||||
if direction not in ["right", "left", "down", "up"]:
|
||||
raise PyEzvizError(f"Invalid direction: {direction} ")
|
||||
|
||||
# launch the start command
|
||||
self._client.ptz_control(str(direction).upper(), self._serial, "START", speed)
|
||||
# launch the stop command
|
||||
self._client.ptz_control(str(direction).upper(), self._serial, "STOP", speed)
|
||||
|
||||
return True
|
||||
|
||||
def alarm_notify(self, enable: int) -> bool:
|
||||
"""Enable/Disable camera notification when movement is detected."""
|
||||
return self._client.set_camera_defence(self._serial, enable)
|
||||
|
||||
def alarm_sound(self, sound_type: int) -> bool:
|
||||
"""Enable/Disable camera sound when movement is detected."""
|
||||
# we force enable = 1 , to make sound...
|
||||
return self._client.alarm_sound(self._serial, sound_type, 1)
|
||||
|
||||
def alarm_detection_sensibility(self, sensibility, type_value=0):
|
||||
"""Enable/Disable camera sound when movement is detected."""
|
||||
# we force enable = 1 , to make sound...
|
||||
return self._client.detection_sensibility(self._serial, sensibility, type_value)
|
||||
|
||||
def switch_device_audio(self, enable=0):
|
||||
"""Switch audio status on a device."""
|
||||
return self._client.switch_status(
|
||||
self._serial, DeviceSwitchType.SOUND.value, enable
|
||||
)
|
||||
|
||||
def switch_device_state_led(self, enable=0):
|
||||
"""Switch led status on a device."""
|
||||
return self._client.switch_status(
|
||||
self._serial, DeviceSwitchType.LIGHT.value, enable
|
||||
)
|
||||
|
||||
def switch_device_ir_led(self, enable=0):
|
||||
"""Switch ir status on a device."""
|
||||
return self._client.switch_status(
|
||||
self._serial, DeviceSwitchType.INFRARED_LIGHT.value, enable
|
||||
)
|
||||
|
||||
def switch_privacy_mode(self, enable=0):
|
||||
"""Switch privacy mode on a device."""
|
||||
return self._client.switch_status(
|
||||
self._serial, DeviceSwitchType.PRIVACY.value, enable
|
||||
)
|
||||
|
||||
def switch_sleep_mode(self, enable=0):
|
||||
"""Switch sleep mode on a device."""
|
||||
return self._client.switch_status(
|
||||
self._serial, DeviceSwitchType.SLEEP.value, enable
|
||||
)
|
||||
|
||||
def switch_follow_move(self, enable=0):
|
||||
"""Switch follow move."""
|
||||
return self._client.switch_status(
|
||||
self._serial, DeviceSwitchType.MOBILE_TRACKING.value, enable
|
||||
)
|
||||
|
||||
def change_defence_schedule(self, schedule, enable=0):
|
||||
"""Change defence schedule. Requires json formatted schedules."""
|
||||
return self._client.api_set_defence_schdule(self._serial, schedule, enable)
|
160
pyezviz/cas.py
160
pyezviz/cas.py
|
@ -1,160 +0,0 @@
|
|||
"""Ezviz CAS API Functions."""
|
||||
|
||||
import random
|
||||
import socket
|
||||
import ssl
|
||||
from io import BytesIO
|
||||
from itertools import cycle
|
||||
|
||||
import xmltodict
|
||||
from Crypto.Cipher import AES
|
||||
from pyezviz.constants import FEATURE_CODE, XOR_KEY
|
||||
from pyezviz.exceptions import InvalidHost
|
||||
|
||||
|
||||
def xor_enc_dec(msg, xor_key=XOR_KEY):
|
||||
"""Xor encodes camera serial"""
|
||||
with BytesIO(msg) as stream:
|
||||
xor_msg = bytes(a ^ b for a, b in zip(stream.read(), cycle(xor_key)))
|
||||
return xor_msg
|
||||
|
||||
|
||||
class EzvizCAS:
|
||||
"""Ezviz CAS server client."""
|
||||
|
||||
def __init__(self, token):
|
||||
"""Initialize the client object."""
|
||||
self._session = None
|
||||
self._token = token or {
|
||||
"session_id": None,
|
||||
"rf_session_id": None,
|
||||
"username": None,
|
||||
"api_url": "apiieu.ezvizlife.com",
|
||||
}
|
||||
self._service_urls = token["service_urls"]
|
||||
|
||||
def cas_get_encryption(self, devserial):
|
||||
"""Fetch encryption code from ezviz cas server"""
|
||||
|
||||
# Random hex 64 characters long.
|
||||
rand_hex = random.randrange(10 ** 80)
|
||||
rand_hex = "%064x" % rand_hex
|
||||
rand_hex = rand_hex[:64]
|
||||
|
||||
payload = (
|
||||
f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
|
||||
f"\x00\x02" # Check or order?
|
||||
f"\x00\x00\x00\x00\x00\x00 "
|
||||
f"\x01" # Check or order?
|
||||
f"\x00\x00\x00\x00\x00\x00\x02\t\x00\x00\x00\x00"
|
||||
f'<?xml version="1.0" encoding="utf-8"?>\n<Request>\n\t'
|
||||
f'<ClientID>{self._token["session_id"]}</ClientID>'
|
||||
f"\n\t<Sign>{FEATURE_CODE}</Sign>\n\t"
|
||||
f"<DevSerial>{devserial}</DevSerial>"
|
||||
f"\n\t<ClientType>0</ClientType>\n</Request>\n"
|
||||
).encode("latin1")
|
||||
|
||||
payload_end_padding = rand_hex.encode("latin1")
|
||||
|
||||
context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE)
|
||||
|
||||
# Create a TCP/IP socket
|
||||
my_socket = socket.create_connection(
|
||||
(self._service_urls["sysConf"][15], self._service_urls["sysConf"][16])
|
||||
)
|
||||
my_socket = context.wrap_socket(
|
||||
my_socket, server_hostname=self._service_urls["sysConf"][15]
|
||||
)
|
||||
|
||||
# Get CAS Encryption Key
|
||||
try:
|
||||
my_socket.send(payload + payload_end_padding)
|
||||
response = my_socket.recv(1024)
|
||||
print(f"Get Encryption Key: {response}")
|
||||
|
||||
except (socket.gaierror, ConnectionRefusedError) as err:
|
||||
raise InvalidHost("Invalid IP or Hostname") from err
|
||||
|
||||
finally:
|
||||
my_socket.close()
|
||||
|
||||
# Trim header, tail and convert xml to dict.
|
||||
response = response[32::]
|
||||
response = response[:-32:]
|
||||
response = xmltodict.parse(response)
|
||||
|
||||
return response
|
||||
|
||||
def set_camera_defence_state(self, serial, enable=1):
|
||||
"""Enable alarm notifications."""
|
||||
|
||||
# Random hex 64 characters long.
|
||||
rand_hex = random.randrange(10 ** 80)
|
||||
rand_hex = "%064x" % rand_hex
|
||||
rand_hex = rand_hex[:64]
|
||||
|
||||
payload = (
|
||||
f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
|
||||
f"\x00\x14" # Check or order?
|
||||
f"\x00\x00\x00\x00\x00\x00 "
|
||||
f"\x05"
|
||||
f"\x00\x00\x00\x00\x00\x00\x02\xd0\x00\x00\x01\xe0"
|
||||
f'<?xml version="1.0" encoding="utf-8"?>\n<Request>\n\t'
|
||||
f'<Verify ClientSession="{self._token["session_id"]}" '
|
||||
f'ToDevice="{serial}" ClientType="0" />\n\t'
|
||||
f'<Message Length="240" />\n</Request>\n'
|
||||
f"\x9e\xba\xac\xe9\x01\x00\x00\x00\x00\x00"
|
||||
f"\x00\x13"
|
||||
f"\x00\x00\x00\x00\x00\x000\x0f\xff\xff\xff\xff"
|
||||
f"\x00\x00\x00\xb0\x00\x00\x00\x00"
|
||||
).encode("latin1")
|
||||
|
||||
payload_end_padding = rand_hex.encode("latin1")
|
||||
|
||||
# xor camera serial
|
||||
xor_cam_serial = xor_enc_dec(serial.encode("latin1"))
|
||||
|
||||
defence_msg_string = (
|
||||
f'{xor_cam_serial.decode()}2+,*xdv.0" '
|
||||
f'encoding="utf-8"?>\n'
|
||||
f"<Request>\n"
|
||||
f"\t<OperationCode>ABCDEFG</OperationCode>\n"
|
||||
f'\t<Defence Type="Global" Status="{enable}" Actor="V" Channel="0" />\n'
|
||||
f"</Request>\n"
|
||||
f"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10"
|
||||
).encode("latin1")
|
||||
|
||||
context = ssl.SSLContext(cert_reqs=ssl.CERT_NONE)
|
||||
|
||||
# Create a TCP/IP socket
|
||||
my_socket = socket.create_connection(
|
||||
(self._service_urls["sysConf"][15], self._service_urls["sysConf"][16])
|
||||
)
|
||||
my_socket = context.wrap_socket(
|
||||
my_socket, server_hostname=self._service_urls["sysConf"][15]
|
||||
)
|
||||
|
||||
cas_client = self.cas_get_encryption(serial)
|
||||
|
||||
aes_key = cas_client["Response"]["Session"]["@Key"].encode("latin1")
|
||||
iv_value = (
|
||||
f"{serial}{cas_client['Response']['Session']['@OperationCode']}".encode(
|
||||
"latin1"
|
||||
)
|
||||
)
|
||||
|
||||
# Message encryption
|
||||
cipher = AES.new(aes_key, AES.MODE_CBC, iv_value)
|
||||
|
||||
try:
|
||||
defence_msg_string = cipher.encrypt(defence_msg_string)
|
||||
my_socket.send(payload + defence_msg_string + payload_end_padding)
|
||||
print(f"Set camera response: {my_socket.recv()}")
|
||||
|
||||
except (socket.gaierror, ConnectionRefusedError) as err:
|
||||
raise InvalidHost("Invalid IP or Hostname") from err
|
||||
|
||||
finally:
|
||||
my_socket.close()
|
||||
|
||||
return True
|
|
@ -1,919 +0,0 @@
|
|||
"""Ezviz API."""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
import requests
|
||||
|
||||
from pyezviz.camera import EzvizCamera
|
||||
from pyezviz.cas import EzvizCAS
|
||||
from pyezviz.constants import (
|
||||
DEFAULT_TIMEOUT,
|
||||
FEATURE_CODE,
|
||||
MAX_RETRIES,
|
||||
DefenseModeType,
|
||||
DeviceCatagories,
|
||||
)
|
||||
from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError
|
||||
|
||||
API_ENDPOINT_CLOUDDEVICES = "/api/cloud/v2/cloudDevices/getAll"
|
||||
API_ENDPOINT_PAGELIST = "/v3/userdevices/v1/devices/pagelist"
|
||||
API_ENDPOINT_DEVICES = "/v3/devices/"
|
||||
API_ENDPOINT_LOGIN = "/v3/users/login/v5"
|
||||
API_ENDPOINT_REFRESH_SESSION_ID = "/v3/apigateway/login"
|
||||
API_ENDPOINT_SWITCH_STATUS = "/switchStatus"
|
||||
API_ENDPOINT_PTZCONTROL = "/ptzControl"
|
||||
API_ENDPOINT_ALARM_SOUND = "/alarm/sound"
|
||||
API_ENDPOINT_DETECTION_SENSIBILITY = "/api/device/configAlgorithm"
|
||||
API_ENDPOINT_DETECTION_SENSIBILITY_GET = "/api/device/queryAlgorithmConfig"
|
||||
API_ENDPOINT_ALARMINFO_GET = "/v3/alarms/v2/advanced"
|
||||
API_ENDPOINT_SET_DEFENCE_SCHEDULE = "/api/device/defence/plan2"
|
||||
API_ENDPOINT_SWITCH_DEFENCE_MODE = "/v3/userdevices/v1/group/switchDefenceMode"
|
||||
API_ENDPOINT_SWITCH_SOUND_ALARM = "/sendAlarm"
|
||||
API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info"
|
||||
|
||||
|
||||
class EzvizClient:
|
||||
"""Initialize api client object."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
account: str | None = None,
|
||||
password: str | None = None,
|
||||
url: str = "apiieu.ezvizlife.com",
|
||||
timeout: int = DEFAULT_TIMEOUT,
|
||||
token: dict | None = None,
|
||||
) -> None:
|
||||
"""Initialize the client object."""
|
||||
self.account = account
|
||||
self.password = password
|
||||
self._session = requests.session()
|
||||
# Set Android generic user agent.
|
||||
self._session.headers.update({"User-Agent": "okhttp/3.12.1"})
|
||||
self._token = token or {
|
||||
"session_id": None,
|
||||
"rf_session_id": None,
|
||||
"username": None,
|
||||
"api_url": url,
|
||||
}
|
||||
self._timeout = timeout
|
||||
|
||||
def _login(self, account: str, password: str) -> dict[Any, Any]:
|
||||
"""Login to Ezviz API."""
|
||||
|
||||
# Region code to url.
|
||||
if len(self._token["api_url"].split(".")) == 1:
|
||||
self._token["api_url"] = "apii" + self._token["api_url"] + ".ezvizlife.com"
|
||||
|
||||
# Ezviz API sends md5 of password
|
||||
temp = hashlib.md5()
|
||||
temp.update(password.encode("utf-8"))
|
||||
md5pass = temp.hexdigest()
|
||||
payload = {
|
||||
"account": account,
|
||||
"password": md5pass,
|
||||
"featureCode": FEATURE_CODE,
|
||||
"msgType": "0",
|
||||
"cuName": "SFRDIDEw",
|
||||
}
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
"https://" + self._token["api_url"] + API_ENDPOINT_LOGIN,
|
||||
allow_redirects=False,
|
||||
headers={
|
||||
"clientType": "3",
|
||||
"customno": "1000001",
|
||||
"clientNo": "web_site",
|
||||
"appId": "ys7",
|
||||
"lang": "en",
|
||||
},
|
||||
data=payload,
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.ConnectionError as err:
|
||||
raise InvalidURL("A Invalid URL or Proxy error occured") from err
|
||||
|
||||
except requests.HTTPError as err:
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_result = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_result["meta"]["code"] == 1100:
|
||||
self._token["api_url"] = json_result["loginArea"]["apiDomain"]
|
||||
print("Region incorrect!")
|
||||
print(f"Your region url: {self._token['api_url']}")
|
||||
self.close_session()
|
||||
return self.login()
|
||||
|
||||
if json_result["meta"]["code"] == 1013:
|
||||
raise PyEzvizError("Incorrect Username.")
|
||||
|
||||
if json_result["meta"]["code"] == 1014:
|
||||
raise PyEzvizError("Incorrect Password.")
|
||||
|
||||
if json_result["meta"]["code"] == 1015:
|
||||
raise PyEzvizError("The user is locked.")
|
||||
|
||||
self._token["session_id"] = str(json_result["loginSession"]["sessionId"])
|
||||
self._token["rf_session_id"] = str(json_result["loginSession"]["rfSessionId"])
|
||||
self._token["username"] = str(json_result["loginUser"]["username"])
|
||||
self._token["api_url"] = str(json_result["loginArea"]["apiDomain"])
|
||||
if not self._token["session_id"]:
|
||||
raise PyEzvizError(
|
||||
f"Login error: Please check your username/password: {req.text}"
|
||||
)
|
||||
|
||||
self._token["service_urls"] = self.get_service_urls()
|
||||
|
||||
return self._token
|
||||
|
||||
def get_service_urls(self) -> Any:
|
||||
"""Get Ezviz service urls."""
|
||||
|
||||
try:
|
||||
req = self._session.get(
|
||||
f"https://{self._token['api_url']}{API_ENDPOINT_SERVER_INFO}",
|
||||
headers={
|
||||
"sessionId": self._token["session_id"],
|
||||
"featureCode": FEATURE_CODE,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.ConnectionError as err:
|
||||
raise InvalidURL("A Invalid URL or Proxy error occured") from err
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
if not req.text:
|
||||
raise PyEzvizError("No data")
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_output.get("meta").get("code") != 200:
|
||||
logging.info("Json request error")
|
||||
|
||||
service_urls = json_output["systemConfigInfo"]
|
||||
service_urls["sysConf"] = service_urls["sysConf"].split("|")
|
||||
|
||||
return service_urls
|
||||
|
||||
def _api_get_pagelist(
|
||||
self, page_filter: str, json_key: str | None = None, max_retries: int = 0
|
||||
) -> Any:
|
||||
"""Get data from pagelist API."""
|
||||
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
if page_filter is None:
|
||||
raise PyEzvizError("Trying to call get_pagelist without filter")
|
||||
|
||||
try:
|
||||
req = self._session.get(
|
||||
"https://" + self._token["api_url"] + API_ENDPOINT_PAGELIST,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
params={"filter": page_filter},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
if not req.text:
|
||||
raise PyEzvizError("No data")
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_output.get("meta").get("code") != 200:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
logging.info(
|
||||
"Json request error, relogging (max retries: %s)", str(max_retries)
|
||||
)
|
||||
return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
|
||||
|
||||
if json_key is None:
|
||||
json_result = json_output
|
||||
else:
|
||||
json_result = json_output[json_key]
|
||||
|
||||
if not json_result:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
logging.info(
|
||||
"Impossible to load the devices, here is the returned response: %s",
|
||||
str(req.text),
|
||||
)
|
||||
return self._api_get_pagelist(page_filter, json_key, max_retries + 1)
|
||||
|
||||
return json_result
|
||||
|
||||
def get_alarminfo(self, serial: str, max_retries: int = 0) -> Any:
|
||||
"""Get data from alarm info API."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
params: dict[str, int | str] = {
|
||||
"deviceSerials": serial,
|
||||
"queryType": -1,
|
||||
"limit": 1,
|
||||
"stype": -1,
|
||||
}
|
||||
|
||||
try:
|
||||
req = self._session.get(
|
||||
"https://" + self._token["api_url"] + API_ENDPOINT_ALARMINFO_GET,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
params=params,
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
return self.get_alarminfo(serial, max_retries + 1)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
if req.text == "":
|
||||
raise PyEzvizError("No data")
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
return json_output
|
||||
|
||||
def _switch_status(
|
||||
self, serial: str, status_type: int, enable: int, max_retries: int = 0
|
||||
) -> bool:
|
||||
"""Switch status on a device."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
try:
|
||||
req = self._session.put(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_DEVICES
|
||||
+ serial
|
||||
+ "/1/1/"
|
||||
+ str(status_type)
|
||||
+ API_ENDPOINT_SWITCH_STATUS,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"enable": enable,
|
||||
"serial": serial,
|
||||
"channelNo": "1",
|
||||
"type": status_type,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
return self._switch_status(serial, status_type, enable, max_retries + 1)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_output.get("meta").get("code") != 200:
|
||||
raise PyEzvizError(
|
||||
f"Could not set the switch: Got {req.status_code} : {req.text})"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def sound_alarm(self, serial: str, enable: int = 1, max_retries: int = 0) -> bool:
|
||||
"""Sound alarm on a device."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
try:
|
||||
req = self._session.put(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_DEVICES
|
||||
+ serial
|
||||
+ "/0"
|
||||
+ API_ENDPOINT_SWITCH_SOUND_ALARM,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"enable": enable,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
return self.sound_alarm(serial, enable, max_retries + 1)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_output.get("meta").get("code") != 200:
|
||||
raise PyEzvizError(
|
||||
f"Could not set the alarm sound: Got {req.status_code} : {req.text})"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def load_cameras(self) -> dict[Any, Any]:
|
||||
"""Load and return all cameras objects."""
|
||||
|
||||
devices = self._get_all_device_infos()
|
||||
cameras = {}
|
||||
supported_categories = [
|
||||
DeviceCatagories.COMMON_DEVICE_CATEGORY.value,
|
||||
DeviceCatagories.CAMERA_DEVICE_CATEGORY.value,
|
||||
DeviceCatagories.BATTERY_CAMERA_DEVICE_CATEGORY.value,
|
||||
DeviceCatagories.DOORBELL_DEVICE_CATEGORY.value,
|
||||
DeviceCatagories.BASE_STATION_DEVICE_CATEGORY.value,
|
||||
]
|
||||
|
||||
for device, data in devices.items():
|
||||
if data["deviceInfos"]["deviceCategory"] in supported_categories:
|
||||
# Add support for connected HikVision cameras
|
||||
if (
|
||||
data["deviceInfos"]["deviceCategory"]
|
||||
== DeviceCatagories.COMMON_DEVICE_CATEGORY.value
|
||||
and not data["deviceInfos"]["hik"]
|
||||
):
|
||||
continue
|
||||
|
||||
# Create camera object
|
||||
|
||||
camera = EzvizCamera(self, device, data)
|
||||
|
||||
camera.load()
|
||||
cameras[device] = camera.status()
|
||||
|
||||
return cameras
|
||||
|
||||
def _get_all_device_infos(self) -> dict[Any, Any]:
|
||||
"""Load all devices and build dict per device serial."""
|
||||
|
||||
devices = self._get_page_list()
|
||||
result: dict[Any, Any] = {}
|
||||
|
||||
for device in devices["deviceInfos"]:
|
||||
result[device["deviceSerial"]] = {}
|
||||
result[device["deviceSerial"]]["deviceInfos"] = device
|
||||
result[device["deviceSerial"]]["connectionInfos"] = devices.get(
|
||||
"connectionInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["p2pInfos"] = devices.get("p2pInfos").get(
|
||||
device["deviceSerial"]
|
||||
)
|
||||
result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get(
|
||||
"alarmNodisturbInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["kmsInfos"] = devices.get("kmsInfos").get(
|
||||
device["deviceSerial"]
|
||||
)
|
||||
result[device["deviceSerial"]]["timePlanInfos"] = devices.get(
|
||||
"timePlanInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["statusInfos"] = devices.get(
|
||||
"statusInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["wifiInfos"] = devices.get("wifiInfos").get(
|
||||
device["deviceSerial"]
|
||||
)
|
||||
result[device["deviceSerial"]]["switchStatusInfos"] = devices.get(
|
||||
"switchStatusInfos"
|
||||
).get(device["deviceSerial"])
|
||||
for item in devices["cameraInfos"]:
|
||||
if item["deviceSerial"] == device["deviceSerial"]:
|
||||
result[device["deviceSerial"]]["cameraInfos"] = item
|
||||
|
||||
return result
|
||||
|
||||
def get_all_per_serial_infos(self, serial: str) -> dict[Any, Any] | None:
|
||||
"""Load all devices and build dict per device serial."""
|
||||
|
||||
if serial is None:
|
||||
raise PyEzvizError("Need serial number for this query")
|
||||
|
||||
devices = self._get_page_list()
|
||||
result: dict[str, dict] = {serial: {}}
|
||||
|
||||
for device in devices["deviceInfos"]:
|
||||
if device["deviceSerial"] == serial:
|
||||
result[device["deviceSerial"]]["deviceInfos"] = device
|
||||
result[device["deviceSerial"]]["deviceInfos"] = device
|
||||
result[device["deviceSerial"]]["connectionInfos"] = devices.get(
|
||||
"connectionInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["p2pInfos"] = devices.get(
|
||||
"p2pInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["alarmNodisturbInfos"] = devices.get(
|
||||
"alarmNodisturbInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["kmsInfos"] = devices.get(
|
||||
"kmsInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["timePlanInfos"] = devices.get(
|
||||
"timePlanInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["statusInfos"] = devices.get(
|
||||
"statusInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["wifiInfos"] = devices.get(
|
||||
"wifiInfos"
|
||||
).get(device["deviceSerial"])
|
||||
result[device["deviceSerial"]]["switchStatusInfos"] = devices.get(
|
||||
"switchStatusInfos"
|
||||
).get(device["deviceSerial"])
|
||||
for item in devices["cameraInfos"]:
|
||||
if item["deviceSerial"] == device["deviceSerial"]:
|
||||
result[device["deviceSerial"]]["cameraInfos"] = item
|
||||
|
||||
return result.get(serial)
|
||||
|
||||
def ptz_control(
|
||||
self, command: str, serial: str, action: str, speed: int = 5
|
||||
) -> Any:
|
||||
"""PTZ Control by API."""
|
||||
if command is None:
|
||||
raise PyEzvizError("Trying to call ptzControl without command")
|
||||
if action is None:
|
||||
raise PyEzvizError("Trying to call ptzControl without action")
|
||||
|
||||
try:
|
||||
req = self._session.put(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_DEVICES
|
||||
+ serial
|
||||
+ API_ENDPOINT_PTZCONTROL,
|
||||
data={
|
||||
"command": command,
|
||||
"action": action,
|
||||
"channelNo": "1",
|
||||
"speed": speed,
|
||||
"uuid": str(uuid4()),
|
||||
"serial": serial,
|
||||
},
|
||||
headers={"sessionId": self._token["session_id"], "clientType": "1"},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
raise HTTPError from err
|
||||
|
||||
return req.text
|
||||
|
||||
def login(self) -> dict[Any, Any]:
|
||||
"""Get or refresh ezviz login token."""
|
||||
if self._token["session_id"] and self._token["rf_session_id"]:
|
||||
try:
|
||||
req = self._session.put(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_REFRESH_SESSION_ID,
|
||||
data={
|
||||
"refreshSessionId": self._token["rf_session_id"],
|
||||
"featureCode": FEATURE_CODE,
|
||||
},
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_result = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
self._token["session_id"] = str(json_result["sessionInfo"]["sessionId"])
|
||||
self._token["rf_session_id"] = str(
|
||||
json_result["sessionInfo"]["refreshSessionId"]
|
||||
)
|
||||
if not self._token["session_id"]:
|
||||
raise PyEzvizError(f"Relogin required: {req.text}")
|
||||
|
||||
if not self._token.get("service_urls"):
|
||||
self._token["service_urls"] = self.get_service_urls()
|
||||
|
||||
return self._token
|
||||
|
||||
if self.account and self.password:
|
||||
return self._login(account=self.account, password=self.password)
|
||||
|
||||
raise PyEzvizError("Login with account and password required")
|
||||
|
||||
def set_camera_defence(self, serial: str, enable: int) -> bool:
|
||||
"""Enable/Disable motion detection on camera."""
|
||||
cas_client = EzvizCAS(self._token)
|
||||
cas_client.set_camera_defence_state(serial, enable)
|
||||
|
||||
return True
|
||||
|
||||
def api_set_defence_schedule(
|
||||
self, serial: str, schedule: str, enable: int, max_retries: int = 0
|
||||
) -> bool:
|
||||
"""Set defence schedules."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
schedulestring = (
|
||||
'{"CN":0,"EL":'
|
||||
+ str(enable)
|
||||
+ ',"SS":"'
|
||||
+ serial
|
||||
+ '","WP":['
|
||||
+ schedule
|
||||
+ "]}]}"
|
||||
)
|
||||
try:
|
||||
req = self._session.post(
|
||||
"https://" + self._token["api_url"] + API_ENDPOINT_SET_DEFENCE_SCHEDULE,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"devTimingPlan": schedulestring,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
return self.api_set_defence_schedule(
|
||||
serial, schedule, enable, max_retries + 1
|
||||
)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_output.get("resultCode") != 0:
|
||||
raise PyEzvizError(
|
||||
f"Could not set the schedule: Got {req.status_code} : {req.text})"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def api_set_defence_mode(self, mode: DefenseModeType, max_retries: int = 0) -> bool:
|
||||
"""Set defence mode."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
"https://" + self._token["api_url"] + API_ENDPOINT_SWITCH_DEFENCE_MODE,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"groupId": -1,
|
||||
"mode": mode,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to relogin
|
||||
self.login()
|
||||
return self.api_set_defence_mode(mode, max_retries + 1)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_output = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
if json_output.get("meta").get("code") != 200:
|
||||
raise PyEzvizError(
|
||||
f"Could not set defence mode: Got {req.status_code} : {req.text})"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def detection_sensibility(
|
||||
self,
|
||||
serial: str,
|
||||
sensibility: int = 3,
|
||||
type_value: int = 3,
|
||||
max_retries: int = 0,
|
||||
) -> bool | str:
|
||||
"""Set detection sensibility."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
if sensibility not in [0, 1, 2, 3, 4, 5, 6] and type_value == 0:
|
||||
raise PyEzvizError(
|
||||
"Unproper sensibility for type 0 (should be within 1 to 6)."
|
||||
)
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_DETECTION_SENSIBILITY,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"subSerial": serial,
|
||||
"type": type_value,
|
||||
"channelNo": "1",
|
||||
"value": sensibility,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to re-log-in
|
||||
self.login()
|
||||
return self.detection_sensibility(
|
||||
serial, sensibility, type_value, max_retries + 1
|
||||
)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
response_json = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError("Could not decode response:" + str(err)) from err
|
||||
|
||||
if response_json["resultCode"] and response_json["resultCode"] != "0":
|
||||
return "Unknown value"
|
||||
|
||||
return True
|
||||
|
||||
def get_detection_sensibility(
|
||||
self, serial: str, type_value: str = "0", max_retries: int = 0
|
||||
) -> Any:
|
||||
"""Get detection sensibility notifications."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_DETECTION_SENSIBILITY_GET,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"subSerial": serial,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to re-log-in.
|
||||
self.login()
|
||||
return self.get_detection_sensibility(
|
||||
serial, type_value, max_retries + 1
|
||||
)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
response_json = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError("Could not decode response:" + str(err)) from err
|
||||
|
||||
if response_json["resultCode"] != "0":
|
||||
return "Unknown"
|
||||
|
||||
if response_json["algorithmConfig"]["algorithmList"]:
|
||||
for idx in response_json["algorithmConfig"]["algorithmList"]:
|
||||
if idx["type"] == type_value:
|
||||
return idx["value"]
|
||||
|
||||
return "Unknown"
|
||||
|
||||
# soundtype: 0 = normal, 1 = intensive, 2 = disabled ... don't ask me why...
|
||||
def alarm_sound(
|
||||
self, serial: str, sound_type: int, enable: int = 1, max_retries: int = 0
|
||||
) -> bool:
|
||||
"""Enable alarm sound by API."""
|
||||
if max_retries > MAX_RETRIES:
|
||||
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
|
||||
|
||||
if sound_type not in [0, 1, 2]:
|
||||
raise PyEzvizError(
|
||||
"Invalid sound_type, should be 0,1,2: " + str(sound_type)
|
||||
)
|
||||
|
||||
try:
|
||||
req = self._session.put(
|
||||
"https://"
|
||||
+ self._token["api_url"]
|
||||
+ API_ENDPOINT_DEVICES
|
||||
+ serial
|
||||
+ API_ENDPOINT_ALARM_SOUND,
|
||||
headers={"sessionId": self._token["session_id"]},
|
||||
data={
|
||||
"enable": enable,
|
||||
"soundType": sound_type,
|
||||
"voiceId": "0",
|
||||
"deviceSerial": serial,
|
||||
},
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.HTTPError as err:
|
||||
if err.response.status_code == 401:
|
||||
# session is wrong, need to re-log-in
|
||||
self.login()
|
||||
return self.alarm_sound(serial, sound_type, enable, max_retries + 1)
|
||||
|
||||
raise HTTPError from err
|
||||
|
||||
return True
|
||||
|
||||
def switch_status(self, serial: str, status_type: int, enable: int = 0) -> bool:
|
||||
"""Switch status of a device."""
|
||||
return self._switch_status(serial, status_type, enable)
|
||||
|
||||
def _get_page_list(self) -> Any:
|
||||
"""Get ezviz device info broken down in sections."""
|
||||
return self._api_get_pagelist(
|
||||
page_filter="CLOUD, TIME_PLAN, CONNECTION, SWITCH,"
|
||||
"STATUS, WIFI, NODISTURB, KMS, P2P,"
|
||||
"TIME_PLAN, CHANNEL, VTM, DETECTOR,"
|
||||
"FEATURE, UPGRADE, VIDEO_QUALITY, QOS",
|
||||
json_key=None,
|
||||
)
|
||||
|
||||
def get_device(self) -> Any:
|
||||
"""Get ezviz devices filter."""
|
||||
return self._api_get_pagelist(page_filter="CLOUD", json_key="deviceInfos")
|
||||
|
||||
def get_connection(self) -> Any:
|
||||
"""Get ezviz connection infos filter."""
|
||||
return self._api_get_pagelist(
|
||||
page_filter="CONNECTION", json_key="connectionInfos"
|
||||
)
|
||||
|
||||
def _get_status(self) -> Any:
|
||||
"""Get ezviz status infos filter."""
|
||||
return self._api_get_pagelist(page_filter="STATUS", json_key="statusInfos")
|
||||
|
||||
def get_switch(self) -> Any:
|
||||
"""Get ezviz switch infos filter."""
|
||||
return self._api_get_pagelist(
|
||||
page_filter="SWITCH", json_key="switchStatusInfos"
|
||||
)
|
||||
|
||||
def _get_wifi(self) -> Any:
|
||||
"""Get ezviz wifi infos filter."""
|
||||
return self._api_get_pagelist(page_filter="WIFI", json_key="wifiInfos")
|
||||
|
||||
def _get_nodisturb(self) -> Any:
|
||||
"""Get ezviz nodisturb infos filter."""
|
||||
return self._api_get_pagelist(
|
||||
page_filter="NODISTURB", json_key="alarmNodisturbInfos"
|
||||
)
|
||||
|
||||
def _get_p2p(self) -> Any:
|
||||
"""Get ezviz P2P infos filter."""
|
||||
return self._api_get_pagelist(page_filter="P2P", json_key="p2pInfos")
|
||||
|
||||
def _get_kms(self) -> Any:
|
||||
"""Get ezviz KMS infos filter."""
|
||||
return self._api_get_pagelist(page_filter="KMS", json_key="kmsInfos")
|
||||
|
||||
def _get_time_plan(self) -> Any:
|
||||
"""Get ezviz TIME_PLAN infos filter."""
|
||||
return self._api_get_pagelist(page_filter="TIME_PLAN", json_key="timePlanInfos")
|
||||
|
||||
def close_session(self) -> None:
|
||||
"""Clear current session."""
|
||||
if self._session:
|
||||
self._session.close()
|
||||
|
||||
self._session = requests.session()
|
||||
self._session.headers.update(
|
||||
{"User-Agent": "okhttp/3.12.1"}
|
||||
) # Android generic user agent.
|
|
@ -1,109 +0,0 @@
|
|||
"""Device switch types relationship."""
|
||||
from enum import Enum, unique
|
||||
|
||||
FEATURE_CODE = "c22cb01f8cb83351422d82fad59c8e4e"
|
||||
XOR_KEY = b"\x0c\x0eJ^X\x15@Rr"
|
||||
DEFAULT_TIMEOUT = 25
|
||||
MAX_RETRIES = 3
|
||||
|
||||
@unique
|
||||
class DeviceSwitchType(Enum):
|
||||
"""Device switch name and number."""
|
||||
|
||||
ALARM_TONE = 1
|
||||
STREAM_ADAPTIVE = 2
|
||||
LIGHT = 3
|
||||
INTELLIGENT_ANALYSIS = 4
|
||||
LOG_UPLOAD = 5
|
||||
DEFENCE_PLAN = 6
|
||||
PRIVACY = 7
|
||||
SOUND_LOCALIZATION = 8
|
||||
CRUISE = 9
|
||||
INFRARED_LIGHT = 10
|
||||
WIFI = 11
|
||||
WIFI_MARKETING = 12
|
||||
WIFI_LIGHT = 13
|
||||
PLUG = 14
|
||||
SLEEP = 21
|
||||
SOUND = 22
|
||||
BABY_CARE = 23
|
||||
LOGO = 24
|
||||
MOBILE_TRACKING = 25
|
||||
CHANNELOFFLINE = 26
|
||||
ALL_DAY_VIDEO = 29
|
||||
AUTO_SLEEP = 32
|
||||
ROAMING_STATUS = 34
|
||||
DEVICE_4G = 35
|
||||
ALARM_REMIND_MODE = 37
|
||||
OUTDOOR_RINGING_SOUND = 39
|
||||
INTELLIGENT_PQ_SWITCH = 40
|
||||
DOORBELL_TALK = 101
|
||||
HUMAN_INTELLIGENT_DETECTION = 200
|
||||
LIGHT_FLICKER = 301
|
||||
ALARM_LIGHT = 303
|
||||
ALARM_LIGHT_RELEVANCE = 305
|
||||
DEVICE_HUMAN_RELATE_LIGHT = 41
|
||||
TAMPER_ALARM = 306
|
||||
DETECTION_TYPE = 451
|
||||
OUTLET_RECOVER = 600
|
||||
CHIME_INDICATOR_LIGHT = 611
|
||||
TRACKING = 650
|
||||
CRUISE_TRACKING = 651
|
||||
PARTIAL_IMAGE_OPTIMIZE = 700
|
||||
FEATURE_TRACKING = 701
|
||||
|
||||
|
||||
class SoundMode(Enum):
|
||||
"""Alarm sound level description."""
|
||||
|
||||
SILENT = 2
|
||||
SOFT = 0
|
||||
INTENSE = 1
|
||||
CUSTOM = 3
|
||||
PLAN = 4
|
||||
|
||||
|
||||
class DefenseModeType(Enum):
|
||||
"""Defense mode name and number."""
|
||||
|
||||
HOME_MODE = 1
|
||||
AWAY_MODE = 2
|
||||
SLEEP_MODE = 3
|
||||
UNSET_MODE = 0
|
||||
|
||||
|
||||
class DeviceCatagories(Enum):
|
||||
"""Supported device categories."""
|
||||
|
||||
COMMON_DEVICE_CATEGORY = "COMMON"
|
||||
CAMERA_DEVICE_CATEGORY = "IPC"
|
||||
BATTERY_CAMERA_DEVICE_CATEGORY = "BatteryCamera"
|
||||
DOORBELL_DEVICE_CATEGORY = "BDoorBell"
|
||||
BASE_STATION_DEVICE_CATEGORY = "XVR"
|
||||
|
||||
|
||||
class SensorType(Enum):
|
||||
"""Sensors and their types to expose in HA."""
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
sw_version = "None"
|
||||
alarm_sound_mod = "None"
|
||||
battery_level = "battery"
|
||||
detection_sensibility = "None"
|
||||
last_alarm_time = "None"
|
||||
Seconds_Last_Trigger = "None"
|
||||
last_alarm_pic = "None"
|
||||
supported_channels = "None"
|
||||
local_ip = "None"
|
||||
wan_ip = "None"
|
||||
PIR_Status = "motion"
|
||||
|
||||
|
||||
class BinarySensorType(Enum):
|
||||
"""Binary_sensors and their types to expose in HA."""
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
Motion_Trigger = "motion"
|
||||
alarm_schedules_enabled = "None"
|
||||
encrypted = "None"
|
||||
upgrade_available = "None"
|
|
@ -1,20 +0,0 @@
|
|||
"""PyEzviz Exceptions."""
|
||||
|
||||
class PyEzvizError(Exception):
|
||||
"""Ezviz api exception."""
|
||||
|
||||
|
||||
class InvalidURL(PyEzvizError):
|
||||
"""Invalid url exception."""
|
||||
|
||||
|
||||
class HTTPError(PyEzvizError):
|
||||
"""Invalid host exception."""
|
||||
|
||||
|
||||
class InvalidHost(PyEzvizError):
|
||||
"""Invalid host exception."""
|
||||
|
||||
|
||||
class AuthTestResultFailed(PyEzvizError):
|
||||
"""Authentication failed"""
|
234
pyezviz/mqtt.py
234
pyezviz/mqtt.py
|
@ -1,234 +0,0 @@
|
|||
"""Ezviz cloud MQTT client for push messages."""
|
||||
import os
|
||||
import logging
|
||||
logging.basicConfig(
|
||||
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
|
||||
level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO)
|
||||
|
||||
import base64
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
import requests
|
||||
from pyezviz.constants import DEFAULT_TIMEOUT, FEATURE_CODE
|
||||
from pyezviz.exceptions import HTTPError, InvalidURL, PyEzvizError
|
||||
|
||||
API_ENDPOINT_SERVER_INFO = "/v3/configurations/system/info"
|
||||
API_ENDPOINT_REGISTER_MQTT = "/v1/getClientId"
|
||||
API_ENDPOINT_START_MQTT = "/api/push/start"
|
||||
API_ENDPOINT_STOP_MQTT = "/api/push/stop"
|
||||
|
||||
|
||||
MQTT_APP_KEY = "4c6b3cc2-b5eb-4813-a592-612c1374c1fe"
|
||||
APP_SECRET = "17454517-cc1c-42b3-a845-99b4a15dd3e6"
|
||||
|
||||
|
||||
def on_subscribe(client, userdata, mid, granted_qos):
|
||||
"""On MQTT message subscribe."""
|
||||
# pylint: disable=unused-argument
|
||||
logging.info("Subscribed: " + str(mid) + " " + str(granted_qos))
|
||||
|
||||
|
||||
def on_connect(client, userdata, flags, return_code):
|
||||
"""On MQTT connect."""
|
||||
# pylint: disable=unused-argument
|
||||
if return_code == 0:
|
||||
logging.debug("connected OK Returned code=%s", return_code)
|
||||
else:
|
||||
logging.info("Bad connection Returned code=%s", return_code)
|
||||
client.reconnect()
|
||||
|
||||
|
||||
#def on_message(client, userdata, msg):
|
||||
# """On MQTT message receive."""
|
||||
# # pylint: disable=unused-argument
|
||||
# mqtt_message = json.loads(msg.payload)
|
||||
# mqtt_message["ext"] = mqtt_message["ext"].split(",")
|
||||
#
|
||||
# # Print payload message
|
||||
# decoded_message = {mqtt_message['ext'][2]:{'id':mqtt_message['id'], 'alert':mqtt_message['alert'], 'time':mqtt_message['ext'][1], 'alert type':mqtt_message['ext'][4], 'image':mqtt_message['ext'][16]}}
|
||||
# print(decoded_message)
|
||||
|
||||
|
||||
class MQTTClient(threading.Thread):
|
||||
"""Open MQTT connection to ezviz cloud."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
token,
|
||||
callback,
|
||||
timeout=DEFAULT_TIMEOUT,
|
||||
):
|
||||
"""Initialize the client object."""
|
||||
threading.Thread.__init__(self)
|
||||
self._session = None
|
||||
self._token = token or {
|
||||
"session_id": None,
|
||||
"rf_session_id": None,
|
||||
"username": None,
|
||||
"api_url": "apiieu.ezvizlife.com",
|
||||
}
|
||||
self._callback = callback
|
||||
self._timeout = timeout
|
||||
self._stop_event = threading.Event()
|
||||
self._mqtt_data = {
|
||||
"mqtt_clientid": None,
|
||||
"ticket": None,
|
||||
"push_url": token["service_urls"]["pushAddr"],
|
||||
}
|
||||
|
||||
def _mqtt(self):
|
||||
"""Receive MQTT messages from ezviz server"""
|
||||
|
||||
ezviz_mqtt_client = mqtt.Client(
|
||||
client_id=self._mqtt_data["mqtt_clientid"], protocol=4, transport="tcp"
|
||||
)
|
||||
ezviz_mqtt_client.on_connect = on_connect
|
||||
ezviz_mqtt_client.on_subscribe = on_subscribe
|
||||
ezviz_mqtt_client.on_message = self._callback
|
||||
ezviz_mqtt_client.username_pw_set(MQTT_APP_KEY, APP_SECRET)
|
||||
|
||||
ezviz_mqtt_client.connect(self._mqtt_data["push_url"], 1882, 60)
|
||||
ezviz_mqtt_client.subscribe(
|
||||
f"{MQTT_APP_KEY}/ticket/{self._mqtt_data['ticket']}", qos=2
|
||||
)
|
||||
|
||||
ezviz_mqtt_client.loop_start()
|
||||
return ezviz_mqtt_client
|
||||
|
||||
def _register_ezviz_push(self):
|
||||
"""Register for push messages."""
|
||||
|
||||
auth_seq = base64.b64encode(f"{MQTT_APP_KEY}:{APP_SECRET}".encode("ascii"))
|
||||
auth_seq = "Basic " + auth_seq.decode()
|
||||
|
||||
payload = {
|
||||
"appKey": MQTT_APP_KEY,
|
||||
"clientType": "5",
|
||||
"mac": FEATURE_CODE,
|
||||
"token": "123456",
|
||||
"version": "v1.3.0",
|
||||
}
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_REGISTER_MQTT}",
|
||||
allow_redirects=False,
|
||||
headers={"Authorization": auth_seq},
|
||||
data=payload,
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.ConnectionError as err:
|
||||
raise InvalidURL("A Invalid URL or Proxy error occured") from err
|
||||
|
||||
except requests.HTTPError as err:
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_result = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
self._mqtt_data["mqtt_clientid"] = json_result["data"]["clientId"]
|
||||
|
||||
def run(self):
|
||||
"""Method representing the thread's activity which should not be used directly."""
|
||||
|
||||
if self._session is None:
|
||||
self._session = requests.session()
|
||||
self._session.headers.update(
|
||||
{"User-Agent": "okhttp/3.12.1"}
|
||||
) # Android generic user agent.
|
||||
|
||||
self._register_ezviz_push()
|
||||
self._start_ezviz_push()
|
||||
self._mqtt()
|
||||
|
||||
while not self._stop_event.is_set():
|
||||
time.sleep(1)
|
||||
|
||||
def start(self):
|
||||
"""Start mqtt.
|
||||
Start mqtt thread
|
||||
"""
|
||||
super().start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop push notifications."""
|
||||
|
||||
payload = {
|
||||
"appKey": MQTT_APP_KEY,
|
||||
"clientId": self._mqtt_data["mqtt_clientid"],
|
||||
"clientType": 5,
|
||||
"sessionId": self._token["session_id"],
|
||||
"username": self._token["username"],
|
||||
}
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_STOP_MQTT}",
|
||||
data=payload,
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.ConnectionError as err:
|
||||
raise InvalidURL("A Invalid URL or Proxy error occured") from err
|
||||
|
||||
except requests.HTTPError as err:
|
||||
raise HTTPError from err
|
||||
|
||||
self._stop_event.set()
|
||||
|
||||
def _start_ezviz_push(self):
|
||||
"""Send start for push messages to ezviz api."""
|
||||
|
||||
payload = {
|
||||
"appKey": MQTT_APP_KEY,
|
||||
"clientId": self._mqtt_data["mqtt_clientid"],
|
||||
"clientType": 5,
|
||||
"sessionId": self._token["session_id"],
|
||||
"username": self._token["username"],
|
||||
"token": "123456",
|
||||
}
|
||||
|
||||
try:
|
||||
req = self._session.post(
|
||||
f"https://{self._mqtt_data['push_url']}{API_ENDPOINT_START_MQTT}",
|
||||
allow_redirects=False,
|
||||
data=payload,
|
||||
timeout=self._timeout,
|
||||
)
|
||||
|
||||
req.raise_for_status()
|
||||
|
||||
except requests.ConnectionError as err:
|
||||
raise InvalidURL("A Invalid URL or Proxy error occured") from err
|
||||
|
||||
except requests.HTTPError as err:
|
||||
raise HTTPError from err
|
||||
|
||||
try:
|
||||
json_result = req.json()
|
||||
|
||||
except ValueError as err:
|
||||
raise PyEzvizError(
|
||||
"Impossible to decode response: "
|
||||
+ str(err)
|
||||
+ "\nResponse was: "
|
||||
+ str(req.text)
|
||||
) from err
|
||||
|
||||
self._mqtt_data["ticket"] = json_result["ticket"]
|
|
@ -1,147 +0,0 @@
|
|||
"""Test camera RTSP authentication"""
|
||||
import base64
|
||||
import hashlib
|
||||
import socket
|
||||
from pyezviz.exceptions import AuthTestResultFailed, InvalidHost
|
||||
|
||||
|
||||
def genmsg_describe(url, seq, user_agent, auth_seq):
|
||||
"""Generate RTSP describe message"""
|
||||
msg_ret = "DESCRIBE " + url + " RTSP/1.0\r\n"
|
||||
msg_ret += "CSeq: " + str(seq) + "\r\n"
|
||||
msg_ret += "Authorization: " + auth_seq + "\r\n"
|
||||
msg_ret += "User-Agent: " + user_agent + "\r\n"
|
||||
msg_ret += "Accept: application/sdp\r\n"
|
||||
msg_ret += "\r\n"
|
||||
return msg_ret
|
||||
|
||||
|
||||
class TestRTSPAuth:
|
||||
"""Initialize RTSP credential test"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ip_addr,
|
||||
username=None,
|
||||
password=None,
|
||||
test_uri="",
|
||||
):
|
||||
self._rtsp_details = {
|
||||
"bufLen": 1024,
|
||||
"defaultServerIp": ip_addr,
|
||||
"defaultServerPort": 554,
|
||||
"defaultTestUri": test_uri,
|
||||
"defaultUserAgent": "RTSP Client",
|
||||
"defaultUsername": username,
|
||||
"defaultPassword": password,
|
||||
}
|
||||
|
||||
def generate_auth_string(self, realm, method, uri, nonce):
|
||||
"""Generate digest auth string """
|
||||
map_return_info = {}
|
||||
m_1 = hashlib.md5(
|
||||
f"{self._rtsp_details['defaultUsername']}:"
|
||||
f"{realm.decode()}:"
|
||||
f"{self._rtsp_details['defaultPassword']}".encode()
|
||||
).hexdigest()
|
||||
m_2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
|
||||
response = hashlib.md5(f"{m_1}:{nonce}:{m_2}".encode()).hexdigest()
|
||||
|
||||
map_return_info = (
|
||||
f"Digest "
|
||||
f"username=\"{self._rtsp_details['defaultUsername']}\", "
|
||||
f'realm="{realm.decode()}", '
|
||||
f'algorithm="MD5", '
|
||||
f'nonce="{nonce.decode()}", '
|
||||
f'uri="{uri}", '
|
||||
f'response="{response}"'
|
||||
)
|
||||
return map_return_info
|
||||
|
||||
def main(self):
|
||||
"""Main function """
|
||||
session = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
try:
|
||||
session.connect(
|
||||
(
|
||||
self._rtsp_details["defaultServerIp"],
|
||||
self._rtsp_details["defaultServerPort"],
|
||||
)
|
||||
)
|
||||
|
||||
except TimeoutError as err:
|
||||
raise AuthTestResultFailed("Invalid ip or camera hibernating") from err
|
||||
|
||||
except (socket.gaierror, ConnectionRefusedError) as err:
|
||||
raise InvalidHost("Invalid IP or Hostname") from err
|
||||
|
||||
seq = 1
|
||||
|
||||
url = (
|
||||
"rtsp://"
|
||||
+ self._rtsp_details["defaultServerIp"]
|
||||
+ self._rtsp_details["defaultTestUri"]
|
||||
)
|
||||
|
||||
auth_seq = base64.b64encode(
|
||||
f"{self._rtsp_details['defaultUsername']}:"
|
||||
f"{self._rtsp_details['defaultPassword']}".encode("ascii")
|
||||
)
|
||||
auth_seq = "Basic " + auth_seq.decode()
|
||||
|
||||
print(
|
||||
genmsg_describe(url, seq, self._rtsp_details["defaultUserAgent"], auth_seq)
|
||||
)
|
||||
session.send(
|
||||
genmsg_describe(
|
||||
url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
|
||||
).encode()
|
||||
)
|
||||
msg1 = session.recv(self._rtsp_details["bufLen"])
|
||||
seq = seq + 1
|
||||
|
||||
if msg1.decode().find("200 OK") > 1:
|
||||
print(f"Basic auth result: {msg1.decode()}")
|
||||
return print("Basic Auth test passed. Credentials Valid!")
|
||||
|
||||
if msg1.decode().find("Unauthorized") > 1:
|
||||
# Basic failed, doing new DESCRIBE with digest authentication.
|
||||
start = msg1.decode().find("realm")
|
||||
begin = msg1.decode().find('"', start)
|
||||
end = msg1.decode().find('"', begin + 1)
|
||||
realm = msg1[begin + 1 : end]
|
||||
|
||||
start = msg1.decode().find("nonce")
|
||||
begin = msg1.decode().find('"', start)
|
||||
end = msg1.decode().find('"', begin + 1)
|
||||
nonce = msg1[begin + 1 : end]
|
||||
|
||||
auth_seq = self.generate_auth_string(
|
||||
realm,
|
||||
"DESCRIBE",
|
||||
self._rtsp_details["defaultTestUri"],
|
||||
nonce,
|
||||
)
|
||||
|
||||
print(
|
||||
genmsg_describe(
|
||||
url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
|
||||
)
|
||||
)
|
||||
|
||||
session.send(
|
||||
genmsg_describe(
|
||||
url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
|
||||
).encode()
|
||||
)
|
||||
msg1 = session.recv(self._rtsp_details["bufLen"])
|
||||
print(f"Digest auth result: {msg1.decode()}")
|
||||
|
||||
if msg1.decode().find("200 OK") > 1:
|
||||
return print("Digest Auth test Passed. Credentials Valid!")
|
||||
|
||||
if msg1.decode().find("401 Unauthorized") > 1:
|
||||
raise AuthTestResultFailed("Credentials not valid!!")
|
||||
|
||||
return print("Basic Auth test passed. Credentials Valid!")
|
Loading…
Reference in New Issue
Block a user