package themis

import (
	"fmt"
	"sort"

	"github.com/juju/errors"
	"github.com/ngaut/log"
	"github.com/pingcap/go-hbase"
	"github.com/pingcap/go-hbase/proto"
)

type mutationValuePair struct {
	typ   hbase.Type
	value []byte
}

func (mp *mutationValuePair) String() string {
	return fmt.Sprintf("type: %d value: %s", mp.typ, mp.value)
}

type columnMutation struct {
	*hbase.Column
	*mutationValuePair
}

func getEntriesFromDel(p *hbase.Delete) ([]*columnMutation, error) {
	errMsg := "must set at least one column for themis delete"
	if len(p.FamilyQuals) == 0 {
		return nil, errors.New(errMsg)
	}

	var ret []*columnMutation
	for f, _ := range p.Families {
		quilifiers := p.FamilyQuals[f]
		if len(quilifiers) == 0 {
			return nil, errors.New(errMsg)
		}
		for q, _ := range quilifiers {
			mutation := &columnMutation{
				Column: &hbase.Column{
					Family: []byte(f),
					Qual:   []byte(q),
				},
				mutationValuePair: &mutationValuePair{
					typ: hbase.TypeDeleteColumn,
				},
			}
			ret = append(ret, mutation)
		}
	}
	return ret, nil
}

func getEntriesFromPut(p *hbase.Put) []*columnMutation {
	var ret []*columnMutation
	for i, f := range p.Families {
		qualifiers := p.Qualifiers[i]
		for j, q := range qualifiers {
			mutation := &columnMutation{
				Column: &hbase.Column{
					Family: f,
					Qual:   q,
				},
				mutationValuePair: &mutationValuePair{
					typ:   hbase.TypePut,
					value: p.Values[i][j],
				},
			}
			ret = append(ret, mutation)
		}
	}
	return ret
}

func (cm *columnMutation) toCell() *proto.Cell {
	ret := &proto.Cell{
		Family:    cm.Family,
		Qualifier: cm.Qual,
		Value:     cm.value,
	}
	if cm.typ == hbase.TypePut { // put
		ret.CellType = proto.CellType_PUT.Enum()
	} else if cm.typ == hbase.TypeMinimum { // onlyLock
		ret.CellType = proto.CellType_MINIMUM.Enum()
	} else { // delete, themis delete API only support delete column
		ret.CellType = proto.CellType_DELETE_COLUMN.Enum()
	}
	return ret
}

type rowMutation struct {
	tbl []byte
	row []byte
	// mutations := { 'cf:col' => mutationValuePair }
	mutations map[string]*mutationValuePair
}

func (r *rowMutation) getColumns() []hbase.Column {
	var ret []hbase.Column
	for k, _ := range r.mutations {
		c := &hbase.Column{}
		// TODO: handle error, now just ignore
		if err := c.ParseFromString(k); err != nil {
			log.Warnf("parse from string error, column: %s, mutation: %s, error: %v", c, k, err)
		}
		ret = append(ret, *c)
	}
	return ret
}

func (r *rowMutation) getSize() int {
	return len(r.mutations)
}

func (r *rowMutation) getType(c hbase.Column) hbase.Type {
	p, ok := r.mutations[c.String()]
	if !ok {
		return hbase.TypeMinimum
	}
	return p.typ
}

func newRowMutation(tbl, row []byte) *rowMutation {
	return &rowMutation{
		tbl:       tbl,
		row:       row,
		mutations: map[string]*mutationValuePair{},
	}
}

func (r *rowMutation) addMutation(c *hbase.Column, typ hbase.Type, val []byte, onlyLock bool) {
	// 3 scene: put, delete, onlyLock
	// if it is onlyLock scene, then has not data modify, when has contained the qualifier, can't replace exist value,
	// becuase put or delete operation has add mutation
	if onlyLock && r.mutations[c.String()] != nil {
		return
	}

	r.mutations[c.String()] = &mutationValuePair{
		typ:   typ,
		value: val,
	}
}

func (r *rowMutation) mutationList(withValue bool) []*columnMutation {
	var ret []*columnMutation
	var keys []string
	for k, _ := range r.mutations {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		v := &mutationValuePair{
			typ: r.mutations[k].typ,
		}
		if withValue {
			v.value = r.mutations[k].value
		}
		c := &hbase.Column{}
		// TODO: handle error, now just ignore
		if err := c.ParseFromString(k); err != nil {
			log.Warnf("parse from string error, column: %s, mutation: %s, error: %v", c, k, err)
		}
		ret = append(ret, &columnMutation{
			Column:            c,
			mutationValuePair: v,
		})
	}
	return ret
}

type columnMutationCache struct {
	// mutations => {table => { rowKey => row mutations } }
	mutations map[string]map[string]*rowMutation
}

func newColumnMutationCache() *columnMutationCache {
	return &columnMutationCache{
		mutations: map[string]map[string]*rowMutation{},
	}
}

func (c *columnMutationCache) addMutation(tbl []byte, row []byte, col *hbase.Column, t hbase.Type, v []byte, onlyLock bool) {
	tblRowMutations, ok := c.mutations[string(tbl)]
	if !ok {
		// create table mutation map
		tblRowMutations = map[string]*rowMutation{}
		c.mutations[string(tbl)] = tblRowMutations
	}

	rowMutations, ok := tblRowMutations[string(row)]
	if !ok {
		// create row mutation map
		rowMutations = newRowMutation(tbl, row)
		tblRowMutations[string(row)] = rowMutations
	}
	rowMutations.addMutation(col, t, v, onlyLock)
}

func (c *columnMutationCache) getMutation(cc *hbase.ColumnCoordinate) *mutationValuePair {
	t, ok := c.mutations[string(cc.Table)]
	if !ok {
		return nil
	}
	rowMutation, ok := t[string(cc.Row)]
	if !ok {
		return nil
	}
	p, ok := rowMutation.mutations[cc.GetColumn().String()]
	if !ok {
		return nil
	}
	return p
}

func (c *columnMutationCache) getRowCount() int {
	ret := 0
	for _, v := range c.mutations {
		ret += len(v)
	}
	return ret
}

func (c *columnMutationCache) getMutationCount() int {
	ret := 0
	for _, v := range c.mutations {
		for _, vv := range v {
			ret += len(vv.mutationList(false))
		}
	}
	return ret
}