queue.go 4.33 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
package provider

import (
	"context"
	"errors"
	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	"github.com/ipfs/go-datastore/query"
	"math"
	"strconv"
	"strings"
	"sync"
)

// Entry allows for the durability in the queue. When a cid is dequeued it is
// not removed from the datastore until you call Complete() on the entry you
// receive.
type Entry struct {
20 21
	cid   cid.Cid
	key   ds.Key
22 23 24
	queue *Queue
}

25
// Complete the entry by removing it from the queue
26
func (e *Entry) Complete() error {
27 28
	e.queue.lock.Lock()
	defer e.queue.lock.Unlock()
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
	return e.queue.remove(e.key)
}

// Queue provides a durable, FIFO interface to the datastore for storing cids
//
// Durability just means that cids in the process of being provided when a
// crash or shutdown occurs will still be in the queue when the node is
// brought back online.
type Queue struct {
	// used to differentiate queues in datastore
	// e.g. provider vs reprovider
	name string

	ctx context.Context

	tail uint64
	head uint64

47
	lock      sync.Mutex
48 49
	datastore ds.Datastore

50
	dequeue  chan *Entry
51
	added chan struct{}
52 53
}

54 55 56 57
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, error) {
	namespaced := namespace.Wrap(datastore, ds.NewKey("/"+name+"/queue/"))
	head, tail, err := getQueueHeadTail(ctx, name, namespaced)
58 59 60 61
	if err != nil {
		return nil, err
	}
	q := &Queue{
62 63 64 65 66
		name:      name,
		ctx:       ctx,
		head:      head,
		tail:      tail,
		lock:      sync.Mutex{},
67
		datastore: namespaced,
68
		dequeue:   make(chan *Entry),
69
		added:  make(chan struct{}),
70 71 72 73
	}
	return q, nil
}

74
// Enqueue puts a cid in the queue
75 76 77 78 79 80 81 82 83 84 85 86
func (q *Queue) Enqueue(cid cid.Cid) error {
	q.lock.Lock()
	defer q.lock.Unlock()

	nextKey := q.queueKey(q.tail)

	if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil {
		return err
	}

	q.tail++

87 88
	select {
		case q.added <- struct{}{}:
89
		case <-q.ctx.Done():
90
		default:
91 92 93 94 95
	}

	return nil
}

96
// Dequeue returns a channel that if listened to will remove entries from the queue
97 98 99 100
func (q *Queue) Dequeue() <-chan *Entry {
	return q.dequeue
}

101
// IsEmpty returns whether or not the queue has any items
102 103 104 105
func (q *Queue) IsEmpty() bool {
	return (q.tail - q.head) == 0
}

106 107
// Run dequeues items when the dequeue channel is available to
// be written to.
108 109 110 111 112 113 114
func (q *Queue) Run() {
	go func() {
		for {
			if q.IsEmpty() {
				select {
				case <-q.ctx.Done():
					return
115
				case <-q.added:
116 117 118 119 120 121 122 123 124 125 126 127 128 129
				}
			}

			entry, err := q.next()
			if err != nil {
				log.Warningf("Error Dequeue()-ing: %s, %s", entry, err)
				continue
			}

			select {
			case <-q.ctx.Done():
				return
			case q.dequeue <- entry:
			}
130

131 132 133 134 135 136 137 138
		}
	}()
}

// Find the next item in the queue, crawl forward if an entry is not
// found in the next spot.
func (q *Queue) next() (*Entry, error) {
	q.lock.Lock()
139 140 141
	defer func() {
		q.lock.Unlock()
	}()
142 143 144 145 146 147

	var nextKey ds.Key
	var value []byte
	var err error
	for {
		if q.head >= q.tail {
148
			return nil, errors.New("next: no more entries in queue returning")
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
		}
		select {
		case <-q.ctx.Done():
			return nil, nil
		default:
		}
		nextKey = q.queueKey(q.head)
		value, err = q.datastore.Get(nextKey)
		if err == ds.ErrNotFound {
			q.head++
			continue
		} else if err != nil {
			return nil, err
		} else {
			break
		}
	}

	id, err := cid.Parse(value)
	if err != nil {
		return nil, err
	}

172 173 174
	entry := &Entry{
		cid:   id,
		key:   nextKey,
175 176 177 178 179 180 181 182 183 184 185 186 187
		queue: q,
	}

	q.head++

	return entry, nil
}

// queueKey builds the datastore key for a queue position: the position
// rendered as a base-10 string.
func (q *Queue) queueKey(id uint64) ds.Key {
	decimal := strconv.FormatUint(id, 10)
	return ds.NewKey(decimal)
}

// crawl over the queue entries to find the head and tail
188
func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) {
189 190
	q := query.Query{}
	results, err := datastore.Query(q)
191 192 193 194
	if err != nil {
		return 0, 0, err
	}

195
	var tail uint64
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
	var head uint64 = math.MaxUint64
	for entry := range results.Next() {
		select {
		case <-ctx.Done():
			return 0, 0, nil
		default:
		}
		trimmed := strings.TrimPrefix(entry.Key, "/")
		id, err := strconv.ParseUint(trimmed, 10, 64)
		if err != nil {
			return 0, 0, err
		}

		if id < head {
			head = id
		}

213 214
		if (id + 1) > tail {
			tail = (id + 1)
215 216 217 218 219 220 221 222 223 224 225 226
		}
	}
	if err := results.Close(); err != nil {
		return 0, 0, err
	}
	if head == math.MaxUint64 {
		head = 0
	}

	return head, tail, nil
}

227 228 229
// remove deletes key from the queue's datastore and returns the datastore's
// error, if any. It does not take q.lock itself.
func (q *Queue) remove(key ds.Key) error {
	err := q.datastore.Delete(key)
	return err
}