Commit cb07b212 authored by Michael Avila's avatar Michael Avila Committed by Steven Allen

Query for provider head/tail

License: MIT
Signed-off-by: default avatarMichael Avila <davidmichaelavila@gmail.com>
parent b34d1787
......@@ -2,7 +2,7 @@ package provider
import (
"context"
"math"
"fmt"
"strconv"
"strings"
......@@ -32,7 +32,7 @@ type Queue struct {
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
head, tail, err := getQueueHeadTail(ctx, name, namespaced)
head, tail, err := getQueueHeadTail(ctx, namespaced)
if err != nil {
return nil, err
}
......@@ -142,40 +142,52 @@ func (q *Queue) work() {
}
func (q *Queue) queueKey(id uint64) datastore.Key {
return datastore.NewKey(strconv.FormatUint(id, 10))
s := fmt.Sprintf("%016X", id)
return datastore.NewKey(s)
}
// crawl over the queue entries to find the head and tail
func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) {
q := query.Query{}
results, err := datastore.Query(q)
func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
head, err := getQueueHead(datastore)
if err != nil {
return 0, 0, err
}
tail, err := getQueueTail(datastore)
if err != nil {
return 0, 0, err
}
return head, tail, nil
}
var tail uint64
var head uint64 = math.MaxUint64
for entry := range results.Next() {
trimmed := strings.TrimPrefix(entry.Key, "/")
id, err := strconv.ParseUint(trimmed, 10, 64)
if err != nil {
return 0, 0, err
}
func getQueueHead(ds datastore.Datastore) (uint64, error) {
return getFirstIDByOrder(ds, query.OrderByKey{})
}
if id < head {
head = id
}
func getQueueTail(ds datastore.Datastore) (uint64, error) {
tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
if err != nil {
return 0, err
}
if tail > 0 {
tail++
}
return tail, nil
}
if (id + 1) > tail {
tail = (id + 1)
}
func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
q := query.Query{Orders: []query.Order{order}}
results, err := ds.Query(q)
if err != nil {
return 0, err
}
if err := results.Close(); err != nil {
return 0, 0, err
defer results.Close()
r, ok := results.NextSync()
if !ok {
return 0, nil
}
if head == math.MaxUint64 {
head = 0
trimmed := strings.TrimPrefix(r.Key, "/")
id, err := strconv.ParseUint(trimmed, 16, 64)
if err != nil {
return 0, err
}
return head, tail, nil
return id, nil
}
......@@ -12,7 +12,7 @@ import (
func makeCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
for i := 0; i < 10; i++ {
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
......@@ -129,3 +129,27 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[5:], queue, t)
}
func TestInitializationWithManyCids(t *testing.T) {
ctx := context.Background()
defer ctx.Done()
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}
// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
assertOrdered(cids, queue, t)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment