package provider import ( "context" "errors" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" "sync" ) // Entry allows for the durability in the queue. When a cid is dequeued it is // not removed from the datastore until you call Complete() on the entry you // receive. type Entry struct { cid cid.Cid key ds.Key queue *Queue } // Complete the entry by removing it from the queue func (e *Entry) Complete() error { e.queue.lock.Lock() defer e.queue.lock.Unlock() return e.queue.remove(e.key) } // Queue provides a durable, FIFO interface to the datastore for storing cids // // Durability just means that cids in the process of being provided when a // crash or shutdown occurs will still be in the queue when the node is // brought back online. type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider name string ctx context.Context tail uint64 head uint64 lock sync.Mutex datastore ds.Datastore dequeue chan *Entry added chan struct{} } // NewQueue creates a queue for cids func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, error) { namespaced := namespace.Wrap(datastore, ds.NewKey("/"+name+"/queue/")) head, tail, err := getQueueHeadTail(ctx, name, namespaced) if err != nil { return nil, err } q := &Queue{ name: name, ctx: ctx, head: head, tail: tail, lock: sync.Mutex{}, datastore: namespaced, dequeue: make(chan *Entry), added: make(chan struct{}), } return q, nil } // Enqueue puts a cid in the queue func (q *Queue) Enqueue(cid cid.Cid) error { q.lock.Lock() defer q.lock.Unlock() nextKey := q.queueKey(q.tail) if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil { return err } q.tail++ select { case q.added <- struct{}{}: case <-q.ctx.Done(): default: } return nil } // Dequeue returns a channel that if listened to will remove entries from the queue func (q *Queue) Dequeue() <-chan *Entry { return q.dequeue } // IsEmpty returns whether or not the queue has any items func (q *Queue) IsEmpty() bool { return (q.tail - q.head) == 0 } // Run dequeues items when the dequeue channel is available to // be written to. func (q *Queue) Run() { go func() { for { if q.IsEmpty() { select { case <-q.ctx.Done(): return case <-q.added: } } entry, err := q.next() if err != nil { log.Warningf("Error Dequeue()-ing: %s, %s", entry, err) continue } select { case <-q.ctx.Done(): return case q.dequeue <- entry: } } }() } // Find the next item in the queue, crawl forward if an entry is not // found in the next spot. func (q *Queue) next() (*Entry, error) { q.lock.Lock() defer func() { q.lock.Unlock() }() var nextKey ds.Key var value []byte var err error for { if q.head >= q.tail { return nil, errors.New("next: no more entries in queue returning") } select { case <-q.ctx.Done(): return nil, nil default: } nextKey = q.queueKey(q.head) value, err = q.datastore.Get(nextKey) if err == ds.ErrNotFound { q.head++ continue } else if err != nil { return nil, err } else { break } } id, err := cid.Parse(value) if err != nil { return nil, err } entry := &Entry{ cid: id, key: nextKey, queue: q, } q.head++ return entry, nil } func (q *Queue) queueKey(id uint64) ds.Key { return ds.NewKey(strconv.FormatUint(id, 10)) } // crawl over the queue entries to find the head and tail func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) { q := query.Query{} results, err := datastore.Query(q) if err != nil { return 0, 0, err } var tail uint64 var head uint64 = math.MaxUint64 for entry := range results.Next() { select { case <-ctx.Done(): return 0, 0, nil default: } trimmed := strings.TrimPrefix(entry.Key, "/") id, err := strconv.ParseUint(trimmed, 10, 64) if err != nil { return 0, 0, err } if id < head { head = id } if (id + 1) > tail { tail = (id + 1) } } if err := results.Close(); err != nil { return 0, 0, err } if head == math.MaxUint64 { head = 0 } return head, tail, nil } func (q *Queue) remove(key ds.Key) error { return q.datastore.Delete(key) }