From c94362581aad3419afda9b8bfbecc097070ac110 Mon Sep 17 00:00:00 2001 From: Tyler Stiene Date: Fri, 19 Nov 2021 00:57:20 -0500 Subject: [PATCH] refactor global variables into duplex structs to fix issue on bridge restart disable timer pausing in discord send untill better notification method can be developed --- internal/bridge/bridge.go | 11 +++---- internal/bridge/discord.go | 53 ++++++++++++++++++++++-------- internal/bridge/mumble.go | 66 ++++++++++++++++++++------------------ 3 files changed, 79 insertions(+), 51 deletions(-) diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go index 80765ae..20917b1 100644 --- a/internal/bridge/bridge.go +++ b/internal/bridge/bridge.go @@ -141,7 +141,7 @@ func (b *BridgeState) StartBridge() { // MUMBLE Connect - b.MumbleStream = &MumbleDuplex{} + b.MumbleStream = NewMumbleDuplex() det := b.BridgeConfig.MumbleConfig.AudioListeners.Attach(b.MumbleStream) defer det.Detach() @@ -180,6 +180,9 @@ func (b *BridgeState) StartBridge() { defer close(toDiscord) defer close(toMumble) + // From Discord + b.DiscordStream = NewDiscordDuplex(b) + // Start Passing Between // From Mumble @@ -189,12 +192,6 @@ func (b *BridgeState) StartBridge() { b.MumbleStream.fromMumbleMixer(ctx, cancel, toDiscord) }() - // From Discord - b.DiscordStream = &DiscordDuplex{ - Bridge: b, - fromDiscordMap: make(map[uint32]fromDiscord), - } - wg.Add(1) go func() { defer wg.Done() diff --git a/internal/bridge/discord.go b/internal/bridge/discord.go index de2722a..09cd1b5 100644 --- a/internal/bridge/discord.go +++ b/internal/bridge/discord.go @@ -26,12 +26,20 @@ type fromDiscord struct { type DiscordDuplex struct { Bridge *BridgeState - discordMutex sync.Mutex - fromDiscordMap map[uint32]fromDiscord + discordMutex sync.Mutex + fromDiscordMap map[uint32]fromDiscord + discordSendSleepTick sleepct.SleepCT + discordReceiveSleepTick sleepct.SleepCT } -var discrodSendSleepTick sleepct.SleepCT = sleepct.SleepCT{} -var discrodReceiveSleepTick sleepct.SleepCT = sleepct.SleepCT{} +func NewDiscordDuplex(b *BridgeState) *DiscordDuplex { + return &DiscordDuplex{ + Bridge: b, + fromDiscordMap: make(map[uint32]fromDiscord), + discordSendSleepTick: sleepct.SleepCT{}, + discordReceiveSleepTick: sleepct.SleepCT{}, + } +} // OnError gets called by dgvoice when an error is encountered. // By default logs to STDERR @@ -46,7 +54,7 @@ var OnError = func(str string, err error) { } // SendPCM will receive on the provied channel encode -// received PCM data into Opus then send that to Discordgo +// received PCM data with Opus then send that to Discordgo func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.CancelFunc, pcm <-chan []int16) { const channels int = 1 const frameRate int = 48000 // audio sampling rate @@ -64,12 +72,28 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc // Generate Opus Silence Frame opusSilence := []byte{0xf8, 0xff, 0xfe} - discrodSendSleepTick.Start(20 * time.Millisecond) + dd.discordSendSleepTick.Start(20 * time.Millisecond) lastReady := true var readyTimeout *time.Timer var speakingStart time.Time + // Spy on the PCM channel to notify + // TODO determine a method to notify a paused sleepct + // pcm := make(chan []int16, 10) + // go func() { + // for { + // t, ok := <-pcmIn + // if !ok { + // close(pcm) + // return + // } else { + // dd.discordSendSleepTick.Notify() + // pcm <- t + // } + // } + // }() + internalSend := func(opus []byte) { dd.Bridge.DiscordVoice.RWMutex.RLock() if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusSend == nil { @@ -101,7 +125,8 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc } // if we are not streaming try to pause - promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, !streaming))) + // promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, !streaming))) + promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, false))) if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) { if !streaming { @@ -128,7 +153,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc // It is possible that short speaking cycle is the result of a short input to mumble (Not a problem). ie a quick tap of push to talk button. // Or when timing delays are introduced via network, hardware or kernel delays (Problem). // The problem delays result in choppy or stuttering sounds, especially when the silence frames are introduced into the opus frames below. - // Multiple short cycle delays can result in a Discrod rate limiter being trigger due to of multiple JSON speaking/not-speaking state changes + // Multiple short cycle delays can result in a discord rate limiter being trigger due to of multiple JSON speaking/not-speaking state changes if time.Since(speakingStart).Milliseconds() < 50 { log.Println("Warning: Short Mumble to Discord speaking cycle. Consider increaseing the size of the to Discord jitter buffer.") } @@ -137,7 +162,9 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc // We want to do this after alerting the user of possible short speaking cycles for i := 0; i < 5; i++ { internalSend(opusSilence) - promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, true))) + // promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, true))) + promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, false))) + } dd.Bridge.DiscordVoice.Speaking(false) @@ -260,7 +287,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, cancel context.C } dd.discordMutex.Unlock() - discrodReceiveSleepTick.Notify() + dd.discordReceiveSleepTick.Notify() } } @@ -271,7 +298,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- g } var speakingStart time.Time - discrodReceiveSleepTick.Start(10 * time.Millisecond) + dd.discordReceiveSleepTick.Start(10 * time.Millisecond) sendAudio := false toMumbleStreaming := false @@ -285,7 +312,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- g } // if didn't send audio try to pause - promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, !sendAudio))) + promTimerDiscordMixer.Observe(float64(dd.discordReceiveSleepTick.SleepNextTarget(ctx, !sendAudio))) dd.discordMutex.Lock() @@ -364,7 +391,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- g for i := 0; i < 5; i++ { mumbleTimeoutSend(mumbleSilence) - promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, false))) + promTimerDiscordMixer.Observe(float64(dd.discordReceiveSleepTick.SleepNextTarget(ctx, false))) } toMumbleStreaming = false diff --git a/internal/bridge/mumble.go b/internal/bridge/mumble.go index 36ffea6..1132562 100644 --- a/internal/bridge/mumble.go +++ b/internal/bridge/mumble.go @@ -12,27 +12,34 @@ import ( "github.com/stieneee/mumble-discord-bridge/pkg/sleepct" ) -var mutex sync.Mutex -var fromMumbleArr []chan gumble.AudioBuffer -var mumbleStreamingArr []bool +// MumbleDuplex - listener and outgoing +type MumbleDuplex struct { + mutex sync.Mutex + fromMumbleArr []chan gumble.AudioBuffer + mumbleStreamingArr []bool + mumbleSleepTick sleepct.SleepCT +} -// MumbleDuplex - listenera and outgoing -type MumbleDuplex struct{} - -var mumbleSleepTick sleepct.SleepCT = sleepct.SleepCT{} +func NewMumbleDuplex() *MumbleDuplex { + return &MumbleDuplex{ + fromMumbleArr: make([]chan gumble.AudioBuffer, 0), + mumbleStreamingArr: make([]bool, 0), + mumbleSleepTick: sleepct.SleepCT{}, + } +} // OnAudioStream - Spawn routines to handle incoming packets -func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { +func (m *MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { // hold a reference ot the channel in the closure - localMumbleArray := make(chan gumble.AudioBuffer, 100) + streamChan := make(chan gumble.AudioBuffer, 100) - mutex.Lock() - fromMumbleArr = append(fromMumbleArr, localMumbleArray) - mumbleStreamingArr = append(mumbleStreamingArr, false) - mutex.Unlock() + m.mutex.Lock() + m.fromMumbleArr = append(m.fromMumbleArr, streamChan) + m.mumbleStreamingArr = append(m.mumbleStreamingArr, false) + m.mutex.Unlock() - promMumbleArraySize.Set(float64(len(fromMumbleArr))) + promMumbleArraySize.Set(float64(len(m.fromMumbleArr))) go func() { name := e.User.Name @@ -42,17 +49,17 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { // 480 per 10ms for i := 0; i < len(p.AudioBuffer)/480; i++ { - localMumbleArray <- p.AudioBuffer[480*i : 480*(i+1)] + streamChan <- p.AudioBuffer[480*i : 480*(i+1)] } promReceivedMumblePackets.Inc() - mumbleSleepTick.Notify() + m.mumbleSleepTick.Notify() } log.Println("Mumble audio stream ended", name) }() } -func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.CancelFunc, toDiscord chan []int16) { - mumbleSleepTick.Start(10 * time.Millisecond) +func (m *MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.CancelFunc, toDiscord chan []int16) { + m.mumbleSleepTick.Start(10 * time.Millisecond) sendAudio := false bufferWarning := false @@ -68,36 +75,35 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.Cancel default: } - // if we sent audio on the last pass attempt to pause - promTimerMumbleMixer.Observe(float64(mumbleSleepTick.SleepNextTarget(ctx, !sendAudio))) + promTimerMumbleMixer.Observe(float64(m.mumbleSleepTick.SleepNextTarget(ctx, false))) - mutex.Lock() + m.mutex.Lock() sendAudio = false internalMixerArr := make([]gumble.AudioBuffer, 0) streamingCount := 0 // Work through each channel - for i := 0; i < len(fromMumbleArr); i++ { - if len(fromMumbleArr[i]) > 0 { + for i := 0; i < len(m.fromMumbleArr); i++ { + if len(m.fromMumbleArr[i]) > 0 { sendAudio = true - if !mumbleStreamingArr[i] { - mumbleStreamingArr[i] = true + if !m.mumbleStreamingArr[i] { + m.mumbleStreamingArr[i] = true streamingCount++ // log.Println("Mumble starting", i) } - x1 := (<-fromMumbleArr[i]) + x1 := (<-m.fromMumbleArr[i]) internalMixerArr = append(internalMixerArr, x1) } else { - if mumbleStreamingArr[i] { - mumbleStreamingArr[i] = false + if m.mumbleStreamingArr[i] { + m.mumbleStreamingArr[i] = false // log.Println("Mumble stopping", i) } } } - mutex.Unlock() + m.mutex.Unlock() promMumbleStreaming.Set(float64(streamingCount)) @@ -145,8 +151,6 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.Cancel cancel() } } - - discrodSendSleepTick.Notify() } } }