ipfs_impl.go 2.84 KB
Newer Older
1 2 3 4 5
package network

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	inet "github.com/jbenet/go-ipfs/p2p/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	peer "github.com/jbenet/go-ipfs/p2p/peer"
9
	routing "github.com/jbenet/go-ipfs/routing"
10
	util "github.com/jbenet/go-ipfs/util"
11 12
)

13
// log is the package-level logger, created with the name "bitswap_network".
var log = util.Logger("bitswap_network")
Jeromy's avatar
Jeromy committed
14

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
17
	bitswapNetwork := impl{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
		host:    host,
19
		routing: r,
20
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
	host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
22
	return &bitswapNetwork
23 24
}

25 26
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
27
type impl struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
	host    host.Host
29
	routing routing.IpfsRouting
30 31 32

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
33 34
}

35
func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
	return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37 38
}

39
func (bsnet *impl) SendMessage(
40
	ctx context.Context,
41
	p peer.ID,
42 43
	outgoing bsmsg.BitSwapMessage) error {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
	s, err := bsnet.host.NewStream(ProtocolBitswap, p)
45 46 47
	if err != nil {
		return err
	}
48 49 50
	defer s.Close()

	return outgoing.ToNet(s)
51 52
}

53
func (bsnet *impl) SendRequest(
54
	ctx context.Context,
55
	p peer.ID,
56 57
	outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58
	log.Debugf("bsnet SendRequest to %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
	s, err := bsnet.host.NewStream(ProtocolBitswap, p)
60 61 62
	if err != nil {
		return nil, err
	}
63 64 65
	defer s.Close()

	if err := outgoing.ToNet(s); err != nil {
66 67
		return nil, err
	}
68 69

	return bsmsg.FromNet(s)
70 71
}

72 73
// SetDelegate installs r as the receiver that handleNewStream hands inbound
// messages and read errors to. The assignment is not synchronized.
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
}
75

76
// FindProvidersAsync returns a channel of providers for the given key
77 78 79 80 81 82
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
	out := make(chan peer.ID)
	go func() {
		defer close(out)
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
		for info := range providers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83
			bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs)
84 85 86 87 88 89 90
			select {
			case <-ctx.Done():
			case out <- info.ID:
			}
		}
	}()
	return out
91 92 93 94 95 96 97
}

// Provide forwards the request to provide key k to the routing system and
// returns its error unchanged.
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
	err := bsnet.routing.Provide(ctx, k)
	return err
}

98 99
// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s inet.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
	defer s.Close()
101 102 103 104 105

	if bsnet.receiver == nil {
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107 108 109 110
	received, err := bsmsg.FromNet(s)
	if err != nil {
		go bsnet.receiver.ReceiveError(err)
		return
	}
111

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112 113 114 115
	p := s.Conn().RemotePeer()
	ctx := context.Background()
	log.Debugf("bsnet handleNewStream from %s", s.Conn().RemotePeer())
	bsnet.receiver.ReceiveMessage(ctx, p, received)
116
}