"""MCP Server: clone a Gitea repository, ask an AI model for edits, push them back.

The HTTP API exposes:

* ``POST /process``        - schedule a background task, returns its ``task_id``
* ``GET  /status/{id}``    - poll the task's status entry
* ``GET  /health``         - liveness probe
* ``GET  /``               - the HTML frontend
"""

import asyncio
import functools
import json
import logging
import os
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import git
import requests
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('mcp_server.log'),
    ],
)
logger = logging.getLogger(__name__)

app = FastAPI(title="MCP Server", description="AI-powered code editing server")

# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


# Models
class GiteaRequest(BaseModel):
    """Payload of ``POST /process``."""

    repo_url: str
    token: str  # Gitea token instead of username/password
    prompt: str
    ai_model: str = "gemini"  # gemini or openai
    model_name: str = "gemini-1.5-pro"  # specific model name
    api_key: str  # API key from frontend


class ProcessResponse(BaseModel):
    """Response of ``POST /process``."""

    task_id: str
    status: str
    message: str


class MCPError(Exception):
    """Failure with a message that is already user-presentable.

    Subclasses ``Exception`` so existing ``except Exception`` callers keep
    working; it exists so that outer handlers can avoid prefixing the same
    message twice.
    """


# Global storage for task status, keyed by task id.
# NOTE(review): entries are never evicted, so this grows for the lifetime
# of the process.
task_status: Dict[str, Dict[str, Any]] = {}


def _set_status(task_id: str, status: str, message: str) -> None:
    """Update a task's status and message in place.

    The entry is mutated rather than replaced so that extra keys stored on
    it (``ai_response``) survive later status transitions.
    """
    entry = task_status.setdefault(task_id, {})
    entry["status"] = status
    entry["message"] = message


def _redact(text: str, *secrets: str) -> str:
    """Return *text* with every non-empty secret replaced by ``***``."""
    for secret in secrets:
        if secret:
            text = text.replace(secret, "***")
    return text


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and await its result.

    Keeps the event loop free to serve other requests (e.g. ``/status``)
    while git, the Gemini CLI or the OpenAI client is working.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(func, *args, **kwargs)
    )


