vega/botlib/custom_syncer.go

174 lines
5.4 KiB
Go

package botlib
import (
"encoding/json"
"fmt"
"os"
"runtime/debug"
"time"
"maunium.net/go/mautrix"
. "maunium.net/go/mautrix"
)
// CustomSyncer is the default syncing implementation. You can either write your own syncer, or selectively
// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer
// pattern to notify callers about incoming events. See CustomSyncer.OnEventType for more information.
type CustomSyncer struct {
// UserID is matched against the state key of m.room.member events in shouldProcessResponse and is
// included in the error produced when ProcessResponse recovers from a panic.
UserID string
// Store is used by getOrCreateRoom to load rooms and to save newly created ones.
Store Storer
// listeners holds the callbacks registered through OnEventType; notifyListeners invokes the ones
// registered for an event's type.
// NOTE(review): NewCustomSyncer and OnEventType spell the element type as mautrix.OnEventListener —
// confirm that it and the OnEventListener named here denote the same type.
listeners map[EventType][]OnEventListener // event type to listeners array
}
// OnEventListener can be used with CustomSyncer.OnEventType to be informed of incoming events.
// It is a callback that receives a pointer to the event being dispatched.
type OnEventListener func(*Event)
// NewCustomSyncer builds a CustomSyncer for the given user ID and store, with an empty (but allocated)
// listener table so callbacks can be registered right away.
func NewCustomSyncer(userID string, store Storer) *CustomSyncer {
	syncer := new(CustomSyncer)
	syncer.UserID = userID
	syncer.Store = store
	syncer.listeners = make(map[EventType][]mautrix.OnEventListener)
	return syncer
}
// parseEvent decodes a raw JSON event. When decoding fails the error and the raw payload are written to
// stderr and nil is returned. When roomID is non-empty, the decoded event's RoomID is set to it.
func parseEvent(roomID string, data json.RawMessage) *Event {
	var parsed Event
	if err := json.Unmarshal(data, &parsed); err != nil {
		// TODO add separate handler for these
		_, _ = fmt.Fprintf(os.Stderr, "Failed to unmarshal event: %v\n%s\n", err, string(data))
		return nil
	}
	// Assigning unconditionally is equivalent to assigning only when the IDs differ.
	if roomID != "" {
		parsed.RoomID = roomID
	}
	return &parsed
}
// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
// unrepeating events.
//
// Responses rejected by shouldProcessResponse (for example when since is empty) are skipped and nil is
// returned. Otherwise:
//   - joined rooms: every state event updates the stored room and is dispatched to listeners; timeline
//     events are dispatched only;
//   - invited rooms: every state event updates the stored room and is dispatched;
//   - left rooms: only timeline events that carry a state key update the room and are dispatched.
//
// Events that fail to parse are skipped in all three sections.
//
// Returns a fatal error if a listener (or any other code run while processing) panics; the error contains
// the panic value and a stack trace.
func (s *CustomSyncer) ProcessResponse(res *RespSync, since string) (err error) {
	if !s.shouldProcessResponse(res, since) {
		return nil
	}

	// Turn a panic raised during processing into the returned error via the named result.
	defer func() {
		if r := recover(); r != nil {
			// %v formats any panic value; for strings and errors it prints the same text as %s.
			err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%v\n%s", s.UserID, since, r, debug.Stack())
		}
	}()

	for roomID, roomData := range res.Rooms.Join {
		room := s.getOrCreateRoom(roomID)
		for _, data := range roomData.State.Events {
			event := parseEvent(roomID, data)
			if event != nil {
				room.UpdateState(event)
				s.notifyListeners(event)
			}
		}
		for _, data := range roomData.Timeline.Events {
			event := parseEvent(roomID, data)
			if event != nil {
				s.notifyListeners(event)
			}
		}
	}

	for roomID, roomData := range res.Rooms.Invite {
		room := s.getOrCreateRoom(roomID)
		for _, data := range roomData.State.Events {
			event := parseEvent(roomID, data)
			if event != nil {
				room.UpdateState(event)
				s.notifyListeners(event)
			}
		}
	}

	for roomID, roomData := range res.Rooms.Leave {
		room := s.getOrCreateRoom(roomID)
		for _, data := range roomData.Timeline.Events {
			event := parseEvent(roomID, data)
			// parseEvent returns nil for undecodable events; without the nil check a single bad event
			// would cause a nil dereference and abort the whole sync with a fatal error.
			if event != nil && event.StateKey != nil {
				room.UpdateState(event)
				s.notifyListeners(event)
			}
		}
	}

	return nil
}
// OnEventType allows callers to be notified when there are new events for the given event type.
// There are no duplicate checks: registering the same callback twice results in it being stored twice.
//
// The listener table is created on first use, so registration also works on a CustomSyncer that was
// built as a struct literal instead of through NewCustomSyncer.
func (s *CustomSyncer) OnEventType(eventType EventType, callback mautrix.OnEventListener) {
	if s.listeners == nil {
		// Writing to a nil map panics; allocate it lazily.
		s.listeners = make(map[EventType][]mautrix.OnEventListener)
	}
	// append handles a missing (nil) slice, so no existence check is needed.
	s.listeners[eventType] = append(s.listeners[eventType], callback)
}
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
// stuff that shouldn't be processed.
//
// It returns false when since is empty. Otherwise it always returns true, after removing from
// resp.Rooms.Join and resp.Rooms.Invite every joined room whose timeline contains an m.room.member event
// for s.UserID with membership "join".
func (s *CustomSyncer) shouldProcessResponse(resp *RespSync, since string) bool {
	if since == "" {
		return false
	}

	// This is a horrible hack because /sync will return the most recent messages for a room
	// as soon as you /join it. We do NOT want to process those events in that particular room
	// because they may have already been processed (if you toggle the bot in/out of the room).
	//
	// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
	// exists and is "join" and then discard processing that room entirely if so.
	// TODO: We probably want to process messages from after the last join event in the timeline.
	for roomID, roomData := range resp.Rooms.Join {
		// Scan the timeline from the newest event to the oldest.
		for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- {
			// TODO this is horribly inefficient since it's also parsed in ProcessResponse
			e := parseEvent(roomID, roomData.Timeline.Events[i])
			if e == nil || e.Type != StateMember || e.GetStateKey() != s.UserID {
				continue
			}
			if e.Content.Membership != "join" {
				continue
			}
			// roomID comes from ranging over resp.Rooms.Join and the loop stops right after the
			// delete, so the entry is always present here. Removing entries from a map while
			// ranging over it is permitted in Go.
			delete(resp.Rooms.Join, roomID)   // don't re-process messages
			delete(resp.Rooms.Invite, roomID) // don't re-process invites
			break
		}
	}
	return true
}
// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
//
// It returns the room the store holds for roomID; when the store has none, a new room is created,
// saved to the store and returned.
func (s *CustomSyncer) getOrCreateRoom(roomID string) *Room {
	if existing := s.Store.LoadRoom(roomID); existing != nil {
		return existing
	}
	created := NewRoom(roomID)
	s.Store.SaveRoom(created)
	return created
}
// notifyListeners calls, in registration order, every callback registered for the event's type.
// Nothing happens when no callback is registered for that type.
func (s *CustomSyncer) notifyListeners(event *Event) {
	// A missing key yields a nil slice, and ranging over a nil slice runs zero iterations.
	for _, callback := range s.listeners[event.Type] {
		callback(event)
	}
}
// OnFailedSync reports how long to wait after a failed /sync. The response and error are ignored:
// the wait is always 10 seconds and the returned error is always nil.
func (s *CustomSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
	const retryDelay = 10 * time.Second
	return retryDelay, nil
}
// GetFilterJSON returns the sync filter as raw JSON: a room timeline limit of 50. The userID argument
// is not used.
func (s *CustomSyncer) GetFilterJSON(userID string) json.RawMessage {
	const filter = `{"room":{"timeline":{"limit":50}}}`
	return json.RawMessage(filter)
}