dht.go 3.16 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4 5 6
	"sync"

	peer "github.com/jbenet/go-ipfs/peer"
7
	swarm "github.com/jbenet/go-ipfs/swarm"
8 9
	u "github.com/jbenet/go-ipfs/util"
	"code.google.com/p/goprotobuf/proto"
10 11
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13 14 15 16
// TODO: see https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
17
	routes RoutingTable
18

19 20
	network *swarm.Swarm

21 22 23
	// local data (TEMPORARY: until we formalize data storage with datastore)
	data map[string][]byte

24 25
	// map of channels waiting for reply messages
	listeners  map[uint64]chan *swarm.Message
26
	listenLock sync.RWMutex
27 28 29

	// Signal to shutdown dht
	shutdown chan struct{}
30 31
}

32 33
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
34
func (dht *IpfsDHT) handleMessages() {
35 36 37 38 39 40 41 42 43 44 45
	for {
		select {
		case mes := <-dht.network.Chan.Incoming:
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

			// Note: not sure if this is the correct place for this
46 47 48 49 50 51 52 53 54 55 56
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					ch <- mes
				}

				// this is expected behaviour during a timeout
				u.DOut("Received response with nobody listening...")
				continue
57
			}
58 59 60
			//

			switch pmes.GetType() {
61 62 63
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
64
			case DHTMessage_FIND_NODE:
65
			case DHTMessage_ADD_PROVIDER:
66 67 68 69 70 71
			case DHTMessage_GET_PROVIDERS:
			case DHTMessage_PING:
			}

		case <-dht.shutdown:
			return
72
		}
73
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
}
75

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
// handleGetValue answers a GET_VALUE request from peer p. When the requested
// key is present in the local data map, a response carrying the request's ID,
// key and the stored value is encoded and queued on the network's outgoing
// channel. When the key is absent nothing is sent yet (see TODO below).
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
	val, found := dht.data[pmes.GetKey()]
	if !found {
		// Find closest node(s) to desired key and reply with that info
		// TODO: this will need some other metadata in the protobuf message
		//			to signal to the querying node that the data its receiving
		//			is actually a list of other nodes
		return
	}

	isResponse := true
	resp := new(DHTMessage)
	// Echo the request's type so the reply is a complete message.
	// NOTE(review): presumably Type is a required field, in which case
	// Marshal would fail without it — confirm against the .proto definition.
	resp.Type = pmes.Type
	resp.Response = &isResponse
	resp.Id = pmes.Id
	resp.Key = pmes.Key
	resp.Value = val

	// Use the wire encoding: the receiving side decodes with proto.Unmarshal.
	data, err := proto.Marshal(resp)
	if err != nil {
		u.PErr("Failed to encode protobuf message: %s", err)
		return
	}

	mes := new(swarm.Message)
	mes.Peer = p
	mes.Data = data
	dht.network.Chan.Outgoing <- mes
}

// handlePutValue is a placeholder: it is not implemented and always panics.
// NOTE(review): presumably intended to serve DHTMessage_PUT_VALUE requests;
// it must not be wired into the message loop until it stops panicking.
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

// handleFindNode is a placeholder: it is not implemented and always panics.
// NOTE(review): presumably intended to serve DHTMessage_FIND_NODE requests;
// it must not be wired into the message loop until it stops panicking.
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

// handlePing replies to a ping from peer p with a response message carrying
// the same message ID as the request, queued on the network's outgoing
// channel. If the reply cannot be encoded, the error is logged and nothing
// is sent.
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
	isResponse := true
	resp := new(DHTMessage)
	// Echo the request's type so the reply is a complete message.
	// NOTE(review): presumably Type is a required field, in which case
	// Marshal would fail without it — confirm against the .proto definition.
	resp.Type = pmes.Type
	resp.Id = pmes.Id
	resp.Response = &isResponse

	// Use the wire encoding rather than the text form returned by String():
	// the receiving side decodes incoming data with proto.Unmarshal.
	data, err := proto.Marshal(resp)
	if err != nil {
		u.PErr("Failed to encode protobuf message: %s", err)
		return
	}

	mes := new(swarm.Message)
	mes.Peer = p
	mes.Data = data
	dht.network.Chan.Outgoing <- mes
}


114 115
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
116 117
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
118 119 120 121 122
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
123 124 125 126 127 128

// Halt stops all communications from this node and shuts it down: it sends
// a value on dht.shutdown (the signal handleMessages returns on) and then
// closes the network.
// NOTE(review): if shutdown is unbuffered, the send blocks until the message
// loop receives it — confirm where the channel is created.
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}