Commit 05b80afc authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

fix swarm message type code, i beleive it works well now

parent afdac2ca
...@@ -28,6 +28,7 @@ type IpfsDHT struct { ...@@ -28,6 +28,7 @@ type IpfsDHT struct {
routingTables []*kb.RoutingTable routingTables []*kb.RoutingTable
network swarm.Network network swarm.Network
netChan *swarm.Chan
// Local peer (yourself) // Local peer (yourself)
self *peer.Peer self *peer.Peer
...@@ -55,6 +56,7 @@ type IpfsDHT struct { ...@@ -55,6 +56,7 @@ type IpfsDHT struct {
func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT { func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
dht := new(IpfsDHT) dht := new(IpfsDHT)
dht.network = net dht.network = net
dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
dht.datastore = ds.NewMapDatastore() dht.datastore = ds.NewMapDatastore()
dht.self = p dht.self = p
dht.providers = NewProviderManager() dht.providers = NewProviderManager()
...@@ -101,10 +103,9 @@ func (dht *IpfsDHT) handleMessages() { ...@@ -101,10 +103,9 @@ func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine\n") u.DOut("Begin message handling routine\n")
errs := dht.network.GetErrChan() errs := dht.network.GetErrChan()
dhtmes := dht.network.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
for { for {
select { select {
case mes, ok := <-dhtmes: case mes, ok := <-dht.netChan.Incoming:
if !ok { if !ok {
u.DOut("handleMessages closing, bad recv on incoming\n") u.DOut("handleMessages closing, bad recv on incoming\n")
return return
...@@ -165,7 +166,7 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er ...@@ -165,7 +166,7 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er
} }
mes := swarm.NewMessage(p, pmes.ToProtobuf()) mes := swarm.NewMessage(p, pmes.ToProtobuf())
dht.network.Send(mes) dht.netChan.Outgoing <- mes
return nil return nil
} }
...@@ -225,7 +226,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { ...@@ -225,7 +226,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
out: out:
mes := swarm.NewMessage(p, resp.ToProtobuf()) mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes) dht.netChan.Outgoing <- mes
} }
// Store a value in this peer local storage // Store a value in this peer local storage
...@@ -247,7 +248,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) { ...@@ -247,7 +248,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
ID: pmes.GetId(), ID: pmes.GetId(),
} }
dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf())) dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
} }
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
...@@ -258,7 +259,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { ...@@ -258,7 +259,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
} }
defer func() { defer func() {
mes := swarm.NewMessage(p, resp.ToProtobuf()) mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes) dht.netChan.Outgoing <- mes
}() }()
level := pmes.GetValue()[0] level := pmes.GetValue()[0]
u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty()) u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
...@@ -310,7 +311,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { ...@@ -310,7 +311,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
} }
mes := swarm.NewMessage(p, resp.ToProtobuf()) mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes) dht.netChan.Outgoing <- mes
} }
type providerInfo struct { type providerInfo struct {
...@@ -336,7 +337,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { ...@@ -336,7 +337,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
for _, ps := range seq { for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes) mes := swarm.NewMessage(ps, pmes)
dht.network.Send(mes) dht.netChan.Outgoing <- mes
} }
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
...@@ -372,7 +373,7 @@ out: ...@@ -372,7 +373,7 @@ out:
} }
mes := swarm.NewMessage(p, resp.ToProtobuf()) mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes) dht.netChan.Outgoing <- mes
} }
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) { func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
...@@ -429,7 +430,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio ...@@ -429,7 +430,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
mes := swarm.NewMessage(p, pmes.ToProtobuf()) mes := swarm.NewMessage(p, pmes.ToProtobuf())
t := time.Now() t := time.Now()
dht.network.Send(mes) dht.netChan.Outgoing <- mes
// Wait for either the response or a timeout // Wait for either the response or a timeout
timeup := time.After(timeout) timeup := time.After(timeout)
...@@ -545,7 +546,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati ...@@ -545,7 +546,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
mes := swarm.NewMessage(p, pmes.ToProtobuf()) mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
t := time.Now() t := time.Now()
dht.network.Send(mes) dht.netChan.Outgoing <- mes
after := time.After(timeout) after := time.After(timeout)
select { select {
case <-after: case <-after:
...@@ -581,7 +582,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time ...@@ -581,7 +582,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time
mes := swarm.NewMessage(p, pmes.ToProtobuf()) mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.network.Send(mes) dht.netChan.Outgoing <- mes
after := time.After(timeout) after := time.After(timeout)
select { select {
case <-after: case <-after:
......
...@@ -70,8 +70,8 @@ func (f *fauxNet) GetErrChan() chan error { ...@@ -70,8 +70,8 @@ func (f *fauxNet) GetErrChan() chan error {
return f.Chan.Errors return f.Chan.Errors
} }
func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) chan *swarm.Message { func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) *swarm.Chan {
return f.Chan.Incoming return f.Chan
} }
func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
......
...@@ -240,7 +240,7 @@ func (dht *IpfsDHT) Provide(key u.Key) error { ...@@ -240,7 +240,7 @@ func (dht *IpfsDHT) Provide(key u.Key) error {
for _, p := range peers { for _, p := range peers {
mes := swarm.NewMessage(p, pbmes) mes := swarm.NewMessage(p, pbmes)
dht.network.Send(mes) dht.netChan.Outgoing <- mes
} }
return nil return nil
} }
...@@ -352,7 +352,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { ...@@ -352,7 +352,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
before := time.Now() before := time.Now()
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute) responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.network.Send(mes) dht.netChan.Outgoing <- mes
tout := time.After(timeout) tout := time.After(timeout)
select { select {
...@@ -385,7 +385,7 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) { ...@@ -385,7 +385,7 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
pbmes := pmes.ToProtobuf() pbmes := pmes.ToProtobuf()
for _, p := range targets { for _, p := range targets {
mes := swarm.NewMessage(p, pbmes) mes := swarm.NewMessage(p, pbmes)
dht.network.Send(mes) dht.netChan.Outgoing <- mes
} }
var out []*diagInfo var out []*diagInfo
......
...@@ -8,14 +8,13 @@ import ( ...@@ -8,14 +8,13 @@ import (
) )
type Network interface { type Network interface {
Send(*Message)
Error(error)
Find(u.Key) *peer.Peer Find(u.Key) *peer.Peer
Listen() error Listen() error
ConnectNew(*ma.Multiaddr) (*peer.Peer, error) ConnectNew(*ma.Multiaddr) (*peer.Peer, error)
GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error)
Error(error)
GetErrChan() chan error GetErrChan() chan error
GetChannel(PBWrapper_MessageType) chan *Message GetChannel(PBWrapper_MessageType) *Chan
Close() Close()
Drop(*peer.Peer) error Drop(*peer.Peer) error
} }
...@@ -23,14 +23,17 @@ var _ = math.Inf ...@@ -23,14 +23,17 @@ var _ = math.Inf
type PBWrapper_MessageType int32 type PBWrapper_MessageType int32
const ( const (
PBWrapper_DHT_MESSAGE PBWrapper_MessageType = 0 PBWrapper_TEST PBWrapper_MessageType = 0
PBWrapper_DHT_MESSAGE PBWrapper_MessageType = 1
) )
var PBWrapper_MessageType_name = map[int32]string{ var PBWrapper_MessageType_name = map[int32]string{
0: "DHT_MESSAGE", 0: "TEST",
1: "DHT_MESSAGE",
} }
var PBWrapper_MessageType_value = map[string]int32{ var PBWrapper_MessageType_value = map[string]int32{
"DHT_MESSAGE": 0, "TEST": 0,
"DHT_MESSAGE": 1,
} }
func (x PBWrapper_MessageType) Enum() *PBWrapper_MessageType { func (x PBWrapper_MessageType) Enum() *PBWrapper_MessageType {
...@@ -64,7 +67,7 @@ func (m *PBWrapper) GetType() PBWrapper_MessageType { ...@@ -64,7 +67,7 @@ func (m *PBWrapper) GetType() PBWrapper_MessageType {
if m != nil && m.Type != nil { if m != nil && m.Type != nil {
return *m.Type return *m.Type
} }
return PBWrapper_DHT_MESSAGE return PBWrapper_TEST
} }
func (m *PBWrapper) GetMessage() []byte { func (m *PBWrapper) GetMessage() []byte {
......
...@@ -2,7 +2,8 @@ package swarm; ...@@ -2,7 +2,8 @@ package swarm;
message PBWrapper { message PBWrapper {
enum MessageType { enum MessageType {
DHT_MESSAGE = 0; TEST = 0;
DHT_MESSAGE = 1;
} }
required MessageType Type = 1; required MessageType Type = 1;
......
...@@ -38,7 +38,7 @@ func NewMessage(p *peer.Peer, data proto.Message) *Message { ...@@ -38,7 +38,7 @@ func NewMessage(p *peer.Peer, data proto.Message) *Message {
} }
} }
// Chan is a swam channel, which provides duplex communication and errors. // Chan is a swarm channel, which provides duplex communication and errors.
type Chan struct { type Chan struct {
Outgoing chan *Message Outgoing chan *Message
Incoming chan *Message Incoming chan *Message
...@@ -84,7 +84,7 @@ type Swarm struct { ...@@ -84,7 +84,7 @@ type Swarm struct {
conns ConnMap conns ConnMap
connsLock sync.RWMutex connsLock sync.RWMutex
filterChans map[PBWrapper_MessageType]chan *Message filterChans map[PBWrapper_MessageType]*Chan
toFilter chan *Message toFilter chan *Message
newFilters chan *newFilterInfo newFilters chan *newFilterInfo
...@@ -98,7 +98,7 @@ func NewSwarm(local *peer.Peer) *Swarm { ...@@ -98,7 +98,7 @@ func NewSwarm(local *peer.Peer) *Swarm {
Chan: NewChan(10), Chan: NewChan(10),
conns: ConnMap{}, conns: ConnMap{},
local: local, local: local,
filterChans: make(map[PBWrapper_MessageType]chan *Message), filterChans: make(map[PBWrapper_MessageType]*Chan),
toFilter: make(chan *Message, 32), toFilter: make(chan *Message, 32),
newFilters: make(chan *newFilterInfo), newFilters: make(chan *newFilterInfo),
} }
...@@ -233,6 +233,8 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) { ...@@ -233,6 +233,8 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) {
return conn, nil, false return conn, nil, false
} }
// StartConn adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func (s *Swarm) StartConn(conn *Conn) error { func (s *Swarm) StartConn(conn *Conn) error {
if conn == nil { if conn == nil {
return errors.New("Tried to start nil connection.") return errors.New("Tried to start nil connection.")
...@@ -275,14 +277,8 @@ func (s *Swarm) fanOut() { ...@@ -275,14 +277,8 @@ func (s *Swarm) fanOut() {
continue continue
} }
wrapped, err := Wrap(msg.Data, PBWrapper_DHT_MESSAGE)
if err != nil {
s.Error(err)
continue
}
// queue it in the connection's buffer // queue it in the connection's buffer
conn.Outgoing.MsgChan <- wrapped conn.Outgoing.MsgChan <- msg.Data
} }
} }
} }
...@@ -320,7 +316,7 @@ out: ...@@ -320,7 +316,7 @@ out:
type newFilterInfo struct { type newFilterInfo struct {
Type PBWrapper_MessageType Type PBWrapper_MessageType
resp chan chan *Message resp chan *Chan
} }
func (s *Swarm) routeMessages() { func (s *Swarm) routeMessages() {
...@@ -342,15 +338,36 @@ func (s *Swarm) routeMessages() { ...@@ -342,15 +338,36 @@ func (s *Swarm) routeMessages() {
} }
mes.Data = wrapper.GetMessage() mes.Data = wrapper.GetMessage()
ch <- mes ch.Incoming <- mes
case gchan := <-s.newFilters: case gchan := <-s.newFilters:
nch := make(chan *Message) nch, ok := s.filterChans[gchan.Type]
s.filterChans[gchan.Type] = nch if !ok {
nch = NewChan(16)
s.filterChans[gchan.Type] = nch
go s.muxChan(nch, gchan.Type)
}
gchan.resp <- nch gchan.resp <- nch
} }
} }
} }
func (s *Swarm) muxChan(ch *Chan, typ PBWrapper_MessageType) {
for {
select {
case <-ch.Close:
return
case mes := <-ch.Outgoing:
data, err := Wrap(mes.Data, typ)
if err != nil {
u.PErr("muxChan error: %s\n", err)
continue
}
mes.Data = data
s.Chan.Outgoing <- mes
}
}
}
func (s *Swarm) Find(key u.Key) *peer.Peer { func (s *Swarm) Find(key u.Key) *peer.Peer {
s.connsLock.RLock() s.connsLock.RLock()
defer s.connsLock.RUnlock() defer s.connsLock.RUnlock()
...@@ -386,6 +403,7 @@ func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error ...@@ -386,6 +403,7 @@ func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error
return conn.Peer, err return conn.Peer, err
} }
// Handle performing a handshake on a new connection and ensuring proper forward communication
func (s *Swarm) handleDialedCon(conn *Conn) error { func (s *Swarm) handleDialedCon(conn *Conn) error {
err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
if err != nil { if err != nil {
...@@ -440,10 +458,6 @@ func (s *Swarm) Drop(p *peer.Peer) error { ...@@ -440,10 +458,6 @@ func (s *Swarm) Drop(p *peer.Peer) error {
return conn.Close() return conn.Close()
} }
func (s *Swarm) Send(mes *Message) {
s.Chan.Outgoing <- mes
}
func (s *Swarm) Error(e error) { func (s *Swarm) Error(e error) {
s.Chan.Errors <- e s.Chan.Errors <- e
} }
...@@ -452,31 +466,10 @@ func (s *Swarm) GetErrChan() chan error { ...@@ -452,31 +466,10 @@ func (s *Swarm) GetErrChan() chan error {
return s.Chan.Errors return s.Chan.Errors
} }
func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) { func (s *Swarm) GetChannel(typ PBWrapper_MessageType) *Chan {
wrapper := new(PBWrapper)
wrapper.Message = data
wrapper.Type = &typ
b, err := proto.Marshal(wrapper)
if err != nil {
return nil, err
}
return b, nil
}
func Unwrap(data []byte) (*PBWrapper, error) {
mes := new(PBWrapper)
err := proto.Unmarshal(data, mes)
if err != nil {
return nil, err
}
return mes, nil
}
func (s *Swarm) GetChannel(typ PBWrapper_MessageType) chan *Message {
nfi := &newFilterInfo{ nfi := &newFilterInfo{
Type: typ, Type: typ,
resp: make(chan chan *Message), resp: make(chan *Chan),
} }
s.newFilters <- nfi s.newFilters <- nfi
......
...@@ -38,7 +38,8 @@ func pong(c net.Conn, peer *peer.Peer) { ...@@ -38,7 +38,8 @@ func pong(c net.Conn, peer *peer.Peer) {
fmt.Printf("error: didn't receive ping: '%v'\n", b.GetMessage()) fmt.Printf("error: didn't receive ping: '%v'\n", b.GetMessage())
return return
} }
data, err = Wrap([]byte("pong"), PBWrapper_DHT_MESSAGE)
data, err = Wrap([]byte("pong"), PBWrapper_TEST)
if err != nil { if err != nil {
fmt.Printf("error %v\n", err) fmt.Printf("error %v\n", err)
return return
...@@ -63,6 +64,7 @@ func TestSwarm(t *testing.T) { ...@@ -63,6 +64,7 @@ func TestSwarm(t *testing.T) {
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567", "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567",
} }
recv := swarm.GetChannel(PBWrapper_TEST)
for k, n := range peerNames { for k, n := range peerNames {
peer, err := setupPeer(k, n) peer, err := setupPeer(k, n)
if err != nil { if err != nil {
...@@ -96,13 +98,14 @@ func TestSwarm(t *testing.T) { ...@@ -96,13 +98,14 @@ func TestSwarm(t *testing.T) {
MsgNum := 1000 MsgNum := 1000
for k := 0; k < MsgNum; k++ { for k := 0; k < MsgNum; k++ {
for _, p := range peers { for _, p := range peers {
swarm.Chan.Outgoing <- &Message{Peer: p, Data: []byte("ping")} recv.Outgoing <- &Message{Peer: p, Data: []byte("ping")}
} }
} }
got := map[u.Key]int{} got := map[u.Key]int{}
for k := 0; k < (MsgNum * len(peers)); k++ { for k := 0; k < (MsgNum * len(peers)); k++ {
msg := <-swarm.Chan.Incoming msg := <-recv.Incoming
if string(msg.Data) != "pong" { if string(msg.Data) != "pong" {
t.Error("unexpected conn output", msg.Data) t.Error("unexpected conn output", msg.Data)
} }
......
package swarm
import "code.google.com/p/goprotobuf/proto"
func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) {
wrapper := new(PBWrapper)
wrapper.Message = data
wrapper.Type = &typ
b, err := proto.Marshal(wrapper)
if err != nil {
return nil, err
}
return b, nil
}
func Unwrap(data []byte) (*PBWrapper, error) {
mes := new(PBWrapper)
err := proto.Unmarshal(data, mes)
if err != nil {
return nil, err
}
return mes, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment