Commit 931c253e authored by Michael Avila's avatar Michael Avila

Provide root node immediately when add and pin add

License: MIT
Signed-off-by: default avatarMichael Avila <davidmichaelavila@gmail.com>
parents
// Package provider implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package provider
import (
"context"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-routing"
"time"
)
var (
log = logging.Logger("provider")
)
const (
provideOutgoingWorkerLimit = 8
provideOutgoingTimeout = 15 * time.Second
)
// Provider announces blocks to the network, tracks which blocks are
// being provided, and untracks blocks when they're no longer in the blockstore.
type Provider struct {
ctx context.Context
// the CIDs for which provide announcements should be made
queue *Queue
// used to announce providing to the network
contentRouting routing.ContentRouting
}
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) *Provider {
return &Provider{
ctx: ctx,
queue: queue,
contentRouting: contentRouting,
}
}
// Start workers to handle provide requests.
func (p *Provider) Run() {
p.queue.Run()
p.handleAnnouncements()
}
// Provide the given cid using specified strategy.
func (p *Provider) Provide(root cid.Cid) error {
return p.queue.Enqueue(root)
}
// Handle all outgoing cids by providing (announcing) them
func (p *Provider) handleAnnouncements() {
for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
go func() {
for {
select {
case <-p.ctx.Done():
return
case entry := <-p.queue.Dequeue():
if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil {
log.Warningf("Unable to provide entry: %s, %s", entry.cid, err)
}
if err := entry.Complete(); err != nil {
log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err)
}
}
}
}()
}
}
// TODO: better document this provide logic
func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error {
// announce
log.Info("announce - start - ", key)
ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout)
if err := contentRouting.Provide(ctx, key, true); err != nil {
log.Warningf("Failed to provide cid: %s", err)
// TODO: Maybe put these failures onto a failures queue?
cancel()
return err
}
cancel()
log.Info("announce - end - ", key)
return nil
}
package provider
import (
"context"
"errors"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"math"
"strconv"
"strings"
"sync"
)
// Entry allows for the durability in the queue. When a cid is dequeued it is
// not removed from the datastore until you call Complete() on the entry you
// receive.
type Entry struct {
cid cid.Cid
key ds.Key
queue *Queue
}
func (e *Entry) Complete() error {
return e.queue.remove(e.key)
}
// Queue provides a durable, FIFO interface to the datastore for storing cids
//
// Durability just means that cids in the process of being provided when a
// crash or shutdown occurs will still be in the queue when the node is
// brought back online.
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
name string
ctx context.Context
tail uint64
head uint64
lock sync.Mutex
datastore ds.Datastore
dequeue chan *Entry
notEmpty chan struct{}
isRunning bool
}
func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/"))
head, tail, err := getQueueHeadTail(name, ctx, namespaced)
if err != nil {
return nil, err
}
q := &Queue{
name: name,
ctx: ctx,
head: head,
tail: tail,
lock: sync.Mutex{},
datastore: namespaced,
dequeue: make(chan *Entry),
notEmpty: make(chan struct{}),
isRunning: false,
}
return q, nil
}
// Put a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) error {
q.lock.Lock()
defer q.lock.Unlock()
wasEmpty := q.IsEmpty()
nextKey := q.queueKey(q.tail)
if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil {
return err
}
q.tail++
if q.isRunning && wasEmpty {
select {
case q.notEmpty <- struct{}{}:
case <-q.ctx.Done():
}
}
return nil
}
// Remove an entry from the queue.
func (q *Queue) Dequeue() <-chan *Entry {
return q.dequeue
}
func (q *Queue) IsEmpty() bool {
return (q.tail - q.head) == 0
}
func (q *Queue) remove(key ds.Key) error {
return q.datastore.Delete(key)
}
// dequeue items when the dequeue channel is available to
// be written to
func (q *Queue) Run() {
q.isRunning = true
go func() {
for {
select {
case <-q.ctx.Done():
return
default:
}
if q.IsEmpty() {
select {
case <-q.ctx.Done():
return
// wait for a notEmpty message
case <-q.notEmpty:
}
}
entry, err := q.next()
if err != nil {
log.Warningf("Error Dequeue()-ing: %s, %s", entry, err)
continue
}
select {
case <-q.ctx.Done():
return
case q.dequeue <- entry:
}
}
}()
}
// Find the next item in the queue, crawl forward if an entry is not
// found in the next spot.
func (q *Queue) next() (*Entry, error) {
q.lock.Lock()
defer q.lock.Unlock()
var nextKey ds.Key
var value []byte
var err error
for {
if q.head >= q.tail {
return nil, errors.New("no more entries in queue")
}
select {
case <-q.ctx.Done():
return nil, nil
default:
}
nextKey = q.queueKey(q.head)
value, err = q.datastore.Get(nextKey)
if err == ds.ErrNotFound {
q.head++
continue
} else if err != nil {
return nil, err
} else {
break
}
}
id, err := cid.Parse(value)
if err != nil {
return nil, err
}
entry := &Entry {
cid: id,
key: nextKey,
queue: q,
}
q.head++
return entry, nil
}
func (q *Queue) queueKey(id uint64) ds.Key {
return ds.NewKey(strconv.FormatUint(id, 10))
}
// crawl over the queue entries to find the head and tail
func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) {
query := query.Query{}
results, err := datastore.Query(query)
if err != nil {
return 0, 0, err
}
var tail uint64 = 0
var head uint64 = math.MaxUint64
for entry := range results.Next() {
select {
case <-ctx.Done():
return 0, 0, nil
default:
}
trimmed := strings.TrimPrefix(entry.Key, "/")
id, err := strconv.ParseUint(trimmed, 10, 64)
if err != nil {
return 0, 0, err
}
if id < head {
head = id
}
if (id+1) > tail {
tail = (id+1)
}
}
if err := results.Close(); err != nil {
return 0, 0, err
}
if head == math.MaxUint64 {
head = 0
}
return head, tail, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment