diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/backfill.go b/src/github.com/matrix-org/dendrite/federationapi/routing/backfill.go new file mode 100644 index 00000000..d996db6a --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/backfill.go @@ -0,0 +1,102 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + "strconv" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationapi/types" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +// Backfill implements the /backfill federation endpoint. +// https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid +func Backfill( + httpReq *http.Request, + request *gomatrixserverlib.FederationRequest, + query api.RoomserverQueryAPI, + roomID string, + cfg config.Dendrite, +) util.JSONResponse { + var res api.QueryBackfillResponse + var eIDs []string + var limit string + var exists bool + var err error + + // Check the room ID's format. + if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Bad room ID: " + err.Error()), + } + } + + // Check if all of the required parameters are there. + eIDs, exists = httpReq.URL.Query()["v"] + if !exists { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("v is missing"), + } + } + limit = httpReq.URL.Query().Get("limit") + if len(limit) == 0 { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("limit is missing"), + } + } + + // Populate the request. + req := api.QueryBackfillRequest{ + EarliestEventsIDs: eIDs, + ServerName: request.Origin(), + } + if req.Limit, err = strconv.Atoi(limit); err != nil { + return httputil.LogThenError(httpReq, err) + } + + // Query the roomserver. + if err = query.QueryBackfill(httpReq.Context(), &req, &res); err != nil { + return httputil.LogThenError(httpReq, err) + } + + // Filter any event that's not from the requested room out. + evs := make([]gomatrixserverlib.Event, 0) + + var ev gomatrixserverlib.Event + for _, ev = range res.Events { + if ev.RoomID() == roomID { + evs = append(evs, ev) + } + } + + txn := types.NewTransaction() + txn.Origin = cfg.Matrix.ServerName + txn.PDUs = evs + + // Send the events to the client. + return util.JSONResponse{ + Code: http.StatusOK, + JSON: txn, + } +} diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go index c7c0f7e2..1ca4f667 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go @@ -219,4 +219,12 @@ func Setup( return GetMissingEvents(httpReq, request, query, vars["roomID"]) }, )).Methods(http.MethodGet) + + v1fedmux.Handle("/backfill/{roomID}/", common.MakeFedAPI( + "federation_backfill", cfg.Matrix.ServerName, keys, + func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { + vars := mux.Vars(httpReq) + return Backfill(httpReq, request, query, vars["roomID"], cfg) + }, + )).Methods(http.MethodGet) } diff --git a/src/github.com/matrix-org/dendrite/federationapi/types/types.go b/src/github.com/matrix-org/dendrite/federationapi/types/types.go new file mode 100644 index 00000000..24838d54 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationapi/types/types.go @@ -0,0 +1,43 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +// Transaction is the representation of a transaction from the federation API +// See https://matrix.org/docs/spec/server_server/unstable.html for more info. +type Transaction struct { + // The server_name of the homeserver sending this transaction. + Origin gomatrixserverlib.ServerName `json:"origin"` + // POSIX timestamp in milliseconds on originating homeserver when this + // transaction started. + OriginServerTS int64 `json:"origin_server_ts"` + // List of persistent updates to rooms. + PDUs []gomatrixserverlib.Event `json:"pdus"` +} + +// NewTransaction sets the timestamp of a new transaction instance and then +// returns the said instance. +func NewTransaction() Transaction { + // Retrieve the current timestamp in nanoseconds and make it a milliseconds + // one. + ts := time.Now().UnixNano() / int64(time.Millisecond) + + return Transaction{OriginServerTS: ts} +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 8c375321..a544f8aa 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -214,6 +214,22 @@ type QueryStateAndAuthChainResponse struct { AuthChainEvents []gomatrixserverlib.Event `json:"auth_chain_events"` } +// QueryBackfillRequest is a request to QueryBackfill. +type QueryBackfillRequest struct { + // Events to start paginating from. + EarliestEventsIDs []string `json:"earliest_event_ids"` + // The maximum number of events to retrieve. + Limit int `json:"limit"` + // The server interested in the events. + ServerName gomatrixserverlib.ServerName `json:"server_name"` +} + +// QueryBackfillResponse is a response to QueryBackfill. +type QueryBackfillResponse struct { + // Missing events, arbritrary order. + Events []gomatrixserverlib.Event `json:"events"` +} + // RoomserverQueryAPI is used to query information from the room server. type RoomserverQueryAPI interface { // Query the latest events and state for a room from the room server. @@ -280,6 +296,13 @@ type RoomserverQueryAPI interface { request *QueryStateAndAuthChainRequest, response *QueryStateAndAuthChainResponse, ) error + + // Query a given amount (or less) of events prior to a given set of events. + QueryBackfill( + ctx context.Context, + request *QueryBackfillRequest, + response *QueryBackfillResponse, + ) error } // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. @@ -309,6 +332,9 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents" // RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain" +// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API +const RoomserverQueryBackfillPath = "/api/roomserver/QueryBackfill" + // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // If httpClient is nil then it uses the http.DefaultClient func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { @@ -439,3 +465,16 @@ func (h *httpRoomserverQueryAPI) QueryStateAndAuthChain( apiURL := h.roomserverURL + RoomserverQueryStateAndAuthChainPath return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +// QueryBackfill implements RoomServerQueryAPI +func (h *httpRoomserverQueryAPI) QueryBackfill( + ctx context.Context, + request *QueryBackfillRequest, + response *QueryBackfillResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 836edcef..39e9333c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -437,7 +437,6 @@ func (r *RoomserverQueryAPI) QueryMissingEvents( request *api.QueryMissingEventsRequest, response *api.QueryMissingEventsResponse, ) error { - resultNIDs := make([]types.EventNID, 0, request.Limit) var front []string visited := make(map[string]bool, request.Limit) // request.Limit acts as a hint to size. for _, id := range request.EarliestEvents { @@ -450,41 +449,106 @@ func (r *RoomserverQueryAPI) QueryMissingEvents( } } + resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName) + if err != nil { + return err + } + + response.Events, err = r.loadEvents(ctx, resultNIDs) + return err +} + +// QueryBackfill implements api.RoomServerQueryAPI +func (r *RoomserverQueryAPI) QueryBackfill( + ctx context.Context, + request *api.QueryBackfillRequest, + response *api.QueryBackfillResponse, +) error { + var err error + var front []string + + // The limit defines the maximum number of events to retrieve, so it also + // defines the highest number of elements in the map below. + visited := make(map[string]bool, request.Limit) + + // The provided event IDs have already been seen by the request's emitter, + // and will be retrieved anyway, so there's no need to care about them if + // they appear in our exploration of the event tree. + for _, id := range request.EarliestEventsIDs { + visited[id] = true + } + + front = request.EarliestEventsIDs + + // Scan the event tree for events to send back. + resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName) + if err != nil { + return err + } + + // Retrieve events from the list that was filled previously. + response.Events, err = r.loadEvents(ctx, resultNIDs) + return err +} + +func (r *RoomserverQueryAPI) scanEventTree( + ctx context.Context, front []string, visited map[string]bool, limit int, + serverName gomatrixserverlib.ServerName, +) (resultNIDs []types.EventNID, err error) { + var allowed bool + var events []types.Event + var next []string + var pre string + + resultNIDs = make([]types.EventNID, 0, limit) + + // Loop through the event IDs to retrieve the requested events and go + // through the whole tree (up to the provided limit) using the events' + // "prev_event" key. BFSLoop: for len(front) > 0 { - var next []string - events, err := r.DB.EventsFromIDs(ctx, front) + // Prevent unnecessary allocations: reset the slice only when not empty. + if len(next) > 0 { + next = make([]string, 0) + } + // Retrieve the events to process from the database. + events, err = r.DB.EventsFromIDs(ctx, front) if err != nil { - return err + return } for _, ev := range events { - if len(resultNIDs) > request.Limit { + // Break out of the loop if the provided limit is reached. + if len(resultNIDs) == limit { break BFSLoop } + // Update the list of events to retrieve. resultNIDs = append(resultNIDs, ev.EventNID) - for _, pre := range ev.PrevEventIDs() { + // Loop through the event's parents. + for _, pre = range ev.PrevEventIDs() { + // Only add an event to the list of next events to process if it + // hasn't been seen before. if !visited[pre] { visited[pre] = true - allowed, err := r.checkServerAllowedToSeeEvent( - ctx, ev.EventID(), request.ServerName, - ) + allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName) if err != nil { - return err + return } + // If the event hasn't been seen before and the HS + // requesting to retrieve it is allowed to do so, add it to + // the list of events to retrieve. if allowed { next = append(next, pre) } } } } + // Repeat the same process with the parent events we just processed. front = next } - var err error - response.Events, err = r.loadEvents(ctx, resultNIDs) - return err + return } // QueryStateAndAuthChain implements api.RoomserverQueryAPI @@ -708,4 +772,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + servMux.Handle( + api.RoomserverQueryBackfillPath, + common.MakeInternalAPI("QueryBackfill", func(req *http.Request) util.JSONResponse { + var request api.QueryBackfillRequest + var response api.QueryBackfillResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.QueryBackfill(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) }