fix(cron): add one-time reminders and fix data paths to workspace

- Add at_seconds parameter for one-time reminders (e.g., "remind me in 10 minutes")
- Update every_seconds description to emphasize recurring-only usage
- Route cron delivery: deliver=true sends directly, deliver=false uses agent
- Fix cron data path from ~/.picoclaw/cron to workspace/cron
- Fix sessions path from workspace/../sessions to workspace/sessions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yinwm
2026-02-11 18:43:21 +08:00
parent 6d4d2bc61e
commit 4bc9e2d768
7 changed files with 141 additions and 46 deletions

16
.gitignore vendored
View File

@@ -1,3 +1,4 @@
# Binaries
bin/ bin/
*.exe *.exe
*.dll *.dll
@@ -5,12 +6,21 @@ bin/
*.dylib *.dylib
*.test *.test
*.out *.out
/picoclaw
/picoclaw-test
# Picoclaw specific
.picoclaw/ .picoclaw/
config.json config.json
sessions/ sessions/
build/
# Coverage
coverage.txt coverage.txt
coverage.html coverage.html
.DS_Store
build
picoclaw # OS
.DS_Store
# Ralph workspace
ralph/

View File

@@ -551,20 +551,8 @@ func gatewayCmd() {
"skills_available": skillsInfo["available"], "skills_available": skillsInfo["available"],
}) })
cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") // Setup cron tool and service
cronService, _ := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath())
// Create cron service first (onJob handler set after CronTool creation)
cronService := cron.NewCronService(cronStorePath, nil)
// Create and register CronTool
cronTool := tools.NewCronTool(cronService, agentLoop)
agentLoop.RegisterTool(cronTool)
// Now set the onJob handler for cron service
cronService.SetOnJob(func(job *cron.CronJob) (string, error) {
result := cronTool.ExecuteJob(context.Background(), job)
return result, nil
})
heartbeatService := heartbeat.NewHeartbeatService( heartbeatService := heartbeat.NewHeartbeatService(
cfg.WorkspacePath(), cfg.WorkspacePath(),
@@ -702,6 +690,25 @@ func getConfigPath() string {
return filepath.Join(home, ".picoclaw", "config.json") return filepath.Join(home, ".picoclaw", "config.json")
} }
func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) (*cron.CronService, *tools.CronTool) {
cronStorePath := filepath.Join(workspace, "cron", "jobs.json")
// Create cron service
cronService := cron.NewCronService(cronStorePath, nil)
// Create and register CronTool
cronTool := tools.NewCronTool(cronService, agentLoop, msgBus)
agentLoop.RegisterTool(cronTool)
// Set the onJob handler
cronService.SetOnJob(func(job *cron.CronJob) (string, error) {
result := cronTool.ExecuteJob(context.Background(), job)
return result, nil
})
return cronService, cronTool
}
func loadConfig() (*config.Config, error) { func loadConfig() (*config.Config, error) {
return config.LoadConfig(getConfigPath()) return config.LoadConfig(getConfigPath())
} }
@@ -714,8 +721,14 @@ func cronCmd() {
subcommand := os.Args[2] subcommand := os.Args[2]
dataDir := filepath.Join(filepath.Dir(getConfigPath()), "cron") // Load config to get workspace path
cronStorePath := filepath.Join(dataDir, "jobs.json") cfg, err := loadConfig()
if err != nil {
fmt.Printf("Error loading config: %v\n", err)
return
}
cronStorePath := filepath.Join(cfg.WorkspacePath(), "cron", "jobs.json")
switch subcommand { switch subcommand {
case "list": case "list":

View File

@@ -69,8 +69,13 @@ Your workspace is at: %s
%s %s
Always be helpful, accurate, and concise. When using tools, explain what you're doing. ## Important Rules
When remembering something, write to %s/memory/MEMORY.md`,
1. **ALWAYS use tools** - When you need to perform an action (schedule reminders, send messages, execute commands, etc.), you MUST call the appropriate tool. Do NOT just say you'll do it or pretend to do it.
2. **Be helpful and accurate** - When using tools, briefly explain what you're doing.
3. **Memory** - When remembering something, write to %s/memory/MEMORY.md`,
now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath)
} }
@@ -86,6 +91,7 @@ func (cb *ContextBuilder) buildToolsSection() string {
var sb strings.Builder var sb strings.Builder
sb.WriteString("## Available Tools\n\n") sb.WriteString("## Available Tools\n\n")
sb.WriteString("**CRITICAL**: You MUST use tools to perform actions. Do NOT pretend to execute commands or schedule tasks.\n\n")
sb.WriteString("You have access to the following tools:\n\n") sb.WriteString("You have access to the following tools:\n\n")
for _, s := range summaries { for _, s := range summaries {
sb.WriteString(s) sb.WriteString(s)

View File

@@ -69,7 +69,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
editFileTool := tools.NewEditFileTool(workspace) editFileTool := tools.NewEditFileTool(workspace)
toolsRegistry.Register(editFileTool) toolsRegistry.Register(editFileTool)
sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions"))
return &AgentLoop{ return &AgentLoop{
bus: msgBus, bus: msgBus,
@@ -179,6 +179,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
msg.ChatID, msg.ChatID,
) )
// Save user message to session
al.sessions.AddMessage(msg.SessionKey, "user", msg.Content)
iteration := 0 iteration := 0
var finalContent string var finalContent string
@@ -277,6 +280,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
} }
messages = append(messages, assistantMsg) messages = append(messages, assistantMsg)
// Save assistant message with tool calls to session
al.sessions.AddFullMessage(msg.SessionKey, assistantMsg)
for _, tc := range response.ToolCalls { for _, tc := range response.ToolCalls {
// Log tool call with arguments preview // Log tool call with arguments preview
argsJSON, _ := json.Marshal(tc.Arguments) argsJSON, _ := json.Marshal(tc.Arguments)
@@ -287,7 +293,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
"iteration": iteration, "iteration": iteration,
}) })
result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID)
if err != nil { if err != nil {
result = fmt.Sprintf("Error: %v", err) result = fmt.Sprintf("Error: %v", err)
} }
@@ -298,6 +304,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
ToolCallID: tc.ID, ToolCallID: tc.ID,
} }
messages = append(messages, toolResultMsg) messages = append(messages, toolResultMsg)
// Save tool result message to session
al.sessions.AddFullMessage(msg.SessionKey, toolResultMsg)
} }
} }
@@ -305,7 +314,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
finalContent = "I've completed processing but have no response to give." finalContent = "I've completed processing but have no response to give."
} }
al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) // Save final assistant message to session
al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent)
al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey))
@@ -370,6 +379,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
originChatID, originChatID,
) )
// Save user message to session with system message marker
al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content))
iteration := 0 iteration := 0
var finalContent string var finalContent string
@@ -446,6 +458,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
} }
messages = append(messages, assistantMsg) messages = append(messages, assistantMsg)
// Save assistant message with tool calls to session
al.sessions.AddFullMessage(sessionKey, assistantMsg)
for _, tc := range response.ToolCalls { for _, tc := range response.ToolCalls {
result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID)
if err != nil { if err != nil {
@@ -458,6 +473,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
ToolCallID: tc.ID, ToolCallID: tc.ID,
} }
messages = append(messages, toolResultMsg) messages = append(messages, toolResultMsg)
// Save tool result message to session
al.sessions.AddFullMessage(sessionKey, toolResultMsg)
} }
} }
@@ -465,8 +483,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
finalContent = "Background task completed." finalContent = "Background task completed."
} }
// Save to session with system message marker // Save final assistant message to session
al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content))
al.sessions.AddMessage(sessionKey, "assistant", finalContent) al.sessions.AddMessage(sessionKey, "assistant", finalContent)
al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
@@ -476,6 +493,13 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
"final_length": len(finalContent), "final_length": len(finalContent),
}) })
// Send response back to the original channel
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: originChannel,
ChatID: originChatID,
Content: finalContent,
})
return finalContent, nil return finalContent, nil
} }

View File

@@ -61,7 +61,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
return return
} }
// 生成 SessionKey: channel:chatID // Build session key: channel:chatID
sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) sessionKey := fmt.Sprintf("%s:%s", c.name, chatID)
msg := bus.InboundMessage{ msg := bus.InboundMessage{
@@ -70,8 +70,9 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
ChatID: chatID, ChatID: chatID,
Content: content, Content: content,
Media: media, Media: media,
Metadata: metadata,
SessionKey: sessionKey, SessionKey: sessionKey,
Metadata: metadata,
}
} }
c.bus.PublishInbound(msg) c.bus.PublishInbound(msg)

View File

@@ -59,6 +59,15 @@ func (sm *SessionManager) GetOrCreate(key string) *Session {
} }
func (sm *SessionManager) AddMessage(sessionKey, role, content string) { func (sm *SessionManager) AddMessage(sessionKey, role, content string) {
sm.AddFullMessage(sessionKey, providers.Message{
Role: role,
Content: content,
})
}
// AddFullMessage adds a complete message with tool calls and tool call ID to the session.
// This is used to save the full conversation flow including tool calls and tool results.
func (sm *SessionManager) AddFullMessage(sessionKey string, msg providers.Message) {
sm.mu.Lock() sm.mu.Lock()
defer sm.mu.Unlock() defer sm.mu.Unlock()
@@ -72,10 +81,7 @@ func (sm *SessionManager) AddMessage(sessionKey, role, content string) {
sm.sessions[sessionKey] = session sm.sessions[sessionKey] = session
} }
session.Messages = append(session.Messages, providers.Message{ session.Messages = append(session.Messages, msg)
Role: role,
Content: content,
})
session.Updated = time.Now() session.Updated = time.Now()
} }

View File

