506 lines
13 KiB
Go
506 lines
13 KiB
Go
// Package cluster provides a multi-server DX cluster client (telnet) —
|
|
// connects concurrently to several AR-Cluster / CC-Cluster / DXSpider
|
|
// nodes, logs in with the operator's callsign, optionally sends an init
|
|
// command list, and streams DX spots back to the UI via a callback.
|
|
//
|
|
// Spot parsing is tolerant of the dozens of slight format variations
|
|
// between cluster flavours (the prompt, the trailing locator, the time
|
|
// format). Anything that doesn't match the spot regex is treated as
|
|
// banner/chat noise and ignored, not surfaced as an error.
|
|
package cluster
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"net"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ServerConfig is the persisted shape of one cluster node. Mirrors the
|
|
// columns of the cluster_servers table; the frontend SettingsPanel
|
|
// pushes one of these per row.
|
|
type ServerConfig struct {
|
|
ID int64 `json:"id"`
|
|
Name string `json:"name"`
|
|
Host string `json:"host"`
|
|
Port int `json:"port"`
|
|
LoginOverride string `json:"login_override"`
|
|
Password string `json:"password,omitempty"`
|
|
InitCommands string `json:"init_commands"` // newline-separated, sent post-login
|
|
Enabled bool `json:"enabled"`
|
|
SortOrder int `json:"sort_order"`
|
|
}
|
|
|
|
// Spot is a single DX spot as parsed from the cluster stream.
|
|
type Spot struct {
|
|
SourceID int64 `json:"source_id"` // ID of the cluster server this came from
|
|
SourceName string `json:"source_name"` // display name (handy in the UI when multiple servers)
|
|
Spotter string `json:"spotter"` // DE field
|
|
DXCall string `json:"dx_call"` // the DX station heard
|
|
FreqKHz float64 `json:"freq_khz"`
|
|
FreqHz int64 `json:"freq_hz"`
|
|
Band string `json:"band,omitempty"`
|
|
Comment string `json:"comment,omitempty"`
|
|
Locator string `json:"locator,omitempty"` // spotter grid (optional)
|
|
TimeUTC string `json:"time_utc,omitempty"`
|
|
ReceivedAt time.Time `json:"received_at"`
|
|
Raw string `json:"raw"`
|
|
}
|
|
|
|
// State enumerates the per-server lifecycle.
|
|
type State string
|
|
|
|
const (
|
|
StateDisconnected State = "disconnected"
|
|
StateConnecting State = "connecting"
|
|
StateConnected State = "connected"
|
|
StateReconnecting State = "reconnecting"
|
|
StateError State = "error"
|
|
)
|
|
|
|
// ServerStatus is one row of the runtime status table — one entry per
|
|
// active session.
|
|
type ServerStatus struct {
|
|
ServerID int64 `json:"server_id"`
|
|
Name string `json:"name"`
|
|
Host string `json:"host"`
|
|
Port int `json:"port"`
|
|
State State `json:"state"`
|
|
Login string `json:"login,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
ConnectedAt time.Time `json:"connected_at,omitempty"`
|
|
SpotsCount int `json:"spots_count,omitempty"`
|
|
Retries int `json:"retries,omitempty"`
|
|
}
|
|
|
|
// session is one telnet connection bound to a single server config.
|
|
// Internal — callers use Manager. The onStatus callback is fire-and-
|
|
// forget: it tells the manager something changed; the frontend fetches
|
|
// the new aggregate via Status() rather than receiving per-server diffs.
|
|
type session struct {
|
|
cfg ServerConfig
|
|
login string
|
|
onSpot func(Spot)
|
|
onStatus func()
|
|
|
|
mu sync.RWMutex
|
|
status ServerStatus
|
|
conn net.Conn
|
|
stopCh chan struct{}
|
|
doneCh chan struct{}
|
|
spotsCnt int
|
|
}
|
|
|
|
// Manager owns N sessions, one per enabled server. Safe for concurrent
|
|
// use from any goroutine; I/O is on per-session background goroutines.
|
|
type Manager struct {
|
|
mu sync.RWMutex
|
|
sessions map[int64]*session
|
|
onSpot func(Spot)
|
|
onStatus func()
|
|
}
|
|
|
|
// NewManager builds an empty manager. emitSpot is called for each parsed
|
|
// spot (with the source server filled in). emitStatusChanged is called
|
|
// whenever ANY server's status changes — the frontend then re-fetches
|
|
// the aggregate Status() via a Wails binding.
|
|
func NewManager(emitSpot func(Spot), emitStatusChanged func()) *Manager {
|
|
return &Manager{
|
|
sessions: make(map[int64]*session),
|
|
onSpot: emitSpot,
|
|
onStatus: emitStatusChanged,
|
|
}
|
|
}
|
|
|
|
// Status returns a snapshot of every running session's status.
|
|
func (m *Manager) Status() []ServerStatus {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
out := make([]ServerStatus, 0, len(m.sessions))
|
|
for _, s := range m.sessions {
|
|
out = append(out, s.snapshot())
|
|
}
|
|
return out
|
|
}
|
|
|
|
// StartServer launches a session for cfg. login is the resolved callsign
|
|
// to send (empty = anonymous). If a session for the same ID is already
|
|
// running it is restarted with the new config.
|
|
func (m *Manager) StartServer(cfg ServerConfig, login string) {
|
|
m.StopServer(cfg.ID)
|
|
if !cfg.Enabled || cfg.Host == "" {
|
|
return
|
|
}
|
|
s := &session{
|
|
cfg: cfg,
|
|
login: login,
|
|
onSpot: m.onSpot,
|
|
onStatus: m.emitStatus,
|
|
stopCh: make(chan struct{}),
|
|
doneCh: make(chan struct{}),
|
|
status: ServerStatus{
|
|
ServerID: cfg.ID, Name: cfg.Name, Host: cfg.Host, Port: cfg.Port,
|
|
Login: login, State: StateConnecting,
|
|
},
|
|
}
|
|
m.mu.Lock()
|
|
m.sessions[cfg.ID] = s
|
|
m.mu.Unlock()
|
|
s.emitStatus()
|
|
go s.run()
|
|
}
|
|
|
|
// StopServer terminates the session for the given ID (if any) and waits
|
|
// for its goroutine to exit.
|
|
func (m *Manager) StopServer(id int64) {
|
|
m.mu.Lock()
|
|
s, ok := m.sessions[id]
|
|
if ok {
|
|
delete(m.sessions, id)
|
|
}
|
|
m.mu.Unlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
s.stop()
|
|
m.emitStatus()
|
|
}
|
|
|
|
// StopAll closes every running session.
|
|
func (m *Manager) StopAll() {
|
|
m.mu.Lock()
|
|
all := make([]*session, 0, len(m.sessions))
|
|
for _, s := range m.sessions {
|
|
all = append(all, s)
|
|
}
|
|
m.sessions = make(map[int64]*session)
|
|
m.mu.Unlock()
|
|
for _, s := range all {
|
|
s.stop()
|
|
}
|
|
m.emitStatus()
|
|
}
|
|
|
|
// SendCommand writes raw text (a CRLF is appended) to the session for
|
|
// the given server ID. Used for "show last 30", "set/skimmer", etc.
|
|
func (m *Manager) SendCommand(serverID int64, cmd string) error {
|
|
m.mu.RLock()
|
|
s, ok := m.sessions[serverID]
|
|
m.mu.RUnlock()
|
|
if !ok {
|
|
return fmt.Errorf("no active session for server %d", serverID)
|
|
}
|
|
return s.send(cmd)
|
|
}
|
|
|
|
func (m *Manager) emitStatus() {
|
|
if m.onStatus != nil {
|
|
m.onStatus()
|
|
}
|
|
}
|
|
|
|
// ---------- session ----------
|
|
|
|
func (s *session) snapshot() ServerStatus {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.status
|
|
}
|
|
|
|
func (s *session) emitStatus() {
|
|
if s.onStatus != nil {
|
|
s.onStatus()
|
|
}
|
|
}
|
|
|
|
func (s *session) send(cmd string) error {
|
|
s.mu.RLock()
|
|
conn := s.conn
|
|
s.mu.RUnlock()
|
|
if conn == nil {
|
|
return fmt.Errorf("session %q not connected", s.cfg.Name)
|
|
}
|
|
_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
|
_, err := conn.Write([]byte(strings.TrimRight(cmd, "\r\n") + "\r\n"))
|
|
return err
|
|
}
|
|
|
|
func (s *session) stop() {
|
|
s.mu.Lock()
|
|
stop, done := s.stopCh, s.doneCh
|
|
conn := s.conn
|
|
s.stopCh, s.doneCh, s.conn = nil, nil, nil
|
|
s.mu.Unlock()
|
|
if conn != nil {
|
|
_ = conn.Close()
|
|
}
|
|
if stop != nil {
|
|
close(stop)
|
|
}
|
|
if done != nil {
|
|
<-done
|
|
}
|
|
}
|
|
|
|
// run is the per-session supervisor: keeps trying to connect until
|
|
// Stop is called. Backoff caps at 60s, resets after a 30s healthy link.
|
|
func (s *session) run() {
|
|
defer close(s.doneCh)
|
|
backoff := []time.Duration{2, 5, 10, 30, 60}
|
|
attempt := 0
|
|
for {
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
default:
|
|
}
|
|
connectedAt, err := s.runOnce()
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
default:
|
|
}
|
|
if !connectedAt.IsZero() && time.Since(connectedAt) > 30*time.Second {
|
|
attempt = 0
|
|
}
|
|
idx := attempt
|
|
if idx >= len(backoff) {
|
|
idx = len(backoff) - 1
|
|
}
|
|
delay := backoff[idx] * time.Second
|
|
attempt++
|
|
|
|
s.mu.Lock()
|
|
s.status.State = StateReconnecting
|
|
if err != nil {
|
|
s.status.Error = err.Error()
|
|
}
|
|
s.status.Retries = attempt
|
|
s.mu.Unlock()
|
|
s.emitStatus()
|
|
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
case <-time.After(delay):
|
|
}
|
|
}
|
|
}
|
|
|
|
// runOnce dials, optionally logs in, sends init commands, parses spots.
|
|
// Returns the moment we marked the link "connected" (zero if dial failed)
|
|
// and the error that ended the session (nil if stopCh).
|
|
func (s *session) runOnce() (time.Time, error) {
|
|
addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port)
|
|
conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("dial %s: %w", addr, err)
|
|
}
|
|
s.mu.Lock()
|
|
s.conn = conn
|
|
s.mu.Unlock()
|
|
defer func() {
|
|
s.mu.Lock()
|
|
if s.conn == conn {
|
|
s.conn = nil
|
|
}
|
|
s.mu.Unlock()
|
|
_ = conn.Close()
|
|
}()
|
|
|
|
// Login: send on first prompt OR blindly after 1.5s. Many DXSpider
|
|
// nodes accept the callsign without re-prompting.
|
|
loginSent := false
|
|
if s.login != "" {
|
|
go func() {
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
case <-time.After(1500 * time.Millisecond):
|
|
if !loginSent {
|
|
_, _ = conn.Write([]byte(s.login + "\r\n"))
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Init commands: fire 1s after login goes through. Each command on
|
|
// its own line; blank lines and "//" comments are skipped.
|
|
initFired := false
|
|
fireInitCommands := func() {
|
|
if initFired || strings.TrimSpace(s.cfg.InitCommands) == "" {
|
|
return
|
|
}
|
|
initFired = true
|
|
go func() {
|
|
time.Sleep(1 * time.Second)
|
|
for _, line := range strings.Split(s.cfg.InitCommands, "\n") {
|
|
line = strings.TrimRight(line, "\r")
|
|
line = strings.TrimSpace(line)
|
|
if line == "" || strings.HasPrefix(line, "//") {
|
|
continue
|
|
}
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
default:
|
|
}
|
|
_, _ = conn.Write([]byte(line + "\r\n"))
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
}()
|
|
}
|
|
|
|
var connectedAt time.Time
|
|
rd := bufio.NewReader(conn)
|
|
for {
|
|
select {
|
|
case <-s.stopCh:
|
|
return connectedAt, nil
|
|
default:
|
|
}
|
|
|
|
_ = conn.SetReadDeadline(time.Now().Add(120 * time.Second))
|
|
line, err := rd.ReadString('\n')
|
|
if err != nil {
|
|
return connectedAt, fmt.Errorf("read: %w", err)
|
|
}
|
|
line = strings.TrimRight(line, "\r\n")
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
// Login on explicit prompt.
|
|
if !loginSent && s.login != "" && isLoginPrompt(line) {
|
|
_, _ = conn.Write([]byte(s.login + "\r\n"))
|
|
loginSent = true
|
|
continue
|
|
}
|
|
// Password on prompt (rare).
|
|
if loginSent && s.cfg.Password != "" && isPasswordPrompt(line) {
|
|
_, _ = conn.Write([]byte(s.cfg.Password + "\r\n"))
|
|
continue
|
|
}
|
|
|
|
// Mark connected once we've sent login OR seen a welcome banner.
|
|
if s.snapshot().State != StateConnected && (loginSent || isWelcome(line)) {
|
|
connectedAt = time.Now()
|
|
s.mu.Lock()
|
|
s.status.State = StateConnected
|
|
s.status.ConnectedAt = connectedAt
|
|
s.status.Error = ""
|
|
s.mu.Unlock()
|
|
s.emitStatus()
|
|
fireInitCommands()
|
|
}
|
|
|
|
if spot, ok := parseSpot(line); ok {
|
|
spot.SourceID = s.cfg.ID
|
|
spot.SourceName = s.cfg.Name
|
|
s.mu.Lock()
|
|
s.spotsCnt++
|
|
s.status.SpotsCount = s.spotsCnt
|
|
s.mu.Unlock()
|
|
if s.onSpot != nil {
|
|
s.onSpot(spot)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------- parsing ----------
|
|
|
|
// spotRE matches "DX de SPOTTER: FREQ DXCALL COMMENT TIME [LOC]".
|
|
var spotRE = regexp.MustCompile(
|
|
`^\s*DX\s+de\s+([A-Z0-9/#\-]+):?\s+(\d+\.?\d*)\s+([A-Z0-9/]+)\s+(.*?)\s+(\d{4}Z?)(?:\s+([A-R]{2}\d{2}(?:[A-X]{2})?))?\s*$`,
|
|
)
|
|
|
|
func parseSpot(line string) (Spot, bool) {
|
|
m := spotRE.FindStringSubmatch(line)
|
|
if m == nil {
|
|
return Spot{}, false
|
|
}
|
|
freqKHz, err := strconv.ParseFloat(m[2], 64)
|
|
if err != nil {
|
|
return Spot{}, false
|
|
}
|
|
freqHz := int64(freqKHz*1000 + 0.5)
|
|
return Spot{
|
|
Spotter: strings.ToUpper(m[1]),
|
|
FreqKHz: freqKHz,
|
|
FreqHz: freqHz,
|
|
Band: bandFromHz(freqHz),
|
|
DXCall: strings.ToUpper(m[3]),
|
|
Comment: strings.TrimSpace(m[4]),
|
|
TimeUTC: m[5],
|
|
Locator: strings.ToUpper(m[6]),
|
|
ReceivedAt: time.Now(),
|
|
Raw: line,
|
|
}, true
|
|
}
|
|
|
|
func isLoginPrompt(s string) bool {
|
|
low := strings.ToLower(s)
|
|
return strings.Contains(low, "login:") ||
|
|
strings.Contains(low, "please enter your call") ||
|
|
strings.Contains(low, "your call:") ||
|
|
strings.HasSuffix(strings.TrimSpace(low), "callsign:")
|
|
}
|
|
|
|
func isPasswordPrompt(s string) bool {
|
|
low := strings.ToLower(s)
|
|
return strings.Contains(low, "password:") || strings.Contains(low, "pwd:")
|
|
}
|
|
|
|
func isWelcome(s string) bool {
|
|
low := strings.ToLower(s)
|
|
return strings.Contains(low, "welcome") ||
|
|
strings.Contains(low, "logged in") ||
|
|
strings.Contains(low, "started")
|
|
}
|
|
|
|
func bandFromHz(hz int64) string {
|
|
mhz := float64(hz) / 1_000_000
|
|
switch {
|
|
case mhz >= 1.8 && mhz <= 2.0:
|
|
return "160m"
|
|
case mhz >= 3.5 && mhz <= 4.0:
|
|
return "80m"
|
|
case mhz >= 5.3 && mhz <= 5.5:
|
|
return "60m"
|
|
case mhz >= 7.0 && mhz <= 7.3:
|
|
return "40m"
|
|
case mhz >= 10.1 && mhz <= 10.15:
|
|
return "30m"
|
|
case mhz >= 14.0 && mhz <= 14.35:
|
|
return "20m"
|
|
case mhz >= 18.068 && mhz <= 18.168:
|
|
return "17m"
|
|
case mhz >= 21.0 && mhz <= 21.45:
|
|
return "15m"
|
|
case mhz >= 24.89 && mhz <= 24.99:
|
|
return "12m"
|
|
case mhz >= 28.0 && mhz <= 29.7:
|
|
return "10m"
|
|
case mhz >= 50.0 && mhz <= 54.0:
|
|
return "6m"
|
|
case mhz >= 70.0 && mhz <= 70.5:
|
|
return "4m"
|
|
case mhz >= 144.0 && mhz <= 148.0:
|
|
return "2m"
|
|
case mhz >= 222.0 && mhz <= 225.0:
|
|
return "1.25m"
|
|
case mhz >= 420.0 && mhz <= 450.0:
|
|
return "70cm"
|
|
case mhz >= 902.0 && mhz <= 928.0:
|
|
return "33cm"
|
|
case mhz >= 1240.0 && mhz <= 1300.0:
|
|
return "23cm"
|
|
}
|
|
return ""
|
|
}
|