DHT-kademlia-P2Psec/cmd/p2p_server.go
2024-09-13 14:04:40 +09:00

176 lines
5.1 KiB
Go

package main
import (
"crypto/tls"
"encoding/binary"
kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg"
"io"
"log"
"net"
"strconv"
"time"
)
// P2PServer serves incoming peer-to-peer messages on behalf of a DHT instance.
type P2PServer struct {
// dhtInstance is the DHT whose routing table and storage the message
// handlers read from and write to.
dhtInstance *kademlia.DHT
}
// initializeP2PServer starts the TLS listener for peer-to-peer traffic and
// serves connections in an endless accept loop; it does not return normally.
//
// The certificate/key pair is loaded from config.CertFile and config.KeyFile,
// and the listener binds to config.P2PAddress and config.P2PPort. Failing to
// load the certificate or to start the listener terminates the process via
// log.Fatal. Every accepted connection is handled in its own goroutine and
// closed once its handler returns.
//
// Housekeeping of the DHT runs on its own ticker every
// config.RepublishInterval, so it no longer depends on inbound connections
// arriving to be triggered.
func initializeP2PServer(config *kademlia.Config, dht *kademlia.DHT) {
	// Get the certificate ready to accept connections via TLS
	cert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
	if err != nil {
		log.Fatal(err)
	}
	tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}}

	// NOTE(review): plain concatenation does not bracket IPv6 literals;
	// confirm whether P2PAddress may hold one.
	listener, err := tls.Listen("tcp", config.P2PAddress+":"+strconv.Itoa(int(config.P2PPort)), tlsConfig)
	if err != nil {
		log.Fatalln("[P2P Server] Error starting server: ", err)
	}
	defer listener.Close()

	server := &P2PServer{dhtInstance: dht}

	// Expire and republish entries periodically. This runs independently of
	// the accept loop because Accept blocks until a peer connects, which
	// would otherwise stall housekeeping on an idle node.
	if config.RepublishInterval > 0 {
		go func() {
			ticker := time.NewTicker(config.RepublishInterval)
			defer ticker.Stop()

			lastRun := time.Now()
			for now := range ticker.C {
				// Housekeeping receives the time elapsed since the previous run.
				server.dhtInstance.Housekeeping(now.Sub(lastRun))
				lastRun = now
			}
		}()
	} else {
		// time.NewTicker panics on a non-positive interval.
		log.Println("[P2P Server] Housekeeping disabled: RepublishInterval must be positive")
	}

	for {
		connection, err := listener.Accept()
		if err != nil {
			log.Println("[P2P Server] Error accepting connection: ", err)
			continue
		}

		// Handle in separate thread and close when done
		go func() {
			defer connection.Close()
			server.handleP2PMessage(connection)
		}()
	}
}
// handleP2PMessage reads exactly one message from connection and dispatches
// it to the handler matching its message type.
//
// The message starts with a 4-byte header: a big-endian uint16 holding the
// total message size (header included) followed by a big-endian uint16
// message type. The full message (header plus body) is handed to the
// matching kademlia parser. Any read or parse failure is logged and the
// message is dropped; the caller is responsible for closing the connection.
func (p *P2PServer) handleP2PMessage(connection net.Conn) {
	// Number of bytes used for the size and message type fields.
	const headerSize = 4

	// Add a lenient timeout so connections cannot be held open forever.
	if err := connection.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
		log.Println("[P2P Server] Error setting deadline: ", err)
		return
	}

	header := make([]byte, headerSize)
	if _, err := io.ReadFull(connection, header); err != nil {
		log.Println("[P2P Server] Error reading message: ", err)
		return
	}
	size := binary.BigEndian.Uint16(header[:2])
	messageType := binary.BigEndian.Uint16(header[2:])

	// The announced size includes the header. Anything smaller is malformed
	// and would make the unsigned subtraction below wrap around to ~64 KiB.
	if size < headerSize {
		log.Println("[P2P Server] Invalid message size: ", size)
		return
	}

	// Read the body directly behind the header so the parsers get the full
	// message in a single buffer.
	data := make([]byte, size)
	copy(data, header)
	if _, err := io.ReadFull(connection, data[headerSize:]); err != nil {
		log.Println("[P2P Server] Error reading message: ", err)
		return
	}

	switch messageType {
	case kademlia.STORE:
		packet, err := kademlia.ParseStorePacket(data)
		if err != nil {
			log.Println("[P2P Server] Error parsing store packet: ", err)
			return
		}
		p.handleStore(packet)
	case kademlia.PING:
		packet, err := kademlia.ParsePingPacket(data)
		if err != nil {
			log.Println("[P2P Server] Error parsing ping packet: ", err)
			return
		}
		p.handlePing(packet, connection)
	case kademlia.FINDNODE:
		packet, err := kademlia.ParseFindNodePacket(data)
		if err != nil {
			log.Println("[P2P Server] Error parsing find node packet: ", err)
			return
		}
		p.handleFindNode(packet, connection)
	case kademlia.FINDVALUE:
		packet, err := kademlia.ParseFindValuePacket(data)
		if err != nil {
			log.Println("[P2P Server] Error parsing find value packet: ", err)
			return
		}
		p.handleFindValue(packet, connection)
	default:
		log.Println("[P2P Server] Invalid message type")
	}
}
// handleStore processes a STORE packet. The sender is inserted into the
// routing table, then the value is stored locally if either fewer than
// packet.RepCount closest nodes are known for the key, or this node's own ID
// is among the first packet.RepCount of them. Otherwise the packet is ignored.
func (p *P2PServer) handleStore(packet *kademlia.P2PStorePacket) {
	// Always add the sender to the routing table
	sender := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate)
	go p.dhtInstance.RoutingTable.Insert(sender)

	wanted := int(packet.RepCount)
	closest := p.dhtInstance.RoutingTable.FindClosest(packet.Key, wanted)

	// Only slice the candidate list once it is known to be long enough.
	responsible := len(*closest) < wanted
	if !responsible {
		responsible = kademlia.ContainsID((*closest)[:packet.RepCount], p.dhtInstance.SelfNode.NodeID)
	}

	if !responsible {
		log.Printf("Stored at enough other nodes, no need to.")
		return
	}

	log.Printf("Storing value...")
	p.dhtInstance.Storage.Put(packet.Key, packet.Value, packet.TTL, packet.RepCount)
}
// handlePing processes a PING packet: the sender is inserted into the routing
// table and a PONG packet is written back on connection. A failed write is
// logged; there is nothing else to clean up here.
func (p *P2PServer) handlePing(packet *kademlia.P2PPingPacket, connection net.Conn) {
	// Always add the sender to the routing table
	senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate)
	go p.dhtInstance.RoutingTable.Insert(senderNode)

	reply := kademlia.SerialisePongPacket(&kademlia.P2PPongPacket{})
	if _, err := connection.Write(reply); err != nil {
		log.Println("[P2P Server] Error sending pong: ", err)
	}
}
// handleFindNode processes a FINDNODE packet: the sender is inserted into the
// routing table, and up to packet.Amount nodes closest to packet.Key are
// looked up and written back on connection as a found-node packet. A failed
// write is logged.
func (p *P2PServer) handleFindNode(packet *kademlia.P2PFindNodePacket, connection net.Conn) {
	// Always add the sender to the routing table
	senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate)
	go p.dhtInstance.RoutingTable.Insert(senderNode)

	nodes := p.dhtInstance.RoutingTable.FindClosest(packet.Key, int(packet.Amount))
	reply := kademlia.SerialiseFoundNodePacket(&kademlia.P2PFoundNodePacket{Nodes: *nodes})
	if _, err := connection.Write(reply); err != nil {
		log.Println("[P2P Server] Error sending found node reply: ", err)
	}
}
// handleFindValue processes a FINDVALUE packet: the sender is inserted into
// the routing table and local storage is queried for packet.Key. A
// found-value packet carrying the value is written back on connection when
// the key exists, otherwise a not-found packet. A failed write is logged.
func (p *P2PServer) handleFindValue(packet *kademlia.P2PFindValuePacket, connection net.Conn) {
	// Always add the sender to the routing table
	senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate)
	go p.dhtInstance.RoutingTable.Insert(senderNode)

	var reply []byte
	if val, exists := p.dhtInstance.Storage.GetValue(packet.Key); exists {
		reply = kademlia.SerialiseFoundValuePacket(&kademlia.P2PFoundValuePacket{Value: val})
	} else {
		reply = kademlia.SerialiseNotFoundValuePacket(&kademlia.P2PNotFoundPValuePacket{})
	}

	if _, err := connection.Write(reply); err != nil {
		log.Println("[P2P Server] Error sending find value reply: ", err)
	}
}