refactor wait group

added debug message for bridge cancel
This commit is contained in:
Tyler Stiene 2021-11-17 23:58:54 -05:00
parent 0f57c5d33a
commit 8ca66fb500
3 changed files with 29 additions and 20 deletions

View File

@ -183,7 +183,11 @@ func (b *BridgeState) StartBridge() {
// Start Passing Between // Start Passing Between
// From Mumble // From Mumble
go b.MumbleStream.fromMumbleMixer(ctx, &wg, cancel, toDiscord) wg.Add(1)
go func() {
defer wg.Done()
b.MumbleStream.fromMumbleMixer(ctx, cancel, toDiscord)
}()
// From Discord // From Discord
b.DiscordStream = &DiscordDuplex{ b.DiscordStream = &DiscordDuplex{
@ -191,15 +195,28 @@ func (b *BridgeState) StartBridge() {
fromDiscordMap: make(map[uint32]fromDiscord), fromDiscordMap: make(map[uint32]fromDiscord),
} }
go b.DiscordStream.discordReceivePCM(ctx, &wg, cancel) wg.Add(1)
go b.DiscordStream.fromDiscordMixer(ctx, &wg, toMumble) go func() {
defer wg.Done()
b.DiscordStream.discordReceivePCM(ctx, cancel)
}()
wg.Add(1)
go func() {
defer wg.Done()
b.DiscordStream.fromDiscordMixer(ctx, toMumble)
}()
// To Discord // To Discord
go b.DiscordStream.discordSendPCM(ctx, &wg, cancel, toDiscord) wg.Add(1)
go func() {
defer wg.Done()
b.DiscordStream.discordSendPCM(ctx, cancel, toDiscord)
}()
// Monitor Mumble // Monitor Mumble
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done()
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(500 * time.Millisecond)
for { for {
select { select {
@ -213,7 +230,6 @@ func (b *BridgeState) StartBridge() {
cancel() cancel()
} }
case <-ctx.Done(): case <-ctx.Done():
wg.Done()
return return
} }
} }

View File

@ -47,7 +47,7 @@ var OnError = func(str string, err error) {
// SendPCM will receive on the provied channel encode // SendPCM will receive on the provied channel encode
// received PCM data into Opus then send that to Discordgo // received PCM data into Opus then send that to Discordgo
func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc, pcm <-chan []int16) { func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, cancel context.CancelFunc, pcm <-chan []int16) {
const channels int = 1 const channels int = 1
const frameRate int = 48000 // audio sampling rate const frameRate int = 48000 // audio sampling rate
const frameSize int = 960 // uint16 size of each audio frame const frameSize int = 960 // uint16 size of each audio frame
@ -70,8 +70,6 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
var readyTimeout *time.Timer var readyTimeout *time.Timer
var speakingStart time.Time var speakingStart time.Time
wg.Add(1)
internalSend := func(opus []byte) { internalSend := func(opus []byte) {
dd.Bridge.DiscordVoice.RWMutex.RLock() dd.Bridge.DiscordVoice.RWMutex.RLock()
if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusSend == nil { if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusSend == nil {
@ -97,7 +95,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
wg.Done() log.Println("Stopping Discord send PCM")
return return
default: default:
} }
@ -151,7 +149,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
// ReceivePCM will receive on the the Discordgo OpusRecv channel and decode // ReceivePCM will receive on the the Discordgo OpusRecv channel and decode
// the opus audio into PCM then send it on the provided channel. // the opus audio into PCM then send it on the provided channel.
func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc) { func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, cancel context.CancelFunc) {
var err error var err error
lastReady := true lastReady := true
@ -162,8 +160,6 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
zeros[i] = 0 zeros[i] = 0
} }
wg.Add(1)
for { for {
dd.Bridge.DiscordVoice.RWMutex.RLock() dd.Bridge.DiscordVoice.RWMutex.RLock()
if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusRecv == nil { if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusRecv == nil {
@ -188,7 +184,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
select { select {
case <-ctx.Done(): case <-ctx.Done():
wg.Done() log.Println("Stopping Discord receive PCM")
return return
case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv: case p, ok = <-dd.Bridge.DiscordVoice.OpusRecv:
} }
@ -268,7 +264,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
} }
} }
func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGroup, toMumble chan<- gumble.AudioBuffer) { func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, toMumble chan<- gumble.AudioBuffer) {
mumbleSilence := gumble.AudioBuffer{} mumbleSilence := gumble.AudioBuffer{}
for i := 3; i < 480; i++ { for i := 3; i < 480; i++ {
mumbleSilence = append(mumbleSilence, 0x00) mumbleSilence = append(mumbleSilence, 0x00)
@ -279,12 +275,11 @@ func (dd *DiscordDuplex) fromDiscordMixer(ctx context.Context, wg *sync.WaitGrou
sendAudio := false sendAudio := false
toMumbleStreaming := false toMumbleStreaming := false
wg.Add(1)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
wg.Done() log.Println("Stopping from Discord mixer")
return return
default: default:
} }

View File

@ -51,7 +51,7 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
}() }()
} }
func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc, toDiscord chan []int16) { func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, cancel context.CancelFunc, toDiscord chan []int16) {
mumbleSleepTick.Start(10 * time.Millisecond) mumbleSleepTick.Start(10 * time.Millisecond)
sendAudio := false sendAudio := false
@ -60,12 +60,10 @@ func (m MumbleDuplex) fromMumbleMixer(ctx context.Context, wg *sync.WaitGroup, c
droppingPackets := false droppingPackets := false
droppingPacketCount := 0 droppingPacketCount := 0
wg.Add(1)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
wg.Done() log.Println("Stopping From Mumble Mixer")
return return
default: default:
} }