Add contexts to the internal roomserver APIs (#228)

This commit is contained in:
Mark Haines 2017-09-13 13:37:50 +01:00 committed by GitHub
parent 79adba43f0
commit 3133bef797
26 changed files with 177 additions and 87 deletions

View File

@ -15,6 +15,7 @@
package consumers package consumers
import ( import (
"context"
"encoding/json" "encoding/json"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -137,7 +138,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
// Request the missing events from the roomserver // Request the missing events from the roomserver
eventReq := api.QueryEventsByIDRequest{EventIDs: missing} eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse var eventResp api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,7 @@
package events package events
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"time" "time"
@ -37,6 +38,7 @@ var ErrRoomNoExists = errors.New("Room does not exist")
// the room doesn't exist // the room doesn't exist
// Returns an error if something else went wrong // Returns an error if something else went wrong
func BuildEvent( func BuildEvent(
ctx context.Context,
builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite, builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, queryRes *api.QueryLatestEventsAndStateResponse, queryAPI api.RoomserverQueryAPI, queryRes *api.QueryLatestEventsAndStateResponse,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.Event, error) {
@ -53,7 +55,7 @@ func BuildEvent(
if queryRes == nil { if queryRes == nil {
queryRes = &api.QueryLatestEventsAndStateResponse{} queryRes = &api.QueryLatestEventsAndStateResponse{}
} }
if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, queryRes); queryErr != nil { if err = queryAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,8 @@
package producers package producers
import ( import (
"context"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -32,7 +34,9 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer
} }
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. // SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error { func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName,
) error {
ires := make([]api.InputRoomEvent, len(events)) ires := make([]api.InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
@ -42,12 +46,14 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs
SendAsServer: string(sendAsServer), SendAsServer: string(sendAsServer),
} }
} }
return c.SendInputRoomEvents(ires) return c.SendInputRoomEvents(ctx, ires)
} }
// SendEventWithState writes an event with KindNew to the roomserver input log // SendEventWithState writes an event with KindNew to the roomserver input log
// with the state at the event as KindOutlier before it. // with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespState, event gomatrixserverlib.Event) error { func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event,
) error {
outliers, err := state.Events() outliers, err := state.Events()
if err != nil { if err != nil {
return err return err
@ -75,23 +81,23 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
} }
return c.SendInputRoomEvents(ires) return c.SendInputRoomEvents(ctx, ires)
} }
// SendInputRoomEvents writes the given input room events to the roomserver input API. // SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent) error { func (c *RoomserverProducer) SendInputRoomEvents(ctx context.Context, ires []api.InputRoomEvent) error {
request := api.InputRoomEventsRequest{InputRoomEvents: ires} request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse var response api.InputRoomEventsResponse
return c.InputAPI.InputRoomEvents(&request, &response) return c.InputAPI.InputRoomEvents(ctx, &request, &response)
} }
// SendInvite writes the invite event to the roomserver input API. // SendInvite writes the invite event to the roomserver input API.
// This should only be needed for invite events that occur outside of a known room. // This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method. // If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite(inviteEvent gomatrixserverlib.Event) error { func (c *RoomserverProducer) SendInvite(ctx context.Context, inviteEvent gomatrixserverlib.Event) error {
request := api.InputRoomEventsRequest{ request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}}, InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}},
} }
var response api.InputRoomEventsResponse var response api.InputRoomEventsResponse
return c.InputAPI.InputRoomEvents(&request, &response) return c.InputAPI.InputRoomEvents(ctx, &request, &response)
} }

View File

