blockservice.go 5.37 KB
Newer Older
1 2 3
// Package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"context"
8
	"errors"
Jeromy's avatar
Jeromy committed
9
	"fmt"
10

11 12 13
	blocks "github.com/ipfs/go-ipfs/blocks"
	"github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
Jeromy's avatar
Jeromy committed
14

Jeromy's avatar
Jeromy committed
15
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
16
	cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
17 18
)

Jeromy's avatar
Jeromy committed
19
// log is the package-level logger, registered under the name "blockservice".
var log = logging.Logger("blockservice")
20

21
// ErrNotFound is returned by GetBlock in place of blockstore.ErrNotFound
// when the requested block could not be located.
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
22

23 24
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
25
// It uses an internal `blockstore.Blockstore` instance to store values.
26 27 28 29 30 31 32 33 34 35 36 37 38 39
type BlockService interface {
	// Blockstore returns the blockstore used for local storage.
	Blockstore() blockstore.Blockstore
	// Exchange returns the exchange used to reach remote peers.
	Exchange() exchange.Interface
	// AddBlock stores a single block and returns its Cid.
	AddBlock(o blocks.Block) (*cid.Cid, error)
	// AddBlocks stores a batch of blocks and returns Cids for them.
	// NOTE(review): the implementation in this file only returns Cids for
	// blocks it actually wrote; confirm whether callers rely on that.
	AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
	// GetBlock retrieves the block identified by c.
	GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
	// GetBlocks retrieves several blocks asynchronously, delivering them on
	// the returned channel.
	GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
	// DeleteBlock removes the given block from local storage.
	DeleteBlock(o blocks.Block) error
	// Close shuts the service down.
	Close() error
}

type blockService struct {
	blockstore blockstore.Blockstore
	exchange   exchange.Interface
40 41 42
	// If checkFirst is true then first check that a block doesn't
	// already exist to avoid republishing the block on the exchange.
	checkFirst bool
43 44 45
}

// NewBlockService creates a BlockService with given datastore instance.
46
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
Jeromy's avatar
Jeromy committed
47
	if rem == nil {
Jeromy's avatar
Jeromy committed
48
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
49
	}
50

51 52 53
	return &blockService{
		blockstore: bs,
		exchange:   rem,
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
		checkFirst: true,
	}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
	if rem == nil {
		log.Warning("blockservice running in local (offline) mode.")
	}

	return &blockService{
		blockstore: bs,
		exchange:   rem,
		checkFirst: false,
69
	}
70 71
}

72 73 74 75 76 77 78 79
// Blockstore returns the blockstore this service was constructed with.
func (s *blockService) Blockstore() blockstore.Blockstore {
	store := s.blockstore
	return store
}

// Exchange returns the exchange this service was constructed with. The
// result is nil when the service was created without one.
func (s *blockService) Exchange() exchange.Interface {
	ex := s.exchange
	return ex
}

80
// AddBlock adds a particular block to the service, Putting it into the datastore.
81
// TODO pass a context into this if the remote.HasBlock is going to remain here.
82
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
Jeromy's avatar
Jeromy committed
83
	c := o.Cid()
84 85 86 87 88
	if s.checkFirst {
		has, err := s.blockstore.Has(c)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
89

90 91 92
		if has {
			return c, nil
		}
93 94
	}

95
	err := s.blockstore.Put(o)
Jeromy's avatar
Jeromy committed
96
	if err != nil {
Jeromy's avatar
Jeromy committed
97
		return nil, err
Jeromy's avatar
Jeromy committed
98
	}
Jeromy's avatar
Jeromy committed
99

100
	if err := s.exchange.HasBlock(o); err != nil {
Jeromy's avatar
Jeromy committed
101
		return nil, errors.New("blockservice is closed")
102
	}
Jeromy's avatar
Jeromy committed
103 104

	return c, nil
105 106
}

107
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
108
	var toput []blocks.Block
109 110 111 112 113 114
	if s.checkFirst {
		for _, b := range bs {
			has, err := s.blockstore.Has(b.Cid())
			if err != nil {
				return nil, err
			}
115 116
			if !has {
				toput = append(toput, b)
117
			}
118
		}
119
	} else {
Jeromy's avatar
Jeromy committed
120
		toput = bs
121 122
	}

123
	err := s.blockstore.PutMany(toput)
124 125 126 127
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
128 129
	var ks []*cid.Cid
	for _, o := range toput {
130
		if err := s.exchange.HasBlock(o); err != nil {
Jeromy's avatar
Jeromy committed
131
			return nil, fmt.Errorf("blockservice is closed (%s)", err)
132
		}
Jeromy's avatar
Jeromy committed
133

134
		ks = append(ks, o.Cid())
135 136 137 138
	}
	return ks, nil
}

139 140
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
141
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
142
	log.Debugf("BlockService GetBlock: '%s'", c)
jbenet's avatar
jbenet committed
143

144
	block, err := s.blockstore.Get(c)
Jeromy's avatar
Jeromy committed
145
	if err == nil {
146
		return block, nil
Jeromy's avatar
Jeromy committed
147 148
	}

149
	if err == blockstore.ErrNotFound && s.exchange != nil {
150 151
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
152
		log.Debug("Blockservice: Searching bitswap")
153
		blk, err := s.exchange.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
154
		if err != nil {
155 156 157
			if err == blockstore.ErrNotFound {
				return nil, ErrNotFound
			}
Jeromy's avatar
Jeromy committed
158 159 160
			return nil, err
		}
		return blk, nil
Jeromy's avatar
Jeromy committed
161 162
	}

163
	log.Debug("Blockservice GetBlock: Not found")
Jeromy's avatar
Jeromy committed
164
	if err == blockstore.ErrNotFound {
165
		return nil, ErrNotFound
166
	}
Jeromy's avatar
Jeromy committed
167 168

	return nil, err
169
}
Jeromy's avatar
Jeromy committed
170

171 172 173
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
174
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
175
	out := make(chan blocks.Block)
176
	go func() {
177
		defer close(out)
178
		var misses []*cid.Cid
Jeromy's avatar
Jeromy committed
179
		for _, c := range ks {
180
			hit, err := s.blockstore.Get(c)
181
			if err != nil {
182
				misses = append(misses, c)
183
				continue
184
			}
185
			log.Debug("Blockservice: Got data in datastore")
186 187 188 189 190
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
191
		}
Jeromy's avatar
Jeromy committed
192

193 194 195 196
		if len(misses) == 0 {
			return
		}

197
		rblocks, err := s.exchange.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
198
		if err != nil {
199
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
200 201
			return
		}
202

203 204 205 206 207 208
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
209
		}
210 211
	}()
	return out
Jeromy's avatar
Jeromy committed
212 213
}

Jeromy's avatar
Jeromy committed
214
// DeleteBlock deletes a block in the blockservice from the datastore
215 216
func (s *blockService) DeleteBlock(o blocks.Block) error {
	return s.blockstore.DeleteBlock(o.Cid())
Jeromy's avatar
Jeromy committed
217
}
218

219
// Close shuts the service down by closing its exchange and returns the
// exchange's error, if any. When the service has no exchange (offline
// mode) there is nothing to close and nil is returned.
func (s *blockService) Close() error {
	log.Debug("blockservice is shutting down...")
	// The constructors allow a nil exchange; calling Close on it would panic.
	if s.exchange == nil {
		return nil
	}
	return s.exchange.Close()
}