// Package simple implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package simple

import (
	"context"
	"time"

	"github.com/ipfs/go-cid"
	q "github.com/ipfs/go-ipfs-provider/queue"
	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-core/routing"
)

// logP is the logger used throughout this package, created under the name
// "provider.simple".
var logP = logging.Logger("provider.simple")

// Provider announces blocks to the network
type Provider struct {
	ctx context.Context
	// the CIDs for which provide announcements should be made
	queue *q.Queue
	// used to announce providing to the network
	contentRouting routing.ContentRouting
Michael Avila's avatar
Michael Avila committed
25 26
	// how long to wait for announce to complete before giving up
	timeout time.Duration
27 28
	// how many workers concurrently work through thhe queue
	workerLimit int
Michael Avila's avatar
Michael Avila committed
29 30
}

31 32
// Option defines the functional option type that can be used to configure
// provider instances
Michael Avila's avatar
Michael Avila committed
33 34
type Option func(*Provider)

35
// WithTimeout is an option to set a timeout on a provider
Michael Avila's avatar
Michael Avila committed
36 37 38 39
func WithTimeout(timeout time.Duration) Option {
	return func(p *Provider) {
		p.timeout = timeout
	}
40 41
}

42 43 44 45 46 47 48
// MaxWorkers is an option to set the max workers on a provider
func MaxWorkers(count int) Option {
	return func(p *Provider) {
		p.workerLimit = count
	}
}

49
// NewProvider creates a provider that announces blocks to the network using a content router
Michael Avila's avatar
Michael Avila committed
50 51
func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider {
	p := &Provider{
52 53 54
		ctx:            ctx,
		queue:          queue,
		contentRouting: contentRouting,
55
		workerLimit:    8,
56
	}
Michael Avila's avatar
Michael Avila committed
57 58 59 60 61 62

	for _, option := range options {
		option(p)
	}

	return p
63 64 65 66
}

// Close stops the provider
func (p *Provider) Close() error {
67
	return p.queue.Close()
68 69 70 71 72 73 74 75 76
}

// Run starts the workers that handle provide requests. The workers run in
// their own goroutines, so Run returns without waiting for them.
func (p *Provider) Run() {
	p.handleAnnouncements()
}

// Provide the given cid using specified strategy.
func (p *Provider) Provide(root cid.Cid) error {
77
	return p.queue.Enqueue(root)
78 79 80 81
}

// Handle all outgoing cids by providing (announcing) them
func (p *Provider) handleAnnouncements() {
82
	for workers := 0; workers < p.workerLimit; workers++ {
83 84 85 86 87 88
		go func() {
			for p.ctx.Err() == nil {
				select {
				case <-p.ctx.Done():
					return
				case c := <-p.queue.Dequeue():
Steven Allen's avatar
Steven Allen committed
89
					p.doProvide(c)
90 91 92 93 94
				}
			}
		}()
	}
}
Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111

// doProvide announces c through the content router, logging the start and the
// end of the attempt. When a positive timeout is configured the announcement
// runs under a context derived from the provider's context with that timeout;
// otherwise the provider's context is used directly. A failed announcement is
// logged as a warning and not returned.
func (p *Provider) doProvide(c cid.Cid) {
	announceCtx := p.ctx
	if p.timeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(p.ctx, p.timeout)
		// release the timer once the announcement has finished
		defer cancel()
		announceCtx = timeoutCtx
	}

	logP.Info("announce - start - ", c)
	provideErr := p.contentRouting.Provide(announceCtx, c, true)
	if provideErr != nil {
		logP.Warningf("Unable to provide entry: %s, %s", c, provideErr)
	}
	logP.Info("announce - end - ", c)
}