wip allow a sleepct ticker to pause and be signaled

This commit is contained in:
Tyler Stiene 2021-08-09 23:07:27 -04:00
parent 6a188336b9
commit cb4d2349c5
5 changed files with 80 additions and 26 deletions

View File

@ -99,6 +99,8 @@ The bot requires the following permissions:
* Voice Channel Speak * Voice Channel Speak
* Voice Channel Use Voice Activity * Voice Channel Use Voice Activity
Permission integer 36768768.
### Finding Discord CID and GID ### Finding Discord CID and GID
Discord GID is a unique ID linked to one Discord Server, also called Guild. CID is similarly a unique ID for a Discord Channel. To find these you need to set Discord into developer Mode. Discord GID is a unique ID linked to one Discord Server, also called Guild. CID is similarly a unique ID for a Discord Channel. To find these you need to set Discord into developer Mode.

View File

@ -99,7 +99,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
default: default:
} }
sleepTick.SleepNextTarget() sleepTick.SleepNextTarget(true)
if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) { if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) {
if !streaming { if !streaming {
@ -135,7 +135,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
// We want to do this after alerting the user of possible short speaking cycles // We want to do this after alerting the user of possible short speaking cycles
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
internalSend(opusSilence) internalSend(opusSilence)
sleepTick.SleepNextTarget() sleepTick.SleepNextTarget(false)
} }
dd.Bridge.DiscordVoice.Speaking(false) dd.Bridge.DiscordVoice.Speaking(false)
@ -234,7 +234,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
dd.fromDiscordMap[p.SSRC] = s dd.fromDiscordMap[p.SSRC] = s
dd.discordMutex.Unlock() dd.discordMutex.Unlock()
p.PCM, err = s.decoder.Decode(p.Opus, deltaT*2, false) p.PCM, err = s.decoder.Decode(p.Opus, deltaT, false)
if err != nil { if err != nil {
OnError("Error decoding opus data", err) OnError("Error decoding opus data", err)
continue continue
@ -282,7 +282,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou
default: default:
} }
sleepTick.SleepNextTarget() sleepTick.SleepNextTarget(true)
dd.discordMutex.Lock() dd.discordMutex.Lock()
@ -354,7 +354,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
mumbleTimeoutSend(mumbleSilence) mumbleTimeoutSend(mumbleSilence)
sleepTick.SleepNextTarget() sleepTick.SleepNextTarget(false)
} }
toMumbleStreaming = false toMumbleStreaming = false

View File

@ -61,7 +61,7 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, t
default: default:
} }
sleepTick.SleepNextTarget() sleepTick.SleepNextTarget(true)
mutex.Lock() mutex.Lock()

View File

