# Page residue from the web file viewer (not part of the program):
# Spaces: Sleeping | File size: 7,503 Bytes | e8c11b9 1668ff4 e8c11b9 4974395
import gradio as gr
import joblib
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import io
from PIL import Image
import ast
import pandas as pd
def create_node_inputs(num_nodes):
    """Build the Gradio input components for every node.

    For each node index in ``range(num_nodes)`` the returned list contains one
    display-only HTML heading followed by six sliders (active time, sleep
    time, energy level, retransmission interval, packet loss rate and
    distance to the next node), in that order.
    """
    # (minimum, maximum, default, label suffix) for each per-node slider.
    slider_specs = (
        (0.1, 0.5, 0.3, "Active Time (s)"),
        (0.5, 2.0, 1.0, "Sleep Time (s)"),
        (0.1, 1.0, 0.5, "Energy Level"),
        (0.1, 1.0, 0.5, "Retransmission Interval (s)"),
        (0.01, 0.3, 0.1, "Packet Loss Rate"),
        (10, 100, 50, "Distance to Next Node (m)"),
    )

    components = []
    for node in range(num_nodes):
        # Heading only; it carries no numeric parameter.
        components.append(gr.HTML(value=f"<h3>Node {node} Parameters</h3>"))
        components.extend(
            gr.Slider(low, high, value=default, label=f"Node {node} {caption}")
            for low, high, default, caption in slider_specs
        )
    return components
# Per-node numeric parameters, in the column order the model is queried with.
_FEATURE_NAMES = ['active_time', 'sleep_time', 'energy_level',
                  'retransmission_interval', 'packet_loss_rate',
                  'distance_to_next_node']
_MODEL_PATH = 'energy_efficiency_model.pkl'
_MODEL_CACHE = {}


def _load_model(path=_MODEL_PATH):
    """Load the trained model once and reuse it on later calls."""
    if path not in _MODEL_CACHE:
        # NOTE(security): joblib.load unpickles the file, which can execute
        # arbitrary code; only point this at a trusted model file.
        _MODEL_CACHE[path] = joblib.load(path)
    return _MODEL_CACHE[path]


def _parse_edges(edges_input):
    """Parse '0->1,1->2' or '[(0,1),(1,2)]' into a list of (int, int) tuples.

    Raises ValueError/SyntaxError/TypeError when the text is malformed.
    """
    text = (edges_input or "").strip()
    if text.startswith('['):
        edges = ast.literal_eval(text)
        if not all(isinstance(e, tuple) and len(e) == 2 for e in edges):
            raise ValueError("Edges must be a list of tuples, e.g., [(0,1),(1,2)]")
    else:
        edges = [tuple(map(int, e.split('->'))) for e in text.split(',')]
        if not all(len(e) == 2 for e in edges):
            raise ValueError("Edges must be in format '0->1,1->2' or [(0,1),(1,2)]")
    # literal_eval accepts floats/strings/bools; endpoints are later used as
    # list indices, so they must be real integers.
    if not all(isinstance(n, int) and not isinstance(n, bool)
               for edge in edges for n in edge):
        raise ValueError("Edge endpoints must be integers")
    return edges


def _collect_node_data(num_nodes, node_params):
    """Group the flat parameter tuple into one dict of features per node."""
    stride = len(_FEATURE_NAMES) + 1  # one HTML heading value + the sliders
    node_data = []
    for i in range(num_nodes):
        start = i * stride + 1  # +1 skips the node's HTML heading value
        values = node_params[start:start + len(_FEATURE_NAMES)]
        node_data.append(dict(zip(_FEATURE_NAMES, values)))
    return node_data


