provider.go 1.85 KB
Newer Older
1 2 3 4 5 6 7
// Package provider implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package provider

import (
	"context"
8
	"github.com/ipfs/go-cid"
9
	logging "github.com/ipfs/go-log"
10
	"github.com/libp2p/go-libp2p-routing"
11 12
)

Erik Ingenito's avatar
Gofmt  
Erik Ingenito committed
13
var log = logging.Logger("provider")
14

Erik Ingenito's avatar
Erik Ingenito committed
15
const provideOutgoingWorkerLimit = 8
16

17
// Provider announces blocks to the network
18
type Provider interface {
19
	// Run is used to begin processing the provider work
20
	Run()
21
	// Provide takes a cid and makes an attempt to announce it to the network
22 23 24 25
	Provide(cid.Cid) error
}

type provider struct {
26 27 28 29 30 31 32
	ctx context.Context
	// the CIDs for which provide announcements should be made
	queue *Queue
	// used to announce providing to the network
	contentRouting routing.ContentRouting
}

33
// NewProvider creates a provider that announces blocks to the network using a content router
34 35
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
	return &provider{
36 37 38 39 40 41 42
		ctx:            ctx,
		queue:          queue,
		contentRouting: contentRouting,
	}
}

// Start workers to handle provide requests.
43
func (p *provider) Run() {
44 45 46 47
	p.handleAnnouncements()
}

// Provide the given cid using specified strategy.
48
func (p *provider) Provide(root cid.Cid) error {
Erik Ingenito's avatar
Erik Ingenito committed
49 50
	p.queue.Enqueue(root)
	return nil
51 52 53
}

// Handle all outgoing cids by providing (announcing) them
54
func (p *provider) handleAnnouncements() {
55 56
	for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
		go func() {
57
			for p.ctx.Err() == nil {
58 59 60
				select {
				case <-p.ctx.Done():
					return
Erik Ingenito's avatar
Erik Ingenito committed
61 62 63 64
				case c := <-p.queue.Dequeue():
					log.Info("announce - start - ", c)
					if err := p.contentRouting.Provide(p.ctx, c, true); err != nil {
						log.Warningf("Unable to provide entry: %s, %s", c, err)
65
					}
Erik Ingenito's avatar
Erik Ingenito committed
66
					log.Info("announce - end - ", c)
67 68 69 70 71
				}
			}
		}()
	}
}