queue.go 3.67 KB
Newer Older
1
package queue
2 3 4

import (
	"context"
Michael Avila's avatar
Michael Avila committed
5
	"fmt"
6
	"time"
7 8 9 10 11

	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	namespace "github.com/ipfs/go-datastore/namespace"
	query "github.com/ipfs/go-datastore/query"
12
	logging "github.com/ipfs/go-log"
13 14
)

15 16
// log is the package-level logger, registered under the "provider.queue" name.
var log = logging.Logger("provider.queue")

17 18 19 20 21 22 23 24
// Queue provides a durable, FIFO interface to the datastore for storing cids
//
// Durability just means that cids in the process of being provided when a
// crash or shutdown occurs will still be in the queue when the node is
// brought back online.
type Queue struct {
	// used to differentiate queues in datastore
	// e.g. provider vs reprovider
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
25 26 27
	name    string
	ctx     context.Context
	ds      datastore.Datastore // Must be threadsafe
Erik Ingenito's avatar
Erik Ingenito committed
28 29
	dequeue chan cid.Cid
	enqueue chan cid.Cid
30 31
	close   context.CancelFunc
	closed  chan struct{}
32 33
}

34
// NewQueue creates a queue for cids
35 36
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
	namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
37
	cancelCtx, cancel := context.WithCancel(ctx)
38
	q := &Queue{
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
39
		name:    name,
40
		ctx:     cancelCtx,
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
41 42 43
		ds:      namespaced,
		dequeue: make(chan cid.Cid),
		enqueue: make(chan cid.Cid),
44 45
		close:   cancel,
		closed:  make(chan struct{}, 1),
46
	}
Erik Ingenito's avatar
Erik Ingenito committed
47
	q.work()
48 49 50
	return q, nil
}

51 52 53 54 55 56 57
// Close stops the queue: it cancels the queue's context and then blocks
// until the worker goroutine has signalled that it finished. It always
// returns nil.
func (q *Queue) Close() error {
	stop, finished := q.close, q.closed
	stop()
	<-finished
	return nil
}

58
// Enqueue puts a cid in the queue
59
func (q *Queue) Enqueue(cid cid.Cid) error {
60
	select {
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
61
	case q.enqueue <- cid:
62
		return nil
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
63
	case <-q.ctx.Done():
64
		return fmt.Errorf("failed to enqueue CID: shutting down")
65 66 67
	}
}

68
// Dequeue returns a channel that if listened to will remove entries from the queue
Erik Ingenito's avatar
Erik Ingenito committed
69
func (q *Queue) Dequeue() <-chan cid.Cid {
70 71 72
	return q.dequeue
}

Erik Ingenito's avatar
Erik Ingenito committed
73 74
// Run dequeues and enqueues when available.
func (q *Queue) work() {
75
	go func() {
76 77
		var k datastore.Key = datastore.Key{}
		var c cid.Cid = cid.Undef
78

79
		defer func() {
80 81 82 83 84
			// also cancels any in-progess enqueue tasks.
			q.close()
			// unblocks anyone waiting
			close(q.dequeue)
			// unblocks the close call
85 86 87
			close(q.closed)
		}()

88
		for {
89
			if c == cid.Undef {
Steven Allen's avatar
Steven Allen committed
90
				head, err := q.getQueueHead()
91

Steven Allen's avatar
Steven Allen committed
92 93
				if err != nil {
					log.Errorf("error querying for head of queue: %s, stopping provider", err)
94 95 96
					return
				} else if head != nil {
					k = datastore.NewKey(head.Key)
Steven Allen's avatar
Steven Allen committed
97 98 99 100
					c, err = cid.Parse(head.Value)
					if err != nil {
						log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err)
						err = q.ds.Delete(k)
101 102 103 104 105 106 107 108 109
						if err != nil {
							log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
							return
						}
						continue
					}
				} else {
					c = cid.Undef
				}
110
			}
Erik Ingenito's avatar
Erik Ingenito committed
111

112 113
			// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
			var dequeue chan cid.Cid
Erik Ingenito's avatar
Erik Ingenito committed
114 115
			if c != cid.Undef {
				dequeue = q.dequeue
116 117 118
			}

			select {
Erik Ingenito's avatar
Erik Ingenito committed
119
			case toQueue := <-q.enqueue:
120 121
				keyPath := fmt.Sprintf("%d/%s", time.Now().UnixNano(), c.String())
				nextKey := datastore.NewKey(keyPath)
Erik Ingenito's avatar
Erik Ingenito committed
122 123 124

				if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
					log.Errorf("Failed to enqueue cid: %s", err)
125
					continue
Erik Ingenito's avatar
Erik Ingenito committed
126 127
				}
			case dequeue <- c:
128
				err := q.ds.Delete(k)
Erik Ingenito's avatar
Erik Ingenito committed
129 130

				if err != nil {
131 132
					log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
					continue
Erik Ingenito's avatar
Erik Ingenito committed
133
				}
134
				c = cid.Undef
135 136 137 138 139 140 141
			case <-q.ctx.Done():
				return
			}
		}
	}()
}

Steven Allen's avatar
Steven Allen committed
142
func (q *Queue) getQueueHead() (*query.Entry, error) {
143 144
	qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1}
	results, err := q.ds.Query(qry)
Michael Avila's avatar
Michael Avila committed
145
	if err != nil {
146
		return nil, err
147
	}
Michael Avila's avatar
Michael Avila committed
148 149 150
	defer results.Close()
	r, ok := results.NextSync()
	if !ok {
151
		return nil, nil
152
	}
153

Steven Allen's avatar
Steven Allen committed
154
	return &r.Entry, r.Error
155
}