@ -48,7 +48,7 @@ func DirectoryRoom(
if domain == cfg.Matrix.ServerName { if domain == cfg.Matrix.ServerName {
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias} queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
var queryRes api.GetAliasRoomIDResponse var queryRes api.GetAliasRoomIDResponse
if err = aliasAPI.GetAliasRoomID(&queryReq, &queryRes); err != nil { if err = aliasAPI.GetAliasRoomID(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -126,7 +126,7 @@ func SetLocalAlias(
Alias: alias, Alias: alias,
} }
var queryRes api.SetRoomAliasResponse var queryRes api.SetRoomAliasResponse
if err := aliasAPI.SetRoomAlias(&queryReq, &queryRes); err != nil { if err := aliasAPI.SetRoomAlias(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -156,7 +156,7 @@ func RemoveLocalAlias(
UserID: device.UserID, UserID: device.UserID,
} }
var queryRes api.RemoveRoomAliasResponse var queryRes api.RemoveRoomAliasResponse
if err := aliasAPI.RemoveRoomAlias(&queryReq, &queryRes); err != nil { if err := aliasAPI.RemoveRoomAlias(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View File

@ -42,7 +42,7 @@ func GetMemberships(
Sender: device.UserID, Sender: device.UserID,
} }
var queryRes api.QueryMembershipsForRoomResponse var queryRes api.QueryMembershipsForRoomResponse
if err := queryAPI.QueryMembershipsForRoom(&queryReq, &queryRes); err != nil { if err := queryAPI.QueryMembershipsForRoom(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View File

@ -15,6 +15,7 @@
package readers package readers
import ( import (
"context"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -146,12 +147,12 @@ func SetAvatarURL(
AvatarURL: r.AvatarURL, AvatarURL: r.AvatarURL,
} }
events, err := buildMembershipEvents(memberships, newProfile, userID, cfg, queryAPI) events, err := buildMembershipEvents(req.Context(), memberships, newProfile, userID, cfg, queryAPI)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil { if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -238,12 +239,12 @@ func SetDisplayName(
AvatarURL: oldProfile.AvatarURL, AvatarURL: oldProfile.AvatarURL,
} }
events, err := buildMembershipEvents(memberships, newProfile, userID, cfg, queryAPI) events, err := buildMembershipEvents(req.Context(), memberships, newProfile, userID, cfg, queryAPI)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil { if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -258,6 +259,7 @@ func SetDisplayName(
} }
func buildMembershipEvents( func buildMembershipEvents(
ctx context.Context,
memberships []authtypes.Membership, memberships []authtypes.Membership,
newProfile authtypes.Profile, userID string, cfg *config.Dendrite, newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
@ -283,7 +285,7 @@ func buildMembershipEvents(
return nil, err return nil, err
} }
event, err := events.BuildEvent(&builder, *cfg, queryAPI, nil) event, err := events.BuildEvent(ctx, &builder, *cfg, queryAPI, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,7 @@
package threepid package threepid
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -85,6 +86,7 @@ var (
// fills the Matrix ID in the request body so a normal invite membership event // fills the Matrix ID in the request body so a normal invite membership event
// can be emitted. // can be emitted.
func CheckAndProcessInvite( func CheckAndProcessInvite(
ctx context.Context,
device *authtypes.Device, body *MembershipRequest, cfg config.Dendrite, device *authtypes.Device, body *MembershipRequest, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, db *accounts.Database, queryAPI api.RoomserverQueryAPI, db *accounts.Database,
producer *producers.RoomserverProducer, membership string, roomID string, producer *producers.RoomserverProducer, membership string, roomID string,
@ -109,7 +111,7 @@ func CheckAndProcessInvite(
// No Matrix ID could be found for this 3PID, meaning that a // No Matrix ID could be found for this 3PID, meaning that a
// "m.room.third_party_invite" have to be emitted from the data in // "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes. // storeInviteRes.
err = emit3PIDInviteEvent(body, storeInviteRes, device, roomID, cfg, queryAPI, producer) err = emit3PIDInviteEvent(ctx, body, storeInviteRes, device, roomID, cfg, queryAPI, producer)
inviteStoredOnIDServer = err == nil inviteStoredOnIDServer = err == nil
return return
@ -312,6 +314,7 @@ func checkIDServerSignatures(body *MembershipRequest, res *idServerLookupRespons
// emit3PIDInviteEvent builds and sends a "m.room.third_party_invite" event. // emit3PIDInviteEvent builds and sends a "m.room.third_party_invite" event.
// Returns an error if something failed in the process. // Returns an error if something failed in the process.
func emit3PIDInviteEvent( func emit3PIDInviteEvent(
ctx context.Context,
body *MembershipRequest, res *idServerStoreInviteResponse, body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg config.Dendrite, device *authtypes.Device, roomID string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer,
@ -336,12 +339,12 @@ func emit3PIDInviteEvent(
} }
var queryRes *api.QueryLatestEventsAndStateResponse var queryRes *api.QueryLatestEventsAndStateResponse
event, err := events.BuildEvent(builder, cfg, queryAPI, queryRes) event, err := events.BuildEvent(ctx, builder, cfg, queryAPI, queryRes)
if err != nil { if err != nil {
return err return err
} }
if err := producer.SendEvents([]gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName); err != nil { if err := producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName); err != nil {
return err return err
} }

View File

@ -204,7 +204,7 @@ func createRoom(req *http.Request, device *authtypes.Device,
} }
// send events to the room server // send events to the room server
if err := producer.SendEvents(builtEvents, cfg.Matrix.ServerName); err != nil { if err := producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View File

@ -116,7 +116,7 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
if domain == r.cfg.Matrix.ServerName { if domain == r.cfg.Matrix.ServerName {
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias} queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
var queryRes api.GetAliasRoomIDResponse var queryRes api.GetAliasRoomIDResponse
if err = r.aliasAPI.GetAliasRoomID(&queryReq, &queryRes); err != nil { if err = r.aliasAPI.GetAliasRoomID(r.req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(r.req, err) return httputil.LogThenError(r.req, err)
} }
@ -170,18 +170,19 @@ func (r joinRoomReq) joinRoomUsingServers(
r.writeToBuilder(&eb, roomID) r.writeToBuilder(&eb, roomID)
var queryRes api.QueryLatestEventsAndStateResponse var queryRes api.QueryLatestEventsAndStateResponse
if event, err := events.BuildEvent(&eb, r.cfg, r.queryAPI, &queryRes); err == nil { event, err := events.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes)
if sendErr := r.producer.SendEvents([]gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName); err != nil { if err == nil {
return httputil.LogThenError(r.req, sendErr) if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(r.req, err)
} }
return util.JSONResponse{ return util.JSONResponse{
Code: 200, Code: 200,
JSON: struct { JSON: struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
}{roomID}, }{roomID},
} }
} else if err != events.ErrRoomNoExists { }
if err != events.ErrRoomNoExists {
return httputil.LogThenError(r.req, err) return httputil.LogThenError(r.req, err)
} }
@ -256,7 +257,7 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
} }
if err = r.producer.SendEventWithState( if err = r.producer.SendEventWithState(
gomatrixserverlib.RespState(respSendJoin), event, r.req.Context(), gomatrixserverlib.RespState(respSendJoin), event,
); err != nil { ); err != nil {
res := httputil.LogThenError(r.req, err) res := httputil.LogThenError(r.req, err)
return &res, nil return &res, nil

View File

@ -15,6 +15,7 @@
package writers package writers
import ( import (
"context"
"errors" "errors"
"net/http" "net/http"
@ -48,6 +49,7 @@ func SendMembership(
} }
inviteStored, err := threepid.CheckAndProcessInvite( inviteStored, err := threepid.CheckAndProcessInvite(
req.Context(),
device, &body, cfg, queryAPI, accountDB, producer, membership, roomID, device, &body, cfg, queryAPI, accountDB, producer, membership, roomID,
) )
if err == threepid.ErrMissingParameter { if err == threepid.ErrMissingParameter {
@ -80,7 +82,7 @@ func SendMembership(
} }
event, err := buildMembershipEvent( event, err := buildMembershipEvent(
body, accountDB, device, membership, roomID, cfg, queryAPI, req.Context(), body, accountDB, device, membership, roomID, cfg, queryAPI,
) )
if err == errMissingUserID { if err == errMissingUserID {
return util.JSONResponse{ return util.JSONResponse{
@ -96,7 +98,9 @@ func SendMembership(
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
if err := producer.SendEvents([]gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName); err != nil { if err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName,
); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -107,6 +111,7 @@ func SendMembership(
} }
func buildMembershipEvent( func buildMembershipEvent(
ctx context.Context,
body threepid.MembershipRequest, accountDB *accounts.Database, body threepid.MembershipRequest, accountDB *accounts.Database,
device *authtypes.Device, membership string, roomID string, cfg config.Dendrite, device *authtypes.Device, membership string, roomID string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
@ -144,7 +149,7 @@ func buildMembershipEvent(
return nil, err return nil, err
} }
return events.BuildEvent(&builder, cfg, queryAPI, nil) return events.BuildEvent(ctx, &builder, cfg, queryAPI, nil)
} }
// loadProfile lookups the profile of a given user from the database and returns // loadProfile lookups the profile of a given user from the database and returns

View File

@ -63,7 +63,7 @@ func SendEvent(
builder.SetContent(r) builder.SetContent(r)
var queryRes api.QueryLatestEventsAndStateResponse var queryRes api.QueryLatestEventsAndStateResponse
e, err := events.BuildEvent(&builder, cfg, queryAPI, &queryRes) e, err := events.BuildEvent(req.Context(), &builder, cfg, queryAPI, &queryRes)
if err == events.ErrRoomNoExists { if err == events.ErrRoomNoExists {
return util.JSONResponse{ return util.JSONResponse{
Code: 404, Code: 404,
@ -87,7 +87,9 @@ func SendEvent(
} }
// pass the new event to the roomserver // pass the new event to the roomserver
if err := producer.SendEvents([]gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName); err != nil { if err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName,
); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View File

@ -15,6 +15,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -199,7 +200,7 @@ func writeToRoomServer(input []string, roomserverURL string) error {
} }
} }
x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil) x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil)
return x.InputRoomEvents(&request, &response) return x.InputRoomEvents(context.Background(), &request, &response)
} }
// testRoomserver is used to run integration tests against a single roomserver. // testRoomserver is used to run integration tests against a single roomserver.
@ -389,10 +390,11 @@ func main() {
testRoomserver(input, want, func(q api.RoomserverQueryAPI) { testRoomserver(input, want, func(q api.RoomserverQueryAPI) {
var response api.QueryLatestEventsAndStateResponse var response api.QueryLatestEventsAndStateResponse
if err := q.QueryLatestEventsAndState( if err := q.QueryLatestEventsAndState(
context.Background(),
&api.QueryLatestEventsAndStateRequest{ &api.QueryLatestEventsAndStateRequest{
RoomID: "!HCXfdvrfksxuYnIFiJ:matrix.org", RoomID: "!HCXfdvrfksxuYnIFiJ:matrix.org",
StateToFetch: []gomatrixserverlib.StateKeyTuple{ StateToFetch: []gomatrixserverlib.StateKeyTuple{
{"m.room.member", "@richvdh:matrix.org"}, {EventType: "m.room.member", StateKey: "@richvdh:matrix.org"},
}, },
}, },
&response, &response,

View File

@ -15,6 +15,7 @@
package readers package readers
import ( import (
"context"
"time" "time"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
@ -25,6 +26,7 @@ import (
// GetEvent returns the requested event // GetEvent returns the requested event
func GetEvent( func GetEvent(
ctx context.Context,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
cfg config.Dendrite, cfg config.Dendrite,
query api.RoomserverQueryAPI, query api.RoomserverQueryAPI,
@ -33,10 +35,14 @@ func GetEvent(
eventID string, eventID string,
) util.JSONResponse { ) util.JSONResponse {
var authResponse api.QueryServerAllowedToSeeEventResponse var authResponse api.QueryServerAllowedToSeeEventResponse
err := query.QueryServerAllowedToSeeEvent(&api.QueryServerAllowedToSeeEventRequest{ err := query.QueryServerAllowedToSeeEvent(
ctx,
&api.QueryServerAllowedToSeeEventRequest{
EventID: eventID, EventID: eventID,
ServerName: request.Origin(), ServerName: request.Origin(),
}, &authResponse) },
&authResponse,
)
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
@ -47,10 +53,10 @@ func GetEvent(
var eventsResponse api.QueryEventsByIDResponse var eventsResponse api.QueryEventsByIDResponse
err = query.QueryEventsByID( err = query.QueryEventsByID(
ctx,
&api.QueryEventsByIDRequest{EventIDs: []string{eventID}}, &api.QueryEventsByIDRequest{EventIDs: []string{eventID}},
&eventsResponse, &eventsResponse,
) )
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }

View File

@ -102,7 +102,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq) vars := mux.Vars(httpReq)
return readers.GetEvent( return readers.GetEvent(
request, cfg, query, time.Now(), keys, vars["eventID"], httpReq.Context(), request, cfg, query, time.Now(), keys, vars["eventID"],
) )
}, },
)).Methods("GET") )).Methods("GET")

View File

@ -93,7 +93,7 @@ func Invite(
) )
// Add the invite event to the roomserver. // Add the invite event to the roomserver.
if err = producer.SendInvite(signedEvent); err != nil { if err = producer.SendInvite(httpReq.Context(), signedEvent); err != nil {
return httputil.LogThenError(httpReq, err) return httputil.LogThenError(httpReq, err)
} }

View File

@ -143,7 +143,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
StateToFetch: needed.Tuples(), StateToFetch: needed.Tuples(),
} }
var stateResp api.QueryStateAfterEventsResponse var stateResp api.QueryStateAfterEventsResponse
if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil { if err := t.query.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil {
return err return err
} }
@ -170,7 +170,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// TODO: Check that the event is allowed by its auth_events. // TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver // pass the event to the roomserver
if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers); err != nil { if err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers); err != nil {
return err return err
} }
@ -215,7 +215,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
return err return err
} }
// pass the event along with the state to the roomserver // pass the event along with the state to the roomserver
if err := t.producer.SendEventWithState(state, e); err != nil { if err := t.producer.SendEventWithState(t.context, state, e); err != nil {
return err return err
} }
return nil return nil

View File

@ -81,7 +81,7 @@ func CreateInvitesFrom3PIDInvites(
} }
// Send all the events // Send all the events
if err := producer.SendEvents(evs, cfg.Matrix.ServerName); err != nil { if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -135,7 +135,7 @@ func ExchangeThirdPartyInvite(
} }
// Auth and build the event from what the remote server sent us // Auth and build the event from what the remote server sent us
event, err := buildMembershipEvent(&builder, queryAPI, cfg) event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg)
if err == errNotInRoom { if err == errNotInRoom {
return util.JSONResponse{ return util.JSONResponse{
Code: 404, Code: 404,
@ -153,7 +153,9 @@ func ExchangeThirdPartyInvite(
} }
// Send the event to the roomserver // Send the event to the roomserver
if err = producer.SendEvents([]gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName); err != nil { if err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName,
); err != nil {
return httputil.LogThenError(httpReq, err) return httputil.LogThenError(httpReq, err)
} }
@ -207,7 +209,7 @@ func createInviteFrom3PIDInvite(
return nil, err return nil, err
} }
event, err := buildMembershipEvent(builder, queryAPI, cfg) event, err := buildMembershipEvent(ctx, builder, queryAPI, cfg)
if err == errNotInRoom { if err == errNotInRoom {
return nil, sendToRemoteServer(ctx, inv, federation, cfg, *builder) return nil, sendToRemoteServer(ctx, inv, federation, cfg, *builder)
} }
@ -224,6 +226,7 @@ func createInviteFrom3PIDInvite(
// Returns errNotInRoom if the server is not in the room the invite is for. // Returns errNotInRoom if the server is not in the room the invite is for.
// Returns an error if something failed during the process. // Returns an error if something failed during the process.
func buildMembershipEvent( func buildMembershipEvent(
ctx context.Context,
builder *gomatrixserverlib.EventBuilder, queryAPI api.RoomserverQueryAPI, builder *gomatrixserverlib.EventBuilder, queryAPI api.RoomserverQueryAPI,
cfg config.Dendrite, cfg config.Dendrite,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.Event, error) {
@ -238,7 +241,7 @@ func buildMembershipEvent(
StateToFetch: eventsNeeded.Tuples(), StateToFetch: eventsNeeded.Tuples(),
} }
var queryRes api.QueryLatestEventsAndStateResponse var queryRes api.QueryLatestEventsAndStateResponse
if err = queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); err != nil { if err = queryAPI.QueryLatestEventsAndState(ctx, &queryReq, &queryRes); err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,7 @@
package consumers package consumers
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -312,7 +313,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
// from the roomserver using the query API. // from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing} eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse var eventResp api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,7 @@
package consumers package consumers
import ( import (
"context"
"encoding/json" "encoding/json"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
@ -83,16 +84,16 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
"type": ev.Type(), "type": ev.Type(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
addQueryReq := api.QueryEventsByIDRequest{output.NewRoomEvent.AddsStateEventIDs} addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
var addQueryRes api.QueryEventsByIDResponse var addQueryRes api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(&addQueryReq, &addQueryRes); err != nil { if err := s.query.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil {
log.Warn(err) log.Warn(err)
return err return err
} }
remQueryReq := api.QueryEventsByIDRequest{output.NewRoomEvent.RemovesStateEventIDs} remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
var remQueryRes api.QueryEventsByIDResponse var remQueryRes api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(&remQueryReq, &remQueryRes); err != nil { if err := s.query.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
log.Warn(err) log.Warn(err)
return err return err
} }

View File

@ -15,6 +15,7 @@
package alias package alias
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -53,6 +54,7 @@ type RoomserverAliasAPI struct {
// SetRoomAlias implements api.RoomserverAliasAPI // SetRoomAlias implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) SetRoomAlias( func (r *RoomserverAliasAPI) SetRoomAlias(
ctx context.Context,
request *api.SetRoomAliasRequest, request *api.SetRoomAliasRequest,
response *api.SetRoomAliasResponse, response *api.SetRoomAliasResponse,
) error { ) error {
@ -74,7 +76,10 @@ func (r *RoomserverAliasAPI) SetRoomAlias(
} }
// Send a m.room.aliases event with the updated list of aliases for this room // Send a m.room.aliases event with the updated list of aliases for this room
if err := r.sendUpdatedAliasesEvent(request.UserID, request.RoomID); err != nil { // At this point we've already committed the alias to the database so we
// shouldn't cancel this request.
// TODO: Ensure that we send unsent events when if server restarts.
if err := r.sendUpdatedAliasesEvent(context.TODO(), request.UserID, request.RoomID); err != nil {
return err return err
} }
@ -83,6 +88,7 @@ func (r *RoomserverAliasAPI) SetRoomAlias(
// GetAliasRoomID implements api.RoomserverAliasAPI // GetAliasRoomID implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) GetAliasRoomID( func (r *RoomserverAliasAPI) GetAliasRoomID(
ctx context.Context,
request *api.GetAliasRoomIDRequest, request *api.GetAliasRoomIDRequest,
response *api.GetAliasRoomIDResponse, response *api.GetAliasRoomIDResponse,
) error { ) error {
@ -98,6 +104,7 @@ func (r *RoomserverAliasAPI) GetAliasRoomID(
// RemoveRoomAlias implements api.RoomserverAliasAPI // RemoveRoomAlias implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) RemoveRoomAlias( func (r *RoomserverAliasAPI) RemoveRoomAlias(
ctx context.Context,
request *api.RemoveRoomAliasRequest, request *api.RemoveRoomAliasRequest,
response *api.RemoveRoomAliasResponse, response *api.RemoveRoomAliasResponse,
) error { ) error {
@ -113,7 +120,7 @@ func (r *RoomserverAliasAPI) RemoveRoomAlias(
} }
// Send an updated m.room.aliases event // Send an updated m.room.aliases event
if err := r.sendUpdatedAliasesEvent(request.UserID, roomID); err != nil { if err := r.sendUpdatedAliasesEvent(ctx, request.UserID, roomID); err != nil {
return err return err
} }
@ -126,7 +133,9 @@ type roomAliasesContent struct {
// Build the updated m.room.aliases event to send to the room after addition or // Build the updated m.room.aliases event to send to the room after addition or
// removal of an alias // removal of an alias
func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(userID string, roomID string) error { func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(
ctx context.Context, userID string, roomID string,
) error {
serverName := string(r.Cfg.Matrix.ServerName) serverName := string(r.Cfg.Matrix.ServerName)
builder := gomatrixserverlib.EventBuilder{ builder := gomatrixserverlib.EventBuilder{
@ -162,7 +171,7 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(userID string, roomID strin
StateToFetch: eventsNeeded.Tuples(), StateToFetch: eventsNeeded.Tuples(),
} }
var res api.QueryLatestEventsAndStateResponse var res api.QueryLatestEventsAndStateResponse
if err = r.QueryAPI.QueryLatestEventsAndState(&req, &res); err != nil { if err = r.QueryAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
return err return err
} }
builder.Depth = res.Depth builder.Depth = res.Depth
@ -182,7 +191,9 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(userID string, roomID strin
// Build the event // Build the event
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.Cfg.Matrix.ServerName) eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.Cfg.Matrix.ServerName)
now := time.Now() now := time.Now()
event, err := builder.Build(eventID, now, r.Cfg.Matrix.ServerName, r.Cfg.Matrix.KeyID, r.Cfg.Matrix.PrivateKey) event, err := builder.Build(
eventID, now, r.Cfg.Matrix.ServerName, r.Cfg.Matrix.KeyID, r.Cfg.Matrix.PrivateKey,
)
if err != nil { if err != nil {
return err return err
} }
@ -200,7 +211,7 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(userID string, roomID strin
var inputRes api.InputRoomEventsResponse var inputRes api.InputRoomEventsResponse
// Send the request // Send the request
if err := r.InputAPI.InputRoomEvents(&inputReq, &inputRes); err != nil { if err := r.InputAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
return err return err
} }
@ -217,7 +228,7 @@ func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.SetRoomAlias(&request, &response); err != nil { if err := r.SetRoomAlias(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -231,7 +242,7 @@ func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.GetAliasRoomID(&request, &response); err != nil { if err := r.GetAliasRoomID(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -245,7 +256,7 @@ func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.RemoveRoomAlias(&request, &response); err != nil { if err := r.RemoveRoomAlias(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}

View File

@ -15,6 +15,7 @@
package api package api
import ( import (
"context"
"net/http" "net/http"
) )
@ -61,18 +62,21 @@ type RemoveRoomAliasResponse struct{}
type RoomserverAliasAPI interface { type RoomserverAliasAPI interface {
// Set a room alias // Set a room alias
SetRoomAlias( SetRoomAlias(
ctx context.Context,
req *SetRoomAliasRequest, req *SetRoomAliasRequest,
response *SetRoomAliasResponse, response *SetRoomAliasResponse,
) error ) error
// Get the room ID for an alias // Get the room ID for an alias
GetAliasRoomID( GetAliasRoomID(
ctx context.Context,
req *GetAliasRoomIDRequest, req *GetAliasRoomIDRequest,
response *GetAliasRoomIDResponse, response *GetAliasRoomIDResponse,
) error ) error
// Remove a room alias // Remove a room alias
RemoveRoomAlias( RemoveRoomAlias(
ctx context.Context,
req *RemoveRoomAliasRequest, req *RemoveRoomAliasRequest,
response *RemoveRoomAliasResponse, response *RemoveRoomAliasResponse,
) error ) error
@ -103,27 +107,30 @@ type httpRoomserverAliasAPI struct {
// SetRoomAlias implements RoomserverAliasAPI // SetRoomAlias implements RoomserverAliasAPI
func (h *httpRoomserverAliasAPI) SetRoomAlias( func (h *httpRoomserverAliasAPI) SetRoomAlias(
ctx context.Context,
request *SetRoomAliasRequest, request *SetRoomAliasRequest,
response *SetRoomAliasResponse, response *SetRoomAliasResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverSetRoomAliasPath apiURL := h.roomserverURL + RoomserverSetRoomAliasPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// GetAliasRoomID implements RoomserverAliasAPI // GetAliasRoomID implements RoomserverAliasAPI
func (h *httpRoomserverAliasAPI) GetAliasRoomID( func (h *httpRoomserverAliasAPI) GetAliasRoomID(
ctx context.Context,
request *GetAliasRoomIDRequest, request *GetAliasRoomIDRequest,
response *GetAliasRoomIDResponse, response *GetAliasRoomIDResponse,
) error { ) error {
// RemoveRoomAlias implements RoomserverAliasAPI
apiURL := h.roomserverURL + RoomserverGetAliasRoomIDPath apiURL := h.roomserverURL + RoomserverGetAliasRoomIDPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// RemoveRoomAlias implements RoomserverAliasAPI
func (h *httpRoomserverAliasAPI) RemoveRoomAlias( func (h *httpRoomserverAliasAPI) RemoveRoomAlias(
ctx context.Context,
request *RemoveRoomAliasRequest, request *RemoveRoomAliasRequest,
response *RemoveRoomAliasResponse, response *RemoveRoomAliasResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverRemoveRoomAliasPath apiURL := h.roomserverURL + RoomserverRemoveRoomAliasPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }

View File

@ -16,6 +16,7 @@
package api package api
import ( import (
"context"
"net/http" "net/http"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -87,6 +88,7 @@ type InputRoomEventsResponse struct{}
// RoomserverInputAPI is used to write events to the room server. // RoomserverInputAPI is used to write events to the room server.
type RoomserverInputAPI interface { type RoomserverInputAPI interface {
InputRoomEvents( InputRoomEvents(
ctx context.Context,
request *InputRoomEventsRequest, request *InputRoomEventsRequest,
response *InputRoomEventsResponse, response *InputRoomEventsResponse,
) error ) error
@ -111,9 +113,10 @@ type httpRoomserverInputAPI struct {
// InputRoomEvents implements RoomserverInputAPI // InputRoomEvents implements RoomserverInputAPI
func (h *httpRoomserverInputAPI) InputRoomEvents( func (h *httpRoomserverInputAPI) InputRoomEvents(
ctx context.Context,
request *InputRoomEventsRequest, request *InputRoomEventsRequest,
response *InputRoomEventsResponse, response *InputRoomEventsResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverInputRoomEventsPath apiURL := h.roomserverURL + RoomserverInputRoomEventsPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }

View File

@ -16,6 +16,7 @@ package api
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -154,36 +155,42 @@ type QueryServerAllowedToSeeEventResponse struct {
type RoomserverQueryAPI interface { type RoomserverQueryAPI interface {
// Query the latest events and state for a room from the room server. // Query the latest events and state for a room from the room server.
QueryLatestEventsAndState( QueryLatestEventsAndState(
ctx context.Context,
request *QueryLatestEventsAndStateRequest, request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse, response *QueryLatestEventsAndStateResponse,
) error ) error
// Query the state after a list of events in a room from the room server. // Query the state after a list of events in a room from the room server.
QueryStateAfterEvents( QueryStateAfterEvents(
ctx context.Context,
request *QueryStateAfterEventsRequest, request *QueryStateAfterEventsRequest,
response *QueryStateAfterEventsResponse, response *QueryStateAfterEventsResponse,
) error ) error
// Query a list of events by event ID. // Query a list of events by event ID.
QueryEventsByID( QueryEventsByID(
ctx context.Context,
request *QueryEventsByIDRequest, request *QueryEventsByIDRequest,
response *QueryEventsByIDResponse, response *QueryEventsByIDResponse,
) error ) error
// Query a list of membership events for a room // Query a list of membership events for a room
QueryMembershipsForRoom( QueryMembershipsForRoom(
ctx context.Context,
request *QueryMembershipsForRoomRequest, request *QueryMembershipsForRoomRequest,
response *QueryMembershipsForRoomResponse, response *QueryMembershipsForRoomResponse,
) error ) error
// Query a list of invite event senders for a user in a room. // Query a list of invite event senders for a user in a room.
QueryInvitesForUser( QueryInvitesForUser(
ctx context.Context,
request *QueryInvitesForUserRequest, request *QueryInvitesForUserRequest,
response *QueryInvitesForUserResponse, response *QueryInvitesForUserResponse,
) error ) error
// Query whether a server is allowed to see an event // Query whether a server is allowed to see an event
QueryServerAllowedToSeeEvent( QueryServerAllowedToSeeEvent(
ctx context.Context,
request *QueryServerAllowedToSeeEventRequest, request *QueryServerAllowedToSeeEventRequest,
response *QueryServerAllowedToSeeEventResponse, response *QueryServerAllowedToSeeEventResponse,
) error ) error
@ -223,64 +230,81 @@ type httpRoomserverQueryAPI struct {
// QueryLatestEventsAndState implements RoomserverQueryAPI // QueryLatestEventsAndState implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState( func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState(
ctx context.Context,
request *QueryLatestEventsAndStateRequest, request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse, response *QueryLatestEventsAndStateResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverQueryLatestEventsAndStatePath apiURL := h.roomserverURL + RoomserverQueryLatestEventsAndStatePath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// QueryStateAfterEvents implements RoomserverQueryAPI // QueryStateAfterEvents implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryStateAfterEvents( func (h *httpRoomserverQueryAPI) QueryStateAfterEvents(
ctx context.Context,
request *QueryStateAfterEventsRequest, request *QueryStateAfterEventsRequest,
response *QueryStateAfterEventsResponse, response *QueryStateAfterEventsResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverQueryStateAfterEventsPath apiURL := h.roomserverURL + RoomserverQueryStateAfterEventsPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// QueryEventsByID implements RoomserverQueryAPI // QueryEventsByID implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryEventsByID( func (h *httpRoomserverQueryAPI) QueryEventsByID(
ctx context.Context,
request *QueryEventsByIDRequest, request *QueryEventsByIDRequest,
response *QueryEventsByIDResponse, response *QueryEventsByIDResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverQueryEventsByIDPath apiURL := h.roomserverURL + RoomserverQueryEventsByIDPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// QueryMembershipsForRoom implements RoomserverQueryAPI // QueryMembershipsForRoom implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryMembershipsForRoom( func (h *httpRoomserverQueryAPI) QueryMembershipsForRoom(
ctx context.Context,
request *QueryMembershipsForRoomRequest, request *QueryMembershipsForRoomRequest,
response *QueryMembershipsForRoomResponse, response *QueryMembershipsForRoomResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverQueryMembershipsForRoomPath apiURL := h.roomserverURL + RoomserverQueryMembershipsForRoomPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// QueryInvitesForUser implements RoomserverQueryAPI // QueryInvitesForUser implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryInvitesForUser( func (h *httpRoomserverQueryAPI) QueryInvitesForUser(
ctx context.Context,
request *QueryInvitesForUserRequest, request *QueryInvitesForUserRequest,
response *QueryInvitesForUserResponse, response *QueryInvitesForUserResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverQueryInvitesForUserPath apiURL := h.roomserverURL + RoomserverQueryInvitesForUserPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
// QueryServerAllowedToSeeEvent implements RoomserverQueryAPI // QueryServerAllowedToSeeEvent implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryServerAllowedToSeeEvent( func (h *httpRoomserverQueryAPI) QueryServerAllowedToSeeEvent(
ctx context.Context,
request *QueryServerAllowedToSeeEventRequest, request *QueryServerAllowedToSeeEventRequest,
response *QueryServerAllowedToSeeEventResponse, response *QueryServerAllowedToSeeEventResponse,
) error { ) error {
apiURL := h.roomserverURL + RoomserverQueryServerAllowedToSeeEventPath apiURL := h.roomserverURL + RoomserverQueryServerAllowedToSeeEventPath
return postJSON(h.httpClient, apiURL, request, response) return postJSON(ctx, h.httpClient, apiURL, request, response)
} }
func postJSON(httpClient *http.Client, apiURL string, request, response interface{}) error { func postJSON(
ctx context.Context, httpClient *http.Client,
apiURL string, request, response interface{},
) error {
jsonBytes, err := json.Marshal(request) jsonBytes, err := json.Marshal(request)
if err != nil { if err != nil {
return err return err
} }
res, err := httpClient.Post(apiURL, "application/json", bytes.NewReader(jsonBytes))
req, err := http.NewRequest("POST", apiURL, bytes.NewReader(jsonBytes))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := httpClient.Do(req.WithContext(ctx))
if res != nil { if res != nil {
defer res.Body.Close() defer res.Body.Close()
} }

View File

@ -16,6 +16,7 @@
package input package input
import ( import (
"context"
"encoding/json" "encoding/json"
"net/http" "net/http"
@ -53,6 +54,7 @@ func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.Outp
// InputRoomEvents implements api.RoomserverInputAPI // InputRoomEvents implements api.RoomserverInputAPI
func (r *RoomserverInputAPI) InputRoomEvents( func (r *RoomserverInputAPI) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) error { ) error {
@ -78,7 +80,7 @@ func (r *RoomserverInputAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(400, err.Error()) return util.MessageResponse(400, err.Error())
} }
if err := r.InputRoomEvents(&request, &response); err != nil { if err := r.InputRoomEvents(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}

View File

@ -15,6 +15,7 @@
package query package query
import ( import (
"context"
"encoding/json" "encoding/json"
"net/http" "net/http"
@ -70,6 +71,7 @@ type RoomserverQueryAPI struct {
// QueryLatestEventsAndState implements api.RoomserverQueryAPI // QueryLatestEventsAndState implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryLatestEventsAndState( func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
ctx context.Context,
request *api.QueryLatestEventsAndStateRequest, request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse, response *api.QueryLatestEventsAndStateResponse,
) error { ) error {
@ -105,6 +107,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
// QueryStateAfterEvents implements api.RoomserverQueryAPI // QueryStateAfterEvents implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryStateAfterEvents( func (r *RoomserverQueryAPI) QueryStateAfterEvents(
ctx context.Context,
request *api.QueryStateAfterEventsRequest, request *api.QueryStateAfterEventsRequest,
response *api.QueryStateAfterEventsResponse, response *api.QueryStateAfterEventsResponse,
) error { ) error {
@ -146,6 +149,7 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
// QueryEventsByID implements api.RoomserverQueryAPI // QueryEventsByID implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryEventsByID( func (r *RoomserverQueryAPI) QueryEventsByID(
ctx context.Context,
request *api.QueryEventsByIDRequest, request *api.QueryEventsByIDRequest,
response *api.QueryEventsByIDResponse, response *api.QueryEventsByIDResponse,
) error { ) error {
@ -193,6 +197,7 @@ func (r *RoomserverQueryAPI) loadEvents(eventNIDs []types.EventNID) ([]gomatrixs
// QueryMembershipsForRoom implements api.RoomserverQueryAPI // QueryMembershipsForRoom implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryMembershipsForRoom( func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
ctx context.Context,
request *api.QueryMembershipsForRoomRequest, request *api.QueryMembershipsForRoomRequest,
response *api.QueryMembershipsForRoomResponse, response *api.QueryMembershipsForRoomResponse,
) error { ) error {
@ -299,6 +304,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(eventNID types.EventNI
// QueryInvitesForUser implements api.RoomserverQueryAPI // QueryInvitesForUser implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryInvitesForUser( func (r *RoomserverQueryAPI) QueryInvitesForUser(
_ context.Context,
request *api.QueryInvitesForUserRequest, request *api.QueryInvitesForUserRequest,
response *api.QueryInvitesForUserResponse, response *api.QueryInvitesForUserResponse,
) error { ) error {
@ -332,6 +338,7 @@ func (r *RoomserverQueryAPI) QueryInvitesForUser(
// QueryServerAllowedToSeeEvent implements api.RoomserverQueryAPI // QueryServerAllowedToSeeEvent implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryServerAllowedToSeeEvent( func (r *RoomserverQueryAPI) QueryServerAllowedToSeeEvent(
ctx context.Context,
request *api.QueryServerAllowedToSeeEventRequest, request *api.QueryServerAllowedToSeeEventRequest,
response *api.QueryServerAllowedToSeeEventResponse, response *api.QueryServerAllowedToSeeEventResponse,
) error { ) error {
@ -399,7 +406,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.QueryLatestEventsAndState(&request, &response); err != nil { if err := r.QueryLatestEventsAndState(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -413,7 +420,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.QueryStateAfterEvents(&request, &response); err != nil { if err := r.QueryStateAfterEvents(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -427,7 +434,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.QueryEventsByID(&request, &response); err != nil { if err := r.QueryEventsByID(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -441,7 +448,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.QueryMembershipsForRoom(&request, &response); err != nil { if err := r.QueryMembershipsForRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -455,7 +462,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.QueryInvitesForUser(&request, &response); err != nil { if err := r.QueryInvitesForUser(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
@ -469,7 +476,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := r.QueryServerAllowedToSeeEvent(&request, &response); err != nil { if err := r.QueryServerAllowedToSeeEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}

View File

@ -15,6 +15,7 @@
package consumers package consumers
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -180,7 +181,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
// from the roomserver using the query API. // from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing} eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse var eventResp api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err return nil, err
} }