blockservice.go 4.07 KB
Newer Older
1 2 3
// Package blockservice implements the BlockService type, which provides
// a single GetBlock/AddBlock entry point that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"errors"
8

9 10
	blocks "github.com/ipfs/go-ipfs/blocks"
	"github.com/ipfs/go-ipfs/blocks/blockstore"
11
	key "github.com/ipfs/go-ipfs/blocks/key"
12
	exchange "github.com/ipfs/go-ipfs/exchange"
13
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
Jeromy's avatar
Jeromy committed
14
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
15 16
)

Jeromy's avatar
Jeromy committed
17
// log is the package-level logger, created under the name "blockservice".
var log = logging.Logger("blockservice")
18

19
// ErrNotFound is returned by GetBlock when it is given an empty key or when
// the requested block cannot be found.
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
20

21 22
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
23 24
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
25 26
	// TODO don't expose underlying impl details
	Blockstore blockstore.Blockstore
27
	Exchange   exchange.Interface
28 29 30
}

// NewBlockService creates a BlockService with given datastore instance.
31
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
Jeromy's avatar
Jeromy committed
32
	if rem == nil {
Jeromy's avatar
Jeromy committed
33
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
34
	}
35 36

	return &BlockService{
37 38
		Blockstore: bs,
		Exchange:   rem,
39
	}
40 41 42
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
43
// TODO pass a context into this if the remote.HasBlock is going to remain here.
44
func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) {
45
	k := b.Key()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
	err := s.Blockstore.Put(b)
Jeromy's avatar
Jeromy committed
47 48 49
	if err != nil {
		return k, err
	}
50
	if err := s.Exchange.HasBlock(b); err != nil {
51
		return "", errors.New("blockservice is closed")
52
	}
53
	return k, nil
54 55
}

56
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
57 58 59 60 61 62 63
	err := s.Blockstore.PutMany(bs)
	if err != nil {
		return nil, err
	}

	var ks []key.Key
	for _, b := range bs {
64
		if err := s.Exchange.HasBlock(b); err != nil {
65 66 67 68 69 70 71
			return nil, errors.New("blockservice is closed")
		}
		ks = append(ks, b.Key())
	}
	return ks, nil
}

72 73
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
74
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
75 76 77 78 79
	if k == "" {
		log.Debug("BlockService GetBlock: Nil Key")
		return nil, ErrNotFound
	}

Jeromy's avatar
Jeromy committed
80
	log.Debugf("BlockService GetBlock: '%s'", k)
81
	block, err := s.Blockstore.Get(k)
Jeromy's avatar
Jeromy committed
82
	if err == nil {
83
		return block, nil
Jeromy's avatar
Jeromy committed
84 85 86
	}

	if err == blockstore.ErrNotFound && s.Exchange != nil {
87 88
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
89
		log.Debug("Blockservice: Searching bitswap")
90
		blk, err := s.Exchange.GetBlock(ctx, k)
Jeromy's avatar
Jeromy committed
91
		if err != nil {
92 93 94
			if err == blockstore.ErrNotFound {
				return nil, ErrNotFound
			}
Jeromy's avatar
Jeromy committed
95 96 97
			return nil, err
		}
		return blk, nil
Jeromy's avatar
Jeromy committed
98 99
	}

100
	log.Debug("Blockservice GetBlock: Not found")
Jeromy's avatar
Jeromy committed
101
	if err == blockstore.ErrNotFound {
102
		return nil, ErrNotFound
103
	}
Jeromy's avatar
Jeromy committed
104 105

	return nil, err
106
}
Jeromy's avatar
Jeromy committed
107

108 109 110
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
111 112
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block {
	out := make(chan blocks.Block, 0)
113
	go func() {
114
		defer close(out)
115
		var misses []key.Key
116
		for _, k := range ks {
117
			hit, err := s.Blockstore.Get(k)
118
			if err != nil {
119
				misses = append(misses, k)
120
				continue
121
			}
122
			log.Debug("Blockservice: Got data in datastore")
123 124 125 126 127
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
128
		}
Jeromy's avatar
Jeromy committed
129

130 131 132 133
		if len(misses) == 0 {
			return
		}

134
		rblocks, err := s.Exchange.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
135
		if err != nil {
136
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
137 138
			return
		}
139

140 141 142 143 144 145
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
146
		}
147 148
	}()
	return out
Jeromy's avatar
Jeromy committed
149 150
}

Jeromy's avatar
Jeromy committed
151
// DeleteBlock deletes a block in the blockservice from the datastore
152
func (s *BlockService) DeleteBlock(k key.Key) error {
153
	return s.Blockstore.DeleteBlock(k)
Jeromy's avatar
Jeromy committed
154
}
155 156 157

// Close shuts the service down by closing its exchange and returns the
// exchange's Close error. When no exchange is configured (offline mode)
// there is nothing to close and nil is returned.
func (s *BlockService) Close() error {
	log.Debug("blockservice is shutting down...")
	// New allows a nil exchange; calling Close on it would panic.
	if s.Exchange == nil {
		return nil
	}
	return s.Exchange.Close()
}