ipfs_impl.go 3.46 KB
Newer Older
1 2 3 4
package network

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
5

6
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	inet "github.com/jbenet/go-ipfs/p2p/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	peer "github.com/jbenet/go-ipfs/p2p/peer"
10
	routing "github.com/jbenet/go-ipfs/routing"
11
	util "github.com/jbenet/go-ipfs/util"
12
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
13 14
)

15
// log is the package-level event logger, registered under the name
// "bitswap_network".
var log = eventlog.Logger("bitswap_network")
Jeromy's avatar
Jeromy committed
16

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
19
	bitswapNetwork := impl{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
		host:    host,
21
		routing: r,
22
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
	host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
24
	return &bitswapNetwork
25 26
}

27 28
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
29
type impl struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
	host    host.Host
31
	routing routing.IpfsRouting
32 33 34

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
35 36
}

37
func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
	return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40
}

41
func (bsnet *impl) SendMessage(
42
	ctx context.Context,
43
	p peer.ID,
44 45
	outgoing bsmsg.BitSwapMessage) error {

46 47 48
	log := log.Prefix("bitswap net SendMessage to %s", p)

	log.Debug("opening stream")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49
	s, err := bsnet.host.NewStream(ProtocolBitswap, p)
50 51 52
	if err != nil {
		return err
	}
53 54
	defer s.Close()

55 56 57 58 59 60 61 62
	log.Debug("sending")
	if err := outgoing.ToNet(s); err != nil {
		log.Errorf("error: %s", err)
		return err
	}

	log.Debug("sent")
	return err
63 64
}

65
func (bsnet *impl) SendRequest(
66
	ctx context.Context,
67
	p peer.ID,
68 69
	outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {

70 71 72
	log := log.Prefix("bitswap net SendRequest to %s", p)

	log.Debug("opening stream")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	s, err := bsnet.host.NewStream(ProtocolBitswap, p)
74 75 76
	if err != nil {
		return nil, err
	}
77 78
	defer s.Close()

79
	log.Debug("sending")
80
	if err := outgoing.ToNet(s); err != nil {
81
		log.Errorf("error: %s", err)
82 83
		return nil, err
	}
84

85 86 87 88 89 90 91 92 93
	log.Debug("sent, now receiveing")
	incoming, err := bsmsg.FromNet(s)
	if err != nil {
		log.Errorf("error: %s", err)
		return incoming, err
	}

	log.Debug("received")
	return incoming, nil
94 95
}

96 97
// SetDelegate installs r as the receiver that handleNewStream forwards
// inbound messages and receive errors to, replacing any previous value.
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
}
99

100
// FindProvidersAsync returns a channel of providers for the given key
101 102 103 104 105 106
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
	out := make(chan peer.ID)
	go func() {
		defer close(out)
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
		for info := range providers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
			bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs)
108 109
			select {
			case <-ctx.Done():
110
				return
111 112 113 114 115
			case out <- info.ID:
			}
		}
	}()
	return out
116 117 118 119 120 121 122
}

// Provide forwards key k to the routing system's Provide and returns its
// error unchanged.
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
	err := bsnet.routing.Provide(ctx, k)
	return err
}

123 124
// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s inet.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125
	defer s.Close()
126 127 128 129 130

	if bsnet.receiver == nil {
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133
	received, err := bsmsg.FromNet(s)
	if err != nil {
		go bsnet.receiver.ReceiveError(err)
134
		log.Errorf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136
		return
	}
137

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138 139
	p := s.Conn().RemotePeer()
	ctx := context.Background()
140
	log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
	bsnet.receiver.ReceiveMessage(ctx, p, received)
142
}