Commit 221a9d9c authored by tavit ohanian

Merge branch 'port-2021-05-02'

parents 208c5420 2afcfcc4
version: 2.1
orbs:
  ci-go: ipfs/ci-go@0.2.9
workflows:
  version: 2
  test:
    jobs:
      - ci-go/build
      - ci-go/lint
      - ci-go/test
      - ci-go/test:
          race: true
          name: "ci-go/test/race"
      #- ci-go/benchmark:
      #    tolerance: 50
      #    requires:
      #      - ci-go/test
blank_issues_enabled: false
contact_links:
  - name: Getting Help on IPFS
    url: https://ipfs.io/help
    about: All information about how and where to get help on IPFS.
  - name: IPFS Official Forum
    url: https://discuss.ipfs.io
    about: Please post general questions, support requests, and discussions here.
---
name: Open an issue
about: Only for actionable issues relevant to this repository.
title: ''
labels: need/triage
assignees: ''
---
<!--
Hello! To ensure this issue is correctly addressed as soon as possible by the IPFS team, please try to make sure:
- This issue is relevant to this repository's topic or codebase.
- A clear description is provided. It should include as much relevant information as possible and a clear scope so the issue is actionable.
FOR GENERAL DISCUSSION, HELP OR QUESTIONS, please see the options at https://ipfs.io/help or head directly to https://discuss.ipfs.io.
(you can delete this section after reading)
-->
# Configuration for welcome - https://github.com/behaviorbot/welcome
# Configuration for new-issue-welcome - https://github.com/behaviorbot/new-issue-welcome
# Comment to be posted on first-time issues
newIssueWelcomeComment: >
  Thank you for submitting your first issue to this repository! A maintainer
  will be here shortly to triage and review.

  In the meantime, please double-check that you have provided all the
  necessary information to make this process easy! Any information that can
  help save additional round trips is useful! We currently aim to give
  initial feedback within **two business days**. If this does not happen, feel
  free to leave a comment.

  Please keep an eye on how this issue will be labeled, as labels give an
  overview of priorities, assignments and additional actions requested by the
  maintainers:

  - "Priority" labels will show how urgent this is for the team.

  - "Status" labels will show if this is ready to be worked on, blocked, or in progress.

  - "Need" labels will indicate if additional input or analysis is required.

  Finally, remember to use https://discuss.ipfs.io if you just need general
  support.
# Configuration for new-pr-welcome - https://github.com/behaviorbot/new-pr-welcome
# Comment to be posted on PRs from first-time contributors in your repository
newPRWelcomeComment: >
  Thank you for submitting this PR!
  A maintainer will be here shortly to review it.

  We are super grateful, but we are also overloaded! Help us by making sure
  that:

  * The context for this PR is clear, with relevant discussion, decisions
    and stakeholders linked/mentioned.

  * Your contribution itself is clear (code comments, self-review for the
    rest) and in its best form. Follow the [code contribution
    guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md#code-contribution-guidelines)
    if they apply.

  Getting other community members to review complex PRs is also a great help
  (you can ask in the chats/forums). If you are unsure about something, just
  leave us a comment.

  Next steps:

  * A maintainer will triage and assign priority to this PR, commenting on
    any missing things and potentially assigning a reviewer for high
    priority items.

  * The PR gets reviewed, discussed, and approved as needed.

  * The PR is merged by maintainers when it has been approved and comments addressed.

  We currently aim to provide initial feedback/triaging within **two business
  days**. Please keep an eye on any labelling actions, as these will indicate
  priorities and status of your contribution.

  We are very grateful for your contribution!
# Configuration for first-pr-merge - https://github.com/behaviorbot/first-pr-merge
# Comment to be posted to on pull requests merged by a first time user
# Currently disabled
#firstPRMergeComment: ""
1.1.31: QmVNRbcH1kKEQUVhCsH75kTGUVFMw2b7zEWyFKyfCwmJjo
The MIT License (MIT)
Copyright (c) 2014-2018 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
gx:
	go get github.com/whyrusleeping/gx
	go get github.com/whyrusleeping/gx-go

deps: gx
	gx --verbose install --global
	gx-go rewrite

publish:
	gx-go rewrite --undo
go-bitswap
==================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![Matrix](https://img.shields.io/badge/matrix-%23ipfs%3Amatrix.org-blue.svg?style=flat-square)](https://matrix.to/#/#ipfs:matrix.org)
[![IRC](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Discord](https://img.shields.io/discord/475789330380488707?color=blueviolet&label=discord&style=flat-square)](https://discord.gg/24fmuwR)
[![Coverage Status](https://codecov.io/gh/ipfs/go-bitswap/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/go-bitswap/branch/master)
[![Build Status](https://circleci.com/gh/ipfs/go-bitswap.svg?style=svg)](https://circleci.com/gh/ipfs/go-bitswap)
> An implementation of the bitswap protocol in go!
## Lead Maintainer
[Dirk McCormick](https://github.com/dirkmc)
## Table of Contents
- [Background](#background)
- [Install](#install)
- [Usage](#usage)
- [Implementation](#implementation)
- [Contribute](#contribute)
- [License](#license)
## Background
Bitswap is the data trading module for ipfs. It manages requesting and sending
blocks to and from other peers in the network. Bitswap has two main jobs:
- to acquire blocks requested by the client from the network
- to judiciously send blocks in its possession to other peers who want them
Bitswap is a message-based protocol, as opposed to request-response. All messages
contain wantlists or blocks.
A node sends a wantlist to tell peers which blocks it wants. When a node receives
a wantlist it should check which blocks it has from the wantlist, and consider
sending the matching blocks to the requestor.
When a node receives blocks that it asked for, the node should send out a
notification called a 'Cancel' to tell its peers that the node no longer
wants those blocks.
`go-bitswap` provides an implementation of the Bitswap protocol in go.
[Learn more about how Bitswap works](./docs/how-bitswap-works.md)
## Install
`go-bitswap` requires Go >= 1.11 and can be installed using Go modules.
## Usage
### Initializing a Bitswap Exchange
```golang
import (
  "context"

  bitswap "github.com/ipfs/go-bitswap"
  bsnet "github.com/ipfs/go-bitswap/network"
  blockstore "github.com/ipfs/go-ipfs-blockstore"
  "github.com/libp2p/go-libp2p-core/routing"
  "github.com/libp2p/go-libp2p-core/host"
)
var ctx context.Context
var host host.Host
var router routing.ContentRouting
var bstore blockstore.Blockstore
network := bsnet.NewFromIpfsHost(host, router)
exchange := bitswap.New(ctx, network, bstore)
```
Parameter Notes:
1. `ctx` is just the parent context for all of Bitswap
2. `network` is a network abstraction provided to Bitswap on top of libp2p & content routing.
3. `bstore` is an IPFS blockstore
### Get A Block Synchronously
```golang
var c cid.Cid
var ctx context.Context
var exchange bitswap.Bitswap
block, err := exchange.GetBlock(ctx, c)
```
Parameter Notes:
1. `ctx` is the context for this request, which can be cancelled to cancel the request
2. `c` is the content ID of the block you're requesting
### Get Several Blocks Asynchronously
```golang
var cids []cid.Cid
var ctx context.Context
var exchange bitswap.Bitswap
blockChannel, err := exchange.GetBlocks(ctx, cids)
```
Parameter Notes:
1. `ctx` is the context for this request, which can be cancelled to cancel the request
2. `cids` is a slice of content IDs for the blocks you're requesting
### Get Related Blocks Faster With Sessions
In IPFS, content blocks are often connected to each other through a MerkleDAG. If you know ahead of time that block requests are related, Bitswap can make several optimizations internally in how it requests those blocks in order to get them faster. Bitswap provides a mechanism called a Bitswap Session to manage a series of block requests as part of a single higher level operation. You should initialize a Bitswap Session any time you intend to make a series of block requests that are related -- and whose responses are likely to come from the same peers.
```golang
var ctx context.Context
var cids []cid.Cid
var exchange bitswap.Bitswap
session := exchange.NewSession(ctx)
blocksChannel, err := session.GetBlocks(ctx, cids)
// later
var relatedCids []cid.Cid
relatedBlocksChannel, err := session.GetBlocks(ctx, relatedCids)
```
Note that `NewSession` returns an interface with `GetBlock` and `GetBlocks` methods that have the same signature as the overall Bitswap exchange.
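For example, fetching a single block through a session looks just like the exchange-level call shown earlier (illustrative snippet in the same style as the other examples in this README):
```golang
var ctx context.Context
var c cid.Cid
var exchange bitswap.Bitswap

session := exchange.NewSession(ctx)
block, err := session.GetBlock(ctx, c)
```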
### Tell bitswap a new block was added to the local datastore
```golang
var blk blocks.Block
var exchange bitswap.Bitswap
err := exchange.HasBlock(blk)
```
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Juan Batiz-Benet
// Package bitswap implements the DMS3 exchange interface with the BitSwap
// bilateral exchange protocol.
package bitswap
import (
"context"
"errors"
"fmt"
"sync"
"time"
delay "gitlab.dms3.io/dms3/go-dms3-delay"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
deciface "gitlab.dms3.io/dms3/go-bitswap/decision"
bsbpm "gitlab.dms3.io/dms3/go-bitswap/internal/blockpresencemanager"
decision "gitlab.dms3.io/dms3/go-bitswap/internal/decision"
bsgetter "gitlab.dms3.io/dms3/go-bitswap/internal/getter"
bsmq "gitlab.dms3.io/dms3/go-bitswap/internal/messagequeue"
notifications "gitlab.dms3.io/dms3/go-bitswap/internal/notifications"
bspm "gitlab.dms3.io/dms3/go-bitswap/internal/peermanager"
bspqm "gitlab.dms3.io/dms3/go-bitswap/internal/providerquerymanager"
bssession "gitlab.dms3.io/dms3/go-bitswap/internal/session"
bssim "gitlab.dms3.io/dms3/go-bitswap/internal/sessioninterestmanager"
bssm "gitlab.dms3.io/dms3/go-bitswap/internal/sessionmanager"
bsspm "gitlab.dms3.io/dms3/go-bitswap/internal/sessionpeermanager"
bsmsg "gitlab.dms3.io/dms3/go-bitswap/message"
bsnet "gitlab.dms3.io/dms3/go-bitswap/network"
blocks "gitlab.dms3.io/dms3/go-block-format"
cid "gitlab.dms3.io/dms3/go-cid"
blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
exchange "gitlab.dms3.io/dms3/go-dms3-exchange-interface"
logging "gitlab.dms3.io/dms3/go-log"
metrics "gitlab.dms3.io/dms3/go-metrics-interface"
peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
)
var log = logging.Logger("bitswap")
var sflog = log.Desugar()
var _ exchange.SessionExchange = (*Bitswap)(nil)
const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second
// Number of concurrent workers in decision engine that process requests to the blockstore
defaultEngineBlockstoreWorkerCount = 128
)
var (
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large given that?
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 6
// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
// Option defines the functional option type that can be used to configure
// bitswap instances
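//
// For example, options can be passed when constructing a Bitswap instance:
//
//	exchange := bitswap.New(ctx, network, bstore, bitswap.ProvideEnabled(false))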
type Option func(*Bitswap)
// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Bitswap) {
bs.provideEnabled = enabled
}
}
// ProviderSearchDelay overwrites the global provider search delay
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return func(bs *Bitswap) {
bs.provSearchDelay = newProvSearchDelay
}
}
// RebroadcastDelay overwrites the global provider rebroadcast delay
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
return func(bs *Bitswap) {
bs.rebroadcastDelay = newRebroadcastDelay
}
}
// EngineBlockstoreWorkerCount sets the number of worker threads used for
// blockstore operations in the decision engine
func EngineBlockstoreWorkerCount(count int) Option {
if count <= 0 {
panic(fmt.Sprintf("Engine blockstore worker count is %d but must be > 0", count))
}
return func(bs *Bitswap) {
bs.engineBstoreWorkerCount = count
}
}
// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// This option is only used for testing.
func SetSendDontHaves(send bool) Option {
return func(bs *Bitswap) {
bs.engine.SetSendDontHaves(send)
}
}
// WithScoreLedger configures the engine to use the given score decision logic.
func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option {
return func(bs *Bitswap) {
bs.engineScoreLedger = scoreLedger
}
}
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore, options ...Option) exchange.Interface {
// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
// coupled to the concerns of the dms3 daemon in this way.
//
// FIXME(btc) Now that bitswap manages itself using a process, it probably
// shouldn't accept a context anymore. Clients should probably use Close()
// exclusively. We should probably find another way to share logging data
ctx, cancelFunc := context.WithCancel(parent)
ctx = metrics.CtxSubScope(ctx, "bitswap")
dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
" data blocks received").Histogram(metricsBuckets)
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
" data blocks received").Histogram(metricsBuckets)
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)
px := process.WithTeardown(func() error {
return nil
})
// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
var sm *bssm.SessionManager
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
// Simulate a message arriving with DONT_HAVEs
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
}
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)
sessionFactory := func(
sessctx context.Context,
sessmgr bssession.SessionManager,
id uint64,
spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
}
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
bs := &Bitswap{
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaultEngineBlockstoreWorkerCount,
}
// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
}
// Set up decision engine
bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)
bs.pqm.Startup()
network.SetDelegate(bs)
// Start up bitswap's async worker routines
bs.startWorkers(ctx, px)
bs.engine.StartWorkers(ctx, px)
// bind the context and process.
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
sm.Shutdown()
cancelFunc()
notif.Shutdown()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first
return bs
}
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
pm *bspm.PeerManager
// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager
// the engine is the bit of logic that decides who to send which blocks to
engine *decision.Engine
// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// manages channels of outgoing blocks for sessions
notif notifications.PubSub
// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid
process process.Process
// Counters for various statistics
counterLk sync.Mutex
counters *counters
// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram
// External statistics interface
wiretap WireTap
// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager
// the SessionInterestManager keeps track of which sessions are interested
// in which CIDs
sim *bssim.SessionInterestManager
// whether or not to make provide announcements
provideEnabled bool
// how long to wait before looking for providers in a session
provSearchDelay time.Duration
// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay delay.D
// how many worker threads to start for decision engine blockstore worker
engineBstoreWorkerCount int
// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger
}
type counters struct {
blocksRecvd uint64
dupBlocksRecvd uint64
dupDataRecvd uint64
blocksSent uint64
dataSent uint64
dataRecvd uint64
messagesRecvd uint64
}
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
}
// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
var out []cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
out = append(out, e.Cid)
}
return out
}
// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
return bs.engine.LedgerForPeer(p)
}
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
}
// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}
wanted := blks
// If blocks came from the network
if from != "" {
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
}
// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
}
// NOTE: There exists the possibility for a race condition here. If a user
// creates a node, then adds it to the dagservice while another goroutine
// is waiting on a GetBlock for that object, they will receive a reference
// to the same node. We should address this soon, but I'm not going to do
// it now as it requires more thought and isn't causing immediate problems.
allKs := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
allKs = append(allKs, b.Cid())
}
// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)
}
// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
}
// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, blk := range wanted {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}
if from != "" {
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}
}
return nil
}
// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
bs.counterLk.Lock()
bs.counters.messagesRecvd++
bs.counterLk.Unlock()
// This call records changes to wantlists, blocks received,
// and number of bytes transferred.
bs.engine.MessageReceived(ctx, p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
if bs.wiretap != nil {
bs.wiretap.MessageReceived(p, incoming)
}
iblocks := incoming.Blocks()
if len(iblocks) > 0 {
bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
}
}
haves := incoming.Haves()
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
if err != nil {
log.Warnf("ReceiveMessage recvBlockFrom error: %s", err)
return
}
}
}
func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
// Check which blocks are in the datastore
// (Note: any errors from the blockstore are simply logged out in
// blockstoreHas())
blocksHas := bs.blockstoreHas(blocks)
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
// Do some accounting for each block
for i, b := range blocks {
has := blocksHas[i]
blkLen := len(b.RawData())
bs.allMetric.Observe(float64(blkLen))
if has {
bs.dupMetric.Observe(float64(blkLen))
}
c := bs.counters
c.blocksRecvd++
c.dataRecvd += uint64(blkLen)
if has {
c.dupBlocksRecvd++
c.dupDataRecvd += uint64(blkLen)
}
}
}
func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
res := make([]bool, len(blks))
wg := sync.WaitGroup{}
for i, block := range blks {
wg.Add(1)
go func(i int, b blocks.Block) {
defer wg.Done()
has, err := bs.blockstore.Has(b.Cid())
if err != nil {
log.Infof("blockstore.Has error: %s", err)
has = false
}
res[i] = has
}(i, block)
}
wg.Wait()
return res
}
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.pm.Connected(p)
bs.engine.PeerConnected(p)
}
// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
// ReceiveError is called by the network interface when an error happens
// at the network layer. Currently just logs error.
func (bs *Bitswap) ReceiveError(err error) {
log.Infof("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}
// Close is called to shutdown Bitswap
func (bs *Bitswap) Close() error {
return bs.process.Close()
}
// GetWantlist returns the current local wantlist (both want-blocks and
// want-haves).
func (bs *Bitswap) GetWantlist() []cid.Cid {
return bs.pm.CurrentWants()
}
// GetWantBlocks returns the current list of want-blocks.
func (bs *Bitswap) GetWantBlocks() []cid.Cid {
return bs.pm.CurrentWantBlocks()
}
// GetWanthaves returns the current list of want-haves.
func (bs *Bitswap) GetWantHaves() []cid.Cid {
return bs.pm.CurrentWantHaves()
}
// IsOnline is needed to match go-dms3-exchange-interface
func (bs *Bitswap) IsOnline() bool {
return true
}
// NewSession generates a new Bitswap session. You should use this, rather
// than calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have its own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
package bitswap_test
import (
"context"
"fmt"
"testing"
"time"
bitswap "gitlab.dms3.io/dms3/go-bitswap"
bssession "gitlab.dms3.io/dms3/go-bitswap/internal/session"
testinstance "gitlab.dms3.io/dms3/go-bitswap/testinstance"
tn "gitlab.dms3.io/dms3/go-bitswap/testnet"
blocks "gitlab.dms3.io/dms3/go-block-format"
cid "gitlab.dms3.io/dms3/go-cid"
blocksutil "gitlab.dms3.io/dms3/go-dms3-blocksutil"
delay "gitlab.dms3.io/dms3/go-dms3-delay"
mockrouting "gitlab.dms3.io/dms3/go-dms3-routing/mock"
tu "gitlab.dms3.io/p2p/go-p2p-testing/etc"
)
func TestBasicSessions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
block := bgen.Next()
inst := ig.Instances(2)
a := inst[0]
b := inst[1]
// Add a block to Peer B
if err := b.Blockstore().Put(block); err != nil {
t.Fatal(err)
}
// Create a session on Peer A
sesa := a.Exchange.NewSession(ctx)
// Get the block
blkout, err := sesa.GetBlock(ctx, block.Cid())
if err != nil {
t.Fatal(err)
}
if !blkout.Cid().Equals(block.Cid()) {
t.Fatal("got wrong block")
}
}
func assertBlockLists(got, exp []blocks.Block) error {
if len(got) != len(exp) {
return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp))
}
h := cid.NewSet()
for _, b := range got {
h.Add(b.Cid())
}
for _, b := range exp {
if !h.Has(b.Cid()) {
return fmt.Errorf("didnt have: %s", b.Cid())
}
}
return nil
}
func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
inst := ig.Instances(10)
// Add 101 blocks to Peer A
blks := bgen.Blocks(101)
if err := inst[0].Blockstore().PutMany(blks); err != nil {
t.Fatal(err)
}
var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
// Create a session on Peer B
ses := inst[1].Exchange.NewSession(ctx)
if _, err := ses.GetBlock(ctx, cids[0]); err != nil {
t.Fatal(err)
}
blks = blks[1:]
cids = cids[1:]
// Fetch blocks with the session, 10 at a time
for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
// Uninvolved nodes should receive
// - initial broadcast want-have of root block
// - CANCEL (when Peer A receives the root block from Peer B)
for _, is := range inst[2:] {
stat, err := is.Exchange.Stat()
if err != nil {
t.Fatal(err)
}
if stat.MessagesReceived > 2 {
t.Fatal("uninvolved nodes should only receive two messages", stat.MessagesReceived)
}
}
}
func TestSessionSplitFetch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
inst := ig.Instances(11)
// Add 10 distinct blocks to each of 10 peers
blks := bgen.Blocks(100)
for i := 0; i < 10; i++ {
if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil {
t.Fatal(err)
}
}
var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
// Create a session on the remaining peer and fetch all the blocks 10 at a time
ses := inst[10].Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)
for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
}
func TestFetchNotConnected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
other := ig.Next()
// Provide 10 blocks on Peer A
blks := bgen.Blocks(10)
for _, block := range blks {
if err := other.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}
}
var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
// Request blocks with Peer B
// Note: Peer A and Peer B are not initially connected, so this tests
// that Peer B will search for and find Peer A
thisNode := ig.Next()
ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)
ch, err := ses.GetBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks); err != nil {
t.Fatal(err)
}
}
func TestFetchAfterDisconnect(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{
bitswap.ProviderSearchDelay(10 * time.Millisecond),
bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)),
})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
inst := ig.Instances(2)
peerA := inst[0]
peerB := inst[1]
// Generate 10 blocks; provide the first 5 on Peer A
blks := bgen.Blocks(10)
var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
firstBlks := blks[:5]
for _, block := range firstBlks {
if err := peerA.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}
}
// Request all blocks with Peer B
ses := peerB.Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)
ch, err := ses.GetBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
// Should get first 5 blocks
var got []blocks.Block
for i := 0; i < 5; i++ {
b := <-ch
got = append(got, b)
}
if err := assertBlockLists(got, blks[:5]); err != nil {
t.Fatal(err)
}
// Break connection
err = peerA.Adapter.DisconnectFrom(ctx, peerB.Peer)
if err != nil {
t.Fatal(err)
}
time.Sleep(20 * time.Millisecond)
// Provide remaining blocks
lastBlks := blks[5:]
for _, block := range lastBlks {
if err := peerA.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}
}
// Peer B should call FindProviders() and find Peer A
// Should get last 5 blocks
for i := 0; i < 5; i++ {
select {
case b := <-ch:
got = append(got, b)
case <-ctx.Done():
}
}
if err := assertBlockLists(got, blks); err != nil {
t.Fatal(err)
}
}
func TestInterestCacheOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
blks := bgen.Blocks(2049)
inst := ig.Instances(2)
a := inst[0]
b := inst[1]
ses := a.Exchange.NewSession(ctx)
zeroch, err := ses.GetBlocks(ctx, []cid.Cid{blks[0].Cid()})
if err != nil {
t.Fatal(err)
}
var restcids []cid.Cid
for _, blk := range blks[1:] {
restcids = append(restcids, blk.Cid())
}
restch, err := ses.GetBlocks(ctx, restcids)
if err != nil {
t.Fatal(err)
}
// wait to ensure that all the above cids were added to the session's cache
time.Sleep(time.Millisecond * 50)
if err := b.Exchange.HasBlock(blks[0]); err != nil {
t.Fatal(err)
}
select {
case blk, ok := <-zeroch:
if ok && blk.Cid().Equals(blks[0].Cid()) {
// success!
} else {
t.Fatal("failed to get the block")
}
case <-restch:
t.Fatal("should not get anything on restch")
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for block")
}
}
func TestPutAfterSessionCacheEvict(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
blks := bgen.Blocks(2500)
inst := ig.Instances(1)
a := inst[0]
ses := a.Exchange.NewSession(ctx)
var allcids []cid.Cid
for _, blk := range blks[1:] {
allcids = append(allcids, blk.Cid())
}
blkch, err := ses.GetBlocks(ctx, allcids)
if err != nil {
t.Fatal(err)
}
// wait to ensure that all the above cids were added to the session's cache
time.Sleep(time.Millisecond * 50)
if err := a.Exchange.HasBlock(blks[17]); err != nil {
t.Fatal(err)
}
select {
case <-blkch:
case <-time.After(time.Millisecond * 50):
t.Fatal("timed out waiting for block")
}
}
func TestMultipleSessions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
blk := bgen.Blocks(1)[0]
inst := ig.Instances(2)
a := inst[0]
b := inst[1]
ctx1, cancel1 := context.WithCancel(ctx)
ses := a.Exchange.NewSession(ctx1)
blkch, err := ses.GetBlocks(ctx, []cid.Cid{blk.Cid()})
if err != nil {
t.Fatal(err)
}
cancel1()
ses2 := a.Exchange.NewSession(ctx)
blkch2, err := ses2.GetBlocks(ctx, []cid.Cid{blk.Cid()})
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 10)
if err := b.Exchange.HasBlock(blk); err != nil {
t.Fatal(err)
}
select {
case <-blkch2:
case <-time.After(time.Second * 20):
t.Fatal("bad juju")
}
_ = blkch
}
func TestWantlistClearsOnCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
blks := bgen.Blocks(10)
var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
inst := ig.Instances(1)
a := inst[0]
ctx1, cancel1 := context.WithCancel(ctx)
ses := a.Exchange.NewSession(ctx1)
_, err := ses.GetBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
cancel1()
if err := tu.WaitFor(ctx, func() error {
if len(a.Exchange.GetWantlist()) > 0 {
return fmt.Errorf("expected empty wantlist")
}
return nil
}); err != nil {
t.Fatal(err)
}
}
package decision
import intdec "gitlab.dms3.io/dms3/go-bitswap/internal/decision"
// Expose Receipt externally
type Receipt = intdec.Receipt
// Expose ScoreLedger externally
type ScoreLedger = intdec.ScoreLedger
// Expose ScorePeerFunc externally
type ScorePeerFunc = intdec.ScorePeerFunc
@startuml Bitswap Components
node "Top Level Interface" {
[Bitswap]
}
node "Sending Blocks" {
[Bitswap] --* [Engine]
[Engine] -left-* [Ledger]
[Engine] -right-* [PeerTaskQueue]
[Engine] --> [TaskWorker (workers.go)]
}
node "Providing" {
[Bitswap] --* [Provide Collector (workers.go)]
[Provide Collector (workers.go)] --* [Provide Worker (workers.go)]
}
node "Finding Providers" {
[Bitswap] --* [ProvideQueryManager]
}
node "Sessions (smart requests)" {
[Bitswap] --* [SessionManager]
[SessionManager] --> [SessionInterestManager]
[SessionManager] --o [Session]
[SessionManager] --> [BlockPresenceManager]
[Session] --* [sessionWantSender]
[Session] --* [SessionPeerManager]
[Session] --> [ProvideQueryManager]
[Session] --* [sessionWants]
[Session] --> [SessionInterestManager]
[sessionWantSender] --> [BlockPresenceManager]
}
node "Requesting Blocks" {
[SessionManager] --> [PeerManager]
[sessionWantSender] --> [PeerManager]
[PeerManager] --* [MessageQueue]
}
node "Network" {
[BitSwapNetwork]
[MessageQueue] --> [BitSwapNetwork]
[ProvideQueryManager] --> [BitSwapNetwork]
[TaskWorker (workers.go)] --> [BitSwapNetwork]
[Provide Worker (workers.go)] --> [BitSwapNetwork]
}
@enduml
How Bitswap Works
=================
When a client requests blocks, Bitswap sends the CID of those blocks to its peers as "wants". When Bitswap receives a "want" from a peer, it responds with the corresponding block.
### Requesting Blocks
#### Sessions
Bitswap Sessions allow the client to make related requests to the same group of peers. For example, all the blocks in a file would typically be fetched with a single session.
#### Discovery
To discover which peers have a block, Bitswap broadcasts a `want-have` message to all peers it is connected to asking if they have the block.
Any peers that have the block respond with a `HAVE` message. They are added to the Session.
If no connected peers have the block, Bitswap queries the DHT to find peers that have the block.
### Wants
When the client requests a block, Bitswap sends a `want-have` message with the block CID to all peers in the Session to ask who has the block.
Bitswap simultaneously sends a `want-block` message to one of the peers in the Session to request the block. If the peer does not have the block, it responds with a `DONT_HAVE` message. In that case Bitswap selects another peer and sends the `want-block` to that peer.
If no peers have the block, Bitswap broadcasts a `want-have` to all connected peers, and queries the DHT to find peers that have the block.
#### Peer Selection
Bitswap uses a probabilistic algorithm to select which peer to send `want-block` to, favouring peers that
- sent `HAVE` for the block
- were discovered as providers of the block in the DHT
- were first to send blocks to previous session requests
The selection algorithm includes some randomness so as to allow peers that are discovered later, but are more responsive, to rise in the ranking.
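As an illustration only (this is not go-bitswap's actual selection code, and the weights and names below are assumptions), a weighted random choice over per-peer scores can be sketched like this:
```golang
package main

import (
	"fmt"
	"math/rand"
)

// chooseIndex picks an index with probability proportional to its weight,
// so better-scoring peers are favoured while others still have a chance.
func chooseIndex(weights []float64) int {
	total := 0.0
	for _, w := range weights {
		total += w
	}
	r := rand.Float64() * total
	for i, w := range weights {
		r -= w
		if r <= 0 {
			return i
		}
	}
	return len(weights) - 1
}

func main() {
	// Assumed example weights: a peer that sent HAVE and has often been the
	// first to respond gets a higher weight than a newly discovered peer.
	weights := []float64{3.0, 1.0, 0.5}
	fmt.Println("selected peer index:", chooseIndex(weights))
}
```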
#### Periodic Search Widening
Periodically the Bitswap Session selects a random CID from the list of "pending wants" (wants that have been sent but for which no block has been received). Bitswap broadcasts a `want-have` to all connected peers and queries the DHT for the CID.
### Serving Blocks
#### Processing Requests
When Bitswap receives a `want-have` it checks if the block is in the local blockstore.
If the block is in the local blockstore Bitswap responds with `HAVE`. If the block is small Bitswap sends the block itself instead of `HAVE`.
If the block is not in the local blockstore, Bitswap checks the `send-dont-have` flag on the request. If `send-dont-have` is true, Bitswap sends `DONT_HAVE`. Otherwise it does not respond.
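A minimal sketch of this decision (the function name and the inline-size threshold are illustrative assumptions, not go-bitswap's internal API):
```golang
package main

import "fmt"

// respondToWantHave mirrors the rules above: send the block itself if it is
// small, otherwise HAVE; if the block is missing, send DONT_HAVE only when
// the request set the send-dont-have flag.
func respondToWantHave(haveBlock bool, blockSize, maxInlineSize int, sendDontHave bool) string {
	if haveBlock {
		if blockSize <= maxInlineSize {
			return "send block"
		}
		return "send HAVE"
	}
	if sendDontHave {
		return "send DONT_HAVE"
	}
	return "no response"
}

func main() {
	fmt.Println(respondToWantHave(true, 256, 1024, true))  // send block
	fmt.Println(respondToWantHave(true, 4096, 1024, true)) // send HAVE
	fmt.Println(respondToWantHave(false, 0, 1024, true))   // send DONT_HAVE
	fmt.Println(respondToWantHave(false, 0, 1024, false))  // no response
}
```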
#### Processing Incoming Blocks
When Bitswap receives a block, it checks to see if any peers sent `want-have` or `want-block` for the block. If so it sends `HAVE` or the block itself to those peers.
#### Priority
Bitswap keeps requests from each peer in separate queues, ordered by the priority specified in the request message.
To select which peer to send the next response to, Bitswap chooses the peer with the least amount of data in its send queue. That way it will tend to "keep peers busy" by always keeping some data in each peer's send queue.
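Sketched in Go (the queue-size map is an illustrative assumption; the real engine uses a PeerTaskQueue), the "keep peers busy" choice is simply the peer with the smallest send queue:
```golang
package main

import "fmt"

// nextPeer returns the peer with the least data queued to send.
func nextPeer(queuedBytes map[string]int) string {
	best := ""
	bestSize := -1
	for p, size := range queuedBytes {
		if bestSize == -1 || size < bestSize {
			best, bestSize = p, size
		}
	}
	return best
}

func main() {
	fmt.Println(nextPeer(map[string]int{"peerA": 4096, "peerB": 512, "peerC": 2048}))
}
```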
Implementation
==============
![Bitswap Components](./go-bitswap.png)
### Bitswap
The Bitswap class receives incoming messages and implements the Exchange API.
When a message is received, Bitswap
- Records some statistics about the message
- Informs the Engine of any new wants
So that the Engine can send responses to the wants
- Informs the Engine of any received blocks
So that the Engine can send the received blocks to any peers that want them
- Informs the SessionManager of received blocks, HAVEs and DONT_HAVEs
So that the SessionManager can inform interested sessions
When the client makes an API call, Bitswap creates a new Session and calls the corresponding method (eg `GetBlocks()`).
### Sending Blocks
When the Engine is informed of new wants it
- Adds the wants to the Ledger (peer A wants block with CID Qmhash...)
- Checks the blockstore for the corresponding blocks, and adds a task to the PeerTaskQueue
- If the blockstore does not have a wanted block, adds a `DONT_HAVE` task
- If the blockstore has the block
- for a `want-have` adds a `HAVE` task
- for a `want-block` adds a `block` task
When the Engine is informed of new blocks it checks the Ledger to see if any peers want information about those blocks.
- For each block
- For each peer that sent a `want-have` for the corresponding block
Adds a `HAVE` task to the PeerTaskQueue
- For each peer that sent a `want-block` for the corresponding block
Adds a `block` task to the PeerTaskQueue
The Engine periodically pops tasks off the PeerTaskQueue, and creates a message with `blocks`, `HAVEs` and `DONT_HAVEs`.
The PeerTaskQueue prioritizes tasks such that the peers with the least amount of data in their send queue are highest priority, so as to "keep peers busy".
### Requesting Blocks
When the SessionManager is informed of a new message, it
- informs the BlockPresenceManager
The BlockPresenceManager keeps track of which peers have sent HAVES and DONT_HAVEs for each block
- informs the Sessions that are interested in the received blocks and wants
- informs the PeerManager of received blocks
The PeerManager checks if any wants were sent to a peer for the received blocks. If so, it sends a `CANCEL` message to those peers.
### Sessions
The Session starts in "discovery" mode. This means it doesn't have any peers yet, and needs to discover which peers have the blocks it wants.
When the client initially requests blocks from a Session, the Session
- informs the SessionInterestManager that it is interested in the want
- informs the sessionWantManager of the want
- tells the PeerManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block
- queries the ProviderQueryManager to discover which peers have the block
When the session receives a message with `HAVE` or a `block`, it informs the SessionPeerManager. The SessionPeerManager keeps track of all peers in the session.
When the session receives a message with a `block` it informs the SessionInterestManager.
Once the session has peers it is no longer in "discovery" mode. When the client requests subsequent blocks the Session informs the sessionWantSender. The sessionWantSender tells the PeerManager to send `want-have` and `want-block` to peers in the session.
For each block that the Session wants, the sessionWantSender decides which peer is most likely to have a block by checking with the BlockPresenceManager which peers have sent a `HAVE` for the block. If no peers or multiple peers have sent `HAVE`, a peer is chosen probabilistically according to how many times each peer was first to send a block in response to previous wants requested by the Session. The sessionWantSender sends a single "optimistic" `want-block` to the chosen peer, and sends `want-have` to all other peers in the Session.
When a peer responds with `DONT_HAVE`, the Session sends `want-block` to the next best peer, and so on until the block is received.
### PeerManager
The PeerManager creates a MessageQueue for each peer that connects to Bitswap. It remembers which `want-have` / `want-block` has been sent to each peer, and directs any new wants to the correct peer.
The MessageQueue groups together wants into a message, and sends the message to the peer. It monitors for timeouts and simulates a `DONT_HAVE` response if a peer takes too long to respond.
### Finding Providers
When bitswap can't find a connected peer who already has the block it wants, it falls back to querying a content routing system (a DHT in IPFS's case) to try to locate a peer with the block.
Bitswap routes these requests through the ProviderQueryManager system, which rate-limits these requests and also deduplicates in-process requests.
### Providing
As a bitswap client receives blocks, by default it announces them on the provided content routing system (again, a DHT in most cases). This behaviour can be disabled by passing `bitswap.ProvideEnabled(false)` as a parameter when initializing Bitswap. IPFS currently has its own experimental provider system ([go-ipfs-provider](https://github.com/ipfs/go-ipfs-provider)) which will eventually replace Bitswap's system entirely.
package blockpresencemanager
import (
"sync"
cid "gitlab.dms3.io/dms3/go-cid"
peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
)
// BlockPresenceManager keeps track of which peers have indicated that they
// have or explicitly don't have a block
type BlockPresenceManager struct {
sync.RWMutex
presence map[cid.Cid]map[peer.ID]bool
}
func New() *BlockPresenceManager {
return &BlockPresenceManager{
presence: make(map[cid.Cid]map[peer.ID]bool),
}
}
// ReceiveFrom is called when a peer sends us information about which blocks
// it has and does not have
func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHaves []cid.Cid) {
bpm.Lock()
defer bpm.Unlock()
for _, c := range haves {
bpm.updateBlockPresence(p, c, true)
}
for _, c := range dontHaves {
bpm.updateBlockPresence(p, c, false)
}
}
func (bpm *BlockPresenceManager) updateBlockPresence(p peer.ID, c cid.Cid, present bool) {
_, ok := bpm.presence[c]
if !ok {
bpm.presence[c] = make(map[peer.ID]bool)
}
// Make sure not to change HAVE to DONT_HAVE
has, pok := bpm.presence[c][p]
if pok && has {
return
}
bpm.presence[c][p] = present
}
// PeerHasBlock indicates whether the given peer has sent a HAVE for the given
// cid
func (bpm *BlockPresenceManager) PeerHasBlock(p peer.ID, c cid.Cid) bool {
bpm.RLock()
defer bpm.RUnlock()
return bpm.presence[c][p]
}
// PeerDoesNotHaveBlock indicates whether the given peer has sent a DONT_HAVE
// for the given cid
func (bpm *BlockPresenceManager) PeerDoesNotHaveBlock(p peer.ID, c cid.Cid) bool {
bpm.RLock()
defer bpm.RUnlock()
have, known := bpm.presence[c][p]
return known && !have
}
// AllPeersDoNotHaveBlock filters the keys such that all the given peers have
// received a DONT_HAVE for a key.
// This allows us to know if we've exhausted all possibilities of finding
// the key with the peers we know about.
func (bpm *BlockPresenceManager) AllPeersDoNotHaveBlock(peers []peer.ID, ks []cid.Cid) []cid.Cid {
bpm.RLock()
defer bpm.RUnlock()
var res []cid.Cid
for _, c := range ks {
if bpm.allDontHave(peers, c) {
res = append(res, c)
}
}
return res
}
func (bpm *BlockPresenceManager) allDontHave(peers []peer.ID, c cid.Cid) bool {
// Check if we know anything about the cid's block presence
ps, cok := bpm.presence[c]
if !cok {
return false
}
// Check if we explicitly know that all the given peers do not have the cid
for _, p := range peers {
if has, pok := ps[p]; !pok || has {
return false
}
}
return true
}
// RemoveKeys cleans up the given keys from the block presence map
func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) {
bpm.Lock()
defer bpm.Unlock()
for _, c := range ks {
delete(bpm.presence, c)
}
}
// HasKey indicates whether the BlockPresenceManager is tracking the given key
// (used by the tests)
func (bpm *BlockPresenceManager) HasKey(c cid.Cid) bool {
bpm.Lock()
defer bpm.Unlock()
_, ok := bpm.presence[c]
return ok
}
package blockpresencemanager
import (
"fmt"
"testing"
"gitlab.dms3.io/dms3/go-bitswap/internal/testutil"
peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
cid "gitlab.dms3.io/dms3/go-cid"
)
const (
expHasFalseMsg = "Expected PeerHasBlock to return false"
expHasTrueMsg = "Expected PeerHasBlock to return true"
expDoesNotHaveFalseMsg = "Expected PeerDoesNotHaveBlock to return false"
expDoesNotHaveTrueMsg = "Expected PeerDoesNotHaveBlock to return true"
)
func TestBlockPresenceManager(t *testing.T) {
bpm := New()
p := testutil.GeneratePeers(1)[0]
cids := testutil.GenerateCids(2)
c0 := cids[0]
c1 := cids[1]
// Nothing stored yet, both PeerHasBlock and PeerDoesNotHaveBlock should
// return false
if bpm.PeerHasBlock(p, c0) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c0) {
t.Fatal(expDoesNotHaveFalseMsg)
}
// HAVE cid0 / DONT_HAVE cid1
bpm.ReceiveFrom(p, []cid.Cid{c0}, []cid.Cid{c1})
// Peer has received HAVE for cid0
if !bpm.PeerHasBlock(p, c0) {
t.Fatal(expHasTrueMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c0) {
t.Fatal(expDoesNotHaveFalseMsg)
}
// Peer has received DONT_HAVE for cid1
if !bpm.PeerDoesNotHaveBlock(p, c1) {
t.Fatal(expDoesNotHaveTrueMsg)
}
if bpm.PeerHasBlock(p, c1) {
t.Fatal(expHasFalseMsg)
}
// HAVE cid1 / DONT_HAVE cid0
bpm.ReceiveFrom(p, []cid.Cid{c1}, []cid.Cid{c0})
// DONT_HAVE cid0 should NOT over-write earlier HAVE cid0
if bpm.PeerDoesNotHaveBlock(p, c0) {
t.Fatal(expDoesNotHaveFalseMsg)
}
if !bpm.PeerHasBlock(p, c0) {
t.Fatal(expHasTrueMsg)
}
// HAVE cid1 should over-write earlier DONT_HAVE cid1
if !bpm.PeerHasBlock(p, c1) {
t.Fatal(expHasTrueMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c1) {
t.Fatal(expDoesNotHaveFalseMsg)
}
// Remove cid0
bpm.RemoveKeys([]cid.Cid{c0})
// Nothing stored, both PeerHasBlock and PeerDoesNotHaveBlock should
// return false
if bpm.PeerHasBlock(p, c0) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c0) {
t.Fatal(expDoesNotHaveFalseMsg)
}
// Remove cid1
bpm.RemoveKeys([]cid.Cid{c1})
// Nothing stored, both PeerHasBlock and PeerDoesNotHaveBlock should
// return false
if bpm.PeerHasBlock(p, c1) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c1) {
t.Fatal(expDoesNotHaveFalseMsg)
}
}
func TestAddRemoveMulti(t *testing.T) {
bpm := New()
peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(3)
c0 := cids[0]
c1 := cids[1]
c2 := cids[2]
// p0: HAVE cid0, cid1 / DONT_HAVE cid1, cid2
// p1: HAVE cid1, cid2 / DONT_HAVE cid0
bpm.ReceiveFrom(p0, []cid.Cid{c0, c1}, []cid.Cid{c1, c2})
bpm.ReceiveFrom(p1, []cid.Cid{c1, c2}, []cid.Cid{c0})
// Peer 0 should end up with
// - HAVE cid0
// - HAVE cid1
// - DONT_HAVE cid2
if !bpm.PeerHasBlock(p0, c0) {
t.Fatal(expHasTrueMsg)
}
if !bpm.PeerHasBlock(p0, c1) {
t.Fatal(expHasTrueMsg)
}
if !bpm.PeerDoesNotHaveBlock(p0, c2) {
t.Fatal(expDoesNotHaveTrueMsg)
}
// Peer 1 should end up with
// - HAVE cid1
// - HAVE cid2
// - DONT_HAVE cid0
if !bpm.PeerHasBlock(p1, c1) {
t.Fatal(expHasTrueMsg)
}
if !bpm.PeerHasBlock(p1, c2) {
t.Fatal(expHasTrueMsg)
}
if !bpm.PeerDoesNotHaveBlock(p1, c0) {
t.Fatal(expDoesNotHaveTrueMsg)
}
// Remove cid1 and cid2. Should end up with
// Peer 0: HAVE cid0
// Peer 1: DONT_HAVE cid0
bpm.RemoveKeys([]cid.Cid{c1, c2})
if !bpm.PeerHasBlock(p0, c0) {
t.Fatal(expHasTrueMsg)
}
if !bpm.PeerDoesNotHaveBlock(p1, c0) {
t.Fatal(expDoesNotHaveTrueMsg)
}
// The other keys should have been cleared, so both HasBlock() and
// DoesNotHaveBlock() should return false
if bpm.PeerHasBlock(p0, c1) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p0, c1) {
t.Fatal(expDoesNotHaveFalseMsg)
}
if bpm.PeerHasBlock(p0, c2) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p0, c2) {
t.Fatal(expDoesNotHaveFalseMsg)
}
if bpm.PeerHasBlock(p1, c1) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p1, c1) {
t.Fatal(expDoesNotHaveFalseMsg)
}
if bpm.PeerHasBlock(p1, c2) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p1, c2) {
t.Fatal(expDoesNotHaveFalseMsg)
}
}
func TestAllPeersDoNotHaveBlock(t *testing.T) {
bpm := New()
peers := testutil.GeneratePeers(3)
p0 := peers[0]
p1 := peers[1]
p2 := peers[2]
cids := testutil.GenerateCids(3)
c0 := cids[0]
c1 := cids[1]
c2 := cids[2]
// c0 c1 c2
// p0 ? N N
// p1 N Y ?
// p2 Y Y N
bpm.ReceiveFrom(p0, []cid.Cid{}, []cid.Cid{c1, c2})
bpm.ReceiveFrom(p1, []cid.Cid{c1}, []cid.Cid{c0})
bpm.ReceiveFrom(p2, []cid.Cid{c0, c1}, []cid.Cid{c2})
type testcase struct {
peers []peer.ID
ks []cid.Cid
exp []cid.Cid
}
testcases := []testcase{
testcase{[]peer.ID{p0}, []cid.Cid{c0}, []cid.Cid{}},
testcase{[]peer.ID{p1}, []cid.Cid{c0}, []cid.Cid{c0}},
testcase{[]peer.ID{p2}, []cid.Cid{c0}, []cid.Cid{}},
testcase{[]peer.ID{p0}, []cid.Cid{c1}, []cid.Cid{c1}},
testcase{[]peer.ID{p1}, []cid.Cid{c1}, []cid.Cid{}},
testcase{[]peer.ID{p2}, []cid.Cid{c1}, []cid.Cid{}},
testcase{[]peer.ID{p0}, []cid.Cid{c2}, []cid.Cid{c2}},
testcase{[]peer.ID{p1}, []cid.Cid{c2}, []cid.Cid{}},
testcase{[]peer.ID{p2}, []cid.Cid{c2}, []cid.Cid{c2}},
// p0 received DONT_HAVE for c1 & c2 (but not for c0)
testcase{[]peer.ID{p0}, []cid.Cid{c0, c1, c2}, []cid.Cid{c1, c2}},
testcase{[]peer.ID{p0, p1}, []cid.Cid{c0, c1, c2}, []cid.Cid{}},
// Both p0 and p2 received DONT_HAVE for c2
testcase{[]peer.ID{p0, p2}, []cid.Cid{c0, c1, c2}, []cid.Cid{c2}},
testcase{[]peer.ID{p0, p1, p2}, []cid.Cid{c0, c1, c2}, []cid.Cid{}},
}
for i, tc := range testcases {
if !testutil.MatchKeysIgnoreOrder(
bpm.AllPeersDoNotHaveBlock(tc.peers, tc.ks),
tc.exp,
) {
t.Fatal(fmt.Sprintf("test case %d failed: expected matching keys", i))
}
}
}
package decision
import (
"context"
"fmt"
"sync"
process "github.com/jbenet/goprocess"
blocks "gitlab.dms3.io/dms3/go-block-format"
cid "gitlab.dms3.io/dms3/go-cid"
bstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
)
// blockstoreManager maintains a pool of workers that make requests to the blockstore.
type blockstoreManager struct {
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
}
// newBlockstoreManager creates a new blockstoreManager with the given
// blockstore and number of workers
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
px: process.WithTeardown(func() error { return nil }),
}
}
func (bsm *blockstoreManager) start(px process.Process) {
px.AddChild(bsm.px)
// Start up workers
for i := 0; i < bsm.workerCount; i++ {
bsm.px.Go(func(px process.Process) {
bsm.worker(px)
})
}
}
func (bsm *blockstoreManager) worker(px process.Process) {
for {
select {
case <-px.Closing():
return
case job := <-bsm.jobs:
job()
}
}
}
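// addJob submits a job to the worker pool. The jobs channel is unbuffered, so
// this blocks until a worker picks the job up, the context is cancelled, or
// the manager is shutting down.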
func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-bsm.px.Closing():
return fmt.Errorf("shutting down")
case bsm.jobs <- job:
return nil
}
}
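// getBlockSizes returns the size of each requested block that is present in
// the blockstore. Keys that are not found are omitted from the result map.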
func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (map[cid.Cid]int, error) {
res := make(map[cid.Cid]int)
if len(ks) == 0 {
return res, nil
}
var lk sync.Mutex
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
}
} else {
lk.Lock()
res[c] = size
lk.Unlock()
}
})
}
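// getBlocks fetches each requested block from the blockstore. Keys that are
// not found are omitted from the result map.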
func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
res := make(map[cid.Cid]blocks.Block)
if len(ks) == 0 {
return res, nil
}
var lk sync.Mutex
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.Get(%s) error: %s", c, err)
}
} else {
lk.Lock()
res[c] = blk
lk.Unlock()
}
})
}
func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
var err error
wg := sync.WaitGroup{}
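// Queue one job per key. If addJob fails (context cancelled or shutdown),
// undo the pending Add and stop queueing, but still wait for any jobs that
// were already submitted to finish.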
for _, k := range ks {
c := k
wg.Add(1)
err = bsm.addJob(ctx, func() {
jobFn(c)
wg.Done()
})
if err != nil {
wg.Done()
break
}
}
wg.Wait()
return err
}
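// Usage sketch (illustrative only; the variable names and worker count below
// are assumptions, not part of this package's API):
//
//	bsm := newBlockstoreManager(bs, 16) // bs is a bstore.Blockstore
//	bsm.start(px)                       // px is the owner's process.Process
//	sizes, _ := bsm.getBlockSizes(ctx, keys) // map of cid -> size for blocks we have
//	blks, _ := bsm.getBlocks(ctx, keys)      // map of cid -> block for blocks we have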
package decision
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"
"gitlab.dms3.io/dms3/go-bitswap/internal/testutil"
cid "gitlab.dms3.io/dms3/go-cid"
process "github.com/jbenet/goprocess"
blocks "gitlab.dms3.io/dms3/go-block-format"
ds "gitlab.dms3.io/dms3/go-datastore"
"gitlab.dms3.io/dms3/go-datastore/delayed"
ds_sync "gitlab.dms3.io/dms3/go-datastore/sync"
blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
delay "gitlab.dms3.io/dms3/go-dms3-delay"
)
func TestBlockstoreManagerNotFoundKey(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
cids := testutil.GenerateCids(4)
sizes, err := bsm.getBlockSizes(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != 0 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
if _, ok := sizes[c]; ok {
t.Fatal("Non-existent block should have no size")
}
}
blks, err := bsm.getBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(blks) != 0 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
if _, ok := blks[c]; ok {
t.Fatal("Non-existent block should have no size")
}
}
}
func TestBlockstoreManager(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
exp := make(map[cid.Cid]blocks.Block)
var blks []blocks.Block
for i := 0; i < 32; i++ {
buf := make([]byte, 1024*(i+1))
_, _ = rand.Read(buf)
b := blocks.NewBlock(buf)
blks = append(blks, b)
exp[b.Cid()] = b
}
// Put all blocks in the blockstore except the last one
if err := bstore.PutMany(blks[:len(blks)-1]); err != nil {
t.Fatal(err)
}
var cids []cid.Cid
for _, b := range blks {
cids = append(cids, b.Cid())
}
sizes, err := bsm.getBlockSizes(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
expSize := len(exp[c].RawData())
size, ok := sizes[c]
// Only the last key should be missing
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in sizes map")
}
} else {
if !ok {
t.Fatal("Block should be in sizes map")
}
if size != expSize {
t.Fatal("Block has wrong size")
}
}
}
fetched, err := bsm.getBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(fetched) != len(blks)-1 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
blk, ok := fetched[c]
// Only the last key should be missing
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in blocks map")
}
} else {
if !ok {
t.Fatal("Block should be in blocks map")
}
if !blk.Cid().Equals(c) {
t.Fatal("Block has wrong cid")
}
}
}
}
func TestBlockstoreManagerConcurrency(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
workerCount := 5
bsm := newBlockstoreManager(bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))
blkSize := int64(8 * 1024)
blks := testutil.GenerateBlocksOfSize(32, blkSize)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}
err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}
// Create more concurrent requests than the number of workers
wg := sync.WaitGroup{}
for i := 0; i < 16; i++ {
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
sizes, err := bsm.getBlockSizes(ctx, ks)
if err != nil {
t.Error(err)
}
if len(sizes) != len(blks) {
t.Error("Wrong response length")
}
}(t)
}
wg.Wait()
}
func TestBlockstoreManagerClose(t *testing.T) {
ctx := context.Background()
delayTime := 20 * time.Millisecond
bsdelay := delay.Fixed(delayTime)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)
blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}
err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}
go px.Close()
time.Sleep(5 * time.Millisecond)
before := time.Now()
_, err = bsm.getBlockSizes(ctx, ks)
if err == nil {
t.Error("expected an error")
}
// would expect to wait delayTime*10 if we didn't cancel.
if time.Since(before) > delayTime*2 {
t.Error("expected a fast timeout")
}
}
func TestBlockstoreManagerCtxDone(t *testing.T) {
delayTime := 20 * time.Millisecond
bsdelay := delay.Fixed(delayTime)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)
blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}
err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()
before := time.Now()
_, err = bsm.getBlockSizes(ctx, ks)
if err == nil {
t.Error("expected an error")
}
// would expect to wait delayTime*10 if we didn't cancel.
if time.Since(before) > delayTime*2 {
t.Error("expected a fast timeout")
}
}
package decision
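// ewma computes an exponentially weighted moving average: the new sample is
// weighted by alpha and the previous average by (1 - alpha), so a larger
// alpha reacts faster to recent samples. For example, ewma(10, 20, 0.5) == 15.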
func ewma(old, new, alpha float64) float64 {
return new*alpha + (1-alpha)*old
}
package decision
import (
"sync"
pb "gitlab.dms3.io/dms3/go-bitswap/message/pb"
wl "gitlab.dms3.io/dms3/go-bitswap/wantlist"
cid "gitlab.dms3.io/dms3/go-cid"
peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
)
func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
}
}
// Keeps the wantlist for the partner. NOT threadsafe!
type ledger struct {
// Partner is the remote Peer.
Partner peer.ID
// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist
lk sync.RWMutex
}
func (l *ledger) Wants(k cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority, wantType)
}
func (l *ledger) CancelWant(k cid.Cid) bool {
return l.wantList.Remove(k)
}
func (l *ledger) WantListContains(k cid.Cid) (wl.Entry, bool) {
return l.wantList.Contains(k)
}
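// Usage sketch (illustrative only; the identifiers below are assumptions, and
// since the ledger is not threadsafe, callers in this package are expected to
// hold l.lk around these calls):
//
//	l := newLedger(partner)
//	l.Wants(c, 1, pb.Message_Wantlist_Have) // partner asked us to announce HAVE for c
//	if _, ok := l.WantListContains(c); ok {
//		// partner still wants c
//	}
//	l.CancelWant(c) // partner cancelled the want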
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<
clean:
rm -f *.pb.go
rm -f *.go