package couchbase

import (
	"fmt"
	"github.com/couchbase/goutils/logging"
	"sync"
)

// PersistTo is a persistence durability level. Within this file its numeric
// value is compared against node and replica counts in SetObserveAndPersist
// and bounds the number of PERSIST jobs created by ObserveAndPersistPoll.
type PersistTo uint8

// Persistence levels, numbered consecutively from 0 (PersistNone) to
// 5 (PersistFour).
const (
	PersistNone PersistTo = iota
	PersistMaster
	PersistOne
	PersistTwo
	PersistThree
	PersistFour
)

// ObserveTo is a replication durability level. Within this file its numeric
// value is compared against node and replica counts in SetObserveAndPersist
// and bounds the number of OBSERVE jobs created by ObserveAndPersistPoll.
type ObserveTo uint8

// Replication levels, numbered consecutively from 0 (ObserveNone) to
// 4 (ObserveReplicateFour).
const (
	ObserveNone ObserveTo = iota
	ObserveReplicateOne
	ObserveReplicateTwo
	ObserveReplicateThree
	ObserveReplicateFour
)

// JobType selects which sequence number of an observe response a job is
// judged by: ObserveAndPersistPoll compares lastPersistedSeqNo for PERSIST
// jobs and currentSeqNo for every other job type.
type JobType uint8

// Job kinds handled by the observe/persist pollers.
const (
	OBSERVE JobType = iota
	PERSIST
)

// ObservePersistJob is one unit of work for an OPJobPoll worker: a single
// ObserveSeq call against one host for one vbucket. The worker fills in the
// response fields and hands the same job back on resultChan, or reports a
// failure on errorChan.
type ObservePersistJob struct {
	// vb and vbuuid are passed to ObserveSeq by the worker.
	vb                 uint16
	vbuuid             uint64
	// hostname is used by the worker to look up the connection pool.
	hostname           string
	// jobType decides which sequence number the requester compares.
	jobType            JobType
	// failover is copied from the ObserveSeq response; the requester treats
	// any non-zero value as a failover.
	failover           uint8
	// lastPersistedSeqNo and currentSeqNo are copied from the ObserveSeq
	// response.
	lastPersistedSeqNo uint64
	currentSeqNo       uint64
	// resultChan receives the job itself after a successful ObserveSeq.
	resultChan         chan *ObservePersistJob
	// errorChan receives an OPErrResponse when the worker cannot complete
	// the job.
	errorChan          chan *OPErrResponse
}

// OPErrResponse is what an OPJobPoll worker sends on a job's errorChan when
// the job could not be completed.
type OPErrResponse struct {
	// vb and vbuuid are copied from the failed job.
	vb     uint16
	vbuuid uint64
	// err describes why the job failed.
	err    error
	// job is the failed job; ObserveAndPersistPoll puts it back into
	// ObservePersistPool.
	job    *ObservePersistJob
}

// Shared state of the observe/persist polling machinery.
var (
	// ObservePersistPool recycles ObservePersistJob values (capacity 1024).
	ObservePersistPool = NewPool(1024)
	// OPJobChan is the queue of pending jobs consumed by OPJobPoll workers.
	OPJobChan = make(chan *ObservePersistJob, 1024)
	// OPJobDone makes one OPJobPoll worker exit per value received from it.
	OPJobDone = make(chan bool)

	// wg counts the workers launched by StartOPPollers.
	wg sync.WaitGroup
)

// StartOPPollers launches maxWorkers goroutines running b.OPJobPoll and then
// blocks until every one of them has returned. A worker only returns after
// receiving from OPJobDone, so callers normally run this in its own
// goroutine.
func (b *Bucket) StartOPPollers(maxWorkers int) {
	for i := 0; i < maxWorkers; i++ {
		// Register the worker before starting it: if the goroutine reached
		// wg.Done ahead of the matching Add, the counter would go negative
		// and WaitGroup would panic.
		wg.Add(1)
		go b.OPJobPoll()
	}
	wg.Wait()
}

