more cleanup on manual linkage, attempts to get discord->mumble working

This commit is contained in:
stryan 2020-12-29 18:19:44 -05:00
parent 2df391f0d9
commit ca1ba1d099
5 changed files with 73 additions and 13 deletions

View File

@ -22,6 +22,10 @@ var discordMutex sync.Mutex
var discordMixerMutex sync.Mutex var discordMixerMutex sync.Mutex
var fromDiscordMap = make(map[uint32]fromDiscord) var fromDiscordMap = make(map[uint32]fromDiscord)
func DiscordReset() {
fromDiscordMap = make(map[uint32]fromDiscord)
}
// OnError gets called by dgvoice when an error is encountered. // OnError gets called by dgvoice when an error is encountered.
// By default logs to STDERR // By default logs to STDERR
var OnError = func(str string, err error) { var OnError = func(str string, err error) {
@ -56,9 +60,15 @@ func discordSendPCM(v *discordgo.VoiceConnection, pcm <-chan []int16, die chan b
var readyTimeout *time.Timer var readyTimeout *time.Timer
for { for {
select {
case <-die:
log.Println("Killing discordSendPCM")
return
default:
}
<-ticker.C <-ticker.C
if len(pcm) > 1 { if len(pcm) > 1 {
log.Println("looking for speech")
if !streaming { if !streaming {
v.Speaking(true) v.Speaking(true)
streaming = true streaming = true
@ -88,7 +98,7 @@ func discordSendPCM(v *discordgo.VoiceConnection, pcm <-chan []int16, die chan b
lastReady = true lastReady = true
readyTimeout.Stop() readyTimeout.Stop()
} }
log.Println("sending packets to mumble")
v.OpusSend <- opus v.OpusSend <- opus
} else { } else {
if streaming { if streaming {
@ -108,10 +118,19 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) {
var readyTimeout *time.Timer var readyTimeout *time.Timer
for { for {
log.Println("Start loop")
select {
case <-die:
log.Println("killing discord ReceivePCM")
return
default:
}
log.Println("checked for death")
if v.Ready == false || v.OpusRecv == nil { if v.Ready == false || v.OpusRecv == nil {
if lastReady == true { if lastReady == true {
OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", v.Ready, v.OpusSend), nil) OnError(fmt.Sprintf("Discordgo not to receive opus packets. %+v : %+v", v.Ready, v.OpusSend), nil)
readyTimeout = time.AfterFunc(30*time.Second, func() { readyTimeout = time.AfterFunc(30*time.Second, func() {
log.Println("set ready timeout")
die <- true die <- true
}) })
lastReady = false lastReady = false
@ -123,13 +142,16 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) {
readyTimeout.Stop() readyTimeout.Stop()
} }
log.Println("rec1")
p, ok := <-v.OpusRecv p, ok := <-v.OpusRecv
log.Println("rec2")
if !ok { if !ok {
log.Println("Opus not ok") log.Println("Opus not ok")
continue continue
} }
discordMutex.Lock() discordMutex.Lock()
log.Println("got lock one")
_, ok = fromDiscordMap[p.SSRC] _, ok = fromDiscordMap[p.SSRC]
discordMutex.Unlock() discordMutex.Unlock()
if !ok { if !ok {
@ -147,6 +169,7 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) {
} }
discordMutex.Lock() discordMutex.Lock()
log.Println("got lock 2")
p.PCM, err = fromDiscordMap[p.SSRC].decoder.Decode(p.Opus, 960, false) p.PCM, err = fromDiscordMap[p.SSRC].decoder.Decode(p.Opus, 960, false)
discordMutex.Unlock() discordMutex.Unlock()
if err != nil { if err != nil {
@ -155,17 +178,25 @@ func discordReceivePCM(v *discordgo.VoiceConnection, die chan bool) {
} }
discordMutex.Lock() discordMutex.Lock()
log.Println("got lock 3")
fromDiscordMap[p.SSRC].pcm <- p.PCM[0:480] fromDiscordMap[p.SSRC].pcm <- p.PCM[0:480]
fromDiscordMap[p.SSRC].pcm <- p.PCM[480:960] fromDiscordMap[p.SSRC].pcm <- p.PCM[480:960]
discordMutex.Unlock() discordMutex.Unlock()
log.Println("finish loop")
} }
} }
func fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) { func fromDiscordMixer(toMumble chan<- gumble.AudioBuffer, die chan bool) {
ticker := time.NewTicker(10 * time.Millisecond) ticker := time.NewTicker(10 * time.Millisecond)
sendAudio := false sendAudio := false
for { for {
select {
case <-die:
log.Println("killing fromDiscordMixer")
return
default:
}
<-ticker.C <-ticker.C
discordMutex.Lock() discordMutex.Lock()
@ -206,6 +237,7 @@ func fromDiscordMixer(toMumble chan<- gumble.AudioBuffer) {
if sendAudio { if sendAudio {
select { select {
case toMumble <- outBuf: case toMumble <- outBuf:
log.Println("sending to mumble")
default: default:
log.Println("toMumble buffer full. Dropping packet") log.Println("toMumble buffer full. Dropping packet")
} }

View File

@ -71,6 +71,8 @@ func messageCreate(s *discordgo.Session, m *discordgo.MessageCreate) {
log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID) log.Printf("Trying to leave GID %v and VID %v\n", g.ID, vs.ChannelID)
YBConfig.ActiveConns[vs.ChannelID] <- true YBConfig.ActiveConns[vs.ChannelID] <- true
YBConfig.ActiveConns[vs.ChannelID] = nil YBConfig.ActiveConns[vs.ChannelID] = nil
MumbleReset()
DiscordReset()
return return
} }
} }
@ -97,7 +99,10 @@ func messageCreate(s *discordgo.Session, m *discordgo.MessageCreate) {
if vs.UserID == m.Author.ID { if vs.UserID == m.Author.ID {
log.Printf("Trying to refresh GID %v and VID %v\n", g.ID, vs.ChannelID) log.Printf("Trying to refresh GID %v and VID %v\n", g.ID, vs.ChannelID)
YBConfig.ActiveConns[vs.ChannelID] <- true YBConfig.ActiveConns[vs.ChannelID] <- true
time.Sleep(2 * time.Second) MumbleReset()
DiscordReset()
time.Sleep(5 * time.Second)
YBConfig.ActiveConns[vs.ChannelID] = make(chan bool)
go startBridge(s, g.ID, vs.ChannelID, YBConfig.Config, YBConfig.MumbleAddr, YBConfig.MumbleInsecure, YBConfig.ActiveConns[vs.ChannelID]) go startBridge(s, g.ID, vs.ChannelID, YBConfig.Config, YBConfig.MumbleAddr, YBConfig.MumbleInsecure, YBConfig.ActiveConns[vs.ChannelID])
return return
} }

View File

@ -61,7 +61,7 @@ func main() {
// Open Websocket // Open Websocket
discord.LogLevel = 2 discord.LogLevel = 2
discord.StateEnabled = true discord.StateEnabled = true
discord.Identify.Intents = discordgo.MakeIntent(discordgo.IntentsGuilds | discordgo.IntentsGuildMessages | discordgo.IntentsGuildVoiceStates) discord.Identify.Intents = discordgo.MakeIntent(discordgo.IntentsAllWithoutPrivileged)
// register handlers // register handlers
discord.AddHandler(ready) discord.AddHandler(ready)
discord.AddHandler(messageCreate) discord.AddHandler(messageCreate)

View File

@ -14,7 +14,14 @@ var fromMumbleArr []chan gumble.AudioBuffer
var mumbleStreamingArr []bool var mumbleStreamingArr []bool
// MumbleDuplex - listenera and outgoing // MumbleDuplex - listenera and outgoing
type MumbleDuplex struct{} type MumbleDuplex struct {
Close chan bool
}
func MumbleReset() {
fromMumbleArr = []chan gumble.AudioBuffer{}
mumbleStreamingArr = []bool{}
}
// OnAudioStream - Spawn routines to handle incoming packets // OnAudioStream - Spawn routines to handle incoming packets
func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) { func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
@ -27,10 +34,14 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
mumbleStreamingArr = append(mumbleStreamingArr, false) mumbleStreamingArr = append(mumbleStreamingArr, false)
mutex.Unlock() mutex.Unlock()
go func() { go func(die chan bool) {
log.Println("new mumble audio stream", e.User.Name) log.Println("new mumble audio stream", e.User.Name)
for { for {
select { select {
default:
case <-die:
log.Println("Removing mumble audio stream")
return
case p := <-e.C: case p := <-e.C:
// log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer)) // log.Println("audio packet", p.Sender.Name, len(p.AudioBuffer))
@ -40,15 +51,21 @@ func (m MumbleDuplex) OnAudioStream(e *gumble.AudioStreamEvent) {
} }
} }
} }
}() }(m.Close)
return return
} }
func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16) { func (m MumbleDuplex) fromMumbleMixer(toDiscord chan []int16, die chan bool) {
ticker := time.NewTicker(10 * time.Millisecond) ticker := time.NewTicker(10 * time.Millisecond)
sendAudio := false sendAudio := false
for { for {
select {
case <-die:
log.Println("Killing fromMumbleMixer")
return
default:
}
<-ticker.C <-ticker.C
mutex.Lock() mutex.Lock()

View File

@ -27,7 +27,9 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin
// MUMBLE Setup // MUMBLE Setup
m := MumbleDuplex{} m := MumbleDuplex{
Close: make(chan bool),
}
var tlsConfig tls.Config var tlsConfig tls.Config
if mumbleInsecure { if mumbleInsecure {
@ -51,11 +53,11 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin
// Start Passing Between // Start Passing Between
// Mumble // Mumble
go m.fromMumbleMixer(toDiscord) go m.fromMumbleMixer(toDiscord, die)
config.AudioListeners.Attach(m) det := config.AudioListeners.Attach(m)
//Discord //Discord
go discordReceivePCM(dgv, die) go discordReceivePCM(dgv, die)
go fromDiscordMixer(toMumble) go fromDiscordMixer(toMumble, die)
go discordSendPCM(dgv, toDiscord, die) go discordSendPCM(dgv, toDiscord, die)
c := make(chan os.Signal) c := make(chan os.Signal)
signal.Notify(c, os.Interrupt) signal.Notify(c, os.Interrupt)
@ -67,6 +69,7 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin
if mumble.State() != 2 { if mumble.State() != 2 {
log.Println("Lost mumble connection " + strconv.Itoa(int(mumble.State()))) log.Println("Lost mumble connection " + strconv.Itoa(int(mumble.State())))
die <- true die <- true
m.Close <- true
} }
} }
}() }()
@ -76,7 +79,10 @@ func startBridge(discord *discordgo.Session, discordGID string, discordCID strin
log.Printf("\nGot %s signal. Terminating Mumble-Bridge\n", sig) log.Printf("\nGot %s signal. Terminating Mumble-Bridge\n", sig)
case <-die: case <-die:
log.Println("\nGot internal die request. Terminating Mumble-Bridge") log.Println("\nGot internal die request. Terminating Mumble-Bridge")
close(toMumble)
dgv.Disconnect() dgv.Disconnect()
log.Println("Closing mumble threads")
det.Detach()
} }
} }