Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import joblib | |
| import numpy as np | |
| import networkx as nx | |
| import matplotlib.pyplot as plt | |
| import io | |
| from PIL import Image | |
| import ast | |
| import pandas as pd | |
def create_node_inputs(num_nodes):
    """Build the flat list of per-node Gradio components.

    For each node index in ``range(num_nodes)`` the list receives one
    display-only ``gr.HTML`` heading followed by six ``gr.Slider`` controls,
    in this order: active time, sleep time, energy level, retransmission
    interval, packet loss rate, distance to next node. Each node therefore
    contributes seven components.
    """
    # (minimum, maximum, initial value, label suffix) for each slider, in order.
    slider_specs = (
        (0.1, 0.5, 0.3, "Active Time (s)"),
        (0.5, 2.0, 1.0, "Sleep Time (s)"),
        (0.1, 1.0, 0.5, "Energy Level"),
        (0.1, 1.0, 0.5, "Retransmission Interval (s)"),
        (0.01, 0.3, 0.1, "Packet Loss Rate"),
        (10, 100, 50, "Distance to Next Node (m)"),
    )
    components = []
    for node in range(num_nodes):
        # Heading first, so the sliders that follow are visually grouped.
        components.append(gr.HTML(value=f"<h3>Node {node} Parameters</h3>"))
        components.extend(
            gr.Slider(low, high, value=initial, label=f"Node {node} {suffix}")
            for low, high, initial, suffix in slider_specs
        )
    return components
# Order of the numeric features expected by the model, which is also the
# order of the sliders that follow each node's HTML heading in the interface.
_NODE_FEATURES = (
    'active_time',
    'sleep_time',
    'energy_level',
    'retransmission_interval',
    'packet_loss_rate',
    'distance_to_next_node',
)
# Each node contributes one HTML heading value plus the numeric features.
_VALUES_PER_NODE = len(_NODE_FEATURES) + 1


def _parse_edges(edges_input):
    """Parse the edge text into a list of ``(u, v)`` integer tuples.

    Accepts arrow notation (``"0->1,1->2"``) or a Python literal list of
    2-tuples (``"[(0,1),(1,2)]"``). Blank input yields an empty list.

    Raises:
        ValueError: If the text matches neither format or an endpoint is
            not an integer.
    """
    text = (edges_input or "").strip()
    if not text:
        return []

    if text.startswith('['):
        try:
            parsed = ast.literal_eval(text)
        except (SyntaxError, TypeError, MemoryError, RecursionError) as exc:
            # Normalize every literal_eval failure mode to ValueError.
            raise ValueError(str(exc)) from exc
        well_formed = isinstance(parsed, (list, tuple)) and all(
            isinstance(edge, tuple)
            and len(edge) == 2
            # bool is a subclass of int; reject it so True/False are not nodes.
            and all(isinstance(end, int) and not isinstance(end, bool)
                    for end in edge)
            for edge in parsed
        )
        if not well_formed:
            raise ValueError("Edges must be a list of tuples, e.g., [(0,1),(1,2)]")
        return list(parsed)

    edges = []
    for token in text.split(','):
        token = token.strip()
        if not token:
            continue  # tolerate stray or trailing commas
        endpoints = tuple(int(part) for part in token.split('->'))
        if len(endpoints) != 2:
            raise ValueError("Edges must be in format '0->1,1->2' or [(0,1),(1,2)]")
        edges.append(endpoints)
    return edges


