Always reuse transaction (#22362)

This commit is contained in:
Jason Song 2023-01-08 09:34:58 +08:00 committed by GitHub
parent d42b52fcfa
commit 6135359a04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 104 additions and 42 deletions

View file

@ -141,7 +141,7 @@ func CountNotifications(ctx context.Context, opts *FindNotificationOptions) (int
// CreateRepoTransferNotification creates notification for the user a repository was transferred to // CreateRepoTransferNotification creates notification for the user a repository was transferred to
func CreateRepoTransferNotification(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository) error { func CreateRepoTransferNotification(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository) error {
return db.AutoTx(ctx, func(ctx context.Context) error { return db.WithTx(ctx, func(ctx context.Context) error {
var notify []*Notification var notify []*Notification
if newOwner.IsOrganization() { if newOwner.IsOrganization() {

View file

@ -71,6 +71,14 @@ type Engined interface {
// GetEngine will get a db Engine from this context or return an Engine restricted to this context // GetEngine will get a db Engine from this context or return an Engine restricted to this context
func GetEngine(ctx context.Context) Engine { func GetEngine(ctx context.Context) Engine {
if e := getEngine(ctx); e != nil {
return e
}
return x.Context(ctx)
}
// getEngine will get a db Engine from this context or return nil
func getEngine(ctx context.Context) Engine {
if engined, ok := ctx.(Engined); ok { if engined, ok := ctx.(Engined); ok {
return engined.Engine() return engined.Engine()
} }
@ -78,7 +86,7 @@ func GetEngine(ctx context.Context) Engine {
if enginedInterface != nil { if enginedInterface != nil {
return enginedInterface.(Engined).Engine() return enginedInterface.(Engined).Engine()
} }
return x.Context(ctx) return nil
} }
// Committer represents an interface to Commit or Close the Context // Committer represents an interface to Commit or Close the Context
@ -87,10 +95,22 @@ type Committer interface {
Close() error Close() error
} }
// TxContext represents a transaction Context // halfCommitter is a wrapper of Committer.
// It can be closed early, but can't be committed early, it is useful for reusing a transaction.
type halfCommitter struct {
Committer
}
func (*halfCommitter) Commit() error {
// do nothing
return nil
}
// TxContext represents a transaction Context,
// it will reuse the existing transaction in the parent context or create a new one.
func TxContext(parentCtx context.Context) (*Context, Committer, error) { func TxContext(parentCtx context.Context) (*Context, Committer, error) {
if InTransaction(parentCtx) { if sess, ok := inTransaction(parentCtx); ok {
return nil, nil, ErrAlreadyInTransaction return newContext(parentCtx, sess, true), &halfCommitter{Committer: sess}, nil
} }
sess := x.NewSession() sess := x.NewSession()
@ -102,20 +122,11 @@ func TxContext(parentCtx context.Context) (*Context, Committer, error) {
return newContext(DefaultContext, sess, true), sess, nil return newContext(DefaultContext, sess, true), sess, nil
} }
// WithTx represents executing database operations on a transaction // WithTx represents executing database operations on a transaction, if the transaction exist,
// This function will always open a new transaction, if a transaction exist in parentCtx return an error.
func WithTx(parentCtx context.Context, f func(ctx context.Context) error) error {
if InTransaction(parentCtx) {
return ErrAlreadyInTransaction
}
return txWithNoCheck(parentCtx, f)
}
// AutoTx represents executing database operations on a transaction, if the transaction exist,
// this function will reuse it otherwise will create a new one and close it when finished. // this function will reuse it otherwise will create a new one and close it when finished.
func AutoTx(parentCtx context.Context, f func(ctx context.Context) error) error { func WithTx(parentCtx context.Context, f func(ctx context.Context) error) error {
if InTransaction(parentCtx) { if sess, ok := inTransaction(parentCtx); ok {
return f(newContext(parentCtx, GetEngine(parentCtx), true)) return f(newContext(parentCtx, sess, true))
} }
return txWithNoCheck(parentCtx, f) return txWithNoCheck(parentCtx, f)
} }
@ -202,25 +213,25 @@ func EstimateCount(ctx context.Context, bean interface{}) (int64, error) {
// InTransaction returns true if the engine is in a transaction otherwise return false // InTransaction returns true if the engine is in a transaction otherwise return false
func InTransaction(ctx context.Context) bool { func InTransaction(ctx context.Context) bool {
var e Engine _, ok := inTransaction(ctx)
if engined, ok := ctx.(Engined); ok { return ok
e = engined.Engine()
} else {
enginedInterface := ctx.Value(enginedContextKey)
if enginedInterface != nil {
e = enginedInterface.(Engined).Engine()
}
} }
func inTransaction(ctx context.Context) (*xorm.Session, bool) {
e := getEngine(ctx)
if e == nil { if e == nil {
return false return nil, false
} }
switch t := e.(type) { switch t := e.(type) {
case *xorm.Engine: case *xorm.Engine:
return false return nil, false
case *xorm.Session: case *xorm.Session:
return t.IsInTx() if t.IsInTx() {
return t, true
}
return nil, false
default: default:
return false return nil, false
} }
} }

View file

@ -25,8 +25,62 @@ func TestInTransaction(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
defer committer.Close() defer committer.Close()
assert.True(t, db.InTransaction(ctx)) assert.True(t, db.InTransaction(ctx))
assert.Error(t, db.WithTx(ctx, func(ctx context.Context) error { assert.NoError(t, db.WithTx(ctx, func(ctx context.Context) error {
assert.True(t, db.InTransaction(ctx)) assert.True(t, db.InTransaction(ctx))
return nil return nil
})) }))
} }
func TestTxContext(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
{ // create new transaction
ctx, committer, err := db.TxContext(db.DefaultContext)
assert.NoError(t, err)
assert.True(t, db.InTransaction(ctx))
assert.NoError(t, committer.Commit())
}
{ // reuse the transaction created by TxContext and commit it
ctx, committer, err := db.TxContext(db.DefaultContext)
engine := db.GetEngine(ctx)
assert.NoError(t, err)
assert.True(t, db.InTransaction(ctx))
{
ctx, committer, err := db.TxContext(ctx)
assert.NoError(t, err)
assert.True(t, db.InTransaction(ctx))
assert.Equal(t, engine, db.GetEngine(ctx))
assert.NoError(t, committer.Commit())
}
assert.NoError(t, committer.Commit())
}
{ // reuse the transaction created by TxContext and close it
ctx, committer, err := db.TxContext(db.DefaultContext)
engine := db.GetEngine(ctx)
assert.NoError(t, err)
assert.True(t, db.InTransaction(ctx))
{
ctx, committer, err := db.TxContext(ctx)
assert.NoError(t, err)
assert.True(t, db.InTransaction(ctx))
assert.Equal(t, engine, db.GetEngine(ctx))
assert.NoError(t, committer.Close())
}
assert.NoError(t, committer.Close())
}
{ // reuse the transaction created by WithTx
assert.NoError(t, db.WithTx(db.DefaultContext, func(ctx context.Context) error {
assert.True(t, db.InTransaction(ctx))
{
ctx, committer, err := db.TxContext(ctx)
assert.NoError(t, err)
assert.True(t, db.InTransaction(ctx))
assert.NoError(t, committer.Commit())
}
return nil
}))
}
}

View file

@ -4,14 +4,11 @@
package db package db
import ( import (
"errors"
"fmt" "fmt"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
) )
var ErrAlreadyInTransaction = errors.New("database connection has already been in a transaction")
// ErrCancelled represents an error due to context cancellation // ErrCancelled represents an error due to context cancellation
type ErrCancelled struct { type ErrCancelled struct {
Message string Message string

View file

@ -2365,7 +2365,7 @@ func CountOrphanedIssues(ctx context.Context) (int64, error) {
// DeleteOrphanedIssues delete issues without a repo // DeleteOrphanedIssues delete issues without a repo
func DeleteOrphanedIssues(ctx context.Context) error { func DeleteOrphanedIssues(ctx context.Context) error {
var attachmentPaths []string var attachmentPaths []string
err := db.AutoTx(ctx, func(ctx context.Context) error { err := db.WithTx(ctx, func(ctx context.Context) error {
var ids []int64 var ids []int64
if err := db.GetEngine(ctx).Table("issue").Distinct("issue.repo_id"). if err := db.GetEngine(ctx).Table("issue").Distinct("issue.repo_id").

View file

@ -300,7 +300,7 @@ func changeProjectStatus(ctx context.Context, p *Project, isClosed bool) error {
// DeleteProjectByID deletes a project from a repository. if it's not in a database // DeleteProjectByID deletes a project from a repository. if it's not in a database
// transaction, it will start a new database transaction // transaction, it will start a new database transaction
func DeleteProjectByID(ctx context.Context, id int64) error { func DeleteProjectByID(ctx context.Context, id int64) error {
return db.AutoTx(ctx, func(ctx context.Context) error { return db.WithTx(ctx, func(ctx context.Context) error {
p, err := GetProjectByID(ctx, id) p, err := GetProjectByID(ctx, id)
if err != nil { if err != nil {
if IsErrProjectNotExist(err) { if IsErrProjectNotExist(err) {

View file

@ -105,7 +105,7 @@ func ChangeCollaborationAccessMode(ctx context.Context, repo *Repository, uid in
return nil return nil
} }
return db.AutoTx(ctx, func(ctx context.Context) error { return db.WithTx(ctx, func(ctx context.Context) error {
e := db.GetEngine(ctx) e := db.GetEngine(ctx)
collaboration := &Collaboration{ collaboration := &Collaboration{

View file

@ -155,7 +155,7 @@ func TestRepositoryReadyForTransfer(status repo_model.RepositoryStatus) error {
// CreatePendingRepositoryTransfer transfer a repo from one owner to a new one. // CreatePendingRepositoryTransfer transfer a repo from one owner to a new one.
// it marks the repository transfer as "pending" // it marks the repository transfer as "pending"
func CreatePendingRepositoryTransfer(ctx context.Context, doer, newOwner *user_model.User, repoID int64, teams []*organization.Team) error { func CreatePendingRepositoryTransfer(ctx context.Context, doer, newOwner *user_model.User, repoID int64, teams []*organization.Team) error {
return db.AutoTx(ctx, func(ctx context.Context) error { return db.WithTx(ctx, func(ctx context.Context) error {
repo, err := repo_model.GetRepositoryByID(ctx, repoID) repo, err := repo_model.GetRepositoryByID(ctx, repoID)
if err != nil { if err != nil {
return err return err

View file

@ -243,7 +243,7 @@ func (ns *notificationService) NotifyPullReviewRequest(ctx context.Context, doer
} }
func (ns *notificationService) NotifyRepoPendingTransfer(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository) { func (ns *notificationService) NotifyRepoPendingTransfer(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository) {
err := db.AutoTx(ctx, func(ctx context.Context) error { err := db.WithTx(ctx, func(ctx context.Context) error {
return activities_model.CreateRepoTransferNotification(ctx, doer, newOwner, repo) return activities_model.CreateRepoTransferNotification(ctx, doer, newOwner, repo)
}) })
if err != nil { if err != nil {

View file

@ -14,7 +14,7 @@ import (
) )
func AddCollaborator(ctx context.Context, repo *repo_model.Repository, u *user_model.User) error { func AddCollaborator(ctx context.Context, repo *repo_model.Repository, u *user_model.User) error {
return db.AutoTx(ctx, func(ctx context.Context) error { return db.WithTx(ctx, func(ctx context.Context) error {
collaboration := &repo_model.Collaboration{ collaboration := &repo_model.Collaboration{
RepoID: repo.ID, RepoID: repo.ID,
UserID: u.ID, UserID: u.ID,

View file

@ -123,7 +123,7 @@ func UpdateComment(ctx context.Context, c *issues_model.Comment, doer *user_mode
// DeleteComment deletes the comment // DeleteComment deletes the comment
func DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) error { func DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) error {
err := db.AutoTx(ctx, func(ctx context.Context) error { err := db.WithTx(ctx, func(ctx context.Context) error {
return issues_model.DeleteComment(ctx, comment) return issues_model.DeleteComment(ctx, comment)
}) })
if err != nil { if err != nil {