provider.go 1.97 KB
Newer Older
1 2 3 4 5 6 7
// Package simple implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package simple

import (
	"context"
8
	"time"
9 10

	cid "github.com/ipfs/go-cid"
Michael Avila's avatar
Michael Avila committed
11
	q "github.com/ipfs/go-ipfs-provider/queue"
12
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	routing "github.com/libp2p/go-libp2p-core/routing"
14 15 16 17
)

// logP is the logger used by the provider in this package; it is registered
// under the name "provider.simple".
var logP = logging.Logger("provider.simple")

18 19 20 21
// Settings for the announcement workers: provideOutgoingWorkerLimit is the
// number of worker goroutines started by handleAnnouncements, and
// provideTimeout is the deadline applied to each contentRouting.Provide call.
const (
	provideOutgoingWorkerLimit = 8
	provideTimeout             = 3 * time.Minute
)
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66

// Provider announces blocks to the network. CIDs handed to Provide are placed
// on a queue and announced by the worker goroutines started from Run.
type Provider struct {
	// parent context: workers stop once it is done, and every announcement
	// derives its timeout context from it
	ctx context.Context
	// the CIDs for which provide announcements should be made
	queue *q.Queue
	// used to announce providing to the network
	contentRouting routing.ContentRouting
}

// NewProvider returns a Provider that reads CIDs from queue and announces them
// through contentRouting. ctx is kept as the parent context of the workers.
// No worker is started here; call Run to begin announcing.
func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting) *Provider {
	p := new(Provider)
	p.ctx = ctx
	p.queue = queue
	p.contentRouting = contentRouting
	return p
}

// Close closes the provider's queue. It always returns nil.
func (p *Provider) Close() error {
	queue := p.queue
	queue.Close()
	return nil
}

// Run starts the workers that handle provide requests by calling
// handleAnnouncements. The workers run in their own goroutines, so Run
// returns without waiting for them.
func (p *Provider) Run() {
	p.handleAnnouncements()
}

// Provide puts root on the provider's queue so that a worker can announce it
// later. It always returns nil.
func (p *Provider) Provide(root cid.Cid) error {
	pending := p.queue
	pending.Enqueue(root)
	return nil
}

// Handle all outgoing cids by providing (announcing) them
func (p *Provider) handleAnnouncements() {
	for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
		go func() {
			for p.ctx.Err() == nil {
				select {
				case <-p.ctx.Done():
					return
				case c := <-p.queue.Dequeue():
67 68
					ctx, cancel := context.WithTimeout(p.ctx, provideTimeout)
					defer cancel()
69
					logP.Info("announce - start - ", c)
70
					if err := p.contentRouting.Provide(ctx, c, true); err != nil {
71 72 73 74 75 76 77 78
						logP.Warningf("Unable to provide entry: %s, %s", c, err)
					}
					logP.Info("announce - end - ", c)
				}
			}
		}()
	}
}