diff --git a/scripts/install-local-kafka.sh b/scripts/install-local-kafka.sh index d1fef38e..19ce0911 100755 --- a/scripts/install-local-kafka.sh +++ b/scripts/install-local-kafka.sh @@ -9,9 +9,9 @@ cd `dirname $0`/.. mkdir -p .downloads # The mirror to download kafka from is picked from the list of mirrors at -# https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.11.0.2.tgz +# https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz # TODO: Check the signature since we are downloading over HTTP. -MIRROR=http://apache.mirror.anlx.net/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz +MIRROR=http://apache.mirror.anlx.net/kafka/2.1.0/kafka_2.11-2.1.0.tgz # Only download the kafka if it isn't already downloaded. test -f .downloads/kafka.tgz || wget $MIRROR -O .downloads/kafka.tgz diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 5c560ff5..89137eb5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -45,6 +45,8 @@ func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.D // called in a dedicated goroutine for this request. This function will block the goroutine // until a response is ready, or it times out. func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse { + var syncData *types.Response + // Extract values from request logger := util.GetLogger(req.Context()) userID := device.UserID @@ -65,7 +67,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype // If this is an initial sync or timeout=0 we return immediately if syncReq.since == nil || syncReq.timeout == 0 { - syncData, err := rp.currentSyncForUser(*syncReq, currPos) + syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) } @@ -84,6 +86,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype userStreamListener := rp.notifier.GetListener(*syncReq) defer userStreamListener.Close() + // We need the loop in case userStreamListener wakes up even if there isn't + // anything to send down. In this case, we'll jump out of the select but + // don't want to send anything back until we get some actual content to + // respond with, so we skip the return an go back to waiting for content to + // be sent down or the request timing out. + var hasTimedOut bool for { select { // Wait for notifier to wake us up @@ -91,10 +99,11 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos = userStreamListener.GetStreamPosition() // Or for timeout to expire case <-timer.C: - return util.JSONResponse{ - Code: http.StatusOK, - JSON: types.NewResponse(currPos), - } + // We just need to ensure we get out of the select after reaching the + // timeout, but there's nothing specific we want to do in this case + // apart from that, so we do nothing except stating we're timing out + // and need to respond. + hasTimedOut = true // Or for the request to be cancelled case <-req.Context().Done(): return httputil.LogThenError(req, req.Context().Err()) @@ -105,17 +114,17 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype // of calculating the sync only to get timed out before we // can respond - syncData, err := rp.currentSyncForUser(*syncReq, currPos) + syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) } - if !syncData.IsEmpty() { + + if !syncData.IsEmpty() || hasTimedOut { return util.JSONResponse{ Code: http.StatusOK, JSON: syncData, } } - } }