package cron import ( "encoding/json" "fmt" "os" "path/filepath" "sync" "time" ) type CronSchedule struct { Kind string `json:"kind"` AtMS *int64 `json:"atMs,omitempty"` EveryMS *int64 `json:"everyMs,omitempty"` Expr string `json:"expr,omitempty"` TZ string `json:"tz,omitempty"` } type CronPayload struct { Kind string `json:"kind"` Message string `json:"message"` Deliver bool `json:"deliver"` Channel string `json:"channel,omitempty"` To string `json:"to,omitempty"` } type CronJobState struct { NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"` LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"` LastStatus string `json:"lastStatus,omitempty"` LastError string `json:"lastError,omitempty"` } type CronJob struct { ID string `json:"id"` Name string `json:"name"` Enabled bool `json:"enabled"` Schedule CronSchedule `json:"schedule"` Payload CronPayload `json:"payload"` State CronJobState `json:"state"` CreatedAtMS int64 `json:"createdAtMs"` UpdatedAtMS int64 `json:"updatedAtMs"` DeleteAfterRun bool `json:"deleteAfterRun"` } type CronStore struct { Version int `json:"version"` Jobs []CronJob `json:"jobs"` } type JobHandler func(job *CronJob) (string, error) type CronService struct { storePath string store *CronStore onJob JobHandler mu sync.RWMutex running bool stopChan chan struct{} } func NewCronService(storePath string, onJob JobHandler) *CronService { cs := &CronService{ storePath: storePath, onJob: onJob, stopChan: make(chan struct{}), } cs.loadStore() return cs } func (cs *CronService) Start() error { cs.mu.Lock() defer cs.mu.Unlock() if cs.running { return nil } if err := cs.loadStore(); err != nil { return fmt.Errorf("failed to load store: %w", err) } cs.recomputeNextRuns() if err := cs.saveStore(); err != nil { return fmt.Errorf("failed to save store: %w", err) } cs.running = true go cs.runLoop() return nil } func (cs *CronService) Stop() { cs.mu.Lock() defer cs.mu.Unlock() if !cs.running { return } cs.running = false close(cs.stopChan) } func (cs *CronService) runLoop() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-cs.stopChan: return case <-ticker.C: cs.checkJobs() } } } func (cs *CronService) checkJobs() { cs.mu.RLock() if !cs.running { cs.mu.RUnlock() return } now := time.Now().UnixMilli() var dueJobs []*CronJob for i := range cs.store.Jobs { job := &cs.store.Jobs[i] if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now { dueJobs = append(dueJobs, job) } } cs.mu.RUnlock() for _, job := range dueJobs { cs.executeJob(job) } cs.mu.Lock() defer cs.mu.Unlock() cs.saveStore() } func (cs *CronService) executeJob(job *CronJob) { startTime := time.Now().UnixMilli() var err error if cs.onJob != nil { _, err = cs.onJob(job) } cs.mu.Lock() defer cs.mu.Unlock() job.State.LastRunAtMS = &startTime job.UpdatedAtMS = time.Now().UnixMilli() if err != nil { job.State.LastStatus = "error" job.State.LastError = err.Error() } else { job.State.LastStatus = "ok" job.State.LastError = "" } if job.Schedule.Kind == "at" { if job.DeleteAfterRun { cs.removeJobUnsafe(job.ID) } else { job.Enabled = false job.State.NextRunAtMS = nil } } else { nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) job.State.NextRunAtMS = nextRun } } func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int64 { if schedule.Kind == "at" { if schedule.AtMS != nil && *schedule.AtMS > nowMS { return schedule.AtMS } return nil } if schedule.Kind == "every" { if schedule.EveryMS == nil || *schedule.EveryMS <= 0 { return nil } next := nowMS + *schedule.EveryMS return &next } return nil } func (cs *CronService) recomputeNextRuns() { now := time.Now().UnixMilli() for i := range cs.store.Jobs { job := &cs.store.Jobs[i] if job.Enabled { job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now) } } } func (cs *CronService) getNextWakeMS() *int64 { var nextWake *int64 for _, job := range cs.store.Jobs { if job.Enabled && job.State.NextRunAtMS != nil { if nextWake == nil || *job.State.NextRunAtMS < *nextWake { nextWake = job.State.NextRunAtMS } } } return nextWake } func (cs *CronService) Load() error { return cs.loadStore() } func (cs *CronService) loadStore() error { cs.store = &CronStore{ Version: 1, Jobs: []CronJob{}, } data, err := os.ReadFile(cs.storePath) if err != nil { if os.IsNotExist(err) { return nil } return err } return json.Unmarshal(data, cs.store) } func (cs *CronService) saveStore() error { dir := filepath.Dir(cs.storePath) if err := os.MkdirAll(dir, 0755); err != nil { return err } data, err := json.MarshalIndent(cs.store, "", " ") if err != nil { return err } return os.WriteFile(cs.storePath, data, 0644) } func (cs *CronService) AddJob(name string, schedule CronSchedule, message string, deliver bool, channel, to string) (*CronJob, error) { cs.mu.Lock() defer cs.mu.Unlock() now := time.Now().UnixMilli() job := CronJob{ ID: generateID(), Name: name, Enabled: true, Schedule: schedule, Payload: CronPayload{ Kind: "agent_turn", Message: message, Deliver: deliver, Channel: channel, To: to, }, State: CronJobState{ NextRunAtMS: cs.computeNextRun(&schedule, now), }, CreatedAtMS: now, UpdatedAtMS: now, DeleteAfterRun: false, } cs.store.Jobs = append(cs.store.Jobs, job) if err := cs.saveStore(); err != nil { return nil, err } return &job, nil } func (cs *CronService) RemoveJob(jobID string) bool { cs.mu.Lock() defer cs.mu.Unlock() return cs.removeJobUnsafe(jobID) } func (cs *CronService) removeJobUnsafe(jobID string) bool { before := len(cs.store.Jobs) var jobs []CronJob for _, job := range cs.store.Jobs { if job.ID != jobID { jobs = append(jobs, job) } } cs.store.Jobs = jobs removed := len(cs.store.Jobs) < before if removed { cs.saveStore() } return removed } func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { cs.mu.Lock() defer cs.mu.Unlock() for i := range cs.store.Jobs { job := &cs.store.Jobs[i] if job.ID == jobID { job.Enabled = enabled job.UpdatedAtMS = time.Now().UnixMilli() if enabled { job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) } else { job.State.NextRunAtMS = nil } cs.saveStore() return job } } return nil } func (cs *CronService) ListJobs(includeDisabled bool) []CronJob { cs.mu.RLock() defer cs.mu.RUnlock() if includeDisabled { return cs.store.Jobs } var enabled []CronJob for _, job := range cs.store.Jobs { if job.Enabled { enabled = append(enabled, job) } } return enabled } func (cs *CronService) Status() map[string]interface{} { cs.mu.RLock() defer cs.mu.RUnlock() var enabledCount int for _, job := range cs.store.Jobs { if job.Enabled { enabledCount++ } } return map[string]interface{}{ "enabled": cs.running, "jobs": len(cs.store.Jobs), "nextWakeAtMS": cs.getNextWakeMS(), } } func generateID() string { return fmt.Sprintf("%d", time.Now().UnixNano()) }