merkledag.go 6.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7

8 9
	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
10
	key "github.com/ipfs/go-ipfs/blocks/key"
11 12
	bserv "github.com/ipfs/go-ipfs/blockservice"
	u "github.com/ipfs/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
var log = u.Logger("merkledag")
16
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
17

Jeromy's avatar
Jeromy committed
18 19
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
20
	Add(*Node) (key.Key, error)
Jeromy's avatar
Jeromy committed
21
	AddRecursive(*Node) error
22
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
23
	Remove(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
27
	GetDAG(context.Context, *Node) []NodeGetter
28
	GetNodes(context.Context, []key.Key) []NodeGetter
29 30

	Batch() *Batch
Jeromy's avatar
Jeromy committed
31 32 33 34 35 36
}

// NewDAGService returns a DAGService that stores and loads its nodes
// through the given BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	ds := &dagService{Blocks: bs}
	return ds
}

37
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
38 39
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
40 41
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
42
type dagService struct {
43
	Blocks *bserv.BlockService
44 45
}

46
// Add adds a node to the dagService, storing the block in the BlockService
47
func (n *dagService) Add(nd *Node) (key.Key, error) {
48
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
49
		return "", fmt.Errorf("dagService is nil")
50 51 52 53 54 55 56
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

57 58 59
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
60 61 62 63 64 65 66
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

67 68 69 70
// Batch returns a new, empty Batch bound to this service. The batch
// commits itself once its buffered data exceeds 8 MiB.
func (n *dagService) Batch() *Batch {
	const maxBatchBytes = 8 << 20 // 8 MiB
	return &Batch{
		ds:      n,
		MaxSize: maxBatchBytes,
	}
}

Jeromy's avatar
Jeromy committed
71
// AddRecursive adds the given node and all child nodes to the BlockService
72
func (n *dagService) AddRecursive(nd *Node) error {
73 74
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
75
		log.Info("AddRecursive Error: %s\n", err)
76 77 78 79
		return err
	}

	for _, link := range nd.Links {
80 81 82 83 84
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
85 86 87 88 89 90
		}
	}

	return nil
}

91
// Get retrieves a node from the dagService, fetching the block in the BlockService
92
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
93
	if n == nil {
94
		return nil, fmt.Errorf("dagService is nil")
95
	}
96 97
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
98

Jeromy's avatar
Jeromy committed
99
	b, err := n.Blocks.GetBlock(ctx, k)
100 101 102 103 104 105
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
106

Jeromy's avatar
Jeromy committed
107
// Remove deletes the given node and all of its children from the BlockService
108
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
109 110 111 112 113 114 115 116 117 118 119
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
120

Jeromy's avatar
Jeromy committed
121 122
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
123
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
124
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
125 126 127
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
128
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
129
		wg.Add(1)
Jeromy's avatar
Jeromy committed
130
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
131 132 133

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
134 135 136 137 138
			select {
			case <-ctx.Done():
				return
			}

Jeromy's avatar
Jeromy committed
139
			nd, err := lnk.GetNode(ctx, serv)
Jeromy's avatar
Jeromy committed
140
			if err != nil {
141
				log.Debug(err)
Jeromy's avatar
Jeromy committed
142 143
				return
			}
Jeromy's avatar
Jeromy committed
144 145 146

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
147 148
		}(l)
	}
Jeromy's avatar
Jeromy committed
149 150 151 152 153 154 155

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
156
}
157

Jeromy's avatar
Jeromy committed
158 159
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
160
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
161
	var out []int
Jeromy's avatar
Jeromy committed
162 163
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
164
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
165 166
		}
	}
Jeromy's avatar
Jeromy committed
167
	return out
Jeromy's avatar
Jeromy committed
168 169
}

170
// GetDAG will fill out all of the links of the given Node.
171 172
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
173
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
174
	var keys []key.Key
Jeromy's avatar
Jeromy committed
175
	for _, lnk := range root.Links {
176
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
177 178 179 180 181
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
182 183
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
184
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
185 186 187 188 189 190

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
191 192
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
193
	for i := range keys {
Jeromy's avatar
Jeromy committed
194 195 196
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

197
	dedupedKeys := dedupeKeys(keys)
198
	go func() {
199 200 201
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

202
		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
203

204
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
205 206 207 208 209
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
210

Jeromy's avatar
Jeromy committed
211 212
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
213
					// NB: can happen with improperly formatted input data
214
					log.Debug("Got back bad block!")
Jeromy's avatar
Jeromy committed
215
					return
Jeromy's avatar
Jeromy committed
216
				}
Jeromy's avatar
Jeromy committed
217
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
218
				for _, i := range is {
219
					count++
220
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
221 222 223
				}
			case <-ctx.Done():
				return
224 225 226
			}
		}
	}()
Jeromy's avatar
Jeromy committed
227 228 229
	return promises
}

230
// Remove duplicates from a list of keys
231 232 233
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
234 235 236 237 238 239 240 241 242
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
243 244 245 246 247 248 249 250 251 252 253 254 255 256
// newNodePromise returns a NodeGetter together with the send side of the
// channel that fulfils it. The channel has a buffer of one, so the single
// send that resolves the promise does not wait for a reader.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	resolve := make(chan *Node, 1)
	p := &nodePromise{ctx: ctx, recv: resolve}
	return p, resolve
}

// nodePromise is the NodeGetter implementation handed out by GetNodes.
type nodePromise struct {
	cache *Node           // node taken from recv, once it has arrived
	recv  <-chan *Node    // channel on which the node is delivered
	ctx   context.Context // request context; Get gives up once it is done
}

Jeromy's avatar
Jeromy committed
257 258 259 260
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
261
type NodeGetter interface {
262
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
263 264
}

265
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
266 267 268 269 270 271 272 273 274
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
275 276
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
277 278
	}
	return np.cache, nil
279
}
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317

// Batch buffers encoded nodes in memory and writes them to its dagService's
// BlockService in bulk when Commit is called.
type Batch struct {
	ds *dagService // service whose BlockService receives committed blocks

	blocks  []*blocks.Block // blocks waiting to be committed
	size    int             // total len(Data) of the buffered blocks
	MaxSize int             // Add commits automatically once size exceeds this
}

// Add encodes nd, appends the resulting block to the batch and returns the
// block's key. When the buffered data grows past MaxSize, the batch is
// committed immediately, and any Commit error is returned alongside the key.
func (t *Batch) Add(nd *Node) (key.Key, error) {
	encoded, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	digest, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	blk := &blocks.Block{Data: encoded, Multihash: digest}
	t.blocks = append(t.blocks, blk)
	t.size += len(blk.Data)

	k := key.Key(blk.Multihash)
	if t.size <= t.MaxSize {
		return k, nil
	}
	return k, t.Commit()
}

// Commit writes every buffered block to the BlockService with a single
// AddBlocks call and then empties the batch. The buffer is cleared even when
// AddBlocks fails, so the blocks of a failed commit are not retried later.
func (t *Batch) Commit() error {
	pending := t.blocks
	_, err := t.ds.Blocks.AddBlocks(pending)
	t.blocks, t.size = nil, 0
	return err
}