314 lines
12 KiB
Python
Executable File
314 lines
12 KiB
Python
Executable File
#!/Volumes/SAM2/CODE/pynamer/.venv/bin/python
|
|
|
|
import argparse
|
|
import base64
|
|
import io
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
import yaml
|
|
from typing import Dict, List, Optional, Union
|
|
|
|
import litellm
|
|
from litellm import completion
|
|
import logging
|
|
from PIL import Image # Added for image processing
|
|
|
|
# Root logging setup: timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide logger shared by PyNamer and main().
logger = logging.getLogger('pynamer')
|
|
|
|
class PyNamer:
    """A tool to generate descriptive filenames for images using LLMs.

    Settings come from a YAML file with optional ``llm``, ``image`` and
    ``prompt`` sections. Missing or empty sections fall back to defaults.
    """

    def __init__(self, config_path: str = 'config.yaml'):
        """Initialize the PyNamer with configuration.

        Args:
            config_path: Path to the YAML configuration file
        """
        self.config = self._load_config(config_path)
        self._setup_llm()

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from YAML file.

        Exits the process with status 1 when the file cannot be read or
        parsed, or when its top-level value is not a mapping.

        Args:
            config_path: Path to the configuration file

        Returns:
            Dict containing configuration (empty for an empty file)
        """
        try:
            # Explicit encoding so non-ASCII prompts load the same way on
            # every platform instead of depending on the locale default.
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            sys.exit(1)

        # safe_load returns None for an empty document; treat it as "no
        # settings" so the .get() lookups elsewhere keep working.
        if config is None:
            config = {}
        if not isinstance(config, dict):
            logger.error("Failed to load configuration: top-level YAML value must be a mapping")
            sys.exit(1)

        logger.info(f"Loaded configuration from {config_path}")
        return config

    def _section(self, name: str) -> Dict:
        """Return a top-level config section, or {} if absent or not a mapping.

        A key written with no value (e.g. a bare ``llm:``) parses to None, so
        a plain ``config.get(name, {})`` is not enough.
        """
        section = self.config.get(name)
        return section if isinstance(section, dict) else {}

    @staticmethod
    def _normalize_format(fmt) -> str:
        """Return the image format name upper-cased, mapping JPG to JPEG.

        'JPG' is a common spelling in configs, but the format name used for
        saving and for the ``image/jpeg`` MIME type is 'JPEG'.
        """
        name = str(fmt).strip().upper()
        return 'JPEG' if name == 'JPG' else name

    @staticmethod
    def _sanitize_filename(raw: str) -> str:
        """Turn free-form text into a snake_case filename stem.

        Lower-cases the text, replaces every non-alphanumeric character with
        an underscore, collapses underscore runs of any length and strips
        leading/trailing underscores. May return an empty string.
        """
        replaced = ''.join(c if c.isalnum() else '_' for c in raw.lower())
        # split/join drops the empty pieces between consecutive underscores,
        # which collapses runs of any length and trims both ends in one pass.
        return '_'.join(piece for piece in replaced.split('_') if piece)

    def _setup_llm(self) -> None:
        """Set up the LLM client based on configuration."""
        llm_config = self._section('llm')

        # Set API key if provided in config
        api_key = llm_config.get('api_key')
        if api_key:
            os.environ["OPENAI_API_KEY"] = str(api_key)

        # Set custom endpoint if provided
        endpoint = llm_config.get('endpoint')
        if endpoint:
            os.environ["OPENAI_API_BASE"] = str(endpoint)

        self.model = llm_config.get('model', 'gpt-4-vision-preview')
        self.max_tokens = llm_config.get('max_tokens', 100)
        self.temperature = llm_config.get('temperature', 0.7)

        # Image processing settings
        image_config = self._section('image')
        self.resize_max_dimension = image_config.get('resize_max_dimension', 1024)  # Default max dimension
        self.resize_format = self._normalize_format(
            image_config.get('resize_format', 'JPEG')  # Default format after resize
        )

        logger.info(f"LLM setup complete. Using model: {self.model}")
        logger.info(f"Image resize settings: max_dimension={self.resize_max_dimension}, format={self.resize_format}")

    def _resize_and_encode_image(self, image_path: str) -> str:
        """Resize image if necessary and encode to base64 for API submission.

        The image is scaled down (aspect ratio preserved) only when its longer
        side exceeds ``self.resize_max_dimension``; it is always re-encoded in
        ``self.resize_format``.

        Args:
            image_path: Path to the image file

        Returns:
            Base64 encoded image string

        Raises:
            Exception: Any error from opening, resizing or saving the image is
                logged and re-raised for the caller to handle.
        """
        try:
            with Image.open(image_path) as img:
                # Calculate new size maintaining aspect ratio
                width, height = img.size
                limit = self.resize_max_dimension
                if max(width, height) > limit:
                    # max(1, ...) keeps the short side of a very elongated
                    # image from truncating to 0, which resize() rejects.
                    if width > height:
                        new_width = limit
                        new_height = max(1, int(height * (limit / width)))
                    else:
                        new_height = limit
                        new_width = max(1, int(width * (limit / height)))

                    logger.debug(f"Resizing image from {width}x{height} to {new_width}x{new_height}")
                    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
                else:
                    logger.debug("Image size is within limits, no resize needed.")

                # JPEG cannot store alpha or palette data; convert every mode
                # other than the plain colour/greyscale ones (covers RGBA, P,
                # LA, PA, ...) instead of only RGBA and P.
                if self.resize_format.upper() == 'JPEG' and img.mode not in ('RGB', 'L', 'CMYK'):
                    img = img.convert('RGB')

                # Save the (possibly resized) image to a bytes buffer
                buffer = io.BytesIO()
                img.save(buffer, format=self.resize_format)
                img_bytes = buffer.getvalue()

            return base64.b64encode(img_bytes).decode('utf-8')
        except Exception as e:
            logger.error(f"Error processing image {image_path}: {e}")
            raise  # Re-raise the exception to be caught by the caller

    def _is_supported_format(self, file_path: str) -> bool:
        """Check if the file format is supported.

        Configured entries are matched case-insensitively and with or without
        a leading dot, so ``jpg``, ``.jpg`` and ``.JPG`` are equivalent.

        Args:
            file_path: Path to the file

        Returns:
            True if supported, False otherwise
        """
        configured = self._section('image').get('supported_formats') or []
        if isinstance(configured, str):
            # A single scalar entry instead of a YAML list.
            configured = [configured]

        file_ext = os.path.splitext(file_path)[1].lower()
        if not file_ext:
            return False

        allowed = set()
        for entry in configured:
            ext = str(entry).strip().lower()
            if ext:
                allowed.add(ext if ext.startswith('.') else f'.{ext}')
        return file_ext in allowed

    def generate_filename(self, image_path: str) -> Optional[str]:
        """Generate a descriptive filename for the image using LLM.

        Args:
            image_path: Path to the image file

        Returns:
            Generated filename (without extension), or None when the file is
            missing, has an unsupported extension, or the LLM call fails or
            returns no text
        """
        if not os.path.exists(image_path):
            logger.error(f"Image not found: {image_path}")
            return None

        if not self._is_supported_format(image_path):
            logger.error(f"Unsupported file format: {image_path}")
            return None

        try:
            # Resize and encode image
            base64_image = self._resize_and_encode_image(image_path)

            # Determine the mime type based on the resize format
            mime_type = f"image/{self.resize_format.lower()}"

            # Prepare messages for LLM
            prompt_config = self._section('prompt')
            system_message = prompt_config.get('system_message', '')
            user_message = prompt_config.get('user_message', '')

            messages = []
            # An empty system prompt carries no instruction, so it is left
            # out rather than sent as a blank message.
            if system_message:
                messages.append({"role": "system", "content": system_message})
            messages.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": user_message},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}
                    }
                ]
            })

            # Call LLM
            response = completion(
                model=self.model,
                messages=messages,
                max_tokens=self.max_tokens,
                temperature=self.temperature
            )

            # Extract filename from response; the content may be None or blank
            content = response.choices[0].message.content
            filename = content.strip() if content else ''
            if not filename:
                logger.error(f"Error generating filename: empty response for {image_path}")
                return None

            logger.info(f"Generated filename: {filename}")
            return filename

        except Exception as e:
            logger.error(f"Error generating filename: {e}")
            return None

    def rename_image(self, image_path: str, dry_run: bool = False) -> Optional[str]:
        """Rename the image with a generated descriptive filename.

        The generated text is sanitised to snake_case and the original
        extension is kept. If the target name is taken by a different file, a
        numeric suffix (``_1``, ``_2``, ...) is appended. The same resolution
        runs in dry-run mode so the preview matches the real result.

        Args:
            image_path: Path to the image file
            dry_run: If True, don't actually rename the file

        Returns:
            New path if successful, None otherwise
        """
        # Generate filename
        new_filename = self.generate_filename(image_path)
        if not new_filename:
            return None

        # Clean up the filename (ensure snake_case, no special chars)
        new_filename = self._sanitize_filename(new_filename)
        if not new_filename:
            # Nothing alphanumeric survived; renaming would leave a file
            # called just ".ext".
            logger.error(f"Error renaming file: generated name for {image_path} has no usable characters")
            return None

        # Get original path components
        path = Path(image_path)
        directory = path.parent
        extension = path.suffix

        # Create new path
        new_path = directory / f"{new_filename}{extension}"

        try:
            # Handle case where the new filename already exists. A target
            # that is the source file itself (same name, or a case-only
            # variant on a case-insensitive filesystem) is not a collision.
            counter = 1
            while new_path.exists() and not new_path.samefile(path):
                new_path = directory / f"{new_filename}_{counter}{extension}"
                counter += 1

            if dry_run:
                logger.info(f"[DRY RUN] Would rename: {image_path} -> {new_path}")
            else:
                path.rename(new_path)
                logger.info(f"Renamed: {image_path} -> {new_path}")
        except OSError as e:
            logger.error(f"Error renaming file: {e}")
            return None

        return str(new_path)
|
|
|
|
def main() -> int:
    """Main entry point for the script.

    Parses the command line, resolves the image arguments to files and asks
    PyNamer to rename each one.

    Argument resolution, in order:
      1. If several arguments joined with spaces name an existing path, they
         are treated as one unquoted path containing spaces.
      2. Otherwise each argument is used as-is when it exists, or else as a
         ``*arg*`` glob pattern relative to the current directory.

    Returns:
        0 when every processed image succeeded, 1 when no image was found or
        at least one image failed
    """
    import glob  # only the fallback lookup below needs it

    parser = argparse.ArgumentParser(description='Generate descriptive filenames for images using LLMs')
    parser.add_argument('images', nargs='+', help='Paths to image files')
    parser.add_argument('-c', '--config', default='config.yaml', help='Path to configuration file')
    parser.add_argument('-d', '--dry-run', action='store_true', help='Preview changes without renaming files')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')

    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Initialize PyNamer
    namer = PyNamer(config_path=args.config)

    # Process each image - handle image paths that might contain spaces
    image_paths = []

    # First, try to handle the case where the entire argument list is a single path with spaces
    if len(args.images) > 1:
        combined_path = ' '.join(args.images)
        if os.path.exists(combined_path):
            image_paths = [combined_path]
            logger.info(f"Found image by combining all arguments: {combined_path}")

    # If that didn't work, try to handle each argument individually
    if not image_paths:
        # Tracks queued paths so each file is processed once, in first-seen order
        processed_paths = set()

        for path in args.images:
            # A repeated argument must be skipped outright; letting it reach
            # the glob fallback would pull in unrelated files whose names
            # merely contain it.
            if path in processed_paths:
                continue

            # Check if the path exists as is
            if os.path.exists(path):
                image_paths.append(path)
                processed_paths.add(path)
                continue

            # Try to find files that match the pattern
            matching_files = glob.glob(f"*{path}*")
            for file in matching_files:
                if file not in processed_paths:
                    image_paths.append(file)
                    processed_paths.add(file)

            if not matching_files:
                logger.debug(f"Could not find any file matching '{path}'")

    if not image_paths:
        print("Error: No valid image files found to process.")
        return 1

    # Process each image
    failures = 0
    for image_path in image_paths:
        if not os.path.exists(image_path):
            logger.warning(f"Skipping non-existent file: {image_path}")
            continue

        new_path = namer.rename_image(image_path, dry_run=args.dry_run)
        if new_path:
            print(f"{'[DRY RUN] ' if args.dry_run else ''}Renamed: {image_path} -> {new_path}")
        else:
            print(f"Failed to process: {image_path}")
            failures += 1

    return 1 if failures else 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and scripts can detect failures.
    sys.exit(main())
|