package udp import ( "context" "fmt" "net" "strings" "sync" "syscall" "time" "golang.org/x/net/ipv4" "hamlog/internal/applog" ) // reusingListenConfig builds a net.ListenConfig that sets SO_REUSEADDR // (and SO_REUSEPORT on Unix) on the underlying socket before bind. This // is the only way for two processes to share a UDP port on Windows — Go // doesn't expose the option directly, but ListenConfig.Control hooks the // raw socket and lets us call setsockopt. func reusingListenConfig() net.ListenConfig { return net.ListenConfig{ Control: func(network, address string, c syscall.RawConn) error { var opErr error err := c.Control(func(fd uintptr) { opErr = setSocketReuse(fd) }) if err != nil { return err } return opErr }, } } // Event is what a Server emits to its consumer for every parsed packet. // At most one of the fields is populated per event. type Event struct { ConfigID int64 Service ServiceType Source string // remote addr that sent the packet, for diagnostics DXCall string // ServiceWSJT (Status) or ServiceRemoteCall DXGrid string // ServiceWSJT (Status) Mode string // ServiceWSJT (Status) FreqHz int64 // ServiceWSJT (Status) LoggedADIF string // ServiceWSJT (LoggedADIF), ServiceADIF or ServiceN1MM } // Server is a single inbound UDP listener. type Server struct { cfg Config conn *net.UDPConn out chan<- Event stop chan struct{} done chan struct{} stopped bool mu sync.Mutex } func newServer(cfg Config, out chan<- Event) *Server { return &Server{ cfg: cfg, out: out, stop: make(chan struct{}), done: make(chan struct{}), } } func (s *Server) start() error { var conn *net.UDPConn if s.cfg.Multicast { group := strings.TrimSpace(s.cfg.MulticastGroup) if group == "" { return fmt.Errorf("multicast enabled but group address is empty") } groupIP := net.ParseIP(group) if groupIP == nil { return fmt.Errorf("bad multicast group %q", group) } gaddr := &net.UDPAddr{IP: groupIP, Port: s.cfg.Port} // Bind to INADDR_ANY:port so the kernel will forward packets // addressed to the multicast group from any interface. Then // JoinGroup() on every up & multicast-capable interface — Windows // won't route multicast through interfaces we haven't explicitly // joined, and the "default" interface picked by // net.ListenMulticastUDP isn't always the one MSHV/WSJT sends on. // ListenConfig with SO_REUSEADDR lets us share the port with // Log4OM / other listeners already bound to 2237. lc := reusingListenConfig() pc, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", s.cfg.Port)) if err != nil { return fmt.Errorf("listen :%d for multicast: %w", s.cfg.Port, err) } c, ok := pc.(*net.UDPConn) if !ok { _ = pc.Close() return fmt.Errorf("internal: ListenPacket returned %T not *net.UDPConn", pc) } p := ipv4.NewPacketConn(c) ifaces, _ := net.Interfaces() joined := 0 for _, ifi := range ifaces { if ifi.Flags&net.FlagUp == 0 || ifi.Flags&net.FlagMulticast == 0 { continue } if err := p.JoinGroup(&ifi, gaddr); err != nil { applog.Printf("udp: [%s] join %s on %s: %v\n", s.cfg.Name, gaddr.IP, ifi.Name, err) continue } joined++ } if joined == 0 { _ = c.Close() return fmt.Errorf("couldn't join multicast %s on any interface", gaddr.IP) } conn = c applog.Printf("udp: [%s] listening on multicast %s on %d interface(s) (service=%s)\n", s.cfg.Name, gaddr, joined, s.cfg.ServiceType) } else { lc := reusingListenConfig() pc, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", s.cfg.Port)) if err != nil { return fmt.Errorf("listen udp :%d: %w", s.cfg.Port, err) } c, ok := pc.(*net.UDPConn) if !ok { _ = pc.Close() return fmt.Errorf("internal: ListenPacket returned %T not *net.UDPConn", pc) } conn = c applog.Printf("udp: [%s] listening on unicast :%d (service=%s)\n", s.cfg.Name, s.cfg.Port, s.cfg.ServiceType) } s.conn = conn go s.run() return nil } func (s *Server) run() { defer close(s.done) buf := make([]byte, 64*1024) for { select { case <-s.stop: return default: } _ = s.conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) n, remote, err := s.conn.ReadFromUDP(buf) if err != nil { if ne, ok := err.(net.Error); ok && ne.Timeout() { continue } // Closed by stop(): exit silently. return } if n == 0 { continue } pkt := make([]byte, n) copy(pkt, buf[:n]) go s.handle(pkt, remote) } } func (s *Server) handle(pkt []byte, remote *net.UDPAddr) { applog.Printf("udp: [%s] rx %d bytes from %s\n", s.cfg.Name, len(pkt), remote) ev := Event{ConfigID: s.cfg.ID, Service: s.cfg.ServiceType, Source: remote.String()} switch s.cfg.ServiceType { case ServiceWSJT: w, ok, err := ParseWSJT(pkt) if err != nil { applog.Printf("udp: [%s] WSJT parse error: %v\n", s.cfg.Name, err) return } if !ok { applog.Printf("udp: [%s] WSJT msg type ignored\n", s.cfg.Name) return } applog.Printf("udp: [%s] WSJT decoded: prog=%q dx_call=%q grid=%q mode=%q freq=%d adif_len=%d\n", s.cfg.Name, w.ProgramID, w.DXCall, w.DXGrid, w.Mode, w.FreqHz, len(w.LoggedADIF)) ev.DXCall = w.DXCall ev.DXGrid = w.DXGrid ev.Mode = w.Mode ev.FreqHz = w.FreqHz ev.LoggedADIF = w.LoggedADIF case ServiceADIF: // JTAlert / GridTracker forward a text ADIF record after a QSO is // logged. Guard against keep-alive / non-ADIF chatter on the socket: // only forward payloads that actually carry a callsign field and a // record terminator. text := string(pkt) low := strings.ToLower(text) if !strings.Contains(low, "/)\n", s.cfg.Name) return } ev.LoggedADIF = text case ServiceRemoteCall: // Common payload shapes seen in the wild: // "F4XYZ" (bare callsign) // "CALL F4XYZ" (text prefix) // "F4XYZ" (DXHunter-style tags) // "F4XYZ" (proper XML) // Strip every angle-bracket tag, normalise whitespace, take the // last non-empty token. Upper-case for downstream consistency. text := string(pkt) // Drop every <...> tag (open or close) — works for both // ... and .... for { start := strings.IndexByte(text, '<') if start < 0 { break } end := strings.IndexByte(text[start:], '>') if end < 0 { break } text = text[:start] + " " + text[start+end+1:] } text = strings.TrimSpace(text) parts := strings.Fields(text) if len(parts) == 0 { return } ev.DXCall = strings.ToUpper(parts[len(parts)-1]) case ServiceN1MM: adifText, ok, err := ParseN1MM(pkt) if err != nil { applog.Printf("udp: [%s] N1MM parse error: %v\n", s.cfg.Name, err) return } if !ok { applog.Printf("udp: [%s] N1MM datagram ignored (not a loggable contact)\n", s.cfg.Name) return } applog.Printf("udp: [%s] N1MM contact decoded (%d bytes ADIF)\n", s.cfg.Name, len(adifText)) ev.LoggedADIF = adifText default: return } // Empty events are useless; skip. if ev.DXCall == "" && ev.LoggedADIF == "" { return } select { case s.out <- ev: default: // Drop on backpressure rather than block the read loop. } } func (s *Server) close() { s.mu.Lock() if s.stopped { s.mu.Unlock() return } s.stopped = true stop, done, conn := s.stop, s.done, s.conn s.mu.Unlock() if conn != nil { _ = conn.Close() } if stop != nil { close(stop) } if done != nil { <-done } } // ── Outbound emitter ────────────────────────────────────────────────── // SendUDP sends payload to dst (host:port). Unicast or directed broadcast. // Returns the error from the write; the connection is closed before return. func SendUDP(dst string, payload []byte) error { conn, err := net.Dial("udp4", dst) if err != nil { return fmt.Errorf("dial %s: %w", dst, err) } defer conn.Close() _ = conn.SetWriteDeadline(time.Now().Add(2 * time.Second)) _, err = conn.Write(payload) return err } // ── Manager ─────────────────────────────────────────────────────────── // Manager owns every inbound Server and exposes a helper to emit on // outbound connections at QSO-save time. It reloads from the Repo on // demand (after a CRUD change in the Settings panel). type Manager struct { repo *Repo out chan Event mu sync.Mutex inbound map[int64]*Server outbound []Config } func NewManager(repo *Repo) *Manager { return &Manager{ repo: repo, out: make(chan Event, 64), inbound: map[int64]*Server{}, } } // Events returns the channel inbound parsed events are delivered on. // The app exposes these as Wails events. func (m *Manager) Events() <-chan Event { return m.out } // Reload restarts every server based on the current Repo contents. // Existing servers are stopped, the snapshot is rebuilt from scratch. // Errors on individual rows are logged via the returned slice; the // caller can surface them in the UI. func (m *Manager) Reload(ctx context.Context) []string { applog.Printf("udp: Reload() called") m.mu.Lock() old := m.inbound m.inbound = map[int64]*Server{} m.outbound = nil m.mu.Unlock() for _, s := range old { s.close() } cfgs, err := m.repo.List(ctx) if err != nil { applog.Printf("udp: Reload list failed: %v", err) return []string{fmt.Sprintf("load udp configs: %v", err)} } applog.Printf("udp: Reload found %d config(s) in DB", len(cfgs)) var errs []string for _, c := range cfgs { applog.Printf("udp: cfg id=%d name=%q dir=%s service=%s port=%d mcast=%v group=%q enabled=%v", c.ID, c.Name, c.Direction, c.ServiceType, c.Port, c.Multicast, c.MulticastGroup, c.Enabled) if !c.Enabled { continue } if c.Direction == Outbound { m.mu.Lock() m.outbound = append(m.outbound, c) m.mu.Unlock() continue } srv := newServer(c, m.out) if err := srv.start(); err != nil { applog.Printf("udp: start %q failed: %v", c.Name, err) errs = append(errs, fmt.Sprintf("%s: %v", c.Name, err)) continue } m.mu.Lock() m.inbound[c.ID] = srv m.mu.Unlock() } applog.Printf("udp: Reload done — %d server(s) running, %d error(s)", len(m.inbound), len(errs)) return errs } // Outbound returns the active outbound configs matching a service type. // Used by the QSO save path to push notifications to listeners. func (m *Manager) Outbound(service ServiceType) []Config { m.mu.Lock() defer m.mu.Unlock() var out []Config for _, c := range m.outbound { if c.ServiceType == service { out = append(out, c) } } return out } // StopAll closes every running server. Called at app shutdown. func (m *Manager) StopAll() { m.mu.Lock() old := m.inbound m.inbound = map[int64]*Server{} m.outbound = nil m.mu.Unlock() for _, s := range old { s.close() } }