queue.go 4.18 KB
Newer Older
1 2 3 4
package provider

import (
	"context"
Michael Avila's avatar
Michael Avila committed
5
	"fmt"
6 7
	"strconv"
	"strings"
8 9 10 11 12

	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	namespace "github.com/ipfs/go-datastore/namespace"
	query "github.com/ipfs/go-datastore/query"
13 14 15 16 17 18 19 20 21 22
)

// Queue provides a durable, FIFO interface to the datastore for storing cids
//
// Durability just means that cids in the process of being provided when a
// crash or shutdown occurs will still be in the queue when the node is
// brought back online.
type Queue struct {
	// used to differentiate queues in datastore
	// e.g. provider vs reprovider
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
23 24 25 26 27
	name    string
	ctx     context.Context
	tail    uint64
	head    uint64
	ds      datastore.Datastore // Must be threadsafe
Erik Ingenito's avatar
Erik Ingenito committed
28 29
	dequeue chan cid.Cid
	enqueue chan cid.Cid
30 31
}

32
// NewQueue creates a queue for cids
33 34
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
	namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
Michael Avila's avatar
Michael Avila committed
35
	head, tail, err := getQueueHeadTail(ctx, namespaced)
36 37 38 39
	if err != nil {
		return nil, err
	}
	q := &Queue{
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
40 41 42 43 44 45 46
		name:    name,
		ctx:     ctx,
		head:    head,
		tail:    tail,
		ds:      namespaced,
		dequeue: make(chan cid.Cid),
		enqueue: make(chan cid.Cid),
47
	}
Erik Ingenito's avatar
Erik Ingenito committed
48
	q.work()
49 50 51
	return q, nil
}

52
// Enqueue puts a cid in the queue
Erik Ingenito's avatar
Erik Ingenito committed
53
func (q *Queue) Enqueue(cid cid.Cid) {
54
	select {
Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
55 56
	case q.enqueue <- cid:
	case <-q.ctx.Done():
57 58 59
	}
}

60
// Dequeue returns a channel that if listened to will remove entries from the queue
Erik Ingenito's avatar
Erik Ingenito committed
61
func (q *Queue) Dequeue() <-chan cid.Cid {
62 63 64
	return q.dequeue
}

65 66 67 68 69 70 71 72 73 74
// Look for next Cid in the queue and return it. Skip over gaps and mangled data
func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
	for {
		if q.head >= q.tail {
			return datastore.Key{}, cid.Undef
		}

		key := q.queueKey(q.head)
		value, err := q.ds.Get(key)

75 76 77 78 79 80
		if err != nil {
			if err == datastore.ErrNotFound {
				log.Warningf("Error missing entry in queue: %s", key)
			} else {
				log.Errorf("Error fetching from queue: %s", err)
			}
81 82 83 84 85 86 87 88 89
			q.head++ // move on
			continue
		}

		c, err := cid.Parse(value)
		if err != nil {
			log.Warningf("Error marshalling Cid from queue: ", err)
			q.head++
			err = q.ds.Delete(key)
90 91 92
			if err != nil {
				log.Warningf("Provider queue failed to delete: %s", key)
			}
93 94 95 96 97 98 99
			continue
		}

		return key, c
	}
}

Erik Ingenito's avatar
Erik Ingenito committed
100 101
// Run dequeues and enqueues when available.
func (q *Queue) work() {
102
	go func() {
103 104
		var k datastore.Key = datastore.Key{}
		var c cid.Cid = cid.Undef
105

106
		for {
107 108 109
			if c == cid.Undef {
				k, c = q.nextEntry()
			}
Erik Ingenito's avatar
Erik Ingenito committed
110

111 112
			// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
			var dequeue chan cid.Cid
Erik Ingenito's avatar
Erik Ingenito committed
113 114
			if c != cid.Undef {
				dequeue = q.dequeue
115 116 117
			}

			select {
Erik Ingenito's avatar
Erik Ingenito committed
118 119 120 121 122
			case toQueue := <-q.enqueue:
				nextKey := q.queueKey(q.tail)

				if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
					log.Errorf("Failed to enqueue cid: %s", err)
123
					continue
Erik Ingenito's avatar
Erik Ingenito committed
124 125 126 127
				}

				q.tail++
			case dequeue <- c:
128
				err := q.ds.Delete(k)
Erik Ingenito's avatar
Erik Ingenito committed
129 130

				if err != nil {
131 132
					log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
					continue
Erik Ingenito's avatar
Erik Ingenito committed
133
				}
134
				c = cid.Undef
135
				q.head++
136 137 138 139 140 141 142
			case <-q.ctx.Done():
				return
			}
		}
	}()
}

143
func (q *Queue) queueKey(id uint64) datastore.Key {
Michael Avila's avatar
Michael Avila committed
144 145
	s := fmt.Sprintf("%016X", id)
	return datastore.NewKey(s)
146 147
}

Michael Avila's avatar
Michael Avila committed
148 149
func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
	head, err := getQueueHead(datastore)
150 151 152
	if err != nil {
		return 0, 0, err
	}
Michael Avila's avatar
Michael Avila committed
153 154 155 156 157 158
	tail, err := getQueueTail(datastore)
	if err != nil {
		return 0, 0, err
	}
	return head, tail, nil
}
159

Michael Avila's avatar
Michael Avila committed
160 161 162
func getQueueHead(ds datastore.Datastore) (uint64, error) {
	return getFirstIDByOrder(ds, query.OrderByKey{})
}
163

Michael Avila's avatar
Michael Avila committed
164 165 166 167 168 169 170 171 172 173
func getQueueTail(ds datastore.Datastore) (uint64, error) {
	tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
	if err != nil {
		return 0, err
	}
	if tail > 0 {
		tail++
	}
	return tail, nil
}
174

Michael Avila's avatar
Michael Avila committed
175 176 177 178 179
func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
	q := query.Query{Orders: []query.Order{order}}
	results, err := ds.Query(q)
	if err != nil {
		return 0, err
180
	}
Michael Avila's avatar
Michael Avila committed
181 182 183 184
	defer results.Close()
	r, ok := results.NextSync()
	if !ok {
		return 0, nil
185
	}
Michael Avila's avatar
Michael Avila committed
186 187 188 189
	trimmed := strings.TrimPrefix(r.Key, "/")
	id, err := strconv.ParseUint(trimmed, 16, 64)
	if err != nil {
		return 0, err
190
	}
Michael Avila's avatar
Michael Avila committed
191
	return id, nil
192
}