Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-unixfs
Commits
0ac4a2ba
Commit
0ac4a2ba
authored
10 years ago
by
Juan Batiz-Benet
Committed by
Brian Tiger Chow
10 years ago
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
swarm rewrite, doesnt yet work (tests)
parent
035d600f
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
595 additions
and
18 deletions
+595
-18
net/conn/conn.go
net/conn/conn.go
+36
-17
net/conn/conn_test.go
net/conn/conn_test.go
+1
-1
net/swarm/conn.go
net/swarm/conn.go
+69
-0
net/swarm/swarm.go
net/swarm/swarm.go
+335
-0
net/swarm/swarm_test.go
net/swarm/swarm_test.go
+154
-0
No files found.
net/conn/conn.go
View file @
0ac4a2ba
package
swarm
package
conn
import
(
"fmt"
...
...
@@ -32,6 +32,32 @@ type Conn struct {
// Map maps Keys (Peer.IDs) to Connections.
type
Map
map
[
u
.
Key
]
*
Conn
// NewConn constructs a new connection
func
NewConn
(
peer
*
peer
.
Peer
,
addr
*
ma
.
Multiaddr
,
nconn
net
.
Conn
)
(
*
Conn
,
error
)
{
conn
:=
&
Conn
{
Peer
:
peer
,
Addr
:
addr
,
Conn
:
nconn
,
}
if
err
:=
conn
.
newChans
();
err
!=
nil
{
return
nil
,
err
}
return
conn
,
nil
}
// NewNetConn constructs a new connection with given net.Conn
func
NewNetConn
(
nconn
net
.
Conn
)
(
*
Conn
,
error
)
{
addr
,
err
:=
ma
.
FromNetAddr
(
nconn
.
RemoteAddr
())
if
err
!=
nil
{
return
nil
,
err
}
return
NewConn
(
new
(
peer
.
Peer
),
addr
,
nconn
)
}
// Dial connects to a particular peer, over a given network
// Example: Dial("udp", peer)
func
Dial
(
network
string
,
peer
*
peer
.
Peer
)
(
*
Conn
,
error
)
{
...
...
@@ -50,18 +76,11 @@ func Dial(network string, peer *peer.Peer) (*Conn, error) {
return
nil
,
err
}
conn
:=
&
Conn
{
Peer
:
peer
,
Addr
:
addr
,
Conn
:
nconn
,
}
newConnChans
(
conn
)
return
conn
,
nil
return
NewConn
(
peer
,
addr
,
nconn
)
}
// Construct new channels for given Conn.
func
newConnChans
(
c
*
Conn
)
error
{
func
(
c
*
Conn
)
newChans
(
)
error
{
if
c
.
Outgoing
!=
nil
||
c
.
Incoming
!=
nil
{
return
fmt
.
Errorf
(
"Conn already initialized"
)
}
...
...
@@ -77,18 +96,18 @@ func newConnChans(c *Conn) error {
}
// Close closes the connection, and associated channels.
func
(
s
*
Conn
)
Close
()
error
{
func
(
c
*
Conn
)
Close
()
error
{
u
.
DOut
(
"Closing Conn.
\n
"
)
if
s
.
Conn
==
nil
{
if
c
.
Conn
==
nil
{
return
fmt
.
Errorf
(
"Already closed"
)
// already closed
}
// closing net connection
err
:=
s
.
Conn
.
Close
()
s
.
Conn
=
nil
err
:=
c
.
Conn
.
Close
()
c
.
Conn
=
nil
// closing channels
s
.
Incoming
.
Close
()
s
.
Outgoing
.
Close
()
s
.
Closed
<-
true
c
.
Incoming
.
Close
()
c
.
Outgoing
.
Close
()
c
.
Closed
<-
true
return
err
}
This diff is collapsed.
Click to expand it.
net/conn/conn_test.go
View file @
0ac4a2ba
package
swarm
package
conn
import
(
"net"
...
...
This diff is collapsed.
Click to expand it.
net/swarm/conn.go
0 → 100644
View file @
0ac4a2ba
package
swarm
import
(
"errors"
"fmt"
"net"
ident
"github.com/jbenet/go-ipfs/identify"
conn
"github.com/jbenet/go-ipfs/net/conn"
u
"github.com/jbenet/go-ipfs/util"
)
// Handle getting ID from this peer, handshake, and adding it into the map
func
(
s
*
Swarm
)
handleIncomingConn
(
nconn
net
.
Conn
)
{
c
,
err
:=
conn
.
NewNetConn
(
nconn
)
if
err
!=
nil
{
s
.
errChan
<-
err
return
}
//TODO(jbenet) the peer might potentially already be in the global PeerBook.
// maybe use the handshake to populate peer.
c
.
Peer
.
AddAddress
(
c
.
Addr
)
// Setup the new connection
err
=
s
.
connSetup
(
c
)
if
err
!=
nil
&&
err
!=
ErrAlreadyOpen
{
s
.
errChan
<-
err
c
.
Close
()
}
}
// connSetup adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func
(
s
*
Swarm
)
connSetup
(
c
*
conn
.
Conn
)
error
{
if
c
==
nil
{
return
errors
.
New
(
"Tried to start nil connection."
)
}
u
.
DOut
(
"Starting connection: %s
\n
"
,
c
.
Peer
.
Key
()
.
Pretty
())
// handshake
if
err
:=
s
.
connHandshake
(
c
);
err
!=
nil
{
return
fmt
.
Errorf
(
"Conn handshake error: %v"
,
err
)
}
// add to conns
s
.
connsLock
.
Lock
()
if
_
,
ok
:=
s
.
conns
[
c
.
Peer
.
Key
()];
ok
{
s
.
connsLock
.
Unlock
()
return
ErrAlreadyOpen
}
s
.
conns
[
c
.
Peer
.
Key
()]
=
c
s
.
connsLock
.
Unlock
()
// kick off reader goroutine
go
s
.
fanIn
(
c
)
return
nil
}
// connHandshake runs the handshake with the remote connection.
func
(
s
*
Swarm
)
connHandshake
(
c
*
conn
.
Conn
)
error
{
//TODO(jbenet) this Handshake stuff should be moved elsewhere.
// needs cleanup. needs context. use msg.Pipe.
return
ident
.
Handshake
(
s
.
local
,
c
.
Peer
,
c
.
Incoming
.
MsgChan
,
c
.
Outgoing
.
MsgChan
)
}
This diff is collapsed.
Click to expand it.
net/swarm/swarm.go
0 → 100644
View file @
0ac4a2ba
package
swarm
import
(
"errors"
"fmt"
"net"
"sync"
conn
"github.com/jbenet/go-ipfs/net/conn"
msg
"github.com/jbenet/go-ipfs/net/message"
peer
"github.com/jbenet/go-ipfs/peer"
u
"github.com/jbenet/go-ipfs/util"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// ErrAlreadyOpen signals that a connection to a peer is already open.
var
ErrAlreadyOpen
=
errors
.
New
(
"Error: Connection to this peer already open."
)
// ListenErr contains a set of errors mapping to each of the swarms addresses.
// Used to return multiple errors, as in listen.
type
ListenErr
struct
{
Errors
[]
error
}
func
(
e
*
ListenErr
)
Error
()
string
{
if
e
==
nil
{
return
"<nil error>"
}
var
out
string
for
i
,
v
:=
range
e
.
Errors
{
if
v
!=
nil
{
out
+=
fmt
.
Sprintf
(
"%d: %s
\n
"
,
i
,
v
)
}
}
return
out
}
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type
Swarm
struct
{
// local is the peer this swarm represents
local
*
peer
.
Peer
// Swarm includes a Pipe object.
*
msg
.
Pipe
// errChan is the channel of errors.
errChan
chan
error
// conns are the open connections the swarm is handling.
conns
conn
.
Map
connsLock
sync
.
RWMutex
// listeners for each network address
listeners
[]
net
.
Listener
// cancel is an internal function used to stop the Swarm's processing.
cancel
context
.
CancelFunc
ctx
context
.
Context
}
// NewSwarm constructs a Swarm, with a Chan.
func
NewSwarm
(
ctx
context
.
Context
,
local
*
peer
.
Peer
)
(
*
Swarm
,
error
)
{
s
:=
&
Swarm
{
Pipe
:
msg
.
NewPipe
(
10
),
conns
:
conn
.
Map
{},
local
:
local
,
errChan
:
make
(
chan
error
,
100
),
}
s
.
ctx
,
s
.
cancel
=
context
.
WithCancel
(
ctx
)
go
s
.
fanOut
()
return
s
,
s
.
listen
()
}
// Open listeners for each network the swarm should listen on
func
(
s
*
Swarm
)
listen
()
error
{
hasErr
:=
false
retErr
:=
&
ListenErr
{
Errors
:
make
([]
error
,
len
(
s
.
local
.
Addresses
)),
}
// listen on every address
for
i
,
addr
:=
range
s
.
local
.
Addresses
{
err
:=
s
.
connListen
(
addr
)
if
err
!=
nil
{
hasErr
=
true
retErr
.
Errors
[
i
]
=
err
u
.
PErr
(
"Failed to listen on: %s [%s]"
,
addr
,
err
)
}
}
if
hasErr
{
return
retErr
}
return
nil
}
// Listen for new connections on the given multiaddr
func
(
s
*
Swarm
)
connListen
(
maddr
*
ma
.
Multiaddr
)
error
{
netstr
,
addr
,
err
:=
maddr
.
DialArgs
()
if
err
!=
nil
{
return
err
}
list
,
err
:=
net
.
Listen
(
netstr
,
addr
)
if
err
!=
nil
{
return
err
}
// NOTE: this may require a lock around it later. currently, only run on setup
s
.
listeners
=
append
(
s
.
listeners
,
list
)
// Accept and handle new connections on this listener until it errors
go
func
()
{
for
{
nconn
,
err
:=
list
.
Accept
()
if
err
!=
nil
{
e
:=
fmt
.
Errorf
(
"Failed to accept connection: %s - %s [%s]"
,
netstr
,
addr
,
err
)
s
.
errChan
<-
e
// if cancel is nil, we're closed.
if
s
.
cancel
==
nil
{
return
}
}
else
{
go
s
.
handleIncomingConn
(
nconn
)
}
}
}()
return
nil
}
// Close stops a swarm.
func
(
s
*
Swarm
)
Close
()
error
{
if
s
.
cancel
==
nil
{
return
errors
.
New
(
"Swarm already closed."
)
}
// issue cancel for the context
s
.
cancel
()
// set cancel to nil to prevent calling Close again, and signal to Listeners
s
.
cancel
=
nil
// close listeners
for
_
,
list
:=
range
s
.
listeners
{
list
.
Close
()
}
return
nil
}
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
func
(
s
*
Swarm
)
Dial
(
peer
*
peer
.
Peer
)
(
*
conn
.
Conn
,
error
)
{
if
peer
.
ID
.
Equal
(
s
.
local
.
ID
)
{
return
nil
,
errors
.
New
(
"Attempted connection to self!"
)
}
k
:=
peer
.
Key
()
// check if we already have an open connection first
s
.
connsLock
.
RLock
()
c
,
found
:=
s
.
conns
[
k
]
s
.
connsLock
.
RUnlock
()
if
found
{
return
c
,
nil
}
// open connection to peer
c
,
err
:=
conn
.
Dial
(
"tcp"
,
peer
)
if
err
!=
nil
{
return
nil
,
err
}
if
err
:=
s
.
connSetup
(
c
);
err
!=
nil
{
c
.
Close
()
return
nil
,
err
}
return
c
,
nil
}
// DialAddr is for connecting to a peer when you know their addr but not their ID.
// Should only be used when sure that not connected to peer in question
// TODO(jbenet) merge with Dial? need way to patch back.
func
(
s
*
Swarm
)
DialAddr
(
addr
*
ma
.
Multiaddr
)
(
*
conn
.
Conn
,
error
)
{
if
addr
==
nil
{
return
nil
,
errors
.
New
(
"addr must be a non-nil Multiaddr"
)
}
npeer
:=
new
(
peer
.
Peer
)
npeer
.
AddAddress
(
addr
)
c
,
err
:=
conn
.
Dial
(
"tcp"
,
npeer
)
if
err
!=
nil
{
return
nil
,
err
}
if
err
:=
s
.
connSetup
(
c
);
err
!=
nil
{
c
.
Close
()
return
nil
,
err
}
return
c
,
err
}
// Handles the unwrapping + sending of messages to the right connection.
func
(
s
*
Swarm
)
fanOut
()
{
for
{
select
{
case
<-
s
.
ctx
.
Done
()
:
return
// told to close.
case
msg
,
ok
:=
<-
s
.
Outgoing
:
if
!
ok
{
return
}
s
.
connsLock
.
RLock
()
conn
,
found
:=
s
.
conns
[
msg
.
Peer
.
Key
()]
s
.
connsLock
.
RUnlock
()
if
!
found
{
e
:=
fmt
.
Errorf
(
"Sent msg to peer without open conn: %v"
,
msg
.
Peer
)
s
.
errChan
<-
e
continue
}
// queue it in the connection's buffer
conn
.
Outgoing
.
MsgChan
<-
msg
.
Data
}
}
}
// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func
(
s
*
Swarm
)
fanIn
(
c
*
conn
.
Conn
)
{
for
{
select
{
case
<-
s
.
ctx
.
Done
()
:
// close Conn.
c
.
Close
()
goto
out
case
<-
c
.
Closed
:
goto
out
case
data
,
ok
:=
<-
c
.
Incoming
.
MsgChan
:
if
!
ok
{
e
:=
fmt
.
Errorf
(
"Error retrieving from conn: %v"
,
c
.
Peer
.
Key
()
.
Pretty
())
s
.
errChan
<-
e
goto
out
}
msg
:=
&
msg
.
Message
{
Peer
:
c
.
Peer
,
Data
:
data
}
s
.
Incoming
<-
msg
}
}
out
:
s
.
connsLock
.
Lock
()
delete
(
s
.
conns
,
c
.
Peer
.
Key
())
s
.
connsLock
.
Unlock
()
}
// GetPeer returns the peer in the swarm with given key id.
func
(
s
*
Swarm
)
GetPeer
(
key
u
.
Key
)
*
peer
.
Peer
{
s
.
connsLock
.
RLock
()
conn
,
found
:=
s
.
conns
[
key
]
s
.
connsLock
.
RUnlock
()
if
!
found
{
return
nil
}
return
conn
.
Peer
}
// GetConnection will check if we are already connected to the peer in question
// and only open a new connection if we arent already
func
(
s
*
Swarm
)
GetConnection
(
id
peer
.
ID
,
addr
*
ma
.
Multiaddr
)
(
*
peer
.
Peer
,
error
)
{
p
:=
&
peer
.
Peer
{
ID
:
id
,
Addresses
:
[]
*
ma
.
Multiaddr
{
addr
},
}
c
,
err
:=
s
.
Dial
(
p
)
if
err
!=
nil
{
return
nil
,
err
}
return
c
.
Peer
,
nil
}
// CloseConnection removes a given peer from swarm + closes the connection
func
(
s
*
Swarm
)
CloseConnection
(
p
*
peer
.
Peer
)
error
{
s
.
connsLock
.
RLock
()
conn
,
found
:=
s
.
conns
[
u
.
Key
(
p
.
ID
)]
s
.
connsLock
.
RUnlock
()
if
!
found
{
return
u
.
ErrNotFound
}
s
.
connsLock
.
Lock
()
delete
(
s
.
conns
,
u
.
Key
(
p
.
ID
))
s
.
connsLock
.
Unlock
()
return
conn
.
Close
()
}
func
(
s
*
Swarm
)
Error
(
e
error
)
{
s
.
errChan
<-
e
}
// GetErrChan returns the errors chan.
func
(
s
*
Swarm
)
GetErrChan
()
chan
error
{
return
s
.
errChan
}
// Temporary to ensure that the Swarm always matches the Network interface as we are changing it
// var _ Network = &Swarm{}
This diff is collapsed.
Click to expand it.
net/swarm/swarm_test.go
0 → 100644
View file @
0ac4a2ba
package
swarm
import
(
"fmt"
"net"
"testing"
msg
"github.com/jbenet/go-ipfs/net/message"
peer
"github.com/jbenet/go-ipfs/peer"
u
"github.com/jbenet/go-ipfs/util"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
msgio
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
ma
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
mh
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
)
func
pingListen
(
t
*
testing
.
T
,
listener
*
net
.
TCPListener
,
peer
*
peer
.
Peer
)
{
for
{
c
,
err
:=
listener
.
Accept
()
if
err
==
nil
{
fmt
.
Println
(
"accepted"
)
go
pong
(
t
,
c
,
peer
)
}
}
}
func
pong
(
t
*
testing
.
T
,
c
net
.
Conn
,
peer
*
peer
.
Peer
)
{
mrw
:=
msgio
.
NewReadWriter
(
c
)
for
{
data
:=
make
([]
byte
,
1024
)
n
,
err
:=
mrw
.
ReadMsg
(
data
)
if
err
!=
nil
{
fmt
.
Printf
(
"error %v
\n
"
,
err
)
return
}
d
:=
string
(
data
[
:
n
])
if
d
!=
"ping"
{
t
.
Errorf
(
"error: didn't receive ping: '%v'
\n
"
,
d
)
return
}
err
=
mrw
.
WriteMsg
([]
byte
(
"pong"
))
if
err
!=
nil
{
fmt
.
Printf
(
"error %v
\n
"
,
err
)
return
}
}
}
func
setupPeer
(
id
string
,
addr
string
)
(
*
peer
.
Peer
,
error
)
{
tcp
,
err
:=
ma
.
NewMultiaddr
(
addr
)
if
err
!=
nil
{
return
nil
,
err
}
mh
,
err
:=
mh
.
FromHexString
(
id
)
if
err
!=
nil
{
return
nil
,
err
}
p
:=
&
peer
.
Peer
{
ID
:
peer
.
ID
(
mh
)}
p
.
AddAddress
(
tcp
)
return
p
,
nil
}
func
TestSwarm
(
t
*
testing
.
T
)
{
local
,
err
:=
setupPeer
(
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30"
,
"/ip4/127.0.0.1/tcp/1234"
)
if
err
!=
nil
{
t
.
Fatal
(
"error setting up peer"
,
err
)
}
swarm
,
err
:=
NewSwarm
(
context
.
Background
(),
local
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
var
peers
[]
*
peer
.
Peer
var
listeners
[]
net
.
Listener
peerNames
:=
map
[
string
]
string
{
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30"
:
"/ip4/127.0.0.1/tcp/1234"
,
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"
:
"/ip4/127.0.0.1/tcp/2345"
,
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32"
:
"/ip4/127.0.0.1/tcp/3456"
,
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
:
"/ip4/127.0.0.1/tcp/4567"
,
}
for
k
,
n
:=
range
peerNames
{
peer
,
err
:=
setupPeer
(
k
,
n
)
if
err
!=
nil
{
t
.
Fatal
(
"error setting up peer"
,
err
)
}
a
:=
peer
.
NetAddress
(
"tcp"
)
if
a
==
nil
{
t
.
Fatal
(
"error setting up peer (addr is nil)"
,
peer
)
}
n
,
h
,
err
:=
a
.
DialArgs
()
if
err
!=
nil
{
t
.
Fatal
(
"error getting dial args from addr"
)
}
listener
,
err
:=
net
.
Listen
(
n
,
h
)
if
err
!=
nil
{
t
.
Fatal
(
"error setting up listener"
,
err
)
}
go
pingListen
(
t
,
listener
.
(
*
net
.
TCPListener
),
peer
)
u
.
POut
(
"wat?
\n
"
)
_
,
err
=
swarm
.
Dial
(
peer
)
if
err
!=
nil
{
t
.
Fatal
(
"error swarm dialing to peer"
,
err
)
}
u
.
POut
(
"wut?
\n
"
)
// ok done, add it.
peers
=
append
(
peers
,
peer
)
listeners
=
append
(
listeners
,
listener
)
}
MsgNum
:=
1000
for
k
:=
0
;
k
<
MsgNum
;
k
++
{
for
_
,
p
:=
range
peers
{
swarm
.
Outgoing
<-
&
msg
.
Message
{
Peer
:
p
,
Data
:
[]
byte
(
"ping"
)}
u
.
POut
(
"sending ping to %v
\n
"
,
p
)
}
}
got
:=
map
[
u
.
Key
]
int
{}
for
k
:=
0
;
k
<
(
MsgNum
*
len
(
peers
));
k
++
{
u
.
POut
(
"listening for ping..."
)
msg
:=
<-
swarm
.
Incoming
if
string
(
msg
.
Data
)
!=
"pong"
{
t
.
Error
(
"unexpected conn output"
,
msg
.
Data
)
}
n
,
_
:=
got
[
msg
.
Peer
.
Key
()]
got
[
msg
.
Peer
.
Key
()]
=
n
+
1
}
if
len
(
peers
)
!=
len
(
got
)
{
t
.
Error
(
"got less messages than sent"
)
}
for
p
,
n
:=
range
got
{
if
n
!=
MsgNum
{
t
.
Error
(
"peer did not get all msgs"
,
p
,
n
,
"/"
,
MsgNum
)
}
}
fmt
.
Println
(
"closing"
)
swarm
.
Close
()
for
_
,
listener
:=
range
listeners
{
listener
.
(
*
net
.
TCPListener
)
.
Close
()
}
}
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment