blockservice.go 4.24 KB
Newer Older
1 2 3
// package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"errors"
8 9
	"fmt"

10 11 12 13 14 15
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
	"github.com/ipfs/go-ipfs/blocks/blockstore"
	worker "github.com/ipfs/go-ipfs/blockservice/worker"
	exchange "github.com/ipfs/go-ipfs/exchange"
	u "github.com/ipfs/go-ipfs/util"
16 17
)

18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
var wc = worker.Config{
	// When running on a single core, NumWorkers has a harsh negative effect on
	// throughput. (-80% when < 25)
	// Running a lot more workers appears to have very little effect on both
	// single and multicore configurations.
	NumWorkers: 25,

	// These have no effect on when running on multiple cores, but harsh
	// negative effect on throughput when running on a single core
	// On multicore configurations these buffers have little effect on
	// throughput.
	// On single core configurations, larger buffers have severe adverse
	// effects on throughput.
	ClientBufferSize: 0,
	WorkerBufferSize: 0,
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
var log = u.Logger("blockservice")
36
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
37

38 39
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
40 41
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
42 43
	// TODO don't expose underlying impl details
	Blockstore blockstore.Blockstore
44
	Exchange   exchange.Interface
45

46
	worker *worker.Worker
47 48 49
}

// NewBlockService creates a BlockService with given datastore instance.
50 51 52
func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) {
	if bs == nil {
		return nil, fmt.Errorf("BlockService requires valid blockstore")
53
	}
Jeromy's avatar
Jeromy committed
54
	if rem == nil {
Jeromy's avatar
Jeromy committed
55
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
56
	}
57 58

	return &BlockService{
59 60 61
		Blockstore: bs,
		Exchange:   rem,
		worker:     worker.NewWorker(rem, wc),
62
	}, nil
63 64 65
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
66
// TODO pass a context into this if the remote.HasBlock is going to remain here.
67 68
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
	k := b.Key()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69
	err := s.Blockstore.Put(b)
Jeromy's avatar
Jeromy committed
70 71 72
	if err != nil {
		return k, err
	}
73 74
	if err := s.worker.HasBlock(b); err != nil {
		return "", errors.New("blockservice is closed")
75
	}
76
	return k, nil
77 78 79 80
}

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
Jeromy's avatar
Jeromy committed
81
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
Jeromy's avatar
Jeromy committed
82
	log.Debugf("BlockService GetBlock: '%s'", k)
83
	block, err := s.Blockstore.Get(k)
Jeromy's avatar
Jeromy committed
84
	if err == nil {
85 86 87
		return block, nil
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
88
	} else if err == blockstore.ErrNotFound && s.Exchange != nil {
Jeromy's avatar
Jeromy committed
89
		log.Debug("Blockservice: Searching bitswap.")
90
		blk, err := s.Exchange.GetBlock(ctx, k)
Jeromy's avatar
Jeromy committed
91 92 93 94 95
		if err != nil {
			return nil, err
		}
		return blk, nil
	} else {
Jeromy's avatar
Jeromy committed
96
		log.Debug("Blockservice GetBlock: Not found.")
97
		return nil, ErrNotFound
98 99
	}
}
Jeromy's avatar
Jeromy committed
100

101 102 103
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
104
func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
105
	out := make(chan *blocks.Block, 0)
106
	go func() {
107 108
		defer close(out)
		var misses []u.Key
109
		for _, k := range ks {
110
			hit, err := s.Blockstore.Get(k)
111
			if err != nil {
112
				misses = append(misses, k)
113
				continue
114
			}
115
			log.Debug("Blockservice: Got data in datastore.")
116 117 118 119 120
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
121
		}
Jeromy's avatar
Jeromy committed
122

123
		rblocks, err := s.Exchange.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
124
		if err != nil {
125
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
126 127
			return
		}
128

129 130 131 132 133 134
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
135
		}
136 137
	}()
	return out
Jeromy's avatar
Jeromy committed
138 139
}

Jeromy's avatar
Jeromy committed
140
// DeleteBlock deletes a block in the blockservice from the datastore
Jeromy's avatar
Jeromy committed
141
func (s *BlockService) DeleteBlock(k u.Key) error {
142
	return s.Blockstore.DeleteBlock(k)
Jeromy's avatar
Jeromy committed
143
}
144 145 146 147 148

func (s *BlockService) Close() error {
	log.Debug("blockservice is shutting down...")
	return s.worker.Close()
}