#!/usr/bin/env python3 import os import sys import subprocess import json import logging import time import hashlib import random import string import uuid import re import traceback from datetime import datetime, timedelta, timezone from flask import Flask, request, jsonify from openai import OpenAI, APIError, APIConnectionError, Timeout import openai import flask try: from zoneinfo import ZoneInfo # Python 3.9+ except Exception: # pragma: no cover ZoneInfo = None # ------------------------------------------------------------------------------ # Logging # ------------------------------------------------------------------------------ logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('/tmp/xaiChatApi.log') # Will be overridden by config.json ] ) logger = logging.getLogger(__name__) logger.info("Starting XaiChatApi.py initialization") # ------------------------------------------------------------------------------ # Config # ------------------------------------------------------------------------------ def load_config(): config_path = os.path.join(os.path.dirname(__file__), 'config.json') logger.debug(f"Attempting to load config from {config_path}") try: if not os.access(config_path, os.R_OK): logger.error(f"No read permission for {config_path}") sys.exit(1) with open(config_path, 'r') as f: config = json.load(f) config['xai_api_key'] = os.getenv('XAI_API_KEY', config.get('xai_api_key', '')) required_fields = ['xai_api_key','api_base_url','api_timeout','max_tokens','temperature', 'max_search_results','ignore_inputs','log_file','flask_host','flask_port', 'run_startup_test','system_prompt'] missing = [f for f in required_fields if f not in config] if missing: logger.error(f"Missing config fields: {missing}") sys.exit(1) if not config.get('system_prompt') or '{message}' not in config['system_prompt']: logger.error("Invalid system_prompt in config.json: must include {message}") sys.exit(1) # switch log file to config's path for h in logger.handlers[:]: if isinstance(h, logging.FileHandler): logger.removeHandler(h) logger.addHandler(logging.FileHandler(config['log_file'])) logger.info(f"Config loaded: {json.dumps({k: '****' if k == 'xai_api_key' else v for k, v in config.items()}, indent=2)}") return config except FileNotFoundError: logger.error(f"Config file {config_path} not found"); sys.exit(1) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in {config_path}: {str(e)}"); sys.exit(1) except Exception as e: logger.error(f"Config loading failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); sys.exit(1) logger.info("Loading configuration") config = load_config() last_api_success = None # ------------------------------------------------------------------------------ # Dependencies # ------------------------------------------------------------------------------ def install_dependencies(): packages = ['flask>=3.0.0', 'openai>=1.0.0', 'gunicorn>=22.0'] logger.info("Checking dependencies") try: installed = subprocess.check_output([sys.executable, '-m', 'pip', 'list']).decode('utf-8') logger.debug(f"Installed packages:\n{installed}") for spec in packages: name = spec.split('>=')[0].split('==')[0] try: __import__(name) logger.info(f"{spec} already installed") except ImportError: logger.info(f"Installing {spec}...") subprocess.check_call([sys.executable, '-m', 'pip', 'install', spec]) logger.info(f"Successfully installed {spec}") except subprocess.CalledProcessError as e: logger.error(f"Dependency installation failed: {str(e)}"); sys.exit(1) logger.info("Installing dependencies") try: install_dependencies() except Exception as e: logger.error(f"Startup failed during dependency installation: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); sys.exit(1) # ------------------------------------------------------------------------------ # Flask # ------------------------------------------------------------------------------ logger.info("Initializing Flask app") try: app = Flask(__name__) app.secret_key = os.urandom(24) app.start_time = time.time() except Exception as e: logger.error(f"Startup failed during Flask initialization: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); sys.exit(1) logger.info(f"Python version: {sys.version}") logger.info(f"Flask version: {flask.__version__}") logger.info(f"OpenAI version: {openai.__version__}") logger.info(f"Gunicorn command: {' '.join(sys.argv)}") logger.info(f"Environment: {json.dumps(dict(os.environ), indent=2)}") if not config['xai_api_key']: logger.error("XAI_API_KEY not provided in config or environment"); sys.exit(1) # ------------------------------------------------------------------------------ # Time intent detection (STRICT; avoids 'know' and 'story about time') # ------------------------------------------------------------------------------ TIME_PATTERNS = [ r"\bwhat(?:'s|\s+is)?\s+(?:the\s+)?time\b", # what's the time r"\bwhat(?:'s|\s+is)?\s+(?:the\s+)?time\s+(?:in|for)\s+.+", # what's the time in/for X r"\b(?:current|local)\s+time\b", # current time / local time r"\btime\s+(?:right\s+)?now\b", # time now / time right now r"^\s*now\??\s*$", # "now?" r"\bwhat(?:'s|\s+is)?\s+(?:the\s+)?date\b", # what's the date r"\btoday'?s?\s+date\b", # today's date r"\bdate\s+today\b", # date today r"\bwhat\s+day\s+is\s+it\b", # what day is it r"\bday\s+of\s+week\b", # day of week r"\byesterday\b", # yesterday r"\btime\s+(?:in|for)\s+.+", # time in/for X ] def has_time_intent(msg: str) -> bool: m = (msg or "").strip().lower() return any(re.search(p, m, re.IGNORECASE) for p in TIME_PATTERNS) # ------------------------------------------------------------------------------ # Response text normalizer (fix odd contractions/phrasing) # ------------------------------------------------------------------------------ _CONTRACTION_FIXES = [ (re.compile(r"\bThe[’']ve\b"), "They’ve"), # fix 'The’ve' => 'They’ve' ] _START_THEYVE_CHECKED = re.compile( r"^\s*They[’']ve\s+checked\s+the\s+current\s+time\s+for\s+(.*?),(?:\s*and\s*)?", re.IGNORECASE ) def normalize_reply_text(text: str) -> str: if not text: return text out = text for pat, repl in _CONTRACTION_FIXES: out = pat.sub(repl, out) m = _START_THEYVE_CHECKED.match(out) if m: place = m.group(1).strip() out = _START_THEYVE_CHECKED.sub(f"In {place}, ", out, count=1) return out # ------------------------------------------------------------------------------ # Optional: local time for common cities (offline) # ------------------------------------------------------------------------------ CITY_TZ = { "new york": "America/New_York", "nyc": "America/New_York", "london": "Europe/London", "uk": "Europe/London", "britain": "Europe/London", "paris": "Europe/Paris", "berlin": "Europe/Berlin", "madrid": "Europe/Madrid", "rome": "Europe/Rome", "amsterdam": "Europe/Amsterdam", "zurich": "Europe/Zurich", "stockholm": "Europe/Stockholm", "oslo": "Europe/Oslo", "copenhagen": "Europe/Copenhagen", "helsinki": "Europe/Helsinki", "lisbon": "Europe/Lisbon", "dublin": "Europe/Dublin", "chicago": "America/Chicago", "toronto": "America/Toronto", "vancouver": "America/Vancouver", "los angeles": "America/Los_Angeles", "la": "America/Los_Angeles", "san francisco": "America/Los_Angeles", "sf": "America/Los_Angeles", "seattle": "America/Los_Angeles", "denver": "America/Denver", "phoenix": "America/Phoenix", "mexico city": "America/Mexico_City", "boston": "America/New_York", "sydney": "Australia/Sydney", "melbourne": "Australia/Melbourne", "tokyo": "Asia/Tokyo", "seoul": "Asia/Seoul", "singapore": "Asia/Singapore", "hong kong": "Asia/Hong_Kong", "shanghai": "Asia/Shanghai", "beijing": "Asia/Shanghai", "delhi": "Asia/Kolkata", "mumbai": "Asia/Kolkata", "kolkata": "Asia/Kolkata", "istanbul": "Europe/Istanbul", "moscow": "Europe/Moscow", "cape town": "Africa/Johannesburg", "johannesburg": "Africa/Johannesburg", "rio": "America/Sao_Paulo", "são paulo": "America/Sao_Paulo", "sao paulo": "America/Sao_Paulo", "buenos aires": "America/Argentina/Buenos_Aires", } _LOC_RE = re.compile(r"\btime\s+(?:in|for)\s+(.+)", re.IGNORECASE) def _extract_location(q: str) -> str | None: """Pull 'X' out of phrases like 'time in X' or 'time for X'.""" m = _LOC_RE.search(q or "") if not m: return None loc = m.group(1).strip().lower() # strip trailing punctuation loc = re.sub(r"[?.!,;:\s]+$", "", loc) return loc if loc else None def _local_time_string(now_utc: datetime, tz_name: str, city_label: str) -> str: try: if ZoneInfo is None: return now_utc.strftime(f"It’s %I:%M %p UTC on %A, %B %d, %Y (no local timezone available).") tz = ZoneInfo(tz_name) local = now_utc.astimezone(tz) abbr = local.tzname() or tz_name return local.strftime(f"It’s %I:%M %p {abbr} on %A, %B %d, %Y in {city_label}.") except Exception: return now_utc.strftime(f"It’s %I:%M %p UTC on %A, %B %d, %Y (timezone error).") # ------------------------------------------------------------------------------ # Startup ping # ------------------------------------------------------------------------------ def test_api_connectivity(): global last_api_success logger.info("Initializing OpenAI client for connectivity test") try: client = OpenAI(api_key=config['xai_api_key'], base_url=config['api_base_url']) response = client.chat.completions.create( model="grok-3", messages=[{"role": "user", "content": "ping"}], max_tokens=10, timeout=10.0 ) last_api_success = time.time() logger.info(f"API connectivity test successful: {response.choices[0].message.content}") except Exception as e: logger.error(f"API connectivity test failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); sys.exit(1) if config['run_startup_test']: logger.info("Running startup API connectivity test") test_api_connectivity() else: logger.info("Startup API connectivity test disabled in config") # ------------------------------------------------------------------------------ # Prompt, parsing, fallback # ------------------------------------------------------------------------------ def generate_system_prompt(session_id: str, timestamp: str) -> list: try: current_time = datetime.fromtimestamp(float(timestamp), tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') prompt = config['system_prompt'].format( session_id=session_id, timestamp=timestamp, current_time=current_time, max_tokens=config['max_tokens'], ignore_inputs=', '.join(config['ignore_inputs']), message='{message}' ) logger.debug(f"Generated system prompt: {prompt[:100]}... (length: {len(prompt)})") return [ {"role": "system", "content": prompt}, {"role": "user", "content": "{message}"} ] except Exception as e: logger.error(f"Prompt formatting failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); raise def parse_response_date(response: str) -> datetime | None: """Regex parse; no year-only matches to avoid false positives.""" try: date_patterns = [ r'\b(\w+\s+\d{1,2},\s+\d{4})\b', # September 03, 2025 r'\b(\d{4}-\d{2}-\d{2})\b', # 2025-09-03 r'\b(\d{1,2}\s+\w+\s+\d{4})\b', # 03 September 2025 r'\b(\d{1,2}:\d{2}\s*(?:AM|PM))\b', # 04:14 PM r'\b(\d{1,2}:\d{2})\b', # 04:14 ] formats = ['%B %d, %Y','%Y-%m-%d','%d %B %Y','%I:%M %p','%H:%M'] for pattern in date_patterns: m = re.search(pattern, response or "", re.IGNORECASE) if not m: continue date_str = m.group(1) for fmt in formats: try: parsed = datetime.strptime(date_str, fmt) if fmt in ('%I:%M %p','%H:%M'): current = datetime.now(timezone.utc) parsed = current.replace(hour=parsed.hour, minute=parsed.minute, second=0, microsecond=0) return parsed.replace(tzinfo=timezone.utc) except ValueError: continue logger.debug(f"No date parsed from response: {response}") return None except Exception as e: logger.debug(f"Date parsing failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); return None def calculate_time_fallback(query: str, current_time: str) -> str | None: """Only answer explicit time/date questions. Supports 'time in/for CITY' via zoneinfo.""" try: if not has_time_intent(query): return None lower = (query or "").lower() now_utc = datetime.fromtimestamp(float(current_time), tz=timezone.utc) # 'yesterday' explicit if re.search(r"\byesterday\b", lower): return (now_utc - timedelta(days=1)).strftime('Yesterday was %A, %B %d, %Y (UTC).') # time in/for CITY loc = _extract_location(query) if loc: # best-effort mapping tz_name = CITY_TZ.get(loc) # allow partial keys (e.g., "new york city" -> "new york") if tz_name is None: for key, val in CITY_TZ.items(): if key in loc: tz_name = val; break if tz_name: return _local_time_string(now_utc, tz_name, loc.title()) # generic "what's the time"/"now?": answer in UTC return now_utc.strftime("It’s %I:%M %p UTC on %A, %B %d, %Y.") except Exception as e: logger.error(f"Time fallback failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}"); return None def process_grok_response(response, message: str, timestamp: str) -> str: """Post-process model response and apply safe fallback only for explicit time/date questions.""" reply = response.choices[0].message.content.strip().replace(r'\\n', '\n') logger.debug(f"Processing response: {reply}, token_usage={response.usage}") if has_time_intent(message): current = datetime.fromtimestamp(float(timestamp), tz=timezone.utc) parsed_date = parse_response_date(reply) is_valid = False if parsed_date: time_diff = abs((current - parsed_date).total_seconds()) is_valid = time_diff < 86400 # within 24h logger.debug(f"Time validation: parsed_date={parsed_date}, diff={time_diff}s, valid={is_valid}") else: logger.debug("Time validation: no date parsed from model reply") if not reply or 'unavailable' in (reply or '').lower() or not is_valid: fallback = calculate_time_fallback(message, timestamp) if fallback: logger.info(f"Used fallback for explicit time query: {fallback}") return fallback # Final cleanup reply = normalize_reply_text(reply) return reply # ------------------------------------------------------------------------------ # Endpoints # ------------------------------------------------------------------------------ @app.route('/health', methods=['GET']) def health(): logger.info("Health check called") return jsonify({'status': 'healthy'}), 200, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0'} @app.route('/debug', methods=['GET']) def debug(): logger.info("Debug endpoint called") try: with open(config['log_file'], 'r') as f: recent_logs = f.readlines()[-5:] except Exception as e: recent_logs = [f"Error reading log file: {str(e)}"] status = { 'config': {k: '****' if k == 'xai_api_key' else v for k, v in config.items()}, 'uptime': time.time() - app.start_time, 'python_version': sys.version, 'flask_version': flask.__version__, 'openai_version': openai.__version__, 'last_api_success': last_api_success if last_api_success else 'Never', 'recent_logs': recent_logs, 'flask_host': config['flask_host'], 'flask_port': config['flask_port'] } return jsonify(status), 200, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0'} @app.route('/chat', methods=['GET', 'POST']) def chat(): start_time = time.time() session_id = str(uuid.uuid4()) timestamp = str(time.time()) if request.method == 'GET': message = request.args.get('message', '') nick = request.args.get('nick', 'unknown') request_details = {'method': 'GET', 'args': dict(request.args), 'headers': dict(request.headers)} else: data = request.get_json(silent=True) or {} message = data.get('message', '') nick = data.get('nick', 'unknown') request_details = {'method': 'POST', 'json': data, 'headers': dict(request.headers)} logger.debug(f"Session ID: {session_id}, Timestamp: {timestamp}, Request details: {json.dumps(request_details, indent=2)}") if not message: logger.error(f"Session ID: {session_id}, Timestamp: {timestamp}, No message provided") return jsonify({'error': 'No message provided', 'fallback': 'Please provide a message!'}), 400, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0'} if message.lower().strip() in config['ignore_inputs']: logger.info(f"Ignored non-substantive input from nick: {nick}, message: {message}") return jsonify({'reply': ''}), 200, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0', 'X-Session-ID': session_id, 'X-Timestamp': timestamp} logger.info(f"Session ID: {session_id}, Timestamp: {timestamp}, Request from nick: {nick}, message: {message}") try: conversation = generate_system_prompt(session_id, timestamp) conversation[-1]['content'] = conversation[-1]['content'].format(message=message) except Exception as e: logger.error(f"Prompt generation failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}") return jsonify({'error': f"Prompt generation failed: {str(e)}", 'fallback': 'Sorry, I couldn\'t process that!'}), 500, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0'} search_params = {} search_keywords = ['weather', 'death', 'died', 'recent', 'news', 'what happened'] # no time keywords if any(keyword in (message or '').lower() for keyword in search_keywords): search_params = {'mode': 'on', 'max_search_results': config['max_search_results']} logger.info(f"Live Search enabled for query: {message}") logger.debug(f"API request payload: {json.dumps(conversation, indent=2)}") try: logger.info("Initializing OpenAI client") client = OpenAI(api_key=config['xai_api_key'], base_url=config['api_base_url']) api_start = time.time() nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) headers = { 'X-Cache-Bypass': f"{time.time()}-{nonce}", 'X-Request-ID': str(random.randint(100000, 999999)), 'X-Session-ID': session_id, 'X-Timestamp': timestamp } logger.debug(f"Request headers: {headers}") response = client.chat.completions.create( model="grok-3", messages=conversation, temperature=config['temperature'], max_tokens=config['max_tokens'], extra_headers=headers, extra_body={'search_parameters': search_params} if search_params else {}, timeout=config['api_timeout'] ) api_duration = time.time() - api_start global last_api_success last_api_success = time.time() logger.debug(f"API call took {api_duration:.2f}s") logger.debug(f"Raw Grok response: {response.choices[0].message.content}") logger.debug(f"Full response: {response.model_dump()}") reply = process_grok_response(response, message, timestamp) reply_hash = hashlib.sha256(reply.encode()).hexdigest() logger.info(f"Reply (len={len(reply)}, hash={reply_hash}): {reply}") logger.info(f"Total time: {time.time() - start_time:.2f}s") return jsonify({'reply': reply}), 200, { 'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0', 'X-Session-ID': session_id, 'X-Timestamp': timestamp } except (APIError, APIConnectionError, Timeout) as e: logger.error(f"API call failed: {type(e).__name__}: {str(e)}") logger.debug(f"Stack trace: {traceback.format_exc()}") # Only use time fallback for explicit time/date intent if has_time_intent(message): fallback = calculate_time_fallback(message, timestamp) if fallback: logger.info(f"Used fallback for time query (API failure): {fallback}") return jsonify({'reply': fallback}), 200, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0', 'X-Session-ID': session_id, 'X-Timestamp': timestamp} return jsonify({'error': f"API call failed: {str(e)}", 'fallback': 'Sorry, I couldn\'t connect to Grok!'}), 500, {'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0', 'X-Session-ID': session_id, 'X-Timestamp': timestamp} # ------------------------------------------------------------------------------ Main # ------------------------------------------------------------------------------ if __name__ == '__main__': logger.info(f"Starting Flask server on {config['flask_host']}:{config['flask_port']}") app.run(host=config['flask_host'], port=config['flask_port'], debug=False)