merkledag.go 7.54 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"bytes"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8 9
	"time"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Chas Leichner's avatar
Chas Leichner committed
11

12
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
13
	blocks "github.com/jbenet/go-ipfs/blocks"
14
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
var log = u.Logger("merkledag")
19
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
20

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
21 22 23
// NodeMap maps u.Keys to Nodes.
// We cannot use []byte/Multihash for keys :(
// so have to convert Multihash bytes to string (u.Key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
type NodeMap map[u.Key]*Node

Jeromy's avatar
Jeromy committed
26 27 28 29 30 31
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
32 33 34 35

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
	GetDAG(context.Context, *Node) <-chan *Node
Jeromy's avatar
Jeromy committed
36 37 38 39 40 41
}

// NewDAGService returns a DAGService that stores its blocks in bs.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := &dagService{Blocks: bs}
	return svc
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
42
// Node represents a node in the IPFS Merkle DAG.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
// nodes have opaque data and a set of navigable links.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
type Node struct {
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
45 46
	Links []*Link
	Data  []byte
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49

	// cache encoded/marshaled value
	encoded []byte
50 51

	cached mh.Multihash
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
54
// Link represents an IPFS Merkle DAG Link between Nodes.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
type Link struct {
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
56 57
	// utf string name. should be unique per object
	Name string // utf8
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58

Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
59 60
	// cumulative size of target object
	Size uint64
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61

Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
62 63
	// multihash of the target object
	Hash mh.Multihash
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65 66

	// a ptr to the actual node for graph manipulation
	Node *Node
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68
}

Jeromy's avatar
Jeromy committed
69
// MakeLink creates a link to the given node
Jeromy's avatar
Jeromy committed
70 71
func MakeLink(n *Node) (*Link, error) {
	s, err := n.Size()
72
	if err != nil {
Jeromy's avatar
Jeromy committed
73
		return nil, err
74 75
	}

Jeromy's avatar
Jeromy committed
76
	h, err := n.Multihash()
77
	if err != nil {
Jeromy's avatar
Jeromy committed
78
		return nil, err
79
	}
Jeromy's avatar
Jeromy committed
80
	return &Link{
81 82
		Size: s,
		Hash: h,
Jeromy's avatar
Jeromy committed
83
	}, nil
84 85
}

Jeromy's avatar
Jeromy committed
86
// GetNode returns the MDAG Node that this link points to
87
func (l *Link) GetNode(serv DAGService) (*Node, error) {
88 89 90 91 92 93 94
	if l.Node != nil {
		return l.Node, nil
	}

	return serv.Get(u.Key(l.Hash))
}

Jeromy's avatar
Jeromy committed
95 96 97
// AddNodeLink adds a link to another node.
func (n *Node) AddNodeLink(name string, that *Node) error {
	lnk, err := MakeLink(that)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98 99 100
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
101 102 103 104 105 106
	lnk.Name = name
	lnk.Node = that

	n.Links = append(n.Links, lnk)
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107

Jeromy's avatar
Jeromy committed
108 109 110 111
// AddNodeLink adds a link to another node. without keeping a reference to
// the child node
func (n *Node) AddNodeLinkClean(name string, that *Node) error {
	lnk, err := MakeLink(that)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112 113 114
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
115
	lnk.Name = name
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116

Jeromy's avatar
Jeromy committed
117
	n.Links = append(n.Links, lnk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120

Jeromy's avatar
Jeromy committed
121
// Remove a link on this node by the given name
122 123 124 125 126 127 128
func (n *Node) RemoveNodeLink(name string) error {
	for i, l := range n.Links {
		if l.Name == name {
			n.Links = append(n.Links[:i], n.Links[i+1:]...)
			return nil
		}
	}
129
	return ErrNotFound
130 131
}

132 133
// Copy returns a copy of the node.
// NOTE: does not make copies of Node objects in the links.
Jeromy's avatar
Jeromy committed
134 135 136 137 138 139 140 141 142 143
func (n *Node) Copy() *Node {
	nnode := new(Node)
	nnode.Data = make([]byte, len(n.Data))
	copy(nnode.Data, n.Data)

	nnode.Links = make([]*Link, len(n.Links))
	copy(nnode.Links, n.Links)
	return nnode
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
144 145
// Size returns the total size of the data addressed by node,
// including the total sizes of references.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
func (n *Node) Size() (uint64, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
	b, err := n.Encoded(false)
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
148 149 150 151
	if err != nil {
		return 0, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
	s := uint64(len(b))
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
153 154 155 156
	for _, l := range n.Links {
		s += l.Size
	}
	return s, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
159
// Multihash hashes the encoded data of this node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
func (n *Node) Multihash() (mh.Multihash, error) {
161
	// Note: Encoded generates the hash and puts it in n.cached.
162
	_, err := n.Encoded(false)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163 164 165 166
	if err != nil {
		return nil, err
	}

167
	return n.cached, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
170
// Key returns the Multihash as a key, for maps.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172 173 174
func (n *Node) Key() (u.Key, error) {
	h, err := n.Multihash()
	return u.Key(h), err
}
175

176
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
177 178
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
179 180
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
181
type dagService struct {
182
	Blocks *bserv.BlockService
183 184
}

185 186
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
187
	k, _ := nd.Key()
188
	log.Debugf("DagService Add [%s]", k)
189
	if n == nil {
190
		return "", fmt.Errorf("dagService is nil")
191 192 193 194 195 196 197
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

198 199 200
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
201 202 203 204 205 206 207
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

Jeromy's avatar
Jeromy committed
208
// AddRecursive adds the given node and all child nodes to the BlockService
209
func (n *dagService) AddRecursive(nd *Node) error {
210 211
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
212
		log.Info("AddRecursive Error: %s\n", err)
213 214 215 216
		return err
	}

	for _, link := range nd.Links {
217 218 219 220 221
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
222 223 224 225 226 227
		}
	}

	return nil
}

228 229
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
230
	if n == nil {
231
		return nil, fmt.Errorf("dagService is nil")
232 233
	}

Jeromy's avatar
Jeromy committed
234 235
	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	b, err := n.Blocks.GetBlock(ctx, k)
236 237 238 239 240 241
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
242

Jeromy's avatar
Jeromy committed
243
// Remove deletes the given node and all of its children from the BlockService
244
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
245 246 247 248 249 250 251 252 253 254 255
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
256

Jeromy's avatar
Jeromy committed
257 258
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
259
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
260
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
261 262 263
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
264
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
265
		wg.Add(1)
Jeromy's avatar
Jeromy committed
266
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
267 268 269

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
270 271 272 273 274 275 276 277 278 279
			select {
			case <-ctx.Done():
				return
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Error(err)
				return
			}
Jeromy's avatar
Jeromy committed
280 281 282

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
283 284
		}(l)
	}
Jeromy's avatar
Jeromy committed
285 286 287 288 289 290 291

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
292
}
293

Jeromy's avatar
Jeromy committed
294 295
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
296 297
func FindLinks(n *Node, k u.Key) []int {
	var out []int
298
	keybytes := []byte(k)
Jeromy's avatar
Jeromy committed
299
	for i, lnk := range n.Links {
300
		if bytes.Equal([]byte(lnk.Hash), keybytes) {
Jeromy's avatar
Jeromy committed
301
			out = append(out, i)
Jeromy's avatar
Jeromy committed
302 303
		}
	}
Jeromy's avatar
Jeromy committed
304
	return out
Jeromy's avatar
Jeromy committed
305 306
}

307
// GetDAG will fill out all of the links of the given Node.
308 309
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
310
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
Jeromy's avatar
Jeromy committed
311
	sig := make(chan *Node)
312
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
313 314
		defer close(sig)

315
		var keys []u.Key
Jeromy's avatar
Jeromy committed
316 317 318 319 320
		for _, lnk := range root.Links {
			keys = append(keys, u.Key(lnk.Hash))
		}
		blkchan := ds.Blocks.GetBlocks(ctx, keys)

321
		nodes := make([]*Node, len(root.Links))
Jeromy's avatar
Jeromy committed
322
		next := 0
323
		for blk := range blkchan {
324 325
			nd, err := Decoded(blk.Data)
			if err != nil {
Jeromy's avatar
Jeromy committed
326
				// NB: can occur in normal situations, with improperly formatted
Brian Tiger Chow's avatar
Brian Tiger Chow committed
327
				// input data
328 329 330
				log.Error("Got back bad block!")
				break
			}
Jeromy's avatar
Jeromy committed
331 332 333
			is := FindLinks(root, blk.Key())
			for _, i := range is {
				nodes[i] = nd
334
			}
335

Jeromy's avatar
Jeromy committed
336 337
			for ; next < len(nodes) && nodes[next] != nil; next++ {
				sig <- nodes[next]
338 339
			}
		}
340
		if next < len(nodes) {
Jeromy's avatar
Jeromy committed
341 342
			// TODO: bubble errors back up.
			log.Errorf("Did not receive correct number of nodes!")
343
		}
344
	}()
345

346
	return sig
347
}