Commit e5715368 authored by Erik Ingenito's avatar Erik Ingenito

Remove locking entirely

License: MIT
Signed-off-by: default avatarErik Ingenito <erik@carbonfive.com>
parent 0200a0e9
...@@ -11,13 +11,9 @@ import ( ...@@ -11,13 +11,9 @@ import (
routing "github.com/libp2p/go-libp2p-routing" routing "github.com/libp2p/go-libp2p-routing"
) )
var ( var log = logging.Logger("provider")
log = logging.Logger("provider")
)
const ( const provideOutgoingWorkerLimit = 8
provideOutgoingWorkerLimit = 8
)
// Provider announces blocks to the network // Provider announces blocks to the network
type Provider interface { type Provider interface {
...@@ -44,13 +40,13 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte ...@@ -44,13 +40,13 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte
// Start workers to handle provide requests. // Start workers to handle provide requests.
func (p *provider) Run() { func (p *provider) Run() {
p.queue.Run()
p.handleAnnouncements() p.handleAnnouncements()
} }
// Provide the given cid using specified strategy. // Provide the given cid using specified strategy.
func (p *provider) Provide(root cid.Cid) error { func (p *provider) Provide(root cid.Cid) error {
return p.queue.Enqueue(root) p.queue.Enqueue(root)
return nil
} }
// Handle all outgoing cids by providing (announcing) them // Handle all outgoing cids by providing (announcing) them
...@@ -61,12 +57,12 @@ func (p *provider) handleAnnouncements() { ...@@ -61,12 +57,12 @@ func (p *provider) handleAnnouncements() {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
return return
case entry := <-p.queue.Dequeue(): case c := <-p.queue.Dequeue():
log.Info("announce - start - ", entry.cid) log.Info("announce - start - ", c)
if err := p.contentRouting.Provide(p.ctx, entry.cid, true); err != nil { if err := p.contentRouting.Provide(p.ctx, c, true); err != nil {
log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) log.Warningf("Unable to provide entry: %s, %s", c, err)
} }
log.Info("announce - end - ", entry.cid) log.Info("announce - end - ", c)
} }
} }
}() }()
......
...@@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) { ...@@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) {
cids := cid.NewSet() cids := cid.NewSet()
for i := 0; i < 100; i++ { for i := 0; i < 1000; i++ {
c := blockGenerator.Next().Cid() c := blockGenerator.Next().Cid()
cids.Add(c) cids.Add(c)
} }
...@@ -63,7 +63,7 @@ func TestAnnouncement(t *testing.T) { ...@@ -63,7 +63,7 @@ func TestAnnouncement(t *testing.T) {
t.Fatal("Wrong CID provided") t.Fatal("Wrong CID provided")
} }
cids.Remove(cp) cids.Remove(cp)
case <-time.After(time.Second * 1): case <-time.After(time.Second * 5):
t.Fatal("Timeout waiting for cids to be provided.") t.Fatal("Timeout waiting for cids to be provided.")
} }
} }
......
...@@ -2,27 +2,15 @@ package provider ...@@ -2,27 +2,15 @@ package provider
import ( import (
"context" "context"
"errors" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"math" "math"
"strconv" "strconv"
"strings" "strings"
"sync"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
query "github.com/ipfs/go-datastore/query"
) )
// Entry allows for the durability in the queue. When a cid is dequeued it is
// not removed from the datastore until you call Complete() on the entry you
// receive.
type Entry struct {
cid cid.Cid
key datastore.Key
queue *Queue
}
// Queue provides a durable, FIFO interface to the datastore for storing cids // Queue provides a durable, FIFO interface to the datastore for storing cids
// //
// Durability just means that cids in the process of being provided when a // Durability just means that cids in the process of being provided when a
...@@ -32,17 +20,15 @@ type Queue struct { ...@@ -32,17 +20,15 @@ type Queue struct {
// used to differentiate queues in datastore // used to differentiate queues in datastore
// e.g. provider vs reprovider // e.g. provider vs reprovider
name string name string
ctx context.Context ctx context.Context
tail uint64 tail uint64
head uint64 head uint64
enqueueLock sync.Mutex
ds datastore.Datastore // Must be threadsafe ds datastore.Datastore // Must be threadsafe
dequeue chan *Entry dequeue chan cid.Cid
added chan struct{} enqueue chan cid.Cid
} }
// NewQueue creates a queue for cids // NewQueue creates a queue for cids
...@@ -57,124 +43,85 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, ...@@ -57,124 +43,85 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
ctx: ctx, ctx: ctx,
head: head, head: head,
tail: tail, tail: tail,
enqueueLock: sync.Mutex{},
ds: namespaced, ds: namespaced,
dequeue: make(chan *Entry), dequeue: make(chan cid.Cid),
added: make(chan struct{}), enqueue: make(chan cid.Cid),
} }
q.work()
return q, nil return q, nil
} }
// Enqueue puts a cid in the queue // Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) error { func (q *Queue) Enqueue(cid cid.Cid) {
q.enqueueLock.Lock()
defer q.enqueueLock.Unlock()
nextKey := q.queueKey(q.tail)
if err := q.ds.Put(nextKey, cid.Bytes()); err != nil {
return err
}
q.tail++
select { select {
case q.added <- struct{}{}: case q.enqueue <- cid:
case <-q.ctx.Done(): case <-q.ctx.Done():
default:
} }
return nil
} }
// Dequeue returns a channel that if listened to will remove entries from the queue // Dequeue returns a channel that if listened to will remove entries from the queue
func (q *Queue) Dequeue() <-chan *Entry { func (q *Queue) Dequeue() <-chan cid.Cid {
return q.dequeue return q.dequeue
} }
// IsEmpty returns whether or not the queue has any items // Run dequeues and enqueues when available.
func (q *Queue) IsEmpty() bool { func (q *Queue) work() {
return (q.tail - q.head) == 0
}
// Run dequeues items when the dequeue channel is available to
// be written to.
func (q *Queue) Run() {
go func() { go func() {
for { for {
if q.IsEmpty() { var c cid.Cid = cid.Undef
select { var key datastore.Key
case <-q.ctx.Done(): var dequeue chan cid.Cid
return
case <-q.added: // If we're not empty dequeue a cid and ship it
if q.head < q.tail {
key = q.queueKey(q.head)
value, err := q.ds.Get(key)
if err == datastore.ErrNotFound {
log.Warningf("Missing entry in queue: %s", err)
q.head++
continue
} else if err != nil {
log.Warningf("Error fetching from queue: %s", err)
continue
}
c, err = cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
continue
} }
} }
entry, err := q.next() if c != cid.Undef {
if err != nil { dequeue = q.dequeue
log.Warningf("Error Dequeue()-ing: %s, %s", entry, err)
continue
} }
select { select {
case toQueue := <-q.enqueue:
nextKey := q.queueKey(q.tail)
if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
}
q.tail++
case dequeue <- c:
q.head++
err := q.ds.Delete(key)
if err != nil {
log.Errorf("Failed to delete queued cid: %s", err)
}
case <-q.ctx.Done(): case <-q.ctx.Done():
return return
case q.dequeue <- entry:
q.head++
err = q.ds.Delete(entry.key)
} }
} }
}() }()
} }
// Find the next item in the queue, crawl forward if an entry is not
// found in the next spot.
func (q *Queue) next() (*Entry, error) {
var key datastore.Key
var value []byte
var err error
for {
if q.head >= q.tail {
return nil, errors.New("next: no more entries in queue returning")
}
select {
case <-q.ctx.Done():
return nil, nil
default:
}
key = q.queueKey(q.head)
value, err = q.ds.Get(key)
value, err = q.ds.Get(key)
if err == datastore.ErrNotFound {
q.head++
continue
} else if err != nil {
return nil, err
} else {
break
}
}
id, err := cid.Parse(value)
if err != nil {
return nil, err
}
entry := &Entry{
cid: id,
key: key,
queue: q,
}
if err != nil {
return nil, err
}
return entry, nil
}
func (q *Queue) queueKey(id uint64) datastore.Key { func (q *Queue) queueKey(id uint64) datastore.Key {
return datastore.NewKey(strconv.FormatUint(id, 10)) return datastore.NewKey(strconv.FormatUint(id, 10))
} }
...@@ -190,11 +137,6 @@ func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Data ...@@ -190,11 +137,6 @@ func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Data
var tail uint64 var tail uint64
var head uint64 = math.MaxUint64 var head uint64 = math.MaxUint64
for entry := range results.Next() { for entry := range results.Next() {
select {
case <-ctx.Done():
return 0, 0, nil
default:
}
trimmed := strings.TrimPrefix(entry.Key, "/") trimmed := strings.TrimPrefix(entry.Key, "/")
id, err := strconv.ParseUint(trimmed, 10, 64) id, err := strconv.ParseUint(trimmed, 10, 64)
if err != nil { if err != nil {
......
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
) )
func makeCids(n int) []cid.Cid { func makeCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, 10) cids := make([]cid.Cid, 0, n)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
c := blockGenerator.Next().Cid() c := blockGenerator.Next().Cid()
cids = append(cids, c) cids = append(cids, c)
...@@ -23,8 +23,8 @@ func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { ...@@ -23,8 +23,8 @@ func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) {
for _, c := range cids { for _, c := range cids {
select { select {
case dequeued := <- q.dequeue: case dequeued := <- q.dequeue:
if c != dequeued.cid { if c != dequeued {
t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued.cid) t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued)
} }
case <-time.After(time.Second * 1): case <-time.After(time.Second * 1):
...@@ -42,15 +42,11 @@ func TestBasicOperation(t *testing.T) { ...@@ -42,15 +42,11 @@ func TestBasicOperation(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
queue.Run()
cids := makeCids(10) cids := makeCids(10)
for _, c := range cids { for _, c := range cids {
err = queue.Enqueue(c) queue.Enqueue(c)
if err != nil {
t.Fatal("Failed to enqueue CID")
}
} }
assertOrdered(cids, queue, t) assertOrdered(cids, queue, t)
...@@ -65,15 +61,11 @@ func TestInitialization(t *testing.T) { ...@@ -65,15 +61,11 @@ func TestInitialization(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
queue.Run()
cids := makeCids(10) cids := makeCids(10)
for _, c := range cids { for _, c := range cids {
err = queue.Enqueue(c) queue.Enqueue(c)
if err != nil {
t.Fatal("Failed to enqueue CID")
}
} }
assertOrdered(cids[:5], queue, t) assertOrdered(cids[:5], queue, t)
...@@ -83,7 +75,6 @@ func TestInitialization(t *testing.T) { ...@@ -83,7 +75,6 @@ func TestInitialization(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
queue.Run()
assertOrdered(cids[5:], queue, t) assertOrdered(cids[5:], queue, t)
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment