mirror of
https://github.com/stryan/mumble-discord-bridge.git
synced 2024-11-22 21:35:44 -05:00
fix sleepct pausing
This commit is contained in:
parent
5c5ccb72c9
commit
649fe9f33e
1
.gitignore
vendored
1
.gitignore
vendored
@ -7,3 +7,4 @@ dist
|
|||||||
cert.pem
|
cert.pem
|
||||||
*.gob
|
*.gob
|
||||||
docker-compose.yml
|
docker-compose.yml
|
||||||
|
mdb-local
|
@ -30,6 +30,9 @@ type DiscordDuplex struct {
|
|||||||
fromDiscordMap map[uint32]fromDiscord
|
fromDiscordMap map[uint32]fromDiscord
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var discrodSendSleepTick sleepct.SleepCT = sleepct.SleepCT{}
|
||||||
|
var discrodReceiveSleepTick sleepct.SleepCT = sleepct.SleepCT{}
|
||||||
|
|
||||||
// OnError gets called by dgvoice when an error is encountered.
|
// OnError gets called by dgvoice when an error is encountered.
|
||||||
// By default logs to STDERR
|
// By default logs to STDERR
|
||||||
var OnError = func(str string, err error) {
|
var OnError = func(str string, err error) {
|
||||||
@ -61,8 +64,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
|
|||||||
// Generate Opus Silence Frame
|
// Generate Opus Silence Frame
|
||||||
opusSilence := []byte{0xf8, 0xff, 0xfe}
|
opusSilence := []byte{0xf8, 0xff, 0xfe}
|
||||||
|
|
||||||
sleepTick := sleepct.SleepCT{}
|
discrodSendSleepTick.Start(20 * time.Millisecond)
|
||||||
sleepTick.Start(20 * time.Millisecond)
|
|
||||||
|
|
||||||
lastReady := true
|
lastReady := true
|
||||||
var readyTimeout *time.Timer
|
var readyTimeout *time.Timer
|
||||||
@ -100,7 +102,8 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
promTimerDiscordSend.Observe(float64(sleepTick.SleepNextTarget(true)))
|
// if we are not streaming try to pause
|
||||||
|
promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, !streaming)))
|
||||||
|
|
||||||
if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) {
|
if (len(pcm) > 1 && streaming) || (len(pcm) > dd.Bridge.BridgeConfig.DiscordStartStreamingCount && !streaming) {
|
||||||
if !streaming {
|
if !streaming {
|
||||||
@ -136,7 +139,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
|
|||||||
// We want to do this after alerting the user of possible short speaking cycles
|
// We want to do this after alerting the user of possible short speaking cycles
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
internalSend(opusSilence)
|
internalSend(opusSilence)
|
||||||
promTimerDiscordSend.Observe(float64(sleepTick.SleepNextTarget(true)))
|
promTimerDiscordSend.Observe(float64(discrodSendSleepTick.SleepNextTarget(ctx, true)))
|
||||||
}
|
}
|
||||||
|
|
||||||
dd.Bridge.DiscordVoice.Speaking(false)
|
dd.Bridge.DiscordVoice.Speaking(false)
|
||||||
@ -260,6 +263,8 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
dd.discordMutex.Unlock()
|
dd.discordMutex.Unlock()
|
||||||
|
|
||||||
|
discrodReceiveSleepTick.Notify()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -270,8 +275,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou
|
|||||||
}
|
}
|
||||||
var speakingStart time.Time
|
var speakingStart time.Time
|
||||||
|
|
||||||
sleepTick := sleepct.SleepCT{}
|
discrodReceiveSleepTick.Start(10 * time.Millisecond)
|
||||||
sleepTick.Start(10 * time.Millisecond)
|
|
||||||
|
|
||||||
sendAudio := false
|
sendAudio := false
|
||||||
toMumbleStreaming := false
|
toMumbleStreaming := false
|
||||||
@ -285,7 +289,8 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
promTimerDiscordMixer.Observe(float64(sleepTick.SleepNextTarget(true)))
|
// if didn't send audio try to pause
|
||||||
|
promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, !sendAudio)))
|
||||||
|
|
||||||
dd.discordMutex.Lock()
|
dd.discordMutex.Lock()
|
||||||
|
|
||||||
@ -364,7 +369,7 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou
|
|||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
mumbleTimeoutSend(mumbleSilence)
|
mumbleTimeoutSend(mumbleSilence)
|
||||||
promTimerDiscordMixer.Observe(float64(sleepTick.SleepNextTarget(false)))
|
promTimerDiscordMixer.Observe(float64(discrodReceiveSleepTick.SleepNextTarget(ctx, false)))
|
||||||
}
|
}
|
||||||
|
|
||||||
toMumbleStreaming = false
|
toMumbleStreaming = false
|
||||||
|
@ -18,6 +18,8 @@ var mumbleStreamingArr []bool
|
|||||||
// MumbleDuplex - listenera and outgoing
|
// MumbleDuplex - listenera and outgoing
|
||||||
type MumbleDuplex struct{}
|
type MumbleDuplex struct{}
|
||||||
|
|
||||||
|
var mumbleSleepTick sleepct.SleepCT = sleepct.SleepCT{}
|
||||||
|
|
||||||
// OnAudioStream - Spawn routines to handle incoming packets
|
// OnAudioStream - Spawn routines to handle incoming packets
|
||||||
func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
|
func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
|
||||||
|
|
||||||
@ -42,14 +44,14 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
|
|||||||
localMumbleArray <- p.AudioBuffer[480*i : 480*(i+1)]
|
localMumbleArray <- p.AudioBuffer[480*i : 480*(i+1)]
|
||||||
}
|
}
|
||||||
promReceivedMumblePackets.Inc()
|
promReceivedMumblePackets.Inc()
|
||||||
|
mumbleSleepTick.Notify()
|
||||||
}
|
}
|
||||||
log.Println("Mumble audio stream ended", name)
|
log.Println("Mumble audio stream ended", name)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) {
|
func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) {
|
||||||
sleepTick := sleepct.SleepCT{}
|
mumbleSleepTick.Start(10 * time.Millisecond)
|
||||||
sleepTick.Start(10 * time.Millisecond)
|
|
||||||
|
|
||||||
sendAudio := false
|
sendAudio := false
|
||||||
bufferWarning := false
|
bufferWarning := false
|
||||||
@ -64,7 +66,8 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, t
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
promTimerMumbleMixer.Observe(float64(sleepTick.SleepNextTarget(true)))
|
// if we sent audio on the last pass attempt to pause
|
||||||
|
promTimerMumbleMixer.Observe(float64(mumbleSleepTick.SleepNextTarget(ctx, !sendAudio)))
|
||||||
|
|
||||||
mutex.Lock()
|
mutex.Lock()
|
||||||
|
|
||||||
@ -125,6 +128,8 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, t
|
|||||||
log.Println("Error: toDiscord buffer full. Dropping packet")
|
log.Println("Error: toDiscord buffer full. Dropping packet")
|
||||||
promToDiscordDropped.Inc()
|
promToDiscordDropped.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
discrodSendSleepTick.Notify()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,22 +1,25 @@
|
|||||||
package sleepct
|
package sleepct
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SleepCT - Sleep constant time step crates a sleep based ticker.
|
// SleepCT - Sleep constant time step crates a sleep based ticker.
|
||||||
// designed maintain a consistent sleep/tick interval.
|
// designed to maintain a consistent sleep/tick interval.
|
||||||
// The sleeper can be paused waiting to be signaled from another go routine.
|
// The sleeper can be paused waiting to be signaled from another go routine.
|
||||||
// This allows for the pausing of loops that do not have work to complete
|
// This allows for the pausing of loops that do not have work to complete
|
||||||
type SleepCT struct {
|
type SleepCT struct {
|
||||||
d time.Duration // desired duration between targets
|
d time.Duration // desired duration between targets
|
||||||
t time.Time // last time target
|
t time.Time // last time target
|
||||||
resume chan bool
|
resume chan bool
|
||||||
|
wake time.Time // last wake time
|
||||||
|
drift int64 // last wake drift microseconds
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SleepCT) Start(d time.Duration) {
|
func (s *SleepCT) Start(d time.Duration) {
|
||||||
s.resume = make(chan bool, 1)
|
s.resume = make(chan bool, 2)
|
||||||
if s.t.IsZero() {
|
if s.t.IsZero() {
|
||||||
s.d = d
|
s.d = d
|
||||||
s.t = time.Now()
|
s.t = time.Now()
|
||||||
@ -29,31 +32,43 @@ func (s *SleepCT) Start(d time.Duration) {
|
|||||||
// If pause it set to true will sleep the duration and wait to be notified.
|
// If pause it set to true will sleep the duration and wait to be notified.
|
||||||
// The notification channel will be cleared when the thread wakes.
|
// The notification channel will be cleared when the thread wakes.
|
||||||
// SleepNextTarget should not be call more than once concurrently.
|
// SleepNextTarget should not be call more than once concurrently.
|
||||||
func (s *SleepCT) SleepNextTarget(pause bool) int64 {
|
func (s *SleepCT) SleepNextTarget(ctx context.Context, pause bool) int64 {
|
||||||
var last time.Time
|
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
// if target is zero safety net
|
||||||
if s.t.IsZero() {
|
if s.t.IsZero() {
|
||||||
fmt.Println("SleepCT reset")
|
fmt.Println("SleepCT reset")
|
||||||
last = now.Add(-s.d)
|
s.t = now.Add(-s.d)
|
||||||
} else {
|
|
||||||
last = s.t
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sleep to Next Target
|
// Sleep to Next Target
|
||||||
s.t = last.Add(s.d)
|
s.t = s.t.Add(s.d)
|
||||||
|
|
||||||
|
// Compute the desired sleep time to reach the target
|
||||||
d := time.Until(s.t)
|
d := time.Until(s.t)
|
||||||
|
|
||||||
|
// Sleep
|
||||||
time.Sleep(d)
|
time.Sleep(d)
|
||||||
|
|
||||||
|
// record the wake time
|
||||||
|
s.wake = time.Now()
|
||||||
|
s.drift = s.wake.Sub(s.t).Microseconds()
|
||||||
|
|
||||||
|
// fmt.Println(s.t.UnixMilli(), d.Milliseconds(), wake.UnixMilli(), drift, pause, len(s.resume))
|
||||||
|
|
||||||
|
// external pause control
|
||||||
if pause {
|
if pause {
|
||||||
// wait until resume
|
// don't pause if the notification channel has something
|
||||||
if len(s.resume) == 0 {
|
if len(s.resume) == 0 {
|
||||||
<-s.resume
|
// fmt.Println("pause")
|
||||||
|
select {
|
||||||
|
case <-s.resume:
|
||||||
|
case <-ctx.Done():
|
||||||
|
// fmt.Println("sleepct ctx exit")
|
||||||
|
}
|
||||||
// if we did pause set the last sleep target to now
|
// if we did pause set the last sleep target to now
|
||||||
last = time.Now()
|
s.t = time.Now()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,11 +78,15 @@ func (s *SleepCT) SleepNextTarget(pause bool) int64 {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
return now.Sub(s.t).Microseconds()
|
// return the drift for monitoring purposes
|
||||||
|
return s.drift
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify attempts to resume a paused sleeper.
|
// Notify attempts to resume a paused sleeper.
|
||||||
// It is safe to call notify from other processes and as often as desired.
|
// It is safe to call notify from other processes and as often as desired.
|
||||||
func (s *SleepCT) Notify() {
|
func (s *SleepCT) Notify() {
|
||||||
s.resume <- true
|
select {
|
||||||
|
case s.resume <- true:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@ -13,7 +14,7 @@ import (
|
|||||||
"github.com/stieneee/tickerct"
|
"github.com/stieneee/tickerct"
|
||||||
)
|
)
|
||||||
|
|
||||||
const testCount int64 = 1000
|
const testCount int64 = 10000
|
||||||
const maxSleepInterval time.Duration = 15 * time.Millisecond
|
const maxSleepInterval time.Duration = 15 * time.Millisecond
|
||||||
const tickerInterval time.Duration = 10 * time.Millisecond
|
const tickerInterval time.Duration = 10 * time.Millisecond
|
||||||
const testDuration time.Duration = time.Duration(testCount * 10 * int64(time.Millisecond))
|
const testDuration time.Duration = time.Duration(testCount * 10 * int64(time.Millisecond))
|
||||||
@ -115,7 +116,7 @@ func testSleepCT(wg *sync.WaitGroup) {
|
|||||||
if i+1 < testCount {
|
if i+1 < testCount {
|
||||||
time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64()))
|
time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64()))
|
||||||
}
|
}
|
||||||
s.SleepNextTarget(false)
|
s.SleepNextTarget(context.TODO(), false)
|
||||||
}
|
}
|
||||||
fmt.Println("SleepCT (loaded) after", testDuration, "drifts", time.Since(start)-testDuration)
|
fmt.Println("SleepCT (loaded) after", testDuration, "drifts", time.Since(start)-testDuration)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
@ -144,7 +145,7 @@ func testSleepCTPause(wg *sync.WaitGroup) {
|
|||||||
time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64()))
|
time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64()))
|
||||||
}
|
}
|
||||||
s.Notify()
|
s.Notify()
|
||||||
s.SleepNextTarget(true)
|
s.SleepNextTarget(context.TODO(), true)
|
||||||
}
|
}
|
||||||
fmt.Println("SleepCT Pause (loaded) after", testDuration, "drifts", time.Since(start)-testDuration)
|
fmt.Println("SleepCT Pause (loaded) after", testDuration, "drifts", time.Since(start)-testDuration)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
Loading…
Reference in New Issue
Block a user