Slightly simplify the queue settings code to help reduce the risk of problems (#12976)
Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: techknowlogick <techknowlogick@gitea.io>
This commit is contained in:
parent
e374bb7e2d
commit
c8f7a6b774
|
@ -9,25 +9,56 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Mappable represents an interface that can MapTo another interface
|
||||||
|
type Mappable interface {
|
||||||
|
MapTo(v interface{}) error
|
||||||
|
}
|
||||||
|
|
||||||
// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
|
// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
|
||||||
//
|
//
|
||||||
// It will tolerate the cfg being passed as a []byte or string of a json representation of the
|
// It will tolerate the cfg being passed as a []byte or string of a json representation of the
|
||||||
// exemplar or the correct type of the exemplar itself
|
// exemplar or the correct type of the exemplar itself
|
||||||
func toConfig(exemplar, cfg interface{}) (interface{}, error) {
|
func toConfig(exemplar, cfg interface{}) (interface{}, error) {
|
||||||
|
|
||||||
|
// First of all check if we've got the same type as the exemplar - if so it's all fine.
|
||||||
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
|
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now if not - does it provide a MapTo function we can try?
|
||||||
|
if mappable, ok := cfg.(Mappable); ok {
|
||||||
|
newVal := reflect.New(reflect.TypeOf(exemplar))
|
||||||
|
if err := mappable.MapTo(newVal.Interface()); err == nil {
|
||||||
|
return newVal.Elem().Interface(), nil
|
||||||
|
}
|
||||||
|
// MapTo has failed us ... let's try the json route ...
|
||||||
|
}
|
||||||
|
|
||||||
|
// OK we've been passed a byte array right?
|
||||||
configBytes, ok := cfg.([]byte)
|
configBytes, ok := cfg.([]byte)
|
||||||
if !ok {
|
if !ok {
|
||||||
configStr, ok := cfg.(string)
|
// oh ... it's a string then?
|
||||||
if !ok {
|
var configStr string
|
||||||
return nil, ErrInvalidConfiguration{cfg: cfg}
|
|
||||||
}
|
configStr, ok = cfg.(string)
|
||||||
configBytes = []byte(configStr)
|
configBytes = []byte(configStr)
|
||||||
}
|
}
|
||||||
|
if !ok {
|
||||||
|
// hmm ... can we marshal it to json?
|
||||||
|
var err error
|
||||||
|
|
||||||
|
configBytes, err = json.Marshal(cfg)
|
||||||
|
ok = (err == nil)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
// no ... we've tried hard enough at this point - throw an error!
|
||||||
|
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OK unmarshal the byte array into a new copy of the exemplar
|
||||||
newVal := reflect.New(reflect.TypeOf(exemplar))
|
newVal := reflect.New(reflect.TypeOf(exemplar))
|
||||||
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
|
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
|
||||||
|
// If we can't unmarshal it then return an error!
|
||||||
return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
|
return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
|
||||||
}
|
}
|
||||||
return newVal.Elem().Interface(), nil
|
return newVal.Elem().Interface(), nil
|
||||||
|
|
|
@ -27,27 +27,10 @@ func validType(t string) (Type, error) {
|
||||||
|
|
||||||
func getQueueSettings(name string) (setting.QueueSettings, []byte) {
|
func getQueueSettings(name string) (setting.QueueSettings, []byte) {
|
||||||
q := setting.GetQueueSettings(name)
|
q := setting.GetQueueSettings(name)
|
||||||
opts := make(map[string]interface{})
|
|
||||||
opts["Name"] = name
|
|
||||||
opts["QueueLength"] = q.Length
|
|
||||||
opts["BatchLength"] = q.BatchLength
|
|
||||||
opts["DataDir"] = q.DataDir
|
|
||||||
opts["Addresses"] = q.Addresses
|
|
||||||
opts["Network"] = q.Network
|
|
||||||
opts["Password"] = q.Password
|
|
||||||
opts["DBIndex"] = q.DBIndex
|
|
||||||
opts["QueueName"] = q.QueueName
|
|
||||||
opts["SetName"] = q.SetName
|
|
||||||
opts["Workers"] = q.Workers
|
|
||||||
opts["MaxWorkers"] = q.MaxWorkers
|
|
||||||
opts["BlockTimeout"] = q.BlockTimeout
|
|
||||||
opts["BoostTimeout"] = q.BoostTimeout
|
|
||||||
opts["BoostWorkers"] = q.BoostWorkers
|
|
||||||
opts["ConnectionString"] = q.ConnectionString
|
|
||||||
|
|
||||||
cfg, err := json.Marshal(opts)
|
cfg, err := json.Marshal(q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
|
log.Error("Unable to marshall generic options: %v Error: %v", q, err)
|
||||||
log.Error("Unable to create queue for %s", name, err)
|
log.Error("Unable to create queue for %s", name, err)
|
||||||
return q, []byte{}
|
return q, []byte{}
|
||||||
}
|
}
|
||||||
|
@ -75,7 +58,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
|
||||||
Timeout: q.Timeout,
|
Timeout: q.Timeout,
|
||||||
MaxAttempts: q.MaxAttempts,
|
MaxAttempts: q.MaxAttempts,
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
QueueLength: q.Length,
|
QueueLength: q.QueueLength,
|
||||||
Name: name,
|
Name: name,
|
||||||
}, exemplar)
|
}, exemplar)
|
||||||
}
|
}
|
||||||
|
@ -114,7 +97,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un
|
||||||
Timeout: q.Timeout,
|
Timeout: q.Timeout,
|
||||||
MaxAttempts: q.MaxAttempts,
|
MaxAttempts: q.MaxAttempts,
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
QueueLength: q.Length,
|
QueueLength: q.QueueLength,
|
||||||
}, exemplar)
|
}, exemplar)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -16,8 +16,9 @@ import (
|
||||||
|
|
||||||
// QueueSettings represent the settings for a queue from the ini
|
// QueueSettings represent the settings for a queue from the ini
|
||||||
type QueueSettings struct {
|
type QueueSettings struct {
|
||||||
|
Name string
|
||||||
DataDir string
|
DataDir string
|
||||||
Length int
|
QueueLength int `ini:"LENGTH"`
|
||||||
BatchLength int
|
BatchLength int
|
||||||
ConnectionString string
|
ConnectionString string
|
||||||
Type string
|
Type string
|
||||||
|
@ -44,6 +45,8 @@ var Queue = QueueSettings{}
|
||||||
func GetQueueSettings(name string) QueueSettings {
|
func GetQueueSettings(name string) QueueSettings {
|
||||||
q := QueueSettings{}
|
q := QueueSettings{}
|
||||||
sec := Cfg.Section("queue." + name)
|
sec := Cfg.Section("queue." + name)
|
||||||
|
q.Name = name
|
||||||
|
|
||||||
// DataDir is not directly inheritable
|
// DataDir is not directly inheritable
|
||||||
q.DataDir = filepath.Join(Queue.DataDir, name)
|
q.DataDir = filepath.Join(Queue.DataDir, name)
|
||||||
// QueueName is not directly inheritable either
|
// QueueName is not directly inheritable either
|
||||||
|
@ -65,8 +68,9 @@ func GetQueueSettings(name string) QueueSettings {
|
||||||
q.DataDir = filepath.Join(AppDataPath, q.DataDir)
|
q.DataDir = filepath.Join(AppDataPath, q.DataDir)
|
||||||
}
|
}
|
||||||
_, _ = sec.NewKey("DATADIR", q.DataDir)
|
_, _ = sec.NewKey("DATADIR", q.DataDir)
|
||||||
|
|
||||||
// The rest are...
|
// The rest are...
|
||||||
q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
|
q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
|
||||||
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
|
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
|
||||||
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
|
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
|
||||||
q.Type = sec.Key("TYPE").MustString(Queue.Type)
|
q.Type = sec.Key("TYPE").MustString(Queue.Type)
|
||||||
|
@ -91,7 +95,7 @@ func NewQueueService() {
|
||||||
if !filepath.IsAbs(Queue.DataDir) {
|
if !filepath.IsAbs(Queue.DataDir) {
|
||||||
Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
|
Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
|
||||||
}
|
}
|
||||||
Queue.Length = sec.Key("LENGTH").MustInt(20)
|
Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
|
||||||
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
|
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
|
||||||
Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
|
Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
|
||||||
Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
|
Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
|
||||||
|
|
Loading…
Reference in a new issue