refactor global variables into duplex structs to fix issue on bridge restart

disable timer pausing in discord send untill better notification method can be developed
This commit is contained in:
Tyler Stiene 2021-11-19 00:57:20 -05:00
parent 8ca66fb500
commit c94362581a
3 changed files with 79 additions and 51 deletions

View File

@ -141,7 +141,7 @@ func (b *BridgeState) StartBridge() {
// MUMBLE Connect
b.MumbleStream = &MumbleDuplex{}
b.MumbleStream = NewMumbleDuplex()
det := b.BridgeConfig.MumbleConfig.AudioListeners.Attach(b.MumbleStream)
defer det.Detach()
@ -180,6 +180,9 @@ func (b *BridgeState) StartBridge() {
defer close(toDiscord)
defer close(toMumble)
// From Discord
b.DiscordStream = NewDiscordDuplex(b)
// Start Passing Between
// From Mumble
@ -189,12 +192,6 @@ func (b *BridgeState) StartBridge() {
b.MumbleStream.fromMumbleMixer(ctx, cancel, toDiscord)
}()
// From Discord
b.DiscordStream = &DiscordDuplex{
Bridge: b,
fromDiscordMap: make(map[uint32]fromDiscord),
}
wg.Add(1)
go func() {
defer wg.Done()

View File

@ -26,12 +26,20 @@ type fromDiscord struct {
type DiscordDuplex struct {
Bridge *BridgeState
discordMutex sync.Mutex
fromDiscordMap map[uint32]fromDiscord
discordMutex sync.Mutex
fromDiscordMap map[uint32]fromDiscord
discordSendSleepTick sleepct.SleepCT
discordReceiveSleepTick sleepct.SleepCT
}
var discrodSendSleepTick sleepct.SleepCT = sleepct.SleepCT{}
var discrodReceiveSleepTick sleepct.SleepCT = sleepct.SleepCT{}
func NewDiscordDuplex(b *BridgeState) *DiscordDuplex {
return &DiscordDuplex{
Bridge: b,
fromDiscordMap: make(map[uint32]fromDiscord),
discordSendSleepTick: sleepct.SleepCT{},
discordReceiveSleepTick: sleepct.SleepCT{},
}
}
// OnError gets called by dgvoice when an error is encountered.
// By default logs to STDERR
@ -46,7 +54,7 @@ var OnError = func(str string, err error) {
}
// SendPCM will receive on the provied channel encode
// received PCM data into Opus then send that to Discordgo
// received PCM data with Opus then send that to Discordgo
func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.CancelFunc, pcm <-chan []int16) {
const channels int = 1
const frameRate int = 48000 // audio sampling rate
@ -64,12 +72,28 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc
// Generate Opus Silence Frame
opusSilence := []byte{0xf8, 0xff, 0xfe}
discrodSendSleepTick.Start(20 * time.Millisecond)
dd.discordSendSleepTick.Start(20 * time.Millisecond)
lastReady := true
var readyTimeout *time.Timer
var speakingStart time.Time
// Spy on the PCM channel to notify
// TODO determine a method to notify a paused sleepct
// pcm := make(chan []int16, 10)
// go func() {
// for {
// t, ok := <-pcmIn
// if !ok {
// close(pcm)
// return
// } else {
// dd.discordSendSleepTick.Notify()
// pcm <- t
// }
// }
// }()
internalSend := func(opus []byte) {
dd.Bridge.DiscordVoice.RWMutex.RLock()
if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusSend == nil {
@ -101,7 +125,8 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc
}
// if we are not streaming try to pause
promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, !streaming)))
// promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, !streaming)))
promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, false)))
if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) {
if !streaming {
@ -128,7 +153,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc
// It is possible that short speaking cycle is the result of a short input to mumble (Not a problem). ie a quick tap of push to talk button.
// Or when timing delays are introduced via network, hardware or kernel delays (Problem).
// The problem delays result in choppy or stuttering sounds, especially when the silence frames are introduced into the opus frames below.
// Multiple short cycle delays can result in a Discrod rate limiter being trigger due to of multiple JSON speaking/not-speaking state changes
// Multiple short cycle delays can result in a discord rate limiter being trigger due to of multiple JSON speaking/not-speaking state changes
if time.Since(speakingStart).Milliseconds() < 50 {
log.Println("Warning: Short Mumble to Discord speaking cycle. Consider increaseing the size of the to Discord jitter buffer.")
}
@ -137,7 +162,9 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.Canc
// We want to do this after alerting the user of possible short speaking cycles
for i := 0; i < 5; i++ {
internalSend(opusSilence)
promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, true)))
// promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, true)))
promTimerDiscordSend.Observe(float64(dd.discordSendSleepTick.SleepNextTarget(ctx, false)))
}
dd.Bridge.DiscordVoice.Speaking(false)
@ -260,7 +287,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, cancel context.C
}
dd.discordMutex.Unlock()
discrodReceiveSleepTick.Notify()
dd.discordReceiveSleepTick.Notify()
}
}
@ -271,7 +298,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- g
}
var speakingStart time.Time
discrodReceiveSleepTick.Start(10 * time.Millisecond)
dd.discordReceiveSleepTick.Start(10 * time.Millisecond)
sendAudio := false
toMumbleStreaming := false
@ -285,7 +312,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- g
}
// if didn't send audio try to pause
promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, !sendAudio)))
promTimerDiscordMixer.Observe(float64(dd.discordReceiveSleepTick.SleepNextTarget(ctx, !sendAudio)))
dd.discordMutex.Lock()
@ -364,7 +391,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- g
for i := 0; i < 5; i++ {
mumbleTimeoutSend(mumbleSilence)
promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, false)))
promTimerDiscordMixer.Observe(float64(dd.discordReceiveSleepTick.SleepNextTarget(ctx, false)))
}
toMumbleStreaming = false

