import httpx
import json
from urllib.parse import urlparse
from datetime import datetime
from pathlib import Path
import logging
import re
# Daily log file lives in ./logs; the directory is created on import if missing.
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"http_requests_{datetime.now().strftime('%Y%m%d')}.log"

# Formatters: the file format carries the full date and the logger name,
# the console format only the time of day.
file_formatter = logging.Formatter(
    fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
console_formatter = logging.Formatter(
    fmt='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)

# File handler: persists CRITICAL records only.
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.CRITICAL)
file_handler.setFormatter(file_formatter)

# Console handler: emits INFO and above (DEBUG records are filtered out here).
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(console_formatter)

# Module logger: accepts everything from DEBUG up and lets each handler
# apply its own threshold. The file handler is attached before the console one.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
for _handler in (file_handler, console_handler):
    logger.addHandler(_handler)
def get_json_as_browser(url, timeout=30):
    """
    Fetch JSON data from a URL while mimicking a browser request.

    Redirects are followed. HTTP error statuses (4xx/5xx) are NOT raised:
    the body is parsed whatever the status code is, and a warning is logged
    when the status indicates an error.

    Args:
        url: The URL to fetch
        timeout: Request timeout in seconds (default: 30)
    Returns:
        The parsed JSON body (callers in this module treat it as a dict)
    Raises:
        httpx.RequestError: The request could not be completed
        json.JSONDecodeError: The response body is not valid JSON
    """
    # Browser-like headers.
    # Accept-Encoding is intentionally not set here: httpx then advertises
    # only the encodings it is able to decode. Claiming 'br' by hand makes the
    # body unreadable whenever the optional brotli package is not installed.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    try:
        with httpx.Client(headers=headers, timeout=timeout, follow_redirects=True) as client:
            response = client.get(url)
            if response.status_code >= 400:
                # Deliberately not raising: the body is still parsed below.
                logger.warning(
                    "HTTP status %s received from %s; parsing the body anyway",
                    response.status_code,
                    url,
                )
            return response.json()
    except httpx.RequestError as e:
        logger.error("Request error occurred: %s", e)
        raise
    except json.JSONDecodeError as e:
        logger.error("JSON decode error: %s", e)
        raise
def get_id(content):
    """
    Return the ``id`` stored under the ``data`` key of a parsed response.

    Args:
        content: Mapping that holds a nested mapping under ``'data'``
    Returns:
        The value of ``content['data']['id']``, or None when ``'id'`` is absent
    Raises:
        AttributeError: ``'data'`` is missing or maps to None
    """
    data_section = content.get('data')
    return data_section.get('id')
def parse_url(url):
    """
    Split a gallery URL into its hostname, username and slug.

    The username is the first dot-separated label of the hostname and the
    slug is the last path segment (trailing slashes are ignored). A URL
    written without a scheme, e.g. ``alice.example.com/gallery/slug``, is
    accepted as well.

    Args:
        url: The URL to split
    Returns:
        tuple: ``(hostname, username, slug)``
    Raises:
        ValueError: No hostname can be extracted from ``url``
    """
    candidate = url.strip()
    res = urlparse(candidate)
    if not res.netloc and "://" not in candidate:
        # Without a scheme urlparse puts the host into the path; a leading
        # "//" makes it treat the first component as the network location.
        res = urlparse(f"//{candidate}")
    hostname = res.hostname
    if not hostname:
        raise ValueError(f"Cannot extract a hostname from URL: {url!r}")
    username = hostname.split(".")[0]
    slug = res.path.rstrip('/').split("/")[-1]
    return hostname, username, slug
def get_link_download(data):
    """
    Extract the download link from a parsed response and log it.

    The link is logged at CRITICAL level, which is the only level the
    module's file handler writes to the log file.

    Args:
        data: Mapping that holds a nested mapping under ``'data'``
    Returns:
        The value of ``data['data']['url']``, or None when ``'url'`` is absent
    Raises:
        AttributeError: ``'data'`` is missing or maps to None
    """
    payload = data.get('data')
    link = payload.get('url')
    logger.critical(f"download:{link}")
    return link
def get_link_from_url(cur_url):
    """
    Download the "web" ZIP archive of the gallery published at ``cur_url``.

    Steps: split the URL with ``parse_url``, fetch the gallery JSON, read the
    gallery id, request the download link for each selected content type and
    save the archive as ``<username>_<slug>.zip`` in the default download
    directory of ``download_zip_file``.

    This is a best-effort helper: failures while fetching or downloading are
    logged and swallowed. Errors raised by ``parse_url`` are not caught.

    Args:
        cur_url: URL of the gallery page
    Returns:
        None
    """
    hostname, username, slug = parse_url(cur_url)
    url_for_json = f"https://{hostname}/api/v1/published/galleries/{hostname}/{slug}/"
    logger.critical(f"base_url:{cur_url}")
    # Only the "web" variant is downloaded; "original" is the other variant
    # name that appeared in this code. All variants would share one file name,
    # so adding a second type here requires a distinct name per type.
    content_types = ("web",)
    try:
        json_gallery = get_json_as_browser(url_for_json)
        logger.info("Success! Received data:")
        cur_id = get_id(json_gallery)
        logger.info(f"{cur_id}")
        if cur_id is None:
            # Without an id the download endpoint URL cannot be built.
            logger.error("Gallery response for %s contains no id; nothing to download", cur_url)
            return
        for type_content in content_types:
            link = get_json_as_browser(f"https://{hostname}/api/v1/download-gallery/{cur_id}/{type_content}/")
            link = get_link_download(link)
            try:
                download_zip_file(link, filename=f"{username}_{slug}")
            except Exception as e:
                # download_zip_file has already logged the details.
                logger.error(f"Failed to fetch data: {e}")
    except Exception as e:
        logger.error(f"Failed to fetch data: {e}")
def sanitize_filename(filename):
    """
    Remove the characters that Windows does not allow in file names.

    Args:
        filename: The candidate file name
    Returns:
        str: ``filename`` with every invalid character deleted, or
        ``"default_filename"`` when nothing is left
    """
    # Character class covering: \ / : * ? " < > |
    invalid_chars_pattern = r'[\\/:\*\?"<>|]'
    # Matches are deleted outright rather than replaced by a placeholder;
    # an empty result falls back to a fixed name.
    return re.sub(invalid_chars_pattern, '', filename) or "default_filename"
def _is_ssl_failure(error):
    """Return True when ``error`` is a transport-level httpx error whose message mentions SSL."""
    # HTTPStatusError is excluded on purpose: its message embeds the request
    # URL, so a URL containing "ssl" must not look like a TLS failure.
    return isinstance(error, httpx.RequestError) and "ssl" in str(error).lower()


def download_zip_file(url, filename=None, save_dir="downloads", timeout=60, chunk_size=8192, max_retries=3,
                      *, allow_insecure_fallback=True):
    """
    Download a ZIP file from a URL while mimicking a browser request.

    The body is streamed into a temporary ``<name>.part`` file next to the
    target and moved into place only after the transfer has finished. A failed
    attempt therefore never leaves a truncated file under the final name and
    never removes a file that already existed there.

    Args:
        url: The URL of the ZIP file to download
        filename: Custom filename for the downloaded file (default: auto-generated from URL)
        save_dir: Directory where to save the file (default: "downloads")
        timeout: Request timeout in seconds (default: 60)
        chunk_size: Size of chunks to download in bytes (default: 8192)
        max_retries: Maximum number of attempts, at least 1 (default: 3)
        allow_insecure_fallback: When True (default, kept for backward
            compatibility) an SSL failure makes the remaining attempts run
            with certificate verification disabled. Pass False to keep
            verification on for every attempt.
    Returns:
        Path: Path to the downloaded file
    Raises:
        ValueError: ``max_retries`` is smaller than 1
        Exception: The error of the last attempt once all attempts have failed
    """
    import time  # local import: ``time`` is not among this module's top-level imports

    if max_retries < 1:
        # With zero attempts there would be no error to re-raise at the end.
        raise ValueError("max_retries must be at least 1")

    logger.info("Starting ZIP download from: %s", url)
    # Create download directory
    download_dir = Path(save_dir)
    download_dir.mkdir(parents=True, exist_ok=True)
    # Determine filename
    if filename is None:
        filename = url.split('/')[-1].split('?')[0]  # Remove query parameters
        if not filename or not filename.endswith('.zip'):
            filename = f"download_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
    elif not filename.endswith('.zip'):
        # Ensure filename has .zip extension
        filename = f"{filename}.zip"
    save_path = download_dir / filename
    part_path = save_path.with_name(f"{save_path.name}.part")
    logger.info("Saving to: %s", save_path)
    # Browser-like headers for file download
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'application/zip, application/octet-stream, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'identity',  # ask for an uncompressed body
        'DNT': '1',
        'Connection': 'keep-alive',
    }
    # Progress is reported roughly every 100 chunks.
    report_step = (chunk_size or 8192) * 100
    last_error = None
    verify_ssl = True  # Start with SSL verification enabled
    for attempt in range(max_retries):
        if attempt > 0:
            logger.info("Retry attempt %d/%d", attempt + 1, max_retries)
            time.sleep(2 ** attempt)  # Exponential backoff
            # NOTE(security): disabling certificate verification exposes the
            # download to man-in-the-middle tampering. It only happens after
            # an SSL failure and can be switched off by the caller.
            if verify_ssl and allow_insecure_fallback and _is_ssl_failure(last_error):
                verify_ssl = False
                logger.warning("SSL error detected. Retrying with SSL verification disabled.")
        try:
            started = time.monotonic()
            with httpx.Client(
                headers=headers,
                timeout=timeout,
                follow_redirects=True,
                verify=verify_ssl,
            ) as client:
                # Stream the response to handle large files
                with client.stream('GET', url) as response:
                    response.raise_for_status()
                    # Get file size if available
                    total_size = int(response.headers.get('content-length', 0))
                    logger.info(f"File size: {total_size / (1024*1024):.2f} MB" if total_size else "File size: Unknown")
                    downloaded = 0
                    next_report = report_step
                    # 'wb' truncates whatever a previous attempt left behind.
                    with open(part_path, 'wb') as f:
                        for chunk in response.iter_bytes(chunk_size=chunk_size):
                            f.write(chunk)
                            downloaded += len(chunk)
                            if total_size and downloaded >= next_report:
                                logger.debug("Download progress: %.1f%%", downloaded / total_size * 100)
                                next_report = downloaded + report_step
            # Publish the finished file under its final name.
            part_path.replace(save_path)
            elapsed = time.monotonic() - started
            file_size_mb = save_path.stat().st_size / (1024 * 1024)
            # A coarse clock can report zero elapsed time for tiny files.
            rate = file_size_mb / elapsed if elapsed > 0 else float("inf")
            logger.info("Download completed: %.2f MB in %.2fs (%.2f MB/s)", file_size_mb, elapsed, rate)
            return save_path
        except httpx.HTTPStatusError as e:
            last_error = e
            logger.error("HTTP error during download (attempt %d): %s - %s", attempt + 1, e.response.status_code, e)
            if e.response.status_code in (401, 403, 404):  # Retrying these statuses is pointless
                break
        except Exception as e:
            # Any other failure (network, disk, malformed header) is retried.
            last_error = e
            logger.error("Error during download (attempt %d): %s", attempt + 1, e)
    # All retries failed
    logger.critical("CRITICAL DOWNLOAD ERROR - URL: %s, Error: %s", url, last_error)
    # Clean up partial download; a file already stored under save_path is left alone
    if part_path.exists():
        part_path.unlink()
        logger.debug("Removed partial download: %s", part_path)
    raise last_error