#!/usr/bin/env python3
"""
Session Catchup Script for planning-with-files

Analyzes the previous session to find unsynced context after the last
planning file update. Designed to run on SessionStart.

Usage: python3 session-catchup.py [project-path]
"""

import json
import sys
import os
from pathlib import Path
from typing import List, Dict, Optional, Tuple

PLANNING_FILES = ['task_plan.md', 'progress.md', 'findings.md']

# Session files at or below this size (bytes) are not considered substantial
# enough to be worth summarizing.
MIN_SESSION_BYTES = 5000

# User messages starting with one of these prefixes are skipped when
# collecting unsynced context.
# NOTE(review): this tuple was reconstructed; the original literal was
# truncated in the damaged source. Confirm the exact prefixes.
_SYSTEM_MESSAGE_PREFIXES = ('<local-command', '<command-', '<task-notification')


def normalize_path(project_path: str) -> str:
    """Normalize project path to match Claude Code's internal representation.

    Claude Code stores session directories using the Windows-native path
    (e.g., C:\\Users\\...) sanitized with separators replaced by dashes.
    Git Bash passes /c/Users/... which produces a DIFFERENT sanitized string.
    This function converts Git Bash paths to Windows paths first.

    Args:
        project_path: Project directory as given on the command line or cwd.

    Returns:
        The path with a Git Bash drive prefix rewritten to ``X:/...``; on
        Windows (or whenever the resolved form contains backslashes) the
        fully resolved path is returned instead.
    """
    p = project_path

    # Git Bash / MSYS2: /c/Users/... -> C:/Users/...
    # Require a letter so that paths such as /1/data are left untouched.
    if len(p) >= 3 and p[0] == '/' and p[2] == '/' and p[1].isalpha():
        p = p[1].upper() + ':' + p[2:]

    # Resolve to absolute path to handle relative paths and symlinks
    try:
        resolved = str(Path(p).resolve())
    except (OSError, ValueError, RuntimeError):
        # Best effort: fall back to the unresolved path.
        return p

    # On Windows, resolve() returns C:\Users\... which is what we want
    if os.name == 'nt' or '\\' in resolved:
        p = resolved

    return p


def get_project_dir(project_path: str) -> Tuple[Optional[Path], Optional[str]]:
    """Resolve session storage path for the current runtime variant.

    Args:
        project_path: Project directory whose sessions should be located.

    Returns:
        ``(path, None)`` with the candidate ``~/.claude/projects/<name>``
        directory (which may not exist), or ``(None, reason)`` when the
        script runs from a Codex skill folder and catchup must be skipped.
    """
    normalized = normalize_path(project_path)

    # Claude Code's sanitization: replace path separators and : with -
    sanitized = normalized.replace('\\', '-').replace('/', '-').replace(':', '-')
    sanitized = sanitized.replace('_', '-')

    projects_root = Path.home() / '.claude' / 'projects'

    # Strip leading dash if present (Unix absolute paths start with /)
    stripped = sanitized[1:] if sanitized.startswith('-') else sanitized
    claude_path = projects_root / stripped

    # NOTE(review): on Unix the session directory appears to keep the leading
    # dash (e.g. "-home-user-project") - confirm. The dashed form is only used
    # when it exists on disk and the stripped form does not, so the previous
    # result is unchanged in every other case.
    if stripped != sanitized and not claude_path.exists():
        dashed_path = projects_root / sanitized
        if dashed_path.exists():
            claude_path = dashed_path

    # Codex stores sessions in ~/.codex/sessions with a different format.
    # Avoid silently scanning Claude paths when running from Codex skill folder.
    script_path = Path(__file__).as_posix().lower()
    is_codex_variant = '/.codex/' in script_path
    codex_sessions_dir = Path.home() / '.codex' / 'sessions'
    if is_codex_variant and codex_sessions_dir.exists() and not claude_path.exists():
        return None, (
            "[planning-with-files] Session catchup skipped: Codex stores sessions "
            "in ~/.codex/sessions and native Codex parsing is not implemented yet."
        )

    return claude_path, None


def get_sessions_sorted(project_dir: Path) -> List[Path]:
    """Get all session files sorted by modification time (newest first).

    Files whose name starts with ``agent-`` are excluded.
    """
    main_sessions = [
        s for s in project_dir.glob('*.jsonl')
        if not s.name.startswith('agent-')
    ]
    return sorted(main_sessions, key=lambda p: p.stat().st_mtime, reverse=True)


def parse_session_messages(session_file: Path) -> List[Dict]:
    """Parse all messages from a session file, preserving order.

    Each parsed object gets a ``_line_num`` key holding its zero-based line
    index in the file. Lines that are not valid JSON, or that decode to
    something other than a JSON object, are skipped.
    """
    messages = []
    with open(session_file, 'r', encoding='utf-8', errors='replace') as f:
        for line_num, line in enumerate(f):
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(data, dict):
                # A bare number/string/list cannot carry '_line_num'.
                continue
            data['_line_num'] = line_num
            messages.append(data)
    return messages


def _message_content(msg: Dict, default):
    """Return ``msg['message']['content']``, or *default* if the shape is unexpected."""
    message = msg.get('message')
    if not isinstance(message, dict):
        return default
    return message.get('content', default)


def _tool_input(item: Dict) -> Dict:
    """Return the ``input`` mapping of a tool_use item, or an empty dict."""
    tool_input = item.get('input')
    return tool_input if isinstance(tool_input, dict) else {}


