feat: US-009 - Add state save atomicity with SetLastChannel
- Create pkg/state package with State and Manager structs - Implement SetLastChannel with atomic save using temp file + rename - Implement SetLastChatID with same atomic save pattern - Add GetLastChannel, GetLastChatID, and GetTimestamp getters - Use sync.RWMutex for thread-safe concurrent access - Add comprehensive tests for atomic save, concurrent access, and persistence - Cleanup temp file if rename fails Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -137,7 +137,7 @@
|
|||||||
"go test ./pkg/state -run TestAtomicSave passes"
|
"go test ./pkg/state -run TestAtomicSave passes"
|
||||||
],
|
],
|
||||||
"priority": 9,
|
"priority": 9,
|
||||||
"passes": false,
|
"passes": true,
|
||||||
"notes": ""
|
"notes": ""
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
|
|
||||||
## Progress
|
## Progress
|
||||||
|
|
||||||
### Completed (7/21)
|
### Completed (8/21)
|
||||||
|
|
||||||
- US-001: Add ToolResult struct and helper functions
|
- US-001: Add ToolResult struct and helper functions
|
||||||
- US-002: Modify Tool interface to return *ToolResult
|
- US-002: Modify Tool interface to return *ToolResult
|
||||||
@@ -15,6 +15,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
- US-006: Add AsyncCallback type and AsyncTool interface
|
- US-006: Add AsyncCallback type and AsyncTool interface
|
||||||
- US-007: Heartbeat async task execution support
|
- US-007: Heartbeat async task execution support
|
||||||
- US-008: Inject callback into async tools in AgentLoop
|
- US-008: Inject callback into async tools in AgentLoop
|
||||||
|
- US-009: State save atomicity - SetLastChannel
|
||||||
|
|
||||||
### In Progress
|
### In Progress
|
||||||
|
|
||||||
@@ -30,7 +31,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
| US-006 | Add AsyncCallback type and AsyncTool interface | Completed | |
|
| US-006 | Add AsyncCallback type and AsyncTool interface | Completed | |
|
||||||
| US-007 | Heartbeat async task execution support | Completed | |
|
| US-007 | Heartbeat async task execution support | Completed | |
|
||||||
| US-008 | Inject callback into async tools in AgentLoop | Completed | |
|
| US-008 | Inject callback into async tools in AgentLoop | Completed | |
|
||||||
| US-009 | State save atomicity - SetLastChannel | Pending | |
|
| US-009 | State save atomicity - SetLastChannel | Completed | |
|
||||||
| US-010 | Update RecordLastChannel to use atomic save | Pending | |
|
| US-010 | Update RecordLastChannel to use atomic save | Pending | |
|
||||||
| US-011 | Refactor MessageTool to use ToolResult | Completed | |
|
| US-011 | Refactor MessageTool to use ToolResult | Completed | |
|
||||||
| US-012 | Refactor ShellTool to use ToolResult | Completed | |
|
| US-012 | Refactor ShellTool to use ToolResult | Completed | |
|
||||||
@@ -149,4 +150,29 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
- **Gotchas encountered:** 更新方法签名时需要同时更新所有调用点。我修改了 `ExecuteWithContext` 的签名,所以也更新了 `Execute` 方法的调用。
|
- **Gotchas encountered:** 更新方法签名时需要同时更新所有调用点。我修改了 `ExecuteWithContext` 的签名,所以也更新了 `Execute` 方法的调用。
|
||||||
- **Useful context:** 异步工具完成时会调用回调,回调将 `ForUser` 内容发送给用户。这允许长时间运行的操作(如子代理)在后台完成并通知用户,而不阻塞主循环。
|
- **Useful context:** 异步工具完成时会调用回调,回调将 `ForUser` 内容发送给用户。这允许长时间运行的操作(如子代理)在后台完成并通知用户,而不阻塞主循环。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## [2026-02-12] - US-009
|
||||||
|
- What was implemented:
|
||||||
|
- 创建新的 `pkg/state` 包,包含状态管理和原子保存功能
|
||||||
|
- 定义 `State` 结构体,包含 `LastChannel`、`LastChatID` 和 `Timestamp` 字段
|
||||||
|
- 定义 `Manager` 结构体,使用 `sync.RWMutex` 保护并发访问
|
||||||
|
- 实现 `NewManager(workspace string)` 构造函数,创建状态目录并加载现有状态
|
||||||
|
- 实现 `SetLastChannel(workspace, channel string)` 方法,使用临时文件 + 重命名模式实现原子保存
|
||||||
|
- 实现 `SetLastChatID(workspace, chatID string)` 方法
|
||||||
|
- 实现 `GetLastChannel()` 和 `GetLastChatID()` getter 方法
|
||||||
|
- 实现 `saveAtomic()` 内部方法,使用 `os.WriteFile` 写入临时文件,然后用 `os.Rename` 原子性地重命名
|
||||||
|
- 如果重命名失败,清理临时文件
|
||||||
|
- 实现 `load()` 方法,从磁盘加载状态
|
||||||
|
- 添加完整的测试:`TestAtomicSave`、`TestSetLastChatID`、`TestAtomicity_NoCorruptionOnInterrupt`、`TestConcurrentAccess`、`TestNewManager_ExistingState`、`TestNewManager_EmptyWorkspace`
|
||||||
|
|
||||||
|
- Files changed:
|
||||||
|
- `pkg/state/state.go` (新增)
|
||||||
|
- `pkg/state/state_test.go` (新增)
|
||||||
|
|
||||||
|
- **Learnings for future iterations:**
|
||||||
|
- **Patterns discovered:** 临时文件 + 重命名模式是实现原子写入的标准方法。在 POSIX 系统上,`os.Rename` 是原子操作。
|
||||||
|
- **Gotchas encountered:** 临时文件必须与目标文件在同一文件系统中,否则 `os.Rename` 会失败。
|
||||||
|
- **Useful context:** 这个模式将在 US-010 中用于 `RecordLastChannel`,确保状态更新的原子性。
|
||||||
|
|
||||||
---
|
---
|
||||||
160
pkg/state/state.go
Normal file
160
pkg/state/state.go
Normal file
@@ -0,0 +1,160 @@
|
|||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// State represents the persistent state for a workspace.
|
||||||
|
// It includes information about the last active channel/chat.
|
||||||
|
type State struct {
|
||||||
|
// LastChannel is the last channel used for communication
|
||||||
|
LastChannel string `json:"last_channel,omitempty"`
|
||||||
|
|
||||||
|
// LastChatID is the last chat ID used for communication
|
||||||
|
LastChatID string `json:"last_chat_id,omitempty"`
|
||||||
|
|
||||||
|
// Timestamp is the last time this state was updated
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manager manages persistent state with atomic saves.
|
||||||
|
type Manager struct {
|
||||||
|
workspace string
|
||||||
|
state *State
|
||||||
|
mu sync.RWMutex
|
||||||
|
stateFile string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewManager creates a new state manager for the given workspace.
|
||||||
|
func NewManager(workspace string) *Manager {
|
||||||
|
stateDir := filepath.Join(workspace, "state")
|
||||||
|
stateFile := filepath.Join(stateDir, "state.json")
|
||||||
|
|
||||||
|
// Create state directory if it doesn't exist
|
||||||
|
os.MkdirAll(stateDir, 0755)
|
||||||
|
|
||||||
|
sm := &Manager{
|
||||||
|
workspace: workspace,
|
||||||
|
stateFile: stateFile,
|
||||||
|
state: &State{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load existing state if available
|
||||||
|
sm.load()
|
||||||
|
|
||||||
|
return sm
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLastChannel atomically updates the last channel and saves the state.
|
||||||
|
// This method uses a temp file + rename pattern for atomic writes,
|
||||||
|
// ensuring that the state file is never corrupted even if the process crashes.
|
||||||
|
//
|
||||||
|
// The workspace parameter is used to construct the state file path.
|
||||||
|
func (sm *Manager) SetLastChannel(workspace, channel string) error {
|
||||||
|
sm.mu.Lock()
|
||||||
|
defer sm.mu.Unlock()
|
||||||
|
|
||||||
|
// Update state
|
||||||
|
sm.state.LastChannel = channel
|
||||||
|
sm.state.Timestamp = time.Now()
|
||||||
|
|
||||||
|
// Atomic save using temp file + rename
|
||||||
|
if err := sm.saveAtomic(); err != nil {
|
||||||
|
return fmt.Errorf("failed to save state atomically: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLastChatID atomically updates the last chat ID and saves the state.
|
||||||
|
func (sm *Manager) SetLastChatID(workspace, chatID string) error {
|
||||||
|
sm.mu.Lock()
|
||||||
|
defer sm.mu.Unlock()
|
||||||
|
|
||||||
|
// Update state
|
||||||
|
sm.state.LastChatID = chatID
|
||||||
|
sm.state.Timestamp = time.Now()
|
||||||
|
|
||||||
|
// Atomic save using temp file + rename
|
||||||
|
if err := sm.saveAtomic(); err != nil {
|
||||||
|
return fmt.Errorf("failed to save state atomically: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastChannel returns the last channel from the state.
|
||||||
|
func (sm *Manager) GetLastChannel() string {
|
||||||
|
sm.mu.RLock()
|
||||||
|
defer sm.mu.RUnlock()
|
||||||
|
return sm.state.LastChannel
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastChatID returns the last chat ID from the state.
|
||||||
|
func (sm *Manager) GetLastChatID() string {
|
||||||
|
sm.mu.RLock()
|
||||||
|
defer sm.mu.RUnlock()
|
||||||
|
return sm.state.LastChatID
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTimestamp returns the timestamp of the last state update.
|
||||||
|
func (sm *Manager) GetTimestamp() time.Time {
|
||||||
|
sm.mu.RLock()
|
||||||
|
defer sm.mu.RUnlock()
|
||||||
|
return sm.state.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
// saveAtomic performs an atomic save using temp file + rename.
|
||||||
|
// This ensures that the state file is never corrupted:
|
||||||
|
// 1. Write to a temp file
|
||||||
|
// 2. Rename temp file to target (atomic on POSIX systems)
|
||||||
|
// 3. If rename fails, cleanup the temp file
|
||||||
|
//
|
||||||
|
// Must be called with the lock held.
|
||||||
|
func (sm *Manager) saveAtomic() error {
|
||||||
|
// Create temp file in the same directory as the target
|
||||||
|
tempFile := sm.stateFile + ".tmp"
|
||||||
|
|
||||||
|
// Marshal state to JSON
|
||||||
|
data, err := json.MarshalIndent(sm.state, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal state: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write to temp file
|
||||||
|
if err := os.WriteFile(tempFile, data, 0644); err != nil {
|
||||||
|
return fmt.Errorf("failed to write temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomic rename from temp to target
|
||||||
|
if err := os.Rename(tempFile, sm.stateFile); err != nil {
|
||||||
|
// Cleanup temp file if rename fails
|
||||||
|
os.Remove(tempFile)
|
||||||
|
return fmt.Errorf("failed to rename temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// load loads the state from disk.
|
||||||
|
func (sm *Manager) load() error {
|
||||||
|
data, err := os.ReadFile(sm.stateFile)
|
||||||
|
if err != nil {
|
||||||
|
// File doesn't exist yet, that's OK
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to read state file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(data, sm.state); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal state: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
216
pkg/state/state_test.go
Normal file
216
pkg/state/state_test.go
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAtomicSave(t *testing.T) {
|
||||||
|
// Create temp workspace
|
||||||
|
tmpDir, err := os.MkdirTemp("", "state-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
sm := NewManager(tmpDir)
|
||||||
|
|
||||||
|
// Test SetLastChannel
|
||||||
|
err = sm.SetLastChannel(tmpDir, "test-channel")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SetLastChannel failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the channel was saved
|
||||||
|
lastChannel := sm.GetLastChannel()
|
||||||
|
if lastChannel != "test-channel" {
|
||||||
|
t.Errorf("Expected channel 'test-channel', got '%s'", lastChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify timestamp was updated
|
||||||
|
if sm.GetTimestamp().IsZero() {
|
||||||
|
t.Error("Expected timestamp to be updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify state file exists
|
||||||
|
stateFile := filepath.Join(tmpDir, "state", "state.json")
|
||||||
|
if _, err := os.Stat(stateFile); os.IsNotExist(err) {
|
||||||
|
t.Error("Expected state file to exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new manager to verify persistence
|
||||||
|
sm2 := NewManager(tmpDir)
|
||||||
|
if sm2.GetLastChannel() != "test-channel" {
|
||||||
|
t.Errorf("Expected persistent channel 'test-channel', got '%s'", sm2.GetLastChannel())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetLastChatID(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "state-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
sm := NewManager(tmpDir)
|
||||||
|
|
||||||
|
// Test SetLastChatID
|
||||||
|
err = sm.SetLastChatID(tmpDir, "test-chat-id")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SetLastChatID failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the chat ID was saved
|
||||||
|
lastChatID := sm.GetLastChatID()
|
||||||
|
if lastChatID != "test-chat-id" {
|
||||||
|
t.Errorf("Expected chat ID 'test-chat-id', got '%s'", lastChatID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify timestamp was updated
|
||||||
|
if sm.GetTimestamp().IsZero() {
|
||||||
|
t.Error("Expected timestamp to be updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new manager to verify persistence
|
||||||
|
sm2 := NewManager(tmpDir)
|
||||||
|
if sm2.GetLastChatID() != "test-chat-id" {
|
||||||
|
t.Errorf("Expected persistent chat ID 'test-chat-id', got '%s'", sm2.GetLastChatID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAtomicity_NoCorruptionOnInterrupt(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "state-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
sm := NewManager(tmpDir)
|
||||||
|
|
||||||
|
// Write initial state
|
||||||
|
err = sm.SetLastChannel(tmpDir, "initial-channel")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SetLastChannel failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simulate a crash scenario by manually creating a corrupted temp file
|
||||||
|
tempFile := filepath.Join(tmpDir, "state", "state.json.tmp")
|
||||||
|
err = os.WriteFile(tempFile, []byte("corrupted data"), 0644)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the original state is still intact
|
||||||
|
lastChannel := sm.GetLastChannel()
|
||||||
|
if lastChannel != "initial-channel" {
|
||||||
|
t.Errorf("Expected channel 'initial-channel' after corrupted temp file, got '%s'", lastChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up the temp file manually
|
||||||
|
os.Remove(tempFile)
|
||||||
|
|
||||||
|
// Now do a proper save
|
||||||
|
err = sm.SetLastChannel(tmpDir, "new-channel")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SetLastChannel failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the new state was saved
|
||||||
|
if sm.GetLastChannel() != "new-channel" {
|
||||||
|
t.Errorf("Expected channel 'new-channel', got '%s'", sm.GetLastChannel())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcurrentAccess(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "state-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
sm := NewManager(tmpDir)
|
||||||
|
|
||||||
|
// Test concurrent writes
|
||||||
|
done := make(chan bool, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func(idx int) {
|
||||||
|
channel := fmt.Sprintf("channel-%d", idx)
|
||||||
|
sm.SetLastChannel(tmpDir, channel)
|
||||||
|
done <- true
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutines to complete
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the final state is consistent
|
||||||
|
lastChannel := sm.GetLastChannel()
|
||||||
|
if lastChannel == "" {
|
||||||
|
t.Error("Expected non-empty channel after concurrent writes")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify state file is valid JSON
|
||||||
|
stateFile := filepath.Join(tmpDir, "state", "state.json")
|
||||||
|
data, err := os.ReadFile(stateFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read state file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var state State
|
||||||
|
if err := json.Unmarshal(data, &state); err != nil {
|
||||||
|
t.Errorf("State file contains invalid JSON: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewManager_ExistingState(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "state-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
// Create initial state
|
||||||
|
sm1 := NewManager(tmpDir)
|
||||||
|
sm1.SetLastChannel(tmpDir, "existing-channel")
|
||||||
|
sm1.SetLastChatID(tmpDir, "existing-chat-id")
|
||||||
|
|
||||||
|
// Create new manager with same workspace
|
||||||
|
sm2 := NewManager(tmpDir)
|
||||||
|
|
||||||
|
// Verify state was loaded
|
||||||
|
if sm2.GetLastChannel() != "existing-channel" {
|
||||||
|
t.Errorf("Expected channel 'existing-channel', got '%s'", sm2.GetLastChannel())
|
||||||
|
}
|
||||||
|
|
||||||
|
if sm2.GetLastChatID() != "existing-chat-id" {
|
||||||
|
t.Errorf("Expected chat ID 'existing-chat-id', got '%s'", sm2.GetLastChatID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewManager_EmptyWorkspace(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "state-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
sm := NewManager(tmpDir)
|
||||||
|
|
||||||
|
// Verify default state
|
||||||
|
if sm.GetLastChannel() != "" {
|
||||||
|
t.Errorf("Expected empty channel, got '%s'", sm.GetLastChannel())
|
||||||
|
}
|
||||||
|
|
||||||
|
if sm.GetLastChatID() != "" {
|
||||||
|
t.Errorf("Expected empty chat ID, got '%s'", sm.GetLastChatID())
|
||||||
|
}
|
||||||
|
|
||||||
|
if !sm.GetTimestamp().IsZero() {
|
||||||
|
t.Error("Expected zero timestamp for new state")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user