class MCPServer:
    """Runs one clone -> AI edit -> commit/push pipeline.

    ``repo_path`` is per-pipeline state, so use one instance per task when
    tasks may run concurrently.
    """

    # File suffixes that are sent to the AI model as "code".
    CODE_EXTENSIONS = (
        '.py', '.js', '.ts', '.jsx', '.tsx', '.html', '.css', '.json', '.md',
    )

    def __init__(self):
        # Absolute path of the current clone; set by _clone_repository().
        self.repo_path: Optional[str] = None

    async def process_repository(self, task_id: str, request: GiteaRequest):
        """Main processing function.

        Runs clone, AI analysis and commit/push in order and records the
        outcome in ``task_status[task_id]``. Never raises: any failure is
        logged and stored as an ``error`` status.
        """
        try:
            logger.info(f"Task {task_id}: Starting process...")
            _set_status(task_id, "processing", "Starting process...")

            # Step 1: Clone repository
            await self._clone_repository(task_id, request)

            # Step 2: Analyze code with AI
            await self._analyze_with_ai(task_id, request)

            # Step 3: Commit and push changes
            await self._commit_and_push(task_id, request)

            logger.info(f"Task {task_id}: Successfully processed repository")
            _set_status(task_id, "completed", "Successfully processed repository")
        except Exception as e:
            # The message is both logged and returned by /status, so make
            # sure no credential from the request can appear in it.
            message = _redact(str(e), request.token, request.api_key)
            logger.error(f"Task {task_id}: Error - {message}")
            _set_status(task_id, "error", message)
            # Do not delete the repo directory; keep for inspection

    async def _clone_repository(self, task_id: str, request: GiteaRequest):
        """Clone repository from Gitea into a persistent directory.

        Sets ``self.repo_path`` to ``/app/data/<repo>_<task_id>``.

        Raises:
            MCPError: if git fails, cannot be started, or exceeds 5 minutes.
        """
        logger.info(f"Task {task_id}: Cloning repository...")
        _set_status(task_id, "processing", "Cloning repository...")

        # Extract repo name from URL (tolerate a trailing slash and strip
        # only a trailing ".git", not every occurrence).
        repo_name = request.repo_url.rstrip('/').split('/')[-1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]

        # Persistent directory under /app/data
        data_dir = "/app/data"
        os.makedirs(data_dir, exist_ok=True)
        self.repo_path = os.path.join(data_dir, f"{repo_name}_{task_id}")

        try:
            # NOTE(review): world-writable, and the clone's .git/config
            # contains the token-bearing remote URL. Confirm this is needed.
            os.chmod(data_dir, 0o777)  # Give full permissions to the data dir
            logger.info(f"Task {task_id}: Created/using data directory: {self.repo_path}")
        except OSError as e:
            logger.warning(f"Task {task_id}: Could not set permissions on data dir: {e}")

        # Clone with the token embedded in the URL; only the scheme
        # separator (first '://') is rewritten.
        auth_url = request.repo_url.replace('://', f'://{request.token}@', 1)
        try:
            result = await _run_blocking(
                subprocess.run,
                ['git', 'clone', auth_url, self.repo_path],
                capture_output=True,
                text=True,
                timeout=300,  # 5 minutes timeout
            )
        except subprocess.TimeoutExpired:
            # "from None": the original exception text contains auth_url.
            raise MCPError("Repository cloning timed out after 5 minutes") from None
        except Exception as e:
            detail = _redact(str(e), request.token)
            raise MCPError(f"Failed to clone repository: {detail}") from None

        if result.returncode != 0:
            # git may echo the remote URL (with token) in its error output.
            stderr = _redact(result.stderr, request.token)
            logger.error(f"Task {task_id}: Git clone error - {stderr}")
            raise MCPError(f"Failed to clone repository: {stderr}")

        logger.info(f"Task {task_id}: Successfully cloned repository to {self.repo_path}")

    async def _analyze_with_ai(self, task_id: str, request: GiteaRequest):
        """Analyze code with AI model and apply changes.

        Raises:
            Exception: if ``request.ai_model`` is neither ``gemini`` nor
                ``openai``, or if the selected backend fails.
        """
        logger.info(f"Task {task_id}: Analyzing code with AI...")
        _set_status(task_id, "processing", "Analyzing code with AI...")

        if request.ai_model == "gemini":
            await self._use_gemini_cli(
                task_id, request.prompt, request.api_key, request.model_name
            )
        elif request.ai_model == "openai":
            await self._use_openai_ai(
                task_id, request.prompt, request.api_key, request.model_name
            )
        else:
            raise MCPError(f"Unsupported AI model: {request.ai_model}")

    @staticmethod
    def _build_prompt(prompt: str, code_content: str) -> str:
        """Build the instruction text shared by both AI backends.

        The FILE:/CHANGES: markers requested here are the ones that
        _apply_ai_changes() parses.
        """
        return f"""
Analyze the following codebase and make the requested changes:

USER REQUEST: {prompt}

CODEBASE:
{code_content}

Please provide:
1. A summary of what changes need to be made
2. The specific file changes in the format:
FILE: filename.py
CHANGES: [describe changes or provide new code]

Be specific about which files to modify and what changes to make.
"""

    async def _use_gemini_cli(self, task_id: str, prompt: str, api_key: str, model_name: str):
        """Use Gemini CLI for code analysis and editing.

        The prompt is passed on stdin and the API key through the
        ``GEMINI_API_KEY`` environment variable of the child process.

        Raises:
            MCPError: if the CLI is missing, fails, or exceeds 10 minutes.
        """
        try:
            # Check if Gemini CLI is installed
            try:
                await _run_blocking(
                    subprocess.run,
                    ["gemini", "--version"],
                    check=True,
                    capture_output=True,
                )
            except (subprocess.CalledProcessError, FileNotFoundError):
                raise MCPError(
                    "Gemini CLI is not installed. Please install it first: "
                    "https://github.com/google/generative-ai-go/tree/main/cmd/gemini"
                ) from None
            logger.info(f"Task {task_id}: Gemini CLI is available")

            # Read all code files
            code_content = self._read_code_files()
            logger.info(f"Task {task_id}: Read {len(code_content)} characters of code content")

            # Create AI prompt
            ai_prompt = self._build_prompt(prompt, code_content)

            # Set API key as environment variable for Gemini CLI
            env = os.environ.copy()
            env['GEMINI_API_KEY'] = api_key

            logger.info(f"Task {task_id}: Calling Gemini CLI with model: {model_name}")

            # Call Gemini CLI with specific model, passing prompt via stdin
            result = await _run_blocking(
                subprocess.run,
                ["gemini", "generate", "--model", model_name],
                input=ai_prompt,
                capture_output=True,
                text=True,
                env=env,
                cwd=self.repo_path,
                timeout=600,  # 10 minutes timeout
            )

            if result.returncode != 0:
                logger.error(f"Task {task_id}: Gemini CLI error - {result.stderr}")
                raise MCPError(f"Gemini CLI error: {result.stderr}")

            logger.info(f"Task {task_id}: Gemini CLI response received ({len(result.stdout)} characters)")
            logger.info(f"Task {task_id}: Gemini CLI raw response:\n{result.stdout}")

            # Store the raw AI response for frontend display
            task_status.setdefault(task_id, {})["ai_response"] = result.stdout

            # Parse and apply changes
            await self._apply_ai_changes(result.stdout, task_id)
        except MCPError:
            # Already carries a complete message; do not prefix it again.
            raise
        except subprocess.TimeoutExpired:
            raise MCPError("Gemini CLI request timed out after 10 minutes") from None
        except Exception as e:
            raise MCPError(f"Gemini CLI error: {e}") from e

    async def _use_openai_ai(self, task_id: str, prompt: str, api_key: str, model_name: str):
        """Use OpenAI for code analysis and editing.

        Raises:
            MCPError: if the ``openai`` package is missing or the request
                fails.
        """
        try:
            from openai import OpenAI
        except ImportError:
            raise MCPError("OpenAI library not installed. Run: pip install openai") from None

        try:
            # Configure OpenAI with API key from frontend
            client = OpenAI(api_key=api_key)

            # Read all code files
            code_content = self._read_code_files()
            logger.info(f"Task {task_id}: Read {len(code_content)} characters of code content")

            # Create AI prompt
            ai_prompt = self._build_prompt(prompt, code_content)

            logger.info(f"Task {task_id}: Calling OpenAI with model: {model_name}")

            # Get AI response
            response = await _run_blocking(
                client.chat.completions.create,
                model=model_name,
                messages=[
                    {"role": "system", "content": "You are a code analysis and editing assistant."},
                    {"role": "user", "content": ai_prompt},
                ],
            )
            logger.info(f"Task {task_id}: OpenAI response received")

            # The message content may be absent; treat that as "no changes".
            ai_text = response.choices[0].message.content or ""

            # Store the raw AI response for frontend display (same as the
            # Gemini backend does).
            task_status.setdefault(task_id, {})["ai_response"] = ai_text

            # Parse and apply changes
            await self._apply_ai_changes(ai_text, task_id)
        except MCPError:
            raise
        except Exception as e:
            raise MCPError(f"OpenAI error: {e}") from e

    def _read_code_files(self) -> str:
        """Read all code files in the repository.

        Returns:
            One string with a ``=== <relative path> ===`` header before each
            file's content. Files are visited in sorted order so the result
            is deterministic; unreadable files are skipped with a warning.
        """
        sections = []

        for root, dirs, files in os.walk(self.repo_path):
            # Skip .git directory (in-place edit prunes the walk)
            if '.git' in dirs:
                dirs.remove('.git')
            dirs.sort()

            for file in sorted(files):
                if not file.endswith(self.CODE_EXTENSIONS):
                    continue
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                except (OSError, ValueError) as e:
                    # ValueError covers UnicodeDecodeError for non-UTF-8 files.
                    logger.warning(f"Could not read {file_path}: {e}")
                    continue
                relative_path = os.path.relpath(file_path, self.repo_path)
                sections.append(f"\n\n=== {relative_path} ===\n{content}\n")

        logger.info(f"Read {len(sections)} code files")
        return "".join(sections)

    async def _apply_ai_changes(self, ai_response: str, task_id: str):
        """Apply changes suggested by AI.

        Splits the response into sections introduced by ``FILE: <name>``
        lines. Within a section, a ``CHANGES:`` marker is dropped but any
        text following it on the same line is kept. Lines before the first
        ``FILE:`` marker are ignored.
        """
        logger.info(f"Task {task_id}: Applying AI suggestions...")
        _set_status(task_id, "processing", "Applying AI suggestions...")

        # Parse AI response for file changes
        # This is a simplified parser - you might want to make it more robust
        sections = []  # (filename, [change lines]) in response order
        current_file = None
        current_changes = []

        for line in (ai_response or "").splitlines():
            stripped = line.strip()
            if stripped.startswith('FILE:'):
                if current_file is not None:
                    sections.append((current_file, current_changes))
                current_file = stripped[len('FILE:'):].strip()
                current_changes = []
            elif current_file is None:
                continue
            elif stripped.startswith('CHANGES:'):
                remainder = stripped[len('CHANGES:'):].strip()
                if remainder:
                    current_changes.append(remainder)
            else:
                # Keep the raw line (indentation and blank lines matter
                # when the change is code).
                current_changes.append(line)

        if current_file is not None:
            sections.append((current_file, current_changes))

        files_modified = 0
        for filename, change_lines in sections:
            changes = '\n'.join(change_lines).strip('\n')
            if not filename or not changes.strip():
                continue
            if await self._apply_file_changes(filename, changes):
                files_modified += 1

        logger.info(f"Task {task_id}: Applied changes to {files_modified} files")

    async def _apply_file_changes(self, filename: str, changes: str) -> bool:
        """Apply changes to a specific file.

        The filename comes from model output and is therefore untrusted:
        only existing regular files inside the clone, and outside ``.git``,
        are touched.

        Returns:
            True if the file was modified, False if it was skipped.
        """
        repo_root = os.path.realpath(self.repo_path)
        # realpath() also resolves symlinks, so a link pointing outside the
        # clone is rejected by the containment check below.
        file_path = os.path.realpath(os.path.join(repo_root, filename))

        if not file_path.startswith(repo_root + os.sep):
            logger.warning(f"Skipping file outside repository: {filename}")
            return False

        relative_path = os.path.relpath(file_path, repo_root)
        if relative_path.split(os.sep)[0] == '.git':
            logger.warning(f"Skipping git internal file: {filename}")
            return False

        if not os.path.isfile(file_path):
            logger.warning(f"Skipping missing file: {filename}")
            return False

        # For now, we'll append the changes to the file
        # In a real implementation, you'd want more sophisticated parsing
        with open(file_path, 'a', encoding='utf-8') as f:
            f.write(f"\n\n# AI Generated Changes:\n{changes}\n")
        logger.info(f"Applied changes to file: {filename}")
        return True

    def _commit_push_cleanup(self, task_id: str) -> None:
        """Stage, commit and push all changes, then delete the clone.

        Blocking; meant to be run through _run_blocking(). The clone is
        only removed when everything before it succeeded.
        """
        repo = git.Repo(self.repo_path)
        try:
            # Add all changes
            repo.git.add('.')

            # Check if there are changes to commit
            if repo.is_dirty():
                # Commit changes
                repo.index.commit("AI-generated code updates")
                logger.info(f"Task {task_id}: Changes committed")

                # Push changes. push() reports rejections through the
                # returned PushInfo flags instead of raising.
                origin = repo.remote(name='origin')
                for info in origin.push():
                    if info.flags & info.ERROR:
                        raise MCPError(f"Push rejected: {str(info.summary).strip()}")
                logger.info(f"Task {task_id}: Changes pushed to remote")
            else:
                logger.info(f"Task {task_id}: No changes to commit")
        finally:
            # Release the git helper processes held by GitPython.
            repo.close()

        # Remove the cloned repo directory after push
        if self.repo_path and os.path.exists(self.repo_path):
            shutil.rmtree(self.repo_path)
            logger.info(f"Task {task_id}: Removed cloned repo directory {self.repo_path}")

    async def _commit_and_push(self, task_id: str, request: GiteaRequest):
        """Commit and push changes back to Gitea.

        Raises:
            MCPError: if staging, committing, pushing or cleanup fails. The
                message is redacted because git output can contain the
                token-bearing remote URL.
        """
        logger.info(f"Task {task_id}: Committing and pushing changes...")
        _set_status(task_id, "processing", "Committing and pushing changes...")

        try:
            await _run_blocking(self._commit_push_cleanup, task_id)
        except Exception as e:
            detail = _redact(str(e), request.token)
            raise MCPError(f"Failed to commit and push changes: {detail}") from None


