// DHT-kademlia-P2Psec/pkg/kademlia.go
package kademlia

import (
"context"
"encoding/binary"
"errors"
"io"
"log"
"net"
"os"
"sync"
"time"
)
const (
	// ProofOfWorkBits is the difficulty of the STORE proof of work (presumably the number of
	// leading zero bits a valid proof must produce; ProofWorkForStore is defined elsewhere).
	ProofOfWorkBits = 2
	// Parallelisation is the number of concurrent lookups used during node searches (the alpha parameter in Kademlia terms).
	Parallelisation = 3
)
type Config struct {
Bootstrapper string // Bootstrapper IP
BootstrapperPort uint64
P2PAddress string
P2PPort uint64
APIAddress string
APIPort uint64
RepublishInterval time.Duration
PrefixLength uint64 // Bits matched per bucket in routing table
CertFile string // Where to store the certificate
KeyFile string // Where to store the key
BootstrapperCert string // Path to the already present bootstrapper's certificate
}
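
// exampleConfig is a minimal sketch of a populated Config, illustrating the fields above. All
// addresses, ports and paths here are assumptions chosen for illustration, not defaults taken
// from the project.
func exampleConfig() *Config {
	return &Config{
		Bootstrapper:      "203.0.113.10", // Known peer used to join the network
		BootstrapperPort:  7401,
		P2PAddress:        "203.0.113.20", // Address this node listens on for peer traffic
		P2PPort:           7401,
		APIAddress:        "127.0.0.1", // Local API endpoint
		APIPort:           7402,
		RepublishInterval: time.Hour,
		PrefixLength:      1, // Assuming one bit matched per bucket, as the field comment suggests
		CertFile:          "node.crt",
		KeyFile:           "node.key",
		BootstrapperCert:  "bootstrapper.crt",
	}
}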
type DHT struct {
RoutingTable *RoutingTable
SelfNode P2PNode
Storage DHTStorage
config *Config
}
/*
NewDHT creates a new DHT object based on the configuration passed. If the Bootstrapper field of the config is left as
an empty string, bootstrapping is skipped.
*/
func NewDHT(config *Config) *DHT {
nodeIP := net.ParseIP(config.P2PAddress)
GenerateCert(config.P2PAddress, config.CertFile, config.KeyFile)
cert, err := os.ReadFile(config.CertFile)
if err != nil {
log.Fatal(err)
}
selfNode := P2PNode{
NodeID: GetPeerId(nodeIP, uint16(config.P2PPort)),
IPAddress: nodeIP,
Port: config.P2PPort,
Certificate: cert,
}
dht := &DHT{
Storage: DHTStorage{
//storageMap: sync.Map{},
storageMap: map[[32]byte]ValueType{},
},
SelfNode: selfNode,
RoutingTable: NewRoutingTable(selfNode, config.PrefixLength),
config: config,
}
dht.bootstrap()
return dht
}
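
// runDHT is a usage sketch (not part of the original API): it brings up a DHT from a config and
// drives Housekeeping at a fixed interval. The one-minute tick is an assumption; any interval
// works as long as the elapsed time is passed through so TTLs are decremented correctly.
func runDHT(config *Config) {
	dht := NewDHT(config)
	const tick = time.Minute
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for range ticker.C {
		dht.Housekeeping(tick)
	}
}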
type P2PNode struct {
NodeID [32]byte
IPAddress net.IP
Port uint64
Certificate []byte
}
/*
bootstrap populates the routing table upon creation of the dht.
*/
func (d *DHT) bootstrap() {
if len(d.config.Bootstrapper) == 0 {
return
}
bootstrapperCert, err := os.ReadFile(d.config.BootstrapperCert)
if err != nil {
log.Printf("[Boostrap] Failed to load boostrapper certificate: %v Proceeding without boostrap!", err)
return
}
bootstrapNode := CreateNode(d.config.Bootstrapper, d.config.BootstrapperPort, bootstrapperCert)
d.RoutingTable.Insert(bootstrapNode)
//By flipping bits of our ID, we can query for nodes at specific distances to us, filling up the respective
//Buckets of the routing table.
for i := 0; i < 256; i++ {
toQuery := setBitAt(d.SelfNode.NodeID, i, getBitsAt(d.SelfNode.NodeID, uint64(i), 1) == 0) //Flip the bit
		targets := *d.RoutingTable.FindClosest(toQuery, bucketSize)
		if len(targets) == 0 {
			// Should never happen, since the bootstrapper was inserted above; skip instead of indexing an empty slice.
			log.Printf("[Bootstrap] Routing table returned no nodes for bit %d", i)
			continue
		}
targets = SortNodesByDistanceToKey(d.SelfNode.NodeID, targets)
		// Once we find ourselves as the closest node, every further step would too, so there is no need to go on.
if NodeEquals(targets[0], d.SelfNode) {
break
}
res, err := d.FindNodesAtNode(toQuery, targets[0], bucketSize)
if err != nil {
log.Printf("[DHT]: Error during bootstrap of node %v: %v", d.SelfNode.IPAddress, err)
continue
}
for _, node := range res {
d.RoutingTable.Insert(node)
}
}
}
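
// xorDistance is a small sketch of the Kademlia distance metric that the helpers used above
// (SortNodesByDistanceToKey, IsCloserToKey) are assumed to build on: the distance between two
// IDs is their bytewise XOR, compared as a big-endian integer. Flipping bit i of our own ID
// therefore yields a key whose distance to us has its most significant set bit at position i,
// which is exactly what bootstrap exploits to fill bucket after bucket.
func xorDistance(a, b [32]byte) [32]byte {
	var distance [32]byte
	for i := range a {
		distance[i] = a[i] ^ b[i]
	}
	return distance
}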
/*
Store stores an entry at the specified key in the network. For this it first finds the nodes responsible for the key and
then sends STORE requests to them.
*/
func (d *DHT) Store(key [32]byte, value []byte, ttl uint16, repCount uint16) {
// Find the closest nodes to the key
targets, err := d.FindNode(key, repCount)
if err != nil {
return
}
// If we are one of the closest nodes, we store it ourselves
if ContainsNode(targets, d.SelfNode) {
d.Storage.Put(key, value, ttl, repCount)
}
//Do proof of work only once
pow := ProofWorkForStore(key, value)
// Replicate the value to the closest nodes
for i := 0; i < int(repCount) && i < len(targets); i++ {
if targets[i].NodeID != d.SelfNode.NodeID {
// Send the value to the target
// log.Printf("Storing at %v, entry %d", targets[i].IPAddress, i)
go d.sendStore(key, value, repCount, ttl, targets[i], pow)
}
}
}
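
// storeGetExample sketches a put/get round trip against the two calls above. The TTL of 3600
// and replication count of 5 are assumptions for illustration, and copying raw bytes into the
// key stands in for whatever key derivation (typically a SHA-256 hash) the application layer
// actually uses.
func storeGetExample(d *DHT) {
	var key [32]byte
	copy(key[:], "example-key")
	d.Store(key, []byte("example-value"), 3600, 5)
	if value, ok := d.Get(key); ok {
		log.Printf("retrieved: %s", value)
	}
}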
func (d *DHT) Get(key [32]byte) ([]byte, bool) {
return d.FindValue(key)
}
/*
Ping sends a PING message to the specified node and returns whether it is still alive.
*/
func Ping(node P2PNode, selfNode P2PNode) bool {
connection, err := ConnectToNode(node)
if err != nil {
return false
}
defer connection.Close()
packet := P2PPingPacket{
SourceIP: selfNode.IPAddress,
SourcePort: uint16(selfNode.Port),
Certificate: selfNode.Certificate,
}
_, err = connection.Write(SerialisePingPacket(&packet))
if err != nil {
return false
}
pingTimeout := 1 * time.Second
err = connection.SetDeadline(time.Now().Add(pingTimeout))
if err != nil {
return false
}
receiveBuffer := make([]byte, 4)
_, err = io.ReadFull(connection, receiveBuffer)
if err != nil {
return false
}
recvSize := binary.BigEndian.Uint16(receiveBuffer[:2])
recvType := binary.BigEndian.Uint16(receiveBuffer[2:])
if recvType != PONG || recvSize != 4 {
return false
}
return true
}
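
// readHeader is an assumed helper mirroring the parsing done inline in Ping, findValueAtNode
// and FindNodesAtNode: every reply begins with a 4-byte header carrying the total message size
// and the message type, both big-endian uint16.
func readHeader(r io.Reader) (size uint16, messageType uint16, err error) {
	header := make([]byte, 4)
	if _, err = io.ReadFull(r, header); err != nil {
		return 0, 0, err
	}
	return binary.BigEndian.Uint16(header[:2]), binary.BigEndian.Uint16(header[2:]), nil
}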
/*
FindValue queries the DHT network for the value related to the provided key. It does so in a highly parallel fashion,
querying the network for the closest nodes to the key first and then querying those for the key.
It returns the value, if found.
*/
func (d *DHT) FindValue(key [32]byte) ([]byte, bool) {
v, e := d.Storage.GetValue(key)
if e {
return v, true
}
toSearch, err := d.FindNode(key, bucketSize)
if toSearch == nil || err != nil {
return nil, false
}
	// Buffer the channel so that workers which find the value after the first
	// result has been taken do not block forever and leak.
	resultChannel := make(chan []byte, len(toSearch))
	done := make(chan struct{})
	var wg sync.WaitGroup
	for _, node := range toSearch {
		wg.Add(1)
		// Pass node explicitly: closing over the loop variable would make all
		// goroutines query the same node on Go versions before 1.22.
		go func(node P2PNode) {
			defer wg.Done()
			d.findValueAtNode(resultChannel, key, node)
		}(node)
	}
go func() {
wg.Wait()
close(done)
}()
select {
case result := <-resultChannel:
return result, true
case <-done:
return nil, false
}
}
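
// firstOrDone is an assumed generic helper (requires Go 1.18+) naming the fan-out pattern used
// by FindValue above and FindNode below: several workers report into results, and the caller
// takes either the first result to arrive or gives up once the done channel signals that every
// worker finished empty-handed.
func firstOrDone[T any](results <-chan T, done <-chan struct{}) (T, bool) {
	select {
	case result := <-results:
		return result, true
	case <-done:
		var zero T
		return zero, false
	}
}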
/*
findValueAtNode connects to a node and sends a FINDVALUE request for the specified key. If successful, it reports
its result to the results channel; otherwise it returns without replying, which allows queries to run in parallel.
*/
func (d *DHT) findValueAtNode(results chan<- []byte, key [32]byte, target P2PNode) {
connection, err := ConnectToNode(target)
if err != nil {
return
}
defer connection.Close()
packet := P2PFindValuePacket{
SourceIP: d.SelfNode.IPAddress,
SourcePort: uint16(d.SelfNode.Port),
Certificate: d.SelfNode.Certificate,
Key: key,
}
	_, err = connection.Write(SerialiseFindValuePacket(&packet))
	if err != nil {
		return
	}
	// Set a deadline, matching Ping and FindNodesAtNode, so a silent peer cannot block this goroutine forever.
	if err = connection.SetDeadline(time.Now().Add(1 * time.Second)); err != nil {
		return
	}
	responseHeader := make([]byte, 4)
	_, err = io.ReadFull(connection, responseHeader)
	if err != nil {
		log.Print(err)
		return
	}
size := binary.BigEndian.Uint16(responseHeader[0:2])
responseType := binary.BigEndian.Uint16(responseHeader[2:4])
switch responseType {
	case FOUNDVALUE:
		if size < 4 {
			return // Malformed reply: size includes the 4-byte header
		}
		responseBody := make([]byte, size-4)
_, err = io.ReadFull(connection, responseBody)
if err != nil {
log.Print(err)
return
}
reply := ParseFoundValuePacket(append(responseHeader, responseBody...))
results <- reply.Value
return
default:
return
}
}
/*
sendStore connects to the target node and sends a STORE packet matching the parameters provided.
*/
func (d *DHT) sendStore(key [32]byte, value []byte, repCount uint16, ttl uint16, target P2PNode, pow [32]byte) {
connection, err := ConnectToNode(target)
if err != nil {
return
}
defer connection.Close()
packet := P2PStorePacket{
SourceIP: d.SelfNode.IPAddress,
SourcePort: uint16(d.SelfNode.Port),
Certificate: d.SelfNode.Certificate,
TTL: ttl,
RepCount: repCount,
PoW: pow,
Key: key,
Value: value,
}
_, err = connection.Write(SerialiseStorePacket(&packet))
if err != nil {
print("Error sending store message %v", err)
return
}
}
// FindNode queries the DHT until it finds the node closest to the given key. It then returns that node's reply
// to a FINDNODE request for the key.
// The search is performed on several nodes in parallel, depending on the Parallelisation setting.
func (d *DHT) FindNode(key [32]byte, numberOfNodes uint16) ([]P2PNode, error) {
toSearch := *d.RoutingTable.FindClosest(key, Parallelisation*5)
	// If our routing table is empty, we are apparently alone in the network
if len(toSearch) == 0 {
d.RoutingTable.Insert(d.SelfNode)
toSearch = append(toSearch, d.SelfNode)
return toSearch, nil
}
	// Create a context to stop any workers still running once a result has been found
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resultChannel := make(chan []P2PNode)
done := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < Parallelisation; i++ {
wg.Add(1)
go func() {
// Start from different positions to not query the same node twice
startIndex := i * len(toSearch) / Parallelisation
_ = d.findClosestNodes(ctx, resultChannel, key, toSearch[startIndex:], numberOfNodes)
wg.Done()
}()
}
	// The done channel is only closed once all routines calling findClosestNodes have finished. If any of them
	// finished by reporting a success, the first case below is selected. Only if all of them failed to contact
	// any nodes does the node search fail.
go func() {
wg.Wait()
close(done)
}()
select {
case result := <-resultChannel:
return result, nil
case <-done:
return toSearch, errors.New("node search unsuccessful")
}
}
/*
findClosestNodes tries to find the closest <amount> nodes to the specified key in the network. Once a round of
querying no longer improves on the closest node already reached, it reports its result to the results channel. If the
context is cancelled, it stops prematurely without reporting a result; this happens when another concurrent search
succeeds first. It is handed a slice of nodes to use as its first contacts, which allows starting several threads
with different slices so that they do not all pick the same starting node.
*/
func (d *DHT) findClosestNodes(ctx context.Context, results chan<- []P2PNode, key [32]byte, nodes []P2PNode, amount uint16) error {
currentNode := nodes[0]
	//Start off as far away as possible so everything is an improvement: the complement of the key is at maximal XOR distance
	var closestSuccessfulNodeID [32]byte
for i, b := range key {
closestSuccessfulNodeID[i] = ^b
}
index := 0
for {
select {
case <-ctx.Done():
return nil
default:
}
		result, err := d.FindNodesAtNode(key, currentNode, amount)
		if err != nil || len(result) == 0 {
			index++
			if index >= len(nodes) {
				return errors.New("ran out of nodes")
			}
			currentNode = nodes[index]
			continue
		}
		// Record the closest node that answered: the check below terminates the
		// search once a round no longer improves on it.
		closestSuccessfulNodeID = currentNode.NodeID
		result = append(result, nodes...)
		result = SortNodesByDistanceToKey(key, result)
		result = RemoveDuplicates(result)
		if !IsCloserToKey(key, result[index].NodeID, closestSuccessfulNodeID) {
			// No improvement over the node just queried: the search has converged, report the result.
			select {
			case results <- result[index:]:
			case <-ctx.Done(): // Another worker has already delivered a result
			}
			return nil
		}
		nodes = result
		currentNode = result[0]
		index = 0
}
}
/*
FindNodesAtNode connects to the specified node and issues a FINDNODE request for the key in question. It returns
the nodes sent back, if successful. numberOfNodes specifies how many nodes the peer should attempt to return.
*/
func (d *DHT) FindNodesAtNode(key [32]byte, node P2PNode, numberOfNodes uint16) ([]P2PNode, error) {
connection, err := ConnectToNode(node)
if err != nil {
return nil, err
}
defer connection.Close()
packet := P2PFindNodePacket{
SourceIP: d.SelfNode.IPAddress,
SourcePort: uint16(d.SelfNode.Port),
Key: key,
Certificate: d.SelfNode.Certificate,
Amount: numberOfNodes,
}
_, err = connection.Write(SerialiseFindNodePacket(&packet))
if err != nil {
return nil, err
}
responseHeader := make([]byte, 4)
deadline := time.Now().Add(time.Second * 1)
err = connection.SetDeadline(deadline)
if err != nil {
return nil, err
}
_, err = io.ReadFull(connection, responseHeader)
if err != nil {
return nil, err
}
	size := binary.BigEndian.Uint16(responseHeader)
	messageType := binary.BigEndian.Uint16(responseHeader[2:])
	// size covers the whole message including the 4-byte header, so anything smaller is malformed and would underflow below.
	if messageType != FOUNDNODE || size < 4 {
		return nil, errors.New("received invalid response")
	}
	responseBody := make([]byte, size-4)
_, err = io.ReadFull(connection, responseBody)
if err != nil {
return nil, err
}
foundNodes, err := ParseFoundNodesPacket(append(responseHeader, responseBody...))
if err != nil {
return nil, err
}
return foundNodes.Nodes, nil
}
/*
Housekeeping must be passed the time that has elapsed since it was last called. It then expires any entries in the
DHT's in-memory storage whose TTL has run out and republishes the entries that are due for republishing.
*/
func (d *DHT) Housekeeping(elapsedTime time.Duration) {
d.Storage.UpdateAndExpireEntries(elapsedTime)
toBeRepublished := d.Storage.GetEntriesForRepublishing(d.config.RepublishInterval)
d.RepublishEntries(toBeRepublished)
}
/*
RepublishEntries checks, for each entry in the provided map, whether it has gone long enough without being
republished, and republishes the ones that are due.
*/
func (d *DHT) RepublishEntries(entries map[KeyType]ValueType) {
for key, entry := range entries {
if time.Since(entry.timeAdded) > d.config.RepublishInterval {
// Do in parallel
go d.Store(key, entry.Value, entry.TTL, entry.RepCount)
}
}
}