def _assign_edge_energy(graph, node_data, model):
    """Store the predicted energy of each edge in its 'energy' attribute."""
    # An edge (u, v) uses only u's parameters, so predict once per source node.
    energy_by_source = {}
    for u, v in graph.edges():
        if u not in energy_by_source:
            features = pd.DataFrame(
                [[node_data[u][name] for name in _FEATURE_NAMES]],
                columns=_FEATURE_NAMES,
            )
            energy_by_source[u] = model.predict(features)[0]
        graph.edges[u, v]['energy'] = energy_by_source[u]


def _render_network(graph, path):
    """Draw the graph with the chosen path in red and return it as a PIL image."""
    fig = plt.figure(figsize=(12, 8))
    try:
        # Draw on an explicit axes instead of pyplot's implicit current one.
        ax = fig.add_axes((0, 0, 1, 1))
        pos = nx.spring_layout(graph)
        nx.draw(graph, pos, ax=ax, with_labels=True, node_color='lightblue',
                node_size=500, font_size=12, font_weight='bold')
        # Edge labels show the predicted energy consumption.
        edge_labels = {
            edge: f"{energy:.3f}"
            for edge, energy in nx.get_edge_attributes(graph, 'energy').items()
        }
        nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels, ax=ax)
        path_edges = list(zip(path[:-1], path[1:]))
        if path_edges:  # empty when source and destination are the same node
            nx.draw_networkx_edges(graph, pos, edgelist=path_edges,
                                   edge_color='r', width=2, ax=ax)
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # Always release the figure, even if drawing failed.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)


def _format_report(num_nodes, edges, source_node, dest_node, path,
                   total_energy, node_data):
    """Build the plain-text summary shown next to the plot."""
    sections = [(
        "\n"
        "Network Simulation Results:\n"
        f"- Number of nodes: {num_nodes}\n"
        f"- Edges: {edges}\n"
        f"- Optimal path from node {source_node} to node {dest_node}: {path}\n"
        f"- Total energy consumption: {total_energy:.3f} units\n"
        "- Node Parameters:\n"
    )]
    for i, params in enumerate(node_data):
        sections.append(
            "\n"
            f"Node {i}:\n"
            f"* Active Time: {params['active_time']:.2f} s\n"
            f"* Sleep Time: {params['sleep_time']:.2f} s\n"
            f"* Energy Level: {params['energy_level']:.2f}\n"
            f"* Retransmission Interval: {params['retransmission_interval']:.2f} s\n"
            f"* Packet Loss Rate: {params['packet_loss_rate']:.2f}\n"
            f"* Distance to Next Node: {params['distance_to_next_node']:.2f} m\n"
        )
    return "".join(sections)


def simulate_network(num_nodes, edges_input, source_node, dest_node, *node_params):
    """Find and plot the lowest-energy path through a user-defined network.

    Args:
        num_nodes: Number of nodes; nodes are numbered 0 .. num_nodes - 1.
        edges_input: Directed edges as '0->1,1->2' or '[(0,1),(1,2)]'.
        source_node: Index of the start node.
        dest_node: Index of the target node.
        *node_params: Flat per-node values, seven per node: the HTML heading
            value followed by active time, sleep time, energy level,
            retransmission interval, packet loss rate and distance to the
            next node.

    Returns:
        ``(image, text)``: a PIL image of the graph and a text report on
        success, or ``(None, error_message)`` when the input is invalid, the
        model file is missing, or no path exists.
    """
    # --- Validate everything cheap before touching the model or the plot ---
    try:
        num_nodes = int(num_nodes)
    except (TypeError, ValueError, OverflowError):
        return None, "Error: Number of nodes must be an integer"
    if num_nodes < 1:
        return None, "Error: Number of nodes must be at least 1"

    try:
        edges = _parse_edges(edges_input)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        return None, f"Error parsing edges: {str(e)}. Use format '0->1,1->2' or [(0,1),(1,2)]"

    try:
        source_node = int(source_node)
        dest_node = int(dest_node)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers an empty (None) number field.
        return None, "Error: Source and destination nodes must be integers"
    if not (0 <= source_node < num_nodes and 0 <= dest_node < num_nodes):
        return None, f"Error: Source and destination nodes must be between 0 and {num_nodes - 1}"
    if not all(0 <= u < num_nodes and 0 <= v < num_nodes for u, v in edges):
        return None, f"Error: All node indices in edges must be between 0 and {num_nodes - 1}"

    # The caller may request more nodes than parameter groups were passed.
    stride = len(_FEATURE_NAMES) + 1
    if len(node_params) < num_nodes * stride:
        available = len(node_params) // stride
        return None, (f"Error: Parameters were supplied for {available} node(s), "
                      f"but {num_nodes} nodes were requested")
    node_data = _collect_node_data(num_nodes, node_params)

    try:
        model = _load_model()
    except FileNotFoundError:
        return None, f"Error: Model file '{_MODEL_PATH}' not found"

    # Create directed graph
    G = nx.DiGraph()
    G.add_nodes_from(range(num_nodes))
    G.add_edges_from(edges)
    _assign_edge_energy(G, node_data, model)

    # Find the most energy-efficient path. NetworkXNoPath means the target is
    # unreachable, so enumerating simple paths afterwards could never succeed.
    # NOTE(review): the default weighted search assumes non-negative weights;
    # confirm the model never predicts negative energy.
    try:
        path = nx.shortest_path(G, source=source_node, target=dest_node, weight='energy')
    except nx.NetworkXNoPath:
        return None, f"No path exists from node {source_node} to node {dest_node}"
    total_energy = sum(G.edges[u, v]['energy'] for u, v in zip(path[:-1], path[1:]))

    img = _render_network(G, path)
    result = _format_report(num_nodes, edges, source_node, dest_node, path,
                            total_energy, node_data)
    return img, result
# Create Gradio interface
def create_dynamic_interface(num_nodes=5):
    """Build the full list of Gradio input components for the simulation.

    The list starts with the node-count slider, the edges textbox and the
    source/destination number fields, followed by the per-node components
    from ``create_node_inputs(num_nodes)``.

    Args:
        num_nodes: Number of nodes to build parameter inputs for; also the
            default and the upper bound of the node-count slider.

    Returns:
        A list of Gradio components in the order ``simulate_network`` expects
        its arguments.
    """
    # Default topology is a simple chain 0->1->...->(num_nodes - 1); for the
    # default of five nodes this is "0->1,1->2,2->3,3->4".
    default_edges = ",".join(f"{i}->{i + 1}" for i in range(num_nodes - 1))
    inputs = [
        # Parameter inputs exist for only num_nodes nodes, so the slider must
        # not offer more than that; fewer than one node is never simulatable.
        gr.Slider(1, max(num_nodes, 1), value=num_nodes, step=1, label="Number of Nodes"),
        gr.Textbox(label="Edges (e.g., '0->1,1->2' or '[(0,1),(1,2)]')",
                   value=default_edges),
        gr.Number(value=0, label="Source Node", precision=0),
        # Clamp so the default destination is never a negative index.
        gr.Number(value=max(num_nodes - 1, 0), label="Destination Node", precision=0),
    ]
    inputs.extend(create_node_inputs(num_nodes))
    return inputs
# Initial interface with default number of nodes. The component list is built
# once at import time, so the set of per-node inputs is fixed from here on.
initial_inputs = create_dynamic_interface()

# Wire the inputs to simulate_network; it returns (image, text), matching the
# two output components below.
iface = gr.Interface(
    fn=simulate_network,
    inputs=initial_inputs,
    outputs=[gr.Image(label="Network Visualization"), gr.Textbox(label="Simulation Results")],
    title="Energy-Efficient Network Path Simulation",
    description="Simulate energy-efficient path selection with custom node parameters and network topology. Specify edges as '0->1,1->2' or [(0,1),(1,2)]."
)

# Start the web UI only when run as a script, not when imported.
if __name__ == "__main__":
    iface.launch()