View File

@ -12,27 +12,34 @@ import (
"github.com/stieneee/mumble-discord-bridge/pkg/sleepct"
)
var mutex sync.Mutex
var fromMumbleArr []chan gumble.AudioBuffer
var mumbleStreamingArr []bool
// MumbleDuplex - listener and outgoing
type MumbleDuplex struct {
mutex sync.Mutex
fromMumbleArr []chan gumble.AudioBuffer
mumbleStreamingArr []bool
mumbleSleepTick sleepct.SleepCT
}
// MumbleDuplex - listenera and outgoing
type MumbleDuplex struct{}
var mumbleSleepTick sleepct.SleepCT = sleepct.SleepCT{}
func NewMumbleDuplex() *MumbleDuplex {
return &MumbleDuplex{
fromMumbleArr: make([]chan gumble.AudioBuffer, 0),
mumbleStreamingArr: make([]bool, 0),
mumbleSleepTick: sleepct.SleepCT{},
}
}
// OnAudioStream - Spawn routines to handle incoming packets
func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
func (m *MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
// hold a reference ot the channel in the closure
localMumbleArray := make(chan gumble.AudioBuffer, 100)
streamChan := make(chan gumble.AudioBuffer, 100)
mutex.Lock()
fromMumbleArr = append(fromMumbleArr, localMumbleArray)
mumbleStreamingArr = append(mumbleStreamingArr, false)
mutex.Unlock()
m.mutex.Lock()
m.fromMumbleArr = append(m.fromMumbleArr, streamChan)
m.mumbleStreamingArr = append(m.mumbleStreamingArr, false)
m.mutex.Unlock()
promMumbleArraySize.Set(float64(len(fromMumbleArr)))
promMumbleArraySize.Set(float64(len(m.fromMumbleArr)))
go func() {
name := e.User.Name
@ -42,17 +49,17 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
// 480 per 10ms
for i := 0; i < len(p.AudioBuffer)/480; i++ {
localMumbleArray <- p.AudioBuffer[480*i : 480*(i+1)]
streamChan <- p.AudioBuffer[480*i : 480*(i+1)]
}
promReceivedMumblePackets.Inc()
mumbleSleepTick.Notify()
m.mumbleSleepTick.Notify()
}
log.Println("Mumble audio stream ended", name)
}()
}
func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.CancelFunc, toDiscord chan []int16) {
mumbleSleepTick.Start(10 * time.Millisecond)
func (m *MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.CancelFunc, toDiscord chan []int16) {
m.mumbleSleepTick.Start(10 * time.Millisecond)
sendAudio := false
bufferWarning := false
@ -68,36 +75,35 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.Cancel
default:
}
// if we sent audio on the last pass attempt to pause
promTimerMumbleMixer.Observe(float64(mumbleSleepTick.SleepNextTarget(ctx, !sendAudio)))
promTimerMumbleMixer.Observe(float64(m.mumbleSleepTick.SleepNextTarget(ctx, false)))
mutex.Lock()
m.mutex.Lock()
sendAudio = false
internalMixerArr := make([]gumble.AudioBuffer, 0)
streamingCount := 0
// Work through each channel
for i := 0; i < len(fromMumbleArr); i++ {
if len(fromMumbleArr[i]) > 0 {
for i := 0; i < len(m.fromMumbleArr); i++ {
if len(m.fromMumbleArr[i]) > 0 {
sendAudio = true
if !mumbleStreamingArr[i] {
mumbleStreamingArr[i] = true
if !m.mumbleStreamingArr[i] {
m.mumbleStreamingArr[i] = true
streamingCount++
// log.Println("Mumble starting", i)
}
x1 := (<-fromMumbleArr[i])
x1 := (<-m.fromMumbleArr[i])
internalMixerArr = append(internalMixerArr, x1)
} else {
if mumbleStreamingArr[i] {
mumbleStreamingArr[i] = false
if m.mumbleStreamingArr[i] {
m.mumbleStreamingArr[i] = false
// log.Println("Mumble stopping", i)
}
}
}
mutex.Unlock()
m.mutex.Unlock()
promMumbleStreaming.Set(float64(streamingCount))
@ -145,8 +151,6 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.Cancel
cancel()
}
}
discrodSendSleepTick.Notify()
}
}
}