diff --git a/.gitignore b/.gitignore index 87b6e38..c5efcd4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,7 @@ main mumble-discord-bridge dist bridge -.prof +*.prof +*.out +*.test cert.pem \ No newline at end of file diff --git a/Makefile b/Makefile index 5d9d6cd..e4a2448 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -GOFILES=main.go mumble.go discord.go bridge.go config.go mumble-handlers.go discord-handlers.go +GOFILES=main.go mumble.go discord.go bridge.go config.go mumble-handlers.go discord-handlers.go tickerct.go mumble-discord-bridge: $(GOFILES) goreleaser build --skip-validate --rm-dist @@ -7,7 +7,7 @@ dev: $(GOFILES) goreleaser build --skip-validate --rm-dist && sudo ./dist/mumble-discord-bridge_linux_amd64/mumble-discord-bridge dev-race: $(GOFILES) - go run -race *.go + go run -race $(GOFILES) dev-profile: $(GOFILES) goreleaser build --skip-validate --rm-dist && sudo ./dist/mumble-discord-bridge_linux_amd64/mumble-discord-bridge -cpuprofile cpu.prof diff --git a/discord.go b/discord.go index f2915e9..7d2f79f 100644 --- a/discord.go +++ b/discord.go @@ -61,7 +61,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, opusSilence = append(opusSilence, 0x00) } - ticker := time.NewTicker(20 * time.Millisecond) + ticker := NewTickerCT(20 * time.Millisecond) lastReady := true var readyTimeout *time.Timer @@ -235,7 +235,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro } func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGroup, toMumble chan<- gumble.AudioBuffer) { - ticker := time.NewTicker(10 * time.Millisecond) + ticker := NewTickerCT(10 * time.Millisecond) sendAudio := false wg.Add(1) diff --git a/mumble.go b/mumble.go index 1d363d2..1f60548 100644 --- a/mumble.go +++ b/mumble.go @@ -44,8 +44,10 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { } func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, toDiscord chan []int16) { - ticker := time.NewTicker(10 * time.Millisecond) + ticker := NewTickerCT(10 * time.Millisecond) sendAudio := false + bufferWarning := false + wg.Add(1) for { @@ -93,14 +95,22 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, t } if len(toDiscord) > 20 { - log.Println("Debug: Warning Discord buffer size") + if !bufferWarning { + log.Println("Warning: toDiscord buffer size") + bufferWarning = true + } + } else { + if bufferWarning { + log.Println("Resolved: toDiscord buffer size") + bufferWarning = false + } } if sendAudio { select { case toDiscord <- outBuf: default: - log.Println("toDiscord buffer full. Dropping packet") + log.Println("Error: toDiscord buffer full. Dropping packet") } } } diff --git a/tickerct.go b/tickerct.go new file mode 100644 index 0000000..392d89a --- /dev/null +++ b/tickerct.go @@ -0,0 +1,81 @@ +package main + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// A Ticker holds a channel that delivers ``ticks'' of a clock +// at intervals. +type TickerCT struct { + sync.Mutex + C <-chan time.Time // The channel on which the ticks are delivered. + c chan<- time.Time // internal use + r *time.Timer // internal timer + d time.Duration // the set duration + last time.Time // the last time the ticker ticked + stop bool // mark the ticker as stopped +} + +// NewTickerCT returns a new Ticker containing a channel that will send +// the time on the channel after each tick. The period of the ticks is +// specified by the duration argument. The ticker queue ticks. +// The duration d must be greater than zero; if not, NewTickerCT will +// panic. Stop the ticker to release associated resources. +func NewTickerCT(d time.Duration) *TickerCT { + if d <= 0 { + panic(errors.New("non-positive interval for NewTickerCT")) + } + + // Give the channel a large buffer to allow clients to catchup + c := make(chan time.Time, 100) + + t := &TickerCT{ + C: c, + c: c, + d: d, + last: time.Now(), + stop: false, + } + + t.Lock() + t.r = time.AfterFunc(d, func() { t.tick() }) + t.Unlock() + + return t +} + +func (t *TickerCT) tick() { + t.Lock() + if t.stop { + fmt.Println("stopped") + return + } + + now := time.Now() + t.c <- now + + current := t.last.Add(t.d) + target := current.Add(t.d) + + d := target.Sub(now) + + // if d.Microseconds() < 1 { + // d = time.Duration(time.Microsecond) + // } + // delta := now.Sub(current) + // fmt.Println("delta", delta, d) + + t.r.Reset(d) + t.last = current + t.Unlock() +} + +func (t *TickerCT) Stop() { + t.stop = true + if t.r != nil { + t.r.Stop() + } +} diff --git a/tickerct_test.go b/tickerct_test.go new file mode 100644 index 0000000..fd1c318 --- /dev/null +++ b/tickerct_test.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" +) + +const testCount int64 = 10000 +const maxSleepInterval time.Duration = 15 * time.Millisecond +const tickerInterval time.Duration = 10 * time.Millisecond +const testDuration time.Duration = time.Duration(testCount * 10 * int64(time.Millisecond)) + +func Testticker(wg *sync.WaitGroup) { + wg.Add(1) + go func(interval time.Duration) { + now := time.Now() + start := now + // start the ticker + t := time.NewTicker(interval) + var i int64 + for i = 0; i < testCount; i++ { + if i+1 < testCount { + time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64())) + } + now = <-t.C + // fmt.Println(now) + } + t.Stop() + fmt.Println("Ticker after", testDuration, "drifts", time.Since(start)-testDuration) + wg.Done() + }(tickerInterval) +} + +func TestTicker(t *testing.T) { + wg := sync.WaitGroup{} + + Testticker(&wg) + + wg.Wait() +} + +func TesttickerCT(wg *sync.WaitGroup) { + wg.Add(1) + go func(interval time.Duration) { + now := time.Now() + start := now + // start the ticker + t := NewTickerCT(interval) + var i int64 + for i = 0; i < testCount; i++ { + if i+1 < testCount { + time.Sleep(time.Duration(float64(maxSleepInterval) * rand.Float64())) + } + now = <-t.C + // fmt.Println(now) + } + t.Stop() + fmt.Println("TickerCT after", testDuration, "drifts", time.Since(start)-testDuration) + wg.Done() + }(tickerInterval) +} + +func TestTickerCT(t *testing.T) { + wg := sync.WaitGroup{} + + TesttickerCT(&wg) + + wg.Wait() +}