package botlib import ( "encoding/json" "fmt" "os" "runtime/debug" "time" "maunium.net/go/mautrix" . "maunium.net/go/mautrix" ) // CustomSyncer is the default syncing implementation. You can either write your own syncer, or selectively // replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer // pattern to notify callers about incoming events. See CustomSyncer.OnEventType for more information. type CustomSyncer struct { UserID string Store Storer listeners map[EventType][]OnEventListener // event type to listeners array } // OnEventListener can be used with CustomSyncer.OnEventType to be informed of incoming events. type OnEventListener func(*Event) // NewCustomSyncer returns an instantiated CustomSyncer func NewCustomSyncer(userID string, store Storer) *CustomSyncer { return &CustomSyncer{ UserID: userID, Store: store, listeners: make(map[EventType][]mautrix.OnEventListener), } } func parseEvent(roomID string, data json.RawMessage) *Event { event := &Event{} err := json.Unmarshal(data, event) if err != nil { // TODO add separate handler for these _, _ = fmt.Fprintf(os.Stderr, "Failed to unmarshal event: %v\n%s\n", err, string(data)) return nil } if roomID != "" && event.RoomID != roomID { event.RoomID = roomID } return event } // ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of // unrepeating events. Returns a fatal error if a listener panics. func (s *CustomSyncer) ProcessResponse(res *RespSync, since string) (err error) { if !s.shouldProcessResponse(res, since) { return } defer func() { if r := recover(); r != nil { err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack()) } }() for roomID, roomData := range res.Rooms.Join { room := s.getOrCreateRoom(roomID) for _, data := range roomData.State.Events { event := parseEvent(roomID, data) if event != nil { room.UpdateState(event) s.notifyListeners(event) } } for _, data := range roomData.Timeline.Events { event := parseEvent(roomID, data) if event != nil { s.notifyListeners(event) } } } for roomID, roomData := range res.Rooms.Invite { room := s.getOrCreateRoom(roomID) for _, data := range roomData.State.Events { event := parseEvent(roomID, data) if event != nil { room.UpdateState(event) s.notifyListeners(event) } } } for roomID, roomData := range res.Rooms.Leave { room := s.getOrCreateRoom(roomID) for _, data := range roomData.Timeline.Events { event := parseEvent(roomID, data) if event.StateKey != nil { room.UpdateState(event) s.notifyListeners(event) } } } return } // OnEventType allows callers to be notified when there are new events for the given event type. // There are no duplicate checks. func (s *CustomSyncer) OnEventType(eventType EventType, callback mautrix.OnEventListener) { _, exists := s.listeners[eventType] if !exists { s.listeners[eventType] = []mautrix.OnEventListener{} } s.listeners[eventType] = append(s.listeners[eventType], callback) } // shouldProcessResponse returns true if the response should be processed. May modify the response to remove // stuff that shouldn't be processed. func (s *CustomSyncer) shouldProcessResponse(resp *RespSync, since string) bool { if since == "" { return false } // This is a horrible hack because /sync will return the most recent messages for a room // as soon as you /join it. We do NOT want to process those events in that particular room // because they may have already been processed (if you toggle the bot in/out of the room). // // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us // exists and is "join" and then discard processing that room entirely if so. // TODO: We probably want to process messages from after the last join event in the timeline. for roomID, roomData := range resp.Rooms.Join { for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { evtData := roomData.Timeline.Events[i] // TODO this is horribly inefficient since it's also parsed in ProcessResponse e := parseEvent(roomID, evtData) if e != nil && e.Type == StateMember && e.GetStateKey() == s.UserID { if e.Content.Membership == "join" { _, ok := resp.Rooms.Join[roomID] if !ok { continue } delete(resp.Rooms.Join, roomID) // don't re-process messages delete(resp.Rooms.Invite, roomID) // don't re-process invites break } } } } return true } // getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse() func (s *CustomSyncer) getOrCreateRoom(roomID string) *Room { room := s.Store.LoadRoom(roomID) if room == nil { // create a new Room room = NewRoom(roomID) s.Store.SaveRoom(room) } return room } func (s *CustomSyncer) notifyListeners(event *Event) { listeners, exists := s.listeners[event.Type] if !exists { return } for _, fn := range listeners { fn(event) } } // OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error. func (s *CustomSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) { return 10 * time.Second, nil } // GetFilterJSON returns a filter with a timeline limit of 50. func (s *CustomSyncer) GetFilterJSON(userID string) json.RawMessage { return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) }