def _render_network(G, path):
    """Draw ``G`` with energy edge labels and ``path`` highlighted in red.

    Returns the rendered plot as a PIL image. The figure is always closed,
    even when drawing fails, so repeated calls do not accumulate figures.
    """
    fig = plt.figure(figsize=(12, 8))
    try:
        pos = nx.spring_layout(G)
        nx.draw(G, pos, with_labels=True, node_color='lightblue',
                node_size=500, font_size=12, font_weight='bold')
        edge_labels = {
            edge: f"{energy:.3f}"
            for edge, energy in nx.get_edge_attributes(G, 'energy').items()
        }
        nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
        path_edges = list(zip(path[:-1], path[1:]))
        nx.draw_networkx_edges(G, pos, edgelist=path_edges, edge_color='r', width=2)
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        plt.close(fig)
    buf.seek(0)
    img = Image.open(buf)
    # Image.open is lazy; decode now so the image does not depend on the
    # buffer being read later.
    img.load()
    return img


def _format_report(num_nodes, edges, source_node, dest_node, path,
                   total_energy, node_data):
    """Build the plain-text summary shown next to the plot."""
    parts = [
        "\n"
        "Network Simulation Results:\n"
        f"- Number of nodes: {num_nodes}\n"
        f"- Edges: {edges}\n"
        f"- Optimal path from node {source_node} to node {dest_node}: {path}\n"
        f"- Total energy consumption: {total_energy:.3f} units\n"
        "- Node Parameters:\n"
    ]
    for i, params in enumerate(node_data):
        parts.append(
            "\n"
            f"Node {i}:\n"
            f"* Active Time: {params['active_time']:.2f} s\n"
            f"* Sleep Time: {params['sleep_time']:.2f} s\n"
            f"* Energy Level: {params['energy_level']:.2f}\n"
            f"* Retransmission Interval: {params['retransmission_interval']:.2f} s\n"
            f"* Packet Loss Rate: {params['packet_loss_rate']:.2f}\n"
            f"* Distance to Next Node: {params['distance_to_next_node']:.2f} m\n"
        )
    return "".join(parts)


def simulate_network(num_nodes, edges_input, source_node, dest_node, *node_params):
    """Find and plot the lowest-energy path through a user-defined network.

    Args:
        num_nodes: Number of nodes; nodes are numbered ``0 .. num_nodes - 1``.
        edges_input: Directed edges as ``"0->1,1->2"`` or ``"[(0,1),(1,2)]"``.
        source_node: Index of the path's start node.
        dest_node: Index of the path's end node.
        *node_params: Flat per-node values in interface order: for every node
            one HTML heading value (ignored) followed by the six numeric
            features listed in ``_NODE_FEATURES``.

    Returns:
        ``(image, report)`` on success, where ``image`` is a PIL image of the
        graph and ``report`` is a text summary. On invalid input or when no
        path exists, ``(None, message)``.

    All inputs are validated before the model is loaded, so bad requests
    fail fast without touching the model file.
    """
    try:
        num_nodes = int(num_nodes)  # sliders may deliver a float such as 5.0
    except (TypeError, ValueError):
        return None, "Error: Number of nodes must be an integer"

    # Parse edges input (e.g., "0->1,1->2" or "[(0,1),(1,2)]")
    try:
        edges = _parse_edges(edges_input)
    except ValueError as e:
        return None, f"Error parsing edges: {str(e)}. Use format '0->1,1->2' or [(0,1),(1,2)]"

    # Validate source/destination nodes and edge endpoints
    try:
        source_node = int(source_node)
        dest_node = int(dest_node)
    except (TypeError, ValueError):  # TypeError covers an empty (None) field
        return None, "Error: Source and destination nodes must be integers"
    if not (0 <= source_node < num_nodes and 0 <= dest_node < num_nodes):
        return None, f"Error: Source and destination nodes must be between 0 and {num_nodes - 1}"
    if not all(0 <= u < num_nodes and 0 <= v < num_nodes for u, v in edges):
        return None, f"Error: All node indices in edges must be between 0 and {num_nodes - 1}"

    # The node-count control can exceed the number of parameter groups the
    # interface actually rendered; report that instead of an IndexError.
    if len(node_params) < num_nodes * _VALUES_PER_NODE:
        available = len(node_params) // _VALUES_PER_NODE
        return None, (f"Error: Node parameters were supplied for {available} node(s), "
                      f"but {num_nodes} were requested")

    # Group node parameters, skipping the HTML heading value of each node.
    node_data = []
    for i in range(num_nodes):
        start_idx = i * _VALUES_PER_NODE + 1
        values = node_params[start_idx:start_idx + len(_NODE_FEATURES)]
        node_data.append(dict(zip(_NODE_FEATURES, values)))

    # SECURITY NOTE: joblib.load unpickles the file; it must only ever point
    # at a trusted, locally shipped model.
    model = joblib.load('energy_efficiency_model.pkl')

    # Create directed graph
    G = nx.DiGraph()
    G.add_nodes_from(range(num_nodes))
    G.add_edges_from(edges)

    # An edge's energy depends only on its source node's parameters, so
    # predict once per source node and reuse the value for all its out-edges.
    energy_by_source = {}
    for u, v in G.edges():
        if u not in energy_by_source:
            features = pd.DataFrame(
                [[node_data[u][name] for name in _NODE_FEATURES]],
                columns=list(_NODE_FEATURES),
            )
            energy_by_source[u] = model.predict(features)[0]
        G.edges[u, v]['energy'] = energy_by_source[u]

    # Find the most energy-efficient path.
    # NOTE(review): the default weighted search assumes non-negative weights;
    # confirm the model never predicts negative energy.
    try:
        path = nx.shortest_path(G, source=source_node, target=dest_node, weight='energy')
    except nx.NetworkXNoPath:
        return None, f"No path exists from node {source_node} to node {dest_node}"
    total_energy = sum(G.edges[u, v]['energy'] for u, v in zip(path[:-1], path[1:]))

    img = _render_network(G, path)
    result = _format_report(num_nodes, edges, source_node, dest_node,
                            path, total_energy, node_data)
    return img, result
