diff --git a/.ralph/prd.json b/.ralph/prd.json index f0bf0d4..e6451a1 100644 --- a/.ralph/prd.json +++ b/.ralph/prd.json @@ -137,7 +137,7 @@ "go test ./pkg/state -run TestAtomicSave passes" ], "priority": 9, - "passes": false, + "passes": true, "notes": "" }, { diff --git a/.ralph/progress.txt b/.ralph/progress.txt index ea466ed..44297b4 100644 --- a/.ralph/progress.txt +++ b/.ralph/progress.txt @@ -6,7 +6,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 ## Progress -### Completed (7/21) +### Completed (8/21) - US-001: Add ToolResult struct and helper functions - US-002: Modify Tool interface to return *ToolResult @@ -15,6 +15,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 - US-006: Add AsyncCallback type and AsyncTool interface - US-007: Heartbeat async task execution support - US-008: Inject callback into async tools in AgentLoop +- US-009: State save atomicity - SetLastChannel ### In Progress @@ -30,7 +31,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 | US-006 | Add AsyncCallback type and AsyncTool interface | Completed | | | US-007 | Heartbeat async task execution support | Completed | | | US-008 | Inject callback into async tools in AgentLoop | Completed | | -| US-009 | State save atomicity - SetLastChannel | Pending | | +| US-009 | State save atomicity - SetLastChannel | Completed | | | US-010 | Update RecordLastChannel to use atomic save | Pending | | | US-011 | Refactor MessageTool to use ToolResult | Completed | | | US-012 | Refactor ShellTool to use ToolResult | Completed | | @@ -149,4 +150,29 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 - **Gotchas encountered:** 更新方法签名时需要同时更新所有调用点。我修改了 `ExecuteWithContext` 的签名,所以也更新了 `Execute` 方法的调用。 - **Useful context:** 异步工具完成时会调用回调,回调将 `ForUser` 内容发送给用户。这允许长时间运行的操作(如子代理)在后台完成并通知用户,而不阻塞主循环。 +--- + +## [2026-02-12] - US-009 +- What was implemented: + - 创建新的 `pkg/state` 包,包含状态管理和原子保存功能 + - 定义 `State` 结构体,包含 `LastChannel`、`LastChatID` 和 `Timestamp` 字段 + - 定义 `Manager` 结构体,使用 `sync.RWMutex` 保护并发访问 + - 实现 `NewManager(workspace string)` 构造函数,创建状态目录并加载现有状态 + - 实现 `SetLastChannel(workspace, channel string)` 方法,使用临时文件 + 重命名模式实现原子保存 + - 实现 `SetLastChatID(workspace, chatID string)` 方法 + - 实现 `GetLastChannel()` 和 `GetLastChatID()` getter 方法 + - 实现 `saveAtomic()` 内部方法,使用 `os.WriteFile` 写入临时文件,然后用 `os.Rename` 原子性地重命名 + - 如果重命名失败,清理临时文件 + - 实现 `load()` 方法,从磁盘加载状态 + - 添加完整的测试:`TestAtomicSave`、`TestSetLastChatID`、`TestAtomicity_NoCorruptionOnInterrupt`、`TestConcurrentAccess`、`TestNewManager_ExistingState`、`TestNewManager_EmptyWorkspace` + +- Files changed: + - `pkg/state/state.go` (新增) + - `pkg/state/state_test.go` (新增) + +- **Learnings for future iterations:** + - **Patterns discovered:** 临时文件 + 重命名模式是实现原子写入的标准方法。在 POSIX 系统上,`os.Rename` 是原子操作。 + - **Gotchas encountered:** 临时文件必须与目标文件在同一文件系统中,否则 `os.Rename` 会失败。 + - **Useful context:** 这个模式将在 US-010 中用于 `RecordLastChannel`,确保状态更新的原子性。 + --- \ No newline at end of file diff --git a/pkg/state/state.go b/pkg/state/state.go new file mode 100644 index 0000000..280aafd --- /dev/null +++ b/pkg/state/state.go @@ -0,0 +1,160 @@ +package state + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +// State represents the persistent state for a workspace. +// It includes information about the last active channel/chat. +type State struct { + // LastChannel is the last channel used for communication + LastChannel string `json:"last_channel,omitempty"` + + // LastChatID is the last chat ID used for communication + LastChatID string `json:"last_chat_id,omitempty"` + + // Timestamp is the last time this state was updated + Timestamp time.Time `json:"timestamp"` +} + +// Manager manages persistent state with atomic saves. +type Manager struct { + workspace string + state *State + mu sync.RWMutex + stateFile string +} + +// NewManager creates a new state manager for the given workspace. +func NewManager(workspace string) *Manager { + stateDir := filepath.Join(workspace, "state") + stateFile := filepath.Join(stateDir, "state.json") + + // Create state directory if it doesn't exist + os.MkdirAll(stateDir, 0755) + + sm := &Manager{ + workspace: workspace, + stateFile: stateFile, + state: &State{}, + } + + // Load existing state if available + sm.load() + + return sm +} + +// SetLastChannel atomically updates the last channel and saves the state. +// This method uses a temp file + rename pattern for atomic writes, +// ensuring that the state file is never corrupted even if the process crashes. +// +// The workspace parameter is used to construct the state file path. +func (sm *Manager) SetLastChannel(workspace, channel string) error { + sm.mu.Lock() + defer sm.mu.Unlock() + + // Update state + sm.state.LastChannel = channel + sm.state.Timestamp = time.Now() + + // Atomic save using temp file + rename + if err := sm.saveAtomic(); err != nil { + return fmt.Errorf("failed to save state atomically: %w", err) + } + + return nil +} + +// SetLastChatID atomically updates the last chat ID and saves the state. +func (sm *Manager) SetLastChatID(workspace, chatID string) error { + sm.mu.Lock() + defer sm.mu.Unlock() + + // Update state + sm.state.LastChatID = chatID + sm.state.Timestamp = time.Now() + + // Atomic save using temp file + rename + if err := sm.saveAtomic(); err != nil { + return fmt.Errorf("failed to save state atomically: %w", err) + } + + return nil +} + +// GetLastChannel returns the last channel from the state. +func (sm *Manager) GetLastChannel() string { + sm.mu.RLock() + defer sm.mu.RUnlock() + return sm.state.LastChannel +} + +// GetLastChatID returns the last chat ID from the state. +func (sm *Manager) GetLastChatID() string { + sm.mu.RLock() + defer sm.mu.RUnlock() + return sm.state.LastChatID +} + +// GetTimestamp returns the timestamp of the last state update. +func (sm *Manager) GetTimestamp() time.Time { + sm.mu.RLock() + defer sm.mu.RUnlock() + return sm.state.Timestamp +} + +// saveAtomic performs an atomic save using temp file + rename. +// This ensures that the state file is never corrupted: +// 1. Write to a temp file +// 2. Rename temp file to target (atomic on POSIX systems) +// 3. If rename fails, cleanup the temp file +// +// Must be called with the lock held. +func (sm *Manager) saveAtomic() error { + // Create temp file in the same directory as the target + tempFile := sm.stateFile + ".tmp" + + // Marshal state to JSON + data, err := json.MarshalIndent(sm.state, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal state: %w", err) + } + + // Write to temp file + if err := os.WriteFile(tempFile, data, 0644); err != nil { + return fmt.Errorf("failed to write temp file: %w", err) + } + + // Atomic rename from temp to target + if err := os.Rename(tempFile, sm.stateFile); err != nil { + // Cleanup temp file if rename fails + os.Remove(tempFile) + return fmt.Errorf("failed to rename temp file: %w", err) + } + + return nil +} + +// load loads the state from disk. +func (sm *Manager) load() error { + data, err := os.ReadFile(sm.stateFile) + if err != nil { + // File doesn't exist yet, that's OK + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to read state file: %w", err) + } + + if err := json.Unmarshal(data, sm.state); err != nil { + return fmt.Errorf("failed to unmarshal state: %w", err) + } + + return nil +} diff --git a/pkg/state/state_test.go b/pkg/state/state_test.go new file mode 100644 index 0000000..4ee049f --- /dev/null +++ b/pkg/state/state_test.go @@ -0,0 +1,216 @@ +package state + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" +) + +func TestAtomicSave(t *testing.T) { + // Create temp workspace + tmpDir, err := os.MkdirTemp("", "state-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + sm := NewManager(tmpDir) + + // Test SetLastChannel + err = sm.SetLastChannel(tmpDir, "test-channel") + if err != nil { + t.Fatalf("SetLastChannel failed: %v", err) + } + + // Verify the channel was saved + lastChannel := sm.GetLastChannel() + if lastChannel != "test-channel" { + t.Errorf("Expected channel 'test-channel', got '%s'", lastChannel) + } + + // Verify timestamp was updated + if sm.GetTimestamp().IsZero() { + t.Error("Expected timestamp to be updated") + } + + // Verify state file exists + stateFile := filepath.Join(tmpDir, "state", "state.json") + if _, err := os.Stat(stateFile); os.IsNotExist(err) { + t.Error("Expected state file to exist") + } + + // Create a new manager to verify persistence + sm2 := NewManager(tmpDir) + if sm2.GetLastChannel() != "test-channel" { + t.Errorf("Expected persistent channel 'test-channel', got '%s'", sm2.GetLastChannel()) + } +} + +func TestSetLastChatID(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "state-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + sm := NewManager(tmpDir) + + // Test SetLastChatID + err = sm.SetLastChatID(tmpDir, "test-chat-id") + if err != nil { + t.Fatalf("SetLastChatID failed: %v", err) + } + + // Verify the chat ID was saved + lastChatID := sm.GetLastChatID() + if lastChatID != "test-chat-id" { + t.Errorf("Expected chat ID 'test-chat-id', got '%s'", lastChatID) + } + + // Verify timestamp was updated + if sm.GetTimestamp().IsZero() { + t.Error("Expected timestamp to be updated") + } + + // Create a new manager to verify persistence + sm2 := NewManager(tmpDir) + if sm2.GetLastChatID() != "test-chat-id" { + t.Errorf("Expected persistent chat ID 'test-chat-id', got '%s'", sm2.GetLastChatID()) + } +} + +func TestAtomicity_NoCorruptionOnInterrupt(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "state-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + sm := NewManager(tmpDir) + + // Write initial state + err = sm.SetLastChannel(tmpDir, "initial-channel") + if err != nil { + t.Fatalf("SetLastChannel failed: %v", err) + } + + // Simulate a crash scenario by manually creating a corrupted temp file + tempFile := filepath.Join(tmpDir, "state", "state.json.tmp") + err = os.WriteFile(tempFile, []byte("corrupted data"), 0644) + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + + // Verify that the original state is still intact + lastChannel := sm.GetLastChannel() + if lastChannel != "initial-channel" { + t.Errorf("Expected channel 'initial-channel' after corrupted temp file, got '%s'", lastChannel) + } + + // Clean up the temp file manually + os.Remove(tempFile) + + // Now do a proper save + err = sm.SetLastChannel(tmpDir, "new-channel") + if err != nil { + t.Fatalf("SetLastChannel failed: %v", err) + } + + // Verify the new state was saved + if sm.GetLastChannel() != "new-channel" { + t.Errorf("Expected channel 'new-channel', got '%s'", sm.GetLastChannel()) + } +} + +func TestConcurrentAccess(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "state-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + sm := NewManager(tmpDir) + + // Test concurrent writes + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func(idx int) { + channel := fmt.Sprintf("channel-%d", idx) + sm.SetLastChannel(tmpDir, channel) + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < 10; i++ { + <-done + } + + // Verify the final state is consistent + lastChannel := sm.GetLastChannel() + if lastChannel == "" { + t.Error("Expected non-empty channel after concurrent writes") + } + + // Verify state file is valid JSON + stateFile := filepath.Join(tmpDir, "state", "state.json") + data, err := os.ReadFile(stateFile) + if err != nil { + t.Fatalf("Failed to read state file: %v", err) + } + + var state State + if err := json.Unmarshal(data, &state); err != nil { + t.Errorf("State file contains invalid JSON: %v", err) + } +} + +func TestNewManager_ExistingState(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "state-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // Create initial state + sm1 := NewManager(tmpDir) + sm1.SetLastChannel(tmpDir, "existing-channel") + sm1.SetLastChatID(tmpDir, "existing-chat-id") + + // Create new manager with same workspace + sm2 := NewManager(tmpDir) + + // Verify state was loaded + if sm2.GetLastChannel() != "existing-channel" { + t.Errorf("Expected channel 'existing-channel', got '%s'", sm2.GetLastChannel()) + } + + if sm2.GetLastChatID() != "existing-chat-id" { + t.Errorf("Expected chat ID 'existing-chat-id', got '%s'", sm2.GetLastChatID()) + } +} + +func TestNewManager_EmptyWorkspace(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "state-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + sm := NewManager(tmpDir) + + // Verify default state + if sm.GetLastChannel() != "" { + t.Errorf("Expected empty channel, got '%s'", sm.GetLastChannel()) + } + + if sm.GetLastChatID() != "" { + t.Errorf("Expected empty chat ID, got '%s'", sm.GetLastChatID()) + } + + if !sm.GetTimestamp().IsZero() { + t.Error("Expected zero timestamp for new state") + } +}