From 649fe9f33eea49e5a03c88d4f246d5c002f9edc8 Mon Sep 17 00:00:00 2001 From: Tyler Stiene Date: Mon, 13 Sep 2021 00:50:23 -0400 Subject: [PATCH] fix sleepct pausing --- .gitignore | 3 ++- internal/bridge/discord.go | 21 +++++++++++------- internal/bridge/mumble.go | 11 +++++++--- pkg/sleepct/sleepct.go | 45 +++++++++++++++++++++++++++----------- test/timing_test.go | 7 +++--- 5 files changed, 59 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index edc4385..b48a929 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ dist *.test cert.pem *.gob -docker-compose.yml \ No newline at end of file +docker-compose.yml +mdb-local \ No newline at end of file diff --git a/internal/bridge/discord.go b/internal/bridge/discord.go index bef104e..c50590a 100644 --- a/internal/bridge/discord.go +++ b/internal/bridge/discord.go @@ -30,6 +30,9 @@ type DiscordDuplex struct { fromDiscordMap map[uint32]fromDiscord } +var discrodSendSleepTick sleepct.SleepCT = sleepct.SleepCT{} +var discrodReceiveSleepTick sleepct.SleepCT = sleepct.SleepCT{} + // OnError gets called by dgvoice when an error is encountered. // By default logs to STDERR var OnError = func(str string, err error) { @@ -61,8 +64,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, // Generate Opus Silence Frame opusSilence := []byte{0xf8, 0xff, 0xfe} - sleepTick := sleepct.SleepCT{} - sleepTick.Start(20 * time.Millisecond) + discrodSendSleepTick.Start(20 * time.Millisecond) lastReady := true var readyTimeout *time.Timer @@ -100,7 +102,8 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, default: } - promTimerDiscordSend.Observe(float64(sleepTick.SleepNextTarget(true))) + // if we are not streaming try to pause + promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, !streaming))) if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) { if !streaming { @@ -136,7 +139,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, // We want to do this after alerting the user of possible short speaking cycles for i := 0; i < 5; i++ { internalSend(opusSilence) - promTimerDiscordSend.Observe(float64(sleepTick.SleepNextTarget(true))) + promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, true))) } dd.Bridge.DiscordVoice.Speaking(false) @@ -260,6 +263,8 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro } } dd.discordMutex.Unlock() + + discrodReceiveSleepTick.Notify() } } @@ -270,8 +275,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou } var speakingStart time.Time - sleepTick := sleepct.SleepCT{} - sleepTick.Start(10 * time.Millisecond) + discrodReceiveSleepTick.Start(10 * time.Millisecond) sendAudio := false toMumbleStreaming := false @@ -285,7 +289,8 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou default: } - promTimerDiscordMixer.Observe(float64(sleepTick.SleepNextTarget(true))) + // if didn't send audio try to pause + promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, !sendAudio))) dd.discordMutex.Lock() @@ -364,7 +369,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou for i := 0; i < 5; i++ { mumbleTimeoutSend(mumbleSilence) - promTimerDiscordMixer.Observe(float64(sleepTick.SleepNextTarget(false))) + promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, false))) } toMumbleStreaming = false diff --git a/internal/bridge/mumble.go b/internal/bridge/mumble.go index bfc72dd..609525f 100644 --- a/internal/bridge/mumble.go +++ b/internal/bridge/mumble.go @@ -18,6 +18,8 @@ var mumbleStreamingArr []bool // MumbleDuplex - listenera and outgoing type MumbleDuplex struct{} +var mumbleSleepTick sleepct.SleepCT = sleepct.SleepCT{} + // OnAudioStream - Spawn routines to handle incoming packets func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { @@ -42,14 +44,14 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { localMumbleArray <- p.AudioBuffer[480*i : 480*(i+1)] } promReceivedMumblePackets.Inc() + mumbleSleepTick.Notify() } log.Println("Mumble audio stream ended", name) }() } func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) { - sleepTick := sleepct.SleepCT{} - sleepTick.Start(10 * time.Millisecond) + mumbleSleepTick.Start(10 * time.Millisecond) sendAudio := false bufferWarning := false @@ -64,7 +66,8 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, t default: } - promTimerMumbleMixer.Observe(float64(sleepTick.SleepNextTarget(true))) + // if we sent audio on the last pass attempt to pause + promTimerMumbleMixer.Observe(float64(mumbleSleepTick.SleepNextTarget(ctx, !sendAudio))) mutex.Lock() @@ -125,6 +128,8 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, t log.Println("Error: toDiscord buffer full. Dropping packet") promToDiscordDropped.Inc() } + + discrodSendSleepTick.Notify() } } } diff --git a/pkg/sleepct/sleepct.go b/pkg/sleepct/sleepct.go index 10edb6b..1ba67ce 100644 --- a/pkg/sleepct/sleepct.go +++ b/pkg/sleepct/sleepct.go @@ -1,22 +1,25 @@ package sleepct import ( + "context" "fmt" "time" ) // SleepCT - Sleep constant time step crates a sleep based ticker. -// designed maintain a consistent sleep/tick interval. +// designed to maintain a consistent sleep/tick interval. // The sleeper can be paused waiting to be signaled from another go routine. // This allows for the pausing of loops that do not have work to complete type SleepCT struct { d time.Duration // desired duration between targets t time.Time // last time target resume chan bool + wake time.Time // last wake time + drift int64 // last wake drift microseconds } func (s *SleepCT) Start(d time.Duration) { - s.resume = make(chan bool, 1) + s.resume = make(chan bool, 2) if s.t.IsZero() { s.d = d s.t = time.Now() @@ -29,31 +32,43 @@ func (s *SleepCT) Start(d time.Duration) { // If pause it set to true will sleep the duration and wait to be notified. // The notification channel will be cleared when the thread wakes. // SleepNextTarget should not be call more than once concurrently. -func (s *SleepCT) SleepNextTarget(pause bool) int64 { - var last time.Time +func (s *SleepCT) SleepNextTarget(ctx context.Context, pause bool) int64 { now := time.Now() + // if target is zero safety net if s.t.IsZero() { fmt.Println("SleepCT reset") - last = now.Add(-s.d) - } else { - last = s.t + s.t = now.Add(-s.d) } // Sleep to Next Target - s.t = last.Add(s.d) + s.t = s.t.Add(s.d) + // Compute the desired sleep time to reach the target d := time.Until(s.t) + // Sleep time.Sleep(d) + // record the wake time + s.wake = time.Now() + s.drift = s.wake.Sub(s.t).Microseconds() + + // fmt.Println(s.t.UnixMilli(), d.Milliseconds(), wake.UnixMilli(), drift, pause, len(s.resume)) + + // external pause control if pause { - // wait until resume + // don't pause if the notification channel has something if len(s.resume) == 0 { - <-s.resume + // fmt.Println("pause") + select { + case <-s.resume: + case <-ctx.Done(): + // fmt.Println("sleepct ctx exit") + } // if we did pause set the last sleep target to now - last = time.Now() + s.t = time.Now() } } @@ -63,11 +78,15 @@ func (s *SleepCT) SleepNextTarget(pause bool) int64 { default: } - return now.Sub(s.t).Microseconds() + // return the drift for monitoring purposes + return s.drift } // Notify attempts to resume a paused sleeper. // It is safe to call notify from other processes and as often as desired. func (s *SleepCT) Notify() { - s.resume <- true + select { + case s.resume <- true: + default: + } } diff --git a/test/timing_test.go b/test/timing_test.go index 7985e85..a6f2ddd 100644 --- a/test/timing_test.go +++ b/test/timing_test.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "math" "math/rand" @@ -13,7 +14,7 @@ import ( "github.com/stieneee/tickerct" ) -const testCount int64 = 1000 +const testCount int64 = 10000 const maxSleepInterval time.Duration = 15 * time.Millisecond const tickerInterval time.Duration = 10 * time.Millisecond const testDuration time.Duration = time.Duration(testCount * 10 * int64(time.Millisecond)) @@ -115,7 +116,7 @@ func testSleepCT(wg *sync.WaitGroup) { if i+1 < testCount { time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64())) } - s.SleepNextTarget(false) + s.SleepNextTarget(context.TODO(), false) } fmt.Println("SleepCT (loaded) after", testDuration, "drifts", time.Since(start)-testDuration) wg.Done() @@ -144,7 +145,7 @@ func testSleepCTPause(wg *sync.WaitGroup) { time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64())) } s.Notify() - s.SleepNextTarget(true) + s.SleepNextTarget(context.TODO(), true) } fmt.Println("SleepCT Pause (loaded) after", testDuration, "drifts", time.Since(start)-testDuration) wg.Done()