Unverified Commit 184f9bd2 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #22 from ipfs/fix/bugs

Fix several race on close bugs
parents e52462ce c835ad80
...@@ -56,10 +56,12 @@ func (q *Queue) Close() error { ...@@ -56,10 +56,12 @@ func (q *Queue) Close() error {
} }
// Enqueue puts a cid in the queue // Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) { func (q *Queue) Enqueue(cid cid.Cid) error {
select { select {
case q.enqueue <- cid: case q.enqueue <- cid:
return nil
case <-q.ctx.Done(): case <-q.ctx.Done():
return fmt.Errorf("failed to enqueue CID: shutting down")
} }
} }
...@@ -75,22 +77,27 @@ func (q *Queue) work() { ...@@ -75,22 +77,27 @@ func (q *Queue) work() {
var c cid.Cid = cid.Undef var c cid.Cid = cid.Undef
defer func() { defer func() {
// also cancels any in-progess enqueue tasks.
q.close()
// unblocks anyone waiting
close(q.dequeue)
// unblocks the close call
close(q.closed) close(q.closed)
}() }()
for { for {
if c == cid.Undef { if c == cid.Undef {
head, e := q.getQueueHead() head, err := q.getQueueHead()
if e != nil { if err != nil {
log.Errorf("error querying for head of queue: %s, stopping provider", e) log.Errorf("error querying for head of queue: %s, stopping provider", err)
return return
} else if head != nil { } else if head != nil {
k = datastore.NewKey(head.Key) k = datastore.NewKey(head.Key)
c, e = cid.Parse(head.Value) c, err = cid.Parse(head.Value)
if e != nil { if err != nil {
log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, e) log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err)
err := q.ds.Delete(k) err = q.ds.Delete(k)
if err != nil { if err != nil {
log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
return return
...@@ -132,7 +139,7 @@ func (q *Queue) work() { ...@@ -132,7 +139,7 @@ func (q *Queue) work() {
}() }()
} }
func (q *Queue) getQueueHead() (*query.Result, error) { func (q *Queue) getQueueHead() (*query.Entry, error) {
qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1} qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1}
results, err := q.ds.Query(qry) results, err := q.ds.Query(qry)
if err != nil { if err != nil {
...@@ -144,5 +151,5 @@ func (q *Queue) getQueueHead() (*query.Result, error) { ...@@ -144,5 +151,5 @@ func (q *Queue) getQueueHead() (*query.Result, error) {
return nil, nil return nil, nil
} }
return &r, nil return &r.Entry, r.Error
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment