Unify chat.py and conversation.py into single implementation

CRITICAL FIX: chat.py had TWO execution paths causing inconsistent behavior:
1. Tool calling (correct) - used centralized command_patterns
2. Legacy JSON command parsing (broken) - bypassed SysadminTools

This caused macha-chat to fail SSH connections while macha-ask worked.

Changes:
- Rewrote chat.py to use ONLY tool-calling architecture
- All commands now go through SysadminTools.execute_command()
- SSH commands use centralized command_patterns.py
- conversation.py is now a lightweight wrapper for compatibility
- Both macha-chat and macha-ask use the same code path
- Updated module.nix to call chat.py directly

Benefits:
- Consistent behavior between macha-chat and macha-ask
- Single execution path = easier to maintain
- All SSH commands use explicit key paths
- No more password prompts

Fixes:
- SSH from macha-chat now works correctly
- Both interfaces use centralized command patterns
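
For reference, a minimal sketch of the unified tool-calling path (simplified; the real loop lives in MachaAgent._query_ollama_with_tools, and the dispatch signature of SysadminTools.execute_command() is assumed here for illustration):

# Minimal sketch of the single execution path, assuming Ollama's /api/chat
# tool-calling interface and a dict-style tool dispatch on SysadminTools.
import requests

def chat_once(agent, messages: list[dict]) -> str:
    """Send one chat turn to Ollama with tool definitions and run any tool calls."""
    reply = requests.post(
        f"{agent.ollama_host}/api/chat",
        json={
            "model": agent.model,
            "messages": messages,
            # Tool schemas come from the centralized command_patterns-backed tools
            "tools": agent.tools.get_tool_definitions(),
            "stream": False,
        },
        timeout=120,
    )
    reply.raise_for_status()
    message = reply.json()["message"]

    # No more hand-rolled {"action": "execute", ...} JSON parsing: the model
    # returns structured tool calls, each dispatched through SysadminTools.
    for call in message.get("tool_calls", []):
        fn = call["function"]
        result = agent.tools.execute_command(fn["name"], **fn["arguments"])  # assumed signature
        messages.append({"role": "tool", "content": str(result)})

    return message.get("content", "")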
Author: Lily Miller
Date:   2025-10-06 16:19:57 -06:00
parent 2f367f7cdc
commit b9a498a3fd
3 changed files with 116 additions and 569 deletions

chat.py

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
Interactive chat interface with Macha AI agent.
Allows conversational interaction and directive execution.
Unified chat/conversation interface using tool-calling architecture.
"""
import json
@@ -10,7 +10,7 @@ import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
@@ -19,152 +19,34 @@ from agent import MachaAgent
class MachaChatSession:
"""Interactive chat session with Macha"""
"""Interactive chat session with Macha using tool-calling architecture"""
def __init__(self):
self.agent = MachaAgent(use_queue=True, priority="INTERACTIVE")
def __init__(
self,
ollama_host: str = "http://localhost:11434",
model: str = "gpt-oss:latest",
state_dir: Path = Path("/var/lib/macha"),
enable_tools: bool = True
):
"""Initialize chat session with Macha
Args:
ollama_host: Ollama API endpoint
model: Model name to use
state_dir: State directory for agent
enable_tools: Whether to enable tool calling (should always be True)
"""
self.agent = MachaAgent(
ollama_host=ollama_host,
model=model,
state_dir=state_dir,
enable_tools=enable_tools,
use_queue=True,
priority="INTERACTIVE"
)
self.conversation_history: List[Dict[str, str]] = []
self.session_start = datetime.now().isoformat()
def _create_chat_prompt(self, user_message: str) -> str:
"""Create a prompt for the chat session"""
# Build conversation context
context = ""
if self.conversation_history:
context = "\n\nCONVERSATION HISTORY:\n"
for entry in self.conversation_history[-10:]: # Last 10 messages
role = entry['role'].upper()
msg = entry['message']
context += f"{role}: {msg}\n"
prompt = f"""{MachaAgent.SYSTEM_PROMPT}
TASK: INTERACTIVE CHAT SESSION
You are in an interactive chat session with the system administrator.
You can have a natural conversation and execute commands when directed.
CAPABILITIES:
- Answer questions about system status
- Explain configurations and issues
- Execute commands when explicitly asked
- Provide guidance and recommendations
COMMAND EXECUTION:
When the user asks you to run a command or perform an action that requires execution:
1. Respond with a JSON object containing the command to execute
2. Format: {{"action": "execute", "command": "the command", "explanation": "why you're running it"}}
3. After seeing the output, continue the conversation naturally
RESPONSE FORMAT:
- For normal conversation: Respond naturally in plain text
- For command execution: Respond with JSON containing action/command/explanation
- Keep responses concise but informative
RULES:
- Only execute commands when explicitly asked or when it's clearly needed
- Explain what you're about to do before executing
- Never execute destructive commands without explicit confirmation
- If unsure, ask for clarification
{context}
USER: {user_message}
MACHA:"""
return prompt
def _execute_command(self, command: str) -> Dict[str, Any]:
"""Execute a shell command and return results"""
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=30
)
# Check if command failed due to permissions
needs_sudo = False
permission_errors = [
'Interactive authentication required',
'Permission denied',
'Operation not permitted',
'Must be root',
'insufficient privileges',
'authentication is required'
]
if result.returncode != 0:
error_text = (result.stderr + result.stdout).lower()
for perm_error in permission_errors:
if perm_error.lower() in error_text:
needs_sudo = True
break
# Retry with sudo if permission error detected
if needs_sudo and not command.strip().startswith('sudo'):
print(f"\n⚠️ Permission denied, retrying with sudo...")
sudo_command = f"sudo {command}"
result = subprocess.run(
sudo_command,
shell=True,
capture_output=True,
text=True,
timeout=30
)
return {
'success': result.returncode == 0,
'exit_code': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr,
'command': sudo_command,
'retried_with_sudo': True
}
return {
'success': result.returncode == 0,
'exit_code': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr,
'command': command,
'retried_with_sudo': False
}
except subprocess.TimeoutExpired:
return {
'success': False,
'exit_code': -1,
'stdout': '',
'stderr': 'Command timed out after 30 seconds',
'command': command,
'retried_with_sudo': False
}
except Exception as e:
return {
'success': False,
'exit_code': -1,
'stdout': '',
'stderr': str(e),
'command': command,
'retried_with_sudo': False
}
def _parse_response(self, response: str) -> Dict[str, Any]:
"""Parse AI response to determine if it's a command or text"""
try:
# Try to parse as JSON
parsed = json.loads(response.strip())
if isinstance(parsed, dict) and 'action' in parsed:
return parsed
except json.JSONDecodeError:
pass
# It's plain text conversation
return {'action': 'chat', 'message': response}
def _auto_diagnose_ollama(self) -> str:
"""Automatically diagnose Ollama issues"""
diagnostics = []
@@ -241,8 +123,16 @@ MACHA:"""
return "\n".join(diagnostics)
def process_message(self, user_message: str) -> str:
"""Process a user message and return Macha's response"""
def process_message(self, user_message: str, verbose: bool = False) -> str:
"""Process a user message and return Macha's response
Args:
user_message: The user's message
verbose: Whether to show detailed token counts
Returns:
Macha's response
"""
# Add user message to history
self.conversation_history.append({
@@ -258,14 +148,13 @@ MACHA:"""
knowledge_context = self.agent._query_relevant_knowledge(user_message, limit=3)
# Add recent conversation history (last 15 messages to stay within context limits)
# With tool calling, messages grow quickly, so we limit more aggressively
recent_history = self.conversation_history[-15:] # Last ~7 exchanges
recent_history = self.conversation_history[-15:]
for entry in recent_history:
content = entry['message']
# Truncate very long messages (e.g., command outputs)
if len(content) > 3000:
content = content[:1500] + "\n... [message truncated] ...\n" + content[-1500:]
# Add knowledge context to first user message if available
# Add knowledge context to last user message if available
if entry == recent_history[-1] and knowledge_context:
content += knowledge_context
messages.append({
@@ -273,9 +162,22 @@ MACHA:"""
"content": content
})
if verbose:
# Estimate tokens for debugging
total_chars = sum(len(json.dumps(m)) for m in messages)
estimated_tokens = total_chars // 4
print(f"[Context: {estimated_tokens:,} tokens, {len(messages)} messages]")
try:
# Use tool-aware chat API
ai_response = self.agent._query_ollama_with_tools(messages)
# Use tool-aware chat API - this handles all tool calling automatically
response_data = self.agent._query_ollama_with_tools(
messages,
tool_definitions=self.agent.tools.get_tool_definitions() if self.agent.enable_tools else []
)
# Extract the final response
ai_response = response_data.get("content", "")
except Exception as e:
error_msg = (
f"❌ CRITICAL: Failed to communicate with Ollama inference engine\n\n"
@@ -298,91 +200,16 @@ MACHA:"""
diagnostics = self._auto_diagnose_ollama()
return error_msg + "\n" + diagnostics
# Check if Ollama returned an error
try:
error_check = json.loads(ai_response)
if isinstance(error_check, dict) and 'error' in error_check:
error_msg = (
f"❌ Ollama API Error\n\n"
f"Error: {error_check.get('error', 'Unknown error')}\n"
f"Diagnosis: {error_check.get('diagnosis', 'No details')}\n\n"
)
# Auto-diagnose the issue
diagnostics = self._auto_diagnose_ollama()
return error_msg + "\n" + diagnostics
except json.JSONDecodeError:
# Not JSON, it's a normal response
pass
# Parse response
parsed = self._parse_response(ai_response)
if parsed.get('action') == 'execute':
# AI wants to execute a command
command = parsed.get('command', '')
explanation = parsed.get('explanation', '')
# Show what we're about to do
response = f"🔧 {explanation}\n\nExecuting: `{command}`\n\n"
# Execute the command
result = self._execute_command(command)
# Show if we retried with sudo
if result.get('retried_with_sudo'):
response += f"⚠️ Permission denied, retried as: `{result['command']}`\n\n"
if result['success']:
response += "✅ Command succeeded:\n"
if result['stdout']:
response += f"```\n{result['stdout']}\n```"
else:
response += "(no output)"
else:
response += f"❌ Command failed (exit code {result['exit_code']}):\n"
if result['stderr']:
response += f"```\n{result['stderr']}\n```"
elif result['stdout']:
response += f"```\n{result['stdout']}\n```"
# Add command execution to history
# Add response to history
self.conversation_history.append({
'role': 'macha',
'message': response,
'timestamp': datetime.now().isoformat(),
'command_result': result
})
# Now ask AI to respond to the command output
followup_prompt = f"""The command completed. Here's what happened:
Command: {command}
Success: {result['success']}
Output: {result['stdout'][:500] if result['stdout'] else '(none)'}
Error: {result['stderr'][:500] if result['stderr'] else '(none)'}
Please provide a brief analysis or next steps."""
followup_response = self.agent._query_ollama(followup_prompt)
if followup_response:
response += f"\n\n{followup_response}"
return response
else:
# Normal conversation response
message = parsed.get('message', ai_response)
self.conversation_history.append({
'role': 'macha',
'message': message,
'role': 'assistant',
'message': ai_response,
'timestamp': datetime.now().isoformat()
})
return message
return ai_response
def run(self):
def run_interactive(self):
"""Run the interactive chat session"""
print("=" * 70)
print("🌐 MACHA INTERACTIVE CHAT")
@@ -425,9 +252,6 @@ Please provide a brief analysis or next steps."""
continue
elif user_input.lower() == '/debug':
import os
import subprocess
print("\n" + "=" * 70)
print("MACHA ARCHITECTURE & STATUS")
print("=" * 70)
@@ -453,19 +277,18 @@ Please provide a brief analysis or next steps."""
except:
print(f" Sudo Access: ❌ No")
print(f" Note: Chat runs as invoking user (you), not as macha-autonomous")
print(f" Note: Chat runs as invoking user (you), using macha's tools")
print("\n🧠 INFERENCE ENGINE:")
print(f" Backend: Ollama")
print(f" Host: {self.agent.ollama_host}")
print(f" Model: {self.agent.model}")
print(f" Service: ollama.service (systemd)")
print(f" Queue Worker: ollama-queue-worker.service")
print("\n💾 DATABASE:")
print(f" Backend: ChromaDB")
print(f" Host: http://localhost:8000")
print(f" Data: /var/lib/chromadb")
print(f" Service: chromadb.service (systemd)")
print(f" State: {self.agent.state_dir}")
print("\n🔍 OLLAMA STATUS:")
# Try to query Ollama status
@@ -488,6 +311,12 @@ Please provide a brief analysis or next steps."""
print(f" Status: ❌ Cannot connect: {e}")
print(f" Hint: Check 'systemctl status ollama.service'")
print("\n🛠️ TOOLS:")
print(f" Enabled: {self.agent.enable_tools}")
if self.agent.enable_tools:
print(f" Available tools: {len(self.agent.tools.get_tool_definitions())}")
print(f" Architecture: Centralized command_patterns.py")
print("\n💡 CONVERSATION:")
print(f" History: {len(self.conversation_history)} messages")
print(f" Session started: {self.session_start}")
@@ -497,7 +326,7 @@ Please provide a brief analysis or next steps."""
# Process the message
print("\n🤖 MACHA: ", end='', flush=True)
response = self.process_message(user_input)
response = self.process_message(user_input, verbose=False)
print(response)
except KeyboardInterrupt:
@@ -508,15 +337,48 @@ Please provide a brief analysis or next steps."""
break
except Exception as e:
print(f"\n❌ Error: {e}")
import traceback
traceback.print_exc()
continue
def ask_once(self, question: str, verbose: bool = True) -> str:
"""Ask a single question and return the response (for macha-ask command)
Args:
question: The question to ask
verbose: Whether to show detailed context information
Returns:
Macha's response
"""
response = self.process_message(question, verbose=verbose)
return response
def main():
"""Main entry point"""
"""Main entry point for macha-chat"""
session = MachaChatSession()
session.run()
session.run_interactive()
def ask_main():
"""Entry point for macha-ask"""
if len(sys.argv) < 2:
print("Usage: macha-ask <question>", file=sys.stderr)
sys.exit(1)
question = " ".join(sys.argv[1:])
session = MachaChatSession()
response = session.ask_once(question, verbose=True)
print("\n" + "=" * 60)
print("MACHA:")
print("=" * 60)
print(response)
print("=" * 60)
print()
if __name__ == "__main__":
main()

conversation.py

@@ -1,328 +1,12 @@
#!/usr/bin/env python3
"""
Conversational Interface - Allows questioning Macha about decisions and system state
Macha conversation interface - legacy compatibility wrapper.
This module now uses the unified chat.py implementation.
"""
import json
import requests
from typing import Dict, List, Any, Optional
from pathlib import Path
from datetime import datetime
from agent import MachaAgent
class MachaConversation:
"""Conversational interface for Macha"""
def __init__(
self,
ollama_host: str = "http://localhost:11434",
model: str = "gpt-oss:latest",
state_dir: Path = Path("/var/lib/macha")
):
self.ollama_host = ollama_host
self.model = model
self.state_dir = state_dir
self.decision_log = self.state_dir / "decisions.jsonl"
self.approval_queue = self.state_dir / "approval_queue.json"
self.orchestrator_log = self.state_dir / "orchestrator.log"
# Initialize agent with tool support and queue
self.agent = MachaAgent(
ollama_host=ollama_host,
model=model,
state_dir=state_dir,
enable_tools=True,
use_queue=True,
priority="INTERACTIVE"
)
def ask(self, question: str, include_context: bool = True) -> str:
"""Ask Macha a question with optional system context"""
context = ""
if include_context:
context = self._gather_context()
# Build messages for tool-aware chat
content = self._create_conversational_prompt(question, context)
messages = [{"role": "user", "content": content}]
response = self.agent._query_ollama_with_tools(messages)
return response
def discuss_action(self, action_index: int) -> str:
"""Discuss a specific queued action by its queue position (0-based index)"""
action = self._get_action_from_queue(action_index)
if not action:
return f"No action found at queue position {action_index}. Use 'macha-approve list' to see available actions."
context = self._gather_context()
action_context = json.dumps(action, indent=2)
content = f"""TASK: DISCUSS PROPOSED ACTION
================================================================================
A user is asking about a proposed action in your approval queue.
QUEUED ACTION (Queue Position #{action_index}):
{action_context}
RECENT SYSTEM CONTEXT:
{context}
The user wants to discuss this action. Explain:
1. Why you proposed this action
2. What problem it solves
3. The risks involved
4. What could go wrong
5. Alternative approaches if any
Be conversational, helpful, and honest about uncertainties.
"""
messages = [{"role": "user", "content": content}]
return self.agent._query_ollama_with_tools(messages)
def _gather_context(self) -> str:
"""Gather relevant system context for the conversation"""
context_parts = []
# System infrastructure from ChromaDB
try:
from context_db import ContextDatabase
db = ContextDatabase()
systems = db.get_all_systems()
if systems:
context_parts.append("INFRASTRUCTURE:")
for system in systems:
context_parts.append(f" - {system['hostname']} ({system.get('type', 'unknown')})")
if system.get('config_repo'):
context_parts.append(f" Config Repo: {system['config_repo']}")
context_parts.append(f" Branch: {system.get('config_branch', 'unknown')}")
if system.get('capabilities'):
context_parts.append(f" Capabilities: {', '.join(system['capabilities'])}")
except Exception as e:
# ChromaDB not available, skip
pass
# Recent decisions
recent_decisions = self._get_recent_decisions(5)
if recent_decisions:
context_parts.append("\nRECENT DECISIONS:")
for i, dec in enumerate(recent_decisions, 1):
timestamp = dec.get("timestamp", "unknown")
analysis = dec.get("analysis", {})
status = analysis.get("status", "unknown")
context_parts.append(f"{i}. [{timestamp}] Status: {status}")
if "issues" in analysis:
for issue in analysis.get("issues", [])[:3]:
context_parts.append(f" - {issue.get('description', 'N/A')}")
# Pending approvals
pending = self._get_pending_approvals()
if pending:
context_parts.append(f"\nPENDING APPROVALS: {len(pending)} action(s) awaiting approval")
# Recent log excerpts (last 10 lines)
recent_logs = self._get_recent_logs(10)
if recent_logs:
context_parts.append("\nRECENT LOG ENTRIES:")
context_parts.extend(recent_logs)
return "\n".join(context_parts)
def _create_conversational_prompt(self, question: str, context: str) -> str:
"""Create a conversational prompt"""
return f"""{MachaAgent.SYSTEM_PROMPT}
TASK: ANSWER QUESTION
================================================================================
You monitor system health, analyze issues using AI, and propose fixes. Be helpful,
honest about what you know and don't know, and reference the context provided below.
SYSTEM CONTEXT:
{context if context else "No recent activity"}
USER QUESTION:
{question}
Respond conversationally and helpfully. If the question is about your recent decisions
or actions, reference the context above. If you don't have enough information, say so.
Keep responses concise but informative.
"""
def _query_ollama(self, prompt: str, temperature: float = 0.7) -> str:
"""Query Ollama API"""
try:
response = requests.post(
f"{self.ollama_host}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False,
"temperature": temperature,
},
timeout=60
)
response.raise_for_status()
return response.json().get("response", "")
except requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = f" - {response.text}"
except:
pass
return f"Error: Ollama returned HTTP {response.status_code}{error_detail}"
except Exception as e:
return f"Error querying Ollama: {str(e)}"
def _get_recent_decisions(self, count: int = 5) -> List[Dict[str, Any]]:
"""Get recent decisions from log"""
if not self.decision_log.exists():
return []
decisions = []
try:
with open(self.decision_log, 'r') as f:
for line in f:
if line.strip():
try:
decisions.append(json.loads(line))
except:
pass
except:
pass
return decisions[-count:]
def _get_pending_approvals(self) -> List[Dict[str, Any]]:
"""Get pending approvals from queue"""
if not self.approval_queue.exists():
return []
try:
with open(self.approval_queue, 'r') as f:
data = json.load(f)
# Queue is a JSON array, not an object with "pending" key
if isinstance(data, list):
return data
return data.get("pending", [])
except:
return []
def _get_action_from_queue(self, action_index: int) -> Optional[Dict[str, Any]]:
"""Get a specific action from the queue by index"""
pending = self._get_pending_approvals()
if 0 <= action_index < len(pending):
return pending[action_index]
return None
def _get_recent_logs(self, count: int = 10) -> List[str]:
"""Get recent orchestrator log lines"""
if not self.orchestrator_log.exists():
return []
try:
with open(self.orchestrator_log, 'r') as f:
lines = f.readlines()
return [line.strip() for line in lines[-count:] if line.strip()]
except:
return []
# Import the unified implementation
from chat import ask_main
# Entry point
if __name__ == "__main__":
import sys
import argparse
parser = argparse.ArgumentParser(description="Ask Macha a question or discuss an action")
parser.add_argument("--discuss", type=int, metavar="ACTION_ID", help="Discuss a specific queued action")
parser.add_argument("--follow-up", type=str, metavar="QUESTION", help="Follow-up question about the action")
parser.add_argument("question", nargs="*", help="Your question for Macha")
parser.add_argument("--no-context", action="store_true", help="Don't include system context")
args = parser.parse_args()
# Load config if available
config_file = Path("/etc/macha-autonomous/config.json")
ollama_host = "http://localhost:11434"
model = "gpt-oss:latest"
if config_file.exists():
try:
with open(config_file, 'r') as f:
config = json.load(f)
ollama_host = config.get("ollama_host", ollama_host)
model = config.get("model", model)
except:
pass
conversation = MachaConversation(
ollama_host=ollama_host,
model=model
)
if args.discuss is not None:
if args.follow_up:
# Follow-up question about a specific action
action = conversation._get_action_from_queue(args.discuss)
if not action:
print(f"No action found at queue position {args.discuss}. Use 'macha-approve list' to see available actions.")
sys.exit(1)
# Build context with the action details
action_context = f"""
QUEUED ACTION #{args.discuss}:
Diagnosis: {action.get('proposal', {}).get('diagnosis', 'N/A')}
Proposed Action: {action.get('proposal', {}).get('proposed_action', 'N/A')}
Action Type: {action.get('proposal', {}).get('action_type', 'N/A')}
Risk Level: {action.get('proposal', {}).get('risk_level', 'N/A')}
Commands: {json.dumps(action.get('proposal', {}).get('commands', []), indent=2)}
Reasoning: {action.get('proposal', {}).get('reasoning', 'N/A')}
FOLLOW-UP QUESTION:
{args.follow_up}
"""
# Query the AI with the action context
response = conversation._query_ollama(f"""{MachaAgent.SYSTEM_PROMPT}
TASK: ANSWER FOLLOW-UP QUESTION ABOUT QUEUED ACTION
================================================================================
You are answering a follow-up question about a proposed fix that is awaiting approval.
Be helpful and answer directly. If the user is concerned about risks, explain them clearly.
If they ask about alternatives, suggest them.
{action_context}
RESPOND CONCISELY AND DIRECTLY.
""")
else:
# Initial discussion about the action
response = conversation.discuss_action(args.discuss)
elif args.question:
# Ask a general question
question = " ".join(args.question)
response = conversation.ask(question, include_context=not args.no_context)
else:
parser.print_help()
sys.exit(1)
# Only print formatted output for initial discussion, not for follow-ups
if args.follow_up:
print(response)
else:
print("\n" + "="*60)
print("MACHA:")
print("="*60)
print(response)
print("="*60 + "\n")
ask_main()

module.nix

@@ -507,7 +507,7 @@ print('='*60)
"
'')
# Tool to ask Macha questions
# Tool to ask Macha questions (unified with macha-chat, uses ask_main entry point)
(pkgs.writeScriptBin "macha-ask" ''
#!${pkgs.bash}/bin/bash
if [ $# -eq 0 ]; then
@@ -515,7 +515,8 @@ print('='*60)
echo "Example: macha-ask Why did you recommend restarting that service?"
exit 1
fi
sudo -u ${cfg.user} ${pkgs.coreutils}/bin/env CHROMA_ENV_FILE="" ANONYMIZED_TELEMETRY="False" ${pythonEnv}/bin/python3 ${./.}/conversation.py "$@"
# Run as macha user with ask_main entry point from chat.py
sudo -u ${cfg.user} ${pkgs.coreutils}/bin/env PYTHONPATH=${toString ./.} CHROMA_ENV_FILE="" ANONYMIZED_TELEMETRY="False" ${pythonEnv}/bin/python3 -c "from chat import ask_main; ask_main()" "$@"
'')
# Issue tracking CLI