From ca1ba1d099b247bec5364610b93c71737649a4d2 Mon Sep 17 00:00:00 2001 From: Steve Date: Tue, 29 Dec 2020 18:19:44 -0500 Subject: [PATCH] more cleanup on manual linkage, attempts to get discord->mumble working --- discord.go | 38 +++++++++++++++++++++++++++++++++++--- handlers.go | 7 ++++++- main.go | 2 +- mumble.go | 25 +++++++++++++++++++++---- yammer.go | 14 ++++++++++---- 5 files changed, 73 insertions(+), 13 deletions(-) diff --git a/discord.go b/discord.go index 8ff77b8..15a7ac4 100644 --- a/discord.go +++ b/discord.go @@ -22,6 +22,10 @@ var discordMutex sync.Mutex var discordMixerMutex sync.Mutex var fromDiscordMap = make(map[uint32]fromDiscord) +func DiscordReset() { + fromDiscordMap = make(map[uint32]fromDiscord) +} + // OnError gets called by dgvoice when an error is encountered. // By default logs to STDERR var OnError = func(str string, err error) { @@ -56,9 +60,15 @@ func discordSendPCM(v *discordgo.VoiceConnection, pcm <-chan []int16, die chan b var readyTimeout *time.Timer for { + select { + case <-die: + log.Println("Killing discordSendPCM") + return + default: + } <-ticker.C - if len(pcm) > 1 { + log.Println("looking for speech") if !streaming { v.Speaking(true) streaming = true @@ -88,7 +98,7 @@ func discordSendPCM(v *discordgo.VoiceConnection, pcm <-chan []int16, die chan b lastReady = true readyTimeout.Stop() } - + log.Println("sending packets to mumble") v.OpusSend <- opus } else { if streaming { @@ -108,10 +118,19 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) { var readyTimeout *time.Timer for { + log.Println("Start loop") + select { + case <-die: + log.Println("killing discord ReceivePCM") + return + default: + } + log.Println("checked for death") if v.Ready == false || v.OpusRecv == nil { if lastReady == true { OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", v.Ready, v.OpusSend), nil) readyTimeout = time.AfterFunc(30*time.Second, func() { + log.Println("set ready timeout") die <- true }) lastReady = false @@ -123,13 +142,16 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) { readyTimeout.Stop() } + log.Println("rec1") p, ok := <-v.OpusRecv + log.Println("rec2") if !ok { log.Println("Opus not ok") continue } discordMutex.Lock() + log.Println("got lock one") _, ok = fromDiscordMap[p.SSRC] discordMutex.Unlock() if !ok { @@ -147,6 +169,7 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) { } discordMutex.Lock() + log.Println("got lock 2") p.PCM, err = fromDiscordMap[p.SSRC].decoder.Decode(p.Opus, 960, false) discordMutex.Unlock() if err != nil { @@ -155,17 +178,25 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) { } discordMutex.Lock() + log.Println("got lock 3") fromDiscordMap[p.SSRC].pcm <- p.PCM[0:480] fromDiscordMap[p.SSRC].pcm <- p.PCM[480:960] discordMutex.Unlock() + log.Println("finish loop") } } -func fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) { +func fromDiscordMixer(toMumble chan<- gumble.AudioBuffer, die chan bool) { ticker := time.NewTicker(10 * time.Millisecond) sendAudio := false for { + select { + case <-die: + log.Println("killing fromDiscordMixer") + return + default: + } <-ticker.C discordMutex.Lock() @@ -206,6 +237,7 @@ func fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) { if sendAudio { select { case toMumble <- outBuf: + log.Println("sending to mumble") default: log.Println("toMumble buffer full. Dropping packet") } diff --git a/handlers.go b/handlers.go index 5a24737..f275e05 100644 --- a/handlers.go +++ b/handlers.go @@ -71,6 +71,8 @@ func messageCreate(s *discordgo.Session, m *discordgo.MessageCreate) { log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID) YBConfig.ActiveConns[vs.ChannelID] <- true YBConfig.ActiveConns[vs.ChannelID] = nil + MumbleReset() + DiscordReset() return } } @@ -97,7 +99,10 @@ func messageCreate(s *discordgo.Session, m *discordgo.MessageCreate) { if vs.UserID == m.Author.ID { log.Printf("Trying to refresh GID %v and VID %v\n", g.ID, vs.ChannelID) YBConfig.ActiveConns[vs.ChannelID] <- true - time.Sleep(2 * time.Second) + MumbleReset() + DiscordReset() + time.Sleep(5 * time.Second) + YBConfig.ActiveConns[vs.ChannelID] = make(chan bool) go startBridge(s, g.ID, vs.ChannelID, YBConfig.Config, YBConfig.MumbleAddr, YBConfig.MumbleInsecure, YBConfig.ActiveConns[vs.ChannelID]) return } diff --git a/main.go b/main.go index 8ae56d0..8ae4c58 100644 --- a/main.go +++ b/main.go @@ -61,7 +61,7 @@ func main() { // Open Websocket discord.LogLevel = 2 discord.StateEnabled = true - discord.Identify.Intents = discordgo.MakeIntent(discordgo.IntentsGuilds | discordgo.IntentsGuildMessages | discordgo.IntentsGuildVoiceStates) + discord.Identify.Intents = discordgo.MakeIntent(discordgo.IntentsAllWithoutPrivileged) // register handlers discord.AddHandler(ready) discord.AddHandler(messageCreate) diff --git a/mumble.go b/mumble.go index 2909c36..fbb12a0 100644 --- a/mumble.go +++ b/mumble.go @@ -14,7 +14,14 @@ var fromMumbleArr []chan gumble.AudioBuffer var mumbleStreamingArr []bool // MumbleDuplex - listenera and outgoing -type MumbleDuplex struct{} +type MumbleDuplex struct { + Close chan bool +} + +func MumbleReset() { + fromMumbleArr = []chan gumble.AudioBuffer{} + mumbleStreamingArr = []bool{} +} // OnAudioStream - Spawn routines to handle incoming packets func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { @@ -27,10 +34,14 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { mumbleStreamingArr = append(mumbleStreamingArr, false) mutex.Unlock() - go func() { + go func(die chan bool) { log.Println("new mumble audio stream", e.User.Name) for { select { + default: + case <-die: + log.Println("Removing mumble audio stream") + return case p := <-e.C: // log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer)) @@ -40,15 +51,21 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { } } } - }() + }(m.Close) return } -func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16) { +func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) { ticker := time.NewTicker(10 * time.Millisecond) sendAudio := false for { + select { + case <-die: + log.Println("Killing fromMumbleMixer") + return + default: + } <-ticker.C mutex.Lock() diff --git a/yammer.go b/yammer.go index 7186a67..7bfb6de 100644 --- a/yammer.go +++ b/yammer.go @@ -27,7 +27,9 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin // MUMBLE Setup - m := MumbleDuplex{} + m := MumbleDuplex{ + Close: make(chan bool), + } var tlsConfig tls.Config if mumbleInsecure { @@ -51,11 +53,11 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin // Start Passing Between // Mumble - go m.fromMumbleMixer(toDiscord) - config.AudioListeners.Attach(m) + go m.fromMumbleMixer(toDiscord, die) + det := config.AudioListeners.Attach(m) //Discord go discordReceivePCM(dgv, die) - go fromDiscordMixer(toMumble) + go fromDiscordMixer(toMumble, die) go discordSendPCM(dgv, toDiscord, die) c := make(chan os.Signal) signal.Notify(c, os.Interrupt) @@ -67,6 +69,7 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin if mumble.State() != 2 { log.Println("Lost mumble connection " + strconv.Itoa(int(mumble.State()))) die <- true + m.Close <- true } } }() @@ -76,7 +79,10 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin log.Printf("\nGot %s signal. Terminating Mumble-Bridge\n", sig) case <-die: log.Println("\nGot internal die request. Terminating Mumble-Bridge") + close(toMumble) dgv.Disconnect() + log.Println("Closing mumble threads") + det.Detach() } }