Tonnage-app-IMCO/main.go
2026-04-23 10:09:13 +00:00

3799 lines
111 KiB
Go

package main
import (
"bytes"
"context"
"database/sql"
"embed"
"encoding/json"
"errors"
"fmt"
"html/template"
"io"
"io/fs"
"log"
"math"
"net/http"
"os"
"os/signal"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/fsnotify/fsnotify"
_ "github.com/mattn/go-sqlite3"
"github.com/robinson/gos7"
"gopkg.in/yaml.v3"
)
//go:embed static
var embeddedStaticFiles embed.FS
const version = "1.0.8"
// ---------------------------------------------------------------------------
// Config structs
// ---------------------------------------------------------------------------
type Config struct {
Server ServerConfig `yaml:"server"`
PLC PLCConfig `yaml:"plc"`
Thresholds ThresholdsConfig `yaml:"thresholds"`
Trend TrendConfig `yaml:"trend"`
Press PressConfig `yaml:"press"`
UI UIConfig `yaml:"ui"`
Modules ModulesConfig `yaml:"modules"`
DB DBConfig `yaml:"db"`
MQTT MQTTConfig `yaml:"mqtt"`
LegacyLicense *LicenseConfig `yaml:"license,omitempty"`
}
type ServerConfig struct {
ListenAddr string `yaml:"listen_addr"`
}
type PLCConfig struct {
IP string `yaml:"ip"`
DBNum int `yaml:"db_num"`
Rack int `yaml:"rack"`
Slot int `yaml:"slot"`
PollMs int `yaml:"poll_ms"`
ConnectTimeoutSec int `yaml:"connect_timeout_sec"`
IdleTimeoutSec int `yaml:"idle_timeout_sec"`
ReconnectDelaySec int `yaml:"reconnect_delay_sec"`
}
type ThresholdsConfig struct {
WarningPercent float64 `yaml:"warning_percent"`
CriticalPercent float64 `yaml:"critical_percent"`
GaugeMaxPercent float64 `yaml:"gauge_max_percent"`
ImbalanceWarningPercent float64 `yaml:"imbalance_warning_percent"`
ImbalanceCriticalPercent float64 `yaml:"imbalance_critical_percent"`
LegacyWarningKn float64 `yaml:"warning_kn,omitempty"`
LegacyCriticalKn float64 `yaml:"critical_kn,omitempty"`
LegacyMaxKn float64 `yaml:"max_kn,omitempty"`
}
type TrendConfig struct {
Minutes int `yaml:"minutes"`
}
type PressConfig struct {
MaxTonnage float64 `yaml:"MAX_TONNAGE"`
LegacyMaxTonnage float64 `yaml:"max_tonnage,omitempty"`
}
type UIConfig struct {
Title string `yaml:"title"`
Subtitle string `yaml:"subtitle"`
LeftLabel string `yaml:"left_label"`
RightLabel string `yaml:"right_label"`
UnitForce string `yaml:"unit_force"`
UnitPct string `yaml:"unit_percent"`
}
type ModulesConfig struct {
ShowHeaderControls *bool `yaml:"show_header_controls,omitempty"`
ShowVerdict *bool `yaml:"show_verdict,omitempty"`
ShowSummaryBar *bool `yaml:"show_summary_bar,omitempty"`
ShowOverview *bool `yaml:"show_overview,omitempty"`
ShowIntelligence *bool `yaml:"show_intelligence,omitempty"`
ShowAlarmTimeline *bool `yaml:"show_alarm_timeline,omitempty"`
ShowGauges *bool `yaml:"show_gauges,omitempty"`
ShowGaugeDigital *bool `yaml:"show_gauge_digital,omitempty"`
ShowTrendChart *bool `yaml:"show_trend_chart,omitempty"`
}
type DBConfig struct {
Path string `yaml:"path"`
BusyTimeoutMs int `yaml:"busy_timeout_ms"`
BatchSize int `yaml:"batch_size"`
FlushIntervalMs int `yaml:"flush_interval_ms"`
RetentionDays int `yaml:"retention_days"`
MaxChartPoints int `yaml:"max_chart_points"`
WriterQueueSize int `yaml:"writer_queue_size"`
AlarmQueueSize int `yaml:"alarm_queue_size"`
CheckpointPages int `yaml:"checkpoint_pages"`
CleanupIntervalHr int `yaml:"cleanup_interval_hours"`
}
type MQTTConfig struct {
Enabled bool `yaml:"enabled"`
Broker string `yaml:"broker"`
ClientID string `yaml:"client_id"`
Username string `yaml:"username"`
Password string `yaml:"password"`
TopicPrefix string `yaml:"topic_prefix"`
QoS int `yaml:"qos"`
Retain bool `yaml:"retain"`
AutoPublish bool `yaml:"auto_publish"`
PublishIntervalMs int `yaml:"publish_interval_ms"`
ConnectTimeoutSec int `yaml:"connect_timeout_sec"`
ReconnectDelaySec int `yaml:"reconnect_delay_sec"`
}
// ---------------------------------------------------------------------------
// Config helpers
// ---------------------------------------------------------------------------
func boolPtr(v bool) *bool { return &v }
func boolValue(v *bool, def bool) bool {
if v == nil {
return def
}
return *v
}
func defaultConfig() Config {
return Config{
Server: ServerConfig{ListenAddr: ":8080"},
PLC: PLCConfig{
IP: "192.168.0.1",
DBNum: 1001,
Rack: 0,
Slot: 1,
PollMs: 500,
ConnectTimeoutSec: 5,
IdleTimeoutSec: 5,
ReconnectDelaySec: 5,
},
Thresholds: ThresholdsConfig{
WarningPercent: 80,
CriticalPercent: 95,
GaugeMaxPercent: 130,
ImbalanceWarningPercent: 10,
ImbalanceCriticalPercent: 20,
},
Trend: TrendConfig{Minutes: 5},
Press: PressConfig{MaxTonnage: 64},
UI: UIConfig{
Title: "Force Monitor",
Subtitle: "Siemens S7-1215C • Piezo peak/stroke input • PLC values in % • kN calculated from MAX_TONNAGE",
LeftLabel: "LEVI STEBER",
RightLabel: "DESNI STEBER",
UnitForce: "kN",
UnitPct: "%",
},
Modules: ModulesConfig{
ShowHeaderControls: boolPtr(true),
ShowVerdict: boolPtr(true),
ShowSummaryBar: boolPtr(true),
ShowOverview: boolPtr(true),
ShowIntelligence: boolPtr(true),
ShowAlarmTimeline: boolPtr(true),
ShowGauges: boolPtr(true),
ShowGaugeDigital: boolPtr(false),
ShowTrendChart: boolPtr(true),
},
DB: DBConfig{
Path: "force_monitor.db",
BusyTimeoutMs: 5000,
BatchSize: 32,
FlushIntervalMs: 1000,
RetentionDays: 30,
MaxChartPoints: 2000,
WriterQueueSize: 4096,
AlarmQueueSize: 512,
CheckpointPages: 1000,
CleanupIntervalHr: 6,
},
MQTT: MQTTConfig{
Enabled: false,
Broker: "tcp://localhost:1883",
ClientID: "force_monitor",
TopicPrefix: "force_monitor",
QoS: 1,
Retain: false,
AutoPublish: true,
PublishIntervalMs: 1000,
ConnectTimeoutSec: 10,
ReconnectDelaySec: 5,
},
}
}
func setIfZeroF(dst *float64, def float64) {
if *dst <= 0 {
*dst = def
}
}
func setIfZeroI(dst *int, def int) {
if *dst <= 0 {
*dst = def
}
}
func setIfEmpty(dst *string, def string) {
if strings.TrimSpace(*dst) == "" {
*dst = def
}
}
func setIfNilBool(dst **bool, def bool) {
if *dst == nil {
v := def
*dst = &v
}
}
func normalizeConfig(cfg *Config) {
def := defaultConfig()
setIfEmpty(&cfg.Server.ListenAddr, def.Server.ListenAddr)
setIfEmpty(&cfg.PLC.IP, def.PLC.IP)
setIfZeroI(&cfg.PLC.DBNum, def.PLC.DBNum)
setIfZeroI(&cfg.PLC.Rack, def.PLC.Rack)
setIfZeroI(&cfg.PLC.Slot, def.PLC.Slot)
setIfZeroI(&cfg.PLC.PollMs, def.PLC.PollMs)
setIfZeroI(&cfg.PLC.ConnectTimeoutSec, def.PLC.ConnectTimeoutSec)
setIfZeroI(&cfg.PLC.IdleTimeoutSec, def.PLC.IdleTimeoutSec)
setIfZeroI(&cfg.PLC.ReconnectDelaySec, def.PLC.ReconnectDelaySec)
if cfg.Thresholds.WarningPercent <= 0 && cfg.Thresholds.LegacyWarningKn > 0 {
cfg.Thresholds.WarningPercent = cfg.Thresholds.LegacyWarningKn
}
if cfg.Thresholds.CriticalPercent <= 0 && cfg.Thresholds.LegacyCriticalKn > 0 {
cfg.Thresholds.CriticalPercent = cfg.Thresholds.LegacyCriticalKn
}
if cfg.Thresholds.GaugeMaxPercent <= 0 && cfg.Thresholds.LegacyMaxKn > 0 {
cfg.Thresholds.GaugeMaxPercent = cfg.Thresholds.LegacyMaxKn
}
setIfZeroF(&cfg.Thresholds.WarningPercent, def.Thresholds.WarningPercent)
setIfZeroF(&cfg.Thresholds.CriticalPercent, def.Thresholds.CriticalPercent)
setIfZeroF(&cfg.Thresholds.GaugeMaxPercent, def.Thresholds.GaugeMaxPercent)
setIfZeroF(&cfg.Thresholds.ImbalanceWarningPercent, def.Thresholds.ImbalanceWarningPercent)
setIfZeroF(&cfg.Thresholds.ImbalanceCriticalPercent, def.Thresholds.ImbalanceCriticalPercent)
if cfg.Thresholds.ImbalanceCriticalPercent < cfg.Thresholds.ImbalanceWarningPercent {
cfg.Thresholds.ImbalanceCriticalPercent = cfg.Thresholds.ImbalanceWarningPercent
}
setIfZeroI(&cfg.Trend.Minutes, def.Trend.Minutes)
if cfg.Press.MaxTonnage <= 0 && cfg.Press.LegacyMaxTonnage > 0 {
cfg.Press.MaxTonnage = cfg.Press.LegacyMaxTonnage
}
setIfZeroF(&cfg.Press.MaxTonnage, def.Press.MaxTonnage)
setIfEmpty(&cfg.UI.Title, def.UI.Title)
setIfEmpty(&cfg.UI.Subtitle, def.UI.Subtitle)
setIfEmpty(&cfg.UI.LeftLabel, def.UI.LeftLabel)
setIfEmpty(&cfg.UI.RightLabel, def.UI.RightLabel)
setIfEmpty(&cfg.UI.UnitForce, def.UI.UnitForce)
setIfEmpty(&cfg.UI.UnitPct, def.UI.UnitPct)
setIfNilBool(&cfg.Modules.ShowHeaderControls, boolValue(def.Modules.ShowHeaderControls, true))
setIfNilBool(&cfg.Modules.ShowVerdict, boolValue(def.Modules.ShowVerdict, true))
setIfNilBool(&cfg.Modules.ShowSummaryBar, boolValue(def.Modules.ShowSummaryBar, true))
setIfNilBool(&cfg.Modules.ShowOverview, boolValue(def.Modules.ShowOverview, true))
setIfNilBool(&cfg.Modules.ShowIntelligence, boolValue(def.Modules.ShowIntelligence, true))
setIfNilBool(&cfg.Modules.ShowAlarmTimeline, boolValue(def.Modules.ShowAlarmTimeline, true))
setIfNilBool(&cfg.Modules.ShowGauges, boolValue(def.Modules.ShowGauges, true))
setIfNilBool(&cfg.Modules.ShowGaugeDigital, boolValue(def.Modules.ShowGaugeDigital, false))
setIfNilBool(&cfg.Modules.ShowTrendChart, boolValue(def.Modules.ShowTrendChart, true))
setIfEmpty(&cfg.DB.Path, def.DB.Path)
setIfZeroI(&cfg.DB.BusyTimeoutMs, def.DB.BusyTimeoutMs)
setIfZeroI(&cfg.DB.BatchSize, def.DB.BatchSize)
setIfZeroI(&cfg.DB.FlushIntervalMs, def.DB.FlushIntervalMs)
setIfZeroI(&cfg.DB.RetentionDays, def.DB.RetentionDays)
setIfZeroI(&cfg.DB.MaxChartPoints, def.DB.MaxChartPoints)
setIfZeroI(&cfg.DB.WriterQueueSize, def.DB.WriterQueueSize)
setIfZeroI(&cfg.DB.AlarmQueueSize, def.DB.AlarmQueueSize)
setIfZeroI(&cfg.DB.CheckpointPages, def.DB.CheckpointPages)
setIfZeroI(&cfg.DB.CleanupIntervalHr, def.DB.CleanupIntervalHr)
if cfg.MQTT.Enabled {
setIfEmpty(&cfg.MQTT.Broker, def.MQTT.Broker)
setIfEmpty(&cfg.MQTT.ClientID, def.MQTT.ClientID)
setIfEmpty(&cfg.MQTT.TopicPrefix, def.MQTT.TopicPrefix)
if cfg.MQTT.QoS < 0 || cfg.MQTT.QoS > 2 {
cfg.MQTT.QoS = def.MQTT.QoS
}
setIfZeroI(&cfg.MQTT.PublishIntervalMs, def.MQTT.PublishIntervalMs)
setIfZeroI(&cfg.MQTT.ConnectTimeoutSec, def.MQTT.ConnectTimeoutSec)
setIfZeroI(&cfg.MQTT.ReconnectDelaySec, def.MQTT.ReconnectDelaySec)
}
}
func loadConfigStrict(configPath string) (Config, error) {
cfg := defaultConfig()
data, err := os.ReadFile(configPath)
if err != nil {
return cfg, fmt.Errorf("failed to read config file: %w", err)
}
dec := yaml.NewDecoder(bytes.NewReader(data))
dec.KnownFields(true)
if err := dec.Decode(&cfg); err != nil {
return cfg, fmt.Errorf("failed to parse config file: %w", err)
}
normalizeConfig(&cfg)
return cfg, nil
}
func loadOrCreateConfig(configPath string) (Config, error) {
cfg := defaultConfig()
_, err := os.Stat(configPath)
if errors.Is(err, os.ErrNotExist) {
data, marshalErr := yaml.Marshal(&cfg)
if marshalErr != nil {
return cfg, fmt.Errorf("failed to marshal default config: %w", marshalErr)
}
if writeErr := os.WriteFile(configPath, data, 0644); writeErr != nil {
return cfg, fmt.Errorf("failed to create config file: %w", writeErr)
}
log.Printf("config file not found, created default config: %s", configPath)
return cfg, nil
}
if err != nil {
return cfg, fmt.Errorf("failed to stat config file: %w", err)
}
return loadConfigStrict(configPath)
}
func validateConfig(cfg Config) error {
if cfg.Thresholds.WarningPercent <= 0 {
return fmt.Errorf("thresholds.warning_percent must be > 0")
}
if cfg.Thresholds.CriticalPercent < cfg.Thresholds.WarningPercent {
return fmt.Errorf("thresholds.critical_percent must be >= thresholds.warning_percent")
}
if cfg.Thresholds.GaugeMaxPercent < cfg.Thresholds.CriticalPercent {
return fmt.Errorf("thresholds.gauge_max_percent must be >= thresholds.critical_percent")
}
if cfg.Thresholds.ImbalanceWarningPercent <= 0 {
return fmt.Errorf("thresholds.imbalance_warning_percent must be > 0")
}
if cfg.Thresholds.ImbalanceCriticalPercent < cfg.Thresholds.ImbalanceWarningPercent {
return fmt.Errorf("thresholds.imbalance_critical_percent must be >= thresholds.imbalance_warning_percent")
}
if cfg.Trend.Minutes <= 0 {
return fmt.Errorf("trend.minutes must be > 0")
}
if cfg.Press.MaxTonnage <= 0 {
return fmt.Errorf("press.MAX_TONNAGE must be > 0")
}
if cfg.MQTT.Enabled && strings.TrimSpace(cfg.MQTT.Broker) == "" {
return fmt.Errorf("mqtt.broker must be set when mqtt.enabled is true")
}
if cfg.MQTT.Enabled && strings.TrimSpace(cfg.MQTT.ClientID) == "" {
return fmt.Errorf("mqtt.client_id must be set when mqtt.enabled is true")
}
return nil
}
// ---------------------------------------------------------------------------
// Domain types
// ---------------------------------------------------------------------------
type Sample struct {
TS time.Time
SilaLPct float32
SilaRPct float32
SilaLKN float32
SilaRKN float32
SumPercent float32
SumKN float32
ImbalancePercent float32
BiasPercent float32
}
type AlarmEvent struct {
TS time.Time
Severity string
Source string
Code string
State string
Message string
Value float64
Limit float64
}
type AppState struct {
sync.RWMutex
Connected bool
SilaL float32
SilaR float32
SilaLkN float32
SilaRkN float32
SumPercent float32
SumkN float32
ImbalancePercent float32
BiasPercent float32
LastUpdate time.Time
DroppedSamples uint64
DroppedEvents uint64
}
type APIState struct {
Connected bool `json:"connected"`
SilaL float32 `json:"sila_l"`
SilaR float32 `json:"sila_r"`
SilaLkN float32 `json:"sila_l_kn"`
SilaRkN float32 `json:"sila_r_kn"`
SumPercent float32 `json:"sum_percent"`
SumkN float32 `json:"sum_kn"`
ImbalancePercent float32 `json:"imbalance_percent"`
BiasPercent float32 `json:"bias_percent"`
LastUpdate string `json:"last_update"`
ServerTime string `json:"server_time"`
Stale bool `json:"stale"`
DroppedSamples uint64 `json:"dropped_samples"`
DroppedEvents uint64 `json:"dropped_events"`
}
type HistoryPoint struct {
Time string `json:"time"`
SilaL float32 `json:"sila_l"`
SilaR float32 `json:"sila_r"`
}
type HistoryResponse struct {
Window string `json:"window"`
Points []HistoryPoint `json:"points"`
}
type HistoryPeakPoint struct {
Time string `json:"time"`
LeftPercent float64 `json:"left_percent"`
RightPercent float64 `json:"right_percent"`
TotalPercent float64 `json:"total_percent"`
TotalKN float64 `json:"total_kn"`
ImbalancePercent float64 `json:"imbalance_percent"`
}
type HistoryAnalyticsResponse struct {
Window string `json:"window"`
From string `json:"from"`
To string `json:"to"`
SampleCount int `json:"sample_count"`
LeftAvgPct float64 `json:"left_avg_pct"`
RightAvgPct float64 `json:"right_avg_pct"`
TotalAvgPct float64 `json:"total_avg_pct"`
TotalAvgKN float64 `json:"total_avg_kn"`
ImbalanceAvgPct float64 `json:"imbalance_avg_pct"`
LeftMaxPct float64 `json:"left_max_pct"`
RightMaxPct float64 `json:"right_max_pct"`
TotalMaxPct float64 `json:"total_max_pct"`
TotalMaxKN float64 `json:"total_max_kn"`
ImbalanceMaxPct float64 `json:"imbalance_max_pct"`
LeftMinPct float64 `json:"left_min_pct"`
RightMinPct float64 `json:"right_min_pct"`
TotalMinPct float64 `json:"total_min_pct"`
ImbalanceMinPct float64 `json:"imbalance_min_pct"`
LeftStdPct float64 `json:"left_std_pct"`
RightStdPct float64 `json:"right_std_pct"`
TotalStdPct float64 `json:"total_std_pct"`
ImbalanceStdPct float64 `json:"imbalance_std_pct"`
TotalP95Pct float64 `json:"total_p95_pct"`
TotalP99Pct float64 `json:"total_p99_pct"`
ImbalanceP95Pct float64 `json:"imbalance_p95_pct"`
WarningSamples int `json:"warning_samples"`
CriticalSamples int `json:"critical_samples"`
ImbalanceWarningSamples int `json:"imbalance_warning_samples"`
ImbalanceCriticalSamples int `json:"imbalance_critical_samples"`
WarningRatePct float64 `json:"warning_rate_pct"`
CriticalRatePct float64 `json:"critical_rate_pct"`
ImbalanceWarningRatePct float64 `json:"imbalance_warning_rate_pct"`
ImbalanceCriticalRatePct float64 `json:"imbalance_critical_rate_pct"`
AlarmTransitions int `json:"alarm_transitions"`
WarningEvents int `json:"warning_events"`
CriticalEvents int `json:"critical_events"`
PLCDisconnects int `json:"plc_disconnects"`
PreviousWindowDeltaPct float64 `json:"previous_window_delta_pct"`
PreviousImbalanceDeltaPct float64 `json:"previous_imbalance_delta_pct"`
TopPeaks []HistoryPeakPoint `json:"top_peaks"`
WorstImbalances []HistoryPeakPoint `json:"worst_imbalances"`
}
type HistogramBin struct {
Start float64 `json:"start"`
End float64 `json:"end"`
Count int `json:"count"`
Percent float64 `json:"percent"`
}
type ProcessCapabilityResponse struct {
Window string `json:"window"`
From string `json:"from"`
To string `json:"to"`
SampleCount int `json:"sample_count"`
TotalMeanPct float64 `json:"total_mean_pct"`
TotalStdPct float64 `json:"total_std_pct"`
TotalP95Pct float64 `json:"total_p95_pct"`
TotalP99Pct float64 `json:"total_p99_pct"`
TotalCpuWarning float64 `json:"total_cpu_warning"`
TotalCpuCritical float64 `json:"total_cpu_critical"`
TotalCpkWarning float64 `json:"total_cpk_warning"`
TotalCpkCritical float64 `json:"total_cpk_critical"`
ImbalanceMeanPct float64 `json:"imbalance_mean_pct"`
ImbalanceStdPct float64 `json:"imbalance_std_pct"`
ImbalanceP95Pct float64 `json:"imbalance_p95_pct"`
ImbalanceCpuWarning float64 `json:"imbalance_cpu_warning"`
ImbalanceCpuCritical float64 `json:"imbalance_cpu_critical"`
ImbalanceCpkWarning float64 `json:"imbalance_cpk_warning"`
ImbalanceCpkCritical float64 `json:"imbalance_cpk_critical"`
TotalAboveWarningPct float64 `json:"total_above_warning_pct"`
TotalAboveCriticalPct float64 `json:"total_above_critical_pct"`
ImbalanceAboveWarningPct float64 `json:"imbalance_above_warning_pct"`
ImbalanceAboveCriticalPct float64 `json:"imbalance_above_critical_pct"`
LeftRightCorrelation float64 `json:"left_right_correlation"`
SuggestedAction string `json:"suggested_action"`
Stability string `json:"stability"`
StabilityReason string `json:"stability_reason"`
TotalHistogram []HistogramBin `json:"total_histogram"`
ImbalanceHistogram []HistogramBin `json:"imbalance_histogram"`
TopOutliers []HistoryPeakPoint `json:"top_outliers"`
}
type ReportBucket struct {
Label string `json:"label"`
AvgTotalPct float64 `json:"avg_total_pct"`
MaxTotalPct float64 `json:"max_total_pct"`
AvgImbalancePct float64 `json:"avg_imbalance_pct"`
Samples int `json:"samples"`
WarningEvents int `json:"warning_events"`
CriticalEvents int `json:"critical_events"`
PLCDisconnects int `json:"plc_disconnects"`
}
type ReportSummaryResponse struct {
Window string `json:"window"`
From string `json:"from"`
To string `json:"to"`
SampleCount int `json:"sample_count"`
AverageTotalPct float64 `json:"average_total_pct"`
AverageTotalKN float64 `json:"average_total_kn"`
PeakTotalPct float64 `json:"peak_total_pct"`
PeakTotalKN float64 `json:"peak_total_kn"`
AverageImbalancePct float64 `json:"average_imbalance_pct"`
PeakImbalancePct float64 `json:"peak_imbalance_pct"`
WarningRatePct float64 `json:"warning_rate_pct"`
CriticalRatePct float64 `json:"critical_rate_pct"`
ImbalanceWarningRatePct float64 `json:"imbalance_warning_rate_pct"`
ImbalanceCriticalRatePct float64 `json:"imbalance_critical_rate_pct"`
WarningEvents int `json:"warning_events"`
CriticalEvents int `json:"critical_events"`
PLCDisconnects int `json:"plc_disconnects"`
HealthScore int `json:"health_score"`
AvailabilityPct float64 `json:"availability_pct"`
ForceDeltaPct float64 `json:"force_delta_pct"`
ImbalanceDeltaPct float64 `json:"imbalance_delta_pct"`
Stability string `json:"stability"`
StabilityReason string `json:"stability_reason"`
ExecutiveSummary string `json:"executive_summary"`
Findings []string `json:"findings"`
Buckets []ReportBucket `json:"buckets"`
TopPeaks []HistoryPeakPoint `json:"top_peaks"`
}
type TrendResponse struct {
Window string `json:"window"`
AvgPeak5m float32 `json:"avg_peak_5m"`
AvgPeak1h float32 `json:"avg_peak_1h"`
AvgImbalance5m float32 `json:"avg_imbalance_5m"`
AvgImbalance1h float32 `json:"avg_imbalance_1h"`
ForceDeltaPct float32 `json:"force_delta_pct"`
ImbalanceDeltaPct float32 `json:"imbalance_delta_pct"`
ForceDirection string `json:"force_direction"`
ImbalanceDirection string `json:"imbalance_direction"`
ProcessStability string `json:"process_stability"`
StabilityReason string `json:"stability_reason"`
ForceStdDev float32 `json:"force_stddev"`
ImbalanceStdDev float32 `json:"imbalance_stddev"`
SampleCount int `json:"sample_count"`
OldHalfCount int `json:"old_half_count"`
NewHalfCount int `json:"new_half_count"`
}
type AlarmEventAPI struct {
Time string `json:"time"`
Severity string `json:"severity"`
Source string `json:"source"`
State string `json:"state"`
Message string `json:"message"`
Value float64 `json:"value"`
Limit float64 `json:"limit"`
}
type AlarmResponse struct {
Events []AlarmEventAPI `json:"events"`
}
type PublicConfigResponse struct {
Version string `json:"version"`
UIRevision uint64 `json:"ui_revision"`
UI UIConfig `json:"ui"`
Thresholds ThresholdsConfig `json:"thresholds"`
Trend TrendConfig `json:"trend"`
Press PressConfig `json:"press"`
Modules ModulesConfig `json:"modules"`
LicenseHint LicenseHint `json:"license"`
}
type LicenseHint struct {
Enabled bool `json:"enabled"`
TrialDays int `json:"trial_days"`
}
type NumericStats struct {
Avg float64
AvgSq float64
Min float64
Max float64
Count int
}
func (s NumericStats) StdDev() float64 {
if s.Count <= 1 {
return 0
}
v := s.AvgSq - (s.Avg * s.Avg)
if v < 0 {
v = 0
}
return math.Sqrt(v)
}
type runningStats struct {
sum float64
sumSq float64
min float64
max float64
count int
}
func (r *runningStats) Add(v float64) {
if r.count == 0 {
r.min = v
r.max = v
} else {
if v < r.min {
r.min = v
}
if v > r.max {
r.max = v
}
}
r.sum += v
r.sumSq += v * v
r.count++
}
func (r runningStats) Avg() float64 {
if r.count == 0 {
return 0
}
return r.sum / float64(r.count)
}
func (r runningStats) StdDev() float64 {
if r.count <= 1 {
return 0
}
avg := r.Avg()
v := (r.sumSq / float64(r.count)) - (avg * avg)
if v < 0 {
v = 0
}
return math.Sqrt(v)
}
type AlarmTracker struct {
sync.Mutex
PLCKnown bool
PLCConnected bool
LeftZone string
RightZone string
ImbZone string
LastChange map[string]time.Time
}
// ---------------------------------------------------------------------------
// MQTT types
// ---------------------------------------------------------------------------
type MQTTReceivedMsg struct {
Topic string `json:"topic"`
Payload string `json:"payload"`
Retained bool `json:"retained"`
Time string `json:"time"`
}
type MQTTPublishRequest struct {
Topic string `json:"topic"`
Payload string `json:"payload"`
QoS int `json:"qos"`
Retain bool `json:"retain"`
}
type MQTTSubscribeRequest struct {
Topic string `json:"topic"`
QoS int `json:"qos"`
}
type MQTTStatusResponse struct {
Enabled bool `json:"enabled"`
Connected bool `json:"connected"`
Broker string `json:"broker"`
ClientID string `json:"client_id"`
TopicPrefix string `json:"topic_prefix"`
Subscribed []string `json:"subscribed"`
LastError string `json:"last_error,omitempty"`
}
type mqttManager struct {
mu sync.RWMutex
client mqtt.Client
connected bool
lastErr string
broker string
clientID string
prefix string
msgHistory []MQTTReceivedMsg
msgMax int
subs map[string]byte
}
// ---------------------------------------------------------------------------
// Package-level singletons
// ---------------------------------------------------------------------------
var (
cfg Config
cfgMu sync.RWMutex
state AppState
db *sql.DB
sampleCh chan Sample
alarmCh chan AlarmEvent
indexTmpl *template.Template
alarmTracker AlarmTracker
uiRevision uint64 = 1
mqttMgr *mqttManager
mqttAlarmCh chan AlarmEvent
licenseMgr *LicenseManager
)
// ---------------------------------------------------------------------------
// MQTT manager
// ---------------------------------------------------------------------------
func newMQTTManager(mcfg MQTTConfig) *mqttManager {
m := &mqttManager{
broker: mcfg.Broker,
clientID: mcfg.ClientID,
prefix: mcfg.TopicPrefix,
msgMax: 500,
subs: make(map[string]byte),
}
opts := mqtt.NewClientOptions()
opts.AddBroker(mcfg.Broker)
opts.SetClientID(mcfg.ClientID)
if mcfg.Username != "" {
opts.SetUsername(mcfg.Username)
opts.SetPassword(mcfg.Password)
}
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(time.Duration(mcfg.ReconnectDelaySec*6) * time.Second)
opts.SetConnectTimeout(time.Duration(mcfg.ConnectTimeoutSec) * time.Second)
opts.SetCleanSession(true)
opts.SetKeepAlive(30 * time.Second)
opts.SetOnConnectHandler(func(c mqtt.Client) {
m.mu.Lock()
m.connected = true
m.lastErr = ""
resub := make(map[string]byte, len(m.subs))
for k, v := range m.subs {
resub[k] = v
}
m.mu.Unlock()
log.Printf("MQTT connected to %s", mcfg.Broker)
for topic, qos := range resub {
tok := c.Subscribe(topic, qos, m.messageHandler)
if tok.Wait() && tok.Error() != nil {
log.Printf("MQTT re-subscribe %s failed: %v", topic, tok.Error())
}
}
})
opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) {
m.mu.Lock()
m.connected = false
if err != nil {
m.lastErr = err.Error()
}
m.mu.Unlock()
log.Printf("MQTT connection lost: %v", err)
})
m.client = mqtt.NewClient(opts)
return m
}
func (m *mqttManager) connect() error {
tok := m.client.Connect()
if !tok.WaitTimeout(30 * time.Second) {
return fmt.Errorf("MQTT connect timeout after 30s")
}
if err := tok.Error(); err != nil {
m.mu.Lock()
m.lastErr = err.Error()
m.mu.Unlock()
return err
}
return nil
}
func (m *mqttManager) disconnect() {
if m.client != nil {
m.client.Disconnect(500)
}
}
func (m *mqttManager) messageHandler(_ mqtt.Client, msg mqtt.Message) {
entry := MQTTReceivedMsg{
Topic: msg.Topic(),
Payload: string(msg.Payload()),
Retained: msg.Retained(),
Time: time.Now().UTC().Format(time.RFC3339Nano),
}
m.mu.Lock()
m.msgHistory = append(m.msgHistory, entry)
if len(m.msgHistory) > m.msgMax {
half := m.msgMax / 2
copy(m.msgHistory, m.msgHistory[half:])
m.msgHistory = m.msgHistory[:m.msgMax-half]
}
m.mu.Unlock()
}
func (m *mqttManager) publish(topic, payload string, qos byte, retain bool) error {
if topic == "" {
return fmt.Errorf("topic must not be empty")
}
if qos > 2 {
qos = 2
}
m.mu.RLock()
ok := m.connected
m.mu.RUnlock()
if !ok {
return fmt.Errorf("MQTT not connected")
}
tok := m.client.Publish(topic, qos, retain, payload)
if qos > 0 {
if !tok.WaitTimeout(5 * time.Second) {
return fmt.Errorf("MQTT publish timeout")
}
return tok.Error()
}
return nil
}
func (m *mqttManager) subscribe(topic string, qos byte) error {
if topic == "" {
return fmt.Errorf("topic must not be empty")
}
if qos > 2 {
qos = 2
}
m.mu.Lock()
m.subs[topic] = qos
m.mu.Unlock()
m.mu.RLock()
ok := m.connected
m.mu.RUnlock()
if !ok {
return nil
}
tok := m.client.Subscribe(topic, qos, m.messageHandler)
if tok.Wait() && tok.Error() != nil {
return tok.Error()
}
return nil
}
func (m *mqttManager) unsubscribe(topic string) error {
m.mu.Lock()
delete(m.subs, topic)
m.mu.Unlock()
m.mu.RLock()
ok := m.connected
m.mu.RUnlock()
if !ok {
return nil
}
tok := m.client.Unsubscribe(topic)
if tok.Wait() && tok.Error() != nil {
return tok.Error()
}
return nil
}
func (m *mqttManager) status() MQTTStatusResponse {
m.mu.RLock()
defer m.mu.RUnlock()
subs := make([]string, 0, len(m.subs))
for t := range m.subs {
subs = append(subs, t)
}
return MQTTStatusResponse{
Enabled: true,
Connected: m.connected,
Broker: m.broker,
ClientID: m.clientID,
TopicPrefix: m.prefix,
Subscribed: subs,
LastError: m.lastErr,
}
}
func (m *mqttManager) getMessages(limit int) []MQTTReceivedMsg {
if limit <= 0 {
limit = 50
}
if limit > 500 {
limit = 500
}
m.mu.RLock()
defer m.mu.RUnlock()
total := len(m.msgHistory)
if total == 0 {
return []MQTTReceivedMsg{}
}
start := 0
if total > limit {
start = total - limit
}
out := make([]MQTTReceivedMsg, total-start)
copy(out, m.msgHistory[start:])
return out
}
func startMQTTPublisher(ctx context.Context) {
if mqttMgr == nil {
return
}
mcfg := getConfigSnapshot().MQTT
if !mcfg.AutoPublish {
return
}
interval := time.Duration(mcfg.PublishIntervalMs) * time.Millisecond
ticker := time.NewTicker(interval)
defer ticker.Stop()
prefix := mcfg.TopicPrefix
qos := byte(mcfg.QoS)
retain := mcfg.Retain
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !licenseAllowsRuntime() {
continue
}
s := snapshotState()
full, err := json.Marshal(s)
if err == nil {
_ = mqttMgr.publish(prefix+"/data", string(full), qos, retain)
}
_ = mqttMgr.publish(prefix+"/force/left", fmt.Sprintf("%.2f", s.SilaL), qos, retain)
_ = mqttMgr.publish(prefix+"/force/right", fmt.Sprintf("%.2f", s.SilaR), qos, retain)
_ = mqttMgr.publish(prefix+"/force/sum_kn", fmt.Sprintf("%.2f", s.SumkN), qos, retain)
_ = mqttMgr.publish(prefix+"/force/imbalance", fmt.Sprintf("%.2f", s.ImbalancePercent), qos, retain)
connStr := "false"
if s.Connected {
connStr = "true"
}
_ = mqttMgr.publish(prefix+"/connected", connStr, qos, retain)
}
}
}
func mqttAlarmWorker() {
for a := range mqttAlarmCh {
mqttPublishAlarm(a)
}
}
func mqttPublishAlarm(a AlarmEvent) {
if mqttMgr == nil {
return
}
mcfg := getConfigSnapshot().MQTT
if !mcfg.Enabled {
return
}
payload, err := json.Marshal(map[string]interface{}{
"time": a.TS.UTC().Format(time.RFC3339),
"severity": a.Severity,
"source": a.Source,
"code": a.Code,
"state": a.State,
"message": a.Message,
"value": a.Value,
"limit": a.Limit,
})
if err != nil {
return
}
_ = mqttMgr.publish(mcfg.TopicPrefix+"/alarm", string(payload), byte(mcfg.QoS), false)
}
// ---------------------------------------------------------------------------
// Force calculation
// ---------------------------------------------------------------------------
func calculateForces(leftPercent, rightPercent float32, maxTonnage float64) (leftKN, rightKN, sumPercent, sumKN float32) {
lp := float64(leftPercent)
rp := float64(rightPercent)
sumPct := (lp + rp) / 2.0
left := (lp / 100.0) * (maxTonnage / 2.0)
right := (rp / 100.0) * (maxTonnage / 2.0)
total := (sumPct / 100.0) * maxTonnage
return float32(left), float32(right), float32(sumPct), float32(total)
}
func getConfigSnapshot() Config {
cfgMu.RLock()
defer cfgMu.RUnlock()
return cfg
}
func staleThreshold() time.Duration {
pollMs := getConfigSnapshot().PLC.PollMs
if pollMs <= 0 {
pollMs = 500
}
staleMs := pollMs * 4
if staleMs < 2500 {
staleMs = 2500
}
return time.Duration(staleMs) * time.Millisecond
}
func hotReloadSectionsLocked(dst *Config, src Config) {
dst.Thresholds = src.Thresholds
dst.Trend = src.Trend
dst.Press = src.Press
dst.UI = src.UI
dst.Modules = src.Modules
}
func configSectionChanges(oldCfg, newCfg Config) (hotSections []string, restartSections []string) {
if !reflect.DeepEqual(oldCfg.Thresholds, newCfg.Thresholds) {
hotSections = append(hotSections, "thresholds")
}
if !reflect.DeepEqual(oldCfg.Trend, newCfg.Trend) {
hotSections = append(hotSections, "trend")
}
if !reflect.DeepEqual(oldCfg.Press, newCfg.Press) {
hotSections = append(hotSections, "press")
}
if !reflect.DeepEqual(oldCfg.UI, newCfg.UI) {
hotSections = append(hotSections, "ui")
}
if !reflect.DeepEqual(oldCfg.Modules, newCfg.Modules) {
hotSections = append(hotSections, "modules")
}
if !reflect.DeepEqual(oldCfg.Server, newCfg.Server) {
restartSections = append(restartSections, "server")
}
if !reflect.DeepEqual(oldCfg.PLC, newCfg.PLC) {
restartSections = append(restartSections, "plc")
}
if !reflect.DeepEqual(oldCfg.DB, newCfg.DB) {
restartSections = append(restartSections, "db")
}
if !reflect.DeepEqual(oldCfg.MQTT, newCfg.MQTT) {
restartSections = append(restartSections, "mqtt")
}
return hotSections, restartSections
}
func reloadConfigSafely(configPath string) {
newCfg, err := loadConfigStrict(configPath)
if err != nil {
log.Printf("config reload rejected: %v", err)
return
}
if err := validateConfig(newCfg); err != nil {
log.Printf("config reload rejected: %v", err)
return
}
oldCfg := getConfigSnapshot()
hotSections, restartSections := configSectionChanges(oldCfg, newCfg)
if len(hotSections) > 0 {
cfgMu.Lock()
updated := cfg
hotReloadSectionsLocked(&updated, newCfg)
cfg = updated
cfgMu.Unlock()
atomic.AddUint64(&uiRevision, 1)
}
if len(hotSections) == 0 && len(restartSections) == 0 {
log.Printf("config reload checked: no effective changes")
return
}
if len(hotSections) > 0 {
log.Printf("config hot-reloaded safely: %s", strings.Join(hotSections, ", "))
}
if len(restartSections) > 0 {
log.Printf("config changes detected in %s; restart required before they take effect", strings.Join(restartSections, ", "))
}
}
func startConfigWatcher(ctx context.Context, configPath string) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
dir := filepath.Dir(configPath)
target := filepath.Clean(configPath)
if !filepath.IsAbs(target) {
target = filepath.Join(dir, target)
}
target = filepath.Clean(target)
if err := watcher.Add(dir); err != nil {
_ = watcher.Close()
return err
}
go func() {
defer watcher.Close()
var (
debounceTimer *time.Timer
debounceC <-chan time.Time
)
resetDebounce := func() {
if debounceTimer == nil {
debounceTimer = time.NewTimer(350 * time.Millisecond)
} else {
if !debounceTimer.Stop() {
select {
case <-debounceTimer.C:
default:
}
}
debounceTimer.Reset(350 * time.Millisecond)
}
debounceC = debounceTimer.C
}
for {
select {
case <-ctx.Done():
if debounceTimer != nil {
debounceTimer.Stop()
}
return
case event, ok := <-watcher.Events:
if !ok {
return
}
name := event.Name
if !filepath.IsAbs(name) {
name = filepath.Join(dir, name)
}
if filepath.Clean(name) != target {
continue
}
if event.Has(fsnotify.Chmod) {
continue
}
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) {
resetDebounce()
}
case <-debounceC:
debounceC = nil
reloadConfigSafely(configPath)
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Printf("config watcher error: %v", err)
}
}
}()
return nil
}
// ---------------------------------------------------------------------------
// State helpers
// ---------------------------------------------------------------------------
func snapshotState() APIState {
state.RLock()
defer state.RUnlock()
now := time.Now()
lastUpdate := ""
if !state.LastUpdate.IsZero() {
lastUpdate = state.LastUpdate.Format(time.RFC3339Nano)
}
stale := false
if state.Connected && !state.LastUpdate.IsZero() {
stale = now.Sub(state.LastUpdate) > staleThreshold()
}
return APIState{
Connected: state.Connected,
SilaL: state.SilaL,
SilaR: state.SilaR,
SilaLkN: state.SilaLkN,
SilaRkN: state.SilaRkN,
SumPercent: state.SumPercent,
SumkN: state.SumkN,
ImbalancePercent: state.ImbalancePercent,
BiasPercent: state.BiasPercent,
LastUpdate: lastUpdate,
ServerTime: now.Format(time.RFC3339Nano),
Stale: stale,
DroppedSamples: state.DroppedSamples,
DroppedEvents: state.DroppedEvents,
}
}
func markDisconnected(reason string) {
state.Lock()
state.Connected = false
state.Unlock()
maybeLogPLCDisconnected(reason)
}
func enqueueSample(s Sample) {
select {
case sampleCh <- s:
default:
state.Lock()
state.DroppedSamples++
state.Unlock()
}
}
func enqueueAlarm(a AlarmEvent) {
select {
case alarmCh <- a:
default:
state.Lock()
state.DroppedEvents++
state.Unlock()
}
if mqttAlarmCh != nil {
select {
case mqttAlarmCh <- a:
default:
}
}
}
// ---------------------------------------------------------------------------
// Database initialisation
// ---------------------------------------------------------------------------
func ensureColumn(database *sql.DB, tableName, columnName, definition string) error {
rows, err := database.Query(fmt.Sprintf("PRAGMA table_info(%s)", tableName))
if err != nil {
return err
}
defer rows.Close()
found := false
for rows.Next() {
var cid int
var name, ctype string
var notNull int
var dfltValue sql.NullString
var pk int
if err := rows.Scan(&cid, &name, &ctype, &notNull, &dfltValue, &pk); err != nil {
return err
}
if name == columnName {
found = true
break
}
}
if err := rows.Err(); err != nil {
return err
}
if found {
return nil
}
_, err = database.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", tableName, columnName, definition))
return err
}
func initDatabase(dbPath string, dbCfg DBConfig) (*sql.DB, error) {
dsn := fmt.Sprintf("file:%s?_busy_timeout=%d&_foreign_keys=on", filepath.ToSlash(dbPath), dbCfg.BusyTimeoutMs)
database, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, fmt.Errorf("open sqlite: %w", err)
}
database.SetMaxOpenConns(4)
database.SetMaxIdleConns(2)
database.SetConnMaxLifetime(time.Hour)
pragmas := []string{
"PRAGMA journal_mode=WAL;",
"PRAGMA synchronous=NORMAL;",
fmt.Sprintf("PRAGMA wal_autocheckpoint=%d;", dbCfg.CheckpointPages),
fmt.Sprintf("PRAGMA busy_timeout=%d;", dbCfg.BusyTimeoutMs),
"PRAGMA temp_store=MEMORY;",
}
for _, q := range pragmas {
if _, err := database.Exec(q); err != nil {
_ = database.Close()
return nil, fmt.Errorf("sqlite pragma failed (%s): %w", q, err)
}
}
schema := `
CREATE TABLE IF NOT EXISTS samples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts DATETIME NOT NULL,
ts_unix_ns INTEGER NOT NULL DEFAULT 0,
sila_l_pct REAL NOT NULL,
sila_r_pct REAL NOT NULL,
sila_l_kn REAL NOT NULL,
sila_r_kn REAL NOT NULL,
sum_pct REAL NOT NULL,
sum_kn REAL NOT NULL,
imbalance_pct REAL NOT NULL DEFAULT 0,
bias_pct REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_samples_ts ON samples(ts);
CREATE INDEX IF NOT EXISTS idx_samples_ts_unix_ns ON samples(ts_unix_ns);
CREATE TABLE IF NOT EXISTS alarm_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts DATETIME NOT NULL,
ts_unix_ns INTEGER NOT NULL DEFAULT 0,
severity TEXT NOT NULL,
source TEXT NOT NULL,
code TEXT NOT NULL,
state TEXT NOT NULL,
message TEXT NOT NULL,
value REAL NOT NULL DEFAULT 0,
limit_value REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_ns DESC);
`
if _, err := database.Exec(schema); err != nil {
_ = database.Close()
return nil, fmt.Errorf("create schema: %w", err)
}
if _, err := database.Exec(`CREATE INDEX IF NOT EXISTS idx_samples_trend ON samples(ts_unix_ns, sum_pct, imbalance_pct);`); err != nil {
_ = database.Close()
return nil, fmt.Errorf("create trend index: %w", err)
}
migrations := []struct{ table, col, def string }{
{"samples", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"},
{"samples", "imbalance_pct", "REAL NOT NULL DEFAULT 0"},
{"samples", "bias_pct", "REAL NOT NULL DEFAULT 0"},
{"alarm_events", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"},
}
for _, m := range migrations {
if err := ensureColumn(database, m.table, m.col, m.def); err != nil {
_ = database.Close()
return nil, fmt.Errorf("migration (%s.%s): %w", m.table, m.col, err)
}
}
if _, err := database.Exec(`CREATE INDEX IF NOT EXISTS idx_samples_ts_unix_ns ON samples(ts_unix_ns)`); err != nil {
_ = database.Close()
return nil, fmt.Errorf("create ts_unix_ns index: %w", err)
}
for _, tbl := range []string{"samples", "alarm_events"} {
q := fmt.Sprintf(`UPDATE %s SET ts_unix_ns = CAST(strftime('%%s', ts) AS INTEGER) * 1000000000 WHERE ts_unix_ns = 0 AND ts IS NOT NULL`, tbl)
if _, err := database.Exec(q); err != nil {
log.Printf("warning: ts_unix_ns backfill for %s failed: %v", tbl, err)
}
}
if _, err := database.Exec("ANALYZE"); err != nil {
log.Printf("warning: sqlite analyze failed: %v", err)
}
return database, nil
}
// ---------------------------------------------------------------------------
// DB writer goroutines
// ---------------------------------------------------------------------------
func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int) {
ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond)
defer ticker.Stop()
stmt, err := database.Prepare(`
INSERT INTO samples (
ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn,
sum_pct, sum_kn, imbalance_pct, bias_pct
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
log.Printf("db writer prepare failed: %v", err)
return
}
defer stmt.Close()
batch := make([]Sample, 0, batchSize)
flushErrCount := 0
flush := func() {
if len(batch) == 0 {
return
}
tx, err := database.Begin()
if err != nil {
log.Printf("db begin failed: %v", err)
flushErrCount++
if flushErrCount >= 3 {
log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
batch = batch[:0]
flushErrCount = 0
}
return
}
txStmt := tx.Stmt(stmt)
defer txStmt.Close()
ok := true
for _, s := range batch {
if _, err := txStmt.Exec(
s.TS.UTC().Format(time.RFC3339Nano), s.TS.UTC().UnixNano(),
s.SilaLPct, s.SilaRPct, s.SilaLKN, s.SilaRKN,
s.SumPercent, s.SumKN, s.ImbalancePercent, s.BiasPercent,
); err != nil {
ok = false
log.Printf("db insert failed: %v", err)
break
}
}
if !ok {
_ = tx.Rollback()
flushErrCount++
if flushErrCount >= 3 {
log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
batch = batch[:0]
flushErrCount = 0
}
return
}
if err := tx.Commit(); err != nil {
log.Printf("db commit failed: %v", err)
flushErrCount++
if flushErrCount >= 3 {
log.Printf("db writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
batch = batch[:0]
flushErrCount = 0
}
return
}
batch = batch[:0]
flushErrCount = 0
}
for {
select {
case <-ctx.Done():
drained := 0
for {
select {
case s := <-sampleCh:
batch = append(batch, s)
drained++
if drained > 10000 {
log.Printf("db writer: drain limit reached, dropping remaining")
flush()
return
}
default:
flush()
return
}
}
case s := <-sampleCh:
batch = append(batch, s)
if len(batch) >= batchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int) {
ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond)
defer ticker.Stop()
stmt, err := database.Prepare(`
INSERT INTO alarm_events (
ts, ts_unix_ns, severity, source, code, state, message, value, limit_value
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
log.Printf("alarm db writer prepare failed: %v", err)
return
}
defer stmt.Close()
batch := make([]AlarmEvent, 0, batchSize)
flushErrCount := 0
flush := func() {
if len(batch) == 0 {
return
}
tx, err := database.Begin()
if err != nil {
log.Printf("alarm db begin failed: %v", err)
flushErrCount++
if flushErrCount >= 3 {
log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
batch = batch[:0]
flushErrCount = 0
}
return
}
txStmt := tx.Stmt(stmt)
defer txStmt.Close()
ok := true
for _, a := range batch {
if _, err := txStmt.Exec(
a.TS.UTC().Format(time.RFC3339Nano), a.TS.UTC().UnixNano(),
a.Severity, a.Source, a.Code, a.State, a.Message, a.Value, a.Limit,
); err != nil {
ok = false
log.Printf("alarm db insert failed: %v", err)
break
}
}
if !ok {
_ = tx.Rollback()
flushErrCount++
if flushErrCount >= 3 {
log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
batch = batch[:0]
flushErrCount = 0
}
return
}
if err := tx.Commit(); err != nil {
log.Printf("alarm db commit failed: %v", err)
flushErrCount++
if flushErrCount >= 3 {
log.Printf("alarm writer: dropping batch of %d after %d failures", len(batch), flushErrCount)
batch = batch[:0]
flushErrCount = 0
}
return
}
batch = batch[:0]
flushErrCount = 0
}
for {
select {
case <-ctx.Done():
drained := 0
for {
select {
case a := <-alarmCh:
batch = append(batch, a)
drained++
if drained > 10000 {
log.Printf("alarm writer: drain limit reached, dropping remaining")
flush()
return
}
default:
flush()
return
}
}
case a := <-alarmCh:
batch = append(batch, a)
if len(batch) >= batchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
func startDBCleanup(ctx context.Context, database *sql.DB, retentionDays, intervalHr int) {
if retentionDays <= 0 {
return
}
ticker := time.NewTicker(time.Duration(intervalHr) * time.Hour)
defer ticker.Stop()
cleanup := func() {
cutoffNs := time.Now().AddDate(0, 0, -retentionDays).UTC().UnixNano()
for _, tbl := range []string{"samples", "alarm_events"} {
for {
rows, err := database.QueryContext(ctx,
fmt.Sprintf(`SELECT rowid FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ? LIMIT 5000`, tbl),
cutoffNs,
)
if err != nil {
log.Printf("db cleanup %s select failed: %v", tbl, err)
break
}
var rowids []int64
for rows.Next() {
var rid int64
if err := rows.Scan(&rid); err != nil {
log.Printf("db cleanup %s scan failed: %v", tbl, err)
break
}
rowids = append(rowids, rid)
}
rows.Close()
if len(rowids) == 0 {
break
}
placeholders := make([]string, len(rowids))
args := make([]any, len(rowids))
for i, rid := range rowids {
placeholders[i] = "?"
args[i] = rid
}
query := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, tbl, strings.Join(placeholders, ","))
if _, err := database.ExecContext(ctx, query, args...); err != nil {
log.Printf("db cleanup %s delete failed: %v", tbl, err)
break
}
}
}
}
cleanup()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cleanup()
}
}
}
// ---------------------------------------------------------------------------
// Alarm zone helpers
// ---------------------------------------------------------------------------
func zoneFromValue(value, warn, crit float64) string {
if value >= crit {
return "critical"
}
if value >= warn {
return "warning"
}
return "ok"
}
func sourceName(source string) string {
switch source {
case "force_left":
return "Left force"
case "force_right":
return "Right force"
case "imbalance":
return "Imbalance"
case "plc":
return "PLC"
default:
return source
}
}
func sourceLimit(source, zone string) float64 {
config := getConfigSnapshot()
switch source {
case "imbalance":
if zone == "critical" {
return config.Thresholds.ImbalanceCriticalPercent
}
if zone == "warning" {
return config.Thresholds.ImbalanceWarningPercent
}
default:
if zone == "critical" {
return config.Thresholds.CriticalPercent
}
if zone == "warning" {
return config.Thresholds.WarningPercent
}
}
return 0
}
func maybeLogZoneChange(source, prev, curr string, value float64) {
if prev == curr {
return
}
name := sourceName(source)
now := time.Now()
if prev == "" && curr == "ok" {
return
}
alarmTracker.Lock()
if alarmTracker.LastChange == nil {
alarmTracker.LastChange = make(map[string]time.Time)
}
if last, ok := alarmTracker.LastChange[source]; ok && now.Sub(last) < 5*time.Second {
alarmTracker.Unlock()
return
}
alarmTracker.LastChange[source] = now
alarmTracker.Unlock()
switch curr {
case "ok":
enqueueAlarm(AlarmEvent{
TS: now,
Severity: "info",
Source: source,
Code: source + "_clear",
State: "clear",
Message: fmt.Sprintf("%s returned to OK", name),
Value: value,
Limit: 0,
})
case "warning":
msg := fmt.Sprintf("%s entered WARNING zone", name)
if prev == "critical" {
msg = fmt.Sprintf("%s downgraded from CRITICAL to WARNING", name)
}
enqueueAlarm(AlarmEvent{
TS: now,
Severity: "warning",
Source: source,
Code: source + "_warning",
State: "active",
Message: msg,
Value: value,
Limit: sourceLimit(source, "warning"),
})
case "critical":
msg := fmt.Sprintf("%s entered CRITICAL zone", name)
if prev == "warning" {
msg = fmt.Sprintf("%s escalated from WARNING to CRITICAL", name)
}
enqueueAlarm(AlarmEvent{
TS: now,
Severity: "critical",
Source: source,
Code: source + "_critical",
State: "active",
Message: msg,
Value: value,
Limit: sourceLimit(source, "critical"),
})
}
}
func evaluateProcessAlarms(s Sample) {
config := getConfigSnapshot()
warn := config.Thresholds.WarningPercent
crit := config.Thresholds.CriticalPercent
imbWarn := config.Thresholds.ImbalanceWarningPercent
imbCrit := config.Thresholds.ImbalanceCriticalPercent
leftZone := zoneFromValue(float64(s.SilaLPct), warn, crit)
rightZone := zoneFromValue(float64(s.SilaRPct), warn, crit)
imbZone := zoneFromValue(float64(s.ImbalancePercent), imbWarn, imbCrit)
alarmTracker.Lock()
prevLeft := alarmTracker.LeftZone
prevRight := alarmTracker.RightZone
prevImb := alarmTracker.ImbZone
alarmTracker.LeftZone = leftZone
alarmTracker.RightZone = rightZone
alarmTracker.ImbZone = imbZone
alarmTracker.Unlock()
maybeLogZoneChange("force_left", prevLeft, leftZone, float64(s.SilaLPct))
maybeLogZoneChange("force_right", prevRight, rightZone, float64(s.SilaRPct))
maybeLogZoneChange("imbalance", prevImb, imbZone, float64(s.ImbalancePercent))
}
func maybeLogPLCConnected() {
alarmTracker.Lock()
defer alarmTracker.Unlock()
if !alarmTracker.PLCKnown {
alarmTracker.PLCKnown = true
alarmTracker.PLCConnected = true
enqueueAlarm(AlarmEvent{
TS: time.Now(), Severity: "info", Source: "plc",
Code: "plc_connected", State: "info",
Message: "PLC connection established", Value: 1,
})
return
}
if !alarmTracker.PLCConnected {
alarmTracker.PLCConnected = true
enqueueAlarm(AlarmEvent{
TS: time.Now(), Severity: "info", Source: "plc",
Code: "plc_restored", State: "info",
Message: "PLC connection restored", Value: 1,
})
}
}
func maybeLogPLCDisconnected(reason string) {
alarmTracker.Lock()
defer alarmTracker.Unlock()
if !alarmTracker.PLCKnown || !alarmTracker.PLCConnected {
return
}
alarmTracker.PLCConnected = false
alarmTracker.LeftZone = ""
alarmTracker.RightZone = ""
alarmTracker.ImbZone = ""
enqueueAlarm(AlarmEvent{
TS: time.Now(), Severity: "critical", Source: "plc",
Code: "plc_disconnected", State: "active",
Message: "PLC connection lost: " + reason, Value: 0,
})
}
// ---------------------------------------------------------------------------
// PLC poller
// ---------------------------------------------------------------------------
func startPLCPoller(ctx context.Context) {
bootCfg := getConfigSnapshot()
pollInterval := time.Duration(bootCfg.PLC.PollMs) * time.Millisecond
reconnectDelay := time.Duration(bootCfg.PLC.ReconnectDelaySec) * time.Second
dbNum := bootCfg.PLC.DBNum
for {
select {
case <-ctx.Done():
return
default:
}
if !licenseAllowsRuntime() {
markDisconnected("license locked")
select {
case <-ctx.Done():
return
case <-time.After(2 * time.Second):
}
continue
}
handler := gos7.NewTCPClientHandler(bootCfg.PLC.IP, bootCfg.PLC.Rack, bootCfg.PLC.Slot)
handler.Timeout = time.Duration(bootCfg.PLC.ConnectTimeoutSec) * time.Second
handler.IdleTimeout = time.Duration(bootCfg.PLC.IdleTimeoutSec) * time.Second
if err := handler.Connect(); err != nil {
markDisconnected(err.Error())
log.Printf("PLC connect failed: %v - retrying in %ds...", err, bootCfg.PLC.ReconnectDelaySec)
select {
case <-ctx.Done():
return
case <-time.After(reconnectDelay):
}
continue
}
maybeLogPLCConnected()
client := gos7.NewClient(handler)
log.Println("PLC connected successfully")
buf := make([]byte, 8)
readErrCount := 0
for {
select {
case <-ctx.Done():
_ = handler.Close()
return
default:
}
if !licenseAllowsRuntime() {
markDisconnected("license locked")
_ = handler.Close()
break
}
if err := client.AGReadDB(dbNum, 0, 8, buf); err != nil {
readErrCount++
if readErrCount < 3 {
log.Printf("PLC read error (attempt %d/3): %v", readErrCount, err)
select {
case <-ctx.Done():
_ = handler.Close()
return
case <-time.After(pollInterval):
}
continue
}
log.Printf("PLC read error: %v - reconnecting...", err)
markDisconnected(err.Error())
_ = handler.Close()
break
}
readErrCount = 0
var helper gos7.Helper
silaL := helper.GetRealAt(buf, 0)
silaR := helper.GetRealAt(buf, 4)
cfgSnap := getConfigSnapshot()
leftKN, rightKN, sumPercent, sumKN := calculateForces(silaL, silaR, cfgSnap.Press.MaxTonnage)
imbalance := float32(math.Abs(float64(silaL - silaR)))
bias := silaL - silaR
now := time.Now()
state.Lock()
state.Connected = true
state.SilaL = silaL
state.SilaR = silaR
state.SilaLkN = leftKN
state.SilaRkN = rightKN
state.SumPercent = sumPercent
state.SumkN = sumKN
state.ImbalancePercent = imbalance
state.BiasPercent = bias
state.LastUpdate = now
state.Unlock()
sample := Sample{
TS: now, SilaLPct: silaL, SilaRPct: silaR,
SilaLKN: leftKN, SilaRKN: rightKN,
SumPercent: sumPercent, SumKN: sumKN,
ImbalancePercent: imbalance, BiasPercent: bias,
}
evaluateProcessAlarms(sample)
enqueueSample(sample)
select {
case <-ctx.Done():
_ = handler.Close()
return
case <-time.After(pollInterval):
}
}
}
}
// ---------------------------------------------------------------------------
// Query helpers
// ---------------------------------------------------------------------------
func parseWindow(raw string) (time.Duration, string, error) {
s := strings.TrimSpace(strings.ToLower(raw))
if s == "" {
s = fmt.Sprintf("%dm", getConfigSnapshot().Trend.Minutes)
}
if strings.HasSuffix(s, "d") {
n, err := strconv.Atoi(strings.TrimSuffix(s, "d"))
if err != nil || n <= 0 {
return 0, "", fmt.Errorf("invalid day window")
}
return time.Duration(n) * 24 * time.Hour, s, nil
}
d, err := time.ParseDuration(s)
if err != nil || d <= 0 {
return 0, "", fmt.Errorf("invalid window")
}
return d, s, nil
}
func formatHistoryLabel(t time.Time, window time.Duration) string {
local := t.Local()
if window >= 12*time.Hour {
return local.Format("02.01 15:04")
}
return local.Format("15:04:05.000")
}
func queryHistory(ctx context.Context, window time.Duration) ([]HistoryPoint, error) {
cutoffNs := time.Now().Add(-window).UTC().UnixNano()
rows, err := db.QueryContext(ctx,
`SELECT ts_unix_ns, sila_l_pct, sila_r_pct FROM samples WHERE ts_unix_ns >= ? ORDER BY ts_unix_ns ASC`,
cutoffNs)
if err != nil {
return nil, err
}
defer rows.Close()
points := make([]HistoryPoint, 0, 1024)
for rows.Next() {
var tsUnix int64
var l, r float64
if err := rows.Scan(&tsUnix, &l, &r); err != nil {
return nil, err
}
t := time.Unix(0, tsUnix).Local()
points = append(points, HistoryPoint{
Time: formatHistoryLabel(t, window),
SilaL: float32(l),
SilaR: float32(r),
})
}
if err := rows.Err(); err != nil {
return nil, err
}
maxPts := getConfigSnapshot().DB.MaxChartPoints
if len(points) <= maxPts {
return points, nil
}
return downsamplePoints(points, maxPts), nil
}
func downsamplePoints(points []HistoryPoint, max int) []HistoryPoint {
if len(points) <= max || max < 3 {
return points
}
out := make([]HistoryPoint, max)
step := float64(len(points)-1) / float64(max-1)
for i := 0; i < max; i++ {
idx := int(float64(i) * step)
if idx >= len(points) {
idx = len(points) - 1
}
out[i] = points[idx]
}
out[max-1] = points[len(points)-1]
return out
}
func validField(field string) (string, error) {
switch field {
case "sum_pct":
return "sum_pct", nil
case "imbalance_pct":
return "imbalance_pct", nil
default:
return "", fmt.Errorf("invalid field")
}
}
func queryNumericStats(ctx context.Context, field string, fromNs, toNs int64) (NumericStats, error) {
safeField, err := validField(field)
if err != nil {
return NumericStats{}, err
}
query := fmt.Sprintf(`
SELECT
COALESCE(AVG(%[1]s), 0),
COALESCE(AVG(%[1]s * %[1]s), 0),
COALESCE(MIN(%[1]s), 0),
COALESCE(MAX(%[1]s), 0),
COUNT(*)
FROM samples
WHERE ts_unix_ns >= ? AND ts_unix_ns < ?
`, safeField)
var stats NumericStats
err = db.QueryRowContext(ctx, query, fromNs, toNs).Scan(&stats.Avg, &stats.AvgSq, &stats.Min, &stats.Max, &stats.Count)
if err != nil {
return NumericStats{}, err
}
return stats, nil
}
// ---------------------------------------------------------------------------
// Trend / stability classification
// ---------------------------------------------------------------------------
func classifyDirection(delta float64, oldCount, newCount int, stableThreshold float64, posLabel, negLabel string) string {
if oldCount < 3 || newCount < 3 {
return "insufficient_data"
}
if math.Abs(delta) < stableThreshold {
return "stable"
}
if delta > 0 {
return posLabel
}
return negLabel
}
func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sampleCount int) (string, string) {
if sampleCount < 8 {
return "insufficient_data", "Too few samples in selected trend window"
}
config := getConfigSnapshot()
if forceStd >= 6.0 || math.Abs(forceDelta) >= 8.0 || avgImb5m >= config.Thresholds.ImbalanceCriticalPercent || imbStd >= 4.0 {
if avgImb5m >= config.Thresholds.ImbalanceCriticalPercent {
return "unstable", "High average imbalance in last 5 minutes"
}
if math.Abs(forceDelta) >= 8.0 {
return "unstable", "Average peak force is drifting fast"
}
if forceStd >= 6.0 {
return "unstable", "Force variation is too high"
}
return "unstable", "Imbalance variation is too high"
}
if forceStd >= 3.0 || math.Abs(forceDelta) >= 3.0 || avgImb5m >= config.Thresholds.ImbalanceWarningPercent || imbStd >= 2.0 {
if avgImb5m >= config.Thresholds.ImbalanceWarningPercent {
return "caution", "Imbalance is trending above warning region"
}
if math.Abs(forceDelta) >= 3.0 {
return "caution", "Average force is drifting"
}
if forceStd >= 3.0 {
return "caution", "Force is less repeatable than normal"
}
return "caution", "Imbalance repeatability is degrading"
}
return "stable", "Process variation is low"
}
var (
trendCache atomic.Value
trendCacheTime int64
)
type trendCacheEntry struct {
Window time.Duration
Resp TrendResponse
}
func buildTrendResponse(ctx context.Context, window time.Duration, label string) (TrendResponse, error) {
now := time.Now().UnixMilli()
if cached, ok := trendCache.Load().(*trendCacheEntry); ok {
if cached.Window == window && now-atomic.LoadInt64(&trendCacheTime) < 1000 {
return cached.Resp, nil
}
}
nowNs := time.Now().UTC().UnixNano()
windowNs := window.Nanoseconds()
startNs := nowNs - windowNs
midNs := startNs + (windowNs / 2)
force5m, err := queryNumericStats(ctx, "sum_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs)
if err != nil {
return TrendResponse{}, err
}
force1h, err := queryNumericStats(ctx, "sum_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs)
if err != nil {
return TrendResponse{}, err
}
imb5m, err := queryNumericStats(ctx, "imbalance_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs)
if err != nil {
return TrendResponse{}, err
}
imb1h, err := queryNumericStats(ctx, "imbalance_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs)
if err != nil {
return TrendResponse{}, err
}
forceOld, err := queryNumericStats(ctx, "sum_pct", startNs, midNs)
if err != nil {
return TrendResponse{}, err
}
forceNew, err := queryNumericStats(ctx, "sum_pct", midNs, nowNs)
if err != nil {
return TrendResponse{}, err
}
imbOld, err := queryNumericStats(ctx, "imbalance_pct", startNs, midNs)
if err != nil {
return TrendResponse{}, err
}
imbNew, err := queryNumericStats(ctx, "imbalance_pct", midNs, nowNs)
if err != nil {
return TrendResponse{}, err
}
forceDelta := forceNew.Avg - forceOld.Avg
imbDelta := imbNew.Avg - imbOld.Avg
forceDirection := classifyDirection(forceDelta, forceOld.Count, forceNew.Count, 1.0, "rising", "falling")
imbDirection := classifyDirection(imbDelta, imbOld.Count, imbNew.Count, 0.5, "worsening", "improving")
fullWindowForce, err := queryNumericStats(ctx, "sum_pct", startNs, nowNs)
if err != nil {
return TrendResponse{}, err
}
fullWindowImb, err := queryNumericStats(ctx, "imbalance_pct", startNs, nowNs)
if err != nil {
return TrendResponse{}, err
}
stability, reason := classifyProcessStability(
fullWindowForce.StdDev(), fullWindowImb.StdDev(),
forceDelta, imb5m.Avg, fullWindowForce.Count,
)
resp := TrendResponse{
Window: label,
AvgPeak5m: float32(force5m.Avg),
AvgPeak1h: float32(force1h.Avg),
AvgImbalance5m: float32(imb5m.Avg),
AvgImbalance1h: float32(imb1h.Avg),
ForceDeltaPct: float32(forceDelta),
ImbalanceDeltaPct: float32(imbDelta),
ForceDirection: forceDirection,
ImbalanceDirection: imbDirection,
ProcessStability: stability,
StabilityReason: reason,
ForceStdDev: float32(fullWindowForce.StdDev()),
ImbalanceStdDev: float32(fullWindowImb.StdDev()),
SampleCount: fullWindowForce.Count,
OldHalfCount: forceOld.Count,
NewHalfCount: forceNew.Count,
}
trendCache.Store(&trendCacheEntry{Window: window, Resp: resp})
atomic.StoreInt64(&trendCacheTime, now)
return resp, nil
}
func queryAlarmEvents(ctx context.Context, limit int) ([]AlarmEventAPI, error) {
if limit <= 0 {
limit = 20
}
if limit > 100 {
limit = 100
}
rows, err := db.QueryContext(ctx, `
SELECT ts_unix_ns, severity, source, state, message, value, limit_value
FROM alarm_events
ORDER BY ts_unix_ns DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
events := make([]AlarmEventAPI, 0, limit)
for rows.Next() {
var tsUnix int64
var severity, source, state, message string
var value, limitValue float64
if err := rows.Scan(&tsUnix, &severity, &source, &state, &message, &value, &limitValue); err != nil {
return nil, err
}
displayTime := time.Unix(0, tsUnix).Local().Format("02.01.2006 15:04:05")
events = append(events, AlarmEventAPI{
Time: displayTime, Severity: severity, Source: source,
State: state, Message: message, Value: value, Limit: limitValue,
})
}
return events, rows.Err()
}
func percentileFromSorted(vals []float64, p float64) float64 {
if len(vals) == 0 {
return 0
}
if p <= 0 {
return vals[0]
}
if p >= 1 {
return vals[len(vals)-1]
}
idx := p * float64(len(vals)-1)
lo := int(math.Floor(idx))
hi := int(math.Ceil(idx))
if lo == hi {
return vals[lo]
}
frac := idx - float64(lo)
return vals[lo] + (vals[hi]-vals[lo])*frac
}
func insertPeakDescending(peaks []HistoryPeakPoint, candidate HistoryPeakPoint, limit int, by func(HistoryPeakPoint) float64) []HistoryPeakPoint {
peaks = append(peaks, candidate)
sort.Slice(peaks, func(i, j int) bool { return by(peaks[i]) > by(peaks[j]) })
if len(peaks) > limit {
peaks = peaks[:limit]
}
return peaks
}
func queryAlarmCount(ctx context.Context, cutoffNs int64, extraWhere string, args ...any) (int, error) {
query := `SELECT COUNT(*) FROM alarm_events WHERE ts_unix_ns >= ?`
params := []any{cutoffNs}
if strings.TrimSpace(extraWhere) != "" {
query += " AND " + extraWhere
params = append(params, args...)
}
var count int
if err := db.QueryRowContext(ctx, query, params...).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func queryHistoryAnalytics(ctx context.Context, window time.Duration, label string) (HistoryAnalyticsResponse, error) {
now := time.Now().UTC()
windowNs := window.Nanoseconds()
startNs := now.UnixNano() - windowNs
cfgSnap := getConfigSnapshot()
rows, err := db.QueryContext(ctx, `
SELECT ts_unix_ns, sila_l_pct, sila_r_pct, sum_pct, sum_kn, imbalance_pct
FROM samples
WHERE ts_unix_ns >= ?
ORDER BY ts_unix_ns ASC
`, startNs)
if err != nil {
return HistoryAnalyticsResponse{}, err
}
defer rows.Close()
var leftStats, rightStats, totalStats, totalKNStats, imbalanceStats runningStats
totalValues := make([]float64, 0, 2048)
imbalanceValues := make([]float64, 0, 2048)
topPeaks := make([]HistoryPeakPoint, 0, 10)
worstImbalances := make([]HistoryPeakPoint, 0, 10)
warningSamples := 0
criticalSamples := 0
imbWarningSamples := 0
imbCriticalSamples := 0
firstTS := int64(0)
lastTS := int64(0)
for rows.Next() {
var tsUnix int64
var leftPct, rightPct, totalPct, totalKN, imbalancePct float64
if err := rows.Scan(&tsUnix, &leftPct, &rightPct, &totalPct, &totalKN, &imbalancePct); err != nil {
return HistoryAnalyticsResponse{}, err
}
if firstTS == 0 {
firstTS = tsUnix
}
lastTS = tsUnix
leftStats.Add(leftPct)
rightStats.Add(rightPct)
totalStats.Add(totalPct)
totalKNStats.Add(totalKN)
imbalanceStats.Add(imbalancePct)
totalValues = append(totalValues, totalPct)
imbalanceValues = append(imbalanceValues, imbalancePct)
if totalPct >= cfgSnap.Thresholds.WarningPercent {
warningSamples++
}
if totalPct >= cfgSnap.Thresholds.CriticalPercent {
criticalSamples++
}
if imbalancePct >= cfgSnap.Thresholds.ImbalanceWarningPercent {
imbWarningSamples++
}
if imbalancePct >= cfgSnap.Thresholds.ImbalanceCriticalPercent {
imbCriticalSamples++
}
peak := HistoryPeakPoint{
Time: time.Unix(0, tsUnix).Local().Format("02.01.2006 15:04:05"),
LeftPercent: leftPct,
RightPercent: rightPct,
TotalPercent: totalPct,
TotalKN: totalKN,
ImbalancePercent: imbalancePct,
}
topPeaks = insertPeakDescending(topPeaks, peak, 10, func(p HistoryPeakPoint) float64 { return p.TotalPercent })
worstImbalances = insertPeakDescending(worstImbalances, peak, 10, func(p HistoryPeakPoint) float64 { return p.ImbalancePercent })
}
if err := rows.Err(); err != nil {
return HistoryAnalyticsResponse{}, err
}
sort.Float64s(totalValues)
sort.Float64s(imbalanceValues)
warnEvents, err := queryAlarmCount(ctx, startNs, `severity = ?`, "warning")
if err != nil {
return HistoryAnalyticsResponse{}, err
}
criticalEvents, err := queryAlarmCount(ctx, startNs, `severity = ?`, "critical")
if err != nil {
return HistoryAnalyticsResponse{}, err
}
alarmTransitions, err := queryAlarmCount(ctx, startNs, ``)
if err != nil {
return HistoryAnalyticsResponse{}, err
}
plcDisconnects, err := queryAlarmCount(ctx, startNs, `source = ? AND code = ?`, "plc", "plc_disconnected")
if err != nil {
return HistoryAnalyticsResponse{}, err
}
prevStartNs := startNs - windowNs
prevForce, err := queryNumericStats(ctx, "sum_pct", prevStartNs, startNs)
if err != nil {
return HistoryAnalyticsResponse{}, err
}
prevImb, err := queryNumericStats(ctx, "imbalance_pct", prevStartNs, startNs)
if err != nil {
return HistoryAnalyticsResponse{}, err
}
resp := HistoryAnalyticsResponse{
Window: label,
From: time.Unix(0, firstTS).Local().Format(time.RFC3339),
To: time.Unix(0, maxInt64(firstTS, lastTS)).Local().Format(time.RFC3339),
SampleCount: totalStats.count,
LeftAvgPct: leftStats.Avg(),
RightAvgPct: rightStats.Avg(),
TotalAvgPct: totalStats.Avg(),
TotalAvgKN: totalKNStats.Avg(),
ImbalanceAvgPct: imbalanceStats.Avg(),
LeftMaxPct: leftStats.max,
RightMaxPct: rightStats.max,
TotalMaxPct: totalStats.max,
TotalMaxKN: totalKNStats.max,
ImbalanceMaxPct: imbalanceStats.max,
LeftMinPct: leftStats.min,
RightMinPct: rightStats.min,
TotalMinPct: totalStats.min,
ImbalanceMinPct: imbalanceStats.min,
LeftStdPct: leftStats.StdDev(),
RightStdPct: rightStats.StdDev(),
TotalStdPct: totalStats.StdDev(),
ImbalanceStdPct: imbalanceStats.StdDev(),
TotalP95Pct: percentileFromSorted(totalValues, 0.95),
TotalP99Pct: percentileFromSorted(totalValues, 0.99),
ImbalanceP95Pct: percentileFromSorted(imbalanceValues, 0.95),
WarningSamples: warningSamples,
CriticalSamples: criticalSamples,
ImbalanceWarningSamples: imbWarningSamples,
ImbalanceCriticalSamples: imbCriticalSamples,
AlarmTransitions: alarmTransitions,
WarningEvents: warnEvents,
CriticalEvents: criticalEvents,
PLCDisconnects: plcDisconnects,
PreviousWindowDeltaPct: totalStats.Avg() - prevForce.Avg,
PreviousImbalanceDeltaPct: imbalanceStats.Avg() - prevImb.Avg,
TopPeaks: topPeaks,
WorstImbalances: worstImbalances,
}
if resp.SampleCount > 0 {
den := float64(resp.SampleCount)
resp.WarningRatePct = (float64(resp.WarningSamples) / den) * 100
resp.CriticalRatePct = (float64(resp.CriticalSamples) / den) * 100
resp.ImbalanceWarningRatePct = (float64(resp.ImbalanceWarningSamples) / den) * 100
resp.ImbalanceCriticalRatePct = (float64(resp.ImbalanceCriticalSamples) / den) * 100
}
if resp.SampleCount == 0 {
resp.From = time.Unix(0, startNs).Local().Format(time.RFC3339)
resp.To = now.Local().Format(time.RFC3339)
}
return resp, nil
}
func maxInt64(a, b int64) int64 {
if a > b {
return a
}
return b
}
func capabilityIndex(mean, sigma, usl, lsl float64) float64 {
if sigma <= 0 {
return 0
}
upper := (usl - mean) / (3 * sigma)
lower := (mean - lsl) / (3 * sigma)
return math.Min(upper, lower)
}
func oneSidedCapability(mean, sigma, usl float64) float64 {
if sigma <= 0 {
return 0
}
return (usl - mean) / (3 * sigma)
}
func safePercent(count, total int) float64 {
if total <= 0 {
return 0
}
return (float64(count) / float64(total)) * 100
}
func buildHistogram(values []float64, bins int, minVal, maxVal float64) []HistogramBin {
if bins <= 0 {
bins = 12
}
if len(values) == 0 {
return []HistogramBin{}
}
if maxVal <= minVal {
maxVal = minVal + 1
}
width := (maxVal - minVal) / float64(bins)
counts := make([]int, bins)
for _, v := range values {
idx := int((v - minVal) / width)
if idx < 0 {
idx = 0
}
if idx >= bins {
idx = bins - 1
}
counts[idx]++
}
out := make([]HistogramBin, 0, bins)
total := len(values)
for i := 0; i < bins; i++ {
start := minVal + float64(i)*width
end := start + width
if i == bins-1 {
end = maxVal
}
out = append(out, HistogramBin{Start: start, End: end, Count: counts[i], Percent: safePercent(counts[i], total)})
}
return out
}
func correlationCoefficient(xs, ys []float64) float64 {
if len(xs) == 0 || len(xs) != len(ys) {
return 0
}
var sumX, sumY, sumXX, sumYY, sumXY float64
n := float64(len(xs))
for i := range xs {
x, y := xs[i], ys[i]
sumX += x
sumY += y
sumXX += x * x
sumYY += y * y
sumXY += x * y
}
num := (n * sumXY) - (sumX * sumY)
denX := (n * sumXX) - (sumX * sumX)
denY := (n * sumYY) - (sumY * sumY)
if denX <= 0 || denY <= 0 {
return 0
}
return num / math.Sqrt(denX*denY)
}
func queryProcessCapability(ctx context.Context, window time.Duration, label string) (ProcessCapabilityResponse, error) {
now := time.Now().UTC()
startNs := now.UnixNano() - window.Nanoseconds()
cfgSnap := getConfigSnapshot()
rows, err := db.QueryContext(ctx, `
SELECT ts_unix_ns, sila_l_pct, sila_r_pct, sum_pct, sum_kn, imbalance_pct
FROM samples
WHERE ts_unix_ns >= ?
ORDER BY ts_unix_ns ASC
`, startNs)
if err != nil {
return ProcessCapabilityResponse{}, err
}
defer rows.Close()
var firstTS, lastTS int64
var totalStats, imbalanceStats runningStats
totalValues := make([]float64, 0, 2048)
imbalanceValues := make([]float64, 0, 2048)
leftValues := make([]float64, 0, 2048)
rightValues := make([]float64, 0, 2048)
topOutliers := make([]HistoryPeakPoint, 0, 8)
warningCount, criticalCount := 0, 0
imbWarnCount, imbCritCount := 0, 0
for rows.Next() {
var tsUnix int64
var leftPct, rightPct, totalPct, totalKN, imbalancePct float64
if err := rows.Scan(&tsUnix, &leftPct, &rightPct, &totalPct, &totalKN, &imbalancePct); err != nil {
return ProcessCapabilityResponse{}, err
}
if firstTS == 0 {
firstTS = tsUnix
}
lastTS = tsUnix
totalStats.Add(totalPct)
imbalanceStats.Add(imbalancePct)
totalValues = append(totalValues, totalPct)
imbalanceValues = append(imbalanceValues, imbalancePct)
leftValues = append(leftValues, leftPct)
rightValues = append(rightValues, rightPct)
if totalPct >= cfgSnap.Thresholds.WarningPercent {
warningCount++
}
if totalPct >= cfgSnap.Thresholds.CriticalPercent {
criticalCount++
}
if imbalancePct >= cfgSnap.Thresholds.ImbalanceWarningPercent {
imbWarnCount++
}
if imbalancePct >= cfgSnap.Thresholds.ImbalanceCriticalPercent {
imbCritCount++
}
peak := HistoryPeakPoint{
Time: time.Unix(0, tsUnix).Local().Format("02.01.2006 15:04:05"),
LeftPercent: leftPct, RightPercent: rightPct, TotalPercent: totalPct, TotalKN: totalKN, ImbalancePercent: imbalancePct,
}
score := math.Abs(totalPct-cfgSnap.Thresholds.CriticalPercent) + (imbalancePct * 1.5)
topOutliers = insertPeakDescending(topOutliers, peak, 8, func(p HistoryPeakPoint) float64 {
return math.Abs(p.TotalPercent-cfgSnap.Thresholds.CriticalPercent) + (p.ImbalancePercent * 1.5)
})
_ = score
}
if err := rows.Err(); err != nil {
return ProcessCapabilityResponse{}, err
}
sort.Float64s(totalValues)
sort.Float64s(imbalanceValues)
trendResp, err := buildTrendResponse(ctx, window, label)
if err != nil {
return ProcessCapabilityResponse{}, err
}
resp := ProcessCapabilityResponse{
Window: label,
From: time.Unix(0, firstTS).Local().Format(time.RFC3339),
To: time.Unix(0, maxInt64(firstTS, lastTS)).Local().Format(time.RFC3339),
SampleCount: totalStats.count,
TotalMeanPct: totalStats.Avg(),
TotalStdPct: totalStats.StdDev(),
TotalP95Pct: percentileFromSorted(totalValues, 0.95),
TotalP99Pct: percentileFromSorted(totalValues, 0.99),
ImbalanceMeanPct: imbalanceStats.Avg(),
ImbalanceStdPct: imbalanceStats.StdDev(),
ImbalanceP95Pct: percentileFromSorted(imbalanceValues, 0.95),
TotalAboveWarningPct: safePercent(warningCount, totalStats.count),
TotalAboveCriticalPct: safePercent(criticalCount, totalStats.count),
ImbalanceAboveWarningPct: safePercent(imbWarnCount, imbalanceStats.count),
ImbalanceAboveCriticalPct: safePercent(imbCritCount, imbalanceStats.count),
LeftRightCorrelation: correlationCoefficient(leftValues, rightValues),
Stability: trendResp.ProcessStability,
StabilityReason: trendResp.StabilityReason,
TotalHistogram: buildHistogram(totalValues, 14, 0, math.Max(cfgSnap.Thresholds.GaugeMaxPercent, totalStats.max)),
ImbalanceHistogram: buildHistogram(imbalanceValues, 12, 0, math.Max(cfgSnap.Thresholds.ImbalanceCriticalPercent*1.5, imbalanceStats.max)),
TopOutliers: topOutliers,
}
resp.TotalCpuWarning = oneSidedCapability(resp.TotalMeanPct, resp.TotalStdPct, cfgSnap.Thresholds.WarningPercent)
resp.TotalCpuCritical = oneSidedCapability(resp.TotalMeanPct, resp.TotalStdPct, cfgSnap.Thresholds.CriticalPercent)
resp.TotalCpkWarning = capabilityIndex(resp.TotalMeanPct, resp.TotalStdPct, cfgSnap.Thresholds.WarningPercent, 0)
resp.TotalCpkCritical = capabilityIndex(resp.TotalMeanPct, resp.TotalStdPct, cfgSnap.Thresholds.CriticalPercent, 0)
resp.ImbalanceCpuWarning = oneSidedCapability(resp.ImbalanceMeanPct, resp.ImbalanceStdPct, cfgSnap.Thresholds.ImbalanceWarningPercent)
resp.ImbalanceCpuCritical = oneSidedCapability(resp.ImbalanceMeanPct, resp.ImbalanceStdPct, cfgSnap.Thresholds.ImbalanceCriticalPercent)
resp.ImbalanceCpkWarning = capabilityIndex(resp.ImbalanceMeanPct, resp.ImbalanceStdPct, cfgSnap.Thresholds.ImbalanceWarningPercent, 0)
resp.ImbalanceCpkCritical = capabilityIndex(resp.ImbalanceMeanPct, resp.ImbalanceStdPct, cfgSnap.Thresholds.ImbalanceCriticalPercent, 0)
if resp.SampleCount == 0 {
resp.From = time.Unix(0, startNs).Local().Format(time.RFC3339)
resp.To = now.Local().Format(time.RFC3339)
resp.SuggestedAction = "No process data in selected window. Check PLC connection, machine runtime, or choose a wider period."
} else {
switch {
case resp.TotalCpkCritical < 1.0 || resp.ImbalanceCpkCritical < 1.0:
resp.SuggestedAction = "Capability is weak versus critical limits. Review overload moments, alignment, tooling, and setup repeatability."
case resp.TotalAboveWarningPct > 10 || resp.ImbalanceAboveWarningPct > 10:
resp.SuggestedAction = "Capability is marginal. Investigate drift sources and reduce high-variation periods before they become critical."
default:
resp.SuggestedAction = "Capability looks healthy for the selected window. Use this as a reference baseline for future comparisons."
}
}
return resp, nil
}
func reportBucketLabel(t time.Time, window time.Duration) string {
t = t.Local()
switch {
case window <= 2*time.Hour:
return t.Format("15:04")
case window <= 48*time.Hour:
return t.Format("02.01 15:00")
default:
return t.Format("02.01")
}
}
func queryReportSummary(ctx context.Context, window time.Duration, label string) (ReportSummaryResponse, error) {
analytics, err := queryHistoryAnalytics(ctx, window, label)
if err != nil {
return ReportSummaryResponse{}, err
}
trendResp, err := buildTrendResponse(ctx, window, label)
if err != nil {
return ReportSummaryResponse{}, err
}
cfgSnap := getConfigSnapshot()
now := time.Now().UTC()
startNs := now.UnixNano() - window.Nanoseconds()
rows, err := db.QueryContext(ctx, `
SELECT ts_unix_ns, sum_pct, imbalance_pct
FROM samples
WHERE ts_unix_ns >= ?
ORDER BY ts_unix_ns ASC
`, startNs)
if err != nil {
return ReportSummaryResponse{}, err
}
defer rows.Close()
type bucketAgg struct {
sumTotal, maxTotal, sumImb float64
samples int
}
bucketMap := map[string]*bucketAgg{}
order := []string{}
for rows.Next() {
var tsUnix int64
var totalPct, imbPct float64
if err := rows.Scan(&tsUnix, &totalPct, &imbPct); err != nil {
return ReportSummaryResponse{}, err
}
labelKey := reportBucketLabel(time.Unix(0, tsUnix), window)
bucket := bucketMap[labelKey]
if bucket == nil {
bucket = &bucketAgg{}
bucketMap[labelKey] = bucket
order = append(order, labelKey)
}
bucket.sumTotal += totalPct
bucket.sumImb += imbPct
if totalPct > bucket.maxTotal {
bucket.maxTotal = totalPct
}
bucket.samples++
}
if err := rows.Err(); err != nil {
return ReportSummaryResponse{}, err
}
warnEventsByBucket := map[string]int{}
criticalEventsByBucket := map[string]int{}
plcDiscByBucket := map[string]int{}
alarmRows, err := db.QueryContext(ctx, `
SELECT ts_unix_ns, severity, source, code
FROM alarm_events
WHERE ts_unix_ns >= ?
ORDER BY ts_unix_ns ASC
`, startNs)
if err != nil {
return ReportSummaryResponse{}, err
}
defer alarmRows.Close()
for alarmRows.Next() {
var tsUnix int64
var severity, source, code string
if err := alarmRows.Scan(&tsUnix, &severity, &source, &code); err != nil {
return ReportSummaryResponse{}, err
}
labelKey := reportBucketLabel(time.Unix(0, tsUnix), window)
switch severity {
case "warning":
warnEventsByBucket[labelKey]++
case "critical":
criticalEventsByBucket[labelKey]++
}
if source == "plc" && code == "plc_disconnected" {
plcDiscByBucket[labelKey]++
}
}
if err := alarmRows.Err(); err != nil {
return ReportSummaryResponse{}, err
}
buckets := make([]ReportBucket, 0, len(order))
for _, key := range order {
b := bucketMap[key]
avgTotal := 0.0
avgImb := 0.0
if b.samples > 0 {
avgTotal = b.sumTotal / float64(b.samples)
avgImb = b.sumImb / float64(b.samples)
}
buckets = append(buckets, ReportBucket{
Label: key, AvgTotalPct: avgTotal, MaxTotalPct: b.maxTotal, AvgImbalancePct: avgImb, Samples: b.samples,
WarningEvents: warnEventsByBucket[key], CriticalEvents: criticalEventsByBucket[key], PLCDisconnects: plcDiscByBucket[key],
})
}
health := 100.0
health -= analytics.WarningRatePct * 0.55
health -= analytics.CriticalRatePct * 1.15
health -= analytics.ImbalanceWarningRatePct * 0.45
health -= analytics.ImbalanceCriticalRatePct * 1.00
health -= float64(analytics.CriticalEvents) * 1.5
health -= float64(analytics.PLCDisconnects) * 8
if trendResp.ProcessStability == "unstable" {
health -= 10
}
if trendResp.ProcessStability == "caution" {
health -= 4
}
if health < 0 {
health = 0
}
if health > 100 {
health = 100
}
availability := 100.0
if len(buckets) > 0 {
availability -= math.Min(25, float64(analytics.PLCDisconnects)*2.5)
}
if availability < 0 {
availability = 0
}
findings := []string{}
if analytics.CriticalRatePct > 0 {
findings = append(findings, fmt.Sprintf("Critical-force occupancy is %.1f%% of samples.", analytics.CriticalRatePct))
}
if analytics.ImbalanceCriticalRatePct > 0 {
findings = append(findings, fmt.Sprintf("Critical imbalance appears in %.1f%% of samples.", analytics.ImbalanceCriticalRatePct))
}
if math.Abs(analytics.PreviousWindowDeltaPct) >= 3 {
trendWord := "up"
if analytics.PreviousWindowDeltaPct < 0 {
trendWord = "down"
}
findings = append(findings, fmt.Sprintf("Average total force is %s %.1f%% versus the previous window.", trendWord, math.Abs(analytics.PreviousWindowDeltaPct)))
}
if analytics.PLCDisconnects > 0 {
findings = append(findings, fmt.Sprintf("PLC disconnected %d time(s) in the selected report window.", analytics.PLCDisconnects))
}
if len(findings) == 0 {
findings = append(findings, "No major process exceptions detected in the selected report window.")
}
execSummary := fmt.Sprintf("Health score %d/100. Avg total peak %.1f%s, peak %.1f%s, avg imbalance %.1f%s, with %d warning and %d critical events.",
int(math.Round(health)), analytics.TotalAvgPct, cfgSnap.UI.UnitPct, analytics.TotalMaxPct, cfgSnap.UI.UnitPct, analytics.ImbalanceAvgPct, cfgSnap.UI.UnitPct, analytics.WarningEvents, analytics.CriticalEvents)
resp := ReportSummaryResponse{
Window: label,
From: analytics.From,
To: analytics.To,
SampleCount: analytics.SampleCount,
AverageTotalPct: analytics.TotalAvgPct,
AverageTotalKN: analytics.TotalAvgKN,
PeakTotalPct: analytics.TotalMaxPct,
PeakTotalKN: analytics.TotalMaxKN,
AverageImbalancePct: analytics.ImbalanceAvgPct,
PeakImbalancePct: analytics.ImbalanceMaxPct,
WarningRatePct: analytics.WarningRatePct,
CriticalRatePct: analytics.CriticalRatePct,
ImbalanceWarningRatePct: analytics.ImbalanceWarningRatePct,
ImbalanceCriticalRatePct: analytics.ImbalanceCriticalRatePct,
WarningEvents: analytics.WarningEvents,
CriticalEvents: analytics.CriticalEvents,
PLCDisconnects: analytics.PLCDisconnects,
HealthScore: int(math.Round(health)),
AvailabilityPct: availability,
ForceDeltaPct: analytics.PreviousWindowDeltaPct,
ImbalanceDeltaPct: analytics.PreviousImbalanceDeltaPct,
Stability: trendResp.ProcessStability,
StabilityReason: trendResp.StabilityReason,
ExecutiveSummary: execSummary,
Findings: findings,
Buckets: buckets,
TopPeaks: analytics.TopPeaks,
}
return resp, nil
}
// ---------------------------------------------------------------------------
// HTTP helpers
// ---------------------------------------------------------------------------
func writeJSON(w http.ResponseWriter, status int, v any) {
h := w.Header()
h.Set("Content-Type", "application/json")
h.Set("Cache-Control", "no-store")
h.Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}
func allowMethod(w http.ResponseWriter, r *http.Request, method string) bool {
if r.Method == http.MethodOptions {
h := w.Header()
h.Set("Access-Control-Allow-Origin", "*")
h.Set("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS")
h.Set("Access-Control-Allow-Headers", "Content-Type")
w.WriteHeader(http.StatusNoContent)
return false
}
if r.Method != method {
http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
return false
}
return true
}
func requireActiveLicense(w http.ResponseWriter, r *http.Request) bool {
if licenseMgr == nil {
return true
}
status := licenseMgr.Status()
if !status.Locked {
_ = licenseMgr.Touch()
return true
}
writeJSON(w, http.StatusForbidden, map[string]any{
"error": "license required",
"license": status,
})
return false
}
func requireActiveLicensePage(w http.ResponseWriter, r *http.Request) bool {
if licenseMgr == nil {
return true
}
status := licenseMgr.Status()
if !status.Locked {
_ = licenseMgr.Touch()
return true
}
http.Redirect(w, r, "/license", http.StatusSeeOther)
return false
}
func licenseAllowsRuntime() bool {
if licenseMgr == nil {
return true
}
return !licenseMgr.Status().Locked
}
// ---------------------------------------------------------------------------
// HTTP handlers — core
// ---------------------------------------------------------------------------
func apiData(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
writeJSON(w, http.StatusOK, snapshotState())
}
func apiUIRevision(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
writeJSON(w, http.StatusOK, map[string]uint64{"revision": atomic.LoadUint64(&uiRevision)})
}
func apiPublicConfig(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
c := getConfigSnapshot()
policy := runtimeLicenseConfig()
resp := PublicConfigResponse{
Version: version,
UIRevision: atomic.LoadUint64(&uiRevision),
UI: c.UI,
Thresholds: c.Thresholds,
Trend: c.Trend,
Press: c.Press,
Modules: c.Modules,
LicenseHint: LicenseHint{
Enabled: policy.Enabled,
TrialDays: policy.TrialDays,
},
}
writeJSON(w, http.StatusOK, resp)
}
func apiHistory(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
window, label, err := parseWindow(r.URL.Query().Get("window"))
if err != nil {
http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest)
return
}
points, err := queryHistory(r.Context(), window)
if err != nil {
log.Printf("history query failed: %v", err)
http.Error(w, `{"error":"history query failed"}`, http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, HistoryResponse{Window: label, Points: points})
}
func apiHistoryAnalytics(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
window, label, err := parseWindow(r.URL.Query().Get("window"))
if err != nil {
http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest)
return
}
resp, err := queryHistoryAnalytics(r.Context(), window, label)
if err != nil {
log.Printf("history analytics query failed: %v", err)
http.Error(w, `{"error":"history analytics query failed"}`, http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, resp)
}
func apiTrend(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
window, label, err := parseWindow(r.URL.Query().Get("window"))
if err != nil {
http.Error(w, `{"error":"invalid trend window"}`, http.StatusBadRequest)
return
}
resp, err := buildTrendResponse(r.Context(), window, label)
if err != nil {
log.Printf("trend query failed: %v", err)
http.Error(w, `{"error":"trend query failed"}`, http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, resp)
}
func apiAlarms(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
limit := 20
if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" {
if n, err := strconv.Atoi(raw); err == nil && n > 0 {
limit = n
}
}
events, err := queryAlarmEvents(r.Context(), limit)
if err != nil {
log.Printf("alarm query failed: %v", err)
http.Error(w, `{"error":"alarm query failed"}`, http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, AlarmResponse{Events: events})
}
func serveEmbeddedHTMLPage(w http.ResponseWriter, embeddedPath string) {
data, err := embeddedStaticFiles.ReadFile(embeddedPath)
if err != nil {
log.Printf("embedded page read error (%s): %v", embeddedPath, err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Cache-Control", "no-store")
_, _ = w.Write(data)
}
func redirectToCanonicalPath(w http.ResponseWriter, r *http.Request, canonicalPath string) bool {
if r.URL.Path == canonicalPath {
return false
}
if r.URL.Path == canonicalPath+"/" {
http.Redirect(w, r, canonicalPath, http.StatusMovedPermanently)
return true
}
return false
}
func serveDashboardAlias(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/dashboard" || r.URL.Path == "/dashboard/" {
http.Redirect(w, r, "/", http.StatusMovedPermanently)
return
}
http.NotFound(w, r)
}
func serveAlarmsPage(w http.ResponseWriter, r *http.Request) {
if redirectToCanonicalPath(w, r, "/alarms") {
return
}
if r.URL.Path != "/alarms" {
http.NotFound(w, r)
return
}
if !requireActiveLicensePage(w, r) {
return
}
serveEmbeddedHTMLPage(w, "static/alarms.html")
}
func serveHistoryPage(w http.ResponseWriter, r *http.Request) {
if redirectToCanonicalPath(w, r, "/history") {
return
}
if r.URL.Path != "/history" {
http.NotFound(w, r)
return
}
if !requireActiveLicensePage(w, r) {
return
}
serveEmbeddedHTMLPage(w, "static/history.html")
}
func serveLicensePage(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/licence" || r.URL.Path == "/licence/" {
http.Redirect(w, r, "/license", http.StatusMovedPermanently)
return
}
if redirectToCanonicalPath(w, r, "/license") {
return
}
if r.URL.Path != "/license" {
http.NotFound(w, r)
return
}
serveEmbeddedHTMLPage(w, "static/license.html")
}
func apiProcessCapability(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
window, label, err := parseWindow(r.URL.Query().Get("window"))
if err != nil {
http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest)
return
}
resp, err := queryProcessCapability(r.Context(), window, label)
if err != nil {
log.Printf("process capability query failed: %v", err)
http.Error(w, `{"error":"process capability query failed"}`, http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, resp)
}
func apiReportsSummary(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
window, label, err := parseWindow(r.URL.Query().Get("window"))
if err != nil {
http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest)
return
}
resp, err := queryReportSummary(r.Context(), window, label)
if err != nil {
log.Printf("reports summary query failed: %v", err)
http.Error(w, `{"error":"reports summary query failed"}`, http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, resp)
}
func serveKioskPage(w http.ResponseWriter, r *http.Request) {
if redirectToCanonicalPath(w, r, "/kiosk") {
return
}
if r.URL.Path != "/kiosk" {
http.NotFound(w, r)
return
}
if !requireActiveLicensePage(w, r) {
return
}
serveEmbeddedHTMLPage(w, "static/kiosk.html")
}
func serveProcessCapabilityPage(w http.ResponseWriter, r *http.Request) {
if redirectToCanonicalPath(w, r, "/process-capability") {
return
}
if r.URL.Path != "/process-capability" {
http.NotFound(w, r)
return
}
if !requireActiveLicensePage(w, r) {
return
}
serveEmbeddedHTMLPage(w, "static/process-capability.html")
}
func serveReportsPage(w http.ResponseWriter, r *http.Request) {
if redirectToCanonicalPath(w, r, "/reports") {
return
}
if r.URL.Path != "/reports" {
http.NotFound(w, r)
return
}
if !requireActiveLicensePage(w, r) {
return
}
serveEmbeddedHTMLPage(w, "static/reports.html")
}
func serveUI(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
if !requireActiveLicensePage(w, r) {
return
}
// License OK — serve the full dashboard template from the embedded static files
if indexTmpl == nil {
log.Printf("dashboard template not initialized")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
c := getConfigSnapshot()
data := struct {
Title, Subtitle, LeftLabel, RightLabel, UnitForce, UnitPct string
WarningPercent, CriticalPercent, GaugeMaxPercent float64
ImbalanceWarningPercent, ImbalanceCriticalPercent float64
MaxTonnage float64
PollMs int
DefaultWindow, DefaultTrendWindow string
UIRevision uint64
ShowHeaderControls, ShowVerdict, ShowSummaryBar bool
ShowOverview, ShowIntelligence, ShowAlarmTimeline bool
ShowGauges, ShowGaugeDigital, ShowTrendChart bool
}{
Title: c.UI.Title,
Subtitle: c.UI.Subtitle,
LeftLabel: c.UI.LeftLabel,
RightLabel: c.UI.RightLabel,
UnitForce: c.UI.UnitForce,
UnitPct: c.UI.UnitPct,
WarningPercent: c.Thresholds.WarningPercent,
CriticalPercent: c.Thresholds.CriticalPercent,
GaugeMaxPercent: c.Thresholds.GaugeMaxPercent,
ImbalanceWarningPercent: c.Thresholds.ImbalanceWarningPercent,
ImbalanceCriticalPercent: c.Thresholds.ImbalanceCriticalPercent,
MaxTonnage: c.Press.MaxTonnage,
PollMs: c.PLC.PollMs,
DefaultWindow: fmt.Sprintf("%dm", c.Trend.Minutes),
DefaultTrendWindow: fmt.Sprintf("%dm", c.Trend.Minutes),
UIRevision: atomic.LoadUint64(&uiRevision),
ShowHeaderControls: boolValue(c.Modules.ShowHeaderControls, true),
ShowVerdict: boolValue(c.Modules.ShowVerdict, true),
ShowSummaryBar: boolValue(c.Modules.ShowSummaryBar, true),
ShowOverview: boolValue(c.Modules.ShowOverview, true),
ShowIntelligence: boolValue(c.Modules.ShowIntelligence, true),
ShowAlarmTimeline: boolValue(c.Modules.ShowAlarmTimeline, true),
ShowGauges: boolValue(c.Modules.ShowGauges, true),
ShowGaugeDigital: boolValue(c.Modules.ShowGaugeDigital, false),
ShowTrendChart: boolValue(c.Modules.ShowTrendChart, true),
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Cache-Control", "no-store")
if err := indexTmpl.Execute(w, data); err != nil {
log.Printf("template execute error: %v", err)
}
return
}
http.NotFound(w, r)
}
// ---------------------------------------------------------------------------
// HTTP handlers — MQTT REST API
// ---------------------------------------------------------------------------
func apiMQTTStatus(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if mqttMgr == nil {
writeJSON(w, http.StatusOK, MQTTStatusResponse{
Enabled: false,
Connected: false,
Broker: "",
Subscribed: []string{},
})
return
}
writeJSON(w, http.StatusOK, mqttMgr.status())
}
func apiMQTTPublish(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodPost) {
return
}
if !requireActiveLicense(w, r) {
return
}
if mqttMgr == nil {
http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable)
return
}
var req MQTTPublishRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest)
return
}
if strings.TrimSpace(req.Topic) == "" {
http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest)
return
}
if req.QoS < 0 || req.QoS > 2 {
http.Error(w, `{"error":"qos must be 0, 1, or 2"}`, http.StatusBadRequest)
return
}
if err := mqttMgr.publish(req.Topic, req.Payload, byte(req.QoS), req.Retain); err != nil {
writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "published", "topic": req.Topic})
}
func apiMQTTMessages(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if !requireActiveLicense(w, r) {
return
}
if mqttMgr == nil {
writeJSON(w, http.StatusOK, map[string]any{"messages": []MQTTReceivedMsg{}, "enabled": false})
return
}
limit := 50
if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" {
if n, err := strconv.Atoi(raw); err == nil && n > 0 {
limit = n
}
}
msgs := mqttMgr.getMessages(limit)
writeJSON(w, http.StatusOK, map[string]any{"messages": msgs, "count": len(msgs)})
}
func apiMQTTSubscribe(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodPost) {
return
}
if !requireActiveLicense(w, r) {
return
}
if mqttMgr == nil {
http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable)
return
}
var req MQTTSubscribeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest)
return
}
if strings.TrimSpace(req.Topic) == "" {
http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest)
return
}
if req.QoS < 0 || req.QoS > 2 {
http.Error(w, `{"error":"qos must be 0, 1, or 2"}`, http.StatusBadRequest)
return
}
if err := mqttMgr.subscribe(req.Topic, byte(req.QoS)); err != nil {
writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "subscribed", "topic": req.Topic})
}
func apiMQTTUnsubscribe(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodDelete) {
return
}
if !requireActiveLicense(w, r) {
return
}
if mqttMgr == nil {
http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable)
return
}
var req MQTTSubscribeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest)
return
}
if strings.TrimSpace(req.Topic) == "" {
http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest)
return
}
if err := mqttMgr.unsubscribe(req.Topic); err != nil {
writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "unsubscribed", "topic": req.Topic})
}
// ---------------------------------------------------------------------------
// HTTP handlers — License API
// ---------------------------------------------------------------------------
type activateRequestBody struct {
LicenseText string `json:"license_text"`
}
func apiLicenseStatus(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if licenseMgr == nil {
writeJSON(w, http.StatusOK, LicenseStatus{
Enabled: false,
Mode: "disabled",
Message: "licensing disabled",
})
return
}
writeJSON(w, http.StatusOK, licenseMgr.Status())
}
func apiLicenseRequest(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodGet) {
return
}
if licenseMgr == nil {
writeJSON(w, http.StatusOK, map[string]any{"enabled": false})
return
}
req := licenseMgr.BuildActivationRequest()
writeJSON(w, http.StatusOK, req)
}
func apiLicenseActivate(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, http.MethodPost) {
return
}
if licenseMgr == nil {
http.Error(w, `{"error":"licensing disabled"}`, http.StatusServiceUnavailable)
return
}
raw, err := ioReadAllLimit(r.Body, 1<<20)
if err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
payload := strings.TrimSpace(string(raw))
if payload == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "request body is empty"})
return
}
licenseText := payload
var wrapped activateRequestBody
if err := json.Unmarshal(raw, &wrapped); err == nil && strings.TrimSpace(wrapped.LicenseText) != "" {
licenseText = wrapped.LicenseText
}
if err := licenseMgr.ActivateFromText(licenseText); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]any{
"error": err.Error(),
"license": licenseMgr.Status(),
})
return
}
writeJSON(w, http.StatusOK, map[string]any{
"status": "activated",
"license": licenseMgr.Status(),
})
}
// ---------------------------------------------------------------------------
// Misc helpers
// ---------------------------------------------------------------------------
func ioReadAllLimit(r io.Reader, max int64) ([]byte, error) {
lr := &io.LimitedReader{R: r, N: max + 1}
data, err := io.ReadAll(lr)
if err != nil {
return nil, err
}
if int64(len(data)) > max {
return nil, fmt.Errorf("payload too large")
}
return data, nil
}
func startLicenseHeartbeat(ctx context.Context) {
if licenseMgr == nil {
return
}
_ = licenseMgr.Touch()
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := licenseMgr.Touch(); err != nil {
log.Printf("license heartbeat failed: %v", err)
}
}
}
}
// ---------------------------------------------------------------------------
// main
// ---------------------------------------------------------------------------
func main() {
wd, err := os.Getwd()
if err != nil {
log.Fatalf("failed to get working directory: %v", err)
}
configPath := filepath.Join(wd, "config.yaml")
cfg, err = loadOrCreateConfig(configPath)
if err != nil {
log.Fatalf("failed to load config: %v", err)
}
if err := validateConfig(cfg); err != nil {
log.Fatalf("invalid config: %v", err)
}
if cfg.LegacyLicense != nil {
log.Printf("config.yaml contains a legacy license section; it is ignored by the embedded offline license policy")
}
indexTmpl, err = template.ParseFS(embeddedStaticFiles, "static/index.html")
if err != nil {
log.Fatalf("failed to parse embedded dashboard template: %v", err)
}
dbPath := cfg.DB.Path
if !filepath.IsAbs(dbPath) {
dbPath = filepath.Join(wd, dbPath)
}
db, err = initDatabase(dbPath, cfg.DB)
if err != nil {
log.Fatalf("failed to init database: %v", err)
}
defer db.Close()
licensePolicy := runtimeLicenseConfig()
licenseDataDir := licensePolicy.DataDir
if !filepath.IsAbs(licenseDataDir) {
licenseDataDir = filepath.Join(wd, licenseDataDir)
}
licenseMgr, err = NewLicenseManager(licensePolicy, licenseDataDir)
if err != nil {
log.Fatalf("failed to initialize license manager: %v", err)
}
sampleCh = make(chan Sample, cfg.DB.WriterQueueSize)
alarmCh = make(chan AlarmEvent, cfg.DB.AlarmQueueSize)
log.Printf("S7-1200 Force Monitor v%s", version)
log.Printf("Config: %s", configPath)
log.Printf("DB: %s", dbPath)
log.Printf("PLC: ip=%s db=%d rack=%d slot=%d poll=%dms",
cfg.PLC.IP, cfg.PLC.DBNum, cfg.PLC.Rack, cfg.PLC.Slot, cfg.PLC.PollMs)
log.Printf("Press: MAX_TONNAGE=%.2f %s", cfg.Press.MaxTonnage, cfg.UI.UnitForce)
if licenseMgr != nil {
ls := licenseMgr.Status()
log.Printf("License: mode=%s locked=%v fingerprint=%s", ls.Mode, ls.Locked, ls.FingerprintShort)
}
if cfg.MQTT.Enabled {
mqttMgr = newMQTTManager(cfg.MQTT)
mqttAlarmCh = make(chan AlarmEvent, 256)
go mqttAlarmWorker()
if err := mqttMgr.connect(); err != nil {
log.Printf("MQTT initial connect failed (will retry): %v", err)
} else {
log.Printf("MQTT: connected to %s (prefix=%s)", cfg.MQTT.Broker, cfg.MQTT.TopicPrefix)
}
} else {
log.Printf("MQTT: disabled (set mqtt.enabled: true in config to enable)")
}
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if err := startConfigWatcher(ctx, configPath); err != nil {
log.Printf("config watch disabled: %v", err)
} else {
log.Printf("Config watcher enabled for %s", configPath)
}
dbCfg := cfg.DB
var wg sync.WaitGroup
wg.Add(4)
go func() { defer wg.Done(); startDBWriter(ctx, db, dbCfg.BatchSize, dbCfg.FlushIntervalMs) }()
go func() { defer wg.Done(); startAlarmWriter(ctx, db, dbCfg.BatchSize, dbCfg.FlushIntervalMs) }()
go func() { defer wg.Done(); startDBCleanup(ctx, db, dbCfg.RetentionDays, dbCfg.CleanupIntervalHr) }()
go func() { defer wg.Done(); startPLCPoller(ctx) }()
if cfg.MQTT.Enabled {
wg.Add(1)
go func() { defer wg.Done(); startMQTTPublisher(ctx) }()
}
wg.Add(1)
go func() { defer wg.Done(); startLicenseHeartbeat(ctx) }()
mux := http.NewServeMux()
staticFS, err := fs.Sub(embeddedStaticFiles, "static")
if err != nil {
log.Fatalf("failed to prepare embedded static files: %v", err)
}
fileServer := http.FileServer(http.FS(staticFS))
mux.Handle("/static/", http.StripPrefix("/static/", fileServer))
mux.HandleFunc("/dashboard", serveDashboardAlias)
mux.HandleFunc("/dashboard/", serveDashboardAlias)
mux.HandleFunc("/alarms", serveAlarmsPage)
mux.HandleFunc("/alarms/", serveAlarmsPage)
mux.HandleFunc("/history", serveHistoryPage)
mux.HandleFunc("/history/", serveHistoryPage)
mux.HandleFunc("/kiosk", serveKioskPage)
mux.HandleFunc("/kiosk/", serveKioskPage)
mux.HandleFunc("/process-capability", serveProcessCapabilityPage)
mux.HandleFunc("/process-capability/", serveProcessCapabilityPage)
mux.HandleFunc("/reports", serveReportsPage)
mux.HandleFunc("/reports/", serveReportsPage)
mux.HandleFunc("/license", serveLicensePage)
mux.HandleFunc("/license/", serveLicensePage)
mux.HandleFunc("/licence", serveLicensePage)
mux.HandleFunc("/licence/", serveLicensePage)
mux.HandleFunc("/", serveUI)
mux.HandleFunc("/api/data", apiData)
mux.HandleFunc("/api/ui-revision", apiUIRevision)
mux.HandleFunc("/api/config/public", apiPublicConfig)
mux.HandleFunc("/api/history", apiHistory)
mux.HandleFunc("/api/history/analytics", apiHistoryAnalytics)
mux.HandleFunc("/api/process-capability", apiProcessCapability)
mux.HandleFunc("/api/reports/summary", apiReportsSummary)
mux.HandleFunc("/api/trend", apiTrend)
mux.HandleFunc("/api/alarms", apiAlarms)
mux.HandleFunc("/api/mqtt/status", apiMQTTStatus)
mux.HandleFunc("/api/mqtt/publish", apiMQTTPublish)
mux.HandleFunc("/api/mqtt/messages", apiMQTTMessages)
mux.HandleFunc("/api/mqtt/subscribe", apiMQTTSubscribe)
mux.HandleFunc("/api/mqtt/unsubscribe", apiMQTTUnsubscribe)
mux.HandleFunc("/api/license/status", apiLicenseStatus)
mux.HandleFunc("/api/license/request", apiLicenseRequest)
mux.HandleFunc("/api/license/activate", apiLicenseActivate)
srv := &http.Server{
Addr: cfg.Server.ListenAddr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
log.Printf("Listening address: %s", cfg.Server.ListenAddr)
log.Printf("Open locally: http://localhost%s", cfg.Server.ListenAddr)
log.Printf("License API: GET /api/license/status | GET /api/license/request | POST /api/license/activate")
go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("HTTP server error: %v", err)
}
}()
<-ctx.Done()
log.Println("Shutting down — flushing DB writers...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP server shutdown error: %v", err)
}
if mqttMgr != nil {
mqttMgr.disconnect()
log.Println("MQTT disconnected")
}
wg.Wait()
log.Println("Shutdown complete")
}