ipfs_impl.go 3.73 KB
Newer Older
1 2 3 4
package network

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
5

6
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	inet "github.com/jbenet/go-ipfs/p2p/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	peer "github.com/jbenet/go-ipfs/p2p/peer"
10
	routing "github.com/jbenet/go-ipfs/routing"
11
	util "github.com/jbenet/go-ipfs/util"
12
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
13 14
)

15
var log = eventlog.Logger("bitswap_network")
Jeromy's avatar
Jeromy committed
16

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
19
	bitswapNetwork := impl{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
		host:    host,
21
		routing: r,
22
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
	host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
24
	return &bitswapNetwork
25 26
}

27 28
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
29
type impl struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
	host    host.Host
31
	routing routing.IpfsRouting
32 33 34

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
35 36
}

37
func (bsnet *impl) SendMessage(
38
	ctx context.Context,
39
	p peer.ID,
40 41
	outgoing bsmsg.BitSwapMessage) error {

42 43
	log := log.Prefix("bitswap net SendMessage to %s", p)

44 45 46 47 48 49
	// ensure we're connected
	//TODO(jbenet) move this into host.NewStream?
	if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil {
		return err
	}

50
	log.Debug("opening stream")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
	s, err := bsnet.host.NewStream(ProtocolBitswap, p)
52 53 54
	if err != nil {
		return err
	}
55 56
	defer s.Close()

57 58 59 60 61 62 63 64
	log.Debug("sending")
	if err := outgoing.ToNet(s); err != nil {
		log.Errorf("error: %s", err)
		return err
	}

	log.Debug("sent")
	return err
65 66
}

67
func (bsnet *impl) SendRequest(
68
	ctx context.Context,
69
	p peer.ID,
70 71
	outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {

72 73
	log := log.Prefix("bitswap net SendRequest to %s", p)

74 75 76 77 78 79
	// ensure we're connected
	//TODO(jbenet) move this into host.NewStream?
	if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil {
		return nil, err
	}

80
	log.Debug("opening stream")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
	s, err := bsnet.host.NewStream(ProtocolBitswap, p)
82 83 84
	if err != nil {
		return nil, err
	}
85 86
	defer s.Close()

87
	log.Debug("sending")
88
	if err := outgoing.ToNet(s); err != nil {
89
		log.Errorf("error: %s", err)
90 91
		return nil, err
	}
92

93 94 95 96 97 98 99 100 101
	log.Debug("sent, now receiveing")
	incoming, err := bsmsg.FromNet(s)
	if err != nil {
		log.Errorf("error: %s", err)
		return incoming, err
	}

	log.Debug("received")
	return incoming, nil
102 103
}

104 105
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
106
}
107

108
// FindProvidersAsync returns a channel of providers for the given key
109 110 111 112 113 114
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
	out := make(chan peer.ID)
	go func() {
		defer close(out)
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
		for info := range providers {
115 116 117
			if info.ID != bsnet.host.ID() { // dont add addrs for ourselves.
				bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs)
			}
118 119
			select {
			case <-ctx.Done():
120
				return
121 122 123 124 125
			case out <- info.ID:
			}
		}
	}()
	return out
126 127 128 129 130 131 132
}

// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
	return bsnet.routing.Provide(ctx, k)
}

133 134
// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s inet.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
	defer s.Close()
136 137 138 139 140

	if bsnet.receiver == nil {
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141 142 143
	received, err := bsmsg.FromNet(s)
	if err != nil {
		go bsnet.receiver.ReceiveError(err)
144
		log.Errorf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145 146
		return
	}
147

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148 149
	p := s.Conn().RemotePeer()
	ctx := context.Background()
150
	log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
	bsnet.receiver.ReceiveMessage(ctx, p, received)
152
}