diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 6f75b8357..baac09739 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -124,7 +124,10 @@ func (q *ChannelQueue) Shutdown() {
 		log.Trace("ChannelQueue: %s Flushing", q.name)
 		// We can't use Cleanup here because that will close the channel
 		if err := q.FlushWithContext(q.terminateCtx); err != nil {
-			log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+			count := atomic.LoadInt64(&q.numInQueue)
+			if count > 0 {
+				log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+			}
 			return
 		}
 		log.Debug("ChannelQueue: %s Flushed", q.name)
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index c7526714c..91f91f0df 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 			},
 			Workers: 0,
 		},
-		DataDir: config.DataDir,
+		DataDir:   config.DataDir,
+		QueueName: config.Name + "-level",
 	}
 
 	levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
@@ -172,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
 	atShutdown(q.Shutdown)
 	atTerminate(q.Terminate)
 
-	if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
+	if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
 		// Just run the level queue - we shut it down once it's flushed
 		go q.internal.Run(func(_ func()) {}, func(_ func()) {})
 		go func() {
-			for !q.IsEmpty() {
-				_ = q.internal.Flush(0)
+			for !lq.IsEmpty() {
+				_ = lq.Flush(0)
 				select {
 				case <-time.After(100 * time.Millisecond):
-				case <-q.internal.(*LevelQueue).shutdownCtx.Done():
-					log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+				case <-lq.shutdownCtx.Done():
+					if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
+						log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+					}
 					return
 				}
 			}
@@ -316,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() {
 	// Redirect all remaining data in the chan to the internal channel
 	log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
 	close(q.channelQueue.dataChan)
+	countOK, countLost := 0, 0
 	for data := range q.channelQueue.dataChan {
-		_ = q.internal.Push(data)
+		err := q.internal.Push(data)
+		if err != nil {
+			log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
+			countLost++
+		} else {
+			countOK++
+		}
 		atomic.AddInt64(&q.channelQueue.numInQueue, -1)
 	}
+	if countLost > 0 {
+		log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
+	} else if countOK > 0 {
+		log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
+	}
 	log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
 
 	log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index 318610355..4f14a5d79 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -39,7 +39,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "first",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)
 
@@ -135,7 +135,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "second",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)
 
@@ -227,7 +227,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "first",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)
 
@@ -433,7 +433,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "second",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)
 	pausable, ok = queue.(Pausable)
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index c43bd1db3..62c051aa3 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -177,7 +177,9 @@ func (q *ChannelUniqueQueue) Shutdown() {
 	go func() {
 		log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
 		if err := q.FlushWithContext(q.terminateCtx); err != nil {
-			log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+			if !q.IsEmpty() {
+				log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+			}
 			return
 		}
 		log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go
index 9372694b8..824015b83 100644
--- a/modules/queue/unique_queue_channel_test.go
+++ b/modules/queue/unique_queue_channel_test.go
@@ -8,10 +8,13 @@ import (
 	"testing"
 	"time"
 
+	"code.gitea.io/gitea/modules/log"
+
 	"github.com/stretchr/testify/assert"
 )
 
 func TestChannelUniqueQueue(t *testing.T) {
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
 	handleChan := make(chan *testData)
 	handle := func(data ...Data) []Data {
 		for _, datum := range data {
@@ -52,6 +55,8 @@ func TestChannelUniqueQueue(t *testing.T) {
 }
 
 func TestChannelUniqueQueue_Batch(t *testing.T) {
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
+
 	handleChan := make(chan *testData)
 	handle := func(data ...Data) []Data {
 		for _, datum := range data {
@@ -98,6 +103,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) {
 }
 
 func TestChannelUniqueQueue_Pause(t *testing.T) {
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
+
 	lock := sync.Mutex{}
 	var queue Queue
 	var err error
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 405726182..cc8a807c6 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
 			},
 			Workers: 0,
 		},
-		DataDir: config.DataDir,
+		DataDir:   config.DataDir,
+		QueueName: config.Name + "-level",
 	}
 
 	queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
@@ -209,17 +210,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
 	atTerminate(q.Terminate)
 	_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
 
-	if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
+	if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
 		// Just run the level queue - we shut it down once it's flushed
-		go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+		go luq.Run(func(_ func()) {}, func(_ func()) {})
 		go func() {
-			_ = q.internal.Flush(0)
-			log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
-			q.internal.(*LevelUniqueQueue).Shutdown()
-			GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+			_ = luq.Flush(0)
+			for !luq.IsEmpty() {
+				_ = luq.Flush(0)
+				select {
+				case <-time.After(100 * time.Millisecond):
+				case <-luq.shutdownCtx.Done():
+					if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
+						log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
+					}
+					return
+				}
+			}
+			log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
+			luq.Shutdown()
+			GetManager().Remove(luq.qid)
 		}()
 	} else {
 		log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+		_ = q.internal.Flush(0)
 		q.internal.(*LevelUniqueQueue).Shutdown()
 		GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
 	}
@@ -285,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
 	// Redirect all remaining data in the chan to the internal channel
 	close(q.channelQueue.dataChan)
 	log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+	countOK, countLost := 0, 0
 	for data := range q.channelQueue.dataChan {
-		_ = q.internal.Push(data)
+		err := q.internal.(*LevelUniqueQueue).Push(data)
+		if err != nil {
+			log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
+			countLost++
+		} else {
+			countOK++
+		}
+	}
+	if countLost > 0 {
+		log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
+	} else if countOK > 0 {
+		log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
 	}
 	log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
 
diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go
new file mode 100644
index 000000000..fd76163f4
--- /dev/null
+++ b/modules/queue/unique_queue_disk_channel_test.go
@@ -0,0 +1,259 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"code.gitea.io/gitea/modules/log"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestPersistableChannelUniqueQueue(t *testing.T) {
+	tmpDir := t.TempDir()
+	fmt.Printf("TempDir %s\n", tmpDir)
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
+
+	// Common function to create the Queue
+	newQueue := func(name string, handle func(data ...Data) []Data) Queue {
+		q, err := NewPersistableChannelUniqueQueue(handle,
+			PersistableChannelUniqueQueueConfiguration{
+				Name:         name,
+				DataDir:      tmpDir,
+				QueueLength:  200,
+				MaxWorkers:   1,
+				BlockTimeout: 1 * time.Second,
+				BoostTimeout: 5 * time.Minute,
+				BoostWorkers: 1,
+				Workers:      0,
+			}, "task-0")
+		assert.NoError(t, err)
+		return q
+	}
+
+	// runs the provided queue and provides some timer function
+	type channels struct {
+		readyForShutdown  chan struct{} // closed when shutdown functions have been assigned
+		readyForTerminate chan struct{} // closed when terminate functions have been assigned
+		signalShutdown    chan struct{} // Should close to signal shutdown
+		doneShutdown      chan struct{} // closed when shutdown function is done
+		queueTerminate    []func()      // list of atTerminate functions to call atTerminate - need to be accessed with lock
+	}
+	runQueue := func(q Queue, lock *sync.Mutex) *channels {
+		chans := &channels{
+			readyForShutdown:  make(chan struct{}),
+			readyForTerminate: make(chan struct{}),
+			signalShutdown:    make(chan struct{}),
+			doneShutdown:      make(chan struct{}),
+		}
+		go q.Run(func(atShutdown func()) {
+			go func() {
+				lock.Lock()
+				select {
+				case <-chans.readyForShutdown:
+				default:
+					close(chans.readyForShutdown)
+				}
+				lock.Unlock()
+				<-chans.signalShutdown
+				atShutdown()
+				close(chans.doneShutdown)
+			}()
+		}, func(atTerminate func()) {
+			lock.Lock()
+			defer lock.Unlock()
+			select {
+			case <-chans.readyForTerminate:
+			default:
+				close(chans.readyForTerminate)
+			}
+			chans.queueTerminate = append(chans.queueTerminate, atTerminate)
+		})
+
+		return chans
+	}
+
+	// call to shutdown and terminate the queue associated with the channels
+	doTerminate := func(chans *channels, lock *sync.Mutex) {
+		<-chans.readyForTerminate
+
+		lock.Lock()
+		callbacks := []func(){}
+		callbacks = append(callbacks, chans.queueTerminate...)
+		lock.Unlock()
+
+		for _, callback := range callbacks {
+			callback()
+		}
+	}
+
+	mapLock := sync.Mutex{}
+	executedInitial := map[string][]string{}
+	hasInitial := map[string][]string{}
+
+	fillQueue := func(name string, done chan struct{}) {
+		t.Run("Initial Filling: "+name, func(t *testing.T) {
+			lock := sync.Mutex{}
+
+			startAt100Queued := make(chan struct{})
+			stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
+
+			handle := func(data ...Data) []Data {
+				<-startAt100Queued
+				for _, datum := range data {
+					s := datum.(string)
+					mapLock.Lock()
+					executedInitial[name] = append(executedInitial[name], s)
+					mapLock.Unlock()
+					if s == "task-20" {
+						close(stopAt20Shutdown)
+					}
+				}
+				return nil
+			}
+
+			q := newQueue(name, handle)
+
+			// add 100 tasks to the queue
+			for i := 0; i < 100; i++ {
+				_ = q.Push("task-" + strconv.Itoa(i))
+			}
+			close(startAt100Queued)
+
+			chans := runQueue(q, &lock)
+
+			<-chans.readyForShutdown
+			<-stopAt20Shutdown
+			close(chans.signalShutdown)
+			<-chans.doneShutdown
+			_ = q.Push("final")
+
+			// check which tasks are still in the queue
+			for i := 0; i < 100; i++ {
+				if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
+					mapLock.Lock()
+					hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i))
+					mapLock.Unlock()
+				}
+			}
+			if has, _ := q.(UniqueQueue).Has("final"); has {
+				mapLock.Lock()
+				hasInitial[name] = append(hasInitial[name], "final")
+				mapLock.Unlock()
+			} else {
+				assert.Fail(t, "UnqueQueue %s should have \"final\"", name)
+			}
+			doTerminate(chans, &lock)
+			mapLock.Lock()
+			assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
+			mapLock.Unlock()
+		})
+		close(done)
+	}
+
+	doneA := make(chan struct{})
+	doneB := make(chan struct{})
+
+	go fillQueue("QueueA", doneA)
+	go fillQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	executedEmpty := map[string][]string{}
+	hasEmpty := map[string][]string{}
+	emptyQueue := func(name string, done chan struct{}) {
+		t.Run("Empty Queue: "+name, func(t *testing.T) {
+			lock := sync.Mutex{}
+			stop := make(chan struct{})
+
+			// collect the tasks that have been executed
+			handle := func(data ...Data) []Data {
+				lock.Lock()
+				for _, datum := range data {
+					mapLock.Lock()
+					executedEmpty[name] = append(executedEmpty[name], datum.(string))
+					mapLock.Unlock()
+					if datum.(string) == "final" {
+						close(stop)
+					}
+				}
+				lock.Unlock()
+				return nil
+			}
+
+			q := newQueue(name, handle)
+			chans := runQueue(q, &lock)
+
+			<-chans.readyForShutdown
+			<-stop
+			close(chans.signalShutdown)
+			<-chans.doneShutdown
+
+			// check which tasks are still in the queue
+			for i := 0; i < 100; i++ {
+				if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
+					mapLock.Lock()
+					hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i))
+					mapLock.Unlock()
+				}
+			}
+			doTerminate(chans, &lock)
+
+			mapLock.Lock()
+			assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name]))
+			assert.Equal(t, 0, len(hasEmpty[name]))
+			mapLock.Unlock()
+		})
+		close(done)
+	}
+
+	doneA = make(chan struct{})
+	doneB = make(chan struct{})
+
+	go emptyQueue("QueueA", doneA)
+	go emptyQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	mapLock.Lock()
+	t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
+		len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
+
+	// reset and rerun
+	executedInitial = map[string][]string{}
+	hasInitial = map[string][]string{}
+	executedEmpty = map[string][]string{}
+	hasEmpty = map[string][]string{}
+	mapLock.Unlock()
+
+	doneA = make(chan struct{})
+	doneB = make(chan struct{})
+
+	go fillQueue("QueueA", doneA)
+	go fillQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	doneA = make(chan struct{})
+	doneB = make(chan struct{})
+
+	go emptyQueue("QueueA", doneA)
+	go emptyQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	mapLock.Lock()
+	t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
+		len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
+	mapLock.Unlock()
+}