// Package provider implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package provider

import (
	"context"
	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-routing"
)

// log is the package-level logger, created with the name "provider".
var log = logging.Logger("provider")

// provideOutgoingWorkerLimit is the number of worker goroutines started to
// announce cids taken from the queue.
const provideOutgoingWorkerLimit = 8

21
// Provider announces blocks to the network
22 23 24 25 26 27
type Provider interface {
	Run()
	Provide(cid.Cid) error
}

type provider struct {
28 29 30 31 32 33 34
	ctx context.Context
	// the CIDs for which provide announcements should be made
	queue *Queue
	// used to announce providing to the network
	contentRouting routing.ContentRouting
}

35
// NewProvider creates a provider that announces blocks to the network using a content router
36 37
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
	return &provider{
38 39 40 41 42 43 44
		ctx:            ctx,
		queue:          queue,
		contentRouting: contentRouting,
	}
}

// Start workers to handle provide requests.
45
func (p *provider) Run() {
46 47 48 49 50
	p.queue.Run()
	p.handleAnnouncements()
}

// Provide the given cid using specified strategy.
51
func (p *provider) Provide(root cid.Cid) error {
52 53 54 55
	return p.queue.Enqueue(root)
}

// Handle all outgoing cids by providing (announcing) them
56
func (p *provider) handleAnnouncements() {
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
	for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
		go func() {
			for {
				select {
				case <-p.ctx.Done():
					return
				case entry := <-p.queue.Dequeue():
					if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil {
						log.Warningf("Unable to provide entry: %s, %s", entry.cid, err)
					}

					if err := entry.Complete(); err != nil {
						log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err)
					}
				}
			}
		}()
	}
}

// doProvide announces key through contentRouting and logs the start and the
// successful end of the announcement. When the content router's Provide call
// fails, a warning is logged and the error is returned unchanged; otherwise
// nil is returned.
//
// NOTE(review): the `true` argument presumably asks the router to actually
// broadcast the record — confirm against routing.ContentRouting.
func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error {
	log.Info("announce - start - ", key)

	err := contentRouting.Provide(ctx, key, true)
	if err == nil {
		log.Info("announce - end - ", key)
		return nil
	}

	// TODO: Maybe put these failures onto a failures queue?
	log.Warningf("Failed to provide cid: %s", err)
	return err
}