@@ -4,7 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/cron" "github.com/sipeed/picoclaw/pkg/cron"
) )
@@ -24,16 +26,18 @@ type JobExecutor interface {
type CronTool struct { type CronTool struct {
cronService *cron.CronService cronService *cron.CronService
executor JobExecutor executor JobExecutor
msgBus *bus.MessageBus
channel string channel string
chatID string chatID string
mu sync.RWMutex mu sync.RWMutex
} }
// NewCronTool creates a new CronTool // NewCronTool creates a new CronTool
func NewCronTool(cronService *cron.CronService, executor JobExecutor) *CronTool { func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus) *CronTool {
return &CronTool{ return &CronTool{
cronService: cronService, cronService: cronService,
executor: executor, executor: executor,
msgBus: msgBus,
} }
} }
@@ -44,7 +48,7 @@ func (t *CronTool) Name() string {
// Description returns the tool description // Description returns the tool description
func (t *CronTool) Description() string { func (t *CronTool) Description() string {
return "Schedule reminders and recurring tasks. Actions: add, list, remove, enable, disable." return "Schedule reminders and tasks. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am)."
} }
// Parameters returns the tool parameters schema // Parameters returns the tool parameters schema
@@ -55,24 +59,32 @@ func (t *CronTool) Parameters() map[string]interface{} {
"action": map[string]interface{}{ "action": map[string]interface{}{
"type": "string", "type": "string",
"enum": []string{"add", "list", "remove", "enable", "disable"}, "enum": []string{"add", "list", "remove", "enable", "disable"},
"description": "Action to perform", "description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.",
}, },
"message": map[string]interface{}{ "message": map[string]interface{}{
"type": "string", "type": "string",
"description": "Reminder message (for add)", "description": "The reminder/task message to display when triggered (required for add)",
},
"at_seconds": map[string]interface{}{
"type": "integer",
"description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.",
}, },
"every_seconds": map[string]interface{}{ "every_seconds": map[string]interface{}{
"type": "integer", "type": "integer",
"description": "Interval in seconds for recurring tasks", "description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.",
}, },
"cron_expr": map[string]interface{}{ "cron_expr": map[string]interface{}{
"type": "string", "type": "string",
"description": "Cron expression like '0 9 * * *' for scheduled tasks", "description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.",
}, },
"job_id": map[string]interface{}{ "job_id": map[string]interface{}{
"type": "string", "type": "string",
"description": "Job ID (for remove/enable/disable)", "description": "Job ID (for remove/enable/disable)",
}, },
"deliver": map[string]interface{}{
"type": "boolean",
"description": "If true, send message directly to channel. If false, let agent process the message (for complex tasks). Default: true",
},
}, },
"required": []string{"action"}, "required": []string{"action"},
} }
@@ -126,32 +138,44 @@ func (t *CronTool) addJob(args map[string]interface{}) (string, error) {
var schedule cron.CronSchedule var schedule cron.CronSchedule
// Check for every_seconds // Check for at_seconds (one-time), every_seconds (recurring), or cron_expr
atSeconds, hasAt := args["at_seconds"].(float64)
everySeconds, hasEvery := args["every_seconds"].(float64) everySeconds, hasEvery := args["every_seconds"].(float64)
cronExpr, hasCron := args["cron_expr"].(string) cronExpr, hasCron := args["cron_expr"].(string)
if !hasEvery && !hasCron { // Priority: at_seconds > every_seconds > cron_expr
return "Error: either every_seconds or cron_expr is required", nil if hasAt {
} atMS := time.Now().UnixMilli() + int64(atSeconds)*1000
schedule = cron.CronSchedule{
if hasEvery { Kind: "at",
AtMS: &atMS,
}
} else if hasEvery {
everyMS := int64(everySeconds) * 1000 everyMS := int64(everySeconds) * 1000
schedule = cron.CronSchedule{ schedule = cron.CronSchedule{
Kind: "every", Kind: "every",
EveryMS: &everyMS, EveryMS: &everyMS,
} }
} else { } else if hasCron {
schedule = cron.CronSchedule{ schedule = cron.CronSchedule{
Kind: "cron", Kind: "cron",
Expr: cronExpr, Expr: cronExpr,
} }
} else {
return "Error: one of at_seconds, every_seconds, or cron_expr is required", nil
}
// Read deliver parameter, default to true
deliver := true
if d, ok := args["deliver"].(bool); ok {
deliver = d
} }
job, err := t.cronService.AddJob( job, err := t.cronService.AddJob(
truncateString(message, 30), truncateString(message, 30),
schedule, schedule,
message, message,
true, // deliver deliver,
channel, channel,
chatID, chatID,
) )
@@ -231,6 +255,17 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
chatID = "direct" chatID = "direct"
} }
// If deliver=true, send message directly without agent processing
if job.Payload.Deliver {
t.msgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: job.Payload.Message,
})
return "ok"
}
// For deliver=false, process through agent (for complex tasks)
sessionKey := fmt.Sprintf("cron-%s", job.ID) sessionKey := fmt.Sprintf("cron-%s", job.ID)
// Call agent with the job's message // Call agent with the job's message