"""MCP image-recognition server.

FastMCP tools for describing images (base64 payloads or files), asking
questions about images via the OpenAI vision API, and generating images
with DALL-E. Falls back to basic Pillow metadata when no API key is set.
"""
import base64
import io
import logging
import os
from typing import Optional
from PIL import Image
from fastmcp import FastMCP
import openai
# Configure logging.
# NOTE(review): the default log path is user-specific; allow overriding it via
# the IMAGE_SERVER_LOG environment variable (default preserves old behavior).
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=os.getenv(
        'IMAGE_SERVER_LOG',
        '/home/enne2/Sviluppo/tetris-sdl/mcp-image-server/image_server.log',
    ),
)
logger = logging.getLogger(__name__)

# Create FastMCP server instance.
mcp = FastMCP("ImageRecognitionServer")

# Read the OpenAI API key from the environment. The sample-config placeholder
# value is treated the same as "no key configured".
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', '')
HAS_OPENAI = bool(OPENAI_API_KEY and OPENAI_API_KEY != 'your-openai-api-key-here')

if HAS_OPENAI:
    openai.api_key = OPENAI_API_KEY
    logger.info("OpenAI API key configured - AI descriptions enabled")
else:
    logger.warning("No valid OpenAI API key - using basic image metadata only")
@mcp.tool()
def describe_image(image_data: str, mime_type: str = 'image/jpeg') -> str:
    """
    Describe an image using base64 encoded image data.

    Uses the OpenAI vision API when a key is configured; otherwise returns
    basic Pillow metadata. All failures are reported as strings, not raised.

    Args:
        image_data: Base64 encoded image data
        mime_type: MIME type of the image (default: image/jpeg)

    Returns:
        Detailed description of the image, or an error/fallback string.
    """
    def _metadata(img: Image.Image) -> str:
        # Single source of truth for the Pillow-derived fallback metadata
        # (previously duplicated in two branches).
        return (
            f"- Size: {img.size[0]}x{img.size[1]} pixels\n"
            f"- Mode: {img.mode}\n"
            f"- Format: {img.format or 'Unknown'}"
        )

    try:
        logger.debug(f"Describing image - MIME type: {mime_type}")
        # Decode once so invalid base64 fails fast with a clear error.
        image_bytes = base64.b64decode(image_data)
        image = Image.open(io.BytesIO(image_bytes))
        logger.info(f"Image size: {image.size}, mode: {image.mode}")

        # If OpenAI is available, use the vision API.
        if HAS_OPENAI:
            try:
                response = openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": "Describe this image in detail, including objects, colors, composition, and any text visible."},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:{mime_type};base64,{image_data}"
                                    },
                                },
                            ],
                        }
                    ],
                    max_tokens=500,
                )
                description = response.choices[0].message.content
                logger.debug(f"OpenAI description: {description}")
                return description
            except Exception as e:
                # Degrade gracefully to local metadata if the API call fails.
                logger.error(f"OpenAI API error: {str(e)}", exc_info=True)
                return f"OpenAI API error: {str(e)}\n\nBasic metadata:\n{_metadata(image)}"

        # No API key configured: metadata-only response.
        description = (
            f"Image Analysis (Basic Metadata):\n{_metadata(image)}\n\n"
            f"Note: For AI-powered descriptions, configure OPENAI_API_KEY in MCP settings."
        )
        logger.debug(f"Returning basic description: {description}")
        return description
    except Exception as e:
        logger.error(f"Error describing image: {str(e)}", exc_info=True)
        return f"Error describing image: {str(e)}"
@mcp.tool()
def describe_image_from_file(file_path: str) -> str:
    """
    Describe an image from a file path.

    Reads and base64-encodes the file, guesses the MIME type from the
    extension, and delegates to describe_image.

    Args:
        file_path: Path to the image file

    Returns:
        Detailed description of the image, or an error string on failure.
    """
    # Known extensions mapped to MIME types; anything else is assumed JPEG,
    # matching the previous if/elif chain.
    ext_to_mime = {
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }
    try:
        logger.debug(f"Describing image from file: {file_path}")
        with open(file_path, 'rb') as image_file:
            image_data = base64.b64encode(image_file.read()).decode('utf-8')

        ext = os.path.splitext(file_path)[1].lower()
        mime_type = ext_to_mime.get(ext, 'image/jpeg')

        # NOTE(review): describe_image is decorated with @mcp.tool(); on
        # fastmcp versions where the decorator returns a Tool object this
        # direct call fails — confirm against the installed fastmcp version.
        return describe_image(image_data, mime_type)
    except Exception as e:
        logger.error(f"Error reading image file: {str(e)}", exc_info=True)
        return f"Error reading image file: {str(e)}"
