package nodb

import (
	"fmt"
	"sync"

	"github.com/lunny/nodb/store"
)

// ibucket is the set of storage operations DB performs through its bucket
// field. newDB assigns the underlying *store.DB to it, so *store.DB is the
// implementation used in this file.
type ibucket interface {
	// Get looks up the value stored under key.
	Get(key []byte) ([]byte, error)

	// Put and Delete write or remove a single key directly.
	Put(key []byte, value []byte) error
	Delete(key []byte) error

	// NewIterator returns an iterator over the store.
	NewIterator() *store.Iterator

	// NewWriteBatch returns a write batch; DB.newBatch wraps one per batch.
	NewWriteBatch() store.WriteBatch

	// The range iterators walk keys between min and max. rangeType selects
	// how the bounds are treated (flushRegion passes store.RangeROpen).
	// NOTE(review): the Rev variants presumably iterate in reverse and
	// offset/count presumably skip/limit results — confirm against the
	// store package.
	RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
	RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
	RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
	RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
}

// DB is one indexed database handle created by Nodb.newDB. It carries the
// owning Nodb, the backing store, and one write batch per data type.
type DB struct {
	// l is the Nodb that created this DB.
	l *Nodb

	// sdb is the backing store, taken from l.ldb in newDB.
	sdb *store.DB

	// bucket is the storage interface all reads and iterations go through;
	// newDB sets it to sdb.
	bucket ibucket

	// index is the database index passed to newDB.
	index uint8

	// One batch per data type. newEliminator registers each of them with
	// the matching type constant (KVType, ListType, HashType, ZSetType,
	// BitType, SetType).
	kvBatch   *batch
	listBatch *batch
	hashBatch *batch
	zsetBatch *batch
	binBatch  *batch
	setBatch  *batch

	// status is initialised to DBAutoCommit in newDB; see IsAutoCommit.
	status uint8
}

// newDB builds the DB handle for the given index on top of l's store. The
// handle starts in DBAutoCommit status and gets a fresh batch for every
// data type.
func (l *Nodb) newDB(index uint8) *DB {
	d := &DB{
		l:      l,
		sdb:    l.ldb,
		index:  index,
		status: DBAutoCommit,
	}
	d.bucket = d.sdb

	// newBatch reads d.l and d.bucket, so the batches are created only
	// after those fields are populated.
	d.kvBatch = d.newBatch()
	d.listBatch = d.newBatch()
	d.hashBatch = d.newBatch()
	d.zsetBatch = d.newBatch()
	d.binBatch = d.newBatch()
	d.setBatch = d.newBatch()

	return d
}

// newBatch creates a batch over a new write batch obtained from db.bucket.
// The batch is guarded by a dbBatchLocker that pairs a fresh mutex with the
// Nodb-wide wLock.
func (db *DB) newBatch() *batch {
	wb := db.bucket.NewWriteBatch()
	locker := &dbBatchLocker{
		l:      new(sync.Mutex),
		wrLock: &db.l.wLock,
	}
	return db.l.newBatch(wb, locker, nil)
}

// Index returns the index this DB was created with, widened to int.
func (db *DB) Index() int {
	idx := db.index
	return int(idx)
}

// IsAutoCommit reports whether the DB's status is DBAutoCommit, the status
// every DB is given by newDB.
func (db *DB) IsAutoCommit() bool {
	autoCommit := db.status == DBAutoCommit
	return autoCommit
}

// FlushAll runs the per-type flush routines in a fixed order (flush, lFlush,
// hFlush, zFlush, bFlush, sFlush) and returns the sum of the counts they
// report.
//
// It stops at the first routine that fails. In that case drop holds the
// total from the routines that succeeded before it, and err is that
// routine's error.
func (db *DB) FlushAll() (drop int64, err error) {
	flushers := []func() (int64, error){
		db.flush,
		db.lFlush,
		db.hFlush,
		db.zFlush,
		db.bFlush,
		db.sFlush,
	}

	for i := range flushers {
		var n int64
		if n, err = flushers[i](); err != nil {
			return drop, err
		}
		drop += n
	}

	return drop, nil
}

// newEliminator creates an elimination for db and registers a retire
// context for each data type: the type constant, the batch that belongs to
// that type, and the matching delete method.
func (db *DB) newEliminator() *elimination {
	eli := newEliminator(db)

	// Bind the method once; registrations happen in the same order as the
	// batch fields are declared on DB.
	register := eli.regRetireContext
	register(KVType, db.kvBatch, db.delete)
	register(ListType, db.listBatch, db.lDelete)
	register(HashType, db.hashBatch, db.hDelete)
	register(ZSetType, db.zsetBatch, db.zDelete)
	register(BitType, db.binBatch, db.bDelete)
	register(SetType, db.setBatch, db.sDelete)

	return eli
}

// flushRegion queues a delete in t for every key the range iterator yields
// between minKey and maxKey (range type store.RangeROpen) and returns how
// many keys were queued.
//
// The batch is committed each time the running count reaches a multiple of
// 1024, which bounds how many deletes accumulate in t. Deletes queued after
// the last such commit are left pending in t; this function does not commit
// them.
//
// If a commit fails, the count queued so far and the commit error are
// returned. The iterator is closed on every return path.
func (db *DB) flushRegion(t *batch, minKey []byte, maxKey []byte) (drop int64, err error) {
	it := db.bucket.RangeIterator(minKey, maxKey, store.RangeROpen)
	// Deferred so the iterator is released even when a commit fails and the
	// loop is left early.
	defer it.Close()

	for ; it.Valid(); it.Next() {
		t.Delete(it.RawKey())
		drop++
		if drop&1023 == 0 {
			if err = t.Commit(); err != nil {
				return drop, err
			}
		}
	}
	return drop, nil
}

// flushType deletes every key of the given data type, working in pages of
// up to 1024 keys.
//
// Each round scans the type's metadata key space from the start, queues the
// type-specific delete and the expire removal for every key found, and
// commits t. Because committed keys no longer show up in the next scan, the
// loop ends when a scan returns no keys.
//
// drop is the number of keys whose deletion was committed. A scan or commit
// error stops the loop and is returned together with the count committed so
// far. An unrecognised dataType yields an "invalid data type" error.
func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
	var deleteFunc func(t *batch, key []byte) int64
	var metaDataType byte
	switch dataType {
	case KVType:
		deleteFunc = db.delete
		metaDataType = KVType
	case ListType:
		deleteFunc = db.lDelete
		metaDataType = LMetaType
	case HashType:
		deleteFunc = db.hDelete
		metaDataType = HSizeType
	case ZSetType:
		deleteFunc = db.zDelete
		metaDataType = ZSizeType
	case BitType:
		deleteFunc = db.bDelete
		metaDataType = BitMetaType
	case SetType:
		deleteFunc = db.sDelete
		metaDataType = SSizeType
	default:
		return 0, fmt.Errorf("invalid data type: %s", TypeName[dataType])
	}

	for {
		keys, scanErr := db.scan(metaDataType, nil, 1024, false, "")
		if scanErr != nil {
			// Report the scan failure instead of looping on it; retrying
			// here could spin forever and would hide the error.
			return drop, scanErr
		}
		if len(keys) == 0 {
			return drop, nil
		}

		for _, key := range keys {
			deleteFunc(t, key)
			db.rmExpire(t, dataType, key)
		}

		if err = t.Commit(); err != nil {
			return drop, err
		}
		drop += int64(len(keys))
	}
}