reprovide.go 6.05 KB
Newer Older
1 2 3 4
package simple

import (
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
6 7 8
	"fmt"
	"time"

9 10 11
	"github.com/cenkalti/backoff"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-cidutil"
12
	"github.com/ipfs/go-fetcher"
13 14
	blocks "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
15
	"github.com/ipfs/go-verifcid"
16
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
17
	"github.com/libp2p/go-libp2p-core/routing"
18 19 20 21
)

var logR = logging.Logger("reprovider.simple")

Steven Allen's avatar
Steven Allen committed
22 23 24
// ErrClosed is returned by Trigger when operating on a closed reprovider.
var ErrClosed = errors.New("reprovider service stopped")

25
// KeyChanFunc is function streaming CIDs to pass to content routing
26 27 28 29
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)

// Reprovider reannounces blocks to the network
type Reprovider struct {
Steven Allen's avatar
Steven Allen committed
30 31 32 33
	// Reprovider context. Cancel to stop, then wait on closedCh.
	ctx      context.Context
	cancel   context.CancelFunc
	closedCh chan struct{}
Steven Allen's avatar
Steven Allen committed
34 35 36

	// Trigger triggers a reprovide.
	trigger chan chan<- error
37 38 39 40 41 42 43 44 45 46 47

	// The routing system to provide values through
	rsys routing.ContentRouting

	keyProvider KeyChanFunc

	tick time.Duration
}

// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
Steven Allen's avatar
Steven Allen committed
48
	ctx, cancel := context.WithCancel(ctx)
49
	return &Reprovider{
Steven Allen's avatar
Steven Allen committed
50 51 52 53
		ctx:      ctx,
		cancel:   cancel,
		closedCh: make(chan struct{}),
		trigger:  make(chan chan<- error),
54 55 56 57 58 59 60 61 62

		rsys:        rsys,
		keyProvider: keyProvider,
		tick:        reprovideIniterval,
	}
}

// Close the reprovider
func (rp *Reprovider) Close() error {
Steven Allen's avatar
Steven Allen committed
63
	rp.cancel()
Steven Allen's avatar
Steven Allen committed
64
	<-rp.closedCh
65 66 67 68 69
	return nil
}

// Run re-provides keys with 'tick' interval or when triggered
func (rp *Reprovider) Run() {
Steven Allen's avatar
Steven Allen committed
70
	defer close(rp.closedCh)
Steven Allen's avatar
Steven Allen committed
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88

	var initialReprovideCh, reprovideCh <-chan time.Time

	// If reproviding is enabled (non-zero)
	if rp.tick > 0 {
		reprovideTicker := time.NewTicker(rp.tick)
		defer reprovideTicker.Stop()
		reprovideCh = reprovideTicker.C

		// If the reprovide ticker is larger than a minute (likely),
		// provide once after we've been up a minute.
		//
		// Don't provide _immediately_ as we might be just about to stop.
		if rp.tick > time.Minute {
			initialReprovideTimer := time.NewTimer(time.Minute)
			defer initialReprovideTimer.Stop()

			initialReprovideCh = initialReprovideTimer.C
89
		}
Steven Allen's avatar
Steven Allen committed
90
	}
91

Steven Allen's avatar
Steven Allen committed
92 93
	var done chan<- error
	for rp.ctx.Err() == nil {
94
		select {
Steven Allen's avatar
Steven Allen committed
95 96 97
		case <-initialReprovideCh:
		case <-reprovideCh:
		case done = <-rp.trigger:
98 99 100 101 102
		case <-rp.ctx.Done():
			return
		}

		err := rp.Reprovide()
Steven Allen's avatar
Steven Allen committed
103 104 105

		// only log if we've hit an actual error, otherwise just tell the client we're shutting down
		if rp.ctx.Err() != nil {
Steven Allen's avatar
Steven Allen committed
106
			err = ErrClosed
Steven Allen's avatar
Steven Allen committed
107
		} else if err != nil {
108
			logR.Errorf("failed to reprovide: %s", err)
109 110 111
		}

		if done != nil {
Steven Allen's avatar
Steven Allen committed
112 113 114 115
			if err != nil {
				done <- err
			}
			close(done)
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
		}
	}
}

// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error {
	keychan, err := rp.keyProvider(rp.ctx)
	if err != nil {
		return fmt.Errorf("failed to get key chan: %s", err)
	}
	for c := range keychan {
		// hash security
		if err := verifcid.ValidateCid(c); err != nil {
			logR.Errorf("insecure hash in reprovider, %s (%s)", c, err)
			continue
		}
		op := func() error {
			err := rp.rsys.Provide(rp.ctx, c, true)
			if err != nil {
				logR.Debugf("Failed to provide key: %s", err)
			}
			return err
		}

140
		err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), rp.ctx))
141 142 143 144 145 146 147 148
		if err != nil {
			logR.Debugf("Providing failed after number of retries: %s", err)
			return err
		}
	}
	return nil
}

Steven Allen's avatar
Steven Allen committed
149
// Trigger starts the reprovision process in rp.Run and waits for it to finish.
Steven Allen's avatar
Steven Allen committed
150 151
//
// Returns an error if a reprovide is already in progress.
152
func (rp *Reprovider) Trigger(ctx context.Context) error {
Steven Allen's avatar
Steven Allen committed
153
	resultCh := make(chan error, 1)
Steven Allen's avatar
Steven Allen committed
154
	select {
Steven Allen's avatar
Steven Allen committed
155
	case rp.trigger <- resultCh:
Steven Allen's avatar
Steven Allen committed
156 157
	default:
		return fmt.Errorf("reprovider is already running")
158 159 160
	}

	select {
Steven Allen's avatar
Steven Allen committed
161
	case err := <-resultCh:
Steven Allen's avatar
Steven Allen committed
162
		return err
163
	case <-rp.ctx.Done():
Steven Allen's avatar
Steven Allen committed
164
		return ErrClosed
165
	case <-ctx.Done():
Steven Allen's avatar
Steven Allen committed
166
		return ctx.Err()
167 168 169 170 171 172 173 174 175 176 177 178
	}
}

// Strategies

// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
	return func(ctx context.Context) (<-chan cid.Cid, error) {
		return bstore.AllKeysChan(ctx)
	}
}

179 180 181
// Pinner interface defines how the simple.Reprovider wants to interact
// with a Pinning service
type Pinner interface {
182 183
	DirectKeys(ctx context.Context) ([]cid.Cid, error)
	RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
184 185
}

186
// NewPinnedProvider returns provider supplying pinned keys
187
func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.FetcherConfig) KeyChanFunc {
188
	return func(ctx context.Context) (<-chan cid.Cid, error) {
189
		set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots)
190 191 192
		if err != nil {
			return nil, err
		}
193

194 195 196 197 198 199 200 201
		outCh := make(chan cid.Cid)
		go func() {
			defer close(outCh)
			for c := range set.New {
				select {
				case <-ctx.Done():
					return
				case outCh <- c:
202
				}
203
			}
204

205
		}()
206

207
		return outCh, nil
208 209 210
	}
}

211
func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.FetcherConfig, onlyRoots bool) (*cidutil.StreamingSet, error) {
212 213 214 215 216 217 218
	set := cidutil.NewStreamingSet()

	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(set.New)

219 220 221 222 223 224
		dkeys, err := pinning.DirectKeys(ctx)
		if err != nil {
			logR.Errorf("reprovide direct pins: %s", err)
			return
		}
		for _, key := range dkeys {
225 226 227
			set.Visitor(ctx)(key)
		}

228 229 230 231 232
		rkeys, err := pinning.RecursiveKeys(ctx)
		if err != nil {
			logR.Errorf("reprovide indirect pins: %s", err)
			return
		}
233 234

		session := fetchConfig.NewSession(ctx)
235
		for _, key := range rkeys {
236 237 238 239 240 241 242 243 244 245 246
			set.Visitor(ctx)(key)
			if !onlyRoots {
				nodeCh, errCh := fetcher.BlockAll(ctx, session, cidlink.Link{key})
				for res := range nodeCh {
					clink, ok := res.LastBlockLink.(cidlink.Link)
					if ok {
						set.Visitor(ctx)(clink.Cid)
					}
				}

				if err := <-errCh; err != nil {
247 248 249 250 251 252 253 254 255
					logR.Errorf("reprovide indirect pins: %s", err)
					return
				}
			}
		}
	}()

	return set, nil
}