provider.go 2.67 KB
Newer Older
1 2 3 4 5 6 7
// Package simple implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package simple

import (
	"context"
8
	"time"
9

Michael Avila's avatar
Michael Avila committed
10
	"github.com/ipfs/go-cid"
Michael Avila's avatar
Michael Avila committed
11
	q "github.com/ipfs/go-ipfs-provider/queue"
12
	logging "github.com/ipfs/go-log"
Michael Avila's avatar
Michael Avila committed
13
	"github.com/libp2p/go-libp2p-core/routing"
14 15 16 17 18 19 20 21 22 23 24
)

// logP is the logger used by the provider in this package; it is registered
// under the name "provider.simple".
var logP = logging.Logger("provider.simple")

// Provider announces blocks to the network
type Provider struct {
	ctx context.Context
	// the CIDs for which provide announcements should be made
	queue *q.Queue
	// used to announce providing to the network
	contentRouting routing.ContentRouting
Michael Avila's avatar
Michael Avila committed
25 26
	// how long to wait for announce to complete before giving up
	timeout time.Duration
Michael Avila's avatar
Michael Avila committed
27 28
	// how many workers concurrently work through thhe queue
	workerLimit int
Michael Avila's avatar
Michael Avila committed
29 30
}

Michael Avila's avatar
Michael Avila committed
31 32
// Option defines the functional option type that can be used to configure
// provider instances
Michael Avila's avatar
Michael Avila committed
33 34
type Option func(*Provider)

Michael Avila's avatar
Michael Avila committed
35
// WithTimeout is an option to set a timeout on a provider
Michael Avila's avatar
Michael Avila committed
36 37 38 39
func WithTimeout(timeout time.Duration) Option {
	return func(p *Provider) {
		p.timeout = timeout
	}
40 41
}

Michael Avila's avatar
Michael Avila committed
42 43 44 45 46 47 48
// MaxWorkers returns an Option that sets the number of workers a provider
// runs concurrently to count.
func MaxWorkers(count int) Option {
	applyLimit := func(target *Provider) {
		target.workerLimit = count
	}
	return applyLimit
}

49
// NewProvider creates a provider that announces blocks to the network using a content router
Michael Avila's avatar
Michael Avila committed
50 51
func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider {
	p := &Provider{
52 53 54
		ctx:            ctx,
		queue:          queue,
		contentRouting: contentRouting,
Michael Avila's avatar
Michael Avila committed
55
		workerLimit:    8,
56
	}
Michael Avila's avatar
Michael Avila committed
57 58 59 60 61 62

	for _, option := range options {
		option(p)
	}

	return p
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
}

// Close stops the provider by closing its queue. It always returns nil.
// NOTE(review): anything returned by queue.Close is discarded here — confirm
// whether it reports an error that should be propagated.
func (p *Provider) Close() error {
	p.queue.Close()
	return nil
}

// Run starts the workers that handle provide requests. The workers are
// launched as goroutines by handleAnnouncements, so Run itself returns once
// they have been started.
func (p *Provider) Run() {
	p.handleAnnouncements()
}

// Provide enqueues the given cid on the provider's queue so that a worker can
// announce it later. It always returns nil.
// NOTE(review): anything returned by queue.Enqueue is discarded here —
// confirm whether it reports an error that should be propagated.
func (p *Provider) Provide(root cid.Cid) error {
	p.queue.Enqueue(root)
	return nil
}

// Handle all outgoing cids by providing (announcing) them
func (p *Provider) handleAnnouncements() {
Michael Avila's avatar
Michael Avila committed
84
	for workers := 0; workers < p.workerLimit; workers++ {
85 86 87 88 89 90
		go func() {
			for p.ctx.Err() == nil {
				select {
				case <-p.ctx.Done():
					return
				case c := <-p.queue.Dequeue():
Michael Avila's avatar
Michael Avila committed
91 92 93 94 95 96 97 98 99
					var ctx context.Context
					var cancel context.CancelFunc
					if p.timeout > 0 {
						ctx, cancel = context.WithTimeout(p.ctx, p.timeout)
						defer cancel()
					} else {
						ctx = p.ctx
					}

100
					logP.Info("announce - start - ", c)
101
					if err := p.contentRouting.Provide(ctx, c, true); err != nil {
102 103 104 105 106 107 108 109
						logP.Warningf("Unable to provide entry: %s, %s", c, err)
					}
					logP.Info("announce - end - ", c)
				}
			}
		}()
	}
}