added bug fixes
This commit is contained in:
parent
79e5a0e269
commit
2d11e33039
356
main.go
356
main.go
|
|
@ -34,7 +34,7 @@ import (
|
|||
//go:embed static
|
||||
var staticFiles embed.FS
|
||||
|
||||
const version = "0.9.2"
|
||||
const version = "0.9.3"
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Config structs
|
||||
|
|
@ -562,6 +562,7 @@ type AlarmTracker struct {
|
|||
LeftZone string
|
||||
RightZone string
|
||||
ImbZone string
|
||||
LastChange map[string]time.Time
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -634,6 +635,9 @@ var (
|
|||
|
||||
// mqttMgr is nil when MQTT is disabled.
|
||||
mqttMgr *mqttManager
|
||||
|
||||
// mqttAlarmCh is non-nil when MQTT is enabled; used to avoid goroutine spam.
|
||||
mqttAlarmCh chan AlarmEvent
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -904,8 +908,15 @@ func startMQTTPublisher(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// mqttAlarmWorker drains the MQTT alarm channel so we don't spawn a goroutine per event.
|
||||
func mqttAlarmWorker() {
|
||||
for a := range mqttAlarmCh {
|
||||
mqttPublishAlarm(a)
|
||||
}
|
||||
}
|
||||
|
||||
// mqttPublishAlarm forwards a single alarm event to MQTT non-blocking.
|
||||
// Called from enqueueAlarm; errors are silently discarded to avoid
|
||||
// Called from mqttAlarmWorker; errors are silently discarded to avoid
|
||||
// blocking the alarm pipeline.
|
||||
func mqttPublishAlarm(a AlarmEvent) {
|
||||
if mqttMgr == nil {
|
||||
|
|
@ -1208,8 +1219,13 @@ func enqueueAlarm(a AlarmEvent) {
|
|||
state.DroppedEvents++
|
||||
state.Unlock()
|
||||
}
|
||||
// Forward to MQTT non-blocking; ignore errors.
|
||||
go mqttPublishAlarm(a)
|
||||
// Forward to MQTT non-blocking via single worker goroutine.
|
||||
if mqttAlarmCh != nil {
|
||||
select {
|
||||
case mqttAlarmCh <- a:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -1255,9 +1271,10 @@ func initDatabase(dbPath string, dbCfg DBConfig) (*sql.DB, error) {
|
|||
return nil, fmt.Errorf("open sqlite: %w", err)
|
||||
}
|
||||
|
||||
database.SetMaxOpenConns(1)
|
||||
database.SetMaxIdleConns(1)
|
||||
database.SetConnMaxLifetime(0)
|
||||
// FIX: allow parallel reads (writes still serialize safely via WAL).
|
||||
database.SetMaxOpenConns(4)
|
||||
database.SetMaxIdleConns(2)
|
||||
database.SetConnMaxLifetime(time.Hour)
|
||||
|
||||
pragmas := []string{
|
||||
"PRAGMA journal_mode=WAL;",
|
||||
|
|
@ -1309,6 +1326,12 @@ CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_n
|
|||
return nil, fmt.Errorf("create schema: %w", err)
|
||||
}
|
||||
|
||||
// FIX: covering index for trend queries (sum_pct, imbalance_pct by time).
|
||||
if _, err := database.Exec(`CREATE INDEX IF NOT EXISTS idx_samples_trend ON samples(ts_unix_ns, sum_pct, imbalance_pct);`); err != nil {
|
||||
_ = database.Close()
|
||||
return nil, fmt.Errorf("create trend index: %w", err)
|
||||
}
|
||||
|
||||
// Schema migrations: add missing columns to support older databases.
|
||||
migrations := []struct{ table, col, def string }{
|
||||
{"samples", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"},
|
||||
|
|
@ -1337,6 +1360,11 @@ CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_n
|
|||
}
|
||||
}
|
||||
|
||||
// FIX: update query planner stats so indexes are actually used.
|
||||
if _, err := database.Exec("ANALYZE"); err != nil {
|
||||
log.Printf("warning: sqlite analyze failed: %v", err)
|
||||
}
|
||||
|
||||
return database, nil
|
||||
}
|
||||
|
||||
|
|
@ -1348,7 +1376,20 @@ func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int
|
|||
ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
stmt, err := database.Prepare(`
|
||||
INSERT INTO samples (
|
||||
ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn,
|
||||
sum_pct, sum_kn, imbalance_pct, bias_pct
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
log.Printf("db writer prepare failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
batch := make([]Sample, 0, batchSize)
|
||||
flushErrCount := 0
|
||||
|
||||
flush := func() {
|
||||
if len(batch) == 0 {
|
||||
|
|
@ -1357,22 +1398,19 @@ func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int
|
|||
tx, err := database.Begin()
|
||||
if err != nil {
|
||||
log.Printf("db begin failed: %v", err)
|
||||
flushErrCount++
|
||||
if flushErrCount >= 3 {
|
||||
log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
stmt, err := tx.Prepare(`
|
||||
INSERT INTO samples (
|
||||
ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn,
|
||||
sum_pct, sum_kn, imbalance_pct, bias_pct
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("db prepare failed: %v", err)
|
||||
return
|
||||
}
|
||||
txStmt := tx.Stmt(stmt)
|
||||
defer txStmt.Close()
|
||||
ok := true
|
||||
for _, s := range batch {
|
||||
if _, err := stmt.Exec(
|
||||
if _, err := txStmt.Exec(
|
||||
s.TS.UTC().Format(time.RFC3339Nano), s.TS.UTC().UnixNano(),
|
||||
s.SilaLPct, s.SilaRPct, s.SilaLKN, s.SilaRKN,
|
||||
s.SumPercent, s.SumKN, s.ImbalancePercent, s.BiasPercent,
|
||||
|
|
@ -1382,26 +1420,44 @@ func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int
|
|||
break
|
||||
}
|
||||
}
|
||||
_ = stmt.Close()
|
||||
if !ok {
|
||||
_ = tx.Rollback()
|
||||
flushErrCount++
|
||||
if flushErrCount >= 3 {
|
||||
log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Printf("db commit failed: %v", err)
|
||||
flushErrCount++
|
||||
if flushErrCount >= 3 {
|
||||
log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Drain remaining samples before exit.
|
||||
drained := 0
|
||||
for {
|
||||
select {
|
||||
case s := <-sampleCh:
|
||||
batch = append(batch, s)
|
||||
drained++
|
||||
if drained > 10000 {
|
||||
log.Printf("db writer: drain limit reached, dropping remaining")
|
||||
flush()
|
||||
return
|
||||
}
|
||||
default:
|
||||
flush()
|
||||
return
|
||||
|
|
@ -1422,7 +1478,19 @@ func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs
|
|||
ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
stmt, err := database.Prepare(`
|
||||
INSERT INTO alarm_events (
|
||||
ts, ts_unix_ns, severity, source, code, state, message, value, limit_value
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
log.Printf("alarm db writer prepare failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
batch := make([]AlarmEvent, 0, batchSize)
|
||||
flushErrCount := 0
|
||||
|
||||
flush := func() {
|
||||
if len(batch) == 0 {
|
||||
|
|
@ -1431,21 +1499,19 @@ func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs
|
|||
tx, err := database.Begin()
|
||||
if err != nil {
|
||||
log.Printf("alarm db begin failed: %v", err)
|
||||
flushErrCount++
|
||||
if flushErrCount >= 3 {
|
||||
log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
stmt, err := tx.Prepare(`
|
||||
INSERT INTO alarm_events (
|
||||
ts, ts_unix_ns, severity, source, code, state, message, value, limit_value
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("alarm db prepare failed: %v", err)
|
||||
return
|
||||
}
|
||||
txStmt := tx.Stmt(stmt)
|
||||
defer txStmt.Close()
|
||||
ok := true
|
||||
for _, a := range batch {
|
||||
if _, err := stmt.Exec(
|
||||
if _, err := txStmt.Exec(
|
||||
a.TS.UTC().Format(time.RFC3339Nano), a.TS.UTC().UnixNano(),
|
||||
a.Severity, a.Source, a.Code, a.State, a.Message, a.Value, a.Limit,
|
||||
); err != nil {
|
||||
|
|
@ -1454,25 +1520,44 @@ func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs
|
|||
break
|
||||
}
|
||||
}
|
||||
_ = stmt.Close()
|
||||
if !ok {
|
||||
_ = tx.Rollback()
|
||||
flushErrCount++
|
||||
if flushErrCount >= 3 {
|
||||
log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Printf("alarm db commit failed: %v", err)
|
||||
flushErrCount++
|
||||
if flushErrCount >= 3 {
|
||||
log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
batch = batch[:0]
|
||||
flushErrCount = 0
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
drained := 0
|
||||
for {
|
||||
select {
|
||||
case a := <-alarmCh:
|
||||
batch = append(batch, a)
|
||||
drained++
|
||||
if drained > 10000 {
|
||||
log.Printf("alarm writer: drain limit reached, dropping remaining")
|
||||
flush()
|
||||
return
|
||||
}
|
||||
default:
|
||||
flush()
|
||||
return
|
||||
|
|
@ -1497,14 +1582,51 @@ func startDBCleanup(ctx context.Context, database *sql.DB, retentionDays, interv
|
|||
ticker := time.NewTicker(time.Duration(intervalHr) * time.Hour)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Replace the cleanup function in startDBCleanup with this:
|
||||
cleanup := func() {
|
||||
cutoffNs := time.Now().AddDate(0, 0, -retentionDays).UTC().UnixNano()
|
||||
for _, tbl := range []string{"samples", "alarm_events"} {
|
||||
if _, err := database.Exec(
|
||||
fmt.Sprintf(`DELETE FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ?`, tbl),
|
||||
for {
|
||||
// Select up to 5000 rowids to delete (SQLite supports LIMIT in SELECT)
|
||||
rows, err := database.QueryContext(ctx,
|
||||
fmt.Sprintf(`SELECT rowid FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ? LIMIT 5000`, tbl),
|
||||
cutoffNs,
|
||||
); err != nil {
|
||||
log.Printf("db cleanup %s failed: %v", tbl, err)
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("db cleanup %s select failed: %v", tbl, err)
|
||||
break
|
||||
}
|
||||
var rowids []int64
|
||||
for rows.Next() {
|
||||
var rid int64
|
||||
if err := rows.Scan(&rid); err != nil {
|
||||
log.Printf("db cleanup %s scan failed: %v", tbl, err)
|
||||
break
|
||||
}
|
||||
rowids = append(rowids, rid)
|
||||
}
|
||||
rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("db cleanup %s rows error: %v", tbl, err)
|
||||
break
|
||||
}
|
||||
|
||||
if len(rowids) == 0 {
|
||||
break // Nothing more to delete
|
||||
}
|
||||
|
||||
// Build DELETE IN (...) query
|
||||
placeholders := make([]string, len(rowids))
|
||||
args := make([]any, len(rowids))
|
||||
for i, rid := range rowids {
|
||||
placeholders[i] = "?"
|
||||
args[i] = rid
|
||||
}
|
||||
query := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, tbl, strings.Join(placeholders, ","))
|
||||
if _, err := database.ExecContext(ctx, query, args...); err != nil {
|
||||
log.Printf("db cleanup %s delete failed: %v", tbl, err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1579,6 +1701,19 @@ func maybeLogZoneChange(source, prev, curr string, value float64) {
|
|||
if prev == "" && curr == "ok" {
|
||||
return
|
||||
}
|
||||
|
||||
// FIX: 5-second cooldown per source to prevent oscillation spam.
|
||||
alarmTracker.Lock()
|
||||
if alarmTracker.LastChange == nil {
|
||||
alarmTracker.LastChange = make(map[string]time.Time)
|
||||
}
|
||||
if last, ok := alarmTracker.LastChange[source]; ok && now.Sub(last) < 5*time.Second {
|
||||
alarmTracker.Unlock()
|
||||
return
|
||||
}
|
||||
alarmTracker.LastChange[source] = now
|
||||
alarmTracker.Unlock()
|
||||
|
||||
switch curr {
|
||||
case "ok":
|
||||
enqueueAlarm(AlarmEvent{
|
||||
|
|
@ -1625,10 +1760,16 @@ func maybeLogZoneChange(source, prev, curr string, value float64) {
|
|||
}
|
||||
|
||||
func evaluateProcessAlarms(s Sample) {
|
||||
// FIX: snapshot thresholds once to avoid repeated locking.
|
||||
config := getConfigSnapshot()
|
||||
leftZone := zoneFromValue(float64(s.SilaLPct), config.Thresholds.WarningPercent, config.Thresholds.CriticalPercent)
|
||||
rightZone := zoneFromValue(float64(s.SilaRPct), config.Thresholds.WarningPercent, config.Thresholds.CriticalPercent)
|
||||
imbZone := zoneFromValue(float64(s.ImbalancePercent), config.Thresholds.ImbalanceWarningPercent, config.Thresholds.ImbalanceCriticalPercent)
|
||||
warn := config.Thresholds.WarningPercent
|
||||
crit := config.Thresholds.CriticalPercent
|
||||
imbWarn := config.Thresholds.ImbalanceWarningPercent
|
||||
imbCrit := config.Thresholds.ImbalanceCriticalPercent
|
||||
|
||||
leftZone := zoneFromValue(float64(s.SilaLPct), warn, crit)
|
||||
rightZone := zoneFromValue(float64(s.SilaRPct), warn, crit)
|
||||
imbZone := zoneFromValue(float64(s.ImbalancePercent), imbWarn, imbCrit)
|
||||
|
||||
alarmTracker.Lock()
|
||||
defer alarmTracker.Unlock()
|
||||
|
|
@ -1689,6 +1830,10 @@ func startPLCPoller(ctx context.Context) {
|
|||
pollInterval := time.Duration(bootCfg.PLC.PollMs) * time.Millisecond
|
||||
reconnectDelay := time.Duration(bootCfg.PLC.ReconnectDelaySec) * time.Second
|
||||
|
||||
// FIX: cache frequently accessed config values to avoid lock contention.
|
||||
maxTonnage := bootCfg.Press.MaxTonnage
|
||||
dbNum := bootCfg.PLC.DBNum
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
@ -1717,6 +1862,7 @@ func startPLCPoller(ctx context.Context) {
|
|||
log.Println("PLC connected successfully")
|
||||
|
||||
buf := make([]byte, 8)
|
||||
readErrCount := 0
|
||||
|
||||
for {
|
||||
select {
|
||||
|
|
@ -1726,18 +1872,30 @@ func startPLCPoller(ctx context.Context) {
|
|||
default:
|
||||
}
|
||||
|
||||
if err := client.AGReadDB(bootCfg.PLC.DBNum, 0, 8, buf); err != nil {
|
||||
if err := client.AGReadDB(dbNum, 0, 8, buf); err != nil {
|
||||
readErrCount++
|
||||
if readErrCount < 3 {
|
||||
log.Printf("PLC read error (attempt %d/3): %v", readErrCount, err)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = handler.Close()
|
||||
return
|
||||
case <-time.After(pollInterval):
|
||||
}
|
||||
continue
|
||||
}
|
||||
log.Printf("PLC read error: %v - reconnecting...", err)
|
||||
markDisconnected(err.Error())
|
||||
_ = handler.Close()
|
||||
break
|
||||
}
|
||||
readErrCount = 0
|
||||
|
||||
var helper gos7.Helper
|
||||
silaL := helper.GetRealAt(buf, 0)
|
||||
silaR := helper.GetRealAt(buf, 4)
|
||||
|
||||
leftKN, rightKN, sumPercent, sumKN := calculateForces(silaL, silaR, getConfigSnapshot().Press.MaxTonnage)
|
||||
leftKN, rightKN, sumPercent, sumKN := calculateForces(silaL, silaR, maxTonnage)
|
||||
imbalance := float32(math.Abs(float64(silaL - silaR)))
|
||||
bias := silaL - silaR
|
||||
now := time.Now()
|
||||
|
|
@ -1806,10 +1964,13 @@ func formatHistoryLabel(t time.Time, window time.Duration) string {
|
|||
return local.Format("15:04:05.000")
|
||||
}
|
||||
|
||||
func queryHistory(window time.Duration) ([]HistoryPoint, error) {
|
||||
// FIX: use ts_unix_ns directly instead of parsing RFC3339 strings.
|
||||
func queryHistory(ctx context.Context, window time.Duration) ([]HistoryPoint, error) {
|
||||
cutoffNs := time.Now().Add(-window).UTC().UnixNano()
|
||||
|
||||
rows, err := db.Query(`SELECT ts, sila_l_pct, sila_r_pct FROM samples WHERE ts_unix_ns >= ? ORDER BY ts_unix_ns ASC`, cutoffNs)
|
||||
rows, err := db.QueryContext(ctx,
|
||||
`SELECT ts_unix_ns, sila_l_pct, sila_r_pct FROM samples WHERE ts_unix_ns >= ? ORDER BY ts_unix_ns ASC`,
|
||||
cutoffNs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -1817,15 +1978,12 @@ func queryHistory(window time.Duration) ([]HistoryPoint, error) {
|
|||
|
||||
points := make([]HistoryPoint, 0, 1024)
|
||||
for rows.Next() {
|
||||
var ts string
|
||||
var tsUnix int64
|
||||
var l, r float64
|
||||
if err := rows.Scan(&ts, &l, &r); err != nil {
|
||||
if err := rows.Scan(&tsUnix, &l, &r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t, err := time.Parse(time.RFC3339Nano, ts)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
t := time.Unix(0, tsUnix).Local()
|
||||
points = append(points, HistoryPoint{
|
||||
Time: formatHistoryLabel(t, window),
|
||||
SilaL: float32(l),
|
||||
|
|
@ -1843,32 +2001,22 @@ func queryHistory(window time.Duration) ([]HistoryPoint, error) {
|
|||
return downsamplePoints(points, maxPts), nil
|
||||
}
|
||||
|
||||
// FIX: remove unnecessary map allocation; indices are monotonic.
|
||||
func downsamplePoints(points []HistoryPoint, max int) []HistoryPoint {
|
||||
if len(points) <= max || max < 3 {
|
||||
return points
|
||||
}
|
||||
|
||||
out := make([]HistoryPoint, 0, max)
|
||||
out := make([]HistoryPoint, max)
|
||||
step := float64(len(points)-1) / float64(max-1)
|
||||
used := make(map[int]struct{}, max)
|
||||
|
||||
for i := 0; i < max; i++ {
|
||||
idx := int(float64(i) * step)
|
||||
if idx >= len(points) {
|
||||
idx = len(points) - 1
|
||||
}
|
||||
if _, ok := used[idx]; ok {
|
||||
continue
|
||||
out[i] = points[idx]
|
||||
}
|
||||
used[idx] = struct{}{}
|
||||
out = append(out, points[idx])
|
||||
}
|
||||
|
||||
if len(out) == 0 {
|
||||
return points
|
||||
}
|
||||
// Always ensure last point is the most recent.
|
||||
out[len(out)-1] = points[len(points)-1]
|
||||
out[max-1] = points[len(points)-1]
|
||||
return out
|
||||
}
|
||||
|
||||
|
|
@ -1883,7 +2031,8 @@ func validField(field string) (string, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func queryNumericStats(field string, fromNs, toNs int64) (NumericStats, error) {
|
||||
// FIX: accept context so queries cancel when HTTP client disconnects.
|
||||
func queryNumericStats(ctx context.Context, field string, fromNs, toNs int64) (NumericStats, error) {
|
||||
safeField, err := validField(field)
|
||||
if err != nil {
|
||||
return NumericStats{}, err
|
||||
|
|
@ -1900,7 +2049,7 @@ func queryNumericStats(field string, fromNs, toNs int64) (NumericStats, error) {
|
|||
`, safeField)
|
||||
|
||||
var stats NumericStats
|
||||
err = db.QueryRow(query, fromNs, toNs).Scan(&stats.Avg, &stats.AvgSq, &stats.Min, &stats.Max, &stats.Count)
|
||||
err = db.QueryRowContext(ctx, query, fromNs, toNs).Scan(&stats.Avg, &stats.AvgSq, &stats.Min, &stats.Max, &stats.Count)
|
||||
if err != nil {
|
||||
return NumericStats{}, err
|
||||
}
|
||||
|
|
@ -1957,41 +2106,60 @@ func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sa
|
|||
return "stable", "Process variation is low"
|
||||
}
|
||||
|
||||
func buildTrendResponse(window time.Duration, label string) (TrendResponse, error) {
|
||||
// FIX: 1-second response cache so overlapping UI polls don't hammer the DB.
|
||||
var (
|
||||
trendCache atomic.Value // stores *trendCacheEntry
|
||||
trendCacheTime int64 // atomic
|
||||
)
|
||||
|
||||
type trendCacheEntry struct {
|
||||
Window time.Duration
|
||||
Resp TrendResponse
|
||||
}
|
||||
|
||||
// FIX: accept context and use cache.
|
||||
func buildTrendResponse(ctx context.Context, window time.Duration, label string) (TrendResponse, error) {
|
||||
now := time.Now().UnixMilli()
|
||||
if cached, ok := trendCache.Load().(*trendCacheEntry); ok {
|
||||
if cached.Window == window && now-atomic.LoadInt64(&trendCacheTime) < 1000 {
|
||||
return cached.Resp, nil
|
||||
}
|
||||
}
|
||||
|
||||
nowNs := time.Now().UTC().UnixNano()
|
||||
windowNs := window.Nanoseconds()
|
||||
startNs := nowNs - windowNs
|
||||
midNs := startNs + (windowNs / 2)
|
||||
|
||||
force5m, err := queryNumericStats("sum_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs)
|
||||
force5m, err := queryNumericStats(ctx, "sum_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
force1h, err := queryNumericStats("sum_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs)
|
||||
force1h, err := queryNumericStats(ctx, "sum_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
imb5m, err := queryNumericStats("imbalance_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs)
|
||||
imb5m, err := queryNumericStats(ctx, "imbalance_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
imb1h, err := queryNumericStats("imbalance_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs)
|
||||
imb1h, err := queryNumericStats(ctx, "imbalance_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
forceOld, err := queryNumericStats("sum_pct", startNs, midNs)
|
||||
forceOld, err := queryNumericStats(ctx, "sum_pct", startNs, midNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
forceNew, err := queryNumericStats("sum_pct", midNs, nowNs)
|
||||
forceNew, err := queryNumericStats(ctx, "sum_pct", midNs, nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
imbOld, err := queryNumericStats("imbalance_pct", startNs, midNs)
|
||||
imbOld, err := queryNumericStats(ctx, "imbalance_pct", startNs, midNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
imbNew, err := queryNumericStats("imbalance_pct", midNs, nowNs)
|
||||
imbNew, err := queryNumericStats(ctx, "imbalance_pct", midNs, nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
|
|
@ -2002,11 +2170,11 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro
|
|||
forceDirection := classifyDirection(forceDelta, forceOld.Count, forceNew.Count, 1.0, "rising", "falling")
|
||||
imbDirection := classifyDirection(imbDelta, imbOld.Count, imbNew.Count, 0.5, "worsening", "improving")
|
||||
|
||||
fullWindowForce, err := queryNumericStats("sum_pct", startNs, nowNs)
|
||||
fullWindowForce, err := queryNumericStats(ctx, "sum_pct", startNs, nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
fullWindowImb, err := queryNumericStats("imbalance_pct", startNs, nowNs)
|
||||
fullWindowImb, err := queryNumericStats(ctx, "imbalance_pct", startNs, nowNs)
|
||||
if err != nil {
|
||||
return TrendResponse{}, err
|
||||
}
|
||||
|
|
@ -2016,7 +2184,7 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro
|
|||
forceDelta, imb5m.Avg, fullWindowForce.Count,
|
||||
)
|
||||
|
||||
return TrendResponse{
|
||||
resp := TrendResponse{
|
||||
Window: label,
|
||||
AvgPeak5m: float32(force5m.Avg),
|
||||
AvgPeak1h: float32(force1h.Avg),
|
||||
|
|
@ -2033,18 +2201,23 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro
|
|||
SampleCount: fullWindowForce.Count,
|
||||
OldHalfCount: forceOld.Count,
|
||||
NewHalfCount: forceNew.Count,
|
||||
}, nil
|
||||
}
|
||||
|
||||
trendCache.Store(&trendCacheEntry{Window: window, Resp: resp})
|
||||
atomic.StoreInt64(&trendCacheTime, now)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) {
|
||||
// FIX: use ts_unix_ns directly and accept context.
|
||||
func queryAlarmEvents(ctx context.Context, limit int) ([]AlarmEventAPI, error) {
|
||||
if limit <= 0 {
|
||||
limit = 20
|
||||
}
|
||||
if limit > 100 {
|
||||
limit = 100
|
||||
}
|
||||
rows, err := db.Query(`
|
||||
SELECT ts, severity, source, state, message, value, limit_value
|
||||
rows, err := db.QueryContext(ctx, `
|
||||
SELECT ts_unix_ns, severity, source, state, message, value, limit_value
|
||||
FROM alarm_events
|
||||
ORDER BY ts_unix_ns DESC
|
||||
LIMIT ?
|
||||
|
|
@ -2056,15 +2229,13 @@ func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) {
|
|||
|
||||
events := make([]AlarmEventAPI, 0, limit)
|
||||
for rows.Next() {
|
||||
var ts, severity, source, state, message string
|
||||
var tsUnix int64
|
||||
var severity, source, state, message string
|
||||
var value, limitValue float64
|
||||
if err := rows.Scan(&ts, &severity, &source, &state, &message, &value, &limitValue); err != nil {
|
||||
if err := rows.Scan(&tsUnix, &severity, &source, &state, &message, &value, &limitValue); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
displayTime := ts
|
||||
if t, err := time.Parse(time.RFC3339Nano, ts); err == nil {
|
||||
displayTime = t.Local().Format("02.01.2006 15:04:05")
|
||||
}
|
||||
displayTime := time.Unix(0, tsUnix).Local().Format("02.01.2006 15:04:05")
|
||||
events = append(events, AlarmEventAPI{
|
||||
Time: displayTime, Severity: severity, Source: source,
|
||||
State: state, Message: message, Value: value, Limit: limitValue,
|
||||
|
|
@ -2132,7 +2303,7 @@ func apiHistory(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
points, err := queryHistory(window)
|
||||
points, err := queryHistory(r.Context(), window)
|
||||
if err != nil {
|
||||
log.Printf("history query failed: %v", err)
|
||||
http.Error(w, `{"error":"history query failed"}`, http.StatusInternalServerError)
|
||||
|
|
@ -2150,7 +2321,7 @@ func apiTrend(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, `{"error":"invalid trend window"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
resp, err := buildTrendResponse(window, label)
|
||||
resp, err := buildTrendResponse(r.Context(), window, label)
|
||||
if err != nil {
|
||||
log.Printf("trend query failed: %v", err)
|
||||
http.Error(w, `{"error":"trend query failed"}`, http.StatusInternalServerError)
|
||||
|
|
@ -2169,7 +2340,7 @@ func apiAlarms(w http.ResponseWriter, r *http.Request) {
|
|||
limit = n
|
||||
}
|
||||
}
|
||||
events, err := queryAlarmEvents(limit)
|
||||
events, err := queryAlarmEvents(r.Context(), limit)
|
||||
if err != nil {
|
||||
log.Printf("alarm query failed: %v", err)
|
||||
http.Error(w, `{"error":"alarm query failed"}`, http.StatusInternalServerError)
|
||||
|
|
@ -2184,6 +2355,7 @@ func serveUI(w http.ResponseWriter, r *http.Request) {
|
|||
cfgMu.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||||
w.Header().Set("Cache-Control", "no-cache, max-age=0")
|
||||
_, _ = w.Write(payload)
|
||||
}
|
||||
|
||||
|
|
@ -2371,6 +2543,8 @@ func main() {
|
|||
// Initialise MQTT if enabled.
|
||||
if cfg.MQTT.Enabled {
|
||||
mqttMgr = newMQTTManager(cfg.MQTT)
|
||||
mqttAlarmCh = make(chan AlarmEvent, 256)
|
||||
go mqttAlarmWorker()
|
||||
if err := mqttMgr.connect(); err != nil {
|
||||
// Non-fatal: paho will reconnect automatically.
|
||||
log.Printf("MQTT initial connect failed (will retry): %v", err)
|
||||
|
|
|
|||
Loading…
Reference in a new issue