From 2d11e33039e71c9a72f3c4b03913bbb3d902eccc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dejan=20Ro=C5=BEi=C4=8D?= Date: Mon, 20 Apr 2026 06:25:12 +0200 Subject: [PATCH] added bug fixes --- main.go | 358 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 266 insertions(+), 92 deletions(-) diff --git a/main.go b/main.go index 9188cf1..a02907f 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,7 @@ import ( //go:embed static var staticFiles embed.FS -const version = "0.9.2" +const version = "0.9.3" // --------------------------------------------------------------------------- // Config structs @@ -562,6 +562,7 @@ type AlarmTracker struct { LeftZone string RightZone string ImbZone string + LastChange map[string]time.Time } // --------------------------------------------------------------------------- @@ -634,6 +635,9 @@ var ( // mqttMgr is nil when MQTT is disabled. mqttMgr *mqttManager + + // mqttAlarmCh is non-nil when MQTT is enabled; used to avoid goroutine spam. + mqttAlarmCh chan AlarmEvent ) // --------------------------------------------------------------------------- @@ -904,8 +908,15 @@ func startMQTTPublisher(ctx context.Context) { } } +// mqttAlarmWorker drains the MQTT alarm channel so we don't spawn a goroutine per event. +func mqttAlarmWorker() { + for a := range mqttAlarmCh { + mqttPublishAlarm(a) + } +} + // mqttPublishAlarm forwards a single alarm event to MQTT non-blocking. -// Called from enqueueAlarm; errors are silently discarded to avoid +// Called from mqttAlarmWorker; errors are silently discarded to avoid // blocking the alarm pipeline. func mqttPublishAlarm(a AlarmEvent) { if mqttMgr == nil { @@ -1208,8 +1219,13 @@ func enqueueAlarm(a AlarmEvent) { state.DroppedEvents++ state.Unlock() } - // Forward to MQTT non-blocking; ignore errors. - go mqttPublishAlarm(a) + // Forward to MQTT non-blocking via single worker goroutine. + if mqttAlarmCh != nil { + select { + case mqttAlarmCh <- a: + default: + } + } } // --------------------------------------------------------------------------- @@ -1255,9 +1271,10 @@ func initDatabase(dbPath string, dbCfg DBConfig) (*sql.DB, error) { return nil, fmt.Errorf("open sqlite: %w", err) } - database.SetMaxOpenConns(1) - database.SetMaxIdleConns(1) - database.SetConnMaxLifetime(0) + // FIX: allow parallel reads (writes still serialize safely via WAL). + database.SetMaxOpenConns(4) + database.SetMaxIdleConns(2) + database.SetConnMaxLifetime(time.Hour) pragmas := []string{ "PRAGMA journal_mode=WAL;", @@ -1309,6 +1326,12 @@ CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_n return nil, fmt.Errorf("create schema: %w", err) } + // FIX: covering index for trend queries (sum_pct, imbalance_pct by time). + if _, err := database.Exec(`CREATE INDEX IF NOT EXISTS idx_samples_trend ON samples(ts_unix_ns, sum_pct, imbalance_pct);`); err != nil { + _ = database.Close() + return nil, fmt.Errorf("create trend index: %w", err) + } + // Schema migrations: add missing columns to support older databases. migrations := []struct{ table, col, def string }{ {"samples", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"}, @@ -1337,6 +1360,11 @@ CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_n } } + // FIX: update query planner stats so indexes are actually used. + if _, err := database.Exec("ANALYZE"); err != nil { + log.Printf("warning: sqlite analyze failed: %v", err) + } + return database, nil } @@ -1348,7 +1376,20 @@ func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond) defer ticker.Stop() + stmt, err := database.Prepare(` + INSERT INTO samples ( + ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn, + sum_pct, sum_kn, imbalance_pct, bias_pct + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + log.Printf("db writer prepare failed: %v", err) + return + } + defer stmt.Close() + batch := make([]Sample, 0, batchSize) + flushErrCount := 0 flush := func() { if len(batch) == 0 { @@ -1357,22 +1398,19 @@ func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int tx, err := database.Begin() if err != nil { log.Printf("db begin failed: %v", err) + flushErrCount++ + if flushErrCount >= 3 { + log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount) + batch = batch[:0] + flushErrCount = 0 + } return } - stmt, err := tx.Prepare(` - INSERT INTO samples ( - ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn, - sum_pct, sum_kn, imbalance_pct, bias_pct - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `) - if err != nil { - _ = tx.Rollback() - log.Printf("db prepare failed: %v", err) - return - } + txStmt := tx.Stmt(stmt) + defer txStmt.Close() ok := true for _, s := range batch { - if _, err := stmt.Exec( + if _, err := txStmt.Exec( s.TS.UTC().Format(time.RFC3339Nano), s.TS.UTC().UnixNano(), s.SilaLPct, s.SilaRPct, s.SilaLKN, s.SilaRKN, s.SumPercent, s.SumKN, s.ImbalancePercent, s.BiasPercent, @@ -1382,26 +1420,44 @@ func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int break } } - _ = stmt.Close() if !ok { _ = tx.Rollback() + flushErrCount++ + if flushErrCount >= 3 { + log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount) + batch = batch[:0] + flushErrCount = 0 + } return } if err := tx.Commit(); err != nil { log.Printf("db commit failed: %v", err) + flushErrCount++ + if flushErrCount >= 3 { + log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount) + batch = batch[:0] + flushErrCount = 0 + } return } batch = batch[:0] + flushErrCount = 0 } for { select { case <-ctx.Done(): - // Drain remaining samples before exit. + drained := 0 for { select { case s := <-sampleCh: batch = append(batch, s) + drained++ + if drained > 10000 { + log.Printf("db writer: drain limit reached, dropping remaining") + flush() + return + } default: flush() return @@ -1422,7 +1478,19 @@ func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond) defer ticker.Stop() + stmt, err := database.Prepare(` + INSERT INTO alarm_events ( + ts, ts_unix_ns, severity, source, code, state, message, value, limit_value + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + log.Printf("alarm db writer prepare failed: %v", err) + return + } + defer stmt.Close() + batch := make([]AlarmEvent, 0, batchSize) + flushErrCount := 0 flush := func() { if len(batch) == 0 { @@ -1431,21 +1499,19 @@ func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs tx, err := database.Begin() if err != nil { log.Printf("alarm db begin failed: %v", err) + flushErrCount++ + if flushErrCount >= 3 { + log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount) + batch = batch[:0] + flushErrCount = 0 + } return } - stmt, err := tx.Prepare(` - INSERT INTO alarm_events ( - ts, ts_unix_ns, severity, source, code, state, message, value, limit_value - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - `) - if err != nil { - _ = tx.Rollback() - log.Printf("alarm db prepare failed: %v", err) - return - } + txStmt := tx.Stmt(stmt) + defer txStmt.Close() ok := true for _, a := range batch { - if _, err := stmt.Exec( + if _, err := txStmt.Exec( a.TS.UTC().Format(time.RFC3339Nano), a.TS.UTC().UnixNano(), a.Severity, a.Source, a.Code, a.State, a.Message, a.Value, a.Limit, ); err != nil { @@ -1454,25 +1520,44 @@ func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs break } } - _ = stmt.Close() if !ok { _ = tx.Rollback() + flushErrCount++ + if flushErrCount >= 3 { + log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount) + batch = batch[:0] + flushErrCount = 0 + } return } if err := tx.Commit(); err != nil { log.Printf("alarm db commit failed: %v", err) + flushErrCount++ + if flushErrCount >= 3 { + log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount) + batch = batch[:0] + flushErrCount = 0 + } return } batch = batch[:0] + flushErrCount = 0 } for { select { case <-ctx.Done(): + drained := 0 for { select { case a := <-alarmCh: batch = append(batch, a) + drained++ + if drained > 10000 { + log.Printf("alarm writer: drain limit reached, dropping remaining") + flush() + return + } default: flush() return @@ -1497,14 +1582,51 @@ func startDBCleanup(ctx context.Context, database *sql.DB, retentionDays, interv ticker := time.NewTicker(time.Duration(intervalHr) * time.Hour) defer ticker.Stop() + // Replace the cleanup function in startDBCleanup with this: cleanup := func() { cutoffNs := time.Now().AddDate(0, 0, -retentionDays).UTC().UnixNano() for _, tbl := range []string{"samples", "alarm_events"} { - if _, err := database.Exec( - fmt.Sprintf(`DELETE FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ?`, tbl), - cutoffNs, - ); err != nil { - log.Printf("db cleanup %s failed: %v", tbl, err) + for { + // Select up to 5000 rowids to delete (SQLite supports LIMIT in SELECT) + rows, err := database.QueryContext(ctx, + fmt.Sprintf(`SELECT rowid FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ? LIMIT 5000`, tbl), + cutoffNs, + ) + if err != nil { + log.Printf("db cleanup %s select failed: %v", tbl, err) + break + } + var rowids []int64 + for rows.Next() { + var rid int64 + if err := rows.Scan(&rid); err != nil { + log.Printf("db cleanup %s scan failed: %v", tbl, err) + break + } + rowids = append(rowids, rid) + } + rows.Close() + if err := rows.Err(); err != nil { + log.Printf("db cleanup %s rows error: %v", tbl, err) + break + } + + if len(rowids) == 0 { + break // Nothing more to delete + } + + // Build DELETE IN (...) query + placeholders := make([]string, len(rowids)) + args := make([]any, len(rowids)) + for i, rid := range rowids { + placeholders[i] = "?" + args[i] = rid + } + query := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, tbl, strings.Join(placeholders, ",")) + if _, err := database.ExecContext(ctx, query, args...); err != nil { + log.Printf("db cleanup %s delete failed: %v", tbl, err) + break + } } } } @@ -1579,6 +1701,19 @@ func maybeLogZoneChange(source, prev, curr string, value float64) { if prev == "" && curr == "ok" { return } + + // FIX: 5-second cooldown per source to prevent oscillation spam. + alarmTracker.Lock() + if alarmTracker.LastChange == nil { + alarmTracker.LastChange = make(map[string]time.Time) + } + if last, ok := alarmTracker.LastChange[source]; ok && now.Sub(last) < 5*time.Second { + alarmTracker.Unlock() + return + } + alarmTracker.LastChange[source] = now + alarmTracker.Unlock() + switch curr { case "ok": enqueueAlarm(AlarmEvent{ @@ -1625,10 +1760,16 @@ func maybeLogZoneChange(source, prev, curr string, value float64) { } func evaluateProcessAlarms(s Sample) { + // FIX: snapshot thresholds once to avoid repeated locking. config := getConfigSnapshot() - leftZone := zoneFromValue(float64(s.SilaLPct), config.Thresholds.WarningPercent, config.Thresholds.CriticalPercent) - rightZone := zoneFromValue(float64(s.SilaRPct), config.Thresholds.WarningPercent, config.Thresholds.CriticalPercent) - imbZone := zoneFromValue(float64(s.ImbalancePercent), config.Thresholds.ImbalanceWarningPercent, config.Thresholds.ImbalanceCriticalPercent) + warn := config.Thresholds.WarningPercent + crit := config.Thresholds.CriticalPercent + imbWarn := config.Thresholds.ImbalanceWarningPercent + imbCrit := config.Thresholds.ImbalanceCriticalPercent + + leftZone := zoneFromValue(float64(s.SilaLPct), warn, crit) + rightZone := zoneFromValue(float64(s.SilaRPct), warn, crit) + imbZone := zoneFromValue(float64(s.ImbalancePercent), imbWarn, imbCrit) alarmTracker.Lock() defer alarmTracker.Unlock() @@ -1689,6 +1830,10 @@ func startPLCPoller(ctx context.Context) { pollInterval := time.Duration(bootCfg.PLC.PollMs) * time.Millisecond reconnectDelay := time.Duration(bootCfg.PLC.ReconnectDelaySec) * time.Second + // FIX: cache frequently accessed config values to avoid lock contention. + maxTonnage := bootCfg.Press.MaxTonnage + dbNum := bootCfg.PLC.DBNum + for { select { case <-ctx.Done(): @@ -1717,6 +1862,7 @@ func startPLCPoller(ctx context.Context) { log.Println("PLC connected successfully") buf := make([]byte, 8) + readErrCount := 0 for { select { @@ -1726,18 +1872,30 @@ func startPLCPoller(ctx context.Context) { default: } - if err := client.AGReadDB(bootCfg.PLC.DBNum, 0, 8, buf); err != nil { + if err := client.AGReadDB(dbNum, 0, 8, buf); err != nil { + readErrCount++ + if readErrCount < 3 { + log.Printf("PLC read error (attempt %d/3): %v", readErrCount, err) + select { + case <-ctx.Done(): + _ = handler.Close() + return + case <-time.After(pollInterval): + } + continue + } log.Printf("PLC read error: %v - reconnecting...", err) markDisconnected(err.Error()) _ = handler.Close() break } + readErrCount = 0 var helper gos7.Helper silaL := helper.GetRealAt(buf, 0) silaR := helper.GetRealAt(buf, 4) - leftKN, rightKN, sumPercent, sumKN := calculateForces(silaL, silaR, getConfigSnapshot().Press.MaxTonnage) + leftKN, rightKN, sumPercent, sumKN := calculateForces(silaL, silaR, maxTonnage) imbalance := float32(math.Abs(float64(silaL - silaR))) bias := silaL - silaR now := time.Now() @@ -1806,10 +1964,13 @@ func formatHistoryLabel(t time.Time, window time.Duration) string { return local.Format("15:04:05.000") } -func queryHistory(window time.Duration) ([]HistoryPoint, error) { +// FIX: use ts_unix_ns directly instead of parsing RFC3339 strings. +func queryHistory(ctx context.Context, window time.Duration) ([]HistoryPoint, error) { cutoffNs := time.Now().Add(-window).UTC().UnixNano() - rows, err := db.Query(`SELECT ts, sila_l_pct, sila_r_pct FROM samples WHERE ts_unix_ns >= ? ORDER BY ts_unix_ns ASC`, cutoffNs) + rows, err := db.QueryContext(ctx, + `SELECT ts_unix_ns, sila_l_pct, sila_r_pct FROM samples WHERE ts_unix_ns >= ? ORDER BY ts_unix_ns ASC`, + cutoffNs) if err != nil { return nil, err } @@ -1817,15 +1978,12 @@ func queryHistory(window time.Duration) ([]HistoryPoint, error) { points := make([]HistoryPoint, 0, 1024) for rows.Next() { - var ts string + var tsUnix int64 var l, r float64 - if err := rows.Scan(&ts, &l, &r); err != nil { + if err := rows.Scan(&tsUnix, &l, &r); err != nil { return nil, err } - t, err := time.Parse(time.RFC3339Nano, ts) - if err != nil { - continue - } + t := time.Unix(0, tsUnix).Local() points = append(points, HistoryPoint{ Time: formatHistoryLabel(t, window), SilaL: float32(l), @@ -1843,32 +2001,22 @@ func queryHistory(window time.Duration) ([]HistoryPoint, error) { return downsamplePoints(points, maxPts), nil } +// FIX: remove unnecessary map allocation; indices are monotonic. func downsamplePoints(points []HistoryPoint, max int) []HistoryPoint { if len(points) <= max || max < 3 { return points } - out := make([]HistoryPoint, 0, max) + out := make([]HistoryPoint, max) step := float64(len(points)-1) / float64(max-1) - used := make(map[int]struct{}, max) - for i := 0; i < max; i++ { idx := int(float64(i) * step) if idx >= len(points) { idx = len(points) - 1 } - if _, ok := used[idx]; ok { - continue - } - used[idx] = struct{}{} - out = append(out, points[idx]) + out[i] = points[idx] } - - if len(out) == 0 { - return points - } - // Always ensure last point is the most recent. - out[len(out)-1] = points[len(points)-1] + out[max-1] = points[len(points)-1] return out } @@ -1883,7 +2031,8 @@ func validField(field string) (string, error) { } } -func queryNumericStats(field string, fromNs, toNs int64) (NumericStats, error) { +// FIX: accept context so queries cancel when HTTP client disconnects. +func queryNumericStats(ctx context.Context, field string, fromNs, toNs int64) (NumericStats, error) { safeField, err := validField(field) if err != nil { return NumericStats{}, err @@ -1900,7 +2049,7 @@ func queryNumericStats(field string, fromNs, toNs int64) (NumericStats, error) { `, safeField) var stats NumericStats - err = db.QueryRow(query, fromNs, toNs).Scan(&stats.Avg, &stats.AvgSq, &stats.Min, &stats.Max, &stats.Count) + err = db.QueryRowContext(ctx, query, fromNs, toNs).Scan(&stats.Avg, &stats.AvgSq, &stats.Min, &stats.Max, &stats.Count) if err != nil { return NumericStats{}, err } @@ -1957,41 +2106,60 @@ func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sa return "stable", "Process variation is low" } -func buildTrendResponse(window time.Duration, label string) (TrendResponse, error) { +// FIX: 1-second response cache so overlapping UI polls don't hammer the DB. +var ( + trendCache atomic.Value // stores *trendCacheEntry + trendCacheTime int64 // atomic +) + +type trendCacheEntry struct { + Window time.Duration + Resp TrendResponse +} + +// FIX: accept context and use cache. +func buildTrendResponse(ctx context.Context, window time.Duration, label string) (TrendResponse, error) { + now := time.Now().UnixMilli() + if cached, ok := trendCache.Load().(*trendCacheEntry); ok { + if cached.Window == window && now-atomic.LoadInt64(&trendCacheTime) < 1000 { + return cached.Resp, nil + } + } + nowNs := time.Now().UTC().UnixNano() windowNs := window.Nanoseconds() startNs := nowNs - windowNs midNs := startNs + (windowNs / 2) - force5m, err := queryNumericStats("sum_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs) + force5m, err := queryNumericStats(ctx, "sum_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } - force1h, err := queryNumericStats("sum_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs) + force1h, err := queryNumericStats(ctx, "sum_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } - imb5m, err := queryNumericStats("imbalance_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs) + imb5m, err := queryNumericStats(ctx, "imbalance_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } - imb1h, err := queryNumericStats("imbalance_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs) + imb1h, err := queryNumericStats(ctx, "imbalance_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } - forceOld, err := queryNumericStats("sum_pct", startNs, midNs) + forceOld, err := queryNumericStats(ctx, "sum_pct", startNs, midNs) if err != nil { return TrendResponse{}, err } - forceNew, err := queryNumericStats("sum_pct", midNs, nowNs) + forceNew, err := queryNumericStats(ctx, "sum_pct", midNs, nowNs) if err != nil { return TrendResponse{}, err } - imbOld, err := queryNumericStats("imbalance_pct", startNs, midNs) + imbOld, err := queryNumericStats(ctx, "imbalance_pct", startNs, midNs) if err != nil { return TrendResponse{}, err } - imbNew, err := queryNumericStats("imbalance_pct", midNs, nowNs) + imbNew, err := queryNumericStats(ctx, "imbalance_pct", midNs, nowNs) if err != nil { return TrendResponse{}, err } @@ -2002,11 +2170,11 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro forceDirection := classifyDirection(forceDelta, forceOld.Count, forceNew.Count, 1.0, "rising", "falling") imbDirection := classifyDirection(imbDelta, imbOld.Count, imbNew.Count, 0.5, "worsening", "improving") - fullWindowForce, err := queryNumericStats("sum_pct", startNs, nowNs) + fullWindowForce, err := queryNumericStats(ctx, "sum_pct", startNs, nowNs) if err != nil { return TrendResponse{}, err } - fullWindowImb, err := queryNumericStats("imbalance_pct", startNs, nowNs) + fullWindowImb, err := queryNumericStats(ctx, "imbalance_pct", startNs, nowNs) if err != nil { return TrendResponse{}, err } @@ -2016,7 +2184,7 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro forceDelta, imb5m.Avg, fullWindowForce.Count, ) - return TrendResponse{ + resp := TrendResponse{ Window: label, AvgPeak5m: float32(force5m.Avg), AvgPeak1h: float32(force1h.Avg), @@ -2033,18 +2201,23 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro SampleCount: fullWindowForce.Count, OldHalfCount: forceOld.Count, NewHalfCount: forceNew.Count, - }, nil + } + + trendCache.Store(&trendCacheEntry{Window: window, Resp: resp}) + atomic.StoreInt64(&trendCacheTime, now) + return resp, nil } -func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) { +// FIX: use ts_unix_ns directly and accept context. +func queryAlarmEvents(ctx context.Context, limit int) ([]AlarmEventAPI, error) { if limit <= 0 { limit = 20 } if limit > 100 { limit = 100 } - rows, err := db.Query(` - SELECT ts, severity, source, state, message, value, limit_value + rows, err := db.QueryContext(ctx, ` + SELECT ts_unix_ns, severity, source, state, message, value, limit_value FROM alarm_events ORDER BY ts_unix_ns DESC LIMIT ? @@ -2056,15 +2229,13 @@ func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) { events := make([]AlarmEventAPI, 0, limit) for rows.Next() { - var ts, severity, source, state, message string + var tsUnix int64 + var severity, source, state, message string var value, limitValue float64 - if err := rows.Scan(&ts, &severity, &source, &state, &message, &value, &limitValue); err != nil { + if err := rows.Scan(&tsUnix, &severity, &source, &state, &message, &value, &limitValue); err != nil { return nil, err } - displayTime := ts - if t, err := time.Parse(time.RFC3339Nano, ts); err == nil { - displayTime = t.Local().Format("02.01.2006 15:04:05") - } + displayTime := time.Unix(0, tsUnix).Local().Format("02.01.2006 15:04:05") events = append(events, AlarmEventAPI{ Time: displayTime, Severity: severity, Source: source, State: state, Message: message, Value: value, Limit: limitValue, @@ -2132,7 +2303,7 @@ func apiHistory(w http.ResponseWriter, r *http.Request) { http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest) return } - points, err := queryHistory(window) + points, err := queryHistory(r.Context(), window) if err != nil { log.Printf("history query failed: %v", err) http.Error(w, `{"error":"history query failed"}`, http.StatusInternalServerError) @@ -2150,7 +2321,7 @@ func apiTrend(w http.ResponseWriter, r *http.Request) { http.Error(w, `{"error":"invalid trend window"}`, http.StatusBadRequest) return } - resp, err := buildTrendResponse(window, label) + resp, err := buildTrendResponse(r.Context(), window, label) if err != nil { log.Printf("trend query failed: %v", err) http.Error(w, `{"error":"trend query failed"}`, http.StatusInternalServerError) @@ -2169,7 +2340,7 @@ func apiAlarms(w http.ResponseWriter, r *http.Request) { limit = n } } - events, err := queryAlarmEvents(limit) + events, err := queryAlarmEvents(r.Context(), limit) if err != nil { log.Printf("alarm query failed: %v", err) http.Error(w, `{"error":"alarm query failed"}`, http.StatusInternalServerError) @@ -2184,6 +2355,7 @@ func serveUI(w http.ResponseWriter, r *http.Request) { cfgMu.RUnlock() w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache, max-age=0") _, _ = w.Write(payload) } @@ -2371,6 +2543,8 @@ func main() { // Initialise MQTT if enabled. if cfg.MQTT.Enabled { mqttMgr = newMQTTManager(cfg.MQTT) + mqttAlarmCh = make(chan AlarmEvent, 256) + go mqttAlarmWorker() if err := mqttMgr.connect(); err != nil { // Non-fatal: paho will reconnect automatically. log.Printf("MQTT initial connect failed (will retry): %v", err)