first commit
This commit is contained in:
332
internal/cluster/client.go
Normal file
332
internal/cluster/client.go
Normal file
@@ -0,0 +1,332 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/user/flexdxcluster2/internal/spot"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxReconnectAttempts = 10
|
||||
BaseReconnectDelay = 1 * time.Second
|
||||
MaxReconnectDelay = 60 * time.Second
|
||||
ConnectionTimeout = 10 * time.Second
|
||||
ReadTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
// Config reprend ClusterConfig de l'ancien code
|
||||
type Config struct {
|
||||
Name string
|
||||
Server string
|
||||
Port string
|
||||
Login string
|
||||
Password string
|
||||
Skimmer bool
|
||||
FT8 bool
|
||||
FT4 bool
|
||||
Beacon bool
|
||||
Command string
|
||||
LoginPrompt string
|
||||
Enabled bool
|
||||
Master bool
|
||||
Type string // dxspider, cc_cluster, ar_cluster — vide = auto
|
||||
}
|
||||
|
||||
// Client gère la connexion TCP à un cluster DX
|
||||
type Client struct {
|
||||
cfg Config
|
||||
conn net.Conn
|
||||
reader *bufio.Reader
|
||||
writer *bufio.Writer
|
||||
mu sync.Mutex
|
||||
loggedIn bool
|
||||
clusterType string
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
reconnects int
|
||||
|
||||
// SpotChan reçoit les spots parsés — consommé par le SpotProcessor
|
||||
SpotChan chan *spot.Spot
|
||||
// ConsoleChan reçoit toutes les lignes brutes — pour l'onglet Console
|
||||
ConsoleChan chan string
|
||||
// CmdChan reçoit les commandes à envoyer au cluster depuis l'UI
|
||||
CmdChan chan string
|
||||
}
|
||||
|
||||
func New(cfg Config) *Client {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &Client{
|
||||
cfg: cfg,
|
||||
clusterType: cfg.Type,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
SpotChan: make(chan *spot.Spot, 200),
|
||||
ConsoleChan: make(chan string, 200),
|
||||
CmdChan: make(chan string, 100),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Name() string { return c.cfg.Name }
|
||||
func (c *Client) IsMaster() bool { return c.cfg.Master }
|
||||
func (c *Client) ClusterType() string { return c.clusterType }
|
||||
|
||||
// Start démarre le client avec reconnexion automatique
|
||||
func (c *Client) Start() {
|
||||
// Goroutine commandes UI → cluster
|
||||
go c.commandLoop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err := c.connect(); err != nil {
|
||||
c.reconnects++
|
||||
if c.reconnects >= MaxReconnectAttempts {
|
||||
return
|
||||
}
|
||||
delay := c.backoff()
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-time.After(delay):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
c.readLoop()
|
||||
c.loggedIn = false
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
c.cancel()
|
||||
if c.conn != nil {
|
||||
c.write([]byte("bye\r\n"))
|
||||
c.conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// ReloadFilters renvoie les commandes de filtres au cluster
|
||||
func (c *Client) ReloadFilters() {
|
||||
if c.loggedIn {
|
||||
c.setFilters()
|
||||
}
|
||||
}
|
||||
|
||||
// SendCommand envoie une commande arbitraire au cluster depuis l'UI
|
||||
func (c *Client) SendCommand(cmd string) {
|
||||
select {
|
||||
case c.CmdChan <- cmd:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) connect() error {
|
||||
addr := c.cfg.Server + ":" + c.cfg.Port
|
||||
conn, err := net.DialTimeout("tcp", addr, ConnectionTimeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect %s: %w", addr, err)
|
||||
}
|
||||
c.conn = conn
|
||||
c.reader = bufio.NewReader(conn)
|
||||
c.writer = bufio.NewWriter(conn)
|
||||
c.loggedIn = false
|
||||
c.reconnects = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) commandLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case cmd := <-c.CmdChan:
|
||||
c.write([]byte(cmd + "\r\n"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) readLoop() {
|
||||
defer c.conn.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if !c.loggedIn {
|
||||
c.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
|
||||
msg, err := c.reader.ReadBytes(':')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.conn.SetReadDeadline(time.Time{})
|
||||
|
||||
line := string(msg)
|
||||
c.detectType(line)
|
||||
c.sendConsole(line)
|
||||
|
||||
if strings.Contains(line, c.cfg.LoginPrompt) || strings.Contains(line, "login:") {
|
||||
time.Sleep(time.Second)
|
||||
c.write([]byte(c.cfg.Login + "\n\r"))
|
||||
c.loggedIn = true
|
||||
go func() {
|
||||
time.Sleep(3 * time.Second)
|
||||
c.setFilters()
|
||||
}()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.conn.SetReadDeadline(time.Now().Add(ReadTimeout))
|
||||
msg, err := c.reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.conn.SetReadDeadline(time.Time{})
|
||||
|
||||
line := string(msg)
|
||||
if strings.TrimSpace(line) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if c.clusterType == "" {
|
||||
c.detectType(line)
|
||||
}
|
||||
|
||||
if strings.Contains(line, "password") {
|
||||
c.write([]byte(c.cfg.Password + "\r\n"))
|
||||
}
|
||||
|
||||
if strings.Contains(line, "Hello") || strings.Contains(line, "Welcome") {
|
||||
if c.cfg.Command != "" {
|
||||
c.write([]byte(c.cfg.Command + "\n\r"))
|
||||
}
|
||||
}
|
||||
|
||||
// Tenter de parser comme spot DX
|
||||
isDX := strings.Contains(line, "DX de ") || spot.ShortSpotDetectRe.MatchString(line)
|
||||
if isDX && !c.shouldSkip(line) {
|
||||
if parsed := spot.ParseLine(line, c.cfg.Name); parsed != nil {
|
||||
select {
|
||||
case c.SpotChan <- parsed:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Console — marquer les messages "To ALL"
|
||||
consoleMsg := line
|
||||
if strings.HasPrefix(strings.TrimSpace(line), "To ALL de ") {
|
||||
consoleMsg = "TO_ALL:" + strings.TrimSpace(line)
|
||||
}
|
||||
c.sendConsole(consoleMsg)
|
||||
}
|
||||
}
|
||||
|
||||
// shouldSkip applique le filtre applicatif selon la config du cluster
|
||||
func (c *Client) shouldSkip(line string) bool {
|
||||
upper := strings.ToUpper(line)
|
||||
if strings.Contains(upper, "FT8") && !c.cfg.FT8 {
|
||||
return true
|
||||
}
|
||||
if strings.Contains(upper, "FT4") && !c.cfg.FT4 {
|
||||
return true
|
||||
}
|
||||
if (strings.Contains(upper, "CW SKIMMER") || strings.Contains(upper, "SKIMMER")) && !c.cfg.Skimmer {
|
||||
return true
|
||||
}
|
||||
if strings.Contains(upper, "BEACON") && !c.cfg.Beacon {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *Client) detectType(line string) {
|
||||
if c.cfg.Type != "" {
|
||||
c.clusterType = c.cfg.Type
|
||||
return
|
||||
}
|
||||
lower := strings.ToLower(line)
|
||||
switch {
|
||||
case strings.Contains(lower, "dxspider"):
|
||||
c.clusterType = "dxspider"
|
||||
case strings.Contains(lower, "cc cluster") || strings.Contains(lower, "cc-cluster"):
|
||||
c.clusterType = "cc_cluster"
|
||||
case strings.Contains(lower, "ar-cluster") || strings.Contains(lower, "arcluster"):
|
||||
c.clusterType = "ar_cluster"
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) setFilters() {
|
||||
switch c.clusterType {
|
||||
case "dxspider":
|
||||
c.setFiltersDXSpider()
|
||||
case "ar_cluster":
|
||||
c.setFiltersAR()
|
||||
default:
|
||||
c.setFiltersCC()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) setFiltersCC() {
|
||||
if c.cfg.FT8 { c.write([]byte("set/ft8\r\n")) } else { c.write([]byte("set/noft8\r\n")) }
|
||||
if c.cfg.FT4 { c.write([]byte("set/ft4\r\n")) } else { c.write([]byte("set/noft4\r\n")) }
|
||||
if c.cfg.Skimmer { c.write([]byte("set/skimmer\r\n")) } else { c.write([]byte("set/noskimmer\r\n")) }
|
||||
if c.cfg.Beacon { c.write([]byte("set/beacon\r\n")) } else { c.write([]byte("set/nobeacon\r\n")) }
|
||||
}
|
||||
|
||||
func (c *Client) setFiltersDXSpider() {
|
||||
if c.cfg.Skimmer && c.cfg.FT8 && c.cfg.FT4 {
|
||||
c.write([]byte("SET/SKIMMER CW FT8 FT4\r\n"))
|
||||
} else if c.cfg.Skimmer && c.cfg.FT8 {
|
||||
c.write([]byte("SET/SKIMMER CW FT8\r\n"))
|
||||
} else if c.cfg.Skimmer {
|
||||
c.write([]byte("SET/SKIMMER CW\r\n"))
|
||||
} else {
|
||||
c.write([]byte("UNSET/SKIMMER\r\n"))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) setFiltersAR() {
|
||||
if c.cfg.FT8 { c.write([]byte("set/ft8\r\n")) } else { c.write([]byte("set/noft8\r\n")) }
|
||||
if c.cfg.FT4 { c.write([]byte("set/ft4\r\n")) } else { c.write([]byte("set/noft4\r\n")) }
|
||||
}
|
||||
|
||||
func (c *Client) write(data []byte) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.conn == nil || c.writer == nil {
|
||||
return
|
||||
}
|
||||
c.writer.Write(data)
|
||||
c.writer.Flush()
|
||||
}
|
||||
|
||||
func (c *Client) sendConsole(line string) {
|
||||
select {
|
||||
case c.ConsoleChan <- line:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) backoff() time.Duration {
|
||||
d := time.Duration(float64(BaseReconnectDelay) * math.Pow(2, float64(c.reconnects)))
|
||||
if d > MaxReconnectDelay {
|
||||
return MaxReconnectDelay
|
||||
}
|
||||
return d
|
||||
}
|
||||
Reference in New Issue
Block a user