fix concurrency and persistence safety in session/cron/heartbeat services

This commit is contained in:
esubaalew
2026-02-11 16:38:19 +03:00
parent d83fb6e081
commit 7fa341c449
5 changed files with 161 additions and 88 deletions

View File

@@ -71,7 +71,6 @@ func NewCronService(storePath string, onJob JobHandler) *CronService {
cs := &CronService{
storePath: storePath,
onJob: onJob,
stopChan: make(chan struct{}),
gronx: gronx.New(),
}
// Initialize and load store on creation
@@ -96,8 +95,9 @@ func (cs *CronService) Start() error {
return fmt.Errorf("failed to save store: %w", err)
}
cs.stopChan = make(chan struct{})
cs.running = true
go cs.runLoop()
go cs.runLoop(cs.stopChan)
return nil
}
@@ -111,16 +111,19 @@ func (cs *CronService) Stop() {
}
cs.running = false
close(cs.stopChan)
if cs.stopChan != nil {
close(cs.stopChan)
cs.stopChan = nil
}
}
func (cs *CronService) runLoop() {
func (cs *CronService) runLoop(stopChan chan struct{}) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-cs.stopChan:
case <-stopChan:
return
case <-ticker.C:
cs.checkJobs()
@@ -137,27 +140,23 @@ func (cs *CronService) checkJobs() {
}
now := time.Now().UnixMilli()
var dueJobs []*CronJob
var dueJobIDs []string
// Collect jobs that are due (we need to copy them to execute outside lock)
for i := range cs.store.Jobs {
job := &cs.store.Jobs[i]
if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now {
// Create a shallow copy of the job for execution
jobCopy := *job
dueJobs = append(dueJobs, &jobCopy)
dueJobIDs = append(dueJobIDs, job.ID)
}
}
// Update next run times for due jobs immediately (before executing)
// Use map for O(n) lookup instead of O(n²) nested loop
dueMap := make(map[string]bool, len(dueJobs))
for _, job := range dueJobs {
dueMap[job.ID] = true
// Reset next run for due jobs before unlocking to avoid duplicate execution.
dueMap := make(map[string]bool, len(dueJobIDs))
for _, jobID := range dueJobIDs {
dueMap[jobID] = true
}
for i := range cs.store.Jobs {
if dueMap[cs.store.Jobs[i].ID] {
// Reset NextRunAtMS temporarily so we don't re-execute
cs.store.Jobs[i].State.NextRunAtMS = nil
}
}
@@ -168,53 +167,75 @@ func (cs *CronService) checkJobs() {
cs.mu.Unlock()
// Execute jobs outside the lock
for _, job := range dueJobs {
cs.executeJob(job)
// Execute jobs outside lock.
for _, jobID := range dueJobIDs {
cs.executeJobByID(jobID)
}
}
func (cs *CronService) executeJob(job *CronJob) {
func (cs *CronService) executeJobByID(jobID string) {
startTime := time.Now().UnixMilli()
cs.mu.RLock()
var callbackJob *CronJob
for i := range cs.store.Jobs {
job := &cs.store.Jobs[i]
if job.ID == jobID {
jobCopy := *job
callbackJob = &jobCopy
break
}
}
cs.mu.RUnlock()
if callbackJob == nil {
return
}
var err error
if cs.onJob != nil {
_, err = cs.onJob(job)
_, err = cs.onJob(callbackJob)
}
// Now acquire lock to update state
cs.mu.Lock()
defer cs.mu.Unlock()
// Find the job in store and update it
var job *CronJob
for i := range cs.store.Jobs {
if cs.store.Jobs[i].ID == job.ID {
cs.store.Jobs[i].State.LastRunAtMS = &startTime
cs.store.Jobs[i].UpdatedAtMS = time.Now().UnixMilli()
if err != nil {
cs.store.Jobs[i].State.LastStatus = "error"
cs.store.Jobs[i].State.LastError = err.Error()
} else {
cs.store.Jobs[i].State.LastStatus = "ok"
cs.store.Jobs[i].State.LastError = ""
}
// Compute next run time
if cs.store.Jobs[i].Schedule.Kind == "at" {
if cs.store.Jobs[i].DeleteAfterRun {
cs.removeJobUnsafe(job.ID)
} else {
cs.store.Jobs[i].Enabled = false
cs.store.Jobs[i].State.NextRunAtMS = nil
}
} else {
nextRun := cs.computeNextRun(&cs.store.Jobs[i].Schedule, time.Now().UnixMilli())
cs.store.Jobs[i].State.NextRunAtMS = nextRun
}
if cs.store.Jobs[i].ID == jobID {
job = &cs.store.Jobs[i]
break
}
}
if job == nil {
log.Printf("[cron] job %s disappeared before state update", jobID)
return
}
job.State.LastRunAtMS = &startTime
job.UpdatedAtMS = time.Now().UnixMilli()
if err != nil {
job.State.LastStatus = "error"
job.State.LastError = err.Error()
} else {
job.State.LastStatus = "ok"
job.State.LastError = ""
}
// Compute next run time
if job.Schedule.Kind == "at" {
if job.DeleteAfterRun {
cs.removeJobUnsafe(job.ID)
} else {
job.Enabled = false
job.State.NextRunAtMS = nil
}
} else {
nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli())
job.State.NextRunAtMS = nextRun
}
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store: %v", err)