queue.go 4.23 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
package provider

import (
	"context"
	"errors"
	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	"github.com/ipfs/go-datastore/query"
	"math"
	"strconv"
	"strings"
	"sync"
)

// Entry allows for the durability in the queue. When a cid is dequeued it is
// not removed from the datastore until you call Complete() on the entry you
// receive.
type Entry struct {
	// cid is the value parsed from the datastore record for this entry.
	cid cid.Cid
	// key is the datastore key the record was read from; Complete deletes it.
	key ds.Key
	// queue is the Queue that produced this entry.
	queue *Queue
}

// Complete deletes the entry's record from the datastore of the queue that
// produced it, and returns any error reported by that deletion.
func (e *Entry) Complete() error {
	owner := e.queue
	return owner.remove(e.key)
}

// Queue provides a durable, FIFO interface to the datastore for storing cids
//
// Durability just means that cids in the process of being provided when a
// crash or shutdown occurs will still be in the queue when the node is
// brought back online.
type Queue struct {
	// used to differentiate queues in datastore
	// e.g. provider vs reprovider
	name string

	// ctx is checked for cancellation by Enqueue, Run and next.
	ctx context.Context

	// tail is the index of the next slot Enqueue writes to.
	tail uint64
	// head is the index of the next slot next() reads from.
	head uint64

	// lock is held by Enqueue and next() while they touch head, tail and
	// the datastore.
	lock sync.Mutex
	// datastore holds the queued records, keyed by decimal slot index.
	datastore ds.Datastore

	// dequeue is the channel the Run goroutine delivers entries on; it is
	// what Dequeue() hands to consumers.
	dequeue chan *Entry
	// notEmpty is signalled by Enqueue when it adds to an empty queue while
	// the worker is running.
	notEmpty chan struct{}

	// isRunning is set to true by Run.
	isRunning bool
}

// NewQueue builds a Queue whose records live under "/<name>/queue/" in the
// given datastore. The head and tail positions are recovered by scanning the
// records already stored there; any error from that scan is returned.
// The worker goroutine is not started here; call Run for that.
func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) {
	prefix := ds.NewKey("/" + name + "/queue/")
	store := namespace.Wrap(datastore, prefix)

	head, tail, err := getQueueHeadTail(name, ctx, store)
	if err != nil {
		return nil, err
	}

	// lock and isRunning are left at their zero values (unlocked, false).
	return &Queue{
		name:      name,
		ctx:       ctx,
		head:      head,
		tail:      tail,
		datastore: store,
		dequeue:   make(chan *Entry),
		notEmpty:  make(chan struct{}),
	}, nil
}

// Enqueue stores cid in the datastore at the tail of the queue and advances
// the tail. It returns the datastore error if the write fails, in which case
// the tail is left unchanged.
//
// If the queue was empty and the worker started by Run is active, Enqueue
// then signals the worker on notEmpty. That send blocks until the worker
// receives it or the queue's context is cancelled.
func (q *Queue) Enqueue(cid cid.Cid) error {
	// Mutate the queue under the lock, but decide about the wake-up inside
	// and perform it outside: the worker needs this same lock in next(), so
	// blocking on notEmpty while holding it can deadlock both goroutines.
	notify, err := func() (bool, error) {
		q.lock.Lock()
		defer q.lock.Unlock()

		wasEmpty := q.IsEmpty()

		nextKey := q.queueKey(q.tail)
		if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil {
			return false, err
		}
		q.tail++

		return q.isRunning && wasEmpty, nil
	}()
	if err != nil {
		return err
	}

	if notify {
		select {
		case q.notEmpty <- struct{}{}:
		case <-q.ctx.Done():
		}
	}

	return nil
}

// Dequeue returns the receive-only channel on which the worker started by
// Run delivers entries. Receiving an entry does not delete its record from
// the datastore; call Complete on the entry for that.
func (q *Queue) Dequeue() <-chan *Entry {
	var out <-chan *Entry = q.dequeue
	return out
}

// IsEmpty reports whether the head and tail positions coincide, i.e. there
// is no slot left to read. It does not take the queue lock itself.
func (q *Queue) IsEmpty() bool {
	return q.head == q.tail
}

// remove deletes the record stored under key from the queue's datastore and
// returns the datastore's error, if any.
func (q *Queue) remove(key ds.Key) error {
	store := q.datastore
	return store.Delete(key)
}