# Create MCP server instance (kept for importers; the /process endpoint
# uses one fresh instance per task because repo_path is per-task state).
mcp_server = MCPServer()


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the frontend"""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/process", response_model=ProcessResponse)
async def process_repository(request: GiteaRequest, background_tasks: BackgroundTasks):
    """Process repository with AI"""
    task_id = str(uuid.uuid4())

    logger.info(f"Starting new task: {task_id}")

    # Register the task before responding so that an immediate
    # GET /status/{task_id} does not return 404.
    _set_status(task_id, "started", "Processing started")

    # Start background task
    background_tasks.add_task(MCPServer().process_repository, task_id, request)

    return ProcessResponse(
        task_id=task_id,
        status="started",
        message="Processing started"
    )


@app.get("/status/{task_id}")
async def get_status(task_id: str):
    """Get status of a processing task"""
    if task_id not in task_status:
        raise HTTPException(status_code=404, detail="Task not found")

    return task_status[task_id]


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "message": "MCP Server is running"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)


# AI Generated Changes:
# NOTE(review): the patch below was appended to this file as raw,
# markdown-fenced text, which made the module unparseable. It is preserved
# here as a comment and has NOT been applied: it proposes dropping the
# post-push cleanup of the cloned repository directory.
#
# --- a/main.py
# +++ b/main.py
# @@ -490,9 +490,6 @@
#              origin = repo.remote(name='origin')
#              origin.push()
#              logger.info(f"Task {task_id}: Changes pushed to remote")
# -            # Remove the cloned repo directory after push
# -            if self.repo_path and os.path.exists(self.repo_path):
# -                shutil.rmtree(self.repo_path)
# -                logger.info(f"Task {task_id}: Removed cloned repo directory {self.repo_path}")
#          except Exception as e:
#              raise Exception(f"Failed to commit and push changes: {str(e)}")