// SetObserveAndPersist stores the durability settings later consulted by
// ObserveAndPersistPoll.
//
// It returns an error, and leaves the current settings untouched, when:
//   - either level exceeds the number of nodes reported by b.Nodes();
//   - nPersist exceeds b.Replicas+1 or nObserve exceeds b.Replicas;
//   - EnableMutationToken is false.
func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
	numNodes := len(b.Nodes())
	if int(nPersist) > numNodes || int(nObserve) > numNodes {
		return fmt.Errorf("Not enough healthy nodes in the cluster")
	}

	if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
		return fmt.Errorf("Not enough replicas in the cluster")
	}

	if !EnableMutationToken {
		return fmt.Errorf("Mutation Tokens not enabled ")
	}

	settings := &DurablitySettings{Persist: nPersist, Observe: nObserve}

	// ObserveAndPersistPoll reads b.ds under b.RLock, so the store has to
	// happen under the write lock to avoid a data race.
	b.Lock()
	b.ds = settings
	b.Unlock()
	return nil
}

// ObserveAndPersistPoll blocks until the mutation with sequence number seqNo
// on vbucket vb has reached the durability level stored by
// SetObserveAndPersist.
//
// One job per target node is queued on OPJobChan for the OPJobPoll workers:
// OBSERVE jobs go to nodes[1..Observe] and PERSIST jobs to
// nodes[0..Persist-1] of the list returned by GetNodeList (nodes[0] is
// presumably the vbucket master - confirm against the server map layout).
// A job whose reported sequence number is still behind seqNo is requeued
// until it catches up, fails, or reports a failover.
//
// It returns immediately with (nil, false) when no durability settings are
// configured or when both levels are zero. err is the most recent job error,
// if any; failover is true when any job reported a non-zero failover value.
func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
	b.RLock()
	ds := b.ds
	b.RUnlock()

	if ds == nil {
		return nil, false
	}

	nodes := b.GetNodeList(vb)
	// OBSERVE indexes nodes[1..Observe] and PERSIST indexes
	// nodes[0..Persist-1]; reject anything that would index past the list.
	if (ds.Observe > ObserveNone && int(ds.Observe) >= len(nodes)) || int(ds.Persist) > len(nodes) {
		return fmt.Errorf("Not enough healthy nodes in the cluster"), false
	}

	logging.Infof("Node list %v", nodes)

	resultChan := make(chan *ObservePersistJob, 10)
	errChan := make(chan *OPErrResponse, 10)
	nj := 0 // number of jobs still outstanding

	// enqueue hands one job for host to the pollers.
	enqueue := func(jobType JobType, host string) {
		opJob := ObservePersistPool.Get()
		opJob.vb = vb
		opJob.vbuuid = vbuuid
		opJob.jobType = jobType
		opJob.hostname = host
		opJob.resultChan = resultChan
		opJob.errorChan = errChan

		OPJobChan <- opJob
		nj++
	}

	// Replication is observed on the replicas only, hence the start at 1.
	for i := 1; i <= int(ds.Observe); i++ {
		enqueue(OBSERVE, nodes[i])
	}
	// Persistence levels count from the first node of the list.
	for i := 0; i < int(ds.Persist); i++ {
		enqueue(PERSIST, nodes[i])
	}

	// Looping on nj (rather than until a response drops it to zero) also
	// covers the case where no job was queued at all. The channels are not
	// closed: this function is their only receiver, and every job sends
	// exactly one response per dispatch.
	for nj > 0 {
		select {
		case res := <-resultChan:
			if res.failover != 0 {
				// Not currently handling failover scenarios TODO
				failover = true
				ObservePersistPool.Put(res)
				nj--
				continue
			}

			reached := res.currentSeqNo
			if res.jobType == PERSIST {
				reached = res.lastPersistedSeqNo
			}
			if reached < seqNo {
				// not there yet: requeue this job
				OPJobChan <- res
				continue
			}

			ObservePersistPool.Put(res)
			nj--

		case opErr := <-errChan:
			logging.Errorf("Error in Observe/Persist %v", opErr.err)
			err = fmt.Errorf("Error in Observe/Persist job %v", opErr.err)
			ObservePersistPool.Put(opErr.job)
			nj--
		}
	}

	return err, failover
}

