Mark device list as stale, if we don't have the requesting device (#2728)
This hopefully makes E2EE chats a little bit more reliable by re-syncing
devices if we don't have the `requesting_device_id` in our database. (As
seen in
[Synapse](c52abc1cfd/synapse/handlers/devicemessage.py (L157-L201)
))
This commit is contained in:
parent
47af4bff5b
commit
e007b8038f
@ -21,9 +21,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/keyserver/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type KeyInternalAPI interface {
|
||||
@ -56,6 +57,7 @@ type UserKeyAPI interface {
|
||||
type SyncKeyAPI interface {
|
||||
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse) error
|
||||
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse) error
|
||||
PerformMarkAsStaleIfNeeded(ctx context.Context, req *PerformMarkAsStaleRequest, res *struct{}) error
|
||||
}
|
||||
|
||||
type FederationKeyAPI interface {
|
||||
@ -335,3 +337,9 @@ type QuerySignaturesResponse struct {
|
||||
// The request error, if any
|
||||
Error *KeyError
|
||||
}
|
||||
|
||||
type PerformMarkAsStaleRequest struct {
|
||||
UserID string
|
||||
Domain gomatrixserverlib.ServerName
|
||||
DeviceID string
|
||||
}
|
||||
|
@ -23,16 +23,17 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/tidwall/sjson"
|
||||
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
type KeyInternalAPI struct {
|
||||
@ -224,6 +225,19 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query
|
||||
return nil
|
||||
}
|
||||
|
||||
// PerformMarkAsStaleIfNeeded marks the users device list as stale, if the given deviceID is not present
|
||||
// in our database.
|
||||
func (a *KeyInternalAPI) PerformMarkAsStaleIfNeeded(ctx context.Context, req *api.PerformMarkAsStaleRequest, res *struct{}) error {
|
||||
knownDevices, err := a.DB.DeviceKeysForUser(ctx, req.UserID, []string{req.DeviceID}, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(knownDevices) == 0 {
|
||||
return a.Updater.ManualUpdate(ctx, req.Domain, req.UserID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) error {
|
||||
res.DeviceKeys = make(map[string]map[string]json.RawMessage)
|
||||
|
@ -37,6 +37,7 @@ const (
|
||||
QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys"
|
||||
QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages"
|
||||
QuerySignaturesPath = "/keyserver/querySignatures"
|
||||
PerformMarkAsStalePath = "/keyserver/markAsStale"
|
||||
)
|
||||
|
||||
// NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API.
|
||||
@ -172,3 +173,14 @@ func (h *httpKeyInternalAPI) QuerySignatures(
|
||||
h.httpClient, ctx, request, response,
|
||||
)
|
||||
}
|
||||
|
||||
func (h *httpKeyInternalAPI) PerformMarkAsStaleIfNeeded(
|
||||
ctx context.Context,
|
||||
request *api.PerformMarkAsStaleRequest,
|
||||
response *struct{},
|
||||
) error {
|
||||
return httputil.CallInternalRPCAPI(
|
||||
"MarkAsStale", h.apiURL+PerformMarkAsStalePath,
|
||||
h.httpClient, ctx, request, response,
|
||||
)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package inthttp
|
||||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
)
|
||||
@ -70,4 +71,9 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
|
||||
QuerySignaturesPath,
|
||||
httputil.MakeInternalRPCAPI("KeyserverQuerySignatures", s.QuerySignatures),
|
||||
)
|
||||
|
||||
internalAPIMux.Handle(
|
||||
PerformMarkAsStalePath,
|
||||
httputil.MakeInternalRPCAPI("KeyserverMarkAsStale", s.PerformMarkAsStaleIfNeeded),
|
||||
)
|
||||
}
|
||||
|
@ -23,7 +23,9 @@ import (
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
@ -39,6 +41,7 @@ type OutputSendToDeviceEventConsumer struct {
|
||||
durable string
|
||||
topic string
|
||||
db storage.Database
|
||||
keyAPI keyapi.SyncKeyAPI
|
||||
serverName gomatrixserverlib.ServerName // our server name
|
||||
stream types.StreamProvider
|
||||
notifier *notifier.Notifier
|
||||
@ -51,6 +54,7 @@ func NewOutputSendToDeviceEventConsumer(
|
||||
cfg *config.SyncAPI,
|
||||
js nats.JetStreamContext,
|
||||
store storage.Database,
|
||||
keyAPI keyapi.SyncKeyAPI,
|
||||
notifier *notifier.Notifier,
|
||||
stream types.StreamProvider,
|
||||
) *OutputSendToDeviceEventConsumer {
|
||||
@ -60,6 +64,7 @@ func NewOutputSendToDeviceEventConsumer(
|
||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||
durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
|
||||
db: store,
|
||||
keyAPI: keyAPI,
|
||||
serverName: cfg.Matrix.ServerName,
|
||||
notifier: notifier,
|
||||
stream: stream,
|
||||
@ -96,12 +101,28 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
||||
return true
|
||||
}
|
||||
|
||||
util.GetLogger(context.TODO()).WithFields(log.Fields{
|
||||
logger := util.GetLogger(context.TODO()).WithFields(log.Fields{
|
||||
"sender": output.Sender,
|
||||
"user_id": output.UserID,
|
||||
"device_id": output.DeviceID,
|
||||
"event_type": output.Type,
|
||||
}).Debugf("sync API received send-to-device event from the clientapi/federationsender")
|
||||
})
|
||||
logger.Debugf("sync API received send-to-device event from the clientapi/federationsender")
|
||||
|
||||
// Check we actually got the requesting device in our store, if we receive a room key request
|
||||
if output.Type == "m.room_key_request" {
|
||||
requestingDeviceID := gjson.GetBytes(output.SendToDeviceEvent.Content, "requesting_device_id").Str
|
||||
_, senderDomain, _ := gomatrixserverlib.SplitID('@', output.Sender)
|
||||
if requestingDeviceID != "" && senderDomain != s.serverName {
|
||||
// Mark the requesting device as stale, if we don't know about it.
|
||||
if err = s.keyAPI.PerformMarkAsStaleIfNeeded(ctx, &keyapi.PerformMarkAsStaleRequest{
|
||||
UserID: output.Sender, Domain: senderDomain, DeviceID: requestingDeviceID,
|
||||
}, &struct{}{}); err != nil {
|
||||
logger.WithError(err).Errorf("failed to mark as stale if needed")
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
streamPos, err := s.db.StoreNewSendForDeviceMessage(
|
||||
s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent,
|
||||
|
@ -22,6 +22,10 @@ var (
|
||||
|
||||
type mockKeyAPI struct{}
|
||||
|
||||
func (k *mockKeyAPI) PerformMarkAsStaleIfNeeded(ctx context.Context, req *keyapi.PerformMarkAsStaleRequest, res *struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -17,9 +17,10 @@ package syncapi
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
@ -126,7 +127,7 @@ func AddPublicRoutes(
|
||||
}
|
||||
|
||||
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
|
||||
base.ProcessContext, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider,
|
||||
base.ProcessContext, cfg, js, syncDB, keyAPI, notifier, streams.SendToDeviceStreamProvider,
|
||||
)
|
||||
if err = sendToDeviceConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
|
||||
|
Loading…
Reference in New Issue
Block a user