| # Create Gradio interface | |
def create_dynamic_interface(num_nodes=5):
    """Build the full list of input components for the simulation UI.

    The list starts with four global controls (node count, edge text, source
    node, destination node) and continues with the per-node components from
    ``create_node_inputs(num_nodes)``.

    Args:
        num_nodes: Number of nodes to create parameter controls for. It is
            also the initial and maximum value of the node-count slider.

    Returns:
        A flat list of Gradio components in the order ``simulate_network``
        expects its positional arguments.
    """
    # Default topology is a simple chain 0->1->...->(n-1). For the default
    # of five nodes this is "0->1,1->2,2->3,3->4".
    default_edges = ",".join(f"{i}->{i + 1}" for i in range(num_nodes - 1))
    inputs = [
        # Cap the slider at num_nodes: only that many per-node parameter
        # groups exist, so a larger count could never be simulated.
        gr.Slider(0, num_nodes, value=num_nodes, step=1, label="Number of Nodes"),
        gr.Textbox(label="Edges (e.g., '0->1,1->2' or '[(0,1),(1,2)]')",
                   value=default_edges),
        gr.Number(value=0, label="Source Node", precision=0),
        gr.Number(value=num_nodes - 1, label="Destination Node", precision=0),
    ]
    inputs.extend(create_node_inputs(num_nodes))
    return inputs
# Build the input components once, using the default number of nodes.
initial_inputs = create_dynamic_interface()

# Connect the simulation callback to its inputs and to the two outputs
# (rendered graph image and text report).
iface = gr.Interface(
    title="Energy-Efficient Network Path Simulation",
    description="Simulate energy-efficient path selection with custom node parameters and network topology. Specify edges as '0->1,1->2' or [(0,1),(1,2)].",
    fn=simulate_network,
    inputs=initial_inputs,
    outputs=[
        gr.Image(label="Network Visualization"),
        gr.Textbox(label="Simulation Results"),
    ],
)

# Start the web UI only when run as a script, not when imported.
if __name__ == "__main__":
    iface.launch()