def find_last_planning_update(messages: List[Dict]) -> Tuple[int, Optional[str]]:
    """
    Find the last time a planning file was written/edited.

    Scans assistant messages for ``Write``/``Edit`` tool uses whose
    ``file_path`` ends with one of PLANNING_FILES.

    Returns (line_number, filename) or (-1, None) if not found.
    """
    last_update_line = -1
    last_update_file = None

    for msg in messages:
        if msg.get('type') != 'assistant':
            continue
        content = _message_content(msg, [])
        if not isinstance(content, list):
            continue
        for item in content:
            if not isinstance(item, dict) or item.get('type') != 'tool_use':
                continue
            if item.get('name', '') not in ('Write', 'Edit'):
                continue
            file_path = _tool_input(item).get('file_path', '')
            if not isinstance(file_path, str):
                continue
            for pf in PLANNING_FILES:
                if file_path.endswith(pf):
                    last_update_line = msg['_line_num']
                    last_update_file = pf

    return last_update_line, last_update_file


def _extract_user_text(msg: Dict) -> str:
    """Return the text of a user message, or '' if it has no usable text."""
    content = _message_content(msg, '')
    if isinstance(content, list):
        # Use the first text part; no text part means no content.
        for item in content:
            if isinstance(item, dict) and item.get('type') == 'text':
                content = item.get('text', '')
                break
        else:
            content = ''
    return content if isinstance(content, str) else ''


def _describe_tool_use(item: Dict) -> str:
    """Return a one-line summary of a tool_use content item."""
    tool_name = item.get('name', '')
    tool_input = _tool_input(item)
    if tool_name in ('Edit', 'Write'):
        return f"{tool_name}: {tool_input.get('file_path', 'unknown')}"
    if tool_name == 'Bash':
        # Long commands are truncated to keep the report compact.
        cmd = str(tool_input.get('command') or '')[:80]
        return f"Bash: {cmd}"
    return f"{tool_name}"


def extract_messages_after(messages: List[Dict], after_line: int) -> List[Dict]:
    """Extract conversation messages after a certain line number.

    Args:
        messages: Parsed session entries carrying a ``_line_num`` key.
        after_line: Only entries with ``_line_num`` greater than this are used.

    Returns:
        A list of dicts. User entries have ``role``, ``content`` and ``line``;
        assistant entries additionally have ``tools`` (tool-use summaries) and
        their ``content`` is capped at 600 characters. Meta user messages,
        user messages starting with a system prefix, and user messages of
        20 characters or fewer are dropped.
    """
    result = []
    for msg in messages:
        if msg['_line_num'] <= after_line:
            continue

        msg_type = msg.get('type')
        is_meta = msg.get('isMeta', False)

        if msg_type == 'user' and not is_meta:
            content = _extract_user_text(msg)
            if not content:
                continue
            if content.startswith(_SYSTEM_MESSAGE_PREFIXES):
                continue
            if len(content) > 20:
                result.append({'role': 'user', 'content': content, 'line': msg['_line_num']})

        elif msg_type == 'assistant':
            msg_content = _message_content(msg, '')
            text_content = ''
            tool_uses = []

            if isinstance(msg_content, str):
                text_content = msg_content
            elif isinstance(msg_content, list):
                for item in msg_content:
                    if not isinstance(item, dict):
                        continue
                    item_type = item.get('type')
                    if item_type == 'text':
                        # A later text part replaces an earlier one.
                        text = item.get('text', '')
                        text_content = text if isinstance(text, str) else ''
                    elif item_type == 'tool_use':
                        tool_uses.append(_describe_tool_use(item))

            if text_content or tool_uses:
                result.append({
                    'role': 'assistant',
                    'content': text_content[:600] if text_content else '',
                    'tools': tool_uses,
                    'line': msg['_line_num']
                })

    return result


def main():
    """Print a catchup report for the most recent substantial session, if any.

    Exits silently when there are no planning files, no session directory,
    no substantial session, no planning update in that session, or nothing
    after the last planning update.
    """
    project_path = sys.argv[1] if len(sys.argv) > 1 else os.getcwd()

    # Check if planning files exist (indicates active task)
    has_planning_files = any(
        Path(project_path, f).exists() for f in PLANNING_FILES
    )
    if not has_planning_files:
        # No planning files in this project; skip catchup to avoid noise.
        return

    project_dir, skip_reason = get_project_dir(project_path)
    if skip_reason:
        print(skip_reason)
        return

    if not project_dir.exists():
        # No previous sessions, nothing to catch up on
        return

    sessions = get_sessions_sorted(project_dir)
    if not sessions:
        return

    # Find a substantial previous session
    target_session = None
    for session in sessions:
        if session.stat().st_size > MIN_SESSION_BYTES:
            target_session = session
            break

    if not target_session:
        return

    messages = parse_session_messages(target_session)
    last_update_line, last_update_file = find_last_planning_update(messages)

    # No planning updates in the target session; skip catchup output.
    if last_update_line < 0:
        return

    # Only output if there's unsynced content
    messages_after = extract_messages_after(messages, last_update_line)
    if not messages_after:
        return

    # Output catchup report
    print("\n[planning-with-files] SESSION CATCHUP DETECTED")
    print(f"Previous session: {target_session.stem}")
    print(f"Last planning update: {last_update_file} at message #{last_update_line}")
    print(f"Unsynced messages: {len(messages_after)}")

    print("\n--- UNSYNCED CONTEXT ---")
    for msg in messages_after[-15:]:  # Last 15 messages
        if msg['role'] == 'user':
            print(f"USER: {msg['content'][:300]}")
        else:
            if msg.get('content'):
                print(f"CLAUDE: {msg['content'][:300]}")
            if msg.get('tools'):
                print(f" Tools: {', '.join(msg['tools'][:4])}")

    print("\n--- RECOMMENDED ---")
    print("1. Run: git diff --stat")
    print("2. Read: task_plan.md, progress.md, findings.md")
    print("3. Update planning files based on above context")
    print("4. Continue with task")


if __name__ == '__main__':
    main()