From 2b5052eccfca49e736b39a9879ebde2ab196f6da Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 15 May 2020 09:41:12 +0100 Subject: [PATCH] Add Range (#1037) * Add Range * Use Range --- syncapi/storage/interface.go | 2 +- .../storage/postgres/account_data_table.go | 11 +--- syncapi/storage/postgres/invites_table.go | 4 +- .../postgres/output_room_events_table.go | 16 +++--- syncapi/storage/shared/syncserver.go | 57 +++++++++++-------- syncapi/storage/sqlite3/account_data_table.go | 11 +--- syncapi/storage/sqlite3/invites_table.go | 4 +- .../sqlite3/output_room_events_table.go | 16 +++--- syncapi/storage/tables/interface.go | 18 +++--- syncapi/sync/requestpool.go | 15 ++++- syncapi/types/types.go | 28 +++++++++ 11 files changed, 108 insertions(+), 74 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 22716789..eba008b3 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -64,7 +64,7 @@ type Database interface { // Returns a map following the format data[roomID] = []dataTypes // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error - GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) // UpsertAccountData keeps track of new or updated account data, by saving the type // of the new/updated data, and the user ID and room ID the data is related to (empty) // room ID means the data isn't specific to any room) diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index 58fb2198..a5e0c121 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -100,19 +100,12 @@ func (s *accountDataStatements) InsertAccountData( func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter, ) (data map[string][]string, err error) { data = make(map[string][]string) - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // it would prevent the SQL request from returning anything. - if oldPos == newPos { - oldPos-- - } - - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)), accountDataEventFilter.Limit, diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 78ca4d6d..01f2e7f4 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -117,10 +117,10 @@ func (s *inviteEventsStatements) DeleteInviteEvent( // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( - ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, ) (map[string]gomatrixserverlib.HeaderedEvent, error) { stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) - rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) if err != nil { return nil, err } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 5870bfd5..5020d1e7 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -155,13 +155,13 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) SelectStateInRange( - ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectStateInRangeStmt) rows, err := stmt.QueryContext( - ctx, oldPos, newPos, + ctx, r.Low(), r.High(), pq.StringArray(stateFilter.Senders), pq.StringArray(stateFilter.NotSenders), pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)), @@ -198,8 +198,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // since it'll just mark the event as not being needed. if len(addIDs) < len(delIDs) { log.WithFields(log.Fields{ - "since": oldPos, - "current": newPos, + "since": r.From, + "current": r.To, "adds": addIDs, "dels": delIDs, }).Warn("StateBetween: ignoring deleted state") @@ -298,7 +298,7 @@ func (s *outputRoomEventsStatements) InsertEvent( // from sync. func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, + roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, ) ([]types.StreamEvent, error) { var stmt *sql.Stmt @@ -307,7 +307,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( } else { stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) if err != nil { return nil, err } @@ -331,10 +331,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( // from a given position, up to a maximum of 'limit'. func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, + roomID string, r types.Range, limit int, ) ([]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEarlyEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) if err != nil { return nil, err } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e03a6b9f..be1f9c7a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -54,17 +54,22 @@ func (d *Database) GetEventsInStreamingRange( roomID string, limit int, backwardOrdering bool, ) (events []types.StreamEvent, err error) { + r := types.Range{ + From: from.PDUPosition(), + To: to.PDUPosition(), + Backwards: backwardOrdering, + } if backwardOrdering { // When using backward ordering, we want the most recent events first. if events, err = d.OutputEvents.SelectRecentEvents( - ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, + ctx, nil, roomID, r, limit, false, false, ); err != nil { return } } else { // When using forward ordering, we want the least recent events first. if events, err = d.OutputEvents.SelectEarlyEvents( - ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, + ctx, nil, roomID, r, limit, ); err != nil { return } @@ -167,10 +172,10 @@ func (d *Database) RetireInviteEvent( // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error func (d *Database) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, + ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, ) (map[string][]string, error) { - return d.AccountData.SelectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) + return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart) } // UpsertAccountData keeps track of new or updated account data, by saving the type @@ -417,7 +422,7 @@ func (d *Database) syncPositionTx( func (d *Database) addPDUDeltaToResponse( ctx context.Context, device authtypes.Device, - fromPos, toPos types.StreamPosition, + r types.Range, numRecentEventsPerRoom int, wantFullState bool, res *types.Response, @@ -443,11 +448,11 @@ func (d *Database) addPDUDeltaToResponse( var deltas []stateDelta if !wantFullState { deltas, joinedRoomIDs, err = d.getStateDeltas( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, + ctx, &device, txn, r, device.UserID, &stateFilter, ) } else { deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, + ctx, &device, txn, r, device.UserID, &stateFilter, ) } if err != nil { @@ -455,14 +460,14 @@ func (d *Database) addPDUDeltaToResponse( } for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err } } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, device.UserID, r, res); err != nil { return nil, err } @@ -534,8 +539,12 @@ func (d *Database) IncrementalSync( var joinedRoomIDs []string var err error if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { + r := types.Range{ + From: fromPos.PDUPosition(), + To: toPos.PDUPosition(), + } joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, + ctx, device, r, numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership( @@ -589,6 +598,10 @@ func (d *Database) getResponseWithPDUsForCompleteSync( if err != nil { return } + r := types.Range{ + From: 0, + To: toPos.PDUPosition(), + } res = types.NewResponse(toPos) @@ -611,8 +624,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent recentStreamEvents, err = d.OutputEvents.SelectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), - numRecentEventsPerRoom, true, true, + ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { return @@ -644,7 +656,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil { return } @@ -686,11 +698,11 @@ var txReadOnlySnapshot = sql.TxOptions{ func (d *Database) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, - fromPos, toPos types.StreamPosition, + r types.Range, res *types.Response, ) error { invites, err := d.Invites.SelectInviteEventsInRange( - ctx, txn, userID, fromPos, toPos, + ctx, txn, userID, r, ) if err != nil { return err @@ -726,12 +738,11 @@ func (d *Database) addRoomDeltaToResponse( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, + r types.Range, delta stateDelta, numRecentEventsPerRoom int, res *types.Response, ) error { - endPos := toPos if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: @@ -739,10 +750,10 @@ func (d *Database) addRoomDeltaToResponse( // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave // in a single /sync request // This is all "okay" assuming history_visibility == "shared" which it is by default. - endPos = delta.membershipPos + r.To = delta.membershipPos } recentStreamEvents, err := d.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), + ctx, txn, delta.roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { @@ -872,7 +883,7 @@ func (d *Database) fetchMissingStateEvents( // A list of joined room IDs is also returned in case the caller needs it. func (d *Database) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, + r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]stateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 @@ -886,7 +897,7 @@ func (d *Database) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { return nil, nil, err } @@ -947,7 +958,7 @@ func (d *Database) getStateDeltas( // updates for other rooms. func (d *Database) getStateDeltasForFullStateSync( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, + r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]stateDelta, []string, error) { joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) @@ -972,7 +983,7 @@ func (d *Database) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { return nil, nil, err } diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index e5f2417b..435a399c 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -91,19 +91,12 @@ func (s *accountDataStatements) InsertAccountData( func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, ) (data map[string][]string, err error) { data = make(map[string][]string) - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // it would prevent the SQL request from returning anything. - if oldPos == newPos { - oldPos-- - } - - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High()) if err != nil { return } diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 26b3a316..9cb184f2 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -121,10 +121,10 @@ func (s *inviteEventsStatements) DeleteInviteEvent( // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( - ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, ) (map[string]gomatrixserverlib.HeaderedEvent, error) { stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) - rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index c1647a14..c89e007e 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -146,13 +146,13 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) SelectStateInRange( - ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, r types.Range, stateFilterPart *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectStateInRangeStmt) rows, err := stmt.QueryContext( - ctx, oldPos, newPos, + ctx, r.Low(), r.High(), /*pq.StringArray(stateFilterPart.Senders), pq.StringArray(stateFilterPart.NotSenders), pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), @@ -195,8 +195,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // since it'll just mark the event as not being needed. if len(addIDs) < len(delIDs) { log.WithFields(log.Fields{ - "since": oldPos, - "current": newPos, + "since": r.From, + "current": r.To, "adds": addIDsJSON, "dels": delIDsJSON, }).Warn("StateBetween: ignoring deleted state") @@ -308,7 +308,7 @@ func (s *outputRoomEventsStatements) InsertEvent( func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, + roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, ) ([]types.StreamEvent, error) { var stmt *sql.Stmt @@ -318,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) if err != nil { return nil, err } @@ -340,10 +340,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, + roomID string, r types.Range, limit int, ) ([]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEarlyEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) if err != nil { return nil, err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 1e5351b5..f31bdc2e 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -11,29 +11,29 @@ import ( type AccountData interface { InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error) - // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of `lowPos` and inclusive of `hiPos`. - SelectAccountDataInRange(ctx context.Context, userID string, lowPos, hiPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) + // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. + SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error) } type Invites interface { InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error) DeleteInviteEvent(ctx context.Context, inviteEventID string) error - // SelectInviteEventsInRange returns a map of room ID to invite events. The range is exclusive of `startPos` and inclusive of `endPos`. - SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error) + // SelectInviteEventsInRange returns a map of room ID to invite events. + SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]gomatrixserverlib.HeaderedEvent, error) SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error) } type Events interface { - SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error) + SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error) SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) - // SelectRecentEvents returns events between the two stream positions: exclusive of `fromPos` and inclusive of `toPos`. + // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // Returns up to `limit` events. - SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) - // SelectEarlyEvents returns the earliest events in the given room, exclusive of `fromPos` and inclusive of `toPos`. - SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error) + SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) + // SelectEarlyEvents returns the earliest events in the given room. + SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 126e76f5..6e0f44e9 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -184,11 +184,20 @@ func (rp *RequestPool) appendAccountData( return data, nil } + r := types.Range{ + From: req.since.PDUPosition(), + To: currentPos, + } + // If both positions are the same, it means that the data was saved after the + // latest room event. In that case, we need to decrement the old position as + // results are exclusive of Low. + if r.Low() == r.High() { + r.From-- + } + // Sync is not initial, get all account data since the latest sync dataTypes, err := rp.db.GetAccountDataInRange( - req.ctx, userID, - types.StreamPosition(req.since.PDUPosition()), types.StreamPosition(currentPos), - accountDataFilter, + req.ctx, userID, r, accountDataFilter, ) if err != nil { return nil, err diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 8a79ccd4..caa1b3ad 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -47,6 +47,34 @@ type StreamEvent struct { ExcludeFromSync bool } +// Range represents a range between two stream positions. +type Range struct { + // From is the position the client has already received. + From StreamPosition + // To is the position the client is going towards. + To StreamPosition + // True if the client is going backwards + Backwards bool +} + +// Low returns the low number of the range. +// This represents the position the client already has and hence is exclusive. +func (r *Range) Low() StreamPosition { + if !r.Backwards { + return r.From + } + return r.To +} + +// High returns the high number of the range +// This represents the position the client is going towards and hence is inclusive. +func (r *Range) High() StreamPosition { + if !r.Backwards { + return r.To + } + return r.From +} + // SyncTokenType represents the type of a sync token. // It can be either "s" (representing a position in the whole stream of events) // or "t" (representing a position in a room's topology/depth).