@mcp.tool()
def ask_image_question(file_path: str, prompt: str) -> str:
    """
    Ask a specific question about an image using AI vision.

    Args:
        file_path: Path to the image file
        prompt: The question or instruction about the image

    Returns:
        AI response to the question about the image, or an error string.
    """
    # Known extensions mapped to MIME types; anything else is assumed JPEG,
    # matching the previous if/elif chain.
    ext_to_mime = {
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }
    try:
        logger.debug(f"Asking question about image: {file_path}")
        logger.debug(f"Question: {prompt}")

        # Read the raw bytes once and keep them for the Pillow fallback,
        # instead of base64-encoding and then immediately decoding again.
        with open(file_path, 'rb') as image_file:
            image_bytes = image_file.read()
        image_data = base64.b64encode(image_bytes).decode('utf-8')

        ext = os.path.splitext(file_path)[1].lower()
        mime_type = ext_to_mime.get(ext, 'image/jpeg')

        # Load the image so the no-API fallback can report basic metadata.
        image = Image.open(io.BytesIO(image_bytes))

        # If OpenAI is available, use the vision API with the custom prompt.
        if HAS_OPENAI:
            try:
                response = openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:{mime_type};base64,{image_data}"
                                    },
                                },
                            ],
                        }
                    ],
                    max_tokens=500,
                )
                answer = response.choices[0].message.content
                logger.debug(f"OpenAI response: {answer}")
                return answer
            except Exception as e:
                logger.error(f"OpenAI API error: {str(e)}", exc_info=True)
                return f"OpenAI API error: {str(e)}\n\nNote: Configure OPENAI_API_KEY for AI-powered image analysis."

        # No API key configured: report metadata and how to enable AI analysis.
        return f"AI image analysis not available. Please configure OPENAI_API_KEY.\n\nImage metadata:\n- Size: {image.size[0]}x{image.size[1]} pixels\n- Mode: {image.mode}\n- Format: {image.format or 'Unknown'}"
    except Exception as e:
        logger.error(f"Error processing image question: {str(e)}", exc_info=True)
        return f"Error processing image question: {str(e)}"
@mcp.tool()
def generate_image_dalle(prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", n: int = 1) -> str:
    """
    Generate an image using DALL-E API.

    Args:
        prompt: Description of the image to generate
        size: Image size - options: "1024x1024", "1792x1024", "1024x1792" (default: "1024x1024")
        quality: Image quality - options: "standard", "hd" (default: "standard")
        style: Image style - options: "vivid", "natural" (default: "vivid")
        n: Number of images to generate (1-10, default: 1)

    Returns:
        Human-readable summary of the generated image URLs and revised
        prompts, or an error string.
    """
    try:
        logger.debug(f"Generating image with DALL-E")
        logger.debug(f"Prompt: {prompt}")
        logger.debug(f"Size: {size}, Quality: {quality}, Style: {style}, Count: {n}")

        # Validate parameters before spending an API call.
        valid_sizes = ["1024x1024", "1792x1024", "1024x1792"]
        if size not in valid_sizes:
            return f"Error: Invalid size '{size}'. Valid options: {', '.join(valid_sizes)}"
        valid_qualities = ["standard", "hd"]
        if quality not in valid_qualities:
            return f"Error: Invalid quality '{quality}'. Valid options: {', '.join(valid_qualities)}"
        valid_styles = ["vivid", "natural"]
        if style not in valid_styles:
            return f"Error: Invalid style '{style}'. Valid options: {', '.join(valid_styles)}"
        # NOTE(review): the dall-e-3 endpoint accepts only n=1; values 2-10
        # pass this local check but will be rejected by the API — confirm
        # whether the range should be tightened.
        if not (1 <= n <= 10):
            return "Error: Number of images must be between 1 and 10"

        if not HAS_OPENAI:
            return "Error: OpenAI API key not configured. Please set OPENAI_API_KEY to use DALL-E image generation."

        try:
            # Generate image using DALL-E 3.
            response = openai.images.generate(
                model="dall-e-3",
                prompt=prompt,
                size=size,
                quality=quality,
                style=style,
                n=n,
            )

            # Collect per-image details; revised_prompt may be absent on some
            # response objects, hence getattr with a None default.
            images = [
                {
                    "index": i + 1,
                    "url": image_data.url,
                    "revised_prompt": getattr(image_data, 'revised_prompt', None),
                }
                for i, image_data in enumerate(response.data)
            ]

            logger.info(f"Successfully generated {len(response.data)} image(s)")
            return f"Successfully generated {len(response.data)} image(s):\n\n" + "\n".join([
                f"Image {img['index']}:\n URL: {img['url']}\n Revised prompt: {img['revised_prompt'] or 'N/A'}"
                for img in images
            ])
        except Exception as e:
            logger.error(f"DALL-E API error: {str(e)}", exc_info=True)
            return f"DALL-E API error: {str(e)}"
    except Exception as e:
        logger.error(f"Error generating image: {str(e)}", exc_info=True)
        return f"Error generating image: {str(e)}"
def main() -> None:
    """Entry point: log startup and run the MCP server loop."""
    logger.info("Starting MCP Image Recognition Server")
    mcp.run()


if __name__ == "__main__":
    main()