// Run marks the queue as running and starts a goroutine that repeatedly
// takes the next entry from the queue and offers it on the dequeue channel.
// When the queue is empty the goroutine waits for a notEmpty signal. It
// exits once the queue's context is cancelled.
//
// NOTE(review): isRunning is written here, and head/tail are read via
// IsEmpty in the loop, without holding q.lock.
func (q *Queue) Run() {
	q.isRunning = true
	go func() {
		for {
			select {
			case <-q.ctx.Done():
				return
			default:
			}
			if q.IsEmpty() {
				select {
				case <-q.ctx.Done():
					return
				// wait for a notEmpty message
				case <-q.notEmpty:
				}
			}

			entry, err := q.next()
			if err != nil {
				if q.ctx.Err() != nil {
					// shutting down; not worth a warning
					return
				}
				// entry is always nil on this path, so only the error is logged
				log.Warningf("Error Dequeue()-ing: %s", err)
				continue
			}
			if entry == nil {
				// Never hand a nil entry to a consumer. next() can report
				// cancellation this way; the check at the top of the loop
				// then ends the goroutine.
				continue
			}

			select {
			case <-q.ctx.Done():
				return
			case q.dequeue <- entry:
			}
		}
	}()
}

// next returns the entry at the head of the queue and advances head past
// it, holding the queue lock for the duration. Slots whose key is missing
// from the datastore are skipped.
//
// It returns an error when:
//   - head has reached tail (no more entries);
//   - the queue's context is cancelled (the context's error);
//   - the datastore read fails (head is left in place so the slot is retried);
//   - the stored bytes do not parse as a cid.
//
// In the last case head is still advanced, so a corrupt record is reported
// once instead of blocking the queue forever. The record itself is left in
// the datastore.
func (q *Queue) next() (*Entry, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	for {
		if q.head >= q.tail {
			return nil, errors.New("no more entries in queue")
		}
		select {
		case <-q.ctx.Done():
			return nil, q.ctx.Err()
		default:
		}

		key := q.queueKey(q.head)
		value, err := q.datastore.Get(key)
		if err == ds.ErrNotFound {
			// hole in the sequence: crawl forward
			q.head++
			continue
		}
		if err != nil {
			return nil, err
		}

		// The slot has been read; consume it whether or not it parses.
		q.head++

		id, err := cid.Parse(value)
		if err != nil {
			return nil, err
		}

		return &Entry{
			cid:   id,
			key:   key,
			queue: q,
		}, nil
	}
}

// queueKey builds the datastore key for slot id: the base-10 rendering of
// id wrapped in a ds.Key.
func (q *Queue) queueKey(id uint64) ds.Key {
	decimal := strconv.FormatUint(id, 10)
	return ds.NewKey(decimal)
}

// getQueueHeadTail crawls every record in datastore to recover the queue
// bounds. Each key, minus its leading "/", must be a base-10 uint64 slot
// index. head is the smallest index found and tail is one past the largest;
// both are 0 when the datastore holds no records.
//
// An error is returned, with zero bounds, if the query fails, a result
// carries an error, a key is not a valid index, closing the results fails,
// or ctx is cancelled during the crawl. The results are closed on every
// path. The name parameter is currently unused.
func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) {
	results, err := datastore.Query(query.Query{})
	if err != nil {
		return 0, 0, err
	}

	var tail uint64
	// MaxUint64 doubles as the "nothing seen yet" marker for head.
	var head uint64 = math.MaxUint64
	var scanErr error

scan:
	for entry := range results.Next() {
		select {
		case <-ctx.Done():
			// Report cancellation: returning zero bounds with a nil error
			// would look like an empty queue to the caller.
			scanErr = ctx.Err()
			break scan
		default:
		}
		if entry.Error != nil {
			scanErr = entry.Error
			break scan
		}

		trimmed := strings.TrimPrefix(entry.Key, "/")
		id, err := strconv.ParseUint(trimmed, 10, 64)
		if err != nil {
			scanErr = err
			break scan
		}

		if id < head {
			head = id
		}
		if id+1 > tail {
			tail = id + 1
		}
	}

	// Always release the query, but let a scan error take precedence.
	closeErr := results.Close()
	if scanErr != nil {
		return 0, 0, scanErr
	}
	if closeErr != nil {
		return 0, 0, closeErr
	}

	if head == math.MaxUint64 {
		head = 0
	}

	return head, tail, nil
}