Files
OpsLog/internal/cluster/cluster.go
T
2026-05-28 11:09:07 +02:00

524 lines
14 KiB
Go

// Package cluster provides a multi-server DX cluster client (telnet) —
// connects concurrently to several AR-Cluster / CC-Cluster / DXSpider
// nodes, logs in with the operator's callsign, optionally sends an init
// command list, and streams DX spots back to the UI via a callback.
//
// Spot parsing is tolerant of the dozens of slight format variations
// between cluster flavours (the prompt, the trailing locator, the time
// format). Anything that doesn't match the spot regex is treated as
// banner/chat noise and ignored, not surfaced as an error.
package cluster
import (
"bufio"
"fmt"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
// ServerConfig describes one persisted DX cluster node. Its fields mirror
// the columns of the cluster_servers table, and the frontend SettingsPanel
// sends one value per configured row.
type ServerConfig struct {
	ID            int64  `json:"id"`
	Name          string `json:"name"` // display name, copied onto every Spot as SourceName
	Host          string `json:"host"` // StartServer ignores configs with an empty host
	Port          int    `json:"port"`
	LoginOverride string `json:"login_override"` // not read in this file; presumably consulted when resolving the login passed to StartServer
	Password      string `json:"password,omitempty"`      // answered only on a password prompt after the login went out
	InitCommands  string `json:"init_commands"`           // newline-separated; blank lines and "//" lines are skipped
	Enabled       bool   `json:"enabled"`                 // StartServer does nothing for disabled configs
	SortOrder     int    `json:"sort_order"`
}
// Spot is one DX spot parsed from a cluster line.
//
// parseSpot fills the parsed fields and the owning session stamps
// SourceID/SourceName. Country and Continent are left empty in this package;
// the caller (app.go, per the original notes) resolves them before the spot
// is emitted, so the UI table never shows empty country cells flickering in.
type Spot struct {
	SourceID   int64     `json:"source_id"`           // ID of the cluster server the line came from
	SourceName string    `json:"source_name"`         // that server's display name (useful with several servers)
	Spotter    string    `json:"spotter"`             // the "DX de" call
	DXCall     string    `json:"dx_call"`             // the station that was heard
	FreqKHz    float64   `json:"freq_khz"`            // frequency as written in the spot
	FreqHz     int64     `json:"freq_hz"`             // FreqKHz * 1000, rounded to the nearest Hz
	Band       string    `json:"band,omitempty"`      // e.g. "20m"; empty outside the known bands
	Comment    string    `json:"comment,omitempty"`   // free text between the call and the time
	Locator    string    `json:"locator,omitempty"`   // optional spotter grid at the end of the line
	TimeUTC    string    `json:"time_utc,omitempty"`  // time exactly as sent, e.g. "1234Z"
	Country    string    `json:"country,omitempty"`   // DXCC entity name (cty.dat)
	Continent  string    `json:"continent,omitempty"` // 2-letter continent code
	ReceivedAt time.Time `json:"received_at"`         // local time the line was parsed
	Raw        string    `json:"raw"`                 // the line as received, minus trailing CR/LF
}
// State is the lifecycle phase of one server session, as reported in
// ServerStatus.State.
type State string

// Session lifecycle phases. StateDisconnected and StateError are not
// assigned anywhere in this file; the others are set by the session code.
const (
	StateDisconnected State = "disconnected"
	StateConnecting   State = "connecting"   // initial state from StartServer
	StateConnected    State = "connected"    // login sent or welcome banner seen
	StateReconnecting State = "reconnecting" // waiting out the backoff between attempts
	StateError        State = "error"
)
// ServerStatus is a point-in-time view of one running session; Manager.Status
// returns one entry per active session.
//
// NOTE: encoding/json never treats a struct value as empty, so the omitempty
// on ConnectedAt has no effect: a session that has not connected yet reports
// the zero time ("0001-01-01T00:00:00Z").
type ServerStatus struct {
	ServerID    int64     `json:"server_id"`
	Name        string    `json:"name"`
	Host        string    `json:"host"`
	Port        int       `json:"port"`
	State       State     `json:"state"`
	Login       string    `json:"login,omitempty"`        // callsign given to StartServer; empty = anonymous
	Error       string    `json:"error,omitempty"`        // last attempt's error; cleared once connected
	ConnectedAt time.Time `json:"connected_at,omitempty"` // when the current link was marked connected
	SpotsCount  int       `json:"spots_count,omitempty"`  // spots parsed over the session's lifetime
	Retries     int       `json:"retries,omitempty"`      // attempts since the backoff last reset
}
// session is a single telnet connection to one configured server. It is
// internal; callers go through Manager. onStatus only signals "something
// changed" — consumers re-read the aggregate via Manager.Status() rather
// than receiving per-server diffs.
type session struct {
	// Set once in StartServer and only read afterwards.
	cfg      ServerConfig
	login    string
	onSpot   func(Spot)
	onStatus func()

	// mu guards status, conn, stopped and spotsCnt.
	mu       sync.RWMutex
	status   ServerStatus
	conn     net.Conn      // live connection of the current attempt; nil between attempts
	stopCh   chan struct{} // closed by stop() to end the supervisor
	doneCh   chan struct{} // closed by run() when it returns
	stopped  bool          // makes repeated stop() calls no-ops
	spotsCnt int
}
// Manager owns one session per enabled server. Its methods may be called
// from any goroutine; network I/O runs on per-session background goroutines.
type Manager struct {
	mu       sync.RWMutex        // guards sessions
	sessions map[int64]*session  // keyed by ServerConfig.ID
	onSpot   func(Spot)          // handed to every new session
	onStatus func()              // fired on any status change
}
// NewManager returns a Manager with no sessions.
//
// emitSpot receives every parsed spot, already stamped with the ID and name
// of its server. emitStatusChanged fires whenever any session's status
// changes. It carries no payload, so the frontend re-fetches Status()
// (through a Wails binding, per the original notes).
func NewManager(emitSpot func(Spot), emitStatusChanged func()) *Manager {
	m := &Manager{onSpot: emitSpot, onStatus: emitStatusChanged}
	m.sessions = make(map[int64]*session)
	return m
}
// Status copies the current status of every registered session. The order
// follows map iteration and is therefore unspecified; with no sessions the
// result is an empty, non-nil slice.
func (m *Manager) Status() []ServerStatus {
	m.mu.RLock()
	defer m.mu.RUnlock()
	snapshots := make([]ServerStatus, len(m.sessions))
	i := 0
	for _, sess := range m.sessions {
		snapshots[i] = sess.snapshot()
		i++
	}
	return snapshots
}
// StartServer launches a session for cfg. login is the resolved callsign to
// send (empty = anonymous). Any session already running for the same ID is
// stopped first, so this also serves as "restart with new config".
// Disabled configs and configs without a host only stop the old session.
//
// StopServer blocks until the old session's goroutine exits, which can take
// up to the dial timeout if it is mid-dial.
func (m *Manager) StartServer(cfg ServerConfig, login string) {
	m.StopServer(cfg.ID)
	if !cfg.Enabled || cfg.Host == "" {
		return
	}
	s := &session{
		cfg:      cfg,
		login:    login,
		onSpot:   m.onSpot,
		onStatus: m.emitStatus,
		stopCh:   make(chan struct{}),
		doneCh:   make(chan struct{}),
		status: ServerStatus{
			ServerID: cfg.ID, Name: cfg.Name, Host: cfg.Host, Port: cfg.Port,
			Login: login, State: StateConnecting,
		},
	}
	m.mu.Lock()
	prev := m.sessions[cfg.ID]
	m.sessions[cfg.ID] = s
	m.mu.Unlock()
	if prev != nil {
		// A concurrent StartServer for the same ID registered its session
		// between our StopServer and this insert. Overwriting it silently
		// would leak a supervisor that keeps dialing and emitting spots, so
		// shut it down explicitly.
		prev.stop()
	}
	s.emitStatus()
	go s.run()
}
// StopServer tears down the session registered under id, if there is one,
// and blocks until its goroutine has exited; status listeners are then
// notified. An unknown id is a no-op apart from the trace line.
//
// NOTE(review): the two Printf calls write straight to stdout and look like
// leftover debug tracing.
func (m *Manager) StopServer(id int64) {
	var (
		victim    *session
		found     bool
		remaining int
	)
	func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		if victim, found = m.sessions[id]; found {
			delete(m.sessions, id)
		}
		remaining = len(m.sessions)
	}()
	fmt.Printf("cluster.StopServer id=%d found=%v remaining=%d\n", id, found, remaining)
	if !found {
		return
	}
	victim.stop()
	fmt.Printf("cluster.StopServer id=%d stopped successfully\n", id)
	m.emitStatus()
}
// StopAll closes every running session and returns once all of their
// goroutines have exited, then notifies status listeners once.
//
// Sessions are stopped concurrently. Each stop() can block for as long as
// its supervisor takes to exit (up to the 10s dial timeout when it is
// mid-dial), so stopping them one by one made shutdown time grow with the
// number of configured servers.
func (m *Manager) StopAll() {
	m.mu.Lock()
	all := make([]*session, 0, len(m.sessions))
	for _, s := range m.sessions {
		all = append(all, s)
	}
	m.sessions = make(map[int64]*session)
	m.mu.Unlock()

	var wg sync.WaitGroup
	wg.Add(len(all))
	for _, s := range all {
		go func(s *session) {
			defer wg.Done()
			s.stop()
		}(s)
	}
	wg.Wait()
	m.emitStatus()
}
// SendCommand writes cmd to the session of the given server, replacing any
// trailing CR/LF with a single CRLF. Typical uses are "show last 30" or
// "set/skimmer". It fails when no session is registered for serverID or
// when that session has no live connection.
func (m *Manager) SendCommand(serverID int64, cmd string) error {
	m.mu.RLock()
	target, found := m.sessions[serverID]
	m.mu.RUnlock()
	if found {
		return target.send(cmd)
	}
	return fmt.Errorf("no active session for server %d", serverID)
}
// emitStatus invokes the status-changed callback, if one was supplied.
func (m *Manager) emitStatus() {
	if notify := m.onStatus; notify != nil {
		notify()
	}
}
// ---------- session ----------
// snapshot returns a copy of the session's status taken under the read lock.
func (s *session) snapshot() ServerStatus {
	s.mu.RLock()
	current := s.status
	s.mu.RUnlock()
	return current
}
// emitStatus forwards a "status changed" signal to the owner, if any.
func (s *session) emitStatus() {
	if notify := s.onStatus; notify != nil {
		notify()
	}
}
// send writes cmd to the live connection. Any trailing CR/LF characters are
// replaced by exactly one CRLF, and a 5s write deadline is set first. It
// fails when no connection is currently established.
func (s *session) send(cmd string) error {
	s.mu.RLock()
	c := s.conn
	s.mu.RUnlock()
	if c != nil {
		payload := strings.TrimRight(cmd, "\r\n") + "\r\n"
		_ = c.SetWriteDeadline(time.Now().Add(5 * time.Second))
		_, err := c.Write([]byte(payload))
		return err
	}
	return fmt.Errorf("session %q not connected", s.cfg.Name)
}
// stop shuts the session down and blocks until its run goroutine has
// exited (it waits on doneCh, which run closes on return). Calls after the
// first return immediately: the stopped flag is checked and set under s.mu.
//
// Must not be called from the session's own goroutines, since waiting on
// doneCh from inside run would never finish.
func (s *session) stop() {
// Critical: do NOT nil out s.stopCh — the supervisor goroutine reads
// `<-s.stopCh` in its select. Setting the field to nil would make
// `<-nil` block forever, leaving the supervisor stuck on its backoff
// timer and then re-dialing → a "deleted" cluster keeps spotting.
// We just close() the channel and let the goroutine see the broadcast.
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return
}
s.stopped = true
stop, done := s.stopCh, s.doneCh
conn := s.conn
s.mu.Unlock()
// Closing the conn unblocks a ReadString in runOnce immediately instead
// of leaving it to its read deadline.
if conn != nil {
_ = conn.Close()
}
// Defensive nil checks: StartServer always creates both channels.
if stop != nil {
close(stop)
}
if done != nil {
<-done
}
}
// run supervises one session until stop() closes stopCh. It calls runOnce
// repeatedly and publishes StateReconnecting, the last error and the retry
// count between attempts. Waits between attempts go 2s, 5s, 10s, 30s, then
// 60s for every later retry; the sequence starts over when the previous
// link stayed connected for more than 30s. doneCh is closed on exit.
func (s *session) run() {
	defer close(s.doneCh)
	steps := []time.Duration{2, 5, 10, 30, 60}
	failures := 0
	stopping := func() bool {
		select {
		case <-s.stopCh:
			return true
		default:
			return false
		}
	}
	for !stopping() {
		linkUp, err := s.runOnce()
		if stopping() {
			return
		}
		if !linkUp.IsZero() && time.Since(linkUp) > 30*time.Second {
			failures = 0
		}
		wait := steps[len(steps)-1] * time.Second
		if failures < len(steps) {
			wait = steps[failures] * time.Second
		}
		failures++

		s.mu.Lock()
		s.status.State = StateReconnecting
		if err != nil {
			s.status.Error = err.Error()
		}
		s.status.Retries = failures
		s.mu.Unlock()
		s.emitStatus()

		select {
		case <-s.stopCh:
			return
		case <-time.After(wait):
		}
	}
}
// runOnce performs one connection attempt against s.cfg. It dials the node,
// logs in (on an explicit prompt, or blindly after 1.5s), answers one
// password prompt if a password is configured, fires the init commands once
// the link counts as connected, and forwards every parsed spot to s.onSpot.
//
// It returns the moment the link was marked connected (zero if that never
// happened, e.g. the dial failed) and the error that ended the attempt (nil
// when the loop noticed stopCh was closed).
func (s *session) runOnce() (time.Time, error) {
	addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port)
	conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
	if err != nil {
		return time.Time{}, fmt.Errorf("dial %s: %w", addr, err)
	}
	s.mu.Lock()
	s.conn = conn
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		if s.conn == conn {
			s.conn = nil
		}
		s.mu.Unlock()
		_ = conn.Close()
	}()

	// linkDone is closed when this attempt returns, so the helper goroutines
	// below stop instead of writing into a dead connection.
	linkDone := make(chan struct{})
	defer close(linkDone)

	// writeLine sends one CRLF-terminated line. It refreshes the write
	// deadline the same way send() does. Otherwise a deadline left behind
	// by an earlier SendCommand would make these writes time out. Write
	// errors are ignored on purpose: a broken link surfaces as a read error
	// in the loop below.
	writeLine := func(text string) {
		_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
		_, _ = conn.Write([]byte(text + "\r\n"))
	}

	// The callsign can go out from the blind-login goroutine or from the
	// read loop on an explicit prompt. Check-and-send happens under one
	// mutex, so it runs at most once per connection and the flag is never
	// raced. Recording the blind send matters for two reasons. First, a
	// prompt line that only completes afterwards (prompts often lack a
	// trailing newline) must not trigger a second callsign. Second, the
	// password and "connected" checks need to know a login went out.
	var (
		loginMu   sync.Mutex
		loginSent bool
	)
	sendLoginOnce := func() bool {
		loginMu.Lock()
		defer loginMu.Unlock()
		if loginSent {
			return false
		}
		loginSent = true
		writeLine(s.login)
		return true
	}
	loginWasSent := func() bool {
		loginMu.Lock()
		defer loginMu.Unlock()
		return loginSent
	}

	// Login: send on the first prompt OR blindly after 1.5s. Many DXSpider
	// nodes accept the callsign without re-prompting.
	if s.login != "" {
		go func() {
			select {
			case <-s.stopCh:
			case <-linkDone:
			case <-time.After(1500 * time.Millisecond):
				sendLoginOnce()
			}
		}()
	}

	// Init commands: one per line, blank lines and "//" comments skipped.
	// The first is sent 1s after the link is marked connected, the rest
	// 500ms apart. The goroutine gives up when the session stops or this
	// connection ends.
	initFired := false
	fireInitCommands := func() {
		if initFired || strings.TrimSpace(s.cfg.InitCommands) == "" {
			return
		}
		initFired = true
		go func() {
			pause := 1 * time.Second
			for _, line := range strings.Split(s.cfg.InitCommands, "\n") {
				line = strings.TrimSpace(line)
				if line == "" || strings.HasPrefix(line, "//") {
					continue
				}
				select {
				case <-s.stopCh:
					return
				case <-linkDone:
					return
				case <-time.After(pause):
				}
				writeLine(line)
				pause = 500 * time.Millisecond
			}
		}()
	}

	var connectedAt time.Time
	passwordSent := false
	rd := bufio.NewReader(conn)
	for {
		select {
		case <-s.stopCh:
			return connectedAt, nil
		default:
		}
		_ = conn.SetReadDeadline(time.Now().Add(120 * time.Second))
		line, err := rd.ReadString('\n')
		if err != nil {
			return connectedAt, fmt.Errorf("read: %w", err)
		}
		line = strings.TrimRight(line, "\r\n")
		if line == "" {
			continue
		}
		// Login on an explicit prompt, unless the blind timer already sent it.
		if s.login != "" && isLoginPrompt(line) && sendLoginOnce() {
			continue
		}
		loggedIn := loginWasSent()
		// Password on prompt (rare). Answered once per connection, so a chat
		// or spot line that happens to contain "password:" cannot make us
		// send the password to the node again.
		if loggedIn && !passwordSent && s.cfg.Password != "" && isPasswordPrompt(line) {
			writeLine(s.cfg.Password)
			passwordSent = true
			continue
		}
		// Mark connected once the login has gone out OR a welcome banner shows up.
		if s.snapshot().State != StateConnected && (loggedIn || isWelcome(line)) {
			connectedAt = time.Now()
			s.mu.Lock()
			s.status.State = StateConnected
			s.status.ConnectedAt = connectedAt
			s.status.Error = ""
			s.mu.Unlock()
			s.emitStatus()
			fireInitCommands()
		}
		if spot, ok := parseSpot(line); ok {
			spot.SourceID = s.cfg.ID
			spot.SourceName = s.cfg.Name
			s.mu.Lock()
			s.spotsCnt++
			s.status.SpotsCount = s.spotsCnt
			s.mu.Unlock()
			if s.onSpot != nil {
				s.onSpot(spot)
			}
		}
	}
}
// ---------- parsing ----------
// spotRE matches a cluster spot line of the form
//
//	DX de SPOTTER: FREQ DXCALL COMMENT TIME [LOC]
//
// Capture groups:
//  1. spotter call (uppercase letters, digits, '/', '#', '-'); the colon after it is optional
//  2. frequency in kHz: digits with an optional fractional part
//  3. DX call (uppercase letters, digits, '/')
//  4. comment, matched lazily; it may be empty, but the call and the time
//     must then be separated by at least two whitespace characters
//  5. time as four digits with an optional trailing 'Z'
//  6. optional locator: two letters A-R, two digits, optionally two letters A-X
//
// The pattern is case-sensitive, so lines with lowercase callsigns do not
// match and are ignored like any other non-spot line.
var spotRE = regexp.MustCompile(
`^\s*DX\s+de\s+([A-Z0-9/#\-]+):?\s+(\d+\.?\d*)\s+([A-Z0-9/]+)\s+(.*?)\s+(\d{4}Z?)(?:\s+([A-R]{2}\d{2}(?:[A-X]{2})?))?\s*$`,
)
// parseSpot converts one cluster line into a Spot. It reports false when the
// line is not a spot (no spotRE match) or the frequency cannot be parsed.
// SourceID/SourceName, Country and Continent are left for the caller to fill.
func parseSpot(line string) (Spot, bool) {
	groups := spotRE.FindStringSubmatch(line)
	if len(groups) == 0 {
		return Spot{}, false
	}
	khz, parseErr := strconv.ParseFloat(groups[2], 64)
	if parseErr != nil {
		return Spot{}, false
	}
	hz := int64(khz*1000 + 0.5) // round to the nearest Hz

	var sp Spot
	sp.Spotter = strings.ToUpper(groups[1])
	sp.FreqKHz = khz
	sp.FreqHz = hz
	sp.Band = bandFromHz(hz)
	sp.DXCall = strings.ToUpper(groups[3])
	sp.Comment = strings.TrimSpace(groups[4])
	sp.TimeUTC = groups[5]
	sp.Locator = strings.ToUpper(groups[6])
	sp.ReceivedAt = time.Now()
	sp.Raw = line
	return sp, true
}
// isLoginPrompt reports whether a line looks like a node asking for the
// operator's callsign. Matching is case-insensitive.
func isLoginPrompt(s string) bool {
	low := strings.ToLower(s)
	for _, marker := range [...]string{"login:", "please enter your call", "your call:"} {
		if strings.Contains(low, marker) {
			return true
		}
	}
	// "callsign:" only counts at the very end of the line, so a banner like
	// "callsign: N0CALL" is not mistaken for a prompt.
	return strings.HasSuffix(strings.TrimSpace(low), "callsign:")
}
// isPasswordPrompt reports whether a line contains a password prompt
// ("password:" or "pwd:"), ignoring case.
func isPasswordPrompt(s string) bool {
	low := strings.ToLower(s)
	for _, marker := range [...]string{"password:", "pwd:"} {
		if strings.Contains(low, marker) {
			return true
		}
	}
	return false
}
// isWelcome reports whether a line looks like a post-login banner
// ("welcome", "logged in" or "started", case-insensitive).
func isWelcome(s string) bool {
	low := strings.ToLower(s)
	switch {
	case strings.Contains(low, "welcome"),
		strings.Contains(low, "logged in"),
		strings.Contains(low, "started"):
		return true
	}
	return false
}
// bandFromHz maps a frequency in Hz to its amateur band name ("160m" through
// "23cm"). Band edges are inclusive and expressed in MHz; frequencies outside
// every listed range yield "".
func bandFromHz(hz int64) string {
	bands := [...]struct {
		lo, hi float64 // inclusive edges in MHz
		name   string
	}{
		{1.8, 2.0, "160m"},
		{3.5, 4.0, "80m"},
		{5.3, 5.5, "60m"},
		{7.0, 7.3, "40m"},
		{10.1, 10.15, "30m"},
		{14.0, 14.35, "20m"},
		{18.068, 18.168, "17m"},
		{21.0, 21.45, "15m"},
		{24.89, 24.99, "12m"},
		{28.0, 29.7, "10m"},
		{50.0, 54.0, "6m"},
		{70.0, 70.5, "4m"},
		{144.0, 148.0, "2m"},
		{222.0, 225.0, "1.25m"},
		{420.0, 450.0, "70cm"},
		{902.0, 928.0, "33cm"},
		{1240.0, 1300.0, "23cm"},
	}
	mhz := float64(hz) / 1_000_000
	for _, b := range bands {
		if mhz >= b.lo && mhz <= b.hi {
			return b.name
		}
	}
	return ""
}