// package merkledag implements the ipfs Merkle DAG datastructures. package merkledag import ( "bytes" "fmt" "sync" "time" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" blocks "github.com/jbenet/go-ipfs/blocks" bserv "github.com/jbenet/go-ipfs/blockservice" u "github.com/jbenet/go-ipfs/util" ) var log = u.Logger("merkledag") var ErrNotFound = fmt.Errorf("merkledag: not found") // NodeMap maps u.Keys to Nodes. // We cannot use []byte/Multihash for keys :( // so have to convert Multihash bytes to string (u.Key) type NodeMap map[u.Key]*Node // DAGService is an IPFS Merkle DAG service. type DAGService interface { Add(*Node) (u.Key, error) AddRecursive(*Node) error Get(u.Key) (*Node, error) Remove(*Node) error // GetDAG returns, in order, all the single leve child // nodes of the passed in node. GetDAG(context.Context, *Node) <-chan *Node } func NewDAGService(bs *bserv.BlockService) DAGService { return &dagService{bs} } // Node represents a node in the IPFS Merkle DAG. // nodes have opaque data and a set of navigable links. type Node struct { Links []*Link Data []byte // cache encoded/marshaled value encoded []byte cached mh.Multihash } // NodeStat is a statistics object for a Node. Mostly sizes. type NodeStat struct { NumLinks int // number of links in link table BlockSize int // size of the raw data LinksSize int // size of the links segment DataSize int // size of the data segment CumulativeSize int // cumulatie size of object + all it references } func (ns NodeStat) String() string { f := "NodeStat{NumLinks: %d, BlockSize: %d, LinksSize: %d, DataSize: %d, CumulativeSize: %d}" return fmt.Sprintf(f, ns.NumLinks, ns.BlockSize, ns.LinksSize, ns.DataSize, ns.CumulativeSize) } // Link represents an IPFS Merkle DAG Link between Nodes. type Link struct { // utf string name. should be unique per object Name string // utf8 // cumulative size of target object Size uint64 // multihash of the target object Hash mh.Multihash // a ptr to the actual node for graph manipulation Node *Node } type LinkSlice []*Link func (ls LinkSlice) Len() int { return len(ls) } func (ls LinkSlice) Swap(a, b int) { ls[a], ls[b] = ls[b], ls[a] } func (ls LinkSlice) Less(a, b int) bool { return ls[a].Name < ls[b].Name } // MakeLink creates a link to the given node func MakeLink(n *Node) (*Link, error) { s, err := n.Size() if err != nil { return nil, err } h, err := n.Multihash() if err != nil { return nil, err } return &Link{ Size: s, Hash: h, }, nil } // GetNode returns the MDAG Node that this link points to func (l *Link) GetNode(serv DAGService) (*Node, error) { if l.Node != nil { return l.Node, nil } return serv.Get(u.Key(l.Hash)) } // AddNodeLink adds a link to another node. func (n *Node) AddNodeLink(name string, that *Node) error { lnk, err := MakeLink(that) if err != nil { return err } lnk.Name = name lnk.Node = that n.Links = append(n.Links, lnk) return nil } // AddNodeLink adds a link to another node. without keeping a reference to // the child node func (n *Node) AddNodeLinkClean(name string, that *Node) error { lnk, err := MakeLink(that) if err != nil { return err } lnk.Name = name n.Links = append(n.Links, lnk) return nil } // Remove a link on this node by the given name func (n *Node) RemoveNodeLink(name string) error { for i, l := range n.Links { if l.Name == name { n.Links = append(n.Links[:i], n.Links[i+1:]...) return nil } } return ErrNotFound } // Copy returns a copy of the node. // NOTE: does not make copies of Node objects in the links. func (n *Node) Copy() *Node { nnode := new(Node) nnode.Data = make([]byte, len(n.Data)) copy(nnode.Data, n.Data) nnode.Links = make([]*Link, len(n.Links)) copy(nnode.Links, n.Links) return nnode } // Size returns the total size of the data addressed by node, // including the total sizes of references. func (n *Node) Size() (uint64, error) { b, err := n.Encoded(false) if err != nil { return 0, err } s := uint64(len(b)) for _, l := range n.Links { s += l.Size } return s, nil } // Stat returns statistics on the node. func (n *Node) Stat() (NodeStat, error) { enc, err := n.Encoded(false) if err != nil { return NodeStat{}, err } cumSize, err := n.Size() if err != nil { return NodeStat{}, err } return NodeStat{ NumLinks: len(n.Links), BlockSize: len(enc), LinksSize: len(enc) - len(n.Data), // includes framing. DataSize: len(n.Data), CumulativeSize: int(cumSize), }, nil } // Multihash hashes the encoded data of this node. func (n *Node) Multihash() (mh.Multihash, error) { // Note: Encoded generates the hash and puts it in n.cached. _, err := n.Encoded(false) if err != nil { return nil, err } return n.cached, nil } // Key returns the Multihash as a key, for maps. func (n *Node) Key() (u.Key, error) { h, err := n.Multihash() return u.Key(h), err } // dagService is an IPFS Merkle DAG service. // - the root is virtual (like a forest) // - stores nodes' data in a BlockService // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { Blocks *bserv.BlockService } // Add adds a node to the dagService, storing the block in the BlockService func (n *dagService) Add(nd *Node) (u.Key, error) { if n == nil { // FIXME remove this assertion. protect with constructor invariant return "", fmt.Errorf("dagService is nil") } d, err := nd.Encoded(false) if err != nil { return "", err } b := new(blocks.Block) b.Data = d b.Multihash, err = nd.Multihash() if err != nil { return "", err } return n.Blocks.AddBlock(b) } // AddRecursive adds the given node and all child nodes to the BlockService func (n *dagService) AddRecursive(nd *Node) error { _, err := n.Add(nd) if err != nil { log.Info("AddRecursive Error: %s\n", err) return err } for _, link := range nd.Links { if link.Node != nil { err := n.AddRecursive(link.Node) if err != nil { return err } } } return nil } // Get retrieves a node from the dagService, fetching the block in the BlockService func (n *dagService) Get(k u.Key) (*Node, error) { if n == nil { return nil, fmt.Errorf("dagService is nil") } ctx, _ := context.WithTimeout(context.TODO(), time.Minute) // we shouldn't use an arbitrary timeout here. // since Get doesnt take in a context yet, we give a large upper bound. // think of an http request. we want it to go on as long as the client requests it. b, err := n.Blocks.GetBlock(ctx, k) if err != nil { return nil, err } return Decoded(b.Data) } // Remove deletes the given node and all of its children from the BlockService func (n *dagService) Remove(nd *Node) error { for _, l := range nd.Links { if l.Node != nil { n.Remove(l.Node) } } k, err := nd.Key() if err != nil { return err } return n.Blocks.DeleteBlock(k) } // FetchGraph asynchronously fetches all nodes that are children of the given // node, and returns a channel that may be waited upon for the fetch to complete func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} { log.Warning("Untested.") var wg sync.WaitGroup done := make(chan struct{}) for _, l := range root.Links { wg.Add(1) go func(lnk *Link) { // Signal child is done on way out defer wg.Done() select { case <-ctx.Done(): return } nd, err := lnk.GetNode(serv) if err != nil { log.Error(err) return } // Wait for children to finish <-FetchGraph(ctx, nd, serv) }(l) } go func() { wg.Wait() done <- struct{}{} }() return done } // FindLinks searches this nodes links for the given key, // returns the indexes of any links pointing to it func FindLinks(n *Node, k u.Key, start int) []int { var out []int keybytes := []byte(k) for i, lnk := range n.Links[start:] { if bytes.Equal([]byte(lnk.Hash), keybytes) { out = append(out, i+start) } } return out } // GetDAG will fill out all of the links of the given Node. // It returns a channel of nodes, which the caller can receive // all the child nodes of 'root' on, in proper order. func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node { sig := make(chan *Node) go func() { defer close(sig) var keys []u.Key for _, lnk := range root.Links { keys = append(keys, u.Key(lnk.Hash)) } blkchan := ds.Blocks.GetBlocks(ctx, keys) nodes := make([]*Node, len(root.Links)) next := 0 for blk := range blkchan { nd, err := Decoded(blk.Data) if err != nil { // NB: can occur in normal situations, with improperly formatted // input data log.Error("Got back bad block!") break } is := FindLinks(root, blk.Key(), next) for _, i := range is { nodes[i] = nd } for ; next < len(nodes) && nodes[next] != nil; next++ { sig <- nodes[next] } } if next < len(nodes) { // TODO: bubble errors back up. log.Errorf("Did not receive correct number of nodes!") } }() return sig }