ipfs_impl.go 2.74 KB
Newer Older
1 2 3 4 5
package network

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
6
	inet "github.com/jbenet/go-ipfs/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	peer "github.com/jbenet/go-ipfs/p2p/peer"
8
	routing "github.com/jbenet/go-ipfs/routing"
9
	util "github.com/jbenet/go-ipfs/util"
10 11
)

12
// log is the package-level logger, created via util.Logger under the name
// "bitswap_network".
var log = util.Logger("bitswap_network")
Jeromy's avatar
Jeromy committed
13

14
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
15
// Dialer & Service
16
func NewFromIpfsNetwork(n inet.Network, r routing.IpfsRouting) BitSwapNetwork {
17
	bitswapNetwork := impl{
18
		network: n,
19
		routing: r,
20
	}
21
	n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream)
22
	return &bitswapNetwork
23 24
}

25 26
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
27
type impl struct {
28
	network inet.Network
29
	routing routing.IpfsRouting
30 31 32

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
33 34
}

35 36
func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error {
	return bsnet.network.DialPeer(ctx, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37 38
}

39
func (bsnet *impl) SendMessage(
40
	ctx context.Context,
41
	p peer.ID,
42 43
	outgoing bsmsg.BitSwapMessage) error {

44
	s, err := bsnet.network.NewStream(inet.ProtocolBitswap, p)
45 46 47
	if err != nil {
		return err
	}
48 49 50
	defer s.Close()

	return outgoing.ToNet(s)
51 52
}

53
func (bsnet *impl) SendRequest(
54
	ctx context.Context,
55
	p peer.ID,
56 57
	outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {

58
	s, err := bsnet.network.NewStream(inet.ProtocolBitswap, p)
59 60 61
	if err != nil {
		return nil, err
	}
62 63 64
	defer s.Close()

	if err := outgoing.ToNet(s); err != nil {
65 66
		return nil, err
	}
67 68

	return bsmsg.FromNet(s)
69 70
}

71 72
// SetDelegate installs r as the receiver that inbound messages are forwarded
// to. While no receiver is set, handleNewStream does not read incoming
// streams.
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
}
74

75
// FindProvidersAsync returns a channel of providers for the given key
76 77 78 79 80 81 82 83 84 85 86 87 88 89
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
	out := make(chan peer.ID)
	go func() {
		defer close(out)
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
		for info := range providers {
			bsnet.network.Peerstore().AddAddresses(info.ID, info.Addrs)
			select {
			case <-ctx.Done():
			case out <- info.ID:
			}
		}
	}()
	return out
90 91 92 93 94 95 96
}

// Provide announces key k through the routing system, returning the routing
// system's error unchanged.
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
	err := bsnet.routing.Provide(ctx, k)
	return err
}

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
// handleNewStream receives a new stream from the network.
//
// If no receiver has been set, the stream is closed immediately without being
// read. Otherwise a goroutine reads one message from the stream and hands it
// to the receiver together with the remote peer's ID; a read failure is
// reported through the receiver's ReceiveError instead. The stream is closed
// in every case.
func (bsnet *impl) handleNewStream(s inet.Stream) {

	if bsnet.receiver == nil {
		// Nobody to deliver to. Close the stream so it is not leaked.
		s.Close()
		return
	}

	go func() {
		defer s.Close()

		received, err := bsmsg.FromNet(s)
		if err != nil {
			// Reported on its own goroutine so this one does not wait on
			// the receiver before closing the stream.
			go bsnet.receiver.ReceiveError(err)
			return
		}

		p := s.Conn().RemotePeer()
		// Inbound streams carry no caller context, so start from a fresh one.
		ctx := context.Background()
		bsnet.receiver.ReceiveMessage(ctx, p, received)
	}()

}