Use a custom FIFO queue for the RS input API (#1888)
* Use a FIFO queue instead of a channel to reduce backpressure * Make sure someone wakes up * Tweaks * Add comments
This commit is contained in:
parent
a6f7e83596
commit
7c3991ee2f
@ -38,8 +38,7 @@ type Inputer struct {
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
ACLs *acls.ServerACLs
|
||||
OutputRoomEventTopic string
|
||||
|
||||
workers sync.Map // room ID -> *inputWorker
|
||||
workers sync.Map // room ID -> *inputWorker
|
||||
}
|
||||
|
||||
type inputTask struct {
|
||||
@ -52,7 +51,7 @@ type inputTask struct {
|
||||
type inputWorker struct {
|
||||
r *Inputer
|
||||
running atomic.Bool
|
||||
input chan *inputTask
|
||||
input *fifoQueue
|
||||
}
|
||||
|
||||
// Guarded by a CAS on w.running
|
||||
@ -60,7 +59,11 @@ func (w *inputWorker) start() {
|
||||
defer w.running.Store(false)
|
||||
for {
|
||||
select {
|
||||
case task := <-w.input:
|
||||
case <-w.input.wait():
|
||||
task, ok := w.input.pop()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
|
||||
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
|
||||
if task.err == nil {
|
||||
@ -143,7 +146,7 @@ func (r *Inputer) InputRoomEvents(
|
||||
// room - the channel will be quite small as it's just pointer types.
|
||||
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
|
||||
r: r,
|
||||
input: make(chan *inputTask, 32),
|
||||
input: newFIFOQueue(),
|
||||
})
|
||||
worker := w.(*inputWorker)
|
||||
|
||||
@ -160,7 +163,7 @@ func (r *Inputer) InputRoomEvents(
|
||||
if worker.running.CAS(false, true) {
|
||||
go worker.start()
|
||||
}
|
||||
worker.input <- tasks[i]
|
||||
worker.input.push(tasks[i])
|
||||
}
|
||||
|
||||
// Wait for all of the workers to return results about our tasks.
|
||||
|
64
roomserver/internal/input/input_fifo.go
Normal file
64
roomserver/internal/input/input_fifo.go
Normal file
@ -0,0 +1,64 @@
|
||||
package input
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type fifoQueue struct {
|
||||
tasks []*inputTask
|
||||
count int
|
||||
mutex sync.Mutex
|
||||
notifs chan struct{}
|
||||
}
|
||||
|
||||
func newFIFOQueue() *fifoQueue {
|
||||
q := &fifoQueue{
|
||||
notifs: make(chan struct{}, 1),
|
||||
}
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *fifoQueue) push(frame *inputTask) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
q.tasks = append(q.tasks, frame)
|
||||
q.count++
|
||||
select {
|
||||
case q.notifs <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// pop returns the first item of the queue, if there is one.
|
||||
// The second return value will indicate if a task was returned.
|
||||
// You must check this value, even after calling wait().
|
||||
func (q *fifoQueue) pop() (*inputTask, bool) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
if q.count == 0 {
|
||||
return nil, false
|
||||
}
|
||||
frame := q.tasks[0]
|
||||
q.tasks[0] = nil
|
||||
q.tasks = q.tasks[1:]
|
||||
q.count--
|
||||
if q.count == 0 {
|
||||
// Force a GC of the underlying array, since it might have
|
||||
// grown significantly if the queue was hammered for some reason
|
||||
q.tasks = nil
|
||||
}
|
||||
return frame, true
|
||||
}
|
||||
|
||||
// wait returns a channel which can be used to detect when an
|
||||
// item is waiting in the queue.
|
||||
func (q *fifoQueue) wait() <-chan struct{} {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
if q.count > 0 && len(q.notifs) == 0 {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
return q.notifs
|
||||
}
|
Loading…
Reference in New Issue
Block a user