provider.go 1.78 KB
Newer Older
1 2 3 4 5 6 7
// Package provider implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package provider

import (
	"context"
8 9

	cid "github.com/ipfs/go-cid"
10
	logging "github.com/ipfs/go-log"
11
	routing "github.com/libp2p/go-libp2p-routing"
12 13 14 15 16 17 18 19 20 21
)

var (
	log = logging.Logger("provider")
)

const (
	provideOutgoingWorkerLimit = 8
)

22
// Provider announces blocks to the network
23 24 25 26 27 28
type Provider interface {
	Run()
	Provide(cid.Cid) error
}

type provider struct {
29 30 31 32 33 34 35
	ctx context.Context
	// the CIDs for which provide announcements should be made
	queue *Queue
	// used to announce providing to the network
	contentRouting routing.ContentRouting
}

36
// NewProvider creates a provider that announces blocks to the network using a content router
37 38
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
	return &provider{
39 40 41 42 43 44 45
		ctx:            ctx,
		queue:          queue,
		contentRouting: contentRouting,
	}
}

// Start workers to handle provide requests.
46
func (p *provider) Run() {
47 48 49 50 51
	p.queue.Run()
	p.handleAnnouncements()
}

// Provide the given cid using specified strategy.
52
func (p *provider) Provide(root cid.Cid) error {
53 54 55 56
	return p.queue.Enqueue(root)
}

// Handle all outgoing cids by providing (announcing) them
57
func (p *provider) handleAnnouncements() {
58 59 60 61 62 63 64
	for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
		go func() {
			for {
				select {
				case <-p.ctx.Done():
					return
				case entry := <-p.queue.Dequeue():
65 66
					log.Info("announce - start - ", entry.cid)
					if err := p.contentRouting.Provide(p.ctx, entry.cid, true); err != nil {
67 68
						log.Warningf("Unable to provide entry: %s, %s", entry.cid, err)
					}
69
					log.Info("announce - end - ", entry.cid)
70 71 72 73 74
				}
			}
		}()
	}
}