mux.go 5.28 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
// package mux implements a protocol muxer.
2
package mux
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5

import (
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7

Jeromy's avatar
Jeromy committed
8
	conn "github.com/jbenet/go-ipfs/net/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	msg "github.com/jbenet/go-ipfs/net/message"
10
	pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13

14 15
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
var log = u.Logger("muxer")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21
// ProtocolIDs used to identify each protocol.
// These should probably be defined elsewhere.
22 23 24 25 26 27
var (
	ProtocolID_Routing    = pb.ProtocolID_Routing
	ProtocolID_Exchange   = pb.ProtocolID_Exchange
	ProtocolID_Diagnostic = pb.ProtocolID_Diagnostic
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28 29 30 31 32 33 34 35 36
// Protocol objects produce + consume raw data. They are added to the Muxer
// with a ProtocolID, which is added to outgoing payloads. Muxer properly
// encapsulates and decapsulates when interfacing with its Protocols. The
// Protocols do not encounter their ProtocolID.
type Protocol interface {
	GetPipe() *msg.Pipe
}

// ProtocolMap maps ProtocolIDs to Protocols.
37
type ProtocolMap map[pb.ProtocolID]Protocol
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39 40 41 42 43 44 45

// Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing
// channels. It multiplexes various protocols, wrapping and unwrapping data
// with a ProtocolID.
type Muxer struct {
	// Protocols are the multiplexed services.
	Protocols ProtocolMap

46
	bwiLock sync.Mutex
47
	bwIn    uint64
Jeromy's avatar
Jeromy committed
48
	msgIn   uint64
49

50
	bwoLock sync.Mutex
51
	bwOut   uint64
Jeromy's avatar
Jeromy committed
52
	msgOut  uint64
53

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
	*msg.Pipe
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
	ctxc.ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56 57
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58
// NewMuxer constructs a muxer given a protocol map.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60 61 62 63
func NewMuxer(ctx context.Context, mp ProtocolMap) *Muxer {
	m := &Muxer{
		Protocols:     mp,
		Pipe:          msg.NewPipe(10),
		ContextCloser: ctxc.NewContextCloser(ctx, nil),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
	m.Children().Add(1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	go m.handleIncomingMessages()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
	for pid, proto := range m.Protocols {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
		m.Children().Add(1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
		go m.handleOutgoingMessages(pid, proto)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71 72
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	return m
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76 77 78 79 80
// GetPipe implements the Protocol interface
func (m *Muxer) GetPipe() *msg.Pipe {
	return m.Pipe
}

Jeromy's avatar
Jeromy committed
81 82 83 84 85 86 87 88 89 90 91 92
// GetMessageCounts return the in/out message count measured over this muxer.
func (m *Muxer) GetMessageCounts() (in uint64, out uint64) {
	m.bwiLock.Lock()
	in = m.msgIn
	m.bwiLock.Unlock()

	m.bwoLock.Lock()
	out = m.msgOut
	m.bwoLock.Unlock()
	return
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93
// GetBandwidthTotals return the in/out bandwidth measured over this muxer.
94
func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
95
	m.bwiLock.Lock()
96
	in = m.bwIn
97
	m.bwiLock.Unlock()
98

99
	m.bwoLock.Lock()
100
	out = m.bwOut
101
	m.bwoLock.Unlock()
102 103 104
	return
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
// AddProtocol adds a Protocol with given ProtocolID to the Muxer.
106
func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108 109 110 111 112 113 114 115 116
	if _, found := m.Protocols[pid]; found {
		return errors.New("Another protocol already using this ProtocolID")
	}

	m.Protocols[pid] = p
	return nil
}

// handleIncoming consumes the messages on the m.Incoming channel and
// routes them appropriately (to the protocols).
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
func (m *Muxer) handleIncomingMessages() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118
	defer m.Children().Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122 123 124
		case <-m.Closing():
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126 127 128
		case msg, more := <-m.Incoming:
			if !more {
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
			m.Children().Add(1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130
			go m.handleIncomingMessage(msg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133 134 135
		}
	}
}

// handleIncomingMessage routes message to the appropriate protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136
func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137
	defer m.Children().Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138

139 140 141
	m.bwiLock.Lock()
	// TODO: compensate for overhead
	m.bwIn += uint64(len(m1.Data()))
Jeromy's avatar
Jeromy committed
142
	m.msgIn++
143 144
	m.bwiLock.Unlock()

145
	data, pid, err := unwrapData(m1.Data())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
		log.Errorf("muxer de-serializing error: %v", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148 149
		return
	}
Jeromy's avatar
Jeromy committed
150
	conn.ReleaseBuffer(m1.Data())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151

152
	m2 := msg.New(m1.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154
	proto, found := m.Protocols[pid]
	if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
155
		log.Errorf("muxer unknown protocol %v", pid)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157 158 159 160
		return
	}

	select {
	case proto.GetPipe().Incoming <- m2:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
	case <-m.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163 164 165 166 167
		return
	}
}

// handleOutgoingMessages consumes the messages on the proto.Outgoing channel,
// wraps them and sends them out.
168
func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169
	defer m.Children().Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173 174 175 176
		case msg, more := <-proto.GetPipe().Outgoing:
			if !more {
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
			m.Children().Add(1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178
			go m.handleOutgoingMessage(pid, msg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
		case <-m.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181 182 183 184 185 186
			return
		}
	}
}

// handleOutgoingMessage wraps out a message and sends it out the
187
func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189
	defer m.Children().Done()

190
	data, err := wrapData(m1.Data(), pid)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
		log.Errorf("muxer serializing error: %v", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194 195
		return
	}

196 197
	m.bwoLock.Lock()
	// TODO: compensate for overhead
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
	// TODO(jbenet): switch this to a goroutine to prevent sync waiting.
199
	m.bwOut += uint64(len(data))
Jeromy's avatar
Jeromy committed
200
	m.msgOut++
201 202
	m.bwoLock.Unlock()

203
	m2 := msg.New(m1.Peer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205
	select {
	case m.GetPipe().Outgoing <- m2:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
	case <-m.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208 209 210
		return
	}
}

211
func wrapData(data []byte, pid pb.ProtocolID) ([]byte, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
	// Marshal
213
	pbm := new(pb.PBProtocolMessage)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214 215 216 217 218 219 220 221 222 223
	pbm.ProtocolID = &pid
	pbm.Data = data
	b, err := proto.Marshal(pbm)
	if err != nil {
		return nil, err
	}

	return b, nil
}

224
func unwrapData(data []byte) ([]byte, pb.ProtocolID, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
	// Unmarshal
226
	pbm := new(pb.PBProtocolMessage)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228 229 230 231 232 233
	err := proto.Unmarshal(data, pbm)
	if err != nil {
		return nil, 0, err
	}

	return pbm.GetData(), pbm.GetProtocolID(), nil
}