clean up bridge terminate

This commit is contained in:
Tyler Stiene 2021-02-01 16:03:38 -05:00 committed by Steve
parent ed99e7aa8f
commit 6910725d22
6 changed files with 114 additions and 75 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
@ -24,9 +25,15 @@ type BridgeState struct {
// The configuration data for this bridge
BridgeConfig *BridgeConfig
// TODO
// External requests to kill the bridge
BridgeDie chan bool
// Lock to only allow one bridge session at a time
lock sync.Mutex
// Wait for bridge to exit cleanly
WaitExit *sync.WaitGroup
// Bridge connection
Connected bool
@ -67,33 +74,44 @@ type BridgeState struct {
// startBridge established the voice connection
func (b *BridgeState) startBridge() {
b.lock.Lock()
defer b.lock.Unlock()
b.BridgeDie = make(chan bool)
defer close(b.BridgeDie)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
b.WaitExit = &wg
var err error
// DISCORD Connect Voice
log.Println("Attempting to join Discord voice channel")
b.DiscordVoice, err = b.DiscordSession.ChannelVoiceJoin(b.BridgeConfig.GID, b.BridgeConfig.CID, false, false)
if err != nil {
log.Println(err)
b.DiscordVoice.Disconnect()
return
}
defer b.DiscordVoice.Disconnect()
defer b.DiscordVoice.Speaking(false)
defer b.DiscordVoice.Close()
log.Println("Discord Voice Connected")
// MUMBLE Connect
b.MumbleStream = &MumbleDuplex{
die: b.BridgeDie,
}
b.MumbleStream = &MumbleDuplex{}
det := b.BridgeConfig.MumbleConfig.AudioListeners.Attach(b.MumbleStream)
defer det.Detach()
var tlsConfig tls.Config
if b.BridgeConfig.MumbleInsecure {
tlsConfig.InsecureSkipVerify = true
}
log.Println("Attempting to join Mumble")
b.MumbleClient, err = gumble.DialWithDialer(new(net.Dialer), b.BridgeConfig.MumbleAddr, b.BridgeConfig.MumbleConfig, &tlsConfig)
if err != nil {
@ -102,6 +120,7 @@ func (b *BridgeState) startBridge() {
return
}
defer b.MumbleClient.Disconnect()
log.Println("Mumble Connected")
// Shared Channels
// Shared channels pass PCM information in 10ms chunks [480]int16
@ -109,65 +128,66 @@ func (b *BridgeState) startBridge() {
var toMumble = b.MumbleClient.AudioOutgoing()
var toDiscord = make(chan []int16, 100)
log.Println("Mumble Connected")
defer close(toDiscord)
defer close(toMumble)
// Start Passing Between
// From Mumble
go b.MumbleStream.fromMumbleMixer(toDiscord, b.BridgeDie)
go b.MumbleStream.fromMumbleMixer(ctx, &wg, toDiscord)
// From Discord
b.DiscordStream = &DiscordDuplex{
Bridge: b,
fromDiscordMap: make(map[uint32]fromDiscord),
die: b.BridgeDie,
}
go b.DiscordStream.discordReceivePCM()
go b.DiscordStream.fromDiscordMixer(toMumble)
go b.DiscordStream.discordReceivePCM(ctx, &wg, cancel)
go b.DiscordStream.fromDiscordMixer(ctx, &wg, toMumble)
// To Discord
go b.DiscordStream.discordSendPCM(toDiscord)
go b.DiscordStream.discordSendPCM(ctx, &wg, cancel, toDiscord)
// Monitor Mumble
go func() {
wg.Add(1)
ticker := time.NewTicker(500 * time.Millisecond)
for {
<-ticker.C
if b.MumbleClient == nil || b.MumbleClient.State() != 2 {
if b.MumbleClient != nil {
log.Println("Lost mumble connection " + strconv.Itoa(int(b.MumbleClient.State())))
} else {
log.Println("Lost mumble connection due to bridge dieing")
return
select {
case <-ticker.C:
if b.MumbleClient == nil || b.MumbleClient.State() != 2 {
if b.MumbleClient != nil {
log.Println("Lost mumble connection " + strconv.Itoa(int(b.MumbleClient.State())))
} else {
log.Println("Lost mumble connection due to bridge dieing")
}
cancel()
}
select {
case <-b.BridgeDie:
//die is already closed
default:
close(b.BridgeDie)
}
case <-ctx.Done():
wg.Done()
return
}
}
}()
b.Connected = true
// Hold until cancelled or external die request
select {
case <-ctx.Done():
log.Println("Bridge internal context cancel")
case <-b.BridgeDie:
log.Println("\nGot internal die request. Terminating Mumble-Bridge")
b.DiscordVoice.Disconnect()
det.Detach()
close(toDiscord)
close(toMumble)
close(b.BridgeDie)
b.Connected = false
b.DiscordVoice = nil
b.MumbleClient = nil
b.MumbleUsers = make(map[string]bool)
b.DiscordUsers = make(map[string]discordUser)
log.Println("Bridge die request received")
cancel()
}
b.Connected = false
wg.Wait()
log.Println("Terminating Bridge")
b.MumbleUsersMutex.Lock()
b.MumbleUsers = make(map[string]bool)
b.MumbleUsersMutex.Unlock()
b.DiscordUsers = make(map[string]discordUser)
}
func (b *BridgeState) discordStatusUpdate() {

View File

@ -100,7 +100,6 @@ func (l *DiscordListener) messageCreate(s *discordgo.Session, m *discordgo.Messa
if vs.UserID == m.Author.ID {
log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID)
l.Bridge.BridgeDie <- true
l.Bridge.BridgeDie = nil
return
}
}
@ -167,7 +166,7 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi
continue
}
println("User joined Discord " + u.Username)
log.Println("User joined Discord " + u.Username)
dm, err := s.UserChannelCreate(u.ID)
if err != nil {
log.Println("Error creating private channel for", u.Username)
@ -194,7 +193,7 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi
// Remove users that are no longer connected
for id := range l.Bridge.DiscordUsers {
if l.Bridge.DiscordUsers[id].seen == false {
println("User left Discord channel " + l.Bridge.DiscordUsers[id].username)
log.Println("User left Discord channel " + l.Bridge.DiscordUsers[id].username)
if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText {
l.Bridge.MumbleClient.Do(func() {
l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has left Discord channel\n", l.Bridge.DiscordUsers[id].username), false)

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"log"
"sync"
@ -25,8 +26,6 @@ type DiscordDuplex struct {
discordMutex sync.Mutex
discordMixerMutex sync.Mutex
fromDiscordMap map[uint32]fromDiscord
die chan bool
}
// OnError gets called by dgvoice when an error is encountered.
@ -43,7 +42,7 @@ var OnError = func(str string, err error) {
// SendPCM will receive on the provied channel encode
// received PCM data into Opus then send that to Discordgo
func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc, pcm <-chan []int16) {
const channels int = 1
const frameRate int = 48000 // audio sampling rate
const frameSize int = 960 // uint16 size of each audio frame
@ -62,10 +61,12 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
lastReady := true
var readyTimeout *time.Timer
wg.Add(1)
for {
select {
case <-dd.die:
log.Println("Killing discordSendPCM")
case <-ctx.Done():
wg.Done()
return
default:
}
@ -90,7 +91,8 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
if lastReady == true {
OnError(fmt.Sprintf("Discordgo not ready for opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil)
readyTimeout = time.AfterFunc(30*time.Second, func() {
dd.die <- true
log.Println("set ready timeout")
cancel()
})
lastReady = false
}
@ -112,19 +114,21 @@ func (dd *DiscordDuplex) discordSendPCM(pcm <-chan []int16) {
// ReceivePCM will receive on the the Discordgo OpusRecv channel and decode
// the opus audio into PCM then send it on the provided channel.
func (dd *DiscordDuplex) discordReceivePCM() {
func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc) {
var err error
lastReady := true
var readyTimeout *time.Timer
wg.Add(1)
for {
if dd.Bridge.DiscordVoice.Ready == false || dd.Bridge.DiscordVoice.OpusRecv == nil {
if lastReady == true {
OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil)
readyTimeout = time.AfterFunc(30*time.Second, func() {
log.Println("set ready timeout")
dd.die <- true
cancel()
})
lastReady = false
}
@ -139,10 +143,10 @@ func (dd *DiscordDuplex) discordReceivePCM() {
var p *discordgo.Packet
select {
case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv:
case <-dd.die:
log.Println("killing discord ReceivePCM")
case <-ctx.Done():
wg.Done()
return
case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv:
}
if !ok {
@ -196,14 +200,15 @@ func (dd *DiscordDuplex) discordReceivePCM() {
}
}
func (dd *DiscordDuplex) fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) {
func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGroup, toMumble chan<- gumble.AudioBuffer) {
ticker := time.NewTicker(10 * time.Millisecond)
sendAudio := false
wg.Add(1)
for {
select {
case <-dd.die:
log.Println("killing fromDiscordMixer")
case <-ctx.Done():
wg.Done()
return
case <-ticker.C:
}

30
main.go
View File

@ -112,17 +112,16 @@ func main() {
}
// MUMBLE SETUP
MumbleConfig := gumble.NewConfig()
Bridge.BridgeConfig.MumbleConfig = MumbleConfig
MumbleConfig.Username = *mumbleUsername
MumbleConfig.Password = *mumblePassword
MumbleConfig.AudioInterval = time.Millisecond * 10
Bridge.BridgeConfig.MumbleConfig = gumble.NewConfig()
Bridge.BridgeConfig.MumbleConfig.Username = *mumbleUsername
Bridge.BridgeConfig.MumbleConfig.Password = *mumblePassword
Bridge.BridgeConfig.MumbleConfig.AudioInterval = time.Millisecond * 10
Bridge.MumbleListener = &MumbleListener{
Bridge: Bridge,
}
MumbleConfig.Attach(gumbleutil.Listener{
Bridge.BridgeConfig.MumbleConfig.Attach(gumbleutil.Listener{
Connect: Bridge.MumbleListener.mumbleConnect,
UserChange: Bridge.MumbleListener.mumbleUserChange,
})
@ -171,7 +170,12 @@ func main() {
case "constant":
log.Println("bridge starting in constant mode")
Bridge.Mode = bridgeModeConstant
go Bridge.startBridge()
go func() {
for {
Bridge.startBridge()
log.Println("Bridge died. Restarting")
}
}()
default:
Bridge.DiscordSession.Close()
log.Fatalln("invalid bridge mode set")
@ -179,8 +183,18 @@ func main() {
go Bridge.discordStatusUpdate()
// Shutdown on OS signal
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill)
<-sc
log.Println("Bot shutting down")
// Signal the bridge to exit cleanly
Bridge.BridgeDie <- true
log.Println("OS Signal. Bot shutting down")
// Wait or the bridge to exit cleanly
if Bridge.Connected {
Bridge.WaitExit.Wait()
}
}

View File

@ -1,6 +1,7 @@
package main
import (
"log"
"strings"
"layeh.com/gumble/gumble"
@ -38,6 +39,8 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) {
if e.Type.Has(gumble.UserChangeConnected) {
log.Println("User connected to mumble " + e.User.Name)
if !l.Bridge.BridgeConfig.MumbleDisableText {
e.User.Send("Mumble-Discord-Bridge v" + version)
@ -63,7 +66,9 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) {
// Send discord a notice
l.Bridge.discordSendMessageAll(e.User.Name + " has joined mumble")
}
if e.Type.Has(gumble.UserChangeDisconnected) {
l.Bridge.discordSendMessageAll(e.User.Name + " has left mumble")
log.Println("User disconnected from mumble " + e.User.Name)
}
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"log"
"sync"
"time"
@ -14,9 +15,7 @@ var fromMumbleArr []chan gumble.AudioBuffer
var mumbleStreamingArr []bool
// MumbleDuplex - listenera and outgoing
type MumbleDuplex struct {
die chan bool
}
type MumbleDuplex struct{}
// OnAudioStream - Spawn routines to handle incoming packets
func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
@ -30,12 +29,10 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
mutex.Unlock()
go func() {
// TODO kill go routine on cleanup
log.Println("new mumble audio stream", e.User.Name)
for {
select {
case <-m.die:
log.Println("Removing mumble audio stream")
return
case p := <-e.C:
// log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer))
@ -49,17 +46,19 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
return
}
func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) {
func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) {
ticker := time.NewTicker(10 * time.Millisecond)
sendAudio := false
wg.Add(1)
for {
select {
case <-die:
log.Println("Killing fromMumbleMixer")
case <-ctx.Done():
wg.Done()
return
default:
}
<-ticker.C
mutex.Lock()
@ -99,9 +98,6 @@ func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) {
if sendAudio {
select {
case toDiscord <- outBuf:
case <-die:
log.Println("Killing fromMumbleMixer")
return
default:
log.Println("toDiscord buffer full. Dropping packet")
}