Implement the /backfill federation endpoint (#585)
* Implement the /backfill federation endpoint * Make the BFS loop block common between QueryMissingEvents and QueryPreviousEvents * Improve comments on the BFS loop block * Optimisation: prevent unnecessary redefinitions/reallocations * Add trailing slash at the end of the route for parity with synapse * Replace QueryPreviousEvents with QueryBackfill * Change the backfill response to comply with the specs and synapse's behaviour
This commit is contained in:
parent
daf57b19b7
commit
56058b9469
@ -0,0 +1,102 @@
|
|||||||
|
// Copyright 2018 New Vector Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package routing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/types"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Backfill implements the /backfill federation endpoint.
|
||||||
|
// https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid
|
||||||
|
func Backfill(
|
||||||
|
httpReq *http.Request,
|
||||||
|
request *gomatrixserverlib.FederationRequest,
|
||||||
|
query api.RoomserverQueryAPI,
|
||||||
|
roomID string,
|
||||||
|
cfg config.Dendrite,
|
||||||
|
) util.JSONResponse {
|
||||||
|
var res api.QueryBackfillResponse
|
||||||
|
var eIDs []string
|
||||||
|
var limit string
|
||||||
|
var exists bool
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Check the room ID's format.
|
||||||
|
if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.MissingArgument("Bad room ID: " + err.Error()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if all of the required parameters are there.
|
||||||
|
eIDs, exists = httpReq.URL.Query()["v"]
|
||||||
|
if !exists {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.MissingArgument("v is missing"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
limit = httpReq.URL.Query().Get("limit")
|
||||||
|
if len(limit) == 0 {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.MissingArgument("limit is missing"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate the request.
|
||||||
|
req := api.QueryBackfillRequest{
|
||||||
|
EarliestEventsIDs: eIDs,
|
||||||
|
ServerName: request.Origin(),
|
||||||
|
}
|
||||||
|
if req.Limit, err = strconv.Atoi(limit); err != nil {
|
||||||
|
return httputil.LogThenError(httpReq, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query the roomserver.
|
||||||
|
if err = query.QueryBackfill(httpReq.Context(), &req, &res); err != nil {
|
||||||
|
return httputil.LogThenError(httpReq, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter any event that's not from the requested room out.
|
||||||
|
evs := make([]gomatrixserverlib.Event, 0)
|
||||||
|
|
||||||
|
var ev gomatrixserverlib.Event
|
||||||
|
for _, ev = range res.Events {
|
||||||
|
if ev.RoomID() == roomID {
|
||||||
|
evs = append(evs, ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
txn := types.NewTransaction()
|
||||||
|
txn.Origin = cfg.Matrix.ServerName
|
||||||
|
txn.PDUs = evs
|
||||||
|
|
||||||
|
// Send the events to the client.
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: txn,
|
||||||
|
}
|
||||||
|
}
|
@ -219,4 +219,12 @@ func Setup(
|
|||||||
return GetMissingEvents(httpReq, request, query, vars["roomID"])
|
return GetMissingEvents(httpReq, request, query, vars["roomID"])
|
||||||
},
|
},
|
||||||
)).Methods(http.MethodGet)
|
)).Methods(http.MethodGet)
|
||||||
|
|
||||||
|
v1fedmux.Handle("/backfill/{roomID}/", common.MakeFedAPI(
|
||||||
|
"federation_backfill", cfg.Matrix.ServerName, keys,
|
||||||
|
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
|
||||||
|
vars := mux.Vars(httpReq)
|
||||||
|
return Backfill(httpReq, request, query, vars["roomID"], cfg)
|
||||||
|
},
|
||||||
|
)).Methods(http.MethodGet)
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,43 @@
|
|||||||
|
// Copyright 2018 New Vector Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Transaction is the representation of a transaction from the federation API
|
||||||
|
// See https://matrix.org/docs/spec/server_server/unstable.html for more info.
|
||||||
|
type Transaction struct {
|
||||||
|
// The server_name of the homeserver sending this transaction.
|
||||||
|
Origin gomatrixserverlib.ServerName `json:"origin"`
|
||||||
|
// POSIX timestamp in milliseconds on originating homeserver when this
|
||||||
|
// transaction started.
|
||||||
|
OriginServerTS int64 `json:"origin_server_ts"`
|
||||||
|
// List of persistent updates to rooms.
|
||||||
|
PDUs []gomatrixserverlib.Event `json:"pdus"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTransaction sets the timestamp of a new transaction instance and then
|
||||||
|
// returns the said instance.
|
||||||
|
func NewTransaction() Transaction {
|
||||||
|
// Retrieve the current timestamp in nanoseconds and make it a milliseconds
|
||||||
|
// one.
|
||||||
|
ts := time.Now().UnixNano() / int64(time.Millisecond)
|
||||||
|
|
||||||
|
return Transaction{OriginServerTS: ts}
|
||||||
|
}
|
@ -214,6 +214,22 @@ type QueryStateAndAuthChainResponse struct {
|
|||||||
AuthChainEvents []gomatrixserverlib.Event `json:"auth_chain_events"`
|
AuthChainEvents []gomatrixserverlib.Event `json:"auth_chain_events"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryBackfillRequest is a request to QueryBackfill.
|
||||||
|
type QueryBackfillRequest struct {
|
||||||
|
// Events to start paginating from.
|
||||||
|
EarliestEventsIDs []string `json:"earliest_event_ids"`
|
||||||
|
// The maximum number of events to retrieve.
|
||||||
|
Limit int `json:"limit"`
|
||||||
|
// The server interested in the events.
|
||||||
|
ServerName gomatrixserverlib.ServerName `json:"server_name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryBackfillResponse is a response to QueryBackfill.
|
||||||
|
type QueryBackfillResponse struct {
|
||||||
|
// Missing events, arbritrary order.
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
}
|
||||||
|
|
||||||
// RoomserverQueryAPI is used to query information from the room server.
|
// RoomserverQueryAPI is used to query information from the room server.
|
||||||
type RoomserverQueryAPI interface {
|
type RoomserverQueryAPI interface {
|
||||||
// Query the latest events and state for a room from the room server.
|
// Query the latest events and state for a room from the room server.
|
||||||
@ -280,6 +296,13 @@ type RoomserverQueryAPI interface {
|
|||||||
request *QueryStateAndAuthChainRequest,
|
request *QueryStateAndAuthChainRequest,
|
||||||
response *QueryStateAndAuthChainResponse,
|
response *QueryStateAndAuthChainResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
|
// Query a given amount (or less) of events prior to a given set of events.
|
||||||
|
QueryBackfill(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryBackfillRequest,
|
||||||
|
response *QueryBackfillResponse,
|
||||||
|
) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
|
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
|
||||||
@ -309,6 +332,9 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents"
|
|||||||
// RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API
|
// RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API
|
||||||
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
|
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
|
||||||
|
|
||||||
|
// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API
|
||||||
|
const RoomserverQueryBackfillPath = "/api/roomserver/QueryBackfill"
|
||||||
|
|
||||||
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
|
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
|
||||||
// If httpClient is nil then it uses the http.DefaultClient
|
// If httpClient is nil then it uses the http.DefaultClient
|
||||||
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
|
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
|
||||||
@ -439,3 +465,16 @@ func (h *httpRoomserverQueryAPI) QueryStateAndAuthChain(
|
|||||||
apiURL := h.roomserverURL + RoomserverQueryStateAndAuthChainPath
|
apiURL := h.roomserverURL + RoomserverQueryStateAndAuthChainPath
|
||||||
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryBackfill implements RoomServerQueryAPI
|
||||||
|
func (h *httpRoomserverQueryAPI) QueryBackfill(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryBackfillRequest,
|
||||||
|
response *QueryBackfillResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath
|
||||||
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
@ -437,7 +437,6 @@ func (r *RoomserverQueryAPI) QueryMissingEvents(
|
|||||||
request *api.QueryMissingEventsRequest,
|
request *api.QueryMissingEventsRequest,
|
||||||
response *api.QueryMissingEventsResponse,
|
response *api.QueryMissingEventsResponse,
|
||||||
) error {
|
) error {
|
||||||
resultNIDs := make([]types.EventNID, 0, request.Limit)
|
|
||||||
var front []string
|
var front []string
|
||||||
visited := make(map[string]bool, request.Limit) // request.Limit acts as a hint to size.
|
visited := make(map[string]bool, request.Limit) // request.Limit acts as a hint to size.
|
||||||
for _, id := range request.EarliestEvents {
|
for _, id := range request.EarliestEvents {
|
||||||
@ -450,41 +449,106 @@ func (r *RoomserverQueryAPI) QueryMissingEvents(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
response.Events, err = r.loadEvents(ctx, resultNIDs)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryBackfill implements api.RoomServerQueryAPI
|
||||||
|
func (r *RoomserverQueryAPI) QueryBackfill(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.QueryBackfillRequest,
|
||||||
|
response *api.QueryBackfillResponse,
|
||||||
|
) error {
|
||||||
|
var err error
|
||||||
|
var front []string
|
||||||
|
|
||||||
|
// The limit defines the maximum number of events to retrieve, so it also
|
||||||
|
// defines the highest number of elements in the map below.
|
||||||
|
visited := make(map[string]bool, request.Limit)
|
||||||
|
|
||||||
|
// The provided event IDs have already been seen by the request's emitter,
|
||||||
|
// and will be retrieved anyway, so there's no need to care about them if
|
||||||
|
// they appear in our exploration of the event tree.
|
||||||
|
for _, id := range request.EarliestEventsIDs {
|
||||||
|
visited[id] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
front = request.EarliestEventsIDs
|
||||||
|
|
||||||
|
// Scan the event tree for events to send back.
|
||||||
|
resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve events from the list that was filled previously.
|
||||||
|
response.Events, err = r.loadEvents(ctx, resultNIDs)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RoomserverQueryAPI) scanEventTree(
|
||||||
|
ctx context.Context, front []string, visited map[string]bool, limit int,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
) (resultNIDs []types.EventNID, err error) {
|
||||||
|
var allowed bool
|
||||||
|
var events []types.Event
|
||||||
|
var next []string
|
||||||
|
var pre string
|
||||||
|
|
||||||
|
resultNIDs = make([]types.EventNID, 0, limit)
|
||||||
|
|
||||||
|
// Loop through the event IDs to retrieve the requested events and go
|
||||||
|
// through the whole tree (up to the provided limit) using the events'
|
||||||
|
// "prev_event" key.
|
||||||
BFSLoop:
|
BFSLoop:
|
||||||
for len(front) > 0 {
|
for len(front) > 0 {
|
||||||
var next []string
|
// Prevent unnecessary allocations: reset the slice only when not empty.
|
||||||
events, err := r.DB.EventsFromIDs(ctx, front)
|
if len(next) > 0 {
|
||||||
|
next = make([]string, 0)
|
||||||
|
}
|
||||||
|
// Retrieve the events to process from the database.
|
||||||
|
events, err = r.DB.EventsFromIDs(ctx, front)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ev := range events {
|
for _, ev := range events {
|
||||||
if len(resultNIDs) > request.Limit {
|
// Break out of the loop if the provided limit is reached.
|
||||||
|
if len(resultNIDs) == limit {
|
||||||
break BFSLoop
|
break BFSLoop
|
||||||
}
|
}
|
||||||
|
// Update the list of events to retrieve.
|
||||||
resultNIDs = append(resultNIDs, ev.EventNID)
|
resultNIDs = append(resultNIDs, ev.EventNID)
|
||||||
for _, pre := range ev.PrevEventIDs() {
|
// Loop through the event's parents.
|
||||||
|
for _, pre = range ev.PrevEventIDs() {
|
||||||
|
// Only add an event to the list of next events to process if it
|
||||||
|
// hasn't been seen before.
|
||||||
if !visited[pre] {
|
if !visited[pre] {
|
||||||
visited[pre] = true
|
visited[pre] = true
|
||||||
allowed, err := r.checkServerAllowedToSeeEvent(
|
allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName)
|
||||||
ctx, ev.EventID(), request.ServerName,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the event hasn't been seen before and the HS
|
||||||
|
// requesting to retrieve it is allowed to do so, add it to
|
||||||
|
// the list of events to retrieve.
|
||||||
if allowed {
|
if allowed {
|
||||||
next = append(next, pre)
|
next = append(next, pre)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Repeat the same process with the parent events we just processed.
|
||||||
front = next
|
front = next
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
return
|
||||||
response.Events, err = r.loadEvents(ctx, resultNIDs)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryStateAndAuthChain implements api.RoomserverQueryAPI
|
// QueryStateAndAuthChain implements api.RoomserverQueryAPI
|
||||||
@ -708,4 +772,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
|
|||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
servMux.Handle(
|
||||||
|
api.RoomserverQueryBackfillPath,
|
||||||
|
common.MakeInternalAPI("QueryBackfill", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.QueryBackfillRequest
|
||||||
|
var response api.QueryBackfillResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
if err := r.QueryBackfill(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user