d17efaa114
* Fix bug on migration 111 * Upgrade bleve to 1.0.10 Co-authored-by: zeripath <art27@cantab.net> Co-authored-by: techknowlogick <techknowlogick@gitea.io>
244 lines
6 KiB
Go
Vendored
244 lines
6 KiB
Go
Vendored
// Copyright (c) 2017 Couchbase, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package zap
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"io"
|
|
"reflect"
|
|
|
|
"github.com/golang/snappy"
|
|
)
|
|
|
|
var reflectStaticSizeMetaData int
|
|
|
|
func init() {
|
|
var md MetaData
|
|
reflectStaticSizeMetaData = int(reflect.TypeOf(md).Size())
|
|
}
|
|
|
|
var termSeparator byte = 0xff
|
|
var termSeparatorSplitSlice = []byte{termSeparator}
|
|
|
|
type chunkedContentCoder struct {
|
|
final []byte
|
|
chunkSize uint64
|
|
currChunk uint64
|
|
chunkLens []uint64
|
|
|
|
w io.Writer
|
|
progressiveWrite bool
|
|
|
|
chunkMetaBuf bytes.Buffer
|
|
chunkBuf bytes.Buffer
|
|
|
|
chunkMeta []MetaData
|
|
|
|
compressed []byte // temp buf for snappy compression
|
|
}
|
|
|
|
// MetaData represents the data information inside a
|
|
// chunk.
|
|
type MetaData struct {
|
|
DocNum uint64 // docNum of the data inside the chunk
|
|
DocDvOffset uint64 // offset of data inside the chunk for the given docid
|
|
}
|
|
|
|
// newChunkedContentCoder returns a new chunk content coder which
|
|
// packs data into chunks based on the provided chunkSize
|
|
func newChunkedContentCoder(chunkSize uint64, maxDocNum uint64,
|
|
w io.Writer, progressiveWrite bool) *chunkedContentCoder {
|
|
total := maxDocNum/chunkSize + 1
|
|
rv := &chunkedContentCoder{
|
|
chunkSize: chunkSize,
|
|
chunkLens: make([]uint64, total),
|
|
chunkMeta: make([]MetaData, 0, total),
|
|
w: w,
|
|
progressiveWrite: progressiveWrite,
|
|
}
|
|
|
|
return rv
|
|
}
|
|
|
|
// Reset lets you reuse this chunked content coder. Buffers are reset
|
|
// and re used. You cannot change the chunk size.
|
|
func (c *chunkedContentCoder) Reset() {
|
|
c.currChunk = 0
|
|
c.final = c.final[:0]
|
|
c.chunkBuf.Reset()
|
|
c.chunkMetaBuf.Reset()
|
|
for i := range c.chunkLens {
|
|
c.chunkLens[i] = 0
|
|
}
|
|
c.chunkMeta = c.chunkMeta[:0]
|
|
}
|
|
|
|
func (c *chunkedContentCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
|
|
total := int(maxDocNum/chunkSize + 1)
|
|
c.chunkSize = chunkSize
|
|
if cap(c.chunkLens) < total {
|
|
c.chunkLens = make([]uint64, total)
|
|
} else {
|
|
c.chunkLens = c.chunkLens[:total]
|
|
}
|
|
if cap(c.chunkMeta) < total {
|
|
c.chunkMeta = make([]MetaData, 0, total)
|
|
}
|
|
}
|
|
|
|
// Close indicates you are done calling Add() this allows
|
|
// the final chunk to be encoded.
|
|
func (c *chunkedContentCoder) Close() error {
|
|
return c.flushContents()
|
|
}
|
|
|
|
func (c *chunkedContentCoder) flushContents() error {
|
|
// flush the contents, with meta information at first
|
|
buf := make([]byte, binary.MaxVarintLen64)
|
|
n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
|
|
_, err := c.chunkMetaBuf.Write(buf[:n])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// write out the metaData slice
|
|
for _, meta := range c.chunkMeta {
|
|
_, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// write the metadata to final data
|
|
metaData := c.chunkMetaBuf.Bytes()
|
|
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
|
|
// write the compressed data to the final data
|
|
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
|
|
c.final = append(c.final, c.compressed...)
|
|
|
|
c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))
|
|
|
|
if c.progressiveWrite {
|
|
_, err := c.w.Write(c.final)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.final = c.final[:0]
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Add encodes the provided byte slice into the correct chunk for the provided
|
|
// doc num. You MUST call Add() with increasing docNums.
|
|
func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
|
|
chunk := docNum / c.chunkSize
|
|
if chunk != c.currChunk {
|
|
// flush out the previous chunk details
|
|
err := c.flushContents()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// clearing the chunk specific meta for next chunk
|
|
c.chunkBuf.Reset()
|
|
c.chunkMetaBuf.Reset()
|
|
c.chunkMeta = c.chunkMeta[:0]
|
|
c.currChunk = chunk
|
|
}
|
|
|
|
// get the starting offset for this doc
|
|
dvOffset := c.chunkBuf.Len()
|
|
dvSize, err := c.chunkBuf.Write(vals)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.chunkMeta = append(c.chunkMeta, MetaData{
|
|
DocNum: docNum,
|
|
DocDvOffset: uint64(dvOffset + dvSize),
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// Write commits all the encoded chunked contents to the provided writer.
|
|
//
|
|
// | ..... data ..... | chunk offsets (varints)
|
|
// | position of chunk offsets (uint64) | number of offsets (uint64) |
|
|
//
|
|
func (c *chunkedContentCoder) Write() (int, error) {
|
|
var tw int
|
|
|
|
if c.final != nil {
|
|
// write out the data section first
|
|
nw, err := c.w.Write(c.final)
|
|
tw += nw
|
|
if err != nil {
|
|
return tw, err
|
|
}
|
|
}
|
|
|
|
chunkOffsetsStart := uint64(tw)
|
|
|
|
if cap(c.final) < binary.MaxVarintLen64 {
|
|
c.final = make([]byte, binary.MaxVarintLen64)
|
|
} else {
|
|
c.final = c.final[0:binary.MaxVarintLen64]
|
|
}
|
|
chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
|
|
// write out the chunk offsets
|
|
for _, chunkOffset := range chunkOffsets {
|
|
n := binary.PutUvarint(c.final, chunkOffset)
|
|
nw, err := c.w.Write(c.final[:n])
|
|
tw += nw
|
|
if err != nil {
|
|
return tw, err
|
|
}
|
|
}
|
|
|
|
chunkOffsetsLen := uint64(tw) - chunkOffsetsStart
|
|
|
|
c.final = c.final[0:8]
|
|
// write out the length of chunk offsets
|
|
binary.BigEndian.PutUint64(c.final, chunkOffsetsLen)
|
|
nw, err := c.w.Write(c.final)
|
|
tw += nw
|
|
if err != nil {
|
|
return tw, err
|
|
}
|
|
|
|
// write out the number of chunks
|
|
binary.BigEndian.PutUint64(c.final, uint64(len(c.chunkLens)))
|
|
nw, err = c.w.Write(c.final)
|
|
tw += nw
|
|
if err != nil {
|
|
return tw, err
|
|
}
|
|
|
|
c.final = c.final[:0]
|
|
|
|
return tw, nil
|
|
}
|
|
|
|
// ReadDocValueBoundary elicits the start, end offsets from a
|
|
// metaData header slice
|
|
func ReadDocValueBoundary(chunk int, metaHeaders []MetaData) (uint64, uint64) {
|
|
var start uint64
|
|
if chunk > 0 {
|
|
start = metaHeaders[chunk-1].DocDvOffset
|
|
}
|
|
return start, metaHeaders[chunk].DocDvOffset
|
|
}
|