2021-04-24 14:36:34 -04:00
package bridge
2020-10-29 02:21:07 -04:00
import (
2021-02-01 16:03:38 -05:00
"context"
2020-10-29 02:21:07 -04:00
"fmt"
2020-11-04 01:12:43 -05:00
"log"
2020-10-30 01:38:01 -04:00
"sync"
2020-10-29 02:21:07 -04:00
"time"
"github.com/bwmarrin/discordgo"
2021-04-24 18:32:54 -04:00
"github.com/stieneee/gopus"
"github.com/stieneee/gumble/gumble"
2021-04-24 14:36:34 -04:00
"github.com/stieneee/mumble-discord-bridge/pkg/sleepct"
2020-10-29 02:21:07 -04:00
)
2020-10-30 01:38:01 -04:00
type fromDiscord struct {
2021-05-13 01:30:40 -04:00
decoder * gopus . Decoder
pcm chan [ ] int16
receiving bool // is used to to track the assumption that we are streaming a continuos stream form discord
streaming bool // The buffer streaming is streaming out
lastSequence uint16
lastTimeStamp uint32
2020-10-30 01:38:01 -04:00
}
2021-01-19 01:06:08 -05:00
// DiscordDuplex Handle discord voice stream
type DiscordDuplex struct {
Bridge * BridgeState
2021-04-07 01:24:17 -04:00
discordMutex sync . Mutex
fromDiscordMap map [ uint32 ] fromDiscord
2021-01-19 01:06:08 -05:00
}
2020-10-30 01:38:01 -04:00
2020-10-29 02:21:07 -04:00
// OnError gets called by dgvoice when an error is encountered.
// By default logs to STDERR
var OnError = func ( str string , err error ) {
prefix := "dgVoice: " + str
if err != nil {
2020-11-04 01:12:43 -05:00
log . Println ( prefix + ": " + err . Error ( ) )
2020-10-29 02:21:07 -04:00
} else {
2020-11-04 01:12:43 -05:00
log . Println ( prefix )
2020-10-29 02:21:07 -04:00
}
}
// SendPCM will receive on the provied channel encode
// received PCM data into Opus then send that to Discordgo
2021-02-01 16:03:38 -05:00
func ( dd * DiscordDuplex ) discordSendPCM ( ctx context . Context , wg * sync . WaitGroup , cancel context . CancelFunc , pcm <- chan [ ] int16 ) {
2020-10-29 02:21:07 -04:00
const channels int = 1
const frameRate int = 48000 // audio sampling rate
const frameSize int = 960 // uint16 size of each audio frame
const maxBytes int = ( frameSize * 2 ) * 2 // max size of opus data
streaming := false
opusEncoder , err := gopus . NewEncoder ( frameRate , channels , gopus . Audio )
if err != nil {
OnError ( "NewEncoder Error" , err )
2020-11-04 01:12:43 -05:00
panic ( err )
2020-10-29 02:21:07 -04:00
}
2021-04-10 15:00:13 -04:00
// Generate Opus Silence Frame
opusSilence := [ ] byte { 0xf8 , 0xff , 0xfe }
2021-04-24 14:36:34 -04:00
sleepTick := sleepct . SleepCT { }
sleepTick . Start ( 20 * time . Millisecond )
2020-10-29 02:21:07 -04:00
2020-11-11 22:57:51 -05:00
lastReady := true
var readyTimeout * time . Timer
2021-04-08 01:14:24 -04:00
var speakingStart time . Time
2020-11-11 22:57:51 -05:00
2021-02-01 16:03:38 -05:00
wg . Add ( 1 )
2021-04-10 15:00:13 -04:00
internalSend := func ( opus [ ] byte ) {
dd . Bridge . DiscordVoice . RWMutex . RLock ( )
if ! dd . Bridge . DiscordVoice . Ready || dd . Bridge . DiscordVoice . OpusSend == nil {
if lastReady {
OnError ( fmt . Sprintf ( "Discordgo not ready for opus packets. %+v : %+v" , dd . Bridge . DiscordVoice . Ready , dd . Bridge . DiscordVoice . OpusSend ) , nil )
readyTimeout = time . AfterFunc ( 30 * time . Second , func ( ) {
2021-04-24 14:36:34 -04:00
log . Println ( "Debug: Set ready timeout" )
2021-04-10 15:00:13 -04:00
cancel ( )
} )
lastReady = false
}
} else if ! lastReady {
fmt . Println ( "Discordgo ready to send opus packets" )
lastReady = true
readyTimeout . Stop ( )
} else {
dd . Bridge . DiscordVoice . OpusSend <- opus
2021-08-23 00:00:39 -04:00
promDiscordSentPackets . Inc ( )
2021-04-10 15:00:13 -04:00
}
dd . Bridge . DiscordVoice . RWMutex . RUnlock ( )
}
2020-10-29 02:21:07 -04:00
for {
2020-12-29 18:19:44 -05:00
select {
2021-02-01 16:03:38 -05:00
case <- ctx . Done ( ) :
wg . Done ( )
2020-12-29 18:19:44 -05:00
return
default :
}
2021-04-18 00:30:27 -04:00
2021-08-23 00:00:39 -04:00
promTimerDiscordSend . Observe ( float64 ( sleepTick . SleepNextTarget ( ) ) )
2021-04-18 00:30:27 -04:00
2021-04-10 15:00:13 -04:00
if ( len ( pcm ) > 1 && streaming ) || ( len ( pcm ) > dd . Bridge . BridgeConfig . DiscordStartStreamingCount && ! streaming ) {
2020-10-29 02:21:07 -04:00
if ! streaming {
2021-04-08 01:14:24 -04:00
speakingStart = time . Now ( )
2021-01-19 01:06:08 -05:00
dd . Bridge . DiscordVoice . Speaking ( true )
2020-10-29 02:21:07 -04:00
streaming = true
}
r1 := <- pcm
r2 := <- pcm
// try encoding pcm frame with Opus
opus , err := opusEncoder . Encode ( append ( r1 , r2 ... ) , frameSize , maxBytes )
if err != nil {
OnError ( "Encoding Error" , err )
2020-11-04 01:12:43 -05:00
continue
2020-10-29 02:21:07 -04:00
}
2021-04-10 15:00:13 -04:00
internalSend ( opus )
2021-04-07 01:24:17 -04:00
2020-10-29 02:21:07 -04:00
} else {
if streaming {
2021-04-10 15:00:13 -04:00
// Check to see if there is a short speaking cycle.
// It is possible that short speaking cycle is the result of a short input to mumble (Not a problem). ie a quick tap of push to talk button.
// Or when timing delays are introduced via network, hardware or kernel delays (Problem).
// The problem delays result in choppy or stuttering sounds, especially when the silence frames are introduced into the opus frames below.
// Multiple short cycle delays can result in a Discrod rate limiter being trigger due to of multiple JSON speaking/not-speaking state changes
2021-05-13 02:16:50 -04:00
if time . Since ( speakingStart ) . Milliseconds ( ) < 50 {
2021-04-19 23:25:45 -04:00
log . Println ( "Warning: Short Mumble to Discord speaking cycle. Consider increaseing the size of the to Discord jitter buffer." )
2021-04-10 15:00:13 -04:00
}
// Send silence as suggested by Discord Documentation.
// We want to do this after alerting the user of possible short speaking cycles
for i := 0 ; i < 5 ; i ++ {
internalSend ( opusSilence )
2021-08-23 00:00:39 -04:00
promTimerDiscordSend . Observe ( float64 ( sleepTick . SleepNextTarget ( ) ) )
2021-04-10 15:00:13 -04:00
}
2021-01-19 01:06:08 -05:00
dd . Bridge . DiscordVoice . Speaking ( false )
2020-10-29 02:21:07 -04:00
streaming = false
}
}
}
}
// ReceivePCM will receive on the the Discordgo OpusRecv channel and decode
// the opus audio into PCM then send it on the provided channel.
2021-02-01 16:03:38 -05:00
func ( dd * DiscordDuplex ) discordReceivePCM ( ctx context . Context , wg * sync . WaitGroup , cancel context . CancelFunc ) {
2020-10-29 02:21:07 -04:00
var err error
2020-11-11 22:57:51 -05:00
lastReady := true
var readyTimeout * time . Timer
2021-05-13 02:16:50 -04:00
var zeros [ 480 ] int16
for i := 0 ; i < 480 ; i ++ {
zeros [ i ] = 0
}
2021-02-01 16:03:38 -05:00
wg . Add ( 1 )
2020-10-29 02:21:07 -04:00
for {
2021-04-07 01:24:17 -04:00
dd . Bridge . DiscordVoice . RWMutex . RLock ( )
2021-04-06 22:34:38 -04:00
if ! dd . Bridge . DiscordVoice . Ready || dd . Bridge . DiscordVoice . OpusRecv == nil {
if lastReady {
2021-01-19 01:06:08 -05:00
OnError ( fmt . Sprintf ( "Discordgo not to receive opus packets. %+v : %+v" , dd . Bridge . DiscordVoice . Ready , dd . Bridge . DiscordVoice . OpusSend ) , nil )
2020-11-11 22:57:51 -05:00
readyTimeout = time . AfterFunc ( 30 * time . Second , func ( ) {
2021-04-24 14:36:34 -04:00
log . Println ( "Debug: Set ready timeout" )
2021-02-01 16:03:38 -05:00
cancel ( )
2020-11-11 22:57:51 -05:00
} )
lastReady = false
}
2020-11-04 01:12:43 -05:00
continue
2021-04-06 22:34:38 -04:00
} else if ! lastReady {
2020-11-11 22:57:51 -05:00
fmt . Println ( "Discordgo ready to receive packets" )
lastReady = true
readyTimeout . Stop ( )
2020-10-29 02:21:07 -04:00
}
2021-04-07 01:24:17 -04:00
dd . Bridge . DiscordVoice . RWMutex . RUnlock ( )
2021-01-19 01:06:08 -05:00
2021-01-03 16:19:49 -05:00
var ok bool
var p * discordgo . Packet
2021-01-19 01:06:08 -05:00
2021-01-03 16:19:49 -05:00
select {
2021-02-01 16:03:38 -05:00
case <- ctx . Done ( ) :
wg . Done ( )
2021-01-03 16:19:49 -05:00
return
2021-02-01 16:03:38 -05:00
case p , ok = <- dd . Bridge . DiscordVoice . OpusRecv :
2021-01-03 16:19:49 -05:00
}
2021-01-19 01:06:08 -05:00
2020-10-29 02:21:07 -04:00
if ! ok {
2020-11-04 01:12:43 -05:00
log . Println ( "Opus not ok" )
continue
2020-10-29 02:21:07 -04:00
}
2021-01-19 01:06:08 -05:00
dd . discordMutex . Lock ( )
2021-05-13 01:30:40 -04:00
2021-01-19 01:06:08 -05:00
_ , ok = dd . fromDiscordMap [ p . SSRC ]
2020-10-29 02:21:07 -04:00
if ! ok {
2020-10-30 01:38:01 -04:00
newStream := fromDiscord { }
newStream . pcm = make ( chan [ ] int16 , 100 )
2021-05-13 01:30:40 -04:00
newStream . receiving = false
2020-10-30 01:38:01 -04:00
newStream . streaming = false
2021-05-13 01:30:40 -04:00
newStream . decoder , err = gopus . NewDecoder ( 48000 , 1 ) // Decode into mono
2020-10-29 02:21:07 -04:00
if err != nil {
OnError ( "error creating opus decoder" , err )
2021-05-13 01:30:40 -04:00
dd . discordMutex . Unlock ( )
2020-10-29 02:21:07 -04:00
continue
}
2021-05-13 01:30:40 -04:00
2021-01-19 01:06:08 -05:00
dd . fromDiscordMap [ p . SSRC ] = newStream
2020-10-29 02:21:07 -04:00
}
2021-05-13 01:30:40 -04:00
s := dd . fromDiscordMap [ p . SSRC ]
deltaT := int ( p . Timestamp - s . lastTimeStamp )
if p . Sequence - s . lastSequence != 1 {
s . decoder . ResetState ( )
}
// oldReceiving := s.receiving
2021-05-13 02:16:50 -04:00
if ! s . receiving || deltaT < 1 || deltaT > 960 * 10 {
2021-05-13 01:30:40 -04:00
// First packet assume deltaT
2021-05-13 02:16:50 -04:00
// fmt.Println("replacing", deltaT, 960)
2021-05-13 01:30:40 -04:00
deltaT = 960
s . receiving = true
}
s . lastTimeStamp = p . Timestamp
s . lastSequence = p . Sequence
dd . fromDiscordMap [ p . SSRC ] = s
2021-01-19 01:06:08 -05:00
dd . discordMutex . Unlock ( )
2021-05-13 01:30:40 -04:00
2021-05-13 02:16:50 -04:00
p . PCM , err = s . decoder . Decode ( p . Opus , deltaT * 2 , false )
2020-10-29 02:21:07 -04:00
if err != nil {
OnError ( "Error decoding opus data" , err )
continue
}
2021-05-13 01:30:40 -04:00
// fmt.Println(p.SSRC, p.Type, deltaT, p.Sequence, p.Sequence-s.lastSequence, oldReceiving, s.streaming, len(p.Opus), len(p.PCM))
2021-08-23 00:00:39 -04:00
promDiscordReceivedPackets . Inc ( )
2021-05-13 01:30:40 -04:00
// Push data into pcm channel in 10ms chunks of mono pcm data
2021-01-19 01:06:08 -05:00
dd . discordMutex . Lock ( )
2021-05-13 02:16:50 -04:00
for l := 0 ; l < len ( p . PCM ) ; l = l + 480 {
2021-05-13 01:30:40 -04:00
var next [ ] int16
u := l + 480
2021-05-13 02:16:50 -04:00
2021-05-13 01:30:40 -04:00
next = p . PCM [ l : u ]
select {
case dd . fromDiscordMap [ p . SSRC ] . pcm <- next :
default :
log . Println ( "From Discord buffer full. Dropping packet" )
}
2021-01-03 15:32:59 -05:00
}
2021-01-19 01:06:08 -05:00
dd . discordMutex . Unlock ( )
2020-10-30 01:38:01 -04:00
}
}
2021-02-01 16:03:38 -05:00
func ( dd * DiscordDuplex ) fromDiscordMixer ( ctx context . Context , wg * sync . WaitGroup , toMumble chan <- gumble . AudioBuffer ) {
2021-04-19 23:25:45 -04:00
mumbleSilence := gumble . AudioBuffer { }
for i := 3 ; i < 480 ; i ++ {
mumbleSilence = append ( mumbleSilence , 0x00 )
}
var speakingStart time . Time
2021-04-24 14:36:34 -04:00
sleepTick := sleepct . SleepCT { }
sleepTick . Start ( 10 * time . Millisecond )
2020-10-30 01:38:01 -04:00
sendAudio := false
2021-04-19 23:25:45 -04:00
toMumbleStreaming := false
2021-02-01 16:03:38 -05:00
wg . Add ( 1 )
2020-10-30 01:38:01 -04:00
for {
2020-12-29 18:19:44 -05:00
select {
2021-02-01 16:03:38 -05:00
case <- ctx . Done ( ) :
wg . Done ( )
2020-12-29 18:19:44 -05:00
return
2021-04-18 00:30:27 -04:00
default :
2020-12-29 18:19:44 -05:00
}
2021-01-19 01:06:08 -05:00
2021-08-23 00:00:39 -04:00
promTimerDiscordMixer . Observe ( float64 ( sleepTick . SleepNextTarget ( ) ) )
2021-04-18 00:30:27 -04:00
2021-01-19 01:06:08 -05:00
dd . discordMutex . Lock ( )
2020-10-30 01:38:01 -04:00
sendAudio = false
internalMixerArr := make ( [ ] [ ] int16 , 0 )
2021-08-23 00:00:39 -04:00
streamingCount := 0
2020-10-30 01:38:01 -04:00
// Work through each channel
2021-01-19 01:06:08 -05:00
for i := range dd . fromDiscordMap {
2021-04-19 23:25:45 -04:00
bufferLength := len ( dd . fromDiscordMap [ i ] . pcm )
isStreaming := dd . fromDiscordMap [ i ] . streaming
2021-04-24 14:36:34 -04:00
if ( bufferLength > 0 && isStreaming ) || ( bufferLength > dd . Bridge . BridgeConfig . MumbleStartStreamCount && ! isStreaming ) {
2021-04-19 23:25:45 -04:00
if ! toMumbleStreaming {
speakingStart = time . Now ( )
toMumbleStreaming = true
}
2020-10-30 01:38:01 -04:00
sendAudio = true
2021-04-19 23:25:45 -04:00
if ! isStreaming {
2021-01-19 01:06:08 -05:00
x := dd . fromDiscordMap [ i ]
2020-10-30 01:38:01 -04:00
x . streaming = true
2021-01-19 01:06:08 -05:00
dd . fromDiscordMap [ i ] = x
2020-10-30 01:38:01 -04:00
}
2021-08-23 00:00:39 -04:00
streamingCount ++
2021-01-19 01:06:08 -05:00
x1 := ( <- dd . fromDiscordMap [ i ] . pcm )
2020-10-30 01:38:01 -04:00
internalMixerArr = append ( internalMixerArr , x1 )
} else {
2021-04-06 22:34:38 -04:00
if dd . fromDiscordMap [ i ] . streaming {
2021-01-19 01:06:08 -05:00
x := dd . fromDiscordMap [ i ]
2020-10-30 01:38:01 -04:00
x . streaming = false
2021-05-13 01:30:40 -04:00
x . receiving = false // toggle this here is not optimal but there is no better location atm.
2021-01-19 01:06:08 -05:00
dd . fromDiscordMap [ i ] = x
2020-10-30 01:38:01 -04:00
}
}
}
2021-08-23 00:00:39 -04:00
promDiscordArraySize . Set ( float64 ( len ( dd . fromDiscordMap ) ) )
promDiscordStreaming . Set ( float64 ( streamingCount ) )
2021-01-19 01:06:08 -05:00
dd . discordMutex . Unlock ( )
2020-10-30 01:38:01 -04:00
2021-04-19 23:25:45 -04:00
mumbleTimeoutSend := func ( outBuf [ ] int16 ) {
timeout := make ( chan bool , 1 )
go func ( ) {
time . Sleep ( 10 * time . Millisecond )
timeout <- true
} ( )
2020-10-30 01:38:01 -04:00
2021-04-19 23:25:45 -04:00
select {
case toMumble <- outBuf :
2021-08-23 00:00:39 -04:00
promSentMumblePackets . Inc ( )
2021-04-19 23:25:45 -04:00
case <- timeout :
2021-04-24 14:36:34 -04:00
log . Println ( "To Mumble timeout. Dropping packet" )
2021-08-23 00:00:39 -04:00
promToMumbleDropped . Inc ( )
2020-10-30 01:38:01 -04:00
}
}
if sendAudio {
2021-04-19 23:25:45 -04:00
// Regular send mixed audio
outBuf := make ( [ ] int16 , 480 )
2021-05-13 01:30:40 -04:00
for j := 0 ; j < len ( internalMixerArr ) ; j ++ {
for i := 0 ; i < len ( internalMixerArr [ j ] ) ; i ++ {
2021-04-19 23:25:45 -04:00
outBuf [ i ] += ( internalMixerArr [ j ] ) [ i ]
}
}
mumbleTimeoutSend ( outBuf )
} else if ! sendAudio && toMumbleStreaming {
// Send opus silence to mumble
// See note above about jitter buffer warning
2021-05-13 02:16:50 -04:00
if time . Since ( speakingStart ) . Milliseconds ( ) < 50 {
log . Println ( "Warning: Short Discord to Mumble speaking cycle. Consider increaseing the size of the to Mumble jitter buffer." , time . Since ( speakingStart ) . Milliseconds ( ) )
2021-04-19 23:25:45 -04:00
}
for i := 0 ; i < 5 ; i ++ {
mumbleTimeoutSend ( mumbleSilence )
2021-08-23 00:00:39 -04:00
promTimerDiscordMixer . Observe ( float64 ( sleepTick . SleepNextTarget ( ) ) )
2020-11-04 01:12:43 -05:00
}
2021-04-19 23:25:45 -04:00
toMumbleStreaming = false
2020-10-30 01:38:01 -04:00
}
2020-10-29 02:21:07 -04:00
}
}