import os import json import ffmpeg import subprocess def load_config(config_file): if os.path.exists(config_file): with open(config_file, 'r') as f: return json.load(f) else: return {"directories": []} def save_config(config_file, config): with open(config_file, 'w') as f: json.dump(config, f, indent=4) def media_create_cache(cache_dir, data): filepath = f"{cache_dir}/mediascan.json" with open(filepath, 'w') as f: json.dump(data, f, indent=2) def media_remove_cache(cache_dir): filepath = f"{cache_dir}/mediascan.json" if not os.path.exists(filepath): return True os.remove(filepath) return True def media_get_from_cache(cache_dir): filepath = f"{cache_dir}/mediascan.json" if not os.path.exists(filepath): return None with open(filepath, 'r') as f: return json.load(f) def get_media_info_with_ffprobe(file_path): try: if not os.path.exists(file_path): print(f"File not found: {file_path}") return None # Command to fetch streams streams_command = [ "ffprobe", "-v", "error", "-show_entries", "stream=index,codec_type,codec_name,width,height,bit_rate,channels,channel_layout", "-show_entries", "stream_tags=language,title", "-of", "json", file_path ] # Command to fetch format information format_command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration,bit_rate,size", "-of", "json", file_path ] streams_result = subprocess.run(streams_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) format_result = subprocess.run(format_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if streams_result.returncode != 0: print(f"ffprobe stream error: {streams_result.stderr}") return None if format_result.returncode != 0: print(f"ffprobe format error: {format_result.stderr}") return None streams_data = json.loads(streams_result.stdout) format_data = json.loads(format_result.stdout) format_info = format_data.get("format", {}) container_duration = float(format_info.get("duration", 0)) container_bitrate = int(format_info.get("bit_rate", 0)) // 1000 if format_info.get("bit_rate") else "Unknown" file_size = int(format_info.get("size", 0)) media_info = { "video": [], "audio": [], "container": { "duration": f"{container_duration:.2f} seconds" if container_duration else "Unknown", "bitrate": f"{container_bitrate} Kbit/s" if container_bitrate else "Unknown", "size": f"{file_size / (1024**2):.2f} MB" if file_size else "Unknown" } } for stream in streams_data.get("streams", []): codec_type = stream.get("codec_type") codec_name = stream.get("codec_name") stream_bitrate = int(stream.get("bit_rate", 0)) // 1000 if stream.get("bit_rate") else None width = stream.get("width") height = stream.get("height") channels = stream.get("channels", "Unknown") channel_layout = stream.get("channel_layout", "Unknown") language = stream.get("tags", {}).get("language", "Unknown") title = stream.get("tags", {}).get("title", "Unknown") bitrate = stream_bitrate if stream_bitrate else container_bitrate if codec_type == "video": if len(media_info["video"]) == 0: bitrate_str = f"{bitrate} Kbit/s" if bitrate else "Unknown" else: bitrate_str = "Unknown" media_info["video"].append({ "resolution": f"{width}x{height}" if width and height else "Unknown", "bitrate": bitrate_str, "codec": codec_name }) elif codec_type == "audio": media_info["audio"].append({ "bitrate": f"{stream_bitrate} Kbit/s" if stream_bitrate else "Unknown", "channels": channels, "layout": channel_layout, "language": language, "title": title, "codec": codec_name }) return media_info except Exception as e: print(f"Error processing file {file_path}: {e}") return None # Function to list media files def list_media_files(directories): media_files = [] for directory in directories: if os.path.exists(directory): for root, _, files in os.walk(directory): for file in files: file_path = os.path.join(root, file) try: file_info = { "name": file, "path": file_path, "size": os.path.getsize(file_path) } media_files.append(file_info) except OSError as e: print(f"Error accessing file {file_path}: {e}") else: print(f"Directory does not exist: {directory}") return media_files def filter_media_files(media_files, allowed_formats): """ Фильтрует список файлов по допустимым форматам. :param media_files: Список файлов, полученный из list_media_files. :param allowed_formats: Список допустимых форматов, например [".mp3", ".jpg"]. :return: Отфильтрованный список файлов. """ filtered_files = [ file for file in media_files if any(file['name'].lower().endswith(fmt.lower()) for fmt in allowed_formats) ] return filtered_files def human_readable_size(size_bytes): if size_bytes < 1024: return [f"{size_bytes}", "B", f"{size_bytes}"] elif size_bytes < 1024**2: return [f"{size_bytes / 1024:.2f}", "KB", f"{size_bytes}"] elif size_bytes < 1024**3: return [f"{size_bytes / 1024**2:.2f}", "MB", f"{size_bytes}"] else: return [f"{size_bytes / 1024**3:.2f}", "GB", f"{size_bytes}"] def get_single_media_by_path(path): media_info = get_media_info_with_ffprobe(path) size, size_unit, size_bytes = human_readable_size(os.path.getsize(path)) media_file = { "path": path, "name": os.path.basename(path), "size_bytes": size_bytes, "size": size, "size_unit": size_unit, "info": media_info } return media_file