fix race data related to reading bridge state

This commit is contained in:
Tyler Stiene 2021-04-07 01:24:17 -04:00
parent 177553f3a4
commit 480fa533a2
6 changed files with 43 additions and 11 deletions

View File

@ -6,6 +6,9 @@ mumble-discord-bridge: $(GOFILES)
dev: $(GOFILES) dev: $(GOFILES)
goreleaser build --skip-validate --rm-dist && sudo ./dist/mumble-discord-bridge_linux_amd64/mumble-discord-bridge goreleaser build --skip-validate --rm-dist && sudo ./dist/mumble-discord-bridge_linux_amd64/mumble-discord-bridge
dev-race: $(GOFILES)
go run -race *.go
dev-profile: $(GOFILES) dev-profile: $(GOFILES)
goreleaser build --skip-validate --rm-dist && sudo ./dist/mumble-discord-bridge_linux_amd64/mumble-discord-bridge -cpuprofile cpu.prof goreleaser build --skip-validate --rm-dist && sudo ./dist/mumble-discord-bridge_linux_amd64/mumble-discord-bridge -cpuprofile cpu.prof

View File

@ -35,6 +35,9 @@ type BridgeState struct {
// Wait for bridge to exit cleanly // Wait for bridge to exit cleanly
WaitExit *sync.WaitGroup WaitExit *sync.WaitGroup
// Bridge State Mutex
BridgeMutex sync.Mutex
// Bridge connection // Bridge connection
Connected bool Connected bool
@ -189,7 +192,9 @@ func (b *BridgeState) startBridge() {
} }
}() }()
b.BridgeMutex.Lock()
b.Connected = true b.Connected = true
b.BridgeMutex.Unlock()
// Hold until cancelled or external die request // Hold until cancelled or external die request
select { select {
@ -200,7 +205,10 @@ func (b *BridgeState) startBridge() {
cancel() cancel()
} }
b.BridgeMutex.Lock()
b.Connected = false b.Connected = false
b.BridgeMutex.Unlock()
wg.Wait() wg.Wait()
log.Println("Terminating Bridge") log.Println("Terminating Bridge")
b.MumbleUsersMutex.Lock() b.MumbleUsersMutex.Lock()
@ -221,6 +229,7 @@ func (b *BridgeState) discordStatusUpdate() {
b.DiscordSession.UpdateListeningStatus("an error pinging mumble") b.DiscordSession.UpdateListeningStatus("an error pinging mumble")
} else { } else {
b.MumbleUsersMutex.Lock() b.MumbleUsersMutex.Lock()
b.BridgeMutex.Lock()
b.MumbleUserCount = resp.ConnectedUsers b.MumbleUserCount = resp.ConnectedUsers
if b.Connected { if b.Connected {
b.MumbleUserCount = b.MumbleUserCount - 1 b.MumbleUserCount = b.MumbleUserCount - 1
@ -234,6 +243,7 @@ func (b *BridgeState) discordStatusUpdate() {
status = fmt.Sprintf("%v users in Mumble\n", b.MumbleUserCount) status = fmt.Sprintf("%v users in Mumble\n", b.MumbleUserCount)
} }
} }
b.BridgeMutex.Unlock()
b.MumbleUsersMutex.Unlock() b.MumbleUsersMutex.Unlock()
b.DiscordSession.UpdateListeningStatus(status) b.DiscordSession.UpdateListeningStatus(status)
} }
@ -257,6 +267,7 @@ func (b *BridgeState) AutoBridge() {
b.MumbleUsersMutex.Lock() b.MumbleUsersMutex.Lock()
b.DiscordUsersMutex.Lock() b.DiscordUsersMutex.Lock()
b.BridgeMutex.Lock()
if !b.Connected && b.MumbleUserCount > 0 && len(b.DiscordUsers) > 0 { if !b.Connected && b.MumbleUserCount > 0 && len(b.DiscordUsers) > 0 {
log.Println("users detected in mumble and discord, bridging") log.Println("users detected in mumble and discord, bridging")
@ -267,6 +278,7 @@ func (b *BridgeState) AutoBridge() {
b.BridgeDie <- true b.BridgeDie <- true
} }
b.BridgeMutex.Unlock()
b.MumbleUsersMutex.Unlock() b.MumbleUsersMutex.Unlock()
b.DiscordUsersMutex.Unlock() b.DiscordUsersMutex.Unlock()
} }

