Commit 25a24e57 authored by Erik Ingenito's avatar Erik Ingenito

Make queue operation more clear

License: MIT
Signed-off-by: default avatarErik Ingenito <erik@carbonfive.com>
parent 4f00ef1b
......@@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) {
cids := cid.NewSet()
for i := 0; i < 1000; i++ {
for i := 0; i < 100; i++ {
c := blockGenerator.Next().Cid()
cids.Add(c)
}
......
......@@ -61,37 +61,50 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
return q.dequeue
}
type entry struct {
cid cid.Cid
key datastore.Key
}
// Look for next Cid in the queue and return it. Skip over gaps and mangled data
func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
for {
if q.head >= q.tail {
return datastore.Key{}, cid.Undef
}
key := q.queueKey(q.head)
value, err := q.ds.Get(key)
if err == datastore.ErrNotFound {
log.Warningf("Error missing entry in queue: %s", key)
q.head++ // move on
continue
} else if err != nil {
log.Warningf("Error fetching from queue: %s", err)
continue
}
c, err := cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
continue
}
return key, c
}
}
// Run dequeues and enqueues when available.
func (q *Queue) work() {
go func() {
for {
var c cid.Cid = cid.Undef
var key datastore.Key
k, c := q.nextEntry()
var dequeue chan cid.Cid
// If we're not empty dequeue a cid and ship it
if q.head < q.tail {
key = q.queueKey(q.head)
value, err := q.ds.Get(key)
if err == datastore.ErrNotFound {
log.Warningf("Missing entry in queue: %s", err)
q.head++
continue
} else if err != nil {
log.Warningf("Error fetching from queue: %s", err)
continue
}
c, err = cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
continue
}
}
if c != cid.Undef {
dequeue = q.dequeue
}
......@@ -102,16 +115,19 @@ func (q *Queue) work() {
if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
}
q.tail++
case dequeue <- c:
q.head++
err := q.ds.Delete(key)
err := q.ds.Delete(k)
if err != nil {
log.Errorf("Failed to delete queued cid: %s", err)
log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
continue
}
q.head++
case <-q.ctx.Done():
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment