You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
297 lines
11 KiB
297 lines
11 KiB
import base64 |
|
import io |
|
import logging |
|
import os |
|
from typing import Optional |
|
|
|
from PIL import Image |
|
from fastmcp import FastMCP |
|
import openai |
|
|
|
# --- Logging configuration ----------------------------------------------
# Write logs to MCP_LOG_DIR when set, otherwise to the system temp
# directory, so the server never depends on a writable install location.
# (Removed a duplicate `import os`; `os` is already imported at the top.)
import tempfile

log_dir = os.environ.get('MCP_LOG_DIR', tempfile.gettempdir())
log_file = os.path.join(log_dir, 'image_server.log')

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=log_file
)
logger = logging.getLogger(__name__)

# Create FastMCP server instance
mcp = FastMCP("ImageRecognitionServer")

# Get OpenAI API key from environment.  The placeholder value from the
# example config counts as "no key", so we degrade to basic PIL metadata
# instead of making API calls that are guaranteed to fail.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', '')
HAS_OPENAI = bool(OPENAI_API_KEY and OPENAI_API_KEY != 'your-openai-api-key-here')

if HAS_OPENAI:
    openai.api_key = OPENAI_API_KEY
    logger.info("OpenAI API key configured - AI descriptions enabled")
else:
    logger.warning("No valid OpenAI API key - using basic image metadata only")
|
|
|
@mcp.tool()
def describe_image(image_data: str, mime_type: str = 'image/jpeg') -> str:
    """
    Describe an image using base64 encoded image data

    Args:
        image_data: Base64 encoded image data
        mime_type: MIME type of the image (default: image/jpeg)

    Returns:
        Detailed description of the image
    """
    try:
        logger.debug(f"Describing image - MIME type: {mime_type}")

        # Decode the base64 payload and load it with PIL so basic metadata
        # is available even when the Vision API cannot be used.
        raw_bytes = base64.b64decode(image_data)
        image = Image.open(io.BytesIO(raw_bytes))
        logger.info(f"Image size: {image.size}, mode: {image.mode}")

        # Metadata snippet shared by both non-AI result paths below.
        metadata = (
            f"- Size: {image.size[0]}x{image.size[1]} pixels\n"
            f"- Mode: {image.mode}\n"
            f"- Format: {image.format or 'Unknown'}"
        )

        if HAS_OPENAI:
            # Ask the Vision model for a rich description; any API failure
            # falls back to the metadata summary rather than raising.
            try:
                vision_messages = [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "Describe this image in detail, including objects, colors, composition, and any text visible."},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:{mime_type};base64,{image_data}"
                                },
                            },
                        ],
                    }
                ]
                response = openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=vision_messages,
                    max_tokens=500,
                )
                description = response.choices[0].message.content
                logger.debug(f"OpenAI description: {description}")
                return description

            except Exception as e:
                logger.error(f"OpenAI API error: {str(e)}", exc_info=True)
                return f"OpenAI API error: {str(e)}\n\nBasic metadata:\n{metadata}"

        # No API key configured: report what PIL alone can tell us.
        description = (
            f"Image Analysis (Basic Metadata):\n{metadata}\n\n"
            "Note: For AI-powered descriptions, configure OPENAI_API_KEY in MCP settings."
        )
        logger.debug(f"Returning basic description: {description}")
        return description

    except Exception as e:
        logger.error(f"Error describing image: {str(e)}", exc_info=True)
        return f"Error describing image: {str(e)}"
|
|
|
@mcp.tool()
def describe_image_from_file(file_path: str) -> str:
    """
    Describe an image from a file path

    Args:
        file_path: Path to the image file

    Returns:
        Detailed description of the image
    """
    try:
        logger.debug(f"Describing image from file: {file_path}")

        # Read and base64-encode the file contents.
        with open(file_path, 'rb') as image_file:
            image_data = base64.b64encode(image_file.read()).decode('utf-8')

        # Map known extensions to their MIME type; anything else is
        # treated as JPEG (matching the describe_image default).
        extension_mime_types = (
            ('.png', 'image/png'),
            ('.gif', 'image/gif'),
            ('.webp', 'image/webp'),
        )
        lowered = file_path.lower()
        mime_type = 'image/jpeg'
        for extension, candidate in extension_mime_types:
            if lowered.endswith(extension):
                mime_type = candidate
                break

        # Delegate the actual analysis to describe_image.
        return describe_image(image_data, mime_type)

    except Exception as e:
        logger.error(f"Error reading image file: {str(e)}", exc_info=True)
        return f"Error reading image file: {str(e)}"
|
|
|
@mcp.tool()
def ask_image_question(file_path: str, prompt: str) -> str:
    """
    Ask a specific question about an image using AI vision

    Args:
        file_path: Path to the image file
        prompt: The question or instruction about the image

    Returns:
        AI response to the question about the image
    """
    try:
        logger.debug(f"Asking question about image: {file_path}")
        logger.debug(f"Question: {prompt}")

        # Read the file once and keep both forms: raw bytes for PIL and the
        # base64 text for the data URL.  (Previously the code base64-decoded
        # its own freshly made encoding to recover the same bytes.)
        with open(file_path, 'rb') as image_file:
            image_bytes = image_file.read()
        image_data = base64.b64encode(image_bytes).decode('utf-8')

        # Determine MIME type from file extension (default: JPEG)
        mime_type = 'image/jpeg'
        if file_path.lower().endswith('.png'):
            mime_type = 'image/png'
        elif file_path.lower().endswith('.gif'):
            mime_type = 'image/gif'
        elif file_path.lower().endswith('.webp'):
            mime_type = 'image/webp'

        # Load image so basic metadata is available for the fallback paths.
        image = Image.open(io.BytesIO(image_bytes))

        # If OpenAI is available, use Vision API with the custom prompt
        if HAS_OPENAI:
            try:
                response = openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:{mime_type};base64,{image_data}"
                                    },
                                },
                            ],
                        }
                    ],
                    max_tokens=500,
                )

                answer = response.choices[0].message.content
                logger.debug(f"OpenAI response: {answer}")
                return answer

            except Exception as e:
                logger.error(f"OpenAI API error: {str(e)}", exc_info=True)
                return f"OpenAI API error: {str(e)}\n\nNote: Configure OPENAI_API_KEY for AI-powered image analysis."

        # No API key configured: return metadata plus an explanation.
        return f"AI image analysis not available. Please configure OPENAI_API_KEY.\n\nImage metadata:\n- Size: {image.size[0]}x{image.size[1]} pixels\n- Mode: {image.mode}\n- Format: {image.format or 'Unknown'}"

    except Exception as e:
        logger.error(f"Error processing image question: {str(e)}", exc_info=True)
        return f"Error processing image question: {str(e)}"
|
|
|
@mcp.tool()
def generate_image_dalle(prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", n: int = 1) -> str:
    """
    Generate an image using DALL-E API

    Args:
        prompt: Description of the image to generate
        size: Image size - options: "1024x1024", "1792x1024", "1024x1792" (default: "1024x1024")
        quality: Image quality - options: "standard", "hd" (default: "standard")
        style: Image style - options: "vivid", "natural" (default: "vivid")
        n: Number of images to generate (DALL-E 3 supports only n=1; default: 1)

    Returns:
        Human-readable summary of generated image URLs and revised prompts,
        or an "Error: ..." string when validation or the API call fails.
    """
    try:
        logger.debug("Generating image with DALL-E")
        logger.debug(f"Prompt: {prompt}")
        logger.debug(f"Size: {size}, Quality: {quality}, Style: {style}, Count: {n}")

        # Validate parameters before spending an API call.
        valid_sizes = ["1024x1024", "1792x1024", "1024x1792"]
        if size not in valid_sizes:
            return f"Error: Invalid size '{size}'. Valid options: {', '.join(valid_sizes)}"

        valid_qualities = ["standard", "hd"]
        if quality not in valid_qualities:
            return f"Error: Invalid quality '{quality}'. Valid options: {', '.join(valid_qualities)}"

        valid_styles = ["vivid", "natural"]
        if style not in valid_styles:
            return f"Error: Invalid style '{style}'. Valid options: {', '.join(valid_styles)}"

        if not (1 <= n <= 10):
            return "Error: Number of images must be between 1 and 10"

        # The dall-e-3 model rejects any n other than 1; fail fast with a
        # clear message instead of surfacing a raw API error.
        if n != 1:
            return "Error: DALL-E 3 only supports generating 1 image per request (n=1)"

        # Check if OpenAI is available
        if not HAS_OPENAI:
            return "Error: OpenAI API key not configured. Please set OPENAI_API_KEY to use DALL-E image generation."

        try:
            # Generate image using DALL-E 3
            response = openai.images.generate(
                model="dall-e-3",
                prompt=prompt,
                size=size,
                quality=quality,
                style=style,
                n=n
            )

            # Summarize each returned image: URL plus any prompt rewrite
            # the model performed (older SDK objects may lack the field).
            images = [
                {
                    "index": i + 1,
                    "url": item.url,
                    "revised_prompt": getattr(item, 'revised_prompt', None),
                }
                for i, item in enumerate(response.data)
            ]

            logger.info(f"Successfully generated {len(response.data)} image(s)")
            return f"Successfully generated {len(response.data)} image(s):\n\n" + "\n".join([
                f"Image {img['index']}:\n URL: {img['url']}\n Revised prompt: {img['revised_prompt'] or 'N/A'}"
                for img in images
            ])

        except Exception as e:
            logger.error(f"DALL-E API error: {str(e)}", exc_info=True)
            return f"DALL-E API error: {str(e)}"

    except Exception as e:
        logger.error(f"Error generating image: {str(e)}", exc_info=True)
        return f"Error generating image: {str(e)}"
|
|
|
def main():
    """Main entry point for the MCP server."""
    # Announce startup in the log file, then hand control to FastMCP,
    # which blocks while serving requests.
    logger.info("Starting MCP Image Recognition Server")
    mcp.run()


if __name__ == "__main__":
    main()