From 4bc9e2d768a2863669b89e0fdb8b1526860dab6a Mon Sep 17 00:00:00 2001 From: yinwm Date: Wed, 11 Feb 2026 18:43:21 +0800 Subject: [PATCH] fix(cron): add one-time reminders and fix data paths to workspace - Add at_seconds parameter for one-time reminders (e.g., "remind me in 10 minutes") - Update every_seconds description to emphasize recurring-only usage - Route cron delivery: deliver=true sends directly, deliver=false uses agent - Fix cron data path from ~/.picoclaw/cron to workspace/cron - Fix sessions path from workspace/../sessions to workspace/sessions Co-Authored-By: Claude Opus 4.6 --- .gitignore | 16 +++++++++-- cmd/picoclaw/main.go | 45 +++++++++++++++++++----------- pkg/agent/context.go | 10 +++++-- pkg/agent/loop.go | 34 +++++++++++++++++++---- pkg/channels/base.go | 5 ++-- pkg/session/manager.go | 14 +++++++--- pkg/tools/cron.go | 63 ++++++++++++++++++++++++++++++++---------- 7 files changed, 141 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index dacb665..6ad4d78 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +# Binaries bin/ *.exe *.dll @@ -5,12 +6,21 @@ bin/ *.dylib *.test *.out +/picoclaw +/picoclaw-test + +# Picoclaw specific .picoclaw/ config.json sessions/ +build/ + +# Coverage coverage.txt coverage.html -.DS_Store -build -picoclaw +# OS +.DS_Store + +# Ralph workspace +ralph/ diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 60dd1b9..0b13bc7 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -551,20 +551,8 @@ func gatewayCmd() { "skills_available": skillsInfo["available"], }) - cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") - - // Create cron service first (onJob handler set after CronTool creation) - cronService := cron.NewCronService(cronStorePath, nil) - - // Create and register CronTool - cronTool := tools.NewCronTool(cronService, agentLoop) - agentLoop.RegisterTool(cronTool) - - // Now set the onJob handler for cron service - cronService.SetOnJob(func(job *cron.CronJob) (string, error) { - result := cronTool.ExecuteJob(context.Background(), job) - return result, nil - }) + // Setup cron tool and service + cronService, _ := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), @@ -702,6 +690,25 @@ func getConfigPath() string { return filepath.Join(home, ".picoclaw", "config.json") } +func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) (*cron.CronService, *tools.CronTool) { + cronStorePath := filepath.Join(workspace, "cron", "jobs.json") + + // Create cron service + cronService := cron.NewCronService(cronStorePath, nil) + + // Create and register CronTool + cronTool := tools.NewCronTool(cronService, agentLoop, msgBus) + agentLoop.RegisterTool(cronTool) + + // Set the onJob handler + cronService.SetOnJob(func(job *cron.CronJob) (string, error) { + result := cronTool.ExecuteJob(context.Background(), job) + return result, nil + }) + + return cronService, cronTool +} + func loadConfig() (*config.Config, error) { return config.LoadConfig(getConfigPath()) } @@ -714,8 +721,14 @@ func cronCmd() { subcommand := os.Args[2] - dataDir := filepath.Join(filepath.Dir(getConfigPath()), "cron") - cronStorePath := filepath.Join(dataDir, "jobs.json") + // Load config to get workspace path + cfg, err := loadConfig() + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return + } + + cronStorePath := filepath.Join(cfg.WorkspacePath(), "cron", "jobs.json") switch subcommand { case "list": diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 7e8612e..d1b3397 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -69,8 +69,13 @@ Your workspace is at: %s %s -Always be helpful, accurate, and concise. When using tools, explain what you're doing. -When remembering something, write to %s/memory/MEMORY.md`, +## Important Rules + +1. **ALWAYS use tools** - When you need to perform an action (schedule reminders, send messages, execute commands, etc.), you MUST call the appropriate tool. Do NOT just say you'll do it or pretend to do it. + +2. **Be helpful and accurate** - When using tools, briefly explain what you're doing. + +3. **Memory** - When remembering something, write to %s/memory/MEMORY.md`, now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) } @@ -86,6 +91,7 @@ func (cb *ContextBuilder) buildToolsSection() string { var sb strings.Builder sb.WriteString("## Available Tools\n\n") + sb.WriteString("**CRITICAL**: You MUST use tools to perform actions. Do NOT pretend to execute commands or schedule tasks.\n\n") sb.WriteString("You have access to the following tools:\n\n") for _, s := range summaries { sb.WriteString(s) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 3ab9b7a..439428a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -69,7 +69,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers editFileTool := tools.NewEditFileTool(workspace) toolsRegistry.Register(editFileTool) - sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) + sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions")) return &AgentLoop{ bus: msgBus, @@ -179,6 +179,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) msg.ChatID, ) + // Save user message to session + al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) + iteration := 0 var finalContent string @@ -277,6 +280,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } messages = append(messages, assistantMsg) + // Save assistant message with tool calls to session + al.sessions.AddFullMessage(msg.SessionKey, assistantMsg) + for _, tc := range response.ToolCalls { // Log tool call with arguments preview argsJSON, _ := json.Marshal(tc.Arguments) @@ -287,7 +293,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) "iteration": iteration, }) - result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) + result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) if err != nil { result = fmt.Sprintf("Error: %v", err) } @@ -298,6 +304,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + + // Save tool result message to session + al.sessions.AddFullMessage(msg.SessionKey, toolResultMsg) } } @@ -305,7 +314,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) finalContent = "I've completed processing but have no response to give." } - al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) + // Save final assistant message to session al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) @@ -370,6 +379,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe originChatID, ) + // Save user message to session with system message marker + al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) + iteration := 0 var finalContent string @@ -446,6 +458,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe } messages = append(messages, assistantMsg) + // Save assistant message with tool calls to session + al.sessions.AddFullMessage(sessionKey, assistantMsg) + for _, tc := range response.ToolCalls { result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) if err != nil { @@ -458,6 +473,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + + // Save tool result message to session + al.sessions.AddFullMessage(sessionKey, toolResultMsg) } } @@ -465,8 +483,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe finalContent = "Background task completed." } - // Save to session with system message marker - al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) + // Save final assistant message to session al.sessions.AddMessage(sessionKey, "assistant", finalContent) al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) @@ -476,6 +493,13 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe "final_length": len(finalContent), }) + // Send response back to the original channel + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: originChannel, + ChatID: originChatID, + Content: finalContent, + }) + return finalContent, nil } diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 5361191..8abe7b5 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -61,7 +61,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st return } - // 生成 SessionKey: channel:chatID + // Build session key: channel:chatID sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) msg := bus.InboundMessage{ @@ -70,8 +70,9 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st ChatID: chatID, Content: content, Media: media, - Metadata: metadata, SessionKey: sessionKey, + Metadata: metadata, + } } c.bus.PublishInbound(msg) diff --git a/pkg/session/manager.go b/pkg/session/manager.go index df86724..b4b8257 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -59,6 +59,15 @@ func (sm *SessionManager) GetOrCreate(key string) *Session { } func (sm *SessionManager) AddMessage(sessionKey, role, content string) { + sm.AddFullMessage(sessionKey, providers.Message{ + Role: role, + Content: content, + }) +} + +// AddFullMessage adds a complete message with tool calls and tool call ID to the session. +// This is used to save the full conversation flow including tool calls and tool results. +func (sm *SessionManager) AddFullMessage(sessionKey string, msg providers.Message) { sm.mu.Lock() defer sm.mu.Unlock() @@ -72,10 +81,7 @@ func (sm *SessionManager) AddMessage(sessionKey, role, content string) { sm.sessions[sessionKey] = session } - session.Messages = append(session.Messages, providers.Message{ - Role: role, - Content: content, - }) + session.Messages = append(session.Messages, msg) session.Updated = time.Now() } diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 65c97ce..87aaf35 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "sync" + "time" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/cron" ) @@ -24,16 +26,18 @@ type JobExecutor interface { type CronTool struct { cronService *cron.CronService executor JobExecutor + msgBus *bus.MessageBus channel string chatID string mu sync.RWMutex } // NewCronTool creates a new CronTool -func NewCronTool(cronService *cron.CronService, executor JobExecutor) *CronTool { +func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus) *CronTool { return &CronTool{ cronService: cronService, executor: executor, + msgBus: msgBus, } } @@ -44,7 +48,7 @@ func (t *CronTool) Name() string { // Description returns the tool description func (t *CronTool) Description() string { - return "Schedule reminders and recurring tasks. Actions: add, list, remove, enable, disable." + return "Schedule reminders and tasks. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am)." } // Parameters returns the tool parameters schema @@ -55,24 +59,32 @@ func (t *CronTool) Parameters() map[string]interface{} { "action": map[string]interface{}{ "type": "string", "enum": []string{"add", "list", "remove", "enable", "disable"}, - "description": "Action to perform", + "description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.", }, "message": map[string]interface{}{ "type": "string", - "description": "Reminder message (for add)", + "description": "The reminder/task message to display when triggered (required for add)", + }, + "at_seconds": map[string]interface{}{ + "type": "integer", + "description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.", }, "every_seconds": map[string]interface{}{ "type": "integer", - "description": "Interval in seconds for recurring tasks", + "description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.", }, "cron_expr": map[string]interface{}{ "type": "string", - "description": "Cron expression like '0 9 * * *' for scheduled tasks", + "description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.", }, "job_id": map[string]interface{}{ "type": "string", "description": "Job ID (for remove/enable/disable)", }, + "deliver": map[string]interface{}{ + "type": "boolean", + "description": "If true, send message directly to channel. If false, let agent process the message (for complex tasks). Default: true", + }, }, "required": []string{"action"}, } @@ -126,32 +138,44 @@ func (t *CronTool) addJob(args map[string]interface{}) (string, error) { var schedule cron.CronSchedule - // Check for every_seconds + // Check for at_seconds (one-time), every_seconds (recurring), or cron_expr + atSeconds, hasAt := args["at_seconds"].(float64) everySeconds, hasEvery := args["every_seconds"].(float64) cronExpr, hasCron := args["cron_expr"].(string) - if !hasEvery && !hasCron { - return "Error: either every_seconds or cron_expr is required", nil - } - - if hasEvery { + // Priority: at_seconds > every_seconds > cron_expr + if hasAt { + atMS := time.Now().UnixMilli() + int64(atSeconds)*1000 + schedule = cron.CronSchedule{ + Kind: "at", + AtMS: &atMS, + } + } else if hasEvery { everyMS := int64(everySeconds) * 1000 schedule = cron.CronSchedule{ Kind: "every", EveryMS: &everyMS, } - } else { + } else if hasCron { schedule = cron.CronSchedule{ Kind: "cron", Expr: cronExpr, } + } else { + return "Error: one of at_seconds, every_seconds, or cron_expr is required", nil + } + + // Read deliver parameter, default to true + deliver := true + if d, ok := args["deliver"].(bool); ok { + deliver = d } job, err := t.cronService.AddJob( truncateString(message, 30), schedule, message, - true, // deliver + deliver, channel, chatID, ) @@ -231,6 +255,17 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { chatID = "direct" } + // If deliver=true, send message directly without agent processing + if job.Payload.Deliver { + t.msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + Content: job.Payload.Message, + }) + return "ok" + } + + // For deliver=false, process through agent (for complex tasks) sessionKey := fmt.Sprintf("cron-%s", job.ID) // Call agent with the job's message