Commit f3ed34ed authored by Jeromy's avatar Jeromy

some bitswap cleanup

parent b6ffb6a3
...@@ -28,7 +28,7 @@ type DAGService interface { ...@@ -28,7 +28,7 @@ type DAGService interface {
AddRecursive(*Node) error AddRecursive(*Node) error
Get(u.Key) (*Node, error) Get(u.Key) (*Node, error)
Remove(*Node) error Remove(*Node) error
BatchFetch(context.Context, *Node) <-chan *Node GetKeysAsync(context.Context, *Node) <-chan *Node
} }
func NewDAGService(bs *bserv.BlockService) DAGService { func NewDAGService(bs *bserv.BlockService) DAGService {
...@@ -298,41 +298,33 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) { ...@@ -298,41 +298,33 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
return -1, u.ErrNotFound return -1, u.ErrNotFound
} }
// BatchFetch will fill out all of the links of the given Node. // GetKeysAsync will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive // It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order. // all the child nodes of 'root' on, in proper order.
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node { func (ds *dagService) GetKeysAsync(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node) sig := make(chan *Node)
go func() { go func() {
var keys []u.Key var keys []u.Key
nodes := make([]*Node, len(root.Links)) nodes := make([]*Node, len(root.Links))
//temp
recvd := []int{}
//
//
next := 0
//
for _, lnk := range root.Links { for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash)) keys = append(keys, u.Key(lnk.Hash))
} }
blkchan := ds.Blocks.GetBlocks(ctx, keys) blkchan := ds.Blocks.GetBlocks(ctx, keys)
count := 0 next := 0
for blk := range blkchan { for blk := range blkchan {
count++
i, err := FindLink(root, blk.Key(), nodes) i, err := FindLink(root, blk.Key(), nodes)
if err != nil { if err != nil {
// NB: can only occur as a result of programmer error
panic("Received block that wasnt in this nodes links!") panic("Received block that wasnt in this nodes links!")
} }
recvd = append(recvd, i)
nd, err := Decoded(blk.Data) nd, err := Decoded(blk.Data)
if err != nil { if err != nil {
// NB: can occur in normal situations, with improperly formatted
// input data
log.Error("Got back bad block!") log.Error("Got back bad block!")
break break
} }
...@@ -347,23 +339,11 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node { ...@@ -347,23 +339,11 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
} }
} }
if next < len(nodes) { if next < len(nodes) {
log.Errorf("count = %d, links = %d", count, len(nodes)) // TODO: bubble errors back up.
log.Error(recvd) log.Errorf("Did not receive correct number of nodes!")
panic("didnt receive all requested blocks!")
} }
close(sig) close(sig)
}() }()
return sig return sig
} }
func checkForDupes(ks []u.Key) bool {
seen := make(map[u.Key]struct{})
for _, k := range ks {
if _, ok := seen[k]; ok {
return true
}
seen[k] = struct{}{}
}
return false
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment