merkledag.go 7.08 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7 8
	"time"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Chas Leichner's avatar
Chas Leichner committed
10

11
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
12
	blocks "github.com/jbenet/go-ipfs/blocks"
13
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
var log = u.Logger("merkledag")
18
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
19

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
20 21 22
// NodeMap maps u.Keys to Nodes.
// We cannot use []byte/Multihash for keys :(
// so have to convert Multihash bytes to string (u.Key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24
type NodeMap map[u.Key]*Node

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
25
// Node represents a node in the IPFS Merkle DAG.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26
// nodes have opaque data and a set of navigable links.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
type Node struct {
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
28 29
	Links []*Link
	Data  []byte
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31 32

	// cache encoded/marshaled value
	encoded []byte
33 34

	cached mh.Multihash
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35 36
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37
// Link represents an IPFS Merkle DAG Link between Nodes.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
type Link struct {
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
39 40
	// utf string name. should be unique per object
	Name string // utf8
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41

Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
42 43
	// cumulative size of target object
	Size uint64
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44

Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
45 46
	// multihash of the target object
	Hash mh.Multihash
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49

	// a ptr to the actual node for graph manipulation
	Node *Node
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51
}

Jeromy's avatar
Jeromy committed
52
// MakeLink creates a link to the given node
Jeromy's avatar
Jeromy committed
53 54
func MakeLink(n *Node) (*Link, error) {
	s, err := n.Size()
55
	if err != nil {
Jeromy's avatar
Jeromy committed
56
		return nil, err
57 58
	}

Jeromy's avatar
Jeromy committed
59
	h, err := n.Multihash()
60
	if err != nil {
Jeromy's avatar
Jeromy committed
61
		return nil, err
62
	}
Jeromy's avatar
Jeromy committed
63
	return &Link{
64 65
		Size: s,
		Hash: h,
Jeromy's avatar
Jeromy committed
66
	}, nil
67 68
}

Jeromy's avatar
Jeromy committed
69
// GetNode returns the MDAG Node that this link points to
70
func (l *Link) GetNode(serv DAGService) (*Node, error) {
71 72 73 74 75 76 77
	if l.Node != nil {
		return l.Node, nil
	}

	return serv.Get(u.Key(l.Hash))
}

Jeromy's avatar
Jeromy committed
78 79 80
// AddNodeLink adds a link to another node.
func (n *Node) AddNodeLink(name string, that *Node) error {
	lnk, err := MakeLink(that)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81 82 83
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
84 85 86 87 88 89
	lnk.Name = name
	lnk.Node = that

	n.Links = append(n.Links, lnk)
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90

Jeromy's avatar
Jeromy committed
91 92 93 94
// AddNodeLink adds a link to another node. without keeping a reference to
// the child node
func (n *Node) AddNodeLinkClean(name string, that *Node) error {
	lnk, err := MakeLink(that)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95 96 97
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
98
	lnk.Name = name
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99

Jeromy's avatar
Jeromy committed
100
	n.Links = append(n.Links, lnk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101 102
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103

Jeromy's avatar
Jeromy committed
104
// Remove a link on this node by the given name
105 106 107 108 109 110 111
func (n *Node) RemoveNodeLink(name string) error {
	for i, l := range n.Links {
		if l.Name == name {
			n.Links = append(n.Links[:i], n.Links[i+1:]...)
			return nil
		}
	}
112
	return ErrNotFound
113 114
}

115 116
// Copy returns a copy of the node.
// NOTE: does not make copies of Node objects in the links.
Jeromy's avatar
Jeromy committed
117 118 119 120 121 122 123 124 125 126
func (n *Node) Copy() *Node {
	nnode := new(Node)
	nnode.Data = make([]byte, len(n.Data))
	copy(nnode.Data, n.Data)

	nnode.Links = make([]*Link, len(n.Links))
	copy(nnode.Links, n.Links)
	return nnode
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
127 128
// Size returns the total size of the data addressed by node,
// including the total sizes of references.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
func (n *Node) Size() (uint64, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130
	b, err := n.Encoded(false)
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
131 132 133 134
	if err != nil {
		return 0, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
	s := uint64(len(b))
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
136 137 138 139
	for _, l := range n.Links {
		s += l.Size
	}
	return s, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
142
// Multihash hashes the encoded data of this node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143
func (n *Node) Multihash() (mh.Multihash, error) {
144
	// Note: Encoded generates the hash and puts it in n.cached.
145
	_, err := n.Encoded(false)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146 147 148 149
	if err != nil {
		return nil, err
	}

150
	return n.cached, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
153
// Key returns the Multihash as a key, for maps.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156 157
func (n *Node) Key() (u.Key, error) {
	h, err := n.Multihash()
	return u.Key(h), err
}
158

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
159
// DAGService is an IPFS Merkle DAG service.
160 161 162 163 164 165 166 167 168 169 170 171
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
}

// NewDAGService returns a DAGService that keeps node data in the given
// BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	ds := &dagService{Blocks: bs}
	return ds
}

// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
172 173
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
174 175
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
176
type dagService struct {
177
	Blocks *bserv.BlockService
178 179
}

180 181
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
182
	k, _ := nd.Key()
183
	log.Debugf("DagService Add [%s]", k)
184
	if n == nil {
185
		return "", fmt.Errorf("dagService is nil")
186 187 188 189 190 191 192
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

193 194 195
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
196 197 198 199 200 201 202
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

Jeromy's avatar
Jeromy committed
203
// AddRecursive adds the given node and all child nodes to the BlockService
204
func (n *dagService) AddRecursive(nd *Node) error {
205 206
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
207
		log.Info("AddRecursive Error: %s\n", err)
208 209 210 211
		return err
	}

	for _, link := range nd.Links {
212 213 214 215 216
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
217 218 219 220 221 222
		}
	}

	return nil
}

223 224
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
225
	if n == nil {
226
		return nil, fmt.Errorf("dagService is nil")
227 228
	}

Jeromy's avatar
Jeromy committed
229 230
	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	b, err := n.Blocks.GetBlock(ctx, k)
231 232 233 234 235 236
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
237

Jeromy's avatar
Jeromy committed
238
// Remove deletes the given node and all of its children from the BlockService
239
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
240 241 242 243 244 245 246 247 248 249 250
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
251

Jeromy's avatar
Jeromy committed
252 253
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
254
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
255
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
256 257 258
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
259
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
260
		wg.Add(1)
Jeromy's avatar
Jeromy committed
261
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
262 263 264

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
265 266 267 268 269 270 271 272 273 274
			select {
			case <-ctx.Done():
				return
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Error(err)
				return
			}
Jeromy's avatar
Jeromy committed
275 276 277

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
278 279
		}(l)
	}
Jeromy's avatar
Jeromy committed
280 281 282 283 284 285 286

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
287
}
288 289 290 291

// Take advantage of blockservice/bitswap batched requests to fetch all
// child nodes of a given node
// TODO: finish this
Jeromy's avatar
Jeromy committed
292 293
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan int {
	sig := make(chan int)
294 295 296 297 298
	go func() {
		var keys []u.Key
		for _, lnk := range root.Links {
			keys = append(keys, u.Key(lnk.Hash))
		}
299

300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
		blkchan := ds.Blocks.GetBlocks(ctx, keys)

		//
		next := 0
		seen := make(map[int]struct{})
		//

		for blk := range blkchan {
			for i, lnk := range root.Links {

				//
				seen[i] = struct{}{}
				//

				if u.Key(lnk.Hash) != blk.Key() {
					continue
				}
				nd, err := Decoded(blk.Data)
				if err != nil {
					log.Error("Got back bad block!")
					break
				}
				lnk.Node = nd

				//
				if next == i {
Jeromy's avatar
Jeromy committed
326
					sig <- next
327 328 329
					next++
					for {
						if _, ok := seen[next]; ok {
Jeromy's avatar
Jeromy committed
330
							sig <- next
331 332 333 334 335 336 337 338 339 340
							next++
						} else {
							break
						}
					}
				}
				//
			}
		}
	}()
341

342 343
	// TODO: return a channel, and signal when the 'Next' readable block is available
	return sig
344
}