Commit 925e6c88 authored by Erik Ingenito's avatar Erik Ingenito

Provider queue updates to address deadlocks

License: MIT
Signed-off-by: default avatarErik Ingenito <erik@carbonfive.com>
parent 7bdb5546
......@@ -5,9 +5,10 @@ package provider
import (
"context"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-routing"
routing "github.com/libp2p/go-libp2p-routing"
)
var (
......
......@@ -24,6 +24,8 @@ type Entry struct {
// Complete the entry by removing it from the queue
func (e *Entry) Complete() error {
e.queue.lock.Lock()
defer e.queue.lock.Unlock()
return e.queue.remove(e.key)
}
......@@ -46,9 +48,7 @@ type Queue struct {
datastore ds.Datastore
dequeue chan *Entry
notEmpty chan struct{}
isRunning bool
added chan struct{}
}
// NewQueue creates a queue for cids
......@@ -66,8 +66,7 @@ func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue,
lock: sync.Mutex{},
datastore: namespaced,
dequeue: make(chan *Entry),
notEmpty: make(chan struct{}),
isRunning: false,
added: make(chan struct{}),
}
return q, nil
}
......@@ -77,8 +76,6 @@ func (q *Queue) Enqueue(cid cid.Cid) error {
q.lock.Lock()
defer q.lock.Unlock()
wasEmpty := q.IsEmpty()
nextKey := q.queueKey(q.tail)
if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil {
......@@ -87,11 +84,10 @@ func (q *Queue) Enqueue(cid cid.Cid) error {
q.tail++
if q.isRunning && wasEmpty {
select {
case q.notEmpty <- struct{}{}:
select {
case q.added <- struct{}{}:
case <-q.ctx.Done():
}
default:
}
return nil
......@@ -110,20 +106,13 @@ func (q *Queue) IsEmpty() bool {
// Run dequeues items when the dequeue channel is available to
// be written to.
func (q *Queue) Run() {
q.isRunning = true
go func() {
for {
select {
case <-q.ctx.Done():
return
default:
}
if q.IsEmpty() {
select {
case <-q.ctx.Done():
return
// wait for a notEmpty message
case <-q.notEmpty:
case <-q.added:
}
}
......@@ -138,6 +127,7 @@ func (q *Queue) Run() {
return
case q.dequeue <- entry:
}
}
}()
}
......@@ -146,14 +136,16 @@ func (q *Queue) Run() {
// found in the next spot.
func (q *Queue) next() (*Entry, error) {
q.lock.Lock()
defer q.lock.Unlock()
defer func() {
q.lock.Unlock()
}()
var nextKey ds.Key
var value []byte
var err error
for {
if q.head >= q.tail {
return nil, errors.New("no more entries in queue")
return nil, errors.New("next: no more entries in queue returning")
}
select {
case <-q.ctx.Done():
......@@ -194,8 +186,8 @@ func (q *Queue) queueKey(id uint64) ds.Key {
// crawl over the queue entries to find the head and tail
func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) {
query := query.Query{}
results, err := datastore.Query(query)
q := query.Query{}
results, err := datastore.Query(q)
if err != nil {
return 0, 0, err
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment