From a58a7197a9249f6dac55134fc26b493fe42e4406 Mon Sep 17 00:00:00 2001 From: Tyler Stiene Date: Mon, 1 Feb 2021 16:03:38 -0500 Subject: [PATCH] clean up bridge terminate --- bridge.go | 96 +++++++++++++++++++++++++++------------------ discord-handlers.go | 5 +-- discord.go | 33 +++++++++------- main.go | 30 ++++++++++---- mumble-handlers.go | 5 +++ mumble.go | 20 ++++------ 6 files changed, 114 insertions(+), 75 deletions(-) diff --git a/bridge.go b/bridge.go index 7ea2e52..6e6bac5 100644 --- a/bridge.go +++ b/bridge.go @@ -1,6 +1,7 @@ package main import ( + "context" "crypto/tls" "fmt" "log" @@ -24,9 +25,15 @@ type BridgeState struct { // The configuration data for this bridge BridgeConfig *BridgeConfig - // TODO + // External requests to kill the bridge BridgeDie chan bool + // Lock to only allow one bridge session at a time + lock sync.Mutex + + // Wait for bridge to exit cleanly + WaitExit *sync.WaitGroup + // Bridge connection Connected bool @@ -67,33 +74,44 @@ type BridgeState struct { // startBridge established the voice connection func (b *BridgeState) startBridge() { + b.lock.Lock() + defer b.lock.Unlock() b.BridgeDie = make(chan bool) + defer close(b.BridgeDie) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg := sync.WaitGroup{} + b.WaitExit = &wg var err error // DISCORD Connect Voice - + log.Println("Attempting to join Discord voice channel") b.DiscordVoice, err = b.DiscordSession.ChannelVoiceJoin(b.BridgeConfig.GID, b.BridgeConfig.CID, false, false) if err != nil { log.Println(err) + b.DiscordVoice.Disconnect() return } + defer b.DiscordVoice.Disconnect() defer b.DiscordVoice.Speaking(false) - defer b.DiscordVoice.Close() + log.Println("Discord Voice Connected") // MUMBLE Connect - b.MumbleStream = &MumbleDuplex{ - die: b.BridgeDie, - } + b.MumbleStream = &MumbleDuplex{} det := b.BridgeConfig.MumbleConfig.AudioListeners.Attach(b.MumbleStream) + defer det.Detach() var tlsConfig tls.Config if b.BridgeConfig.MumbleInsecure { tlsConfig.InsecureSkipVerify = true } + log.Println("Attempting to join Mumble") b.MumbleClient, err = gumble.DialWithDialer(new(net.Dialer), b.BridgeConfig.MumbleAddr, b.BridgeConfig.MumbleConfig, &tlsConfig) if err != nil { @@ -102,6 +120,7 @@ func (b *BridgeState) startBridge() { return } defer b.MumbleClient.Disconnect() + log.Println("Mumble Connected") // Shared Channels // Shared channels pass PCM information in 10ms chunks [480]int16 @@ -109,65 +128,66 @@ func (b *BridgeState) startBridge() { var toMumble = b.MumbleClient.AudioOutgoing() var toDiscord = make(chan []int16, 100) - log.Println("Mumble Connected") + defer close(toDiscord) + defer close(toMumble) // Start Passing Between // From Mumble - go b.MumbleStream.fromMumbleMixer(toDiscord, b.BridgeDie) + go b.MumbleStream.fromMumbleMixer(ctx, &wg, toDiscord) // From Discord b.DiscordStream = &DiscordDuplex{ Bridge: b, fromDiscordMap: make(map[uint32]fromDiscord), - die: b.BridgeDie, } - go b.DiscordStream.discordReceivePCM() - go b.DiscordStream.fromDiscordMixer(toMumble) + go b.DiscordStream.discordReceivePCM(ctx, &wg, cancel) + go b.DiscordStream.fromDiscordMixer(ctx, &wg, toMumble) // To Discord - go b.DiscordStream.discordSendPCM(toDiscord) + go b.DiscordStream.discordSendPCM(ctx, &wg, cancel, toDiscord) + // Monitor Mumble go func() { + wg.Add(1) ticker := time.NewTicker(500 * time.Millisecond) for { - <-ticker.C - if b.MumbleClient == nil || b.MumbleClient.State() != 2 { - if b.MumbleClient != nil { - log.Println("Lost mumble connection " + strconv.Itoa(int(b.MumbleClient.State()))) - } else { - log.Println("Lost mumble connection due to bridge dieing") - return + select { + case <-ticker.C: + if b.MumbleClient == nil || b.MumbleClient.State() != 2 { + if b.MumbleClient != nil { + log.Println("Lost mumble connection " + strconv.Itoa(int(b.MumbleClient.State()))) + } else { + log.Println("Lost mumble connection due to bridge dieing") + } + cancel() } - select { - case <-b.BridgeDie: - //die is already closed - - default: - close(b.BridgeDie) - } - + case <-ctx.Done(): + wg.Done() + return } } }() b.Connected = true + // Hold until cancelled or external die request select { + case <-ctx.Done(): + log.Println("Bridge internal context cancel") case <-b.BridgeDie: - log.Println("\nGot internal die request. Terminating Mumble-Bridge") - b.DiscordVoice.Disconnect() - det.Detach() - close(toDiscord) - close(toMumble) - close(b.BridgeDie) - b.Connected = false - b.DiscordVoice = nil - b.MumbleClient = nil - b.MumbleUsers = make(map[string]bool) - b.DiscordUsers = make(map[string]discordUser) + log.Println("Bridge die request received") + cancel() } + + b.Connected = false + wg.Wait() + log.Println("Terminating Bridge") + b.MumbleUsersMutex.Lock() + b.MumbleUsers = make(map[string]bool) + b.MumbleUsersMutex.Unlock() + b.DiscordUsers = make(map[string]discordUser) } func (b *BridgeState) discordStatusUpdate() { diff --git a/discord-handlers.go b/discord-handlers.go index 002edf2..ac46684 100644 --- a/discord-handlers.go +++ b/discord-handlers.go @@ -100,7 +100,6 @@ func (l *DiscordListener) messageCreate(s *discordgo.Session, m *discordgo.Messa if vs.UserID == m.Author.ID { log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID) l.Bridge.BridgeDie <- true - l.Bridge.BridgeDie = nil return } } @@ -167,7 +166,7 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi continue } - println("User joined Discord " + u.Username) + log.Println("User joined Discord " + u.Username) dm, err := s.UserChannelCreate(u.ID) if err != nil { log.Println("Error creating private channel for", u.Username) @@ -194,7 +193,7 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi // Remove users that are no longer connected for id := range l.Bridge.DiscordUsers { if l.Bridge.DiscordUsers[id].seen == false { - println("User left Discord channel " + l.Bridge.DiscordUsers[id].username) + log.Println("User left Discord channel " + l.Bridge.DiscordUsers[id].username) if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText { l.Bridge.MumbleClient.Do(func() { l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has left Discord channel\n", l.Bridge.DiscordUsers[id].username), false) diff --git a/discord.go b/discord.go index 4224d5a..44befca 100644 --- a/discord.go +++ b/discord.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "sync" @@ -25,8 +26,6 @@ type DiscordDuplex struct { discordMutex sync.Mutex discordMixerMutex sync.Mutex fromDiscordMap map[uint32]fromDiscord - - die chan bool } // OnError gets called by dgvoice when an error is encountered. @@ -43,7 +42,7 @@ var OnError = func(str string, err error) { // SendPCM will receive on the provied channel encode // received PCM data into Opus then send that to Discordgo -func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) { +func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc, pcm <-chan []int16) { const channels int = 1 const frameRate int = 48000 // audio sampling rate const frameSize int = 960 // uint16 size of each audio frame @@ -62,10 +61,12 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) { lastReady := true var readyTimeout *time.Timer + wg.Add(1) + for { select { - case <-dd.die: - log.Println("Killing discordSendPCM") + case <-ctx.Done(): + wg.Done() return default: } @@ -90,7 +91,8 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) { if lastReady == true { OnError(fmt.Sprintf("Discordgo not ready for opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil) readyTimeout = time.AfterFunc(30*time.Second, func() { - dd.die <- true + log.Println("set ready timeout") + cancel() }) lastReady = false } @@ -112,19 +114,21 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) { // ReceivePCM will receive on the the Discordgo OpusRecv channel and decode // the opus audio into PCM then send it on the provided channel. -func (dd *DiscordDuplex) discordReceivePCM() { +func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc) { var err error lastReady := true var readyTimeout *time.Timer + wg.Add(1) + for { if dd.Bridge.DiscordVoice.Ready == false || dd.Bridge.DiscordVoice.OpusRecv == nil { if lastReady == true { OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil) readyTimeout = time.AfterFunc(30*time.Second, func() { log.Println("set ready timeout") - dd.die <- true + cancel() }) lastReady = false } @@ -139,10 +143,10 @@ func (dd *DiscordDuplex) discordReceivePCM() { var p *discordgo.Packet select { - case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv: - case <-dd.die: - log.Println("killing discord ReceivePCM") + case <-ctx.Done(): + wg.Done() return + case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv: } if !ok { @@ -196,14 +200,15 @@ func (dd *DiscordDuplex) discordReceivePCM() { } } -func (dd *DiscordDuplex) fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) { +func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGroup, toMumble chan<- gumble.AudioBuffer) { ticker := time.NewTicker(10 * time.Millisecond) sendAudio := false + wg.Add(1) for { select { - case <-dd.die: - log.Println("killing fromDiscordMixer") + case <-ctx.Done(): + wg.Done() return case <-ticker.C: } diff --git a/main.go b/main.go index 52c6a33..3474313 100644 --- a/main.go +++ b/main.go @@ -112,17 +112,16 @@ func main() { } // MUMBLE SETUP - MumbleConfig := gumble.NewConfig() - Bridge.BridgeConfig.MumbleConfig = MumbleConfig - MumbleConfig.Username = *mumbleUsername - MumbleConfig.Password = *mumblePassword - MumbleConfig.AudioInterval = time.Millisecond * 10 + Bridge.BridgeConfig.MumbleConfig = gumble.NewConfig() + Bridge.BridgeConfig.MumbleConfig.Username = *mumbleUsername + Bridge.BridgeConfig.MumbleConfig.Password = *mumblePassword + Bridge.BridgeConfig.MumbleConfig.AudioInterval = time.Millisecond * 10 Bridge.MumbleListener = &MumbleListener{ Bridge: Bridge, } - MumbleConfig.Attach(gumbleutil.Listener{ + Bridge.BridgeConfig.MumbleConfig.Attach(gumbleutil.Listener{ Connect: Bridge.MumbleListener.mumbleConnect, UserChange: Bridge.MumbleListener.mumbleUserChange, }) @@ -171,7 +170,12 @@ func main() { case "constant": log.Println("bridge starting in constant mode") Bridge.Mode = bridgeModeConstant - go Bridge.startBridge() + go func() { + for { + Bridge.startBridge() + log.Println("Bridge died. Restarting") + } + }() default: Bridge.DiscordSession.Close() log.Fatalln("invalid bridge mode set") @@ -179,8 +183,18 @@ func main() { go Bridge.discordStatusUpdate() + // Shutdown on OS signal sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill) <-sc - log.Println("Bot shutting down") + + // Signal the bridge to exit cleanly + Bridge.BridgeDie <- true + + log.Println("OS Signal. Bot shutting down") + + // Wait or the bridge to exit cleanly + if Bridge.Connected { + Bridge.WaitExit.Wait() + } } diff --git a/mumble-handlers.go b/mumble-handlers.go index cb4ead0..860d5da 100644 --- a/mumble-handlers.go +++ b/mumble-handlers.go @@ -1,6 +1,7 @@ package main import ( + "log" "strings" "layeh.com/gumble/gumble" @@ -38,6 +39,8 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) { if e.Type.Has(gumble.UserChangeConnected) { + log.Println("User connected to mumble " + e.User.Name) + if !l.Bridge.BridgeConfig.MumbleDisableText { e.User.Send("Mumble-Discord-Bridge v" + version) @@ -63,7 +66,9 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) { // Send discord a notice l.Bridge.discordSendMessageAll(e.User.Name + " has joined mumble") } + if e.Type.Has(gumble.UserChangeDisconnected) { l.Bridge.discordSendMessageAll(e.User.Name + " has left mumble") + log.Println("User disconnected from mumble " + e.User.Name) } } diff --git a/mumble.go b/mumble.go index 8912989..4f99c70 100644 --- a/mumble.go +++ b/mumble.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "sync" "time" @@ -14,9 +15,7 @@ var fromMumbleArr []chan gumble.AudioBuffer var mumbleStreamingArr []bool // MumbleDuplex - listenera and outgoing -type MumbleDuplex struct { - die chan bool -} +type MumbleDuplex struct{} // OnAudioStream - Spawn routines to handle incoming packets func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { @@ -30,12 +29,10 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { mutex.Unlock() go func() { + // TODO kill go routine on cleanup log.Println("new mumble audio stream", e.User.Name) for { select { - case <-m.die: - log.Println("Removing mumble audio stream") - return case p := <-e.C: // log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer)) @@ -49,17 +46,19 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { return } -func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) { +func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) { ticker := time.NewTicker(10 * time.Millisecond) sendAudio := false + wg.Add(1) for { select { - case <-die: - log.Println("Killing fromMumbleMixer") + case <-ctx.Done(): + wg.Done() return default: } + <-ticker.C mutex.Lock() @@ -99,9 +98,6 @@ func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) { if sendAudio { select { case toDiscord <- outBuf: - case <-die: - log.Println("Killing fromMumbleMixer") - return default: log.Println("toDiscord buffer full. Dropping packet") }