merkledag.go 6.93 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7

8 9
	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
10
	key "github.com/ipfs/go-ipfs/blocks/key"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
12
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Jeromy's avatar
Jeromy committed
15
var log = logging.Logger("merkledag")
16
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
17

Jeromy's avatar
Jeromy committed
18 19
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
20
	Add(*Node) (key.Key, error)
Jeromy's avatar
Jeromy committed
21
	AddRecursive(*Node) error
22
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
23
	Remove(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
27
	GetDAG(context.Context, *Node) []NodeGetter
28
	GetNodes(context.Context, []key.Key) []NodeGetter
29 30

	Batch() *Batch
Jeromy's avatar
Jeromy committed
31 32 33 34 35 36
}

// NewDAGService returns a DAGService that keeps its nodes in the given
// block service.
func NewDAGService(bs *bserv.BlockService) DAGService {
	return &dagService{Blocks: bs}
}

37
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
38 39
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
40 41
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
42
type dagService struct {
43
	Blocks *bserv.BlockService
44 45
}

46
// Add adds a node to the dagService, storing the block in the BlockService
47
func (n *dagService) Add(nd *Node) (key.Key, error) {
48
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
49
		return "", fmt.Errorf("dagService is nil")
50 51 52 53 54 55 56
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

57 58 59
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
60 61 62 63 64 65 66
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

67 68 69 70
// Batch returns a new Batch bound to this dagService whose MaxSize is
// 8 MiB of buffered block data.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 << 20 // 8 * 1024 * 1024 bytes

	return &Batch{ds: n, MaxSize: defaultMaxSize}
}

Jeromy's avatar
Jeromy committed
71
// AddRecursive adds the given node and all child nodes to the BlockService
72
func (n *dagService) AddRecursive(nd *Node) error {
73 74
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
75
		log.Info("AddRecursive Error: %s\n", err)
76 77 78 79
		return err
	}

	for _, link := range nd.Links {
80 81 82 83 84
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
85 86 87 88 89 90
		}
	}

	return nil
}

91
// Get retrieves a node from the dagService, fetching the block in the BlockService
92
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
93
	if n == nil {
94
		return nil, fmt.Errorf("dagService is nil")
95
	}
96 97
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
98

Jeromy's avatar
Jeromy committed
99
	b, err := n.Blocks.GetBlock(ctx, k)
100
	if err != nil {
101 102 103
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
104 105 106 107 108
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
109

Jeromy's avatar
Jeromy committed
110
// Remove deletes the given node and all of its children from the BlockService
111
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
112 113 114 115 116 117 118 119 120 121 122
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
123

Jeromy's avatar
Jeromy committed
124 125
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
126
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
127
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
128 129 130
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
131
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
132
		wg.Add(1)
Jeromy's avatar
Jeromy committed
133
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
134 135 136

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
137 138 139 140 141
			select {
			case <-ctx.Done():
				return
			}

Jeromy's avatar
Jeromy committed
142
			nd, err := lnk.GetNode(ctx, serv)
Jeromy's avatar
Jeromy committed
143
			if err != nil {
144
				log.Debug(err)
Jeromy's avatar
Jeromy committed
145 146
				return
			}
Jeromy's avatar
Jeromy committed
147 148 149

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
150 151
		}(l)
	}
Jeromy's avatar
Jeromy committed
152 153 154 155 156 157 158

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
159
}
160

Jeromy's avatar
Jeromy committed
161 162
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
163
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
164
	var out []int
Jeromy's avatar
Jeromy committed
165 166
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
167
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
168 169
		}
	}
Jeromy's avatar
Jeromy committed
170
	return out
Jeromy's avatar
Jeromy committed
171 172
}

173
// GetDAG will fill out all of the links of the given Node.
174 175
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
176
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
177
	var keys []key.Key
Jeromy's avatar
Jeromy committed
178
	for _, lnk := range root.Links {
179
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
180 181 182 183 184
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
185 186
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
187
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
188 189 190 191 192 193

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
194 195
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
196
	for i := range keys {
Jeromy's avatar
Jeromy committed
197 198 199
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

200
	dedupedKeys := dedupeKeys(keys)
201
	go func() {
202 203 204
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

205
		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
206

207
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
208 209 210 211 212
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
213

Jeromy's avatar
Jeromy committed
214 215
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
216
					// NB: can happen with improperly formatted input data
217
					log.Debug("Got back bad block!")
Jeromy's avatar
Jeromy committed
218
					return
Jeromy's avatar
Jeromy committed
219
				}
Jeromy's avatar
Jeromy committed
220
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
221
				for _, i := range is {
222
					count++
223
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
224 225 226
				}
			case <-ctx.Done():
				return
227 228 229
			}
		}
	}()
Jeromy's avatar
Jeromy committed
230 231 232
	return promises
}

233
// Remove duplicates from a list of keys
234 235 236
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
237 238 239 240 241 242 243 244 245
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
246 247 248 249 250 251 252 253 254 255 256 257 258 259
// newNodePromise creates a promise bound to ctx and returns it together with
// the send side of its channel, on which the producer delivers the node.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	// Capacity of one: the producer can deliver without waiting for Get.
	pipe := make(chan *Node, 1)
	promise := &nodePromise{ctx: ctx, recv: pipe}
	return promise, pipe
}

// nodePromise is the NodeGetter implementation created by newNodePromise.
type nodePromise struct {
	// cache holds the node once Get has received it; nil until then.
	cache *Node
	// recv is the receive side of the channel the node is delivered on.
	recv  <-chan *Node
	// ctx is the context the promise was created with; Get gives up
	// when it is done.
	ctx   context.Context
}

Jeromy's avatar
Jeromy committed
260 261 262 263
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
264
type NodeGetter interface {
265
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
266 267
}

268
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
269 270 271 272 273 274 275 276 277
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
278 279
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
280 281
	}
	return np.cache, nil
282
}
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320

// Batch buffers blocks built from added nodes and writes them to the
// owning dagService's BlockService in one call when committed.
type Batch struct {
	// ds is the dagService whose BlockService receives the blocks.
	ds *dagService

	// blocks are the blocks added since the last Commit.
	blocks  []*blocks.Block
	// size is the total length of the Data of the buffered blocks.
	size    int
	// MaxSize is the threshold for size; Add commits the batch once
	// size exceeds it.
	MaxSize int
}

// Add encodes nd into a block, appends it to the batch and returns the key
// derived from the node's multihash. When the buffered data grows beyond
// MaxSize the batch is committed right away, and the result of that Commit
// is returned as the error.
func (t *Batch) Add(nd *Node) (key.Key, error) {
	encoded, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	mh, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	blk := &blocks.Block{Data: encoded, Multihash: mh}
	k := key.Key(mh)

	t.blocks = append(t.blocks, blk)
	t.size += len(encoded)
	if t.size <= t.MaxSize {
		return k, nil
	}
	return k, t.Commit()
}

// Commit writes all buffered blocks to the BlockService in one AddBlocks
// call, then empties the batch regardless of the outcome. It returns the
// error from AddBlocks, if any.
func (t *Batch) Commit() error {
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks, t.size = nil, 0
	return err
}