# plex-photo-scraper/har_parser.py

import base64
import binascii
import json
import pathlib
from urllib.parse import parse_qs, urlparse
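
# Expected input (assumption inferred from this script, not documented elsewhere):
# "data.har" is a HAR capture of a Plex photo-library browsing session (e.g. saved
# from the browser's Network panel), placed in the same directory as this script.
# Decoded images are written under ./data/, mirroring the server-side file paths.
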
data = {}
names = {}
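
# 'data' maps the 'url' query parameter of each captured image request to the
# base64-encoded response body; 'names' maps a media part 'key' from the Plex
# JSON metadata to the original server-side 'file' path. Both are filled by main().
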
def main():
    """
    Opens the HAR archive "data.har", extracts every base64-encoded image
    response, matches each image to its original file path using the Plex
    JSON metadata also captured in the archive, and writes the decoded
    files under a local "data" directory.
    """
    har_file_path = "data.har"
    with open(har_file_path, 'r', encoding='utf-8') as f:
        har_data = json.load(f)

    entries = har_data.get('log', {}).get('entries', [])
    if not entries:
        print("No entries found in the HAR file.")
        return
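
    # First loop: collect base64-encoded image responses, keyed by the 'url'
    # query parameter of the request that fetched each image.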
    for entry in entries:
        response = entry.get('response', {})
        content = response.get('content', {})
        mime_type = content.get('mimeType', '')
        if not mime_type.startswith('image/'):
            continue
        request_obj = entry.get('request', {})
        request_url_str = request_obj.get('url')
        if not request_url_str:
            # If the entry's main request URL is missing, skip.
            continue
        parsed_request_url = urlparse(request_url_str)
        query_params = parse_qs(parsed_request_url.query)
        # The key for our 'data' dict is the value of the 'url' query parameter
        # from the request's URL string.
        key_from_query_param = query_params.get('url', [None])[0]
        if not key_from_query_param:
            # If the 'url' query parameter is not present in the request URL, skip.
            continue
        # The value for our 'data' dict is the response's base64-encoded text.
        response_text = content.get('text')
        if response_text is not None:
            # Only reject None; an empty string is valid for 0-byte files.
            data[key_from_query_param] = response_text

    # Second loop: process JSON entries to map each media part 'key' to its
    # original file path on the server.
    for entry in entries:
        response = entry.get('response', {})
        content = response.get('content', {})
        mime_type = content.get('mimeType', '')
        # Check whether the mimeType indicates JSON (case-insensitive, broad match).
        if 'json' not in mime_type.lower():
            continue
        response_text = content.get('text')
        if not response_text:
            continue
        try:
            json_data = json.loads(response_text)
        except json.JSONDecodeError:
            # If JSON parsing fails, skip this entry.
            continue
        media_container = json_data.get('MediaContainer', {})
        metadata_array = media_container.get('Metadata', [])
        if not isinstance(metadata_array, list):
            continue  # Skip if Metadata is not a list
        for metadata_element in metadata_array:
            if not isinstance(metadata_element, dict):
                continue  # Skip if metadata_element is not a dict
            media_array = metadata_element.get('Media', [])
            if not isinstance(media_array, list):
                continue  # Skip if Media is not a list
            for media_element in media_array:
                if not isinstance(media_element, dict):
                    continue  # Skip if media_element is not a dict
                part_array = media_element.get('Part', [])
                if not isinstance(part_array, list):
                    continue  # Skip if Part is not a list
                for part_element in part_array:
                    if not isinstance(part_element, dict):
                        continue  # Skip if part_element is not a dict
                    part_key = part_element.get('key')
                    part_file = part_element.get('file')
                    if part_key is not None:
                        names[part_key] = part_file
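
    # Report what was collected before attempting to save anything to disk.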
print("\nProcessed data (truncated values):")
if not data:
print("No data was processed and stored.")
else:
for key, value in data.items():
if len(value) > 100:
truncated_value = value[:100] + "..."
else:
truncated_value = value
print(f"'{key}': '{truncated_value}'")
print("\nProcessed names (truncated file paths):")
if not names:
print("No names were processed and stored.")
else:
for key, value in names.items():
# Assuming value could be None or not a string, though unlikely with current logic
if isinstance(value, str):
if len(value) > 100:
truncated_value = value[:100] + "..."
else:
truncated_value = value
else:
truncated_value = str(value) # Convert non-string values to string
print(f"'{key}': '{truncated_value}'")

    # --- Save decoded files ---
    print("\nSaving decoded files...")
    output_root_dir = pathlib.Path('data')
    files_saved_count = 0
    files_skipped_count = 0
    for key_url, base64_content in data.items():
        # key_url holds the 'url' query parameter stored above. Strip any query
        # string from it (split on '?' and keep the first part) so that it
        # matches the part 'key' values stored in 'names'.
        name_lookup_key = key_url.split('?', 1)[0]
        original_file_path_str = names.get(name_lookup_key)
        if not original_file_path_str:
            print(f"Warning: No file path found in 'names' for key '{name_lookup_key}' (from URL '{key_url}'). Skipping.")
            files_skipped_count += 1
            continue
        if not base64_content:
            print(f"Warning: No base64 content for key '{key_url}' (file path '{original_file_path_str}'). Skipping.")
            files_skipped_count += 1
            continue
        try:
            # Treat the path from 'names' as relative to output_root_dir by
            # stripping any leading slashes.
            relative_file_path = pathlib.Path(original_file_path_str.lstrip('/\\'))
            target_file_path = output_root_dir / relative_file_path
            # Create parent directories.
            target_file_path.parent.mkdir(parents=True, exist_ok=True)
            # Decode the base64 content and write it to the target file.
            decoded_data = base64.b64decode(base64_content)
            with open(target_file_path, 'wb') as f_out:
                f_out.write(decoded_data)
            print(f"Saved: {target_file_path}")
            files_saved_count += 1
        except binascii.Error as e:
            print(f"Error decoding base64 for '{key_url}' (file path '{original_file_path_str}'): {e}. Skipping.")
            files_skipped_count += 1
        except OSError as e:
            print(f"Error writing file '{target_file_path}' for key '{key_url}': {e}. Skipping.")
            files_skipped_count += 1
        except Exception as e:
            print(f"An unexpected error occurred for key '{key_url}' (file path '{original_file_path_str}'): {e}. Skipping.")
            files_skipped_count += 1

    print(f"\nFile saving complete. Saved: {files_saved_count}, Skipped: {files_skipped_count}")


if __name__ == "__main__":
    main()