clean up bridge terminate

This commit is contained in:
Tyler Stiene 2021-02-01 16:03:38 -05:00
parent 2fe2080bcf
commit a58a7197a9
6 changed files with 114 additions and 75 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"log" "log"
@ -24,9 +25,15 @@ type BridgeState struct {
// The configuration data for this bridge // The configuration data for this bridge
BridgeConfig *BridgeConfig BridgeConfig *BridgeConfig
// TODO // External requests to kill the bridge
BridgeDie chan bool BridgeDie chan bool
// Lock to only allow one bridge session at a time
lock sync.Mutex
// Wait for bridge to exit cleanly
WaitExit *sync.WaitGroup
// Bridge connection // Bridge connection
Connected bool Connected bool
@ -67,33 +74,44 @@ type BridgeState struct {
// startBridge established the voice connection // startBridge established the voice connection
func (b *BridgeState) startBridge() { func (b *BridgeState) startBridge() {
b.lock.Lock()
defer b.lock.Unlock()
b.BridgeDie = make(chan bool) b.BridgeDie = make(chan bool)
defer close(b.BridgeDie)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
b.WaitExit = &wg
var err error var err error
// DISCORD Connect Voice // DISCORD Connect Voice
log.Println("Attempting to join Discord voice channel")
b.DiscordVoice, err = b.DiscordSession.ChannelVoiceJoin(b.BridgeConfig.GID, b.BridgeConfig.CID, false, false) b.DiscordVoice, err = b.DiscordSession.ChannelVoiceJoin(b.BridgeConfig.GID, b.BridgeConfig.CID, false, false)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
b.DiscordVoice.Disconnect()
return return
} }
defer b.DiscordVoice.Disconnect()
defer b.DiscordVoice.Speaking(false) defer b.DiscordVoice.Speaking(false)
defer b.DiscordVoice.Close() log.Println("Discord Voice Connected")
// MUMBLE Connect // MUMBLE Connect
b.MumbleStream = &MumbleDuplex{ b.MumbleStream = &MumbleDuplex{}
die: b.BridgeDie,
}
det := b.BridgeConfig.MumbleConfig.AudioListeners.Attach(b.MumbleStream) det := b.BridgeConfig.MumbleConfig.AudioListeners.Attach(b.MumbleStream)
defer det.Detach()
var tlsConfig tls.Config var tlsConfig tls.Config
if b.BridgeConfig.MumbleInsecure { if b.BridgeConfig.MumbleInsecure {
tlsConfig.InsecureSkipVerify = true tlsConfig.InsecureSkipVerify = true
} }
log.Println("Attempting to join Mumble")
b.MumbleClient, err = gumble.DialWithDialer(new(net.Dialer), b.BridgeConfig.MumbleAddr, b.BridgeConfig.MumbleConfig, &tlsConfig) b.MumbleClient, err = gumble.DialWithDialer(new(net.Dialer), b.BridgeConfig.MumbleAddr, b.BridgeConfig.MumbleConfig, &tlsConfig)
if err != nil { if err != nil {
@ -102,6 +120,7 @@ func (b *BridgeState) startBridge() {
return return
} }
defer b.MumbleClient.Disconnect() defer b.MumbleClient.Disconnect()
log.Println("Mumble Connected")
// Shared Channels // Shared Channels
// Shared channels pass PCM information in 10ms chunks [480]int16 // Shared channels pass PCM information in 10ms chunks [480]int16
@ -109,65 +128,66 @@ func (b *BridgeState) startBridge() {
var toMumble = b.MumbleClient.AudioOutgoing() var toMumble = b.MumbleClient.AudioOutgoing()
var toDiscord = make(chan []int16, 100) var toDiscord = make(chan []int16, 100)
log.Println("Mumble Connected") defer close(toDiscord)
defer close(toMumble)
// Start Passing Between // Start Passing Between
// From Mumble // From Mumble
go b.MumbleStream.fromMumbleMixer(toDiscord, b.BridgeDie) go b.MumbleStream.fromMumbleMixer(ctx, &wg, toDiscord)
// From Discord // From Discord
b.DiscordStream = &DiscordDuplex{ b.DiscordStream = &DiscordDuplex{
Bridge: b, Bridge: b,
fromDiscordMap: make(map[uint32]fromDiscord), fromDiscordMap: make(map[uint32]fromDiscord),
die: b.BridgeDie,
} }
go b.DiscordStream.discordReceivePCM() go b.DiscordStream.discordReceivePCM(ctx, &wg, cancel)
go b.DiscordStream.fromDiscordMixer(toMumble) go b.DiscordStream.fromDiscordMixer(ctx, &wg, toMumble)
// To Discord // To Discord
go b.DiscordStream.discordSendPCM(toDiscord) go b.DiscordStream.discordSendPCM(ctx, &wg, cancel, toDiscord)
// Monitor Mumble
go func() { go func() {
wg.Add(1)
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(500 * time.Millisecond)
for { for {
<-ticker.C select {
if b.MumbleClient == nil || b.MumbleClient.State() != 2 { case <-ticker.C:
if b.MumbleClient != nil { if b.MumbleClient == nil || b.MumbleClient.State() != 2 {
log.Println("Lost mumble connection " + strconv.Itoa(int(b.MumbleClient.State()))) if b.MumbleClient != nil {
} else { log.Println("Lost mumble connection " + strconv.Itoa(int(b.MumbleClient.State())))
log.Println("Lost mumble connection due to bridge dieing") } else {
return log.Println("Lost mumble connection due to bridge dieing")
}
cancel()
} }
select { case <-ctx.Done():
case <-b.BridgeDie: wg.Done()
//die is already closed return
default:
close(b.BridgeDie)
}
} }
} }
}() }()
b.Connected = true b.Connected = true
// Hold until cancelled or external die request
select { select {
case <-ctx.Done():
log.Println("Bridge internal context cancel")
case <-b.BridgeDie: case <-b.BridgeDie:
log.Println("\nGot internal die request. Terminating Mumble-Bridge") log.Println("Bridge die request received")
b.DiscordVoice.Disconnect() cancel()
det.Detach()
close(toDiscord)
close(toMumble)
close(b.BridgeDie)
b.Connected = false
b.DiscordVoice = nil
b.MumbleClient = nil
b.MumbleUsers = make(map[string]bool)
b.DiscordUsers = make(map[string]discordUser)
} }
b.Connected = false
wg.Wait()
log.Println("Terminating Bridge")
b.MumbleUsersMutex.Lock()
b.MumbleUsers = make(map[string]bool)
b.MumbleUsersMutex.Unlock()
b.DiscordUsers = make(map[string]discordUser)
} }
func (b *BridgeState) discordStatusUpdate() { func (b *BridgeState) discordStatusUpdate() {

View File

@ -100,7 +100,6 @@ func (l *DiscordListener) messageCreate(s *discordgo.Session, m *discordgo.Messa
if vs.UserID == m.Author.ID { if vs.UserID == m.Author.ID {
log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID) log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID)
l.Bridge.BridgeDie <- true l.Bridge.BridgeDie <- true
l.Bridge.BridgeDie = nil
return return
} }
} }
@ -167,7 +166,7 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi
continue continue
} }
println("User joined Discord " + u.Username) log.Println("User joined Discord " + u.Username)
dm, err := s.UserChannelCreate(u.ID) dm, err := s.UserChannelCreate(u.ID)
if err != nil { if err != nil {
log.Println("Error creating private channel for", u.Username) log.Println("Error creating private channel for", u.Username)
@ -194,7 +193,7 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi
// Remove users that are no longer connected // Remove users that are no longer connected
for id := range l.Bridge.DiscordUsers { for id := range l.Bridge.DiscordUsers {
if l.Bridge.DiscordUsers[id].seen == false { if l.Bridge.DiscordUsers[id].seen == false {
println("User left Discord channel " + l.Bridge.DiscordUsers[id].username) log.Println("User left Discord channel " + l.Bridge.DiscordUsers[id].username)
if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText { if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText {
l.Bridge.MumbleClient.Do(func() { l.Bridge.MumbleClient.Do(func() {
l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has left Discord channel\n", l.Bridge.DiscordUsers[id].username), false) l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has left Discord channel\n", l.Bridge.DiscordUsers[id].username), false)

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"sync" "sync"
@ -25,8 +26,6 @@ type DiscordDuplex struct {
discordMutex sync.Mutex discordMutex sync.Mutex
discordMixerMutex sync.Mutex discordMixerMutex sync.Mutex
fromDiscordMap map[uint32]fromDiscord fromDiscordMap map[uint32]fromDiscord
die chan bool
} }
// OnError gets called by dgvoice when an error is encountered. // OnError gets called by dgvoice when an error is encountered.
@ -43,7 +42,7 @@ var OnError = func(str string, err error) {
// SendPCM will receive on the provied channel encode // SendPCM will receive on the provied channel encode
// received PCM data into Opus then send that to Discordgo // received PCM data into Opus then send that to Discordgo
func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) { func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc, pcm <-chan []int16) {
const channels int = 1 const channels int = 1
const frameRate int = 48000 // audio sampling rate const frameRate int = 48000 // audio sampling rate
const frameSize int = 960 // uint16 size of each audio frame const frameSize int = 960 // uint16 size of each audio frame
@ -62,10 +61,12 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
lastReady := true lastReady := true
var readyTimeout *time.Timer var readyTimeout *time.Timer
wg.Add(1)
for { for {
select { select {
case <-dd.die: case <-ctx.Done():
log.Println("Killing discordSendPCM") wg.Done()
return return
default: default:
} }
@ -90,7 +91,8 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
if lastReady == true { if lastReady == true {
OnError(fmt.Sprintf("Discordgo not ready for opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil) OnError(fmt.Sprintf("Discordgo not ready for opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil)
readyTimeout = time.AfterFunc(30*time.Second, func() { readyTimeout = time.AfterFunc(30*time.Second, func() {
dd.die <- true log.Println("set ready timeout")
cancel()
}) })
lastReady = false lastReady = false
} }
@ -112,19 +114,21 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
// ReceivePCM will receive on the the Discordgo OpusRecv channel and decode // ReceivePCM will receive on the the Discordgo OpusRecv channel and decode
// the opus audio into PCM then send it on the provided channel. // the opus audio into PCM then send it on the provided channel.
func (dd *DiscordDuplex) discordReceivePCM() { func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc) {
var err error var err error
lastReady := true lastReady := true
var readyTimeout *time.Timer var readyTimeout *time.Timer
wg.Add(1)
for { for {
if dd.Bridge.DiscordVoice.Ready == false || dd.Bridge.DiscordVoice.OpusRecv == nil { if dd.Bridge.DiscordVoice.Ready == false || dd.Bridge.DiscordVoice.OpusRecv == nil {
if lastReady == true { if lastReady == true {
OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil) OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil)
readyTimeout = time.AfterFunc(30*time.Second, func() { readyTimeout = time.AfterFunc(30*time.Second, func() {
log.Println("set ready timeout") log.Println("set ready timeout")
dd.die <- true cancel()
}) })
lastReady = false lastReady = false
} }
@ -139,10 +143,10 @@ func (dd *DiscordDuplex) discordReceivePCM() {
var p *discordgo.Packet var p *discordgo.Packet
select { select {
case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv: case <-ctx.Done():
case <-dd.die: wg.Done()
log.Println("killing discord ReceivePCM")
return return
case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv:
} }
if !ok { if !ok {
@ -196,14 +200,15 @@ func (dd *DiscordDuplex) discordReceivePCM() {
} }
} }
func (dd *DiscordDuplex) fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) { func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGroup, toMumble chan<- gumble.AudioBuffer) {
ticker := time.NewTicker(10 * time.Millisecond) ticker := time.NewTicker(10 * time.Millisecond)
sendAudio := false sendAudio := false
wg.Add(1)
for { for {
select { select {
case <-dd.die: case <-ctx.Done():
log.Println("killing fromDiscordMixer") wg.Done()
return return
case <-ticker.C: case <-ticker.C:
} }

30
main.go
View File

@ -112,17 +112,16 @@ func main() {
} }
// MUMBLE SETUP // MUMBLE SETUP
MumbleConfig := gumble.NewConfig() Bridge.BridgeConfig.MumbleConfig = gumble.NewConfig()
Bridge.BridgeConfig.MumbleConfig = MumbleConfig Bridge.BridgeConfig.MumbleConfig.Username = *mumbleUsername
MumbleConfig.Username = *mumbleUsername Bridge.BridgeConfig.MumbleConfig.Password = *mumblePassword
MumbleConfig.Password = *mumblePassword Bridge.BridgeConfig.MumbleConfig.AudioInterval = time.Millisecond * 10
MumbleConfig.AudioInterval = time.Millisecond * 10
Bridge.MumbleListener = &MumbleListener{ Bridge.MumbleListener = &MumbleListener{
Bridge: Bridge, Bridge: Bridge,
} }
MumbleConfig.Attach(gumbleutil.Listener{ Bridge.BridgeConfig.MumbleConfig.Attach(gumbleutil.Listener{
Connect: Bridge.MumbleListener.mumbleConnect, Connect: Bridge.MumbleListener.mumbleConnect,
UserChange: Bridge.MumbleListener.mumbleUserChange, UserChange: Bridge.MumbleListener.mumbleUserChange,
}) })
@ -171,7 +170,12 @@ func main() {
case "constant": case "constant":
log.Println("bridge starting in constant mode") log.Println("bridge starting in constant mode")
Bridge.Mode = bridgeModeConstant Bridge.Mode = bridgeModeConstant
go Bridge.startBridge() go func() {
for {
Bridge.startBridge()
log.Println("Bridge died. Restarting")
}
}()
default: default:
Bridge.DiscordSession.Close() Bridge.DiscordSession.Close()
log.Fatalln("invalid bridge mode set") log.Fatalln("invalid bridge mode set")
@ -179,8 +183,18 @@ func main() {
go Bridge.discordStatusUpdate() go Bridge.discordStatusUpdate()
// Shutdown on OS signal
sc := make(chan os.Signal, 1) sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill) signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
<-sc <-sc
log.Println("Bot shutting down")
// Signal the bridge to exit cleanly
Bridge.BridgeDie <- true
log.Println("OS Signal. Bot shutting down")
// Wait or the bridge to exit cleanly
if Bridge.Connected {
Bridge.WaitExit.Wait()
}
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"log"
"strings" "strings"
"layeh.com/gumble/gumble" "layeh.com/gumble/gumble"
@ -38,6 +39,8 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) {
if e.Type.Has(gumble.UserChangeConnected) { if e.Type.Has(gumble.UserChangeConnected) {
log.Println("User connected to mumble " + e.User.Name)
if !l.Bridge.BridgeConfig.MumbleDisableText { if !l.Bridge.BridgeConfig.MumbleDisableText {
e.User.Send("Mumble-Discord-Bridge v" + version) e.User.Send("Mumble-Discord-Bridge v" + version)
@ -63,7 +66,9 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) {
// Send discord a notice // Send discord a notice
l.Bridge.discordSendMessageAll(e.User.Name + " has joined mumble") l.Bridge.discordSendMessageAll(e.User.Name + " has joined mumble")
} }
if e.Type.Has(gumble.UserChangeDisconnected) { if e.Type.Has(gumble.UserChangeDisconnected) {
l.Bridge.discordSendMessageAll(e.User.Name + " has left mumble") l.Bridge.discordSendMessageAll(e.User.Name + " has left mumble")
log.Println("User disconnected from mumble " + e.User.Name)
} }
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"log" "log"
"sync" "sync"
"time" "time"
@ -14,9 +15,7 @@ var fromMumbleArr []chan gumble.AudioBuffer
var mumbleStreamingArr []bool var mumbleStreamingArr []bool
// MumbleDuplex - listenera and outgoing // MumbleDuplex - listenera and outgoing
type MumbleDuplex struct { type MumbleDuplex struct{}
die chan bool
}
// OnAudioStream - Spawn routines to handle incoming packets // OnAudioStream - Spawn routines to handle incoming packets
func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
@ -30,12 +29,10 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
mutex.Unlock() mutex.Unlock()
go func() { go func() {
// TODO kill go routine on cleanup
log.Println("new mumble audio stream", e.User.Name) log.Println("new mumble audio stream", e.User.Name)
for { for {
select { select {
case <-m.die:
log.Println("Removing mumble audio stream")
return
case p := <-e.C: case p := <-e.C:
// log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer)) // log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer))
@ -49,17 +46,19 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
return return
} }
func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) { func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) {
ticker := time.NewTicker(10 * time.Millisecond) ticker := time.NewTicker(10 * time.Millisecond)
sendAudio := false sendAudio := false
wg.Add(1)
for { for {
select { select {
case <-die: case <-ctx.Done():
log.Println("Killing fromMumbleMixer") wg.Done()
return return
default: default:
} }
<-ticker.C <-ticker.C
mutex.Lock() mutex.Lock()
@ -99,9 +98,6 @@ func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) {
if sendAudio { if sendAudio {
select { select {
case toDiscord <- outBuf: case toDiscord <- outBuf:
case <-die:
log.Println("Killing fromMumbleMixer")
return
default: default:
log.Println("toDiscord buffer full. Dropping packet") log.Println("toDiscord buffer full. Dropping packet")
} }