Browse Source

Rewrite listeners logic to allow automatic re-init

This new implementation introduces the concept of a listener hub which
manages the initialization and accept loop for each transport.

Issue #88
Martin Hebnes Pedersen 2 years ago
parent
commit
29d81630aa
7 changed files with 299 additions and 182 deletions
  1. 10 2
      connect.go
  2. 3 3
      http.go
  3. 3 8
      interactive.go
  4. 81 160
      listen.go
  5. 195 0
      listener_hub.go
  6. 6 8
      main.go
  7. 1 1
      vendor/github.com/la5nta/wl2k-go

+ 10 - 2
connect.go

@@ -208,10 +208,14 @@ func waitBusy(b transport.BusyChannelChecker) {
 }
 
 func initWinmorTNC() error {
-	if wmTNC != nil {
+	if wmTNC != nil && wmTNC.Ping() == nil {
 		return nil
 	}
 
+	if wmTNC != nil {
+		wmTNC.Close()
+	}
+
 	var err error
 	wmTNC, err = winmor.Open(config.Winmor.Addr, fOptions.MyCall, config.Locator)
 	if err != nil {
@@ -240,10 +244,14 @@ func initWinmorTNC() error {
 }
 
 func initArdopTNC() error {
-	if adTNC != nil {
+	if adTNC != nil && adTNC.Ping() == nil {
 		return nil
 	}
 
+	if adTNC != nil {
+		adTNC.Close()
+	}
+
 	var err error
 	adTNC, err = ardop.OpenTCP(config.Ardop.Addr, fOptions.MyCall, config.Locator)
 	if err != nil {

+ 3 - 3
http.go

@@ -309,13 +309,13 @@ func uiHandler(w http.ResponseWriter, r *http.Request) {
 
 func getStatus() Status {
 	status := Status{
-		ActiveListeners: make([]string, 0, len(listeners)),
+		ActiveListeners: []string{},
 		Connected:       exchangeConn != nil,
 		HTTPClients:     websocketHub.ClientAddrs(),
 	}
 
-	for method := range listeners {
-		status.ActiveListeners = append(status.ActiveListeners, method)
+	for _, tl := range listenHub.Active() {
+		status.ActiveListeners = append(status.ActiveListeners, tl.Name())
 	}
 	sort.Strings(status.ActiveListeners)
 

+ 3 - 8
interactive.go

@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"fmt"
 	"log"
-	"sort"
 	"strings"
 	"time"
 
@@ -96,14 +95,10 @@ func printInteractiveUsage() {
 func getPrompt() string {
 	var buf bytes.Buffer
 
-	methods := make([]string, 0, len(listeners))
-	for method := range listeners {
-		methods = append(methods, method)
-	}
+	status := getStatus()
 
-	if len(listeners) > 0 {
-		sort.Strings(methods)
-		fmt.Fprintf(&buf, "L%v", methods)
+	if len(status.ActiveListeners) > 0 {
+		fmt.Fprintf(&buf, "L%v", status.ActiveListeners)
 	}
 
 	fmt.Fprint(&buf, "> ")

+ 81 - 160
listen.go

@@ -15,50 +15,30 @@ import (
 	"github.com/la5nta/wl2k-go/transport/telnet"
 )
 
-type incomingConnect struct {
-	conn       net.Conn
-	remoteCall string
-	kind       string
-	freq       Frequency
-}
-
 func Unlisten(param string) {
 	methods := strings.FieldsFunc(param, SplitFunc)
 	for _, method := range methods {
-		ln, ok := listeners[method]
-		if !ok {
-			fmt.Printf("No active %s listener, ignoring.\n", method)
-		} else if err := ln.Close(); err != nil {
-			log.Printf("Unable to close %s listener: %s", method, err)
+		ok, err := listenHub.Disable(method)
+		if err != nil {
+			fmt.Printf("Unable to close %s listener: %s", method, err)
+		} else if !ok {
+			log.Printf("No active %s listener, ignoring.\n", method)
 		}
 	}
-
-	// Make sure the Web clients are updated with the list of active listeners
-	websocketHub.UpdateStatus()
 }
 
 func Listen(listenStr string) {
-	cc := make(chan incomingConnect, 2)
-
 	methods := strings.FieldsFunc(listenStr, SplitFunc)
 	for _, method := range methods {
 		switch strings.ToLower(method) {
 		case MethodWinmor:
-			if err := initWinmorTNC(); err != nil {
-				log.Fatal(err)
-			}
-
-			listenWinmor(cc)
+			listenHub.Enable(WINMORListener{})
 		case MethodArdop:
-			if err := initArdopTNC(); err != nil {
-				log.Fatal(err)
-			}
-
-			listenArdop(cc)
+			listenHub.Enable(ARDOPListener{})
 		case MethodTelnet:
-			listenTelnet(cc)
+			listenHub.Enable(TelnetListener{})
 		case MethodAX25:
-			listenAX25(cc)
+			listenHub.Enable(&AX25Listener{})
 		case MethodSerialTNC:
 			log.Printf("%s listen not implemented, ignoring.", method)
 		default:
@@ -66,164 +46,105 @@ func Listen(listenStr string) {
 			return
 		}
 	}
+	log.Printf("Listening for incoming traffic on %s...", listenStr)
+}
 
-	go func() {
-		for {
-			connect := <-cc
-			eventLog.LogConn("accept", connect.freq, connect.conn, nil)
-			log.Printf("Got connect (%s:%s)", connect.kind, connect.remoteCall)
-
-			err := exchange(connect.conn, connect.remoteCall, true)
-			if err != nil {
-				log.Printf("Exchange failed: %s", err)
-			} else {
-				log.Println("Disconnected.")
-			}
-		}
-	}()
+type AX25Listener struct{ stopBeacon chan<- struct{} }
 
-	log.Printf("Listening for incoming traffic (%s)...", listenStr)
-	websocketHub.UpdateStatus()
+func (l *AX25Listener) Init() (net.Listener, error) {
+	return ax25.ListenAX25(config.AX25.Port, fOptions.MyCall)
 }
 
-func listenWinmor(incoming chan<- incomingConnect) {
-	// RMS Express runs bw at 500Hz except when sending/receiving message. Why?
-	// ... Or is it cmdRobust True?
-	ln, err := wmTNC.Listen(config.Winmor.InboundBandwidth)
-	if err != nil {
-		log.Fatal(err)
+func (l *AX25Listener) BeaconStart() error {
+	if config.AX25.Beacon.Every > 0 {
+		l.stopBeacon = l.beaconLoop(time.Duration(config.AX25.Beacon.Every) * time.Second)
+	}
+	return nil
+}
+
+func (l *AX25Listener) BeaconStop() {
+	select {
+	case l.stopBeacon <- struct{}{}:
+	default:
 	}
+}
 
-	listeners[MethodWinmor] = ln
+func (l *AX25Listener) beaconLoop(dur time.Duration) chan<- struct{} {
+	stop := make(chan struct{}, 1)
 	go func() {
-		defer func() {
-			delete(listeners, MethodWinmor)
-			log.Printf("%s listener closed.", MethodWinmor)
-		}()
+		b, err := ax25.NewAX25Beacon(config.AX25.Port, fOptions.MyCall, config.AX25.Beacon.Destination, config.AX25.Beacon.Message)
+		if err != nil {
+			log.Printf("Unable to activate beacon: %s", err)
+			return
+		}
 
+		t := time.Tick(dur)
 		for {
-			conn, err := ln.Accept()
-			if err != nil {
+			select {
+			case <-t:
+				if err := b.Now(); err != nil {
+					log.Printf("%s beacon failed: %s", l.Name(), err)
+					return
+				}
+			case <-stop:
 				return
 			}
-
-			var freq Frequency
-			if rig, ok := rigs[config.Winmor.Rig]; ok {
-				f, _ := rig.GetFreq()
-				freq = Frequency(f)
-			}
-
-			incoming <- incomingConnect{
-				conn:       conn,
-				remoteCall: conn.RemoteAddr().String(),
-				kind:       MethodWinmor,
-				freq:       freq,
-			}
 		}
 	}()
+	return stop
 }
 
-func listenArdop(incoming chan<- incomingConnect) {
+func (l *AX25Listener) CurrentFreq() (Frequency, bool) { return 0, false }
+func (l *AX25Listener) Name() string                   { return MethodAX25 }
+
+type ARDOPListener struct{}
+
+func (l ARDOPListener) Name() string { return MethodArdop }
+func (l ARDOPListener) Init() (net.Listener, error) {
+	if err := initArdopTNC(); err != nil {
+		return nil, err
+	}
 	ln, err := adTNC.Listen()
 	if err != nil {
-		log.Fatal(err)
+		return nil, err
 	}
-
-	if sec := config.Ardop.BeaconInterval; sec > 0 {
-		adTNC.BeaconEvery(time.Duration(sec) * time.Second)
-	}
-
-	listeners[MethodArdop] = ln
-	go func() {
-		defer func() {
-			delete(listeners, MethodArdop)
-			log.Printf("%s listener closed.", MethodArdop)
-		}()
-
-		for {
-			conn, err := ln.Accept()
-			if err != nil {
-				return
-			}
-
-			var freq Frequency
-			if rig, ok := rigs[config.Ardop.Rig]; ok {
-				f, _ := rig.GetFreq()
-				freq = Frequency(f)
-			}
-
-			incoming <- incomingConnect{
-				conn:       conn,
-				remoteCall: conn.RemoteAddr().String(),
-				kind:       MethodArdop,
-				freq:       freq,
-			}
-		}
-	}()
+	return ln, err
 }
 
-func listenAX25(incoming chan<- incomingConnect) {
-	if config.AX25.Beacon.Every > 0 {
-		b, err := ax25.NewAX25Beacon(config.AX25.Port, fOptions.MyCall, config.AX25.Beacon.Destination, config.AX25.Beacon.Message)
-		if err != nil {
-			log.Printf("Unable to activate beacon: %s", err)
-		} else {
-			go b.Every(time.Duration(config.AX25.Beacon.Every) * time.Second)
-		}
+func (l ARDOPListener) CurrentFreq() (Frequency, bool) {
+	if rig, ok := rigs[config.Ardop.Rig]; ok {
+		f, _ := rig.GetFreq()
+		return Frequency(f), ok
 	}
+	return 0, false
+}
 
-	ln, err := ax25.ListenAX25(config.AX25.Port, fOptions.MyCall)
-	if err != nil {
-		log.Printf("Unable to start AX.25 listener: %s", err)
-		return
-	}
+func (l ARDOPListener) BeaconStart() error {
+	return adTNC.BeaconEvery(time.Duration(config.Ardop.BeaconInterval) * time.Second)
+}
 
-	listeners[MethodAX25] = ln
-	go func() {
-		defer func() {
-			delete(listeners, MethodAX25)
-			log.Printf("%s listener closed.", MethodAX25)
-		}()
+func (l ARDOPListener) BeaconStop() { adTNC.BeaconEvery(0) }
 
-		for {
-			conn, err := ln.Accept()
-			if err != nil {
-				return
-			}
+type WINMORListener struct{}
 
-			incoming <- incomingConnect{
-				conn:       conn,
-				remoteCall: conn.RemoteAddr().String(),
-				kind:       MethodAX25,
-			}
-		}
-	}()
+func (l WINMORListener) Name() string { return MethodWinmor }
+func (l WINMORListener) Init() (net.Listener, error) {
+	if err := initWinmorTNC(); err != nil {
+		return nil, err
+	}
+	return wmTNC.Listen(config.Winmor.InboundBandwidth)
 }
 
-func listenTelnet(incoming chan<- incomingConnect) {
-	ln, err := telnet.Listen(config.Telnet.ListenAddr)
-	if err != nil {
-		log.Fatal(err)
+func (l WINMORListener) CurrentFreq() (Frequency, bool) {
+	if rig, ok := rigs[config.Winmor.Rig]; ok {
+		f, _ := rig.GetFreq()
+		return Frequency(f), ok
 	}
+	return 0, false
+}
 
-	listeners[MethodTelnet] = ln
-	go func() {
-		defer func() {
-			delete(listeners, MethodTelnet)
-			log.Printf("%s listener closed.", MethodTelnet)
-		}()
-
-		for {
-			conn, err := ln.Accept()
-			if err != nil {
-				return
-			}
+type TelnetListener struct{}
 
-			incoming <- incomingConnect{
-				conn:       conn,
-				remoteCall: conn.(*telnet.Conn).RemoteCall(),
-				kind:       MethodTelnet,
-			}
-		}
-	}()
-}
+func (l TelnetListener) Name() string                   { return MethodTelnet }
+func (l TelnetListener) Init() (net.Listener, error)    { return telnet.Listen(config.Telnet.ListenAddr) }
+func (l TelnetListener) CurrentFreq() (Frequency, bool) { return 0, false }

+ 195 - 0
listener_hub.go

@@ -0,0 +1,195 @@
+// Copyright 2017 Martin Hebnes Pedersen (LA5NTA). All rights reserved.
+// Use of this source code is governed by the MIT-license that can be
+// found in the LICENSE file.
+
+package main
+
+import (
+	"log"
+	"net"
+	"sync"
+	"time"
+)
+
+type TransportListener interface {
+	Init() (net.Listener, error)
+	Name() string
+	CurrentFreq() (Frequency, bool)
+}
+
+type Beaconer interface {
+	BeaconStop()
+	BeaconStart() error
+}
+
+type Listener struct {
+	t   TransportListener
+	hub *ListenerHub
+
+	mu       sync.Mutex
+	isClosed bool
+	err      error
+	ln       net.Listener
+}
+
+func NewListener(t TransportListener) *Listener { return &Listener{t: t} }
+
+func (l *Listener) Err() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	return l.err
+}
+
+func (l *Listener) Close() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.isClosed {
+		return l.err
+	}
+	l.isClosed = true
+
+	// If l.err is not nil, then the last attempt to open the listener failed and we don't have anything to close
+	if l.err != nil {
+		return l.err
+	}
+	return l.ln.Close()
+}
+
+func (l *Listener) listenLoop() {
+	var silenceErr bool
+	for {
+		l.mu.Lock()
+		if l.isClosed {
+			l.mu.Unlock()
+			break
+		}
+
+		// Try to init the TNC
+		l.ln, l.err = l.t.Init()
+		if l.err != nil {
+			l.mu.Unlock()
+			if !silenceErr {
+				log.Printf("Listener %s failed: %s", l.t.Name(), l.err)
+				log.Printf("Will try to re-establish listener in the background...")
+				silenceErr = true
+				websocketHub.UpdateStatus()
+			}
+			time.Sleep(time.Second)
+			continue
+		}
+		l.mu.Unlock()
+		if silenceErr {
+			log.Printf("Listener %s re-established", l.t.Name())
+			silenceErr = false
+			websocketHub.UpdateStatus()
+		}
+
+		if b, ok := l.t.(Beaconer); ok {
+			b.BeaconStart()
+		}
+
+		// Run the accept loop until an error occures
+		if err := l.acceptLoop(); err != nil {
+			log.Printf("Accept %s failed: %s", l.t.Name(), err)
+		}
+
+		if b, ok := l.t.(Beaconer); ok {
+			b.BeaconStop()
+		}
+	}
+}
+
+type RemoteCaller interface {
+	RemoteCall() string
+}
+
+func (l *Listener) acceptLoop() error {
+	for {
+		conn, err := l.ln.Accept()
+		if err != nil {
+			return err
+		}
+
+		remoteCall := conn.RemoteAddr().String()
+		if c, ok := conn.(RemoteCaller); ok {
+			remoteCall = c.RemoteCall()
+		}
+
+		freq, _ := l.t.CurrentFreq()
+
+		eventLog.LogConn("accept", freq, conn, nil)
+		log.Printf("Got connect (%s:%s)", l.t.Name(), remoteCall)
+
+		err = exchange(conn, remoteCall, true)
+		if err != nil {
+			log.Printf("Exchange failed: %s", err)
+		} else {
+			log.Println("Disconnected.")
+		}
+	}
+}
+
+type ListenerHub struct {
+	mu        sync.Mutex
+	listeners map[string]*Listener
+}
+
+func NewListenerHub() *ListenerHub {
+	return &ListenerHub{
+		listeners: map[string]*Listener{},
+	}
+}
+
+func (h *ListenerHub) Active() []TransportListener {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	slice := make([]TransportListener, 0, len(h.listeners))
+	for _, l := range h.listeners {
+		if l.Err() != nil {
+			continue
+		}
+		slice = append(slice, l.t)
+	}
+	return slice
+}
+
+func (h *ListenerHub) Enable(t TransportListener) {
+	h.mu.Lock()
+	defer func() {
+		h.mu.Unlock()
+		websocketHub.UpdateStatus()
+	}()
+	l := NewListener(t)
+	if _, ok := h.listeners[t.Name()]; ok {
+		return
+	}
+	h.listeners[t.Name()] = l
+	go l.listenLoop()
+}
+
+func (h *ListenerHub) Disable(name string) (bool, error) {
+	h.mu.Lock()
+	defer func() {
+		h.mu.Unlock()
+		websocketHub.UpdateStatus()
+	}()
+	l, ok := h.listeners[name]
+	if !ok {
+		return false, nil
+	}
+	delete(h.listeners, name)
+	return true, l.Close()
+}
+
+func (h *ListenerHub) Close() {
+	h.mu.Lock()
+	defer func() {
+		h.mu.Unlock()
+		websocketHub.UpdateStatus()
+	}()
+	for k, l := range h.listeners {
+		l.Close()
+		delete(h.listeners, k)
+	}
+}

+ 6 - 8
main.go

@@ -140,10 +140,10 @@ var (
 	logWriter io.Writer
 	eventLog  *EventLogger
 
-	exchangeChan chan ex                 // The channel that the exchange loop is listening on
-	exchangeConn net.Conn                // Pointer to the active session connection (exchange)
-	listeners    map[string]net.Listener // Active listeners
-	mbox         *mailbox.DirHandler     // The mailbox
+	exchangeChan chan ex             // The channel that the exchange loop is listening on
+	exchangeConn net.Conn            // Pointer to the active session connection (exchange)
+	mbox         *mailbox.DirHandler // The mailbox
+	listenHub    *ListenerHub
 	appDir       string
 )
 
@@ -181,7 +181,7 @@ func optionsSet() *pflag.FlagSet {
 }
 
 func init() {
-	listeners = make(map[string]net.Listener)
+	listenHub = NewListenerHub()
 
 	var err error
 	appDir, err = mailbox.DefaultAppDir()
@@ -388,9 +388,7 @@ func helpHandle(args []string) {
 }
 
 func cleanup() {
-	for method := range listeners {
-		Unlisten(method)
-	}
+	listenHub.Close()
 
 	if wmTNC != nil {
 		if err := wmTNC.Close(); err != nil {

+ 1 - 1
vendor/github.com/la5nta/wl2k-go

@@ -1 +1 @@
-Subproject commit 88b1c68d041b2026bb51a04f97eb2fe54df65835
+Subproject commit 228d6c37e5507c8ac92aa921521e62cdac9ab7df