// OPJobPoll is the worker loop started by StartOPPollers. For each job taken
// from OPJobChan it runs ObserveSeq against the job's host, copies the
// response into the job and sends the job on its resultChan. Any failure is
// reported as an OPErrResponse on the job's errorChan instead. The loop ends
// when a value is received from OPJobDone; wg.Done is called on the way out.
func (b *Bucket) OPJobPoll() {
	defer wg.Done()

	// fail reports err for job on the job's error channel.
	fail := func(job *ObservePersistJob, err error) {
		job.errorChan <- &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid, err: err, job: job}
	}

	for {
		select {
		case job := <-OPJobChan:
			pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
			if pool == nil {
				fail(job, fmt.Errorf("Pool not found for host %v", job.hostname))
				continue
			}

			conn, err := pool.Get()
			if err != nil {
				fail(job, fmt.Errorf("Unable to get connection from pool %v", err))
				continue
			}

			res, err := conn.ObserveSeq(job.vb, job.vbuuid)
			// Hand the connection back on both the success and the error
			// path; skipping this on error leaks it from the pool.
			// NOTE(review): assumes pool.Return disposes of connections
			// left unhealthy by a failed command - confirm.
			pool.Return(conn)
			if err != nil {
				fail(job, fmt.Errorf("Command failed %v", err))
				continue
			}

			job.lastPersistedSeqNo = res.LastPersistedSeqNo
			job.currentSeqNo = res.CurrentSeqNo
			job.failover = res.Failover

			job.resultChan <- job

		case <-OPJobDone:
			logging.Infof("Observe Persist Poller exitting")
			return
		}
	}
}

// GetNodeList returns the host names serving vbucket vb, in the order of the
// vbucket's entry in the server map. The result always has one slot per
// server-map entry; a slot stays empty ("") when the entry is negative or
// when getMasterNode yields a name of fewer than two characters.
//
// It returns nil when vb is not a valid index into the vbucket map.
func (b *Bucket) GetNodeList(vb uint16) []string {
	vbm := b.VBServerMap()
	// Valid indexes are 0..len-1, so vb == len must be rejected as well.
	if int(vb) >= len(vbm.VBucketMap) {
		logging.Infof("vbmap smaller than vblist")
		return nil
	}

	serverIdx := vbm.VBucketMap[vb]
	nodes := make([]string, len(serverIdx))
	for i, n := range serverIdx {
		if n < 0 {
			continue
		}
		if node := b.getMasterNode(n); len(node) > 1 {
			nodes[i] = node
		}
	}
	return nodes
}

// OPpool is a pool of reusable ObservePersistJob values backed by a buffered
// channel.
type OPpool struct {
	pool chan *ObservePersistJob
}

// NewPool returns an empty job pool that can hold up to max idle jobs.
func NewPool(max int) *OPpool {
	p := new(OPpool)
	p.pool = make(chan *ObservePersistJob, max)
	return p
}

// Get hands out a job: an idle one from the pool when available, otherwise a
// freshly allocated one. It never blocks. A recycled job still carries the
// field values it had when it was Put.
func (p *OPpool) Get() *ObservePersistJob {
	select {
	case job := <-p.pool:
		return job
	default:
		return &ObservePersistJob{}
	}
}

// Put offers a job back to the pool for reuse. It never blocks: when the
// pool is already full the job is simply dropped.
func (p *OPpool) Put(o *ObservePersistJob) {
	select {
	case p.pool <- o:
		// kept for a later Get
	default:
		// pool is full; let the garbage collector have it
	}
}