This commit is contained in:
2026-05-28 21:32:46 +02:00
parent e8cac569e3
commit e82e30dd02
29 changed files with 2485 additions and 97 deletions
+128
View File
@@ -0,0 +1,128 @@
// Package udp manages user-defined UDP integrations: inbound listeners
// (WSJT-X, JTDX, MSHV log events; JTAlert ADIF; N1MM XML; DXHunter call)
// and outbound emitters (db_updated → notifies Cloudlog/N1MM when HamLog
// just logged a QSO).
//
// One Server per connection row, started/stopped by the Manager when the
// user enables/disables or edits the row. Multicast support lets multiple
// apps share the same port without bind conflicts — essential since
// WSJT-X uses 2237 and several tools already listen there.
package udp
import (
"context"
"database/sql"
"fmt"
)
// Direction is "inbound" (we listen) or "outbound" (we emit).
type Direction string
const (
Inbound Direction = "inbound"
Outbound Direction = "outbound"
)
// ServiceType selects the parser/emitter for a connection.
type ServiceType string
const (
ServiceWSJT ServiceType = "wsjt" // WSJT-X / JTDX / MSHV binary
ServiceADIF ServiceType = "adif" // text ADIF over UDP
ServiceN1MM ServiceType = "n1mm" // N1MM Logger+ XML
ServiceRemoteCall ServiceType = "remote_call" // plain text callsign
ServiceDBUpdated ServiceType = "db_updated" // outbound ADIF of local QSO
)
// Config is one user-defined UDP connection.
type Config struct {
ID int64 `json:"id"`
Direction Direction `json:"direction"`
Name string `json:"name"`
Port int `json:"port"`
ServiceType ServiceType `json:"service_type"`
Multicast bool `json:"multicast"`
MulticastGroup string `json:"multicast_group"`
DestinationIP string `json:"destination_ip"` // outbound only
Enabled bool `json:"enabled"`
SortOrder int `json:"sort_order"`
}
// Repo is the persistence layer for UDP integration rows.
type Repo struct{ db *sql.DB }
func NewRepo(db *sql.DB) *Repo { return &Repo{db: db} }
func (r *Repo) List(ctx context.Context) ([]Config, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, direction, name, port, service_type,
multicast, multicast_group, destination_ip,
enabled, sort_order
FROM integrations_udp
ORDER BY direction, sort_order, id`)
if err != nil {
return nil, fmt.Errorf("list udp: %w", err)
}
defer rows.Close()
var out []Config
for rows.Next() {
var c Config
var mc, en int
if err := rows.Scan(&c.ID, &c.Direction, &c.Name, &c.Port, &c.ServiceType,
&mc, &c.MulticastGroup, &c.DestinationIP, &en, &c.SortOrder); err != nil {
return nil, err
}
c.Multicast = mc != 0
c.Enabled = en != 0
out = append(out, c)
}
return out, rows.Err()
}
func (r *Repo) Save(ctx context.Context, c *Config) error {
if c.Direction != Inbound && c.Direction != Outbound {
return fmt.Errorf("invalid direction %q", c.Direction)
}
if c.Name == "" {
return fmt.Errorf("name required")
}
mc, en := 0, 0
if c.Multicast {
mc = 1
}
if c.Enabled {
en = 1
}
if c.ID == 0 {
res, err := r.db.ExecContext(ctx, `
INSERT INTO integrations_udp(direction, name, port, service_type,
multicast, multicast_group, destination_ip, enabled, sort_order)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`,
c.Direction, c.Name, c.Port, c.ServiceType,
mc, c.MulticastGroup, c.DestinationIP, en, c.SortOrder)
if err != nil {
return fmt.Errorf("insert udp: %w", err)
}
id, _ := res.LastInsertId()
c.ID = id
return nil
}
_, err := r.db.ExecContext(ctx, `
UPDATE integrations_udp SET
direction = ?, name = ?, port = ?, service_type = ?,
multicast = ?, multicast_group = ?, destination_ip = ?,
enabled = ?, sort_order = ?,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id = ?`,
c.Direction, c.Name, c.Port, c.ServiceType,
mc, c.MulticastGroup, c.DestinationIP, en, c.SortOrder, c.ID)
if err != nil {
return fmt.Errorf("update udp: %w", err)
}
return nil
}
func (r *Repo) Delete(ctx context.Context, id int64) error {
_, err := r.db.ExecContext(ctx, `DELETE FROM integrations_udp WHERE id = ?`, id)
return err
}
+18
View File
@@ -0,0 +1,18 @@
//go:build !windows
package udp
import "golang.org/x/sys/unix"
// setSocketReuse enables SO_REUSEADDR + SO_REUSEPORT on Linux/macOS so
// multiple processes can share a multicast UDP port (matches the Windows
// behaviour with SO_REUSEADDR).
func setSocketReuse(fd uintptr) error {
if err := unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
return err
}
// SO_REUSEPORT isn't defined on every Unix; the syscall returning
// ENOPROTOOPT is fine to ignore.
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
return nil
}
@@ -0,0 +1,14 @@
//go:build windows
package udp
import "golang.org/x/sys/windows"
// setSocketReuse enables SO_REUSEADDR on the socket before bind so that
// multiple processes (HamLog + Log4OM + …) can listen on the same UDP
// multicast port. Without it, Windows fails the second bind with
// WSAEADDRINUSE ("Une seule utilisation de chaque adresse de socket…").
func setSocketReuse(fd uintptr) error {
return windows.SetsockoptInt(windows.Handle(fd),
windows.SOL_SOCKET, windows.SO_REUSEADDR, 1)
}
+369
View File
@@ -0,0 +1,369 @@
package udp
import (
"context"
"fmt"
"net"
"strings"
"sync"
"syscall"
"time"
"golang.org/x/net/ipv4"
"hamlog/internal/applog"
)
// reusingListenConfig builds a net.ListenConfig that sets SO_REUSEADDR
// (and SO_REUSEPORT on Unix) on the underlying socket before bind. This
// is the only way for two processes to share a UDP port on Windows — Go
// doesn't expose the option directly, but ListenConfig.Control hooks the
// raw socket and lets us call setsockopt.
func reusingListenConfig() net.ListenConfig {
return net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var opErr error
err := c.Control(func(fd uintptr) {
opErr = setSocketReuse(fd)
})
if err != nil {
return err
}
return opErr
},
}
}
// Event is what a Server emits to its consumer for every parsed packet.
// At most one of the fields is populated per event.
type Event struct {
ConfigID int64
Service ServiceType
Source string // remote addr that sent the packet, for diagnostics
DXCall string // ServiceWSJT (Status) or ServiceRemoteCall
DXGrid string // ServiceWSJT (Status)
Mode string // ServiceWSJT (Status)
FreqHz int64 // ServiceWSJT (Status)
LoggedADIF string // ServiceWSJT (LoggedADIF) or ServiceADIF
RawText string // generic fallback (n1mm xml, etc.)
}
// Server is a single inbound UDP listener.
type Server struct {
cfg Config
conn *net.UDPConn
out chan<- Event
stop chan struct{}
done chan struct{}
stopped bool
mu sync.Mutex
}
func newServer(cfg Config, out chan<- Event) *Server {
return &Server{
cfg: cfg,
out: out,
stop: make(chan struct{}),
done: make(chan struct{}),
}
}
func (s *Server) start() error {
var conn *net.UDPConn
if s.cfg.Multicast {
group := strings.TrimSpace(s.cfg.MulticastGroup)
if group == "" {
return fmt.Errorf("multicast enabled but group address is empty")
}
groupIP := net.ParseIP(group)
if groupIP == nil {
return fmt.Errorf("bad multicast group %q", group)
}
gaddr := &net.UDPAddr{IP: groupIP, Port: s.cfg.Port}
// Bind to INADDR_ANY:port so the kernel will forward packets
// addressed to the multicast group from any interface. Then
// JoinGroup() on every up & multicast-capable interface — Windows
// won't route multicast through interfaces we haven't explicitly
// joined, and the "default" interface picked by
// net.ListenMulticastUDP isn't always the one MSHV/WSJT sends on.
// ListenConfig with SO_REUSEADDR lets us share the port with
// Log4OM / other listeners already bound to 2237.
lc := reusingListenConfig()
pc, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", s.cfg.Port))
if err != nil {
return fmt.Errorf("listen :%d for multicast: %w", s.cfg.Port, err)
}
c, ok := pc.(*net.UDPConn)
if !ok {
_ = pc.Close()
return fmt.Errorf("internal: ListenPacket returned %T not *net.UDPConn", pc)
}
p := ipv4.NewPacketConn(c)
ifaces, _ := net.Interfaces()
joined := 0
for _, ifi := range ifaces {
if ifi.Flags&net.FlagUp == 0 || ifi.Flags&net.FlagMulticast == 0 {
continue
}
if err := p.JoinGroup(&ifi, gaddr); err != nil {
applog.Printf("udp: [%s] join %s on %s: %v\n", s.cfg.Name, gaddr.IP, ifi.Name, err)
continue
}
joined++
}
if joined == 0 {
_ = c.Close()
return fmt.Errorf("couldn't join multicast %s on any interface", gaddr.IP)
}
conn = c
applog.Printf("udp: [%s] listening on multicast %s on %d interface(s) (service=%s)\n",
s.cfg.Name, gaddr, joined, s.cfg.ServiceType)
} else {
lc := reusingListenConfig()
pc, err := lc.ListenPacket(context.Background(), "udp4", fmt.Sprintf("0.0.0.0:%d", s.cfg.Port))
if err != nil {
return fmt.Errorf("listen udp :%d: %w", s.cfg.Port, err)
}
c, ok := pc.(*net.UDPConn)
if !ok {
_ = pc.Close()
return fmt.Errorf("internal: ListenPacket returned %T not *net.UDPConn", pc)
}
conn = c
applog.Printf("udp: [%s] listening on unicast :%d (service=%s)\n", s.cfg.Name, s.cfg.Port, s.cfg.ServiceType)
}
s.conn = conn
go s.run()
return nil
}
func (s *Server) run() {
defer close(s.done)
buf := make([]byte, 64*1024)
for {
select {
case <-s.stop:
return
default:
}
_ = s.conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
n, remote, err := s.conn.ReadFromUDP(buf)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
}
// Closed by stop(): exit silently.
return
}
if n == 0 {
continue
}
pkt := make([]byte, n)
copy(pkt, buf[:n])
go s.handle(pkt, remote)
}
}
func (s *Server) handle(pkt []byte, remote *net.UDPAddr) {
applog.Printf("udp: [%s] rx %d bytes from %s\n", s.cfg.Name, len(pkt), remote)
ev := Event{ConfigID: s.cfg.ID, Service: s.cfg.ServiceType, Source: remote.String()}
switch s.cfg.ServiceType {
case ServiceWSJT:
w, ok, err := ParseWSJT(pkt)
if err != nil {
applog.Printf("udp: [%s] WSJT parse error: %v\n", s.cfg.Name, err)
return
}
if !ok {
applog.Printf("udp: [%s] WSJT msg type ignored\n", s.cfg.Name)
return
}
applog.Printf("udp: [%s] WSJT decoded: prog=%q dx_call=%q grid=%q mode=%q freq=%d adif_len=%d\n",
s.cfg.Name, w.ProgramID, w.DXCall, w.DXGrid, w.Mode, w.FreqHz, len(w.LoggedADIF))
ev.DXCall = w.DXCall
ev.DXGrid = w.DXGrid
ev.Mode = w.Mode
ev.FreqHz = w.FreqHz
ev.LoggedADIF = w.LoggedADIF
case ServiceADIF:
ev.LoggedADIF = string(pkt)
case ServiceRemoteCall:
// Common payload shapes seen in the wild:
// "F4XYZ" (bare callsign)
// "CALL F4XYZ" (text prefix)
// "<CALLSIGN>F4XYZ<CALLSIGN>" (DXHunter-style tags)
// "<CALLSIGN>F4XYZ</CALLSIGN>" (proper XML)
// Strip every angle-bracket tag, normalise whitespace, take the
// last non-empty token. Upper-case for downstream consistency.
text := string(pkt)
// Drop every <...> tag (open or close) — works for both
// <CALLSIGN>...<CALLSIGN> and <CALLSIGN>...</CALLSIGN>.
for {
start := strings.IndexByte(text, '<')
if start < 0 {
break
}
end := strings.IndexByte(text[start:], '>')
if end < 0 {
break
}
text = text[:start] + " " + text[start+end+1:]
}
text = strings.TrimSpace(text)
parts := strings.Fields(text)
if len(parts) == 0 {
return
}
ev.DXCall = strings.ToUpper(parts[len(parts)-1])
case ServiceN1MM:
ev.RawText = string(pkt)
default:
return
}
// Empty events are useless; skip.
if ev.DXCall == "" && ev.LoggedADIF == "" && ev.RawText == "" {
return
}
select {
case s.out <- ev:
default:
// Drop on backpressure rather than block the read loop.
}
}
func (s *Server) close() {
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return
}
s.stopped = true
stop, done, conn := s.stop, s.done, s.conn
s.mu.Unlock()
if conn != nil {
_ = conn.Close()
}
if stop != nil {
close(stop)
}
if done != nil {
<-done
}
}
// ── Outbound emitter ──────────────────────────────────────────────────
// SendUDP sends payload to dst (host:port). Unicast or directed broadcast.
// Returns the error from the write; the connection is closed before return.
func SendUDP(dst string, payload []byte) error {
conn, err := net.Dial("udp4", dst)
if err != nil {
return fmt.Errorf("dial %s: %w", dst, err)
}
defer conn.Close()
_ = conn.SetWriteDeadline(time.Now().Add(2 * time.Second))
_, err = conn.Write(payload)
return err
}
// ── Manager ───────────────────────────────────────────────────────────
// Manager owns every inbound Server and exposes a helper to emit on
// outbound connections at QSO-save time. It reloads from the Repo on
// demand (after a CRUD change in the Settings panel).
type Manager struct {
repo *Repo
out chan Event
mu sync.Mutex
inbound map[int64]*Server
outbound []Config
}
func NewManager(repo *Repo) *Manager {
return &Manager{
repo: repo,
out: make(chan Event, 64),
inbound: map[int64]*Server{},
}
}
// Events returns the channel inbound parsed events are delivered on.
// The app exposes these as Wails events.
func (m *Manager) Events() <-chan Event { return m.out }
// Reload restarts every server based on the current Repo contents.
// Existing servers are stopped, the snapshot is rebuilt from scratch.
// Errors on individual rows are logged via the returned slice; the
// caller can surface them in the UI.
func (m *Manager) Reload(ctx context.Context) []string {
applog.Printf("udp: Reload() called")
m.mu.Lock()
old := m.inbound
m.inbound = map[int64]*Server{}
m.outbound = nil
m.mu.Unlock()
for _, s := range old {
s.close()
}
cfgs, err := m.repo.List(ctx)
if err != nil {
applog.Printf("udp: Reload list failed: %v", err)
return []string{fmt.Sprintf("load udp configs: %v", err)}
}
applog.Printf("udp: Reload found %d config(s) in DB", len(cfgs))
var errs []string
for _, c := range cfgs {
applog.Printf("udp: cfg id=%d name=%q dir=%s service=%s port=%d mcast=%v group=%q enabled=%v",
c.ID, c.Name, c.Direction, c.ServiceType, c.Port, c.Multicast, c.MulticastGroup, c.Enabled)
if !c.Enabled {
continue
}
if c.Direction == Outbound {
m.mu.Lock()
m.outbound = append(m.outbound, c)
m.mu.Unlock()
continue
}
srv := newServer(c, m.out)
if err := srv.start(); err != nil {
applog.Printf("udp: start %q failed: %v", c.Name, err)
errs = append(errs, fmt.Sprintf("%s: %v", c.Name, err))
continue
}
m.mu.Lock()
m.inbound[c.ID] = srv
m.mu.Unlock()
}
applog.Printf("udp: Reload done — %d server(s) running, %d error(s)", len(m.inbound), len(errs))
return errs
}
// Outbound returns the active outbound configs matching a service type.
// Used by the QSO save path to push notifications to listeners.
func (m *Manager) Outbound(service ServiceType) []Config {
m.mu.Lock()
defer m.mu.Unlock()
var out []Config
for _, c := range m.outbound {
if c.ServiceType == service {
out = append(out, c)
}
}
return out
}
// StopAll closes every running server. Called at app shutdown.
func (m *Manager) StopAll() {
m.mu.Lock()
old := m.inbound
m.inbound = map[int64]*Server{}
m.outbound = nil
m.mu.Unlock()
for _, s := range old {
s.close()
}
}
+176
View File
@@ -0,0 +1,176 @@
package udp
import (
"bytes"
"encoding/binary"
"fmt"
"strings"
)
// WSJT-X / JTDX / MSHV UDP protocol (WSJT-X v2 schema).
//
// Wire format:
// uint32 magic (0xadbccbda)
// uint32 schema (2 or 3)
// uint32 type (message id)
// QString id (the program's "id" — typically "WSJT-X")
// ... type-specific payload ...
//
// QString = int32 length followed by `length` UTF-8 bytes, or -1 for nil.
// QUtf8 in newer versions; same wire format for the common case.
//
// We only care about two messages here:
// Status (type 1) → exposes the current DX call so HamLog can pre-fill
// LoggedADIF (type 12) → carries the ADIF of the just-logged QSO
// Everything else (heartbeat, decodes, clears, status of other VFOs) is
// ignored.
const (
wsjtMagic = 0xadbccbda
wsjtMsgHeartbeat = 0
wsjtMsgStatus = 1
wsjtMsgDecode = 2
wsjtMsgClear = 3
wsjtMsgQSOLogged = 5
wsjtMsgLoggedADIF = 12
)
// WSJTEvent is the parsed, typed result of decoding a single packet.
// One of (DXCall, LoggedADIF) is non-empty depending on the message.
type WSJTEvent struct {
DXCall string // current "DX Call" field in the WSJT app
DXGrid string // optional grid for that call
Mode string // FT8 / FT4 / …
FreqHz int64 // current dial freq when available
LoggedADIF string // full ADIF text when message is LoggedADIF
ProgramID string // "WSJT-X" / "JTDX" / "MSHV" — for diagnostics / dedup
}
// ParseWSJT decodes one UDP packet. Returns ok=false for messages we
// don't care about (heartbeat, decode lines, clears, etc.).
func ParseWSJT(pkt []byte) (WSJTEvent, bool, error) {
if len(pkt) < 12 {
return WSJTEvent{}, false, fmt.Errorf("packet too short")
}
r := bytes.NewReader(pkt)
var magic, schema, mtype uint32
if err := binary.Read(r, binary.BigEndian, &magic); err != nil {
return WSJTEvent{}, false, err
}
if magic != wsjtMagic {
return WSJTEvent{}, false, fmt.Errorf("bad magic %#x", magic)
}
if err := binary.Read(r, binary.BigEndian, &schema); err != nil {
return WSJTEvent{}, false, err
}
_ = schema
if err := binary.Read(r, binary.BigEndian, &mtype); err != nil {
return WSJTEvent{}, false, err
}
id, err := readQString(r)
if err != nil {
return WSJTEvent{}, false, fmt.Errorf("read id: %w", err)
}
ev := WSJTEvent{ProgramID: id}
switch mtype {
case wsjtMsgStatus:
// Status payload order (v2):
// quint64 dial_frequency
// QUtf8 mode
// QUtf8 dx_call
// QUtf8 report
// QUtf8 tx_mode
// bool tx_enabled
// bool transmitting
// bool decoding
// qint32 rx_df
// qint32 tx_df
// QUtf8 de_call
// QUtf8 de_grid
// QUtf8 dx_grid
// ... (more fields appended in later schemas, we stop reading
// after dx_grid which is all we need)
var dialHz uint64
if err := binary.Read(r, binary.BigEndian, &dialHz); err != nil {
return WSJTEvent{}, false, err
}
ev.FreqHz = int64(dialHz)
mode, err := readQString(r)
if err != nil {
return WSJTEvent{}, false, err
}
ev.Mode = strings.ToUpper(strings.TrimSpace(mode))
dxCall, err := readQString(r)
if err != nil {
return WSJTEvent{}, false, err
}
ev.DXCall = strings.ToUpper(strings.TrimSpace(dxCall))
// Skip report, tx_mode (QUtf8), tx_enabled (bool), transmitting,
// decoding, rx_df (qint32), tx_df (qint32), de_call (QUtf8),
// de_grid (QUtf8) → then dx_grid.
for _, name := range []string{"report", "tx_mode"} {
if _, err := readQString(r); err != nil {
return ev, true, fmt.Errorf("read %s: %w", name, err)
}
}
// 3 booleans (each 1 byte)
for i := 0; i < 3; i++ {
var b uint8
if err := binary.Read(r, binary.BigEndian, &b); err != nil {
return ev, true, err
}
}
// 2 int32
var i32 int32
for i := 0; i < 2; i++ {
if err := binary.Read(r, binary.BigEndian, &i32); err != nil {
return ev, true, err
}
}
// de_call, de_grid, dx_grid
if _, err := readQString(r); err != nil {
return ev, true, err
}
if _, err := readQString(r); err != nil {
return ev, true, err
}
dxGrid, err := readQString(r)
if err != nil {
return ev, true, err
}
ev.DXGrid = strings.ToUpper(strings.TrimSpace(dxGrid))
return ev, true, nil
case wsjtMsgLoggedADIF:
// Payload: a single QString containing the ADIF record.
adif, err := readQString(r)
if err != nil {
return WSJTEvent{}, false, err
}
ev.LoggedADIF = adif
return ev, true, nil
}
return WSJTEvent{}, false, nil
}
// readQString reads a Qt QString as written by QDataStream: an int32 byte
// length (or -1 for null) followed by the UTF-8 bytes.
func readQString(r *bytes.Reader) (string, error) {
var n int32
if err := binary.Read(r, binary.BigEndian, &n); err != nil {
return "", err
}
if n <= 0 {
return "", nil
}
if int(n) > r.Len() {
return "", fmt.Errorf("short string: want %d have %d", n, r.Len())
}
buf := make([]byte, n)
if _, err := r.Read(buf); err != nil {
return "", err
}
return string(buf), nil
}