Commit bfdc7966 authored by Jeromy's avatar Jeromy

use an option type to simplify concurrency

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent b88c9c04
...@@ -3,6 +3,7 @@ package merkledag ...@@ -3,6 +3,7 @@ package merkledag
import ( import (
"fmt" "fmt"
"sync"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
...@@ -24,7 +25,7 @@ type DAGService interface { ...@@ -24,7 +25,7 @@ type DAGService interface {
// GetDAG returns, in order, all the single leve child // GetDAG returns, in order, all the single leve child
// nodes of the passed in node. // nodes of the passed in node.
GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error) GetMany(context.Context, []key.Key) <-chan *NodeOption
Batch() *Batch Batch() *Batch
} }
...@@ -145,9 +146,13 @@ func FindLinks(links []key.Key, k key.Key, start int) []int { ...@@ -145,9 +146,13 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
return out return out
} }
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) { type NodeOption struct {
out := make(chan *Node, len(keys)) Node *Node
errs := make(chan error, 1) Err error
}
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
out := make(chan *NodeOption, len(keys))
blocks := ds.Blocks.GetBlocks(ctx, keys) blocks := ds.Blocks.GetBlocks(ctx, keys)
var count int var count int
...@@ -158,27 +163,27 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node ...@@ -158,27 +163,27 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
case b, ok := <-blocks: case b, ok := <-blocks:
if !ok { if !ok {
if count != len(keys) { if count != len(keys) {
errs <- fmt.Errorf("failed to fetch all nodes") out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
} }
return return
} }
nd, err := Decoded(b.Data) nd, err := Decoded(b.Data)
if err != nil { if err != nil {
errs <- err out <- &NodeOption{Err: err}
return return
} }
// buffered, no need to select // buffered, no need to select
out <- nd out <- &NodeOption{Node: nd}
count++ count++
case <-ctx.Done(): case <-ctx.Done():
errs <- ctx.Err() out <- &NodeOption{Err: ctx.Err()}
return return
} }
} }
}() }()
return out, errs return out
} }
// GetDAG will fill out all of the links of the given Node. // GetDAG will fill out all of the links of the given Node.
...@@ -213,15 +218,22 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter { ...@@ -213,15 +218,22 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
nodechan, errchan := ds.GetMany(ctx, dedupedKeys) nodechan := ds.GetMany(ctx, dedupedKeys)
for count := 0; count < len(keys); { for count := 0; count < len(keys); {
select { select {
case nd, ok := <-nodechan: case opt, ok := <-nodechan:
if !ok { if !ok {
return return
} }
if opt.Err != nil {
log.Error("error fetching: ", opt.Err)
return
}
nd := opt.Node
k, err := nd.Key() k, err := nd.Key()
if err != nil { if err != nil {
log.Error("Failed to get node key: ", err) log.Error("Failed to get node key: ", err)
...@@ -233,9 +245,6 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter { ...@@ -233,9 +245,6 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
count++ count++
sendChans[i] <- nd sendChans[i] <- nd
} }
case err := <-errchan:
log.Error("error fetching: ", err)
return
case <-ctx.Done(): case <-ctx.Done():
return return
} }
...@@ -356,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K ...@@ -356,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error { func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
toprocess := make(chan []key.Key, 8) toprocess := make(chan []key.Key, 8)
nodes := make(chan *Node, 8) nodes := make(chan *NodeOption, 8)
errs := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
defer close(toprocess) defer close(toprocess)
go fetchNodes(ctx, ds, toprocess, nodes, errs) go fetchNodes(ctx, ds, toprocess, nodes)
nodes <- root nodes <- &NodeOption{Node: root}
live := 1 live := 1
for { for {
select { select {
case nd, ok := <-nodes: case opt, ok := <-nodes:
if !ok { if !ok {
return nil return nil
} }
if opt.Err != nil {
return opt.Err
}
nd := opt.Node
// a node has been fetched // a node has been fetched
live-- live--
...@@ -398,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set ...@@ -398,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
return ctx.Err() return ctx.Err()
} }
} }
case err := <-errs:
return err
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
} }
} }
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) { func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
defer close(out) var wg sync.WaitGroup
defer func() {
// wait for all 'get' calls to complete so we don't accidentally send
// on a closed channel
wg.Wait()
close(out)
}()
get := func(ks []key.Key) { get := func(ks []key.Key) {
nodes, errch := ds.GetMany(ctx, ks) defer wg.Done()
for { nodes := ds.GetMany(ctx, ks)
for opt := range nodes {
select { select {
case nd, ok := <-nodes: case out <- opt:
if !ok { case <-ctx.Done():
return
}
select {
case out <- nd:
case <-ctx.Done():
return
}
case err := <-errch:
errs <- err
return return
} }
} }
} }
for ks := range in { for ks := range in {
wg.Add(1)
go get(ks) go get(ks)
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment