Commit b3a1f4b9 authored by Steven Allen's avatar Steven Allen

[WIP] [RFC] extract dagservice and friends from go-ipfs

This is a WIP/RFC attempt at extracting DAGService from go-ipfs.
parent ac092e5f
package format
import (
"context"
cid "github.com/ipfs/go-cid"
)
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
var out []int
for i, lnk_c := range links[start:] {
if c.Equals(lnk_c) {
out = append(out, i+start)
}
}
return out
}
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func GetDAG(ctx context.Context, ds DAGService, root Node) []NodePromise {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
}
return GetNodes(ctx, ds, cids)
}
// GetNodes returns an array of 'FutureNode' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodePromise {
// Early out if no work to do
if len(keys) == 0 {
return nil
}
promises := make([]NodePromise, len(keys))
for i := range keys {
promises[i] = newNodePromise(ctx)
}
dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodechan := ds.GetMany(ctx, dedupedKeys)
for count := 0; count < len(keys); {
select {
case opt, ok := <-nodechan:
if !ok {
for _, p := range promises {
p.Fail(ErrNotFound)
}
return
}
if opt.Err != nil {
for _, p := range promises {
p.Fail(opt.Err)
}
return
}
nd := opt.Node
is := FindLinks(keys, nd.Cid(), 0)
for _, i := range is {
count++
promises[i].Send(nd)
}
case <-ctx.Done():
return
}
}
}()
return promises
}
// Remove duplicates from a list of keys
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
return set.Keys()
}
......@@ -44,10 +44,6 @@ type Node interface {
Size() (uint64, error)
}
type NodeGetter interface {
Get(context.Context, *cid.Cid) (Node, error)
}
// Link represents an IPFS Merkle DAG Link between Nodes.
type Link struct {
// utf string name. should be unique per object
......
package format
import (
"context"
"fmt"
cid "github.com/ipfs/go-cid"
)
var ErrNotFound = fmt.Errorf("merkledag: not found")
// Either a node or an error.
type NodeOption struct {
Node Node
Err error
}
// TODO: This name kind of sucks.
// NodeResolver?
// NodeService?
// Just Resolver?
type NodeGetter interface {
Get(context.Context, *cid.Cid) (Node, error)
}
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
NodeGetter
Add(Node) (*cid.Cid, error)
Remove(Node) error
// TODO: This is returning them in-order?? Why not just use []NodePromise?
// Maybe add a couple of helpers for getting them in-order and as-available?
// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
Batch() Batch
LinkService
}
// An interface for batch-adding nodes to a DAG.
// TODO: Is this really the *right* level to do this at?
// Why not just `DAGService.AddMany` + a concrete helper type?
//
// This will be a breaking change *regardless* of what we do as `Batch` *used*
// to be a plain struct (passed around by pointer). I had to change this to
// avoid requiring a `BlockService` (which would introduce the concept of
// exchanges and I really don't want to go down that rabbit hole).
type Batch interface {
Add(nd Node) (*cid.Cid, error)
Commit() error
}
// TODO: Replace this? I'm really not convinced this interface pulls its weight.
//
// Instead, we could add an `Offline()` function to `NodeGetter` that returns an
// offline `NodeGetter` and then define the following function:
//
// ```
// func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) {
// if c.Type() == cid.Raw {
// return nil, nil
// }
// node, err := ng.Get(ctx, c)
// if err != nil {
// return nil, err
// }
// return node.Links(), nil
// }
// ```
//
// Why *not* do this? We might decide to store a light-weight DAG of links
// without actually storing the data. I don't really find that to be a
// convincing argument.
type LinkService interface {
// GetLinks return all links for a node. The complete node does not
// necessarily have to exist locally, or at all. For example, raw
// leaves cannot possibly have links so there is no need to look
// at the node.
// TODO: These *really* should be Cids, not Links
GetLinks(context.Context, *cid.Cid) ([]*Link, error)
GetOfflineLinkService() LinkService
}
package format
import (
"context"
"sync"
)
// TODO: I renamed this to NodePromise because:
// 1. NodeGetter is a naming conflict.
// 2. It's a promise...
// TODO: Should this even be an interface? It seems like a simple struct would
// suffice.
// NodePromise provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
type NodePromise interface {
Get(context.Context) (Node, error)
Fail(err error)
Send(Node)
}
func newNodePromise(ctx context.Context) NodePromise {
return &nodePromise{
recv: make(chan Node, 1),
ctx: ctx,
err: make(chan error, 1),
}
}
type nodePromise struct {
cache Node
clk sync.Mutex
recv chan Node
ctx context.Context
err chan error
}
func (np *nodePromise) Fail(err error) {
np.clk.Lock()
v := np.cache
np.clk.Unlock()
// if promise has a value, don't fail it
if v != nil {
return
}
np.err <- err
}
func (np *nodePromise) Send(nd Node) {
var already bool
np.clk.Lock()
if np.cache != nil {
already = true
}
np.cache = nd
np.clk.Unlock()
if already {
panic("sending twice to the same promise is an error!")
}
np.recv <- nd
}
func (np *nodePromise) Get(ctx context.Context) (Node, error) {
np.clk.Lock()
c := np.cache
np.clk.Unlock()
if c != nil {
return c, nil
}
select {
case nd := <-np.recv:
return nd, nil
case <-np.ctx.Done():
return nil, np.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
case err := <-np.err:
return nil, err
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment