blockservice.go 4.14 KB
Newer Older
1 2 3
// package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"errors"
8 9
	"fmt"

10
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
11
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
12

13
	blocks "github.com/jbenet/go-ipfs/blocks"
14
	"github.com/jbenet/go-ipfs/blocks/blockstore"
15
	exchange "github.com/jbenet/go-ipfs/exchange"
16 17 18
	u "github.com/jbenet/go-ipfs/util"
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19
var log = u.Logger("blockservice")
20
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
21

22 23
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
24 25
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
26 27
	// TODO don't expose underlying impl details
	Blockstore blockstore.Blockstore
28
	Exchange   exchange.Interface
29 30 31
}

// NewBlockService creates a BlockService with given datastore instance.
32 33 34
func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) {
	if bs == nil {
		return nil, fmt.Errorf("BlockService requires valid blockstore")
35
	}
Jeromy's avatar
Jeromy committed
36
	if rem == nil {
Jeromy's avatar
Jeromy committed
37
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
38
	}
39
	return &BlockService{Blockstore: bs, Exchange: rem}, nil
40 41 42
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
43
// TODO pass a context into this if the remote.HasBlock is going to remain here.
44 45
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
	k := b.Key()
Jeromy's avatar
Jeromy committed
46
	log.Debugf("blockservice: storing [%s] in datastore", k)
47 48
	// TODO(brian): define a block datastore with a Put method which accepts a
	// block parameter
49 50 51 52

	// check if we have it before adding. this is an extra read, but large writes
	// are more expensive.
	// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
53
	has, err := s.Blockstore.Has(k)
Jeromy's avatar
Jeromy committed
54 55 56
	if err != nil {
		return k, err
	}
57 58 59 60
	if has {
		log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
	} else {
		log.Debugf("blockservice: storing [%s] in datastore", k)
61
		err := s.Blockstore.Put(b)
62 63 64 65 66
		if err != nil {
			return k, err
		}
	}

67 68
	// TODO this operation rate-limits blockservice operations, we should
	// consider moving this to an sync process.
69
	if s.Exchange != nil {
70
		ctx := context.TODO()
71
		err = s.Exchange.HasBlock(ctx, b)
72
	}
Jeromy's avatar
Jeromy committed
73
	return k, err
74 75 76 77
}

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
Jeromy's avatar
Jeromy committed
78
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
Jeromy's avatar
Jeromy committed
79
	log.Debugf("BlockService GetBlock: '%s'", k)
80
	block, err := s.Blockstore.Get(k)
Jeromy's avatar
Jeromy committed
81
	if err == nil {
82 83 84
		return block, nil
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
85
	} else if err == ds.ErrNotFound && s.Exchange != nil {
Jeromy's avatar
Jeromy committed
86
		log.Debug("Blockservice: Searching bitswap.")
87
		blk, err := s.Exchange.GetBlock(ctx, k)
Jeromy's avatar
Jeromy committed
88 89 90 91 92
		if err != nil {
			return nil, err
		}
		return blk, nil
	} else {
Jeromy's avatar
Jeromy committed
93
		log.Debug("Blockservice GetBlock: Not found.")
94
		return nil, ErrNotFound
95 96
	}
}
Jeromy's avatar
Jeromy committed
97

98 99 100
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
101
func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
102
	out := make(chan *blocks.Block, 0)
103
	go func() {
104 105
		defer close(out)
		var misses []u.Key
106
		for _, k := range ks {
107
			hit, err := s.Blockstore.Get(k)
108
			if err != nil {
109
				misses = append(misses, k)
110
				continue
111
			}
112
			log.Debug("Blockservice: Got data in datastore.")
113 114 115 116 117
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
118
		}
Jeromy's avatar
Jeromy committed
119

120
		rblocks, err := s.Exchange.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
121 122 123 124
		if err != nil {
			log.Errorf("Error with GetBlocks: %s", err)
			return
		}
125

126 127 128 129 130 131
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
132
		}
133 134
	}()
	return out
Jeromy's avatar
Jeromy committed
135 136
}

Jeromy's avatar
Jeromy committed
137
// DeleteBlock deletes a block in the blockservice from the datastore
Jeromy's avatar
Jeromy committed
138
func (s *BlockService) DeleteBlock(k u.Key) error {
139
	return s.Blockstore.DeleteBlock(k)
Jeromy's avatar
Jeromy committed
140
}