From 6d4d2bc61e0c06e0e83dd5d91b1aac30dea9e932 Mon Sep 17 00:00:00 2001 From: yinwm Date: Wed, 11 Feb 2026 12:28:37 +0800 Subject: [PATCH 1/5] feat: add cron tool integration with agent - Add adhocore/gronx dependency for cron expression parsing - Fix CronService race conditions and add cron expression support - Add CronTool with add/list/remove/enable/disable actions - Add ContextualTool interface for tools needing channel/chatID context - Add ProcessDirectWithChannel to AgentLoop for cron job execution - Register CronTool in gateway and wire up onJob handler - Fix slice bounds panic in addJob for short messages Co-Authored-By: Claude Opus 4.6 --- cmd/picoclaw/main.go | 15 ++- go.mod | 1 + go.sum | 2 + pkg/agent/loop.go | 16 ++- pkg/cron/service.go | 136 +++++++++++++++++------ pkg/tools/base.go | 7 ++ pkg/tools/cron.go | 252 ++++++++++++++++++++++++++++++++++++++++++ pkg/tools/registry.go | 9 ++ 8 files changed, 401 insertions(+), 37 deletions(-) create mode 100644 pkg/tools/cron.go diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 751cdda..60dd1b9 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -27,6 +27,7 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/skills" + "github.com/sipeed/picoclaw/pkg/tools" "github.com/sipeed/picoclaw/pkg/voice" ) @@ -551,8 +552,20 @@ func gatewayCmd() { }) cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") + + // Create cron service first (onJob handler set after CronTool creation) cronService := cron.NewCronService(cronStorePath, nil) + // Create and register CronTool + cronTool := tools.NewCronTool(cronService, agentLoop) + agentLoop.RegisterTool(cronTool) + + // Now set the onJob handler for cron service + cronService.SetOnJob(func(job *cron.CronJob) (string, error) { + result := cronTool.ExecuteJob(context.Background(), job) + return result, nil + }) + heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), nil, @@ -745,7 +758,7 @@ func cronHelp() { func cronListCmd(storePath string) { cs := cron.NewCronService(storePath, nil) - jobs := cs.ListJobs(false) + jobs := cs.ListJobs(true) // Show all jobs, including disabled if len(jobs) == 0 { fmt.Println("No scheduled jobs.") diff --git a/go.mod b/go.mod index 23cfa0e..832f1e8 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/sipeed/picoclaw go 1.24.0 require ( + github.com/adhocore/gronx v1.19.6 github.com/bwmarrin/discordgo v0.29.0 github.com/caarlos0/env/v11 v11.3.1 github.com/chzyer/readline v1.5.1 diff --git a/go.sum b/go.sum index 2f9d5be..f1ce926 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +github.com/adhocore/gronx v1.19.6 h1:5KNVcoR9ACgL9HhEqCm5QXsab/gI4QDIybTAWcXDKDc= +github.com/adhocore/gronx v1.19.6/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/bwmarrin/discordgo v0.29.0 h1:FmWeXFaKUwrcL3Cx65c20bTRW+vOb6k8AnaP+EgjDno= github.com/bwmarrin/discordgo v0.29.0/go.mod h1:NJZpH+1AfhIcyQsPeuBKsUtYrRnjkyu0kIVMCHkZtRY= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index d38848b..3ab9b7a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -119,11 +119,19 @@ func (al *AgentLoop) Stop() { al.running = false } +func (al *AgentLoop) RegisterTool(tool tools.Tool) { + al.tools.Register(tool) +} + func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) { + return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct") +} + +func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) { msg := bus.InboundMessage{ - Channel: "cli", - SenderID: "user", - ChatID: "direct", + Channel: channel, + SenderID: "cron", + ChatID: chatID, Content: content, SessionKey: sessionKey, } @@ -439,7 +447,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe messages = append(messages, assistantMsg) for _, tc := range response.ToolCalls { - result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) + result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) if err != nil { result = fmt.Sprintf("Error: %v", err) } diff --git a/pkg/cron/service.go b/pkg/cron/service.go index 54f9dcc..c85ab2b 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -1,12 +1,17 @@ package cron import ( + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" + "log" "os" "path/filepath" "sync" "time" + + "github.com/adhocore/gronx" ) type CronSchedule struct { @@ -58,6 +63,7 @@ type CronService struct { mu sync.RWMutex running bool stopChan chan struct{} + gronx *gronx.Gronx } func NewCronService(storePath string, onJob JobHandler) *CronService { @@ -65,7 +71,9 @@ func NewCronService(storePath string, onJob JobHandler) *CronService { storePath: storePath, onJob: onJob, stopChan: make(chan struct{}), + gronx: gronx.New(), } + // Initialize and load store on creation cs.loadStore() return cs } @@ -83,7 +91,7 @@ func (cs *CronService) Start() error { } cs.recomputeNextRuns() - if err := cs.saveStore(); err != nil { + if err := cs.saveStoreUnsafe(); err != nil { return fmt.Errorf("failed to save store: %w", err) } @@ -120,30 +128,47 @@ func (cs *CronService) runLoop() { } func (cs *CronService) checkJobs() { - cs.mu.RLock() + cs.mu.Lock() + if !cs.running { - cs.mu.RUnlock() + cs.mu.Unlock() return } now := time.Now().UnixMilli() var dueJobs []*CronJob + // Collect jobs that are due (we need to copy them to execute outside lock) for i := range cs.store.Jobs { job := &cs.store.Jobs[i] if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now { - dueJobs = append(dueJobs, job) + // Create a shallow copy of the job for execution + jobCopy := *job + dueJobs = append(dueJobs, &jobCopy) } } - cs.mu.RUnlock() + // Update next run times for due jobs immediately (before executing) + for i := range cs.store.Jobs { + for _, dueJob := range dueJobs { + if cs.store.Jobs[i].ID == dueJob.ID { + // Reset NextRunAtMS temporarily so we don't re-execute + cs.store.Jobs[i].State.NextRunAtMS = nil + break + } + } + } + + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store: %v", err) + } + + cs.mu.Unlock() + + // Execute jobs outside the lock for _, job := range dueJobs { cs.executeJob(job) } - - cs.mu.Lock() - defer cs.mu.Unlock() - cs.saveStore() } func (cs *CronService) executeJob(job *CronJob) { @@ -154,30 +179,42 @@ func (cs *CronService) executeJob(job *CronJob) { _, err = cs.onJob(job) } + // Now acquire lock to update state cs.mu.Lock() defer cs.mu.Unlock() - job.State.LastRunAtMS = &startTime - job.UpdatedAtMS = time.Now().UnixMilli() + // Find the job in store and update it + for i := range cs.store.Jobs { + if cs.store.Jobs[i].ID == job.ID { + cs.store.Jobs[i].State.LastRunAtMS = &startTime + cs.store.Jobs[i].UpdatedAtMS = time.Now().UnixMilli() - if err != nil { - job.State.LastStatus = "error" - job.State.LastError = err.Error() - } else { - job.State.LastStatus = "ok" - job.State.LastError = "" + if err != nil { + cs.store.Jobs[i].State.LastStatus = "error" + cs.store.Jobs[i].State.LastError = err.Error() + } else { + cs.store.Jobs[i].State.LastStatus = "ok" + cs.store.Jobs[i].State.LastError = "" + } + + // Compute next run time + if cs.store.Jobs[i].Schedule.Kind == "at" { + if cs.store.Jobs[i].DeleteAfterRun { + cs.removeJobUnsafe(job.ID) + } else { + cs.store.Jobs[i].Enabled = false + cs.store.Jobs[i].State.NextRunAtMS = nil + } + } else { + nextRun := cs.computeNextRun(&cs.store.Jobs[i].Schedule, time.Now().UnixMilli()) + cs.store.Jobs[i].State.NextRunAtMS = nextRun + } + break + } } - if job.Schedule.Kind == "at" { - if job.DeleteAfterRun { - cs.removeJobUnsafe(job.ID) - } else { - job.Enabled = false - job.State.NextRunAtMS = nil - } - } else { - nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) - job.State.NextRunAtMS = nextRun + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store: %v", err) } } @@ -197,6 +234,23 @@ func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int6 return &next } + if schedule.Kind == "cron" { + if schedule.Expr == "" { + return nil + } + + // Use gronx to calculate next run time + now := time.UnixMilli(nowMS) + nextTime, err := gronx.NextTickAfter(schedule.Expr, now, false) + if err != nil { + log.Printf("[cron] failed to compute next run for expr '%s': %v", schedule.Expr, err) + return nil + } + + nextMS := nextTime.UnixMilli() + return &nextMS + } + return nil } @@ -223,9 +277,17 @@ func (cs *CronService) getNextWakeMS() *int64 { } func (cs *CronService) Load() error { + cs.mu.Lock() + defer cs.mu.Unlock() return cs.loadStore() } +func (cs *CronService) SetOnJob(handler JobHandler) { + cs.mu.Lock() + defer cs.mu.Unlock() + cs.onJob = handler +} + func (cs *CronService) loadStore() error { cs.store = &CronStore{ Version: 1, @@ -243,7 +305,7 @@ func (cs *CronService) loadStore() error { return json.Unmarshal(data, cs.store) } -func (cs *CronService) saveStore() error { +func (cs *CronService) saveStoreUnsafe() error { dir := filepath.Dir(cs.storePath) if err := os.MkdirAll(dir, 0755); err != nil { return err @@ -284,7 +346,7 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string } cs.store.Jobs = append(cs.store.Jobs, job) - if err := cs.saveStore(); err != nil { + if err := cs.saveStoreUnsafe(); err != nil { return nil, err } @@ -310,7 +372,9 @@ func (cs *CronService) removeJobUnsafe(jobID string) bool { removed := len(cs.store.Jobs) < before if removed { - cs.saveStore() + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store after remove: %v", err) + } } return removed @@ -332,7 +396,9 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { job.State.NextRunAtMS = nil } - cs.saveStore() + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store after enable: %v", err) + } return job } } @@ -377,5 +443,11 @@ func (cs *CronService) Status() map[string]interface{} { } func generateID() string { - return fmt.Sprintf("%d", time.Now().UnixNano()) + // Use crypto/rand for better uniqueness under concurrent access + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { + // Fallback to time-based if crypto/rand fails + return fmt.Sprintf("%d", time.Now().UnixNano()) + } + return hex.EncodeToString(b) } diff --git a/pkg/tools/base.go b/pkg/tools/base.go index 1bf53f7..095ac69 100644 --- a/pkg/tools/base.go +++ b/pkg/tools/base.go @@ -9,6 +9,13 @@ type Tool interface { Execute(ctx context.Context, args map[string]interface{}) (string, error) } +// ContextualTool is an optional interface that tools can implement +// to receive the current message context (channel, chatID) +type ContextualTool interface { + Tool + SetContext(channel, chatID string) +} + func ToolToSchema(tool Tool) map[string]interface{} { return map[string]interface{}{ "type": "function", diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go new file mode 100644 index 0000000..65c97ce --- /dev/null +++ b/pkg/tools/cron.go @@ -0,0 +1,252 @@ +package tools + +import ( + "context" + "fmt" + "sync" + + "github.com/sipeed/picoclaw/pkg/cron" +) + +func truncateString(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] +} + +// JobExecutor is the interface for executing cron jobs through the agent +type JobExecutor interface { + ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) +} + +// CronTool provides scheduling capabilities for the agent +type CronTool struct { + cronService *cron.CronService + executor JobExecutor + channel string + chatID string + mu sync.RWMutex +} + +// NewCronTool creates a new CronTool +func NewCronTool(cronService *cron.CronService, executor JobExecutor) *CronTool { + return &CronTool{ + cronService: cronService, + executor: executor, + } +} + +// Name returns the tool name +func (t *CronTool) Name() string { + return "cron" +} + +// Description returns the tool description +func (t *CronTool) Description() string { + return "Schedule reminders and recurring tasks. Actions: add, list, remove, enable, disable." +} + +// Parameters returns the tool parameters schema +func (t *CronTool) Parameters() map[string]interface{} { + return map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "action": map[string]interface{}{ + "type": "string", + "enum": []string{"add", "list", "remove", "enable", "disable"}, + "description": "Action to perform", + }, + "message": map[string]interface{}{ + "type": "string", + "description": "Reminder message (for add)", + }, + "every_seconds": map[string]interface{}{ + "type": "integer", + "description": "Interval in seconds for recurring tasks", + }, + "cron_expr": map[string]interface{}{ + "type": "string", + "description": "Cron expression like '0 9 * * *' for scheduled tasks", + }, + "job_id": map[string]interface{}{ + "type": "string", + "description": "Job ID (for remove/enable/disable)", + }, + }, + "required": []string{"action"}, + } +} + +// SetContext sets the current session context for job creation +func (t *CronTool) SetContext(channel, chatID string) { + t.mu.Lock() + defer t.mu.Unlock() + t.channel = channel + t.chatID = chatID +} + +// Execute runs the tool with given arguments +func (t *CronTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) { + action, ok := args["action"].(string) + if !ok { + return "", fmt.Errorf("action is required") + } + + switch action { + case "add": + return t.addJob(args) + case "list": + return t.listJobs() + case "remove": + return t.removeJob(args) + case "enable": + return t.enableJob(args, true) + case "disable": + return t.enableJob(args, false) + default: + return "", fmt.Errorf("unknown action: %s", action) + } +} + +func (t *CronTool) addJob(args map[string]interface{}) (string, error) { + t.mu.RLock() + channel := t.channel + chatID := t.chatID + t.mu.RUnlock() + + if channel == "" || chatID == "" { + return "Error: no session context (channel/chat_id not set). Use this tool in an active conversation.", nil + } + + message, ok := args["message"].(string) + if !ok || message == "" { + return "Error: message is required for add", nil + } + + var schedule cron.CronSchedule + + // Check for every_seconds + everySeconds, hasEvery := args["every_seconds"].(float64) + cronExpr, hasCron := args["cron_expr"].(string) + + if !hasEvery && !hasCron { + return "Error: either every_seconds or cron_expr is required", nil + } + + if hasEvery { + everyMS := int64(everySeconds) * 1000 + schedule = cron.CronSchedule{ + Kind: "every", + EveryMS: &everyMS, + } + } else { + schedule = cron.CronSchedule{ + Kind: "cron", + Expr: cronExpr, + } + } + + job, err := t.cronService.AddJob( + truncateString(message, 30), + schedule, + message, + true, // deliver + channel, + chatID, + ) + if err != nil { + return fmt.Sprintf("Error adding job: %v", err), nil + } + + return fmt.Sprintf("Created job '%s' (id: %s)", job.Name, job.ID), nil +} + +func (t *CronTool) listJobs() (string, error) { + jobs := t.cronService.ListJobs(false) + + if len(jobs) == 0 { + return "No scheduled jobs.", nil + } + + result := "Scheduled jobs:\n" + for _, j := range jobs { + var scheduleInfo string + if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil { + scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000) + } else if j.Schedule.Kind == "cron" { + scheduleInfo = j.Schedule.Expr + } else if j.Schedule.Kind == "at" { + scheduleInfo = "one-time" + } else { + scheduleInfo = "unknown" + } + result += fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo) + } + + return result, nil +} + +func (t *CronTool) removeJob(args map[string]interface{}) (string, error) { + jobID, ok := args["job_id"].(string) + if !ok || jobID == "" { + return "Error: job_id is required for remove", nil + } + + if t.cronService.RemoveJob(jobID) { + return fmt.Sprintf("Removed job %s", jobID), nil + } + return fmt.Sprintf("Job %s not found", jobID), nil +} + +func (t *CronTool) enableJob(args map[string]interface{}, enable bool) (string, error) { + jobID, ok := args["job_id"].(string) + if !ok || jobID == "" { + return "Error: job_id is required for enable/disable", nil + } + + job := t.cronService.EnableJob(jobID, enable) + if job == nil { + return fmt.Sprintf("Job %s not found", jobID), nil + } + + status := "enabled" + if !enable { + status = "disabled" + } + return fmt.Sprintf("Job '%s' %s", job.Name, status), nil +} + +// ExecuteJob executes a cron job through the agent +func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { + // Get channel/chatID from job payload + channel := job.Payload.Channel + chatID := job.Payload.To + + // Default values if not set + if channel == "" { + channel = "cli" + } + if chatID == "" { + chatID = "direct" + } + + sessionKey := fmt.Sprintf("cron-%s", job.ID) + + // Call agent with the job's message + response, err := t.executor.ProcessDirectWithChannel( + ctx, + job.Payload.Message, + sessionKey, + channel, + chatID, + ) + + if err != nil { + return fmt.Sprintf("Error: %v", err) + } + + // Response is automatically sent via MessageBus by AgentLoop + _ = response // Will be sent by AgentLoop + return "ok" +} diff --git a/pkg/tools/registry.go b/pkg/tools/registry.go index d181944..a769664 100644 --- a/pkg/tools/registry.go +++ b/pkg/tools/registry.go @@ -34,6 +34,10 @@ func (r *ToolRegistry) Get(name string) (Tool, bool) { } func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) (string, error) { + return r.ExecuteWithContext(ctx, name, args, "", "") +} + +func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string) (string, error) { logger.InfoCF("tool", "Tool execution started", map[string]interface{}{ "tool": name, @@ -49,6 +53,11 @@ func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string return "", fmt.Errorf("tool '%s' not found", name) } + // If tool implements ContextualTool, set context + if contextualTool, ok := tool.(ContextualTool); ok && channel != "" && chatID != "" { + contextualTool.SetContext(channel, chatID) + } + start := time.Now() result, err := tool.Execute(ctx, args) duration := time.Since(start) From 4bc9e2d768a2863669b89e0fdb8b1526860dab6a Mon Sep 17 00:00:00 2001 From: yinwm Date: Wed, 11 Feb 2026 18:43:21 +0800 Subject: [PATCH 2/5] fix(cron): add one-time reminders and fix data paths to workspace - Add at_seconds parameter for one-time reminders (e.g., "remind me in 10 minutes") - Update every_seconds description to emphasize recurring-only usage - Route cron delivery: deliver=true sends directly, deliver=false uses agent - Fix cron data path from ~/.picoclaw/cron to workspace/cron - Fix sessions path from workspace/../sessions to workspace/sessions Co-Authored-By: Claude Opus 4.6 --- .gitignore | 16 +++++++++-- cmd/picoclaw/main.go | 45 +++++++++++++++++++----------- pkg/agent/context.go | 10 +++++-- pkg/agent/loop.go | 34 +++++++++++++++++++---- pkg/channels/base.go | 5 ++-- pkg/session/manager.go | 14 +++++++--- pkg/tools/cron.go | 63 ++++++++++++++++++++++++++++++++---------- 7 files changed, 141 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index dacb665..6ad4d78 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +# Binaries bin/ *.exe *.dll @@ -5,12 +6,21 @@ bin/ *.dylib *.test *.out +/picoclaw +/picoclaw-test + +# Picoclaw specific .picoclaw/ config.json sessions/ +build/ + +# Coverage coverage.txt coverage.html -.DS_Store -build -picoclaw +# OS +.DS_Store + +# Ralph workspace +ralph/ diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 60dd1b9..0b13bc7 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -551,20 +551,8 @@ func gatewayCmd() { "skills_available": skillsInfo["available"], }) - cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") - - // Create cron service first (onJob handler set after CronTool creation) - cronService := cron.NewCronService(cronStorePath, nil) - - // Create and register CronTool - cronTool := tools.NewCronTool(cronService, agentLoop) - agentLoop.RegisterTool(cronTool) - - // Now set the onJob handler for cron service - cronService.SetOnJob(func(job *cron.CronJob) (string, error) { - result := cronTool.ExecuteJob(context.Background(), job) - return result, nil - }) + // Setup cron tool and service + cronService, _ := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), @@ -702,6 +690,25 @@ func getConfigPath() string { return filepath.Join(home, ".picoclaw", "config.json") } +func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) (*cron.CronService, *tools.CronTool) { + cronStorePath := filepath.Join(workspace, "cron", "jobs.json") + + // Create cron service + cronService := cron.NewCronService(cronStorePath, nil) + + // Create and register CronTool + cronTool := tools.NewCronTool(cronService, agentLoop, msgBus) + agentLoop.RegisterTool(cronTool) + + // Set the onJob handler + cronService.SetOnJob(func(job *cron.CronJob) (string, error) { + result := cronTool.ExecuteJob(context.Background(), job) + return result, nil + }) + + return cronService, cronTool +} + func loadConfig() (*config.Config, error) { return config.LoadConfig(getConfigPath()) } @@ -714,8 +721,14 @@ func cronCmd() { subcommand := os.Args[2] - dataDir := filepath.Join(filepath.Dir(getConfigPath()), "cron") - cronStorePath := filepath.Join(dataDir, "jobs.json") + // Load config to get workspace path + cfg, err := loadConfig() + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return + } + + cronStorePath := filepath.Join(cfg.WorkspacePath(), "cron", "jobs.json") switch subcommand { case "list": diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 7e8612e..d1b3397 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -69,8 +69,13 @@ Your workspace is at: %s %s -Always be helpful, accurate, and concise. When using tools, explain what you're doing. -When remembering something, write to %s/memory/MEMORY.md`, +## Important Rules + +1. **ALWAYS use tools** - When you need to perform an action (schedule reminders, send messages, execute commands, etc.), you MUST call the appropriate tool. Do NOT just say you'll do it or pretend to do it. + +2. **Be helpful and accurate** - When using tools, briefly explain what you're doing. + +3. **Memory** - When remembering something, write to %s/memory/MEMORY.md`, now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) } @@ -86,6 +91,7 @@ func (cb *ContextBuilder) buildToolsSection() string { var sb strings.Builder sb.WriteString("## Available Tools\n\n") + sb.WriteString("**CRITICAL**: You MUST use tools to perform actions. Do NOT pretend to execute commands or schedule tasks.\n\n") sb.WriteString("You have access to the following tools:\n\n") for _, s := range summaries { sb.WriteString(s) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 3ab9b7a..439428a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -69,7 +69,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers editFileTool := tools.NewEditFileTool(workspace) toolsRegistry.Register(editFileTool) - sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) + sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions")) return &AgentLoop{ bus: msgBus, @@ -179,6 +179,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) msg.ChatID, ) + // Save user message to session + al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) + iteration := 0 var finalContent string @@ -277,6 +280,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } messages = append(messages, assistantMsg) + // Save assistant message with tool calls to session + al.sessions.AddFullMessage(msg.SessionKey, assistantMsg) + for _, tc := range response.ToolCalls { // Log tool call with arguments preview argsJSON, _ := json.Marshal(tc.Arguments) @@ -287,7 +293,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) "iteration": iteration, }) - result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) + result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) if err != nil { result = fmt.Sprintf("Error: %v", err) } @@ -298,6 +304,9 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + + // Save tool result message to session + al.sessions.AddFullMessage(msg.SessionKey, toolResultMsg) } } @@ -305,7 +314,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) finalContent = "I've completed processing but have no response to give." } - al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) + // Save final assistant message to session al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) @@ -370,6 +379,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe originChatID, ) + // Save user message to session with system message marker + al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) + iteration := 0 var finalContent string @@ -446,6 +458,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe } messages = append(messages, assistantMsg) + // Save assistant message with tool calls to session + al.sessions.AddFullMessage(sessionKey, assistantMsg) + for _, tc := range response.ToolCalls { result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) if err != nil { @@ -458,6 +473,9 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + + // Save tool result message to session + al.sessions.AddFullMessage(sessionKey, toolResultMsg) } } @@ -465,8 +483,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe finalContent = "Background task completed." } - // Save to session with system message marker - al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) + // Save final assistant message to session al.sessions.AddMessage(sessionKey, "assistant", finalContent) al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) @@ -476,6 +493,13 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe "final_length": len(finalContent), }) + // Send response back to the original channel + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: originChannel, + ChatID: originChatID, + Content: finalContent, + }) + return finalContent, nil } diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 5361191..8abe7b5 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -61,7 +61,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st return } - // 生成 SessionKey: channel:chatID + // Build session key: channel:chatID sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) msg := bus.InboundMessage{ @@ -70,8 +70,9 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st ChatID: chatID, Content: content, Media: media, - Metadata: metadata, SessionKey: sessionKey, + Metadata: metadata, + } } c.bus.PublishInbound(msg) diff --git a/pkg/session/manager.go b/pkg/session/manager.go index df86724..b4b8257 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -59,6 +59,15 @@ func (sm *SessionManager) GetOrCreate(key string) *Session { } func (sm *SessionManager) AddMessage(sessionKey, role, content string) { + sm.AddFullMessage(sessionKey, providers.Message{ + Role: role, + Content: content, + }) +} + +// AddFullMessage adds a complete message with tool calls and tool call ID to the session. +// This is used to save the full conversation flow including tool calls and tool results. +func (sm *SessionManager) AddFullMessage(sessionKey string, msg providers.Message) { sm.mu.Lock() defer sm.mu.Unlock() @@ -72,10 +81,7 @@ func (sm *SessionManager) AddMessage(sessionKey, role, content string) { sm.sessions[sessionKey] = session } - session.Messages = append(session.Messages, providers.Message{ - Role: role, - Content: content, - }) + session.Messages = append(session.Messages, msg) session.Updated = time.Now() } diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 65c97ce..87aaf35 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "sync" + "time" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/cron" ) @@ -24,16 +26,18 @@ type JobExecutor interface { type CronTool struct { cronService *cron.CronService executor JobExecutor + msgBus *bus.MessageBus channel string chatID string mu sync.RWMutex } // NewCronTool creates a new CronTool -func NewCronTool(cronService *cron.CronService, executor JobExecutor) *CronTool { +func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus) *CronTool { return &CronTool{ cronService: cronService, executor: executor, + msgBus: msgBus, } } @@ -44,7 +48,7 @@ func (t *CronTool) Name() string { // Description returns the tool description func (t *CronTool) Description() string { - return "Schedule reminders and recurring tasks. Actions: add, list, remove, enable, disable." + return "Schedule reminders and tasks. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am)." } // Parameters returns the tool parameters schema @@ -55,24 +59,32 @@ func (t *CronTool) Parameters() map[string]interface{} { "action": map[string]interface{}{ "type": "string", "enum": []string{"add", "list", "remove", "enable", "disable"}, - "description": "Action to perform", + "description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.", }, "message": map[string]interface{}{ "type": "string", - "description": "Reminder message (for add)", + "description": "The reminder/task message to display when triggered (required for add)", + }, + "at_seconds": map[string]interface{}{ + "type": "integer", + "description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.", }, "every_seconds": map[string]interface{}{ "type": "integer", - "description": "Interval in seconds for recurring tasks", + "description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.", }, "cron_expr": map[string]interface{}{ "type": "string", - "description": "Cron expression like '0 9 * * *' for scheduled tasks", + "description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.", }, "job_id": map[string]interface{}{ "type": "string", "description": "Job ID (for remove/enable/disable)", }, + "deliver": map[string]interface{}{ + "type": "boolean", + "description": "If true, send message directly to channel. If false, let agent process the message (for complex tasks). Default: true", + }, }, "required": []string{"action"}, } @@ -126,32 +138,44 @@ func (t *CronTool) addJob(args map[string]interface{}) (string, error) { var schedule cron.CronSchedule - // Check for every_seconds + // Check for at_seconds (one-time), every_seconds (recurring), or cron_expr + atSeconds, hasAt := args["at_seconds"].(float64) everySeconds, hasEvery := args["every_seconds"].(float64) cronExpr, hasCron := args["cron_expr"].(string) - if !hasEvery && !hasCron { - return "Error: either every_seconds or cron_expr is required", nil - } - - if hasEvery { + // Priority: at_seconds > every_seconds > cron_expr + if hasAt { + atMS := time.Now().UnixMilli() + int64(atSeconds)*1000 + schedule = cron.CronSchedule{ + Kind: "at", + AtMS: &atMS, + } + } else if hasEvery { everyMS := int64(everySeconds) * 1000 schedule = cron.CronSchedule{ Kind: "every", EveryMS: &everyMS, } - } else { + } else if hasCron { schedule = cron.CronSchedule{ Kind: "cron", Expr: cronExpr, } + } else { + return "Error: one of at_seconds, every_seconds, or cron_expr is required", nil + } + + // Read deliver parameter, default to true + deliver := true + if d, ok := args["deliver"].(bool); ok { + deliver = d } job, err := t.cronService.AddJob( truncateString(message, 30), schedule, message, - true, // deliver + deliver, channel, chatID, ) @@ -231,6 +255,17 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { chatID = "direct" } + // If deliver=true, send message directly without agent processing + if job.Payload.Deliver { + t.msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + Content: job.Payload.Message, + }) + return "ok" + } + + // For deliver=false, process through agent (for complex tasks) sessionKey := fmt.Sprintf("cron-%s", job.ID) // Call agent with the job's message From af60ce26fc895b0d2e816e714e9b9130311886c1 Mon Sep 17 00:00:00 2001 From: yinwm Date: Wed, 11 Feb 2026 19:27:36 +0800 Subject: [PATCH 3/5] refactor(agent): improve code quality and restore summarization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code review fixes: - Use map for O(n) job lookup in cron service (was O(n²)) - Set DeleteAfterRun=true for one-time cron tasks - Restore context compression/summarization to prevent context overflow - Add pkg/utils/string.go with Unicode-aware Truncate function - Simplify setupCronTool to return only CronService - Change Chinese comments to English in context.go Refactoring: - Replace toolsSummary callback with SetToolsRegistry setter pattern - This makes dependency injection clearer and easier to test Co-Authored-By: Claude Opus 4.6 --- cmd/picoclaw/main.go | 6 +- pkg/agent/context.go | 19 ++++--- pkg/agent/loop.go | 132 ++++++++++++++++++++++++++++++++++++++++++- pkg/cron/service.go | 19 ++++--- pkg/utils/string.go | 16 ++++++ 5 files changed, 174 insertions(+), 18 deletions(-) create mode 100644 pkg/utils/string.go diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 0b13bc7..c14ec58 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -552,7 +552,7 @@ func gatewayCmd() { }) // Setup cron tool and service - cronService, _ := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) + cronService := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), @@ -690,7 +690,7 @@ func getConfigPath() string { return filepath.Join(home, ".picoclaw", "config.json") } -func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) (*cron.CronService, *tools.CronTool) { +func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) *cron.CronService { cronStorePath := filepath.Join(workspace, "cron", "jobs.json") // Create cron service @@ -706,7 +706,7 @@ func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace return result, nil }) - return cronService, cronTool + return cronService } func loadConfig() (*config.Config, error) { diff --git a/pkg/agent/context.go b/pkg/agent/context.go index d1b3397..e737fbd 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -11,13 +11,14 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/skills" + "github.com/sipeed/picoclaw/pkg/tools" ) type ContextBuilder struct { workspace string skillsLoader *skills.SkillsLoader memory *MemoryStore - toolsSummary func() []string // Function to get tool summaries dynamically + tools *tools.ToolRegistry // Direct reference to tool registry } func getGlobalConfigDir() string { @@ -28,9 +29,9 @@ func getGlobalConfigDir() string { return filepath.Join(home, ".picoclaw") } -func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *ContextBuilder { - // builtin skills: 当前项目的 skills 目录 - // 使用当前工作目录下的 skills/ 目录 +func NewContextBuilder(workspace string) *ContextBuilder { + // builtin skills: skills directory in current project + // Use the skills/ directory under the current working directory wd, _ := os.Getwd() builtinSkillsDir := filepath.Join(wd, "skills") globalSkillsDir := filepath.Join(getGlobalConfigDir(), "skills") @@ -39,10 +40,14 @@ func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *Cont workspace: workspace, skillsLoader: skills.NewSkillsLoader(workspace, globalSkillsDir, builtinSkillsDir), memory: NewMemoryStore(workspace), - toolsSummary: toolsSummaryFunc, } } +// SetToolsRegistry sets the tools registry for dynamic tool summary generation. +func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) { + cb.tools = registry +} + func (cb *ContextBuilder) getIdentity() string { now := time.Now().Format("2006-01-02 15:04 (Monday)") workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace)) @@ -80,11 +85,11 @@ Your workspace is at: %s } func (cb *ContextBuilder) buildToolsSection() string { - if cb.toolsSummary == nil { + if cb.tools == nil { return "" } - summaries := cb.toolsSummary() + summaries := cb.tools.GetSummaries() if len(summaries) == 0 { return "" } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 439428a..5cdd6a7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -13,6 +13,8 @@ import ( "os" "path/filepath" "strings" + "sync" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" @@ -27,11 +29,13 @@ type AgentLoop struct { provider providers.LLMProvider workspace string model string + contextWindow int // Maximum context window size in tokens maxIterations int sessions *session.SessionManager contextBuilder *ContextBuilder tools *tools.ToolRegistry running bool + summarizing sync.Map // Tracks which sessions are currently being summarized } func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { @@ -71,16 +75,22 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions")) + // Create context builder and set tools registry + contextBuilder := NewContextBuilder(workspace) + contextBuilder.SetToolsRegistry(toolsRegistry) + return &AgentLoop{ bus: msgBus, provider: provider, workspace: workspace, model: cfg.Agents.Defaults.Model, + contextWindow: cfg.Agents.Defaults.MaxTokens, // Restore context window for summarization maxIterations: cfg.Agents.Defaults.MaxToolIterations, sessions: sessionsManager, - contextBuilder: NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }), + contextBuilder: contextBuilder, tools: toolsRegistry, running: false, + summarizing: sync.Map{}, } } @@ -318,6 +328,21 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) + // Context compression: Check if we need to summarize + // Trigger if history > 20 messages OR estimated tokens > 75% of context window + newHistory := al.sessions.GetHistory(msg.SessionKey) + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { + if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(msg.SessionKey) + al.summarizeSession(msg.SessionKey) + }() + } + } + // Log response preview responsePreview := truncate(finalContent, 120) logger.InfoCF("agent", fmt.Sprintf("Response to %s:%s: %s", msg.Channel, msg.SenderID, responsePreview), @@ -595,3 +620,108 @@ func truncateString(s string, maxLen int) string { } return s[:maxLen-3] + "..." } + +// summarizeSession summarizes the conversation history for a session. +func (al *AgentLoop) summarizeSession(sessionKey string) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + history := al.sessions.GetHistory(sessionKey) + summary := al.sessions.GetSummary(sessionKey) + + // Keep last 4 messages for continuity + if len(history) <= 4 { + return + } + + toSummarize := history[:len(history)-4] + + // Oversized Message Guard + // Skip messages larger than 50% of context window to prevent summarizer overflow + maxMessageTokens := al.contextWindow / 2 + validMessages := make([]providers.Message, 0) + omitted := false + + for _, m := range toSummarize { + if m.Role != "user" && m.Role != "assistant" { + continue + } + // Estimate tokens for this message + msgTokens := len(m.Content) / 4 + if msgTokens > maxMessageTokens { + omitted = true + continue + } + validMessages = append(validMessages, m) + } + + if len(validMessages) == 0 { + return + } + + // Multi-Part Summarization + // Split into two parts if history is significant + var finalSummary string + if len(validMessages) > 10 { + mid := len(validMessages) / 2 + part1 := validMessages[:mid] + part2 := validMessages[mid:] + + s1, _ := al.summarizeBatch(ctx, part1, "") + s2, _ := al.summarizeBatch(ctx, part2, "") + + // Merge them + mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2) + resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err == nil { + finalSummary = resp.Content + } else { + finalSummary = s1 + " " + s2 + } + } else { + finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary) + } + + if omitted && finalSummary != "" { + finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]" + } + + if finalSummary != "" { + al.sessions.SetSummary(sessionKey, finalSummary) + al.sessions.TruncateHistory(sessionKey, 4) + al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) + } +} + +// summarizeBatch summarizes a batch of messages. +func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) { + prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n" + if existingSummary != "" { + prompt += "Existing context: " + existingSummary + "\n" + } + prompt += "\nCONVERSATION:\n" + for _, m := range batch { + prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + } + + response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err != nil { + return "", err + } + return response.Content, nil +} + +// estimateTokens estimates the number of tokens in a message list. +func (al *AgentLoop) estimateTokens(messages []providers.Message) int { + total := 0 + for _, m := range messages { + total += len(m.Content) / 4 // Simple heuristic: 4 chars per token + } + return total +} diff --git a/pkg/cron/service.go b/pkg/cron/service.go index c85ab2b..9434ed8 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -149,13 +149,15 @@ func (cs *CronService) checkJobs() { } // Update next run times for due jobs immediately (before executing) + // Use map for O(n) lookup instead of O(n²) nested loop + dueMap := make(map[string]bool, len(dueJobs)) + for _, job := range dueJobs { + dueMap[job.ID] = true + } for i := range cs.store.Jobs { - for _, dueJob := range dueJobs { - if cs.store.Jobs[i].ID == dueJob.ID { - // Reset NextRunAtMS temporarily so we don't re-execute - cs.store.Jobs[i].State.NextRunAtMS = nil - break - } + if dueMap[cs.store.Jobs[i].ID] { + // Reset NextRunAtMS temporarily so we don't re-execute + cs.store.Jobs[i].State.NextRunAtMS = nil } } @@ -325,6 +327,9 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string now := time.Now().UnixMilli() + // One-time tasks (at) should be deleted after execution + deleteAfterRun := (schedule.Kind == "at") + job := CronJob{ ID: generateID(), Name: name, @@ -342,7 +347,7 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string }, CreatedAtMS: now, UpdatedAtMS: now, - DeleteAfterRun: false, + DeleteAfterRun: deleteAfterRun, } cs.store.Jobs = append(cs.store.Jobs, job) diff --git a/pkg/utils/string.go b/pkg/utils/string.go new file mode 100644 index 0000000..0d9837c --- /dev/null +++ b/pkg/utils/string.go @@ -0,0 +1,16 @@ +package utils + +// Truncate returns a truncated version of s with at most maxLen runes. +// Handles multi-byte Unicode characters properly. +// If the string is truncated, "..." is appended to indicate truncation. +func Truncate(s string, maxLen int) string { + runes := []rune(s) + if len(runes) <= maxLen { + return s + } + // Reserve 3 chars for "..." + if maxLen <= 3 { + return string(runes[:maxLen]) + } + return string(runes[:maxLen-3]) + "..." +} From c704990ceca03b8d2c97518c96e15f90cace6d1a Mon Sep 17 00:00:00 2001 From: yinwm Date: Wed, 11 Feb 2026 20:22:41 +0800 Subject: [PATCH 4/5] refactor(tools): remove duplicate truncate functions and add docs - Remove duplicate truncate/truncateString functions from loop.go and cron.go - Use utils.Truncate consistently across codebase - Add Workspace Layout section to README - Document cron/scheduled tasks functionality Co-Authored-By: Claude Opus 4.6 --- README.md | 29 ++++ pkg/agent/loop.go | 435 ++++++++++++++++++---------------------------- pkg/tools/cron.go | 13 +- 3 files changed, 204 insertions(+), 273 deletions(-) diff --git a/README.md b/README.md index 1cf7173..70e06ac 100644 --- a/README.md +++ b/README.md @@ -333,6 +333,23 @@ picoclaw gateway Config file: `~/.picoclaw/config.json` +### Workspace Layout + +PicoClaw stores data in your configured workspace (default: `~/.picoclaw/workspace`): + +``` +~/.picoclaw/workspace/ +├── sessions/ # Conversation sessions and history +├── memory/ # Long-term memory (MEMORY.md) +├── cron/ # Scheduled jobs database +├── skills/ # Custom skills +├── AGENTS.md # Agent behavior guide +├── IDENTITY.md # Agent identity +├── SOUL.md # Agent soul +├── TOOLS.md # Tool descriptions +└── USER.md # User preferences +``` + ### Providers > [!NOTE] @@ -452,6 +469,18 @@ picoclaw agent -m "Hello" | `picoclaw agent` | Interactive chat mode | | `picoclaw gateway` | Start the gateway | | `picoclaw status` | Show status | +| `picoclaw cron list` | List all scheduled jobs | +| `picoclaw cron add ...` | Add a scheduled job | + +### Scheduled Tasks / Reminders + +PicoClaw supports scheduled reminders and recurring tasks through the `cron` tool: + +- **One-time reminders**: "Remind me in 10 minutes" → triggers once after 10min +- **Recurring tasks**: "Remind me every 2 hours" → triggers every 2 hours +- **Cron expressions**: "Remind me at 9am daily" → uses cron expression + +Jobs are stored in `~/.picoclaw/workspace/cron/` and processed automatically. ## 🤝 Contribute & Roadmap diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 5cdd6a7..40c9ba7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -22,6 +22,7 @@ import ( "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" + "github.com/sipeed/picoclaw/pkg/utils" ) type AgentLoop struct { @@ -38,6 +39,17 @@ type AgentLoop struct { summarizing sync.Map // Tracks which sessions are currently being summarized } +// processOptions configures how a message is processed +type processOptions struct { + SessionKey string // Session identifier for history/context + Channel string // Target channel for tool execution + ChatID string // Target chat ID for tool execution + UserMessage string // User message content (may include prefix) + DefaultResponse string // Response when LLM returns empty + EnableSummary bool // Whether to trigger summarization + SendResponse bool // Whether to send response via bus +} + func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { workspace := cfg.WorkspacePath() os.MkdirAll(workspace, 0755) @@ -151,7 +163,7 @@ func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sess func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { // Add message preview to log - preview := truncate(msg.Content, 80) + preview := utils.Truncate(msg.Content, 80) logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview), map[string]interface{}{ "channel": msg.Channel, @@ -165,193 +177,16 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return al.processSystemMessage(ctx, msg) } - // Update tool contexts - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(msg.Channel, msg.ChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(msg.Channel, msg.ChatID) - } - } - - history := al.sessions.GetHistory(msg.SessionKey) - summary := al.sessions.GetSummary(msg.SessionKey) - - messages := al.contextBuilder.BuildMessages( - history, - summary, - msg.Content, - nil, - msg.Channel, - msg.ChatID, - ) - - // Save user message to session - al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) - - iteration := 0 - var finalContent string - - for iteration < al.maxIterations { - iteration++ - - logger.DebugCF("agent", "LLM iteration", - map[string]interface{}{ - "iteration": iteration, - "max": al.maxIterations, - }) - - toolDefs := al.tools.GetDefinitions() - providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) - for _, td := range toolDefs { - providerToolDefs = append(providerToolDefs, providers.ToolDefinition{ - Type: td["type"].(string), - Function: providers.ToolFunctionDefinition{ - Name: td["function"].(map[string]interface{})["name"].(string), - Description: td["function"].(map[string]interface{})["description"].(string), - Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}), - }, - }) - } - - // Log LLM request details - logger.DebugCF("agent", "LLM request", - map[string]interface{}{ - "iteration": iteration, - "model": al.model, - "messages_count": len(messages), - "tools_count": len(providerToolDefs), - "max_tokens": 8192, - "temperature": 0.7, - "system_prompt_len": len(messages[0].Content), - }) - - // Log full messages (detailed) - logger.DebugCF("agent", "Full LLM request", - map[string]interface{}{ - "iteration": iteration, - "messages_json": formatMessagesForLog(messages), - "tools_json": formatToolsForLog(providerToolDefs), - }) - - response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ - "max_tokens": 8192, - "temperature": 0.7, - }) - - if err != nil { - logger.ErrorCF("agent", "LLM call failed", - map[string]interface{}{ - "iteration": iteration, - "error": err.Error(), - }) - return "", fmt.Errorf("LLM call failed: %w", err) - } - - if len(response.ToolCalls) == 0 { - finalContent = response.Content - logger.InfoCF("agent", "LLM response without tool calls (direct answer)", - map[string]interface{}{ - "iteration": iteration, - "content_chars": len(finalContent), - }) - break - } - - toolNames := make([]string, 0, len(response.ToolCalls)) - for _, tc := range response.ToolCalls { - toolNames = append(toolNames, tc.Name) - } - logger.InfoCF("agent", "LLM requested tool calls", - map[string]interface{}{ - "tools": toolNames, - "count": len(toolNames), - "iteration": iteration, - }) - - assistantMsg := providers.Message{ - Role: "assistant", - Content: response.Content, - } - - for _, tc := range response.ToolCalls { - argumentsJSON, _ := json.Marshal(tc.Arguments) - assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ - ID: tc.ID, - Type: "function", - Function: &providers.FunctionCall{ - Name: tc.Name, - Arguments: string(argumentsJSON), - }, - }) - } - messages = append(messages, assistantMsg) - - // Save assistant message with tool calls to session - al.sessions.AddFullMessage(msg.SessionKey, assistantMsg) - - for _, tc := range response.ToolCalls { - // Log tool call with arguments preview - argsJSON, _ := json.Marshal(tc.Arguments) - argsPreview := truncate(string(argsJSON), 200) - logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), - map[string]interface{}{ - "tool": tc.Name, - "iteration": iteration, - }) - - result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) - if err != nil { - result = fmt.Sprintf("Error: %v", err) - } - - toolResultMsg := providers.Message{ - Role: "tool", - Content: result, - ToolCallID: tc.ID, - } - messages = append(messages, toolResultMsg) - - // Save tool result message to session - al.sessions.AddFullMessage(msg.SessionKey, toolResultMsg) - } - } - - if finalContent == "" { - finalContent = "I've completed processing but have no response to give." - } - - // Save final assistant message to session - al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) - al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) - - // Context compression: Check if we need to summarize - // Trigger if history > 20 messages OR estimated tokens > 75% of context window - newHistory := al.sessions.GetHistory(msg.SessionKey) - tokenEstimate := al.estimateTokens(newHistory) - threshold := al.contextWindow * 75 / 100 - - if len(newHistory) > 20 || tokenEstimate > threshold { - if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { - go func() { - defer al.summarizing.Delete(msg.SessionKey) - al.summarizeSession(msg.SessionKey) - }() - } - } - - // Log response preview - responsePreview := truncate(finalContent, 120) - logger.InfoCF("agent", fmt.Sprintf("Response to %s:%s: %s", msg.Channel, msg.SenderID, responsePreview), - map[string]interface{}{ - "iterations": iteration, - "final_length": len(finalContent), - }) - - return finalContent, nil + // Process as user message + return al.runAgentLoop(ctx, processOptions{ + SessionKey: msg.SessionKey, + Channel: msg.Channel, + ChatID: msg.ChatID, + UserMessage: msg.Content, + DefaultResponse: "I've completed processing but have no response to give.", + EnableSummary: true, + SendResponse: false, + }) } func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { @@ -380,39 +215,96 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Use the origin session for context sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID) - // Update tool contexts to original channel/chatID - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(originChannel, originChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(originChannel, originChatID) - } - } + // Process as system message with routing back to origin + return al.runAgentLoop(ctx, processOptions{ + SessionKey: sessionKey, + Channel: originChannel, + ChatID: originChatID, + UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content), + DefaultResponse: "Background task completed.", + EnableSummary: false, + SendResponse: true, // Send response back to original channel + }) +} - // Build messages with the announce content - history := al.sessions.GetHistory(sessionKey) - summary := al.sessions.GetSummary(sessionKey) +// runAgentLoop is the core message processing logic. +// It handles context building, LLM calls, tool execution, and response handling. +func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) { + // 1. Update tool contexts + al.updateToolContexts(opts.Channel, opts.ChatID) + + // 2. Build messages + history := al.sessions.GetHistory(opts.SessionKey) + summary := al.sessions.GetSummary(opts.SessionKey) messages := al.contextBuilder.BuildMessages( history, summary, - msg.Content, + opts.UserMessage, nil, - originChannel, - originChatID, + opts.Channel, + opts.ChatID, ) - // Save user message to session with system message marker - al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) + // 3. Save user message to session + al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage) + // 4. Run LLM iteration loop + finalContent, iteration, err := al.runLLMIteration(ctx, messages, opts) + if err != nil { + return "", err + } + + // 5. Handle empty response + if finalContent == "" { + finalContent = opts.DefaultResponse + } + + // 6. Save final assistant message to session + al.sessions.AddMessage(opts.SessionKey, "assistant", finalContent) + al.sessions.Save(al.sessions.GetOrCreate(opts.SessionKey)) + + // 7. Optional: summarization + if opts.EnableSummary { + al.maybeSummarize(opts.SessionKey) + } + + // 8. Optional: send response via bus + if opts.SendResponse { + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: finalContent, + }) + } + + // 9. Log response + responsePreview := utils.Truncate(finalContent, 120) + logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview), + map[string]interface{}{ + "session_key": opts.SessionKey, + "iterations": iteration, + "final_length": len(finalContent), + }) + + return finalContent, nil +} + +// runLLMIteration executes the LLM call loop with tool handling. +// Returns the final content, iteration count, and any error. +func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.Message, opts processOptions) (string, int, error) { iteration := 0 var finalContent string for iteration < al.maxIterations { iteration++ + logger.DebugCF("agent", "LLM iteration", + map[string]interface{}{ + "iteration": iteration, + "max": al.maxIterations, + }) + + // Build tool definitions toolDefs := al.tools.GetDefinitions() providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) for _, td := range toolDefs { @@ -429,12 +321,12 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Log LLM request details logger.DebugCF("agent", "LLM request", map[string]interface{}{ - "iteration": iteration, - "model": al.model, - "messages_count": len(messages), - "tools_count": len(providerToolDefs), - "max_tokens": 8192, - "temperature": 0.7, + "iteration": iteration, + "model": al.model, + "messages_count": len(messages), + "tools_count": len(providerToolDefs), + "max_tokens": 8192, + "temperature": 0.7, "system_prompt_len": len(messages[0].Content), }) @@ -446,30 +338,49 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe "tools_json": formatToolsForLog(providerToolDefs), }) + // Call LLM response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ "max_tokens": 8192, "temperature": 0.7, }) if err != nil { - logger.ErrorCF("agent", "LLM call failed in system message", + logger.ErrorCF("agent", "LLM call failed", map[string]interface{}{ "iteration": iteration, "error": err.Error(), }) - return "", fmt.Errorf("LLM call failed: %w", err) + return "", iteration, fmt.Errorf("LLM call failed: %w", err) } + // Check if no tool calls - we're done if len(response.ToolCalls) == 0 { finalContent = response.Content + logger.InfoCF("agent", "LLM response without tool calls (direct answer)", + map[string]interface{}{ + "iteration": iteration, + "content_chars": len(finalContent), + }) break } + // Log tool calls + toolNames := make([]string, 0, len(response.ToolCalls)) + for _, tc := range response.ToolCalls { + toolNames = append(toolNames, tc.Name) + } + logger.InfoCF("agent", "LLM requested tool calls", + map[string]interface{}{ + "tools": toolNames, + "count": len(toolNames), + "iteration": iteration, + }) + + // Build assistant message with tool calls assistantMsg := providers.Message{ Role: "assistant", Content: response.Content, } - for _, tc := range response.ToolCalls { argumentsJSON, _ := json.Marshal(tc.Arguments) assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ @@ -484,10 +395,20 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe messages = append(messages, assistantMsg) // Save assistant message with tool calls to session - al.sessions.AddFullMessage(sessionKey, assistantMsg) + al.sessions.AddFullMessage(opts.SessionKey, assistantMsg) + // Execute tool calls for _, tc := range response.ToolCalls { - result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) + // Log tool call with arguments preview + argsJSON, _ := json.Marshal(tc.Arguments) + argsPreview := utils.Truncate(string(argsJSON), 200) + logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), + map[string]interface{}{ + "tool": tc.Name, + "iteration": iteration, + }) + + result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID) if err != nil { result = fmt.Sprintf("Error: %v", err) } @@ -500,46 +421,41 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe messages = append(messages, toolResultMsg) // Save tool result message to session - al.sessions.AddFullMessage(sessionKey, toolResultMsg) + al.sessions.AddFullMessage(opts.SessionKey, toolResultMsg) } } - if finalContent == "" { - finalContent = "Background task completed." - } - - // Save final assistant message to session - al.sessions.AddMessage(sessionKey, "assistant", finalContent) - al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) - - logger.InfoCF("agent", "System message processing completed", - map[string]interface{}{ - "iterations": iteration, - "final_length": len(finalContent), - }) - - // Send response back to the original channel - al.bus.PublishOutbound(bus.OutboundMessage{ - Channel: originChannel, - ChatID: originChatID, - Content: finalContent, - }) - - return finalContent, nil + return finalContent, iteration, nil } -// truncate returns a truncated version of s with at most maxLen characters. -// If the string is truncated, "..." is appended to indicate truncation. -// If the string fits within maxLen, it is returned unchanged. -func truncate(s string, maxLen int) string { - if len(s) <= maxLen { - return s +// updateToolContexts updates the context for tools that need channel/chatID info. +func (al *AgentLoop) updateToolContexts(channel, chatID string) { + if tool, ok := al.tools.Get("message"); ok { + if mt, ok := tool.(*tools.MessageTool); ok { + mt.SetContext(channel, chatID) + } } - // Reserve 3 chars for "..." - if maxLen <= 3 { - return s[:maxLen] + if tool, ok := al.tools.Get("spawn"); ok { + if st, ok := tool.(*tools.SpawnTool); ok { + st.SetContext(channel, chatID) + } + } +} + +// maybeSummarize triggers summarization if the session history exceeds thresholds. +func (al *AgentLoop) maybeSummarize(sessionKey string) { + newHistory := al.sessions.GetHistory(sessionKey) + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { + if _, loading := al.summarizing.LoadOrStore(sessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(sessionKey) + al.summarizeSession(sessionKey) + }() + } } - return s[:maxLen-3] + "..." } // GetStartupInfo returns information about loaded tools and skills for logging. @@ -574,12 +490,12 @@ func formatMessagesForLog(messages []providers.Message) string { for _, tc := range msg.ToolCalls { result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name) if tc.Function != nil { - result += fmt.Sprintf(" Arguments: %s\n", truncateString(tc.Function.Arguments, 200)) + result += fmt.Sprintf(" Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200)) } } } if msg.Content != "" { - content := truncateString(msg.Content, 200) + content := utils.Truncate(msg.Content, 200) result += fmt.Sprintf(" Content: %s\n", content) } if msg.ToolCallID != "" { @@ -603,24 +519,13 @@ func formatToolsForLog(tools []providers.ToolDefinition) string { result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name) result += fmt.Sprintf(" Description: %s\n", tool.Function.Description) if len(tool.Function.Parameters) > 0 { - result += fmt.Sprintf(" Parameters: %s\n", truncateString(fmt.Sprintf("%v", tool.Function.Parameters), 200)) + result += fmt.Sprintf(" Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200)) } } result += "]" return result } -// truncateString truncates a string to max length -func truncateString(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - if maxLen <= 3 { - return s[:maxLen] - } - return s[:maxLen-3] + "..." -} - // summarizeSession summarizes the conversation history for a session. func (al *AgentLoop) summarizeSession(sessionKey string) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 87aaf35..53570a3 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -8,15 +8,9 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/cron" + "github.com/sipeed/picoclaw/pkg/utils" ) -func truncateString(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - return s[:maxLen] -} - // JobExecutor is the interface for executing cron jobs through the agent type JobExecutor interface { ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) @@ -171,8 +165,11 @@ func (t *CronTool) addJob(args map[string]interface{}) (string, error) { deliver = d } + // Truncate message for job name (max 30 chars) + messagePreview := utils.Truncate(message, 30) + job, err := t.cronService.AddJob( - truncateString(message, 30), + messagePreview, schedule, message, deliver, From 9ec84c6630a6b28845c4589b2ff8eca54d4aaf57 Mon Sep 17 00:00:00 2001 From: Harshdeep Sharma <24f2006619@ds.study.iitm.ac.in> Date: Wed, 11 Feb 2026 17:39:13 +0530 Subject: [PATCH 5/5] Fix typos and update API keys in README --- README.md | 24 ++++++++++++------------ pkg/channels/base.go | 1 - 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 70e06ac..9778918 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ ### 🐜 Innovative Low-Footprint Deploy PicoClaw can be deployed on almost any Linux device! -- $9.9 [LicheeRV-Nano](https://www.aliexpress.com/item/1005006519668532.html) E(Ethernet) or W(WiFi6) version, for Minimal Home Assitant +- $9.9 [LicheeRV-Nano](https://www.aliexpress.com/item/1005006519668532.html) E(Ethernet) or W(WiFi6) version, for Minimal Home Assistant - $30~50 [NanoKVM](https://www.aliexpress.com/item/1005007369816019.html), or $100 [NanoKVM-Pro](https://www.aliexpress.com/item/1005010048471263.html) for Automated Server Maintenance - $50 [MaixCAM](https://www.aliexpress.com/item/1005008053333693.html) or $100 [MaixCAM2](https://www.kickstarter.com/projects/zepan/maixcam2-build-your-next-gen-4k-ai-camera) for Smart Monitoring @@ -144,7 +144,7 @@ picoclaw onboard "providers": { "openrouter": { "api_key": "xxx", - "api_base": "https://open.bigmodel.cn/api/paas/v4" + "api_base": "https://openrouter.ai/api/v1" } }, "tools": { @@ -165,7 +165,7 @@ picoclaw onboard > **Note**: See `config.example.json` for a complete configuration template. -**3. Chat** +**4. Chat** ```bash picoclaw agent -m "What is 2+2?" @@ -413,17 +413,17 @@ picoclaw agent -m "Hello" }, "providers": { "openrouter": { - "apiKey": "sk-or-v1-xxx" + "api_key": "sk-or-v1-xxx" }, "groq": { - "apiKey": "gsk_xxx" + "api_key": "gsk_xxx" } }, "channels": { "telegram": { "enabled": true, "token": "123456:ABC...", - "allowFrom": ["123456789"] + "allow_from": ["123456789"] }, "discord": { "enabled": true, @@ -435,11 +435,11 @@ picoclaw agent -m "Hello" }, "feishu": { "enabled": false, - "appId": "cli_xxx", - "appSecret": "xxx", - "encryptKey": "", - "verificationToken": "", - "allowFrom": [] + "app_id": "cli_xxx", + "app_secret": "xxx", + "encrypt_key": "", + "verification_token": "", + "allow_from": [] }, "qq": { "enabled": false, @@ -451,7 +451,7 @@ picoclaw agent -m "Hello" "tools": { "web": { "search": { - "apiKey": "BSA..." + "api_key": "BSA..." } } } diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 8abe7b5..3ade400 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -73,7 +73,6 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st SessionKey: sessionKey, Metadata: metadata, } - } c.bus.PublishInbound(msg) }