@ -2,19 +2,21 @@ package sleepct
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
) )
// SleepCT - Sleep constant time step crates a sleep based ticker // SleepCT - Sleep constant time step crates a sleep based ticker.
// designed maintain a sleep/tick interval // designed maintain a consistent sleep/tick interval.
// The sleeper can be paused waiting to be singaled from another go routine.
// This allows for the pausing of loops that do not have work to complete
type SleepCT struct { type SleepCT struct {
sync.Mutex d time.Duration // desired duration between targets
d time.Duration // duration t time.Time // last time target
t time.Time // last time target resume chan bool
} }
func (s *SleepCT) Start(d time.Duration) { func (s *SleepCT) Start(d time.Duration) {
s.resume = make(chan bool, 1)
if s.t.IsZero() { if s.t.IsZero() {
s.d = d s.d = d
s.t = time.Now() s.t = time.Now()
@ -23,28 +25,49 @@ func (s *SleepCT) Start(d time.Duration) {
} }
} }
func (s *SleepCT) SleepNextTarget() { // Sleep to the next target duration.
s.Lock() // If pause it set to true will sleep the duration and wait to be notified.
// The notification channel will be cleared when the thread wakes.
now := time.Now() // SleepNextTarget should not be call more than once concurrently.
func (s *SleepCT) SleepNextTarget(pause bool) {
var last time.Time var last time.Time
if s.t.IsZero() { if s.t.IsZero() {
fmt.Println("SleepCT reset") fmt.Println("SleepCT reset")
last = now.Add(-s.d) last = time.Now().Add(-s.d)
} else { } else {
last = s.t last = s.t
} }
// Next Target // Sleep to Next Target
s.t = last.Add(s.d) s.t = last.Add(s.d)
d := s.t.Sub(now) d := time.Until(s.t)
time.Sleep(d) time.Sleep(d)
// delta := now.Sub(s.t) // delta := time.Since(s.t)
// fmt.Println("delta", delta, d, time.Since(s.t)) // fmt.Println("delta", delta, d, time.Since(s.t))
s.Unlock() if pause {
// wait until resume
if len(s.resume) == 0 {
<-s.resume
// if we did pause set the last sleep target to now
last = time.Now()
}
}
// Drain the resume channel
select {
case <-s.resume:
default:
}
}
// Notify attempts to resume a paused sleeper.
// It is safe to call notify from other processes and as often as desired.
func (s *SleepCT) Notify() {
select {
case s.resume <- true:
}
} }

View File

@ -13,12 +13,12 @@ import (
"github.com/stieneee/tickerct" "github.com/stieneee/tickerct"
) )
const testCount int64 = 10000 const testCount int64 = 1000
const maxSleepInterval time.Duration = 15 * time.Millisecond const maxSleepInterval time.Duration = 15 * time.Millisecond
const tickerInterval time.Duration = 10 * time.Millisecond const tickerInterval time.Duration = 10 * time.Millisecond
const testDuration time.Duration = time.Duration(testCount * 10 * int64(time.Millisecond)) const testDuration time.Duration = time.Duration(testCount * 10 * int64(time.Millisecond))
func testTickerBaseCase(wg *sync.WaitGroup) { func testTickerBaseCase(wg *sync.WaitGroup, test *testing.T) {
wg.Add(1) wg.Add(1)
go func(interval time.Duration) { go func(interval time.Duration) {
now := time.Now() now := time.Now()
@ -39,7 +39,7 @@ func testTickerBaseCase(wg *sync.WaitGroup) {
func TestTickerBaseCase(t *testing.T) { func TestTickerBaseCase(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
testTickerBaseCase(&wg) testTickerBaseCase(&wg, t)
wg.Wait() wg.Wait()
} }
@ -115,7 +115,7 @@ func testSleepCT(wg *sync.WaitGroup) {
if i+1 < testCount { if i+1 < testCount {
time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64())) time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64()))
} }
s.SleepNextTarget() s.SleepNextTarget(false)
} }
fmt.Println("SleepCT (loaded) after", testDuration, "drifts", time.Since(start)-testDuration) fmt.Println("SleepCT (loaded) after", testDuration, "drifts", time.Since(start)-testDuration)
wg.Done() wg.Done()
@ -130,6 +130,35 @@ func TestSleepCT(t *testing.T) {
wg.Wait() wg.Wait()
} }
func testSleepCTPause(wg *sync.WaitGroup) {
wg.Add(1)
go func(interval time.Duration) {
now := time.Now()
start := now
// start the ticker
s := sleepct.SleepCT{}
s.Start(interval)
var i int64
for i = 0; i < testCount; i++ {
if i+1 < testCount {
time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64()))
}
s.Notify()
s.SleepNextTarget(true)
}
fmt.Println("SleepCT Pause (loaded) after", testDuration, "drifts", time.Since(start)-testDuration)
wg.Done()
}(tickerInterval)
}
func TestSleepCTPause(t *testing.T) {
wg := sync.WaitGroup{}
testSleepCTPause(&wg)
wg.Wait()
}
func TestIdleJitter(t *testing.T) { func TestIdleJitter(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}