View File

@ -49,11 +49,13 @@ func (l *DiscordListener) guildCreate(s *discordgo.Session, event *discordgo.Gui
l.Bridge.DiscordUsersMutex.Unlock() l.Bridge.DiscordUsersMutex.Unlock()
// If connected to mumble inform users of Discord users // If connected to mumble inform users of Discord users
l.Bridge.BridgeMutex.Lock()
if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText { if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText {
l.Bridge.MumbleClient.Do(func() { l.Bridge.MumbleClient.Do(func() {
l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has joined Discord\n", u.Username), false) l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has joined Discord\n", u.Username), false)
}) })
} }
l.Bridge.BridgeMutex.Unlock()
} }
} }
@ -85,10 +87,14 @@ func (l *DiscordListener) messageCreate(s *discordgo.Session, m *discordgo.Messa
return return
} }
l.Bridge.BridgeMutex.Lock()
bridgeConnected := l.Bridge.Connected
l.Bridge.BridgeMutex.Unlock()
if strings.HasPrefix(m.Content, prefix+" link") { if strings.HasPrefix(m.Content, prefix+" link") {
// Look for the message sender in that guild's current voice states. // Look for the message sender in that guild's current voice states.
for _, vs := range g.VoiceStates { for _, vs := range g.VoiceStates {
if l.Bridge.Connected { if bridgeConnected {
l.Bridge.DiscordSession.ChannelMessageSend(m.ChannelID, "Bridge already running, unlink first") l.Bridge.DiscordSession.ChannelMessageSend(m.ChannelID, "Bridge already running, unlink first")
return return
} }
@ -103,7 +109,7 @@ func (l *DiscordListener) messageCreate(s *discordgo.Session, m *discordgo.Messa
if strings.HasPrefix(m.Content, prefix+" unlink") { if strings.HasPrefix(m.Content, prefix+" unlink") {
// Look for the message sender in that guild's current voice states. // Look for the message sender in that guild's current voice states.
if !l.Bridge.Connected { if !bridgeConnected {
l.Bridge.DiscordSession.ChannelMessageSend(m.ChannelID, "Bridge is not currently running") l.Bridge.DiscordSession.ChannelMessageSend(m.ChannelID, "Bridge is not currently running")
return return
} }
@ -118,7 +124,7 @@ func (l *DiscordListener) messageCreate(s *discordgo.Session, m *discordgo.Messa
if strings.HasPrefix(m.Content, prefix+" refresh") { if strings.HasPrefix(m.Content, prefix+" refresh") {
// Look for the message sender in that guild's current voice states. // Look for the message sender in that guild's current voice states.
if !l.Bridge.Connected { if !bridgeConnected {
l.Bridge.DiscordSession.ChannelMessageSend(m.ChannelID, "Bridge is not currently running") l.Bridge.DiscordSession.ChannelMessageSend(m.ChannelID, "Bridge is not currently running")
return return
} }
@ -195,11 +201,13 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi
seen: true, seen: true,
dm: dm, dm: dm,
} }
l.Bridge.BridgeMutex.Lock()
if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText { if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText {
l.Bridge.MumbleClient.Do(func() { l.Bridge.MumbleClient.Do(func() {
l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has joined Discord\n", u.Username), false) l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has joined Discord\n", u.Username), false)
}) })
} }
l.Bridge.BridgeMutex.Unlock()
} else { } else {
du := l.Bridge.DiscordUsers[vs.UserID] du := l.Bridge.DiscordUsers[vs.UserID]
du.seen = true du.seen = true
@ -213,11 +221,13 @@ func (l *DiscordListener) voiceUpdate(s *discordgo.Session, event *discordgo.Voi
for id := range l.Bridge.DiscordUsers { for id := range l.Bridge.DiscordUsers {
if !l.Bridge.DiscordUsers[id].seen { if !l.Bridge.DiscordUsers[id].seen {
log.Println("User left Discord channel " + l.Bridge.DiscordUsers[id].username) log.Println("User left Discord channel " + l.Bridge.DiscordUsers[id].username)
l.Bridge.BridgeMutex.Lock()
if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText { if l.Bridge.Connected && !l.Bridge.BridgeConfig.MumbleDisableText {
l.Bridge.MumbleClient.Do(func() { l.Bridge.MumbleClient.Do(func() {
l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has left Discord channel\n", l.Bridge.DiscordUsers[id].username), false) l.Bridge.MumbleClient.Self.Channel.Send(fmt.Sprintf("%v has left Discord channel\n", l.Bridge.DiscordUsers[id].username), false)
}) })
} }
l.Bridge.BridgeMutex.Unlock()
delete(l.Bridge.DiscordUsers, id) delete(l.Bridge.DiscordUsers, id)
} }
} }

View File

@ -23,9 +23,8 @@ type fromDiscord struct {
type DiscordDuplex struct { type DiscordDuplex struct {
Bridge *BridgeState Bridge *BridgeState
discordMutex sync.Mutex discordMutex sync.Mutex
discordMixerMutex sync.Mutex fromDiscordMap map[uint32]fromDiscord
fromDiscordMap map[uint32]fromDiscord
} }
// OnError gets called by dgvoice when an error is encountered. // OnError gets called by dgvoice when an error is encountered.
@ -87,6 +86,7 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
continue continue
} }
dd.Bridge.DiscordVoice.RWMutex.RLock()
if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusSend == nil { if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusSend == nil {
if lastReady { if lastReady {
OnError(fmt.Sprintf("Discordgo not ready for opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil) OnError(fmt.Sprintf("Discordgo not ready for opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil)
@ -96,13 +96,15 @@ func (dd *DiscordDuplex) discordSendPCM(ctx context.Context, wg *sync.WaitGroup,
}) })
lastReady = false lastReady = false
} }
continue
} else if !lastReady { } else if !lastReady {
fmt.Println("Discordgo ready to send opus packets") fmt.Println("Discordgo ready to send opus packets")
lastReady = true lastReady = true
readyTimeout.Stop() readyTimeout.Stop()
} else {
dd.Bridge.DiscordVoice.OpusSend <- opus
} }
dd.Bridge.DiscordVoice.OpusSend <- opus dd.Bridge.DiscordVoice.RWMutex.RUnlock()
} else { } else {
if streaming { if streaming {
dd.Bridge.DiscordVoice.Speaking(false) dd.Bridge.DiscordVoice.Speaking(false)
@ -123,6 +125,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
wg.Add(1) wg.Add(1)
for { for {
dd.Bridge.DiscordVoice.RWMutex.RLock()
if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusRecv == nil { if !dd.Bridge.DiscordVoice.Ready || dd.Bridge.DiscordVoice.OpusRecv == nil {
if lastReady { if lastReady {
OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil) OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", dd.Bridge.DiscordVoice.Ready, dd.Bridge.DiscordVoice.OpusSend), nil)
@ -138,6 +141,7 @@ func (dd *DiscordDuplex) discordReceivePCM(ctx context.Context, wg *sync.WaitGro
lastReady = true lastReady = true
readyTimeout.Stop() readyTimeout.Stop()
} }
dd.Bridge.DiscordVoice.RWMutex.RUnlock()
var ok bool var ok bool
var p *discordgo.Packet var p *discordgo.Packet

View File

@ -40,7 +40,7 @@ func main() {
mumblePassword := flag.String("mumble-password", lookupEnvOrString("MUMBLE_PASSWORD", ""), "MUMBLE_PASSWORD, mumble password, optional") mumblePassword := flag.String("mumble-password", lookupEnvOrString("MUMBLE_PASSWORD", ""), "MUMBLE_PASSWORD, mumble password, optional")
mumbleInsecure := flag.Bool("mumble-insecure", lookupEnvOrBool("MUMBLE_INSECURE", false), " MUMBLE_INSECURE, mumble insecure, optional") mumbleInsecure := flag.Bool("mumble-insecure", lookupEnvOrBool("MUMBLE_INSECURE", false), " MUMBLE_INSECURE, mumble insecure, optional")
mumbleCertificate := flag.String("mumble-certificate", lookupEnvOrString("MUMBLE_CERTIFICATE", ""), "MUMBLE_CERTIFICATE, client certificate to use when connecting to the Mumble server") mumbleCertificate := flag.String("mumble-certificate", lookupEnvOrString("MUMBLE_CERTIFICATE", ""), "MUMBLE_CERTIFICATE, client certificate to use when connecting to the Mumble server")
mumbleChannel := flag.String("mumble-channel", lookupEnvOrString("MUMBLE_CHANNEL", ""), "MUMBLE_CHANNEL, mumble channel to start in, using '/' to seperate nested channels, optional") mumbleChannel := flag.String("mumble-channel", lookupEnvOrString("MUMBLE_CHANNEL", ""), "MUMBLE_CHANNEL, mumble channel to start in, using '/' to separate nested channels, optional")
mumbleDisableText := flag.Bool("mumble-disable-text", lookupEnvOrBool("MUMBLE_DISABLE_TEXT", false), "MUMBLE_DISABLE_TEXT, disable sending text to mumble, (default false)") mumbleDisableText := flag.Bool("mumble-disable-text", lookupEnvOrBool("MUMBLE_DISABLE_TEXT", false), "MUMBLE_DISABLE_TEXT, disable sending text to mumble, (default false)")
discordToken := flag.String("discord-token", lookupEnvOrString("DISCORD_TOKEN", ""), "DISCORD_TOKEN, discord bot token, required") discordToken := flag.String("discord-token", lookupEnvOrString("DISCORD_TOKEN", ""), "DISCORD_TOKEN, discord bot token, required")
discordGID := flag.String("discord-gid", lookupEnvOrString("DISCORD_GID", ""), "DISCORD_GID, discord gid, required") discordGID := flag.String("discord-gid", lookupEnvOrString("DISCORD_GID", ""), "DISCORD_GID, discord gid, required")
@ -197,9 +197,11 @@ func main() {
log.Println("OS Signal. Bot shutting down") log.Println("OS Signal. Bot shutting down")
// Wait or the bridge to exit cleanly // Wait or the bridge to exit cleanly
Bridge.BridgeMutex.Lock()
if Bridge.Connected { if Bridge.Connected {
//TODO BridgeDie occasionally panics on send to closed channel //TODO BridgeDie occasionally panics on send to closed channel
Bridge.BridgeDie <- true Bridge.BridgeDie <- true
Bridge.WaitExit.Wait() Bridge.WaitExit.Wait()
} }
Bridge.BridgeMutex.Unlock()
} }

View File

@ -43,22 +43,23 @@ func (l *MumbleListener) mumbleUserChange(e *gumble.UserChangeEvent) {
e.User.Send("Mumble-Discord-Bridge v" + version) e.User.Send("Mumble-Discord-Bridge v" + version)
// Tell the user who is connected to discord // Tell the user who is connected to discord
l.Bridge.DiscordUsersMutex.Lock()
if len(l.Bridge.DiscordUsers) == 0 { if len(l.Bridge.DiscordUsers) == 0 {
e.User.Send("No users connected to Discord") e.User.Send("No users connected to Discord")
} else { } else {
s := "Connected to Discord: " s := "Connected to Discord: "
arr := []string{} arr := []string{}
l.Bridge.DiscordUsersMutex.Lock()
for u := range l.Bridge.DiscordUsers { for u := range l.Bridge.DiscordUsers {
arr = append(arr, l.Bridge.DiscordUsers[u].username) arr = append(arr, l.Bridge.DiscordUsers[u].username)
} }
s = s + strings.Join(arr[:], ",") s = s + strings.Join(arr[:], ",")
l.Bridge.DiscordUsersMutex.Unlock()
e.User.Send(s) e.User.Send(s)
} }
l.Bridge.DiscordUsersMutex.Unlock()
} }
// Send discord a notice // Send discord a notice