Files
2026-03-17 20:20:23 +01:00

333 lines
7.4 KiB
Go

package cluster
import (
"bufio"
"context"
"fmt"
"math"
"net"
"strings"
"sync"
"time"
"github.com/user/flexdxcluster2/internal/spot"
)
const (
MaxReconnectAttempts = 10
BaseReconnectDelay = 1 * time.Second
MaxReconnectDelay = 60 * time.Second
ConnectionTimeout = 10 * time.Second
ReadTimeout = 5 * time.Minute
)
// Config reprend ClusterConfig de l'ancien code
type Config struct {
Name string
Server string
Port string
Login string
Password string
Skimmer bool
FT8 bool
FT4 bool
Beacon bool
Command string
LoginPrompt string
Enabled bool
Master bool
Type string // dxspider, cc_cluster, ar_cluster — vide = auto
}
// Client gère la connexion TCP à un cluster DX
type Client struct {
cfg Config
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
mu sync.Mutex
loggedIn bool
clusterType string
ctx context.Context
cancel context.CancelFunc
reconnects int
// SpotChan reçoit les spots parsés — consommé par le SpotProcessor
SpotChan chan *spot.Spot
// ConsoleChan reçoit toutes les lignes brutes — pour l'onglet Console
ConsoleChan chan string
// CmdChan reçoit les commandes à envoyer au cluster depuis l'UI
CmdChan chan string
}
func New(cfg Config) *Client {
ctx, cancel := context.WithCancel(context.Background())
return &Client{
cfg: cfg,
clusterType: cfg.Type,
ctx: ctx,
cancel: cancel,
SpotChan: make(chan *spot.Spot, 200),
ConsoleChan: make(chan string, 200),
CmdChan: make(chan string, 100),
}
}
func (c *Client) Name() string { return c.cfg.Name }
func (c *Client) IsMaster() bool { return c.cfg.Master }
func (c *Client) ClusterType() string { return c.clusterType }
// Start démarre le client avec reconnexion automatique
func (c *Client) Start() {
// Goroutine commandes UI → cluster
go c.commandLoop()
for {
select {
case <-c.ctx.Done():
return
default:
}
if err := c.connect(); err != nil {
c.reconnects++
if c.reconnects >= MaxReconnectAttempts {
return
}
delay := c.backoff()
select {
case <-c.ctx.Done():
return
case <-time.After(delay):
continue
}
}
c.readLoop()
c.loggedIn = false
time.Sleep(2 * time.Second)
}
}
func (c *Client) Close() {
c.cancel()
if c.conn != nil {
c.write([]byte("bye\r\n"))
c.conn.Close()
}
}
// ReloadFilters renvoie les commandes de filtres au cluster
func (c *Client) ReloadFilters() {
if c.loggedIn {
c.setFilters()
}
}
// SendCommand envoie une commande arbitraire au cluster depuis l'UI
func (c *Client) SendCommand(cmd string) {
select {
case c.CmdChan <- cmd:
default:
}
}
func (c *Client) connect() error {
addr := c.cfg.Server + ":" + c.cfg.Port
conn, err := net.DialTimeout("tcp", addr, ConnectionTimeout)
if err != nil {
return fmt.Errorf("connect %s: %w", addr, err)
}
c.conn = conn
c.reader = bufio.NewReader(conn)
c.writer = bufio.NewWriter(conn)
c.loggedIn = false
c.reconnects = 0
return nil
}
func (c *Client) commandLoop() {
for {
select {
case <-c.ctx.Done():
return
case cmd := <-c.CmdChan:
c.write([]byte(cmd + "\r\n"))
}
}
}
func (c *Client) readLoop() {
defer c.conn.Close()
for {
select {
case <-c.ctx.Done():
return
default:
}
if !c.loggedIn {
c.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
msg, err := c.reader.ReadBytes(':')
if err != nil {
return
}
c.conn.SetReadDeadline(time.Time{})
line := string(msg)
c.detectType(line)
c.sendConsole(line)
if strings.Contains(line, c.cfg.LoginPrompt) || strings.Contains(line, "login:") {
time.Sleep(time.Second)
c.write([]byte(c.cfg.Login + "\n\r"))
c.loggedIn = true
go func() {
time.Sleep(3 * time.Second)
c.setFilters()
}()
}
continue
}
c.conn.SetReadDeadline(time.Now().Add(ReadTimeout))
msg, err := c.reader.ReadBytes('\n')
if err != nil {
return
}
c.conn.SetReadDeadline(time.Time{})
line := string(msg)
if strings.TrimSpace(line) == "" {
continue
}
if c.clusterType == "" {
c.detectType(line)
}
if strings.Contains(line, "password") {
c.write([]byte(c.cfg.Password + "\r\n"))
}
if strings.Contains(line, "Hello") || strings.Contains(line, "Welcome") {
if c.cfg.Command != "" {
c.write([]byte(c.cfg.Command + "\n\r"))
}
}
// Tenter de parser comme spot DX
isDX := strings.Contains(line, "DX de ") || spot.ShortSpotDetectRe.MatchString(line)
if isDX && !c.shouldSkip(line) {
if parsed := spot.ParseLine(line, c.cfg.Name); parsed != nil {
select {
case c.SpotChan <- parsed:
default:
}
}
}
// Console — marquer les messages "To ALL"
consoleMsg := line
if strings.HasPrefix(strings.TrimSpace(line), "To ALL de ") {
consoleMsg = "TO_ALL:" + strings.TrimSpace(line)
}
c.sendConsole(consoleMsg)
}
}
// shouldSkip applique le filtre applicatif selon la config du cluster
func (c *Client) shouldSkip(line string) bool {
upper := strings.ToUpper(line)
if strings.Contains(upper, "FT8") && !c.cfg.FT8 {
return true
}
if strings.Contains(upper, "FT4") && !c.cfg.FT4 {
return true
}
if (strings.Contains(upper, "CW SKIMMER") || strings.Contains(upper, "SKIMMER")) && !c.cfg.Skimmer {
return true
}
if strings.Contains(upper, "BEACON") && !c.cfg.Beacon {
return true
}
return false
}
func (c *Client) detectType(line string) {
if c.cfg.Type != "" {
c.clusterType = c.cfg.Type
return
}
lower := strings.ToLower(line)
switch {
case strings.Contains(lower, "dxspider"):
c.clusterType = "dxspider"
case strings.Contains(lower, "cc cluster") || strings.Contains(lower, "cc-cluster"):
c.clusterType = "cc_cluster"
case strings.Contains(lower, "ar-cluster") || strings.Contains(lower, "arcluster"):
c.clusterType = "ar_cluster"
}
}
func (c *Client) setFilters() {
switch c.clusterType {
case "dxspider":
c.setFiltersDXSpider()
case "ar_cluster":
c.setFiltersAR()
default:
c.setFiltersCC()
}
}
func (c *Client) setFiltersCC() {
if c.cfg.FT8 { c.write([]byte("set/ft8\r\n")) } else { c.write([]byte("set/noft8\r\n")) }
if c.cfg.FT4 { c.write([]byte("set/ft4\r\n")) } else { c.write([]byte("set/noft4\r\n")) }
if c.cfg.Skimmer { c.write([]byte("set/skimmer\r\n")) } else { c.write([]byte("set/noskimmer\r\n")) }
if c.cfg.Beacon { c.write([]byte("set/beacon\r\n")) } else { c.write([]byte("set/nobeacon\r\n")) }
}
func (c *Client) setFiltersDXSpider() {
if c.cfg.Skimmer && c.cfg.FT8 && c.cfg.FT4 {
c.write([]byte("SET/SKIMMER CW FT8 FT4\r\n"))
} else if c.cfg.Skimmer && c.cfg.FT8 {
c.write([]byte("SET/SKIMMER CW FT8\r\n"))
} else if c.cfg.Skimmer {
c.write([]byte("SET/SKIMMER CW\r\n"))
} else {
c.write([]byte("UNSET/SKIMMER\r\n"))
}
}
func (c *Client) setFiltersAR() {
if c.cfg.FT8 { c.write([]byte("set/ft8\r\n")) } else { c.write([]byte("set/noft8\r\n")) }
if c.cfg.FT4 { c.write([]byte("set/ft4\r\n")) } else { c.write([]byte("set/noft4\r\n")) }
}
func (c *Client) write(data []byte) {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil || c.writer == nil {
return
}
c.writer.Write(data)
c.writer.Flush()
}
func (c *Client) sendConsole(line string) {
select {
case c.ConsoleChan <- line:
default:
}
}
func (c *Client) backoff() time.Duration {
d := time.Duration(float64(BaseReconnectDelay) * math.Pow(2, float64(c.reconnects)))
if d > MaxReconnectDelay {
return MaxReconnectDelay
}
return d
}