//go:build p2p
// +build p2p

package p2p

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"sync"
	"time"

	"github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/mudler/LocalAI/pkg/utils"
	"github.com/mudler/edgevpn/pkg/config"
	"github.com/mudler/edgevpn/pkg/logger"
	"github.com/mudler/edgevpn/pkg/node"
	"github.com/mudler/edgevpn/pkg/protocol"
	"github.com/mudler/edgevpn/pkg/services"
	"github.com/mudler/edgevpn/pkg/types"
	eutils "github.com/mudler/edgevpn/pkg/utils"
	"github.com/phayes/freeport"
	zlog "github.com/rs/zerolog/log"
)
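
// generateNewConnectionData builds a random edgevpn connection configuration:
// room name, rendezvous string, mDNS name and OTP keys are freshly generated,
// while the OTP rotation intervals default to 360s (DHT) and 9000s (crypto)
// when not specified.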
func generateNewConnectionData(DHTInterval, OTPInterval int) *node.YAMLConnectionConfig {
	maxMessSize := 20 << 20 // 20MB
	keyLength := 43

	if DHTInterval == 0 {
		DHTInterval = 360
	}
	if OTPInterval == 0 {
		OTPInterval = 9000
	}

	return &node.YAMLConnectionConfig{
		MaxMessageSize: maxMessSize,
		RoomName:       eutils.RandStringRunes(keyLength),
		Rendezvous:     eutils.RandStringRunes(keyLength),
		MDNS:           eutils.RandStringRunes(keyLength),
		OTP: node.OTP{
			DHT: node.OTPConfig{
				Key:      eutils.RandStringRunes(keyLength),
				Interval: DHTInterval,
				Length:   keyLength,
			},
			Crypto: node.OTPConfig{
				Key:      eutils.RandStringRunes(keyLength),
				Interval: OTPInterval,
				Length:   keyLength,
			},
		},
	}
}
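
// GenerateToken generates a fresh network configuration and returns it as a
// base64-encoded token that other nodes can use to join the same p2p network.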
func GenerateToken(DHTInterval, OTPInterval int) string {
	// Generate a new config and return it encoded
	return generateNewConnectionData(DHTInterval, OTPInterval).Base64()
}
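
// IsP2PEnabled reports whether p2p support is compiled in; in this
// build-tagged file it always returns true.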
func IsP2PEnabled() bool {
	return true
}
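
// nodeID derives a human-readable node identifier from the local hostname and
// the given suffix.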
func nodeID(s string) string {
	hostname, _ := os.Hostname()
	return fmt.Sprintf("%s-%s", hostname, s)
}
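
// nodeAnnounce periodically advertises this node on the users ledger so that
// other peers accept its connections.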
func nodeAnnounce(ctx context.Context, node *node.Node) {
	ledger, _ := node.Ledger()

	// Announce ourselves so other nodes accept our connection
	ledger.Announce(
		ctx,
		10*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[node.Host().ID().String()] = &types.User{
				PeerID:    node.Host().ID().String(),
				Timestamp: time.Now().String(),
			}
			ledger.Add(protocol.UsersLedgerKey, updatedMap)
		},
	)
}
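
// proxyP2PConnection forwards a local TCP connection to the peer that
// registered serviceID on the ledger: it looks the service up on the
// blockchain, opens a libp2p stream to the owning peer and pipes bytes in
// both directions until either side closes.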
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
	ledger, _ := node.Ledger()

	// Retrieve the current record for the service in the blockchain
	existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
	service := &types.Service{}
	existingValue.Unmarshal(service)
	// If the service is not announced, there is nothing to connect to
	if !found {
		zlog.Error().Msgf("Service '%s' not found on blockchain", serviceID)
		conn.Close()
		return
	}

	// Decode the Peer
	d, err := peer.Decode(service.PeerID)
	if err != nil {
		zlog.Error().Err(err).Msgf("cannot decode peer '%s'", service.PeerID)
		conn.Close()
		return
	}

	// Open a stream
	stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
	if err != nil {
		zlog.Error().Err(err).Msg("cannot open stream to peer")
		conn.Close()
		return
	}

	zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())

	closer := make(chan struct{}, 2)
	go copyStream(closer, stream, conn)
	go copyStream(closer, conn, stream)
	<-closer

	stream.Close()
	conn.Close()
}
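
// allocateLocalService opens a local TCP listener on listenAddr and proxies
// every accepted connection to the given p2p service until the context is
// canceled.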
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
	zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
	// Open local port for listening
	l, err := net.Listen("tcp", listenAddr)
	if err != nil {
		zlog.Error().Err(err).Msg("Error listening")
		return err
	}
	go func() {
		<-ctx.Done()
		l.Close()
	}()

	nodeAnnounce(ctx, node)

	defer l.Close()
	for {
		select {
		case <-ctx.Done():
			return errors.New("context canceled")
		default:
			zlog.Debug().Msg("Waiting for a new connection")
			// Listen for an incoming connection.
			conn, err := l.Accept()
			if err != nil {
				zlog.Error().Err(err).Msg("Error accepting connection")
				continue
			}

			// Handle connections in a new goroutine, forwarding to the p2p service
			go func() {
				proxyP2PConnection(ctx, node, service, conn)
			}()
		}
	}
}
// ServiceDiscoverer is the main entry point of the server side (which keeps
// the LLAMACPP_GRPC_SERVERS environment variable updated): it starts a
// goroutine that tracks the workers discovered under servicesID and invokes
// discoveryFunc for each of them.
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
	if err != nil {
		return err
	}
	// TODO: discoveryTunnels should return all the nodes that are available.
	// That way we would update availableNodes here instead of appending,
	// e.g. by giving NodeData a LastSeen field that discoveryTunnels refreshes
	// each time the node is seen. In that case the function below should be
	// idempotent and just keep track of the nodes.
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Error().Msg("Discoverer stopped")
				return
			case tunnel := <-tunnels:
				AddNode(servicesID, tunnel)
				if discoveryFunc != nil {
					discoveryFunc(servicesID, tunnel)
				}
			}
		}
	}()

	return nil
}
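
// discoveryTunnels polls the ledger for workers registered under servicesID
// and emits their NodeData on the returned channel; when allocate is true a
// local tunnel is also started for every worker that comes online.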
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
	tunnels := make(chan NodeData)

	ledger, err := n.Ledger()
	if err != nil {
		return nil, fmt.Errorf("getting the ledger: %w", err)
	}
	// get new services, allocate and return to the channel
	// TODO: a function ensureServices that:
	// - starts a service if not started, if the worker is Online
	// - checks that workers are Online, and if not cancels the context of allocateLocalService
	// - discoveryTunnels should return all the nodes and the addresses associated with them
	// - the caller should then take care of the fact that we always return fresh information
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Error().Msg("Discoverer stopped")
				return
			default:
				time.Sleep(5 * time.Second)
				data := ledger.LastBlock().Storage[servicesID]
				zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")
				for k, v := range data {
					zlog.Debug().Msgf("New worker found in the ledger data '%s'", k)
					nd := &NodeData{}
					if err := v.Unmarshal(nd); err != nil {
						zlog.Error().Msg("cannot unmarshal node data")
						continue
					}
					ensureService(ctx, n, nd, k, allocate)
					muservice.Lock()
					if _, ok := service[nd.Name]; ok {
						tunnels <- service[nd.Name].NodeData
					}
					muservice.Unlock()
				}
			}
		}
	}()

	return tunnels, nil
}
type nodeServiceData struct {
	NodeData   NodeData
	CancelFunc context.CancelFunc
}

var service = map[string]nodeServiceData{}
var muservice sync.Mutex
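
// ensureService keeps the in-memory service map in sync with the ledger: when
// allocate is true it starts a local tunnel for workers that come online, and
// it cancels the tunnel (and forgets the worker) once the worker goes offline.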
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
	muservice.Lock()
	defer muservice.Unlock()
	nd.ServiceID = sserv
	if ndService, found := service[nd.Name]; !found {
		if !nd.IsOnline() {
			// if the node is offline and not present, do nothing
			zlog.Debug().Msgf("Node %s is offline", nd.ID)
			return
		}
		newCtxm, cancel := context.WithCancel(ctx)
		if allocate {
			// Start the service
			port, err := freeport.GetFreePort()
			if err != nil {
				zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
				return
			}
			tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
			nd.TunnelAddress = tunnelAddress
			go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
			zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
		}
		service[nd.Name] = nodeServiceData{
			NodeData:   *nd,
			CancelFunc: cancel,
		}
	} else {
		// Check if the service is still alive; if not, cancel its context
		if !nd.IsOnline() && !ndService.NodeData.IsOnline() {
			ndService.CancelFunc()
			delete(service, nd.Name)
			zlog.Info().Msgf("Node %s is offline, deleting", nd.ID)
		} else if nd.IsOnline() {
			// update last seen inside service
			nd.TunnelAddress = ndService.NodeData.TunnelAddress
			service[nd.Name] = nodeServiceData{
				NodeData:   *nd,
				CancelFunc: ndService.CancelFunc,
			}
			zlog.Debug().Msgf("Node %s is still online", nd.ID)
		}
	}
}
// ExposeService is the P2P worker main: it registers the local host:port as a
// service on the network and keeps announcing it on the ledger.
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	llger := logger.New(log.LevelFatal)

	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}

	// generate a random string for the name
	name := utils.RandString(10)

	// Register the service
	nodeOpts = append(nodeOpts,
		services.RegisterService(llger, 60*time.Second, name, fmt.Sprintf("%s:%s", host, port))...)
	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	err = n.Start(ctx)
	if err != nil {
		return n, fmt.Errorf("starting the node: %w", err)
	}

	ledger, err := n.Ledger()
	if err != nil {
		return n, fmt.Errorf("getting the ledger: %w", err)
	}

	ledger.Announce(
		ctx,
		20*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[name] = &NodeData{
				Name:     name,
				LastSeen: time.Now(),
				ID:       nodeID(name),
			}
			ledger.Add(servicesID, updatedMap)
		},
	)

	return n, nil
}
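
// NewNode creates (but does not start) a p2p node joined to the network
// identified by token.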
func NewNode(token string) (*node.Node, error) {
	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}

	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	return n, nil
}
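
// newNodeOpts turns the network token and a few LOCALAI_P2P_* /
// LOCALAI_LIBP2P_* environment variables into the edgevpn node options used
// by NewNode and ExposeService.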
func newNodeOpts(token string) ([]node.Option, error) {
	llger := logger.New(log.LevelFatal)
	defaultInterval := 10 * time.Second

	// TODO: move this up, expose more config options when creating a node
	noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true"
	enableLimits := os.Getenv("LOCALAI_P2P_ENABLE_LIMITS") == "true"

	loglevel := os.Getenv("LOCALAI_P2P_LOGLEVEL")
	if loglevel == "" {
		loglevel = "info"
	}
	libp2ploglevel := os.Getenv("LOCALAI_LIBP2P_LOGLEVEL")
	if libp2ploglevel == "" {
		libp2ploglevel = "fatal"
	}

	c := config.Config{
		Limit: config.ResourceLimit{
			Enable:   enableLimits,
			MaxConns: 100,
		},
		NetworkToken:   token,
		LowProfile:     false,
		LogLevel:       loglevel,
		Libp2pLogLevel: libp2ploglevel,
		Ledger: config.Ledger{
			SyncInterval:     defaultInterval,
			AnnounceInterval: defaultInterval,
		},
		NAT: config.NAT{
			Service:           true,
			Map:               true,
			RateLimit:         true,
			RateLimitGlobal:   100,
			RateLimitPeer:     100,
			RateLimitInterval: defaultInterval,
		},
		Discovery: config.Discovery{
			DHT:      !noDHT,
			MDNS:     true,
			Interval: 10 * time.Second,
		},
		Connection: config.Connection{
			HolePunch:      true,
			AutoRelay:      true,
			MaxConnections: 1000,
		},
	}

	nodeOpts, _, err := c.ToOpts(llger)
	if err != nil {
		return nil, fmt.Errorf("parsing options: %w", err)
	}

	nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)

	return nodeOpts, nil
}
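
// copyStream pipes src into dst and signals on closer when the copy ends, so
// the proxy can tear down both directions.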
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
	defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
	io.Copy(dst, src)
}