Commit e2ee98e1 authored by Steven Allen's avatar Steven Allen

fix several bugs

1. Don't log an error when shutting down while reproviding.
2. Reprovide on a fixed interval instead of treating the interval as a delay.
3. Remove trigger muting logic and use the simpler way.
4. Add some tests for triggering.
5. Make sure Reprovider.Close actually, you know, does something. And waits for the reprovider to stop.
parent 0505db7f
...@@ -20,12 +20,16 @@ var logR = logging.Logger("reprovider.simple") ...@@ -20,12 +20,16 @@ var logR = logging.Logger("reprovider.simple")
// KeyChanFunc is function streaming CIDs to pass to content routing // KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
type doneFunc func(error)
// Reprovider reannounces blocks to the network // Reprovider reannounces blocks to the network
type Reprovider struct { type Reprovider struct {
ctx context.Context // Reprovider context. Cancel to stop, then wait on doneCh.
trigger chan doneFunc ctx context.Context
cancel context.CancelFunc
doneCh chan struct{}
// Trigger triggers a reprovide.
trigger chan chan<- error
// The routing system to provide values through // The routing system to provide values through
rsys routing.ContentRouting rsys routing.ContentRouting
...@@ -37,9 +41,12 @@ type Reprovider struct { ...@@ -37,9 +41,12 @@ type Reprovider struct {
// NewReprovider creates new Reprovider instance. // NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
ctx, cancel := context.WithCancel(ctx)
return &Reprovider{ return &Reprovider{
ctx: ctx, ctx: ctx,
trigger: make(chan doneFunc), cancel: cancel,
doneCh: make(chan struct{}),
trigger: make(chan chan<- error),
rsys: rsys, rsys: rsys,
keyProvider: keyProvider, keyProvider: keyProvider,
...@@ -49,44 +56,60 @@ func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys r ...@@ -49,44 +56,60 @@ func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys r
// Close the reprovider // Close the reprovider
func (rp *Reprovider) Close() error { func (rp *Reprovider) Close() error {
rp.cancel()
<-rp.doneCh
return nil return nil
} }
// Run re-provides keys with 'tick' interval or when triggered // Run re-provides keys with 'tick' interval or when triggered
func (rp *Reprovider) Run() { func (rp *Reprovider) Run() {
// dont reprovide immediately. defer close(rp.doneCh)
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime. var initialReprovideCh, reprovideCh <-chan time.Time
after := time.After(time.Minute)
var done doneFunc // If reproviding is enabled (non-zero)
for { if rp.tick > 0 {
if rp.tick == 0 { reprovideTicker := time.NewTicker(rp.tick)
after = make(chan time.Time) defer reprovideTicker.Stop()
reprovideCh = reprovideTicker.C
// If the reprovide ticker is larger than a minute (likely),
// provide once after we've been up a minute.
//
// Don't provide _immediately_ as we might be just about to stop.
if rp.tick > time.Minute {
initialReprovideTimer := time.NewTimer(time.Minute)
defer initialReprovideTimer.Stop()
initialReprovideCh = initialReprovideTimer.C
} }
}
var done chan<- error
for rp.ctx.Err() == nil {
select { select {
case <-initialReprovideCh:
case <-reprovideCh:
case done = <-rp.trigger:
case <-rp.ctx.Done(): case <-rp.ctx.Done():
return return
case done = <-rp.trigger:
case <-after:
} }
//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()
err := rp.Reprovide() err := rp.Reprovide()
if err != nil {
// only log if we've hit an actual error, otherwise just tell the client we're shutting down
if rp.ctx.Err() != nil {
err = fmt.Errorf("shutting down")
} else if err != nil {
logR.Errorf("failed to reprovide: %s", err) logR.Errorf("failed to reprovide: %s", err)
} }
if done != nil { if done != nil {
done(err) if err != nil {
done <- err
}
close(done)
} }
unmute()
after = time.After(rp.tick)
} }
} }
...@@ -119,44 +142,27 @@ func (rp *Reprovider) Reprovide() error { ...@@ -119,44 +142,27 @@ func (rp *Reprovider) Reprovide() error {
return nil return nil
} }
// Trigger starts reprovision process in rp.Run and waits for it // Trigger starts reprovision process in rp.Run and waits for it to finish.
//
// Returns an error if a reprovide is already in progress.
func (rp *Reprovider) Trigger(ctx context.Context) error { func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx) doneCh := make(chan error, 1)
select {
var err error case rp.trigger <- doneCh:
df := func(e error) { default:
err = e return fmt.Errorf("reprovider is already running")
done()
} }
select { select {
case err := <-doneCh:
return err
case <-rp.ctx.Done(): case <-rp.ctx.Done():
return context.Canceled return fmt.Errorf("reprovide service stopping")
case <-ctx.Done(): case <-ctx.Done():
return context.Canceled return ctx.Err()
case rp.trigger <- df:
<-progressCtx.Done()
return err
} }
} }
func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()
return cf
}
// Strategies // Strategies
// NewBlockstoreProvider returns key provider using bstore.AllKeysChan // NewBlockstoreProvider returns key provider using bstore.AllKeysChan
......
...@@ -63,6 +63,22 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { ...@@ -63,6 +63,22 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) {
} }
func TestReprovide(t *testing.T) { func TestReprovide(t *testing.T) {
testReprovide(t, func(r *Reprovider, ctx context.Context) error {
return r.Reprovide()
})
}
func TestTrigger(t *testing.T) {
testReprovide(t, func(r *Reprovider, ctx context.Context) error {
go r.Run()
time.Sleep(1 * time.Second)
defer r.Close()
err := r.Trigger(ctx)
return err
})
}
func testReprovide(t *testing.T, trigger func(r *Reprovider, ctx context.Context) error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
...@@ -71,7 +87,7 @@ func TestReprovide(t *testing.T) { ...@@ -71,7 +87,7 @@ func TestReprovide(t *testing.T) {
keyProvider := NewBlockstoreProvider(bstore) keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) reprov := NewReprovider(ctx, time.Hour, clA, keyProvider)
err := reprov.Reprovide() err := trigger(reprov, ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -95,6 +111,67 @@ func TestReprovide(t *testing.T) { ...@@ -95,6 +111,67 @@ func TestReprovide(t *testing.T) {
} }
} }
func TestTriggerTwice(t *testing.T) {
// Ensure we can only trigger once at a time.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clA, _, _, _ := setupRouting(t)
keyCh := make(chan cid.Cid)
startCh := make(chan struct{})
keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) {
<-startCh
return keyCh, nil
}
reprov := NewReprovider(ctx, time.Hour, clA, keyFunc)
go reprov.Run()
defer reprov.Close()
// Wait for the reprovider to start, otherwise, the reprovider will
// think a concurrent reprovide is running.
//
// We _could_ fix this race... but that would be complexity for nothing.
// 1. We start a reprovide 1 minute after startup anyways.
// 2. The window is really narrow.
time.Sleep(1 * time.Second)
errCh := make(chan error, 2)
// Trigger in the background
go func() {
errCh <- reprov.Trigger(ctx)
}()
// Wait for the trigger to really start.
startCh <- struct{}{}
// Try to trigger again, this should fail immediately.
if err := reprov.Trigger(ctx); err == nil {
t.Fatal("expected an error")
}
// Let the trigger progress.
close(keyCh)
// Check the result.
err := <-errCh
if err != nil {
t.Fatal(err)
}
// Try to trigger again, this should work.
go func() {
errCh <- reprov.Trigger(ctx)
}()
startCh <- struct{}{}
err = <-errCh
if err != nil {
t.Fatal(err)
}
}
type mockPinner struct { type mockPinner struct {
recursive []cid.Cid recursive []cid.Cid
direct []cid.Cid direct []cid.Cid
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment