package main import ( "crypto/tls" "encoding/binary" kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" "io" "log" "net" "strconv" "time" ) type P2PServer struct { dhtInstance *kademlia.DHT } func initializeP2PServer(config *kademlia.Config, dht *kademlia.DHT) { // Get the certificate ready to accept connections via TLS cert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile) if err != nil { log.Fatal(err) } tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}} listener, err := tls.Listen("tcp", config.P2PAddress+":"+strconv.Itoa(int(config.P2PPort)), tlsConfig) if err != nil { log.Fatalln("[P2P Server] Error starting server: ", err) } defer listener.Close() server := &P2PServer{dhtInstance: dht} lastUpdated := time.Now() for { connection, err := listener.Accept() // Expire and republish entries periodically if time.Since(lastUpdated) > config.RepublishInterval { go server.dhtInstance.Housekeeping(time.Now().Sub(lastUpdated)) lastUpdated = time.Now() } if err != nil { log.Println("[P2P Server] Error accepting connection: ", err) continue } // Handle in separate thread and close when done go func() { server.handleP2PMessage(connection) connection.Close() }() } } func (p *P2PServer) handleP2PMessage(connection net.Conn) { // Add a lenient timeout so connections cannot be held open forever. err := connection.SetDeadline(time.Now().Add(10 * time.Second)) if err != nil { log.Println("[P2P Server] Error setting deadline: ", err) return } buf := make([]byte, 4) _, err = io.ReadFull(connection, buf) if err != nil { log.Println("[P2P Server] Error reading message: ", err) return } size := binary.BigEndian.Uint16(buf[:2]) messageType := binary.BigEndian.Uint16(buf[2:]) // Subtract 4 bytes used for size and message type buf2 := make([]byte, size-4) _, err = io.ReadFull(connection, buf2) if err != nil { log.Println("[P2P Server] Error reading message: ", err) return } // combine header and body to get the full message again data := append(buf, buf2...) switch messageType { case kademlia.STORE: packet, err := kademlia.ParseStorePacket(data) if err != nil { log.Println("[P2P Server] Error parsing store packet: ", err) return } p.handleStore(packet) case kademlia.PING: packet, err := kademlia.ParsePingPacket(data) if err != nil { log.Println("[P2P Server] Error parsing ping packet: ", err) return } p.handlePing(packet, connection) case kademlia.FINDNODE: packet, err := kademlia.ParseFindNodePacket(data) if err != nil { log.Println("[P2P Server] Error parsing find node packet: ", err) return } p.handleFindNode(packet, connection) case kademlia.FINDVALUE: packet, err := kademlia.ParseFindValuePacket(data) if err != nil { log.Println("[P2P Server] Error parsing find value packet: ", err) return } p.handleFindValue(packet, connection) default: log.Println("[P2P Server] Invalid message type") } } func (p *P2PServer) handleStore(packet *kademlia.P2PStorePacket) { // Always add the sender to the routing table senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) go p.dhtInstance.RoutingTable.Insert(senderNode) nodes := p.dhtInstance.RoutingTable.FindClosest(packet.Key, int(packet.RepCount)) if len(*nodes) < int(packet.RepCount) || kademlia.ContainsID((*nodes)[:packet.RepCount], p.dhtInstance.SelfNode.NodeID) { log.Printf("Storing value...") value := packet.Value p.dhtInstance.Storage.Put(packet.Key, value, packet.TTL, packet.RepCount) } else { log.Printf("Stored at enough other nodes, no need to.") } } func (p *P2PServer) handlePing(packet *kademlia.P2PPingPacket, connection net.Conn) { // Always add the sender to the routing table senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) go p.dhtInstance.RoutingTable.Insert(senderNode) reply := kademlia.SerialisePongPacket(&kademlia.P2PPongPacket{}) _, _ = connection.Write(reply) } func (p *P2PServer) handleFindNode(packet *kademlia.P2PFindNodePacket, connection net.Conn) { // Always add the sender to the routing table senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) go p.dhtInstance.RoutingTable.Insert(senderNode) nodes := p.dhtInstance.RoutingTable.FindClosest(packet.Key, int(packet.Amount)) reply := kademlia.SerialiseFoundNodePacket(&kademlia.P2PFoundNodePacket{Nodes: *nodes}) _, _ = connection.Write(reply) } func (p *P2PServer) handleFindValue(packet *kademlia.P2PFindValuePacket, connection net.Conn) { // Always add the sender to the routing table senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) go p.dhtInstance.RoutingTable.Insert(senderNode) val, exists := p.dhtInstance.Storage.GetValue(packet.Key) if exists { reply := kademlia.P2PFoundValuePacket{ Value: val, } _, _ = connection.Write(kademlia.SerialiseFoundValuePacket(&reply)) } else { reply := kademlia.SerialiseNotFoundValuePacket(&kademlia.P2PNotFoundPValuePacket{}) _, _ = connection.Write(reply) } }