pynamer/pynamer.py

314 lines
12 KiB
Python
Executable File

#!/Volumes/SAM2/CODE/pynamer/.venv/bin/python
import argparse
import base64
import io
import os
import sys
from pathlib import Path
import yaml
from typing import Dict, List, Optional, Union
import litellm
from litellm import completion
import logging
from PIL import Image # Added for image processing
# Configure logging
# Root logging setup: INFO level with timestamp, logger name and severity on
# every record. The module logger below is used throughout this file.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger('pynamer')
class PyNamer:
    """A tool to generate descriptive filenames for images using LLMs.

    Settings come from a YAML file with optional ``llm``, ``image`` and
    ``prompt`` sections; a missing or null section falls back to defaults.
    """

    # Modes Pillow's JPEG writer accepts directly; any other mode (RGBA, P,
    # LA, PA, I, F, ...) must be converted to RGB before saving as JPEG.
    _JPEG_SAFE_MODES = frozenset({'1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'})

    def __init__(self, config_path: str = 'config.yaml'):
        """Initialize the PyNamer with configuration.

        Args:
            config_path: Path to the YAML configuration file
        """
        self.config = self._load_config(config_path)
        self._setup_llm()

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from YAML file.

        Exits the process with status 1 if the file cannot be read or parsed,
        or if its top-level value is not a mapping.

        Args:
            config_path: Path to the configuration file

        Returns:
            Dict containing configuration (empty for an empty file)
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            sys.exit(1)
        # safe_load returns None for an empty document; treat that as "no
        # settings" so later .get() calls do not fail.
        if config is None:
            config = {}
        if not isinstance(config, dict):
            logger.error(
                f"Failed to load configuration: top-level value in {config_path} is not a mapping"
            )
            sys.exit(1)
        logger.info(f"Loaded configuration from {config_path}")
        return config

    def _section(self, name: str) -> Dict:
        """Return the named config section, or {} if it is absent or not a mapping."""
        section = self.config.get(name)
        return section if isinstance(section, dict) else {}

    def _setup_llm(self) -> None:
        """Set up the LLM client and image settings based on configuration."""
        llm_config = self._section('llm')
        # Set API key if provided in config (environment values must be str)
        api_key = llm_config.get('api_key')
        if api_key:
            os.environ["OPENAI_API_KEY"] = str(api_key)
        # Set custom endpoint if provided
        endpoint = llm_config.get('endpoint')
        if endpoint:
            os.environ["OPENAI_API_BASE"] = str(endpoint)
        self.model = llm_config.get('model', 'gpt-4-vision-preview')
        self.max_tokens = llm_config.get('max_tokens', 100)
        self.temperature = llm_config.get('temperature', 0.7)
        # Image processing settings
        image_config = self._section('image')
        self.resize_max_dimension = image_config.get('resize_max_dimension', 1024)
        resize_format = str(image_config.get('resize_format', 'JPEG')).upper()
        # 'JPG' is a common spelling, but the format name used for saving and
        # for the image/jpeg mime type is 'JPEG'.
        self.resize_format = 'JPEG' if resize_format == 'JPG' else resize_format
        logger.info(f"LLM setup complete. Using model: {self.model}")
        logger.info(
            f"Image resize settings: max_dimension={self.resize_max_dimension}, format={self.resize_format}"
        )

    @staticmethod
    def _scaled_size(width: int, height: int, max_dimension: int) -> tuple:
        """Return (width, height) scaled so the longer side is at most max_dimension.

        The aspect ratio is kept and each side is clamped to at least 1 pixel,
        so very elongated images never produce a zero-sized dimension.
        """
        if max(width, height) <= max_dimension:
            return width, height
        if width > height:
            return max_dimension, max(1, int(height * (max_dimension / width)))
        return max(1, int(width * (max_dimension / height))), max_dimension

    def _resize_and_encode_image(self, image_path: str) -> str:
        """Resize image if necessary and encode to base64 for API submission.

        Args:
            image_path: Path to the image file

        Returns:
            Base64 encoded image string

        Raises:
            Exception: Any error from opening, resizing or saving the image is
                logged and re-raised for the caller to handle.
        """
        try:
            with Image.open(image_path) as source:
                img = source
                width, height = source.size
                new_size = self._scaled_size(width, height, self.resize_max_dimension)
                if new_size != (width, height):
                    logger.debug(
                        f"Resizing image from {width}x{height} to {new_size[0]}x{new_size[1]}"
                    )
                    img = source.resize(new_size, Image.Resampling.LANCZOS)
                else:
                    logger.debug("Image size is within limits, no resize needed.")
                # JPEG cannot store alpha/palette/high-bit-depth modes
                if self.resize_format.upper() == 'JPEG' and img.mode not in self._JPEG_SAFE_MODES:
                    img = img.convert('RGB')
                # Save the (possibly resized) image to an in-memory buffer
                buffer = io.BytesIO()
                img.save(buffer, format=self.resize_format)
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            logger.error(f"Error processing image {image_path}: {e}")
            raise  # Re-raise the exception to be caught by the caller

    def _is_supported_format(self, file_path: str) -> bool:
        """Check if the file format is supported.

        Configured entries are compared case-insensitively and may be written
        with or without a leading dot ('.jpg', 'JPG').

        Args:
            file_path: Path to the file

        Returns:
            True if supported, False otherwise
        """
        configured = self._section('image').get('supported_formats') or []
        allowed = {'.' + str(fmt).lower().lstrip('.') for fmt in configured}
        file_ext = os.path.splitext(file_path)[1].lower()
        return file_ext in allowed

    def generate_filename(self, image_path: str) -> Optional[str]:
        """Generate a descriptive filename for the image using LLM.

        Args:
            image_path: Path to the image file

        Returns:
            Generated filename (without extension) exactly as the model
            returned it (whitespace-stripped), or None if the file is missing,
            unsupported, or the request failed.
        """
        if not os.path.isfile(image_path):
            logger.error(f"Image not found: {image_path}")
            return None
        if not self._is_supported_format(image_path):
            logger.error(f"Unsupported file format: {image_path}")
            return None
        try:
            # Resize and encode image
            base64_image = self._resize_and_encode_image(image_path)
            # The payload is re-encoded in resize_format, so the mime type
            # follows that format rather than the original file extension.
            mime_type = f"image/{self.resize_format.lower()}"
            prompt_config = self._section('prompt')
            system_message = prompt_config.get('system_message', '')
            user_message = prompt_config.get('user_message', '')
            messages = [
                {"role": "system", "content": system_message},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_message},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
                        },
                    ],
                },
            ]
            response = completion(
                model=self.model,
                messages=messages,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
            )
            content = response.choices[0].message.content
            filename = content.strip() if content else ''
            if not filename:
                logger.error("Error generating filename: empty response from model")
                return None
            logger.info(f"Generated filename: {filename}")
            return filename
        except Exception as e:
            # Boundary with the image library and the LLM client: log and
            # report failure instead of aborting a multi-file run.
            logger.error(f"Error generating filename: {e}")
            return None

    @staticmethod
    def _sanitize_filename(raw: str, extension: str = '') -> str:
        """Turn raw model output into a snake_case file stem.

        Lowercases the text, drops a trailing ``extension`` if the model
        included it, replaces every non-alphanumeric character with '_', and
        collapses runs of underscores.

        Args:
            raw: Text returned by the model
            extension: File suffix including the dot (e.g. '.jpg'), or ''

        Returns:
            The cleaned stem; may be empty if nothing usable remains
        """
        name = raw.strip().lower()
        suffix = extension.lower()
        if suffix and name.endswith(suffix):
            name = name[:-len(suffix)]
        replaced = ''.join(c if c.isalnum() else '_' for c in name)
        # split/join collapses any run of underscores and trims both ends
        return '_'.join(part for part in replaced.split('_') if part)

    def rename_image(self, image_path: str, dry_run: bool = False) -> Optional[str]:
        """Rename the image with a generated descriptive filename.

        If the target name is taken by another file, a numeric suffix
        (_1, _2, ...) is appended. The same target is computed in dry-run
        mode, so the preview matches what a real run would do.

        Args:
            image_path: Path to the image file
            dry_run: If True, don't actually rename the file

        Returns:
            New path if successful (the current path if the file already has
            the generated name), None otherwise
        """
        new_filename = self.generate_filename(image_path)
        if not new_filename:
            return None
        path = Path(image_path)
        directory = path.parent
        extension = path.suffix
        stem = self._sanitize_filename(new_filename, extension)
        if not stem:
            logger.error(f"Generated filename is empty after cleanup: {new_filename!r}")
            return None
        try:
            new_path = directory / f"{stem}{extension}"
            counter = 1
            # Skip names taken by *other* files; the file itself occupying the
            # target (same name, or a case-only difference on a
            # case-insensitive filesystem) is not a collision.
            while new_path.exists() and not new_path.samefile(path):
                new_path = directory / f"{stem}_{counter}{extension}"
                counter += 1
            if new_path == path:
                logger.info(f"File already has the generated name: {image_path}")
                return str(path)
            if dry_run:
                logger.info(f"[DRY RUN] Would rename: {image_path} -> {new_path}")
            else:
                path.rename(new_path)
                logger.info(f"Renamed: {image_path} -> {new_path}")
        except OSError as e:
            logger.error(f"Error renaming file: {e}")
            return None
        return str(new_path)
def main():
    """Main entry point for the script.

    Parses command-line arguments, resolves them to image paths and renames
    each image (or previews the rename with --dry-run).

    Path resolution, in order:
      1. If several arguments joined by single spaces name an existing path,
         they are treated as one unquoted path containing spaces.
      2. Otherwise each argument that exists is used as-is.
      3. An argument that does not exist is used as a ``*arg*`` glob pattern
         relative to the current directory.
    """
    import glob  # only needed here; kept local so the file's import block is unchanged

    parser = argparse.ArgumentParser(description='Generate descriptive filenames for images using LLMs')
    parser.add_argument('images', nargs='+', help='Paths to image files')
    parser.add_argument('-c', '--config', default='config.yaml', help='Path to configuration file')
    parser.add_argument('-d', '--dry-run', action='store_true', help='Preview changes without renaming files')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Initialize PyNamer
    namer = PyNamer(config_path=args.config)

    image_paths = []
    # First, handle the case where the whole argument list is a single path
    # that the shell split on spaces.
    if len(args.images) > 1:
        combined_path = ' '.join(args.images)
        if os.path.exists(combined_path):
            image_paths = [combined_path]
            logger.info(f"Found image by combining all arguments: {combined_path}")

    # Otherwise resolve each argument individually, de-duplicating while
    # preserving order.
    if not image_paths:
        seen = set()
        for path in args.images:
            if os.path.exists(path):
                # An existing path is never used as a glob pattern, even when
                # it is a repeat of an earlier argument.
                candidates = [path]
            else:
                # Sorted so the processing order is stable across runs
                candidates = sorted(glob.glob(f"*{path}*"))
                if not candidates:
                    logger.warning(f"Could not find any file matching '{path}'")
            for candidate in candidates:
                if candidate not in seen:
                    seen.add(candidate)
                    image_paths.append(candidate)

    if not image_paths:
        print("Error: No valid image files found to process.")
        return

    # Process each image
    for image_path in image_paths:
        # A file may have disappeared since it was resolved above
        if not os.path.exists(image_path):
            logger.warning(f"Skipping non-existent file: {image_path}")
            continue
        new_path = namer.rename_image(image_path, dry_run=args.dry_run)
        if new_path:
            print(f"{'[DRY RUN] ' if args.dry_run else ''}Renamed: {image_path} -> {new_path}")
        else:
            print(f"Failed to process: {image_path}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()