Spaces:
Sleeping
Sleeping
File size: 7,503 Bytes
e8c11b9 1668ff4 e8c11b9 4974395 |
|
import gradio as gr
import joblib
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import io
from PIL import Image
import ast
import pandas as pd
def create_node_inputs(num_nodes):
    """Build the flat list of per-node input components.

    For every node index in ``range(num_nodes)`` the list receives one
    display-only ``gr.HTML`` heading followed by six ``gr.Slider``
    components, i.e. seven components per node, in node order.
    """
    # (minimum, maximum, default value, label suffix) for each slider,
    # listed in the order the sliders are emitted for a node.
    slider_specs = (
        (0.1, 0.5, 0.3, "Active Time (s)"),
        (0.5, 2.0, 1.0, "Sleep Time (s)"),
        (0.1, 1.0, 0.5, "Energy Level"),
        (0.1, 1.0, 0.5, "Retransmission Interval (s)"),
        (0.01, 0.3, 0.1, "Packet Loss Rate"),
        (10, 100, 50, "Distance to Next Node (m)"),
    )

    components = []
    for node_id in range(num_nodes):
        # Heading that visually separates this node's sliders.
        components.append(gr.HTML(value=f"<h3>Node {node_id} Parameters</h3>"))
        components.extend(
            gr.Slider(low, high, value=default, label=f"Node {node_id} {suffix}")
            for low, high, default, suffix in slider_specs
        )
    return components
# Names of the six numeric values each node contributes to ``*node_params``,
# in the order they appear after that node's HTML heading value.
_NODE_FEATURES = (
    'active_time',
    'sleep_time',
    'energy_level',
    'retransmission_interval',
    'packet_loss_rate',
    'distance_to_next_node',
)


def _parse_edges(edges_input):
    """Turn the edges text into a list of ``(u, v)`` integer tuples.

    Accepts either arrow form (``"0->1,1->2"``) or a Python list literal of
    2-tuples (``"[(0,1),(1,2)]"``). Raises ``ValueError`` (or whatever
    ``ast.literal_eval`` raises) when the text does not match either form.
    """
    text = "" if edges_input is None else str(edges_input).strip()
    if not text:
        raise ValueError("No edges provided")

    if text.startswith('['):
        # literal_eval only evaluates literals, so arbitrary code cannot run.
        edges = ast.literal_eval(text)
        for edge in edges:
            is_int_pair = (
                isinstance(edge, tuple)
                and len(edge) == 2
                and all(
                    isinstance(n, int) and not isinstance(n, bool) for n in edge
                )
            )
            if not is_int_pair:
                raise ValueError(
                    "Edges must be a list of tuples, e.g., [(0,1),(1,2)]"
                )
        return edges

    edges = [
        tuple(int(part) for part in chunk.split('->'))
        for chunk in text.split(',')
    ]
    if not all(len(edge) == 2 for edge in edges):
        raise ValueError(
            "Edges must be in format '0->1,1->2' or [(0,1),(1,2)]"
        )
    return edges


def _group_node_params(num_nodes, node_params):
    """Slice the flat ``node_params`` tuple into one feature dict per node.

    Each node occupies ``len(_NODE_FEATURES) + 1`` consecutive slots: the
    HTML heading value first (skipped), then the six slider values. Raises
    ``ValueError`` when fewer values were supplied than ``num_nodes`` needs.
    """
    stride = len(_NODE_FEATURES) + 1
    if len(node_params) < num_nodes * stride:
        raise ValueError(
            f"parameters were supplied for {len(node_params) // stride} "
            f"node(s) but {num_nodes} node(s) were requested"
        )
    return [
        dict(zip(_NODE_FEATURES, node_params[i * stride + 1:(i + 1) * stride]))
        for i in range(num_nodes)
    ]


def _render_network(graph, path):
    """Draw ``graph`` with energy edge labels and ``path`` in red; return a PIL image."""
    fig = plt.figure(figsize=(12, 8))
    try:
        pos = nx.spring_layout(graph)
        nx.draw(graph, pos, with_labels=True, node_color='lightblue',
                node_size=500, font_size=12, font_weight='bold')

        # Label every edge with its predicted energy consumption.
        edge_labels = {
            edge: f"{energy:.3f}"
            for edge, energy in nx.get_edge_attributes(graph, 'energy').items()
        }
        nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)

        # Overdraw the edges of the selected path so they stand out.
        path_edges = list(zip(path[:-1], path[1:]))
        nx.draw_networkx_edges(graph, pos, edgelist=path_edges,
                               edge_color='r', width=2)

        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # Always release the figure, even when drawing fails.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)


def simulate_network(num_nodes, edges_input, source_node, dest_node, *node_params):
    """Predict per-edge energy use and find the lowest-energy path.

    Args:
        num_nodes: Number of nodes in the network (coerced to ``int``).
        edges_input: Directed edges as ``"0->1,1->2"`` or ``"[(0,1),(1,2)]"``.
        source_node: Index of the path's start node.
        dest_node: Index of the path's end node.
        *node_params: Flat component values, seven per node: the HTML
            heading value followed by active time, sleep time, energy level,
            retransmission interval, packet loss rate and distance.

    Returns:
        ``(image, text)``. On success ``image`` is a PIL image of the graph
        with the chosen path highlighted and ``text`` is a results report.
        On any input error, or when no path exists, ``image`` is ``None``
        and ``text`` is the error message.
    """
    # Validate every input before loading the model, so bad input fails fast.
    try:
        num_nodes = int(num_nodes)
    except (TypeError, ValueError):
        return None, "Error: Number of nodes must be an integer"

    try:
        edges = _parse_edges(edges_input)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        return None, f"Error parsing edges: {str(e)}. Use format '0->1,1->2' or [(0,1),(1,2)]"

    try:
        source_node = int(source_node)
        dest_node = int(dest_node)
    except (TypeError, ValueError):
        return None, "Error: Source and destination nodes must be integers"
    if not (0 <= source_node < num_nodes and 0 <= dest_node < num_nodes):
        return None, f"Error: Source and destination nodes must be between 0 and {num_nodes - 1}"
    if not all(0 <= u < num_nodes and 0 <= v < num_nodes for u, v in edges):
        return None, f"Error: All node indices in edges must be between 0 and {num_nodes - 1}"

    try:
        node_data = _group_node_params(num_nodes, node_params)
    except ValueError as e:
        return None, f"Error: {e}"

    # Load the trained model
    model = joblib.load('energy_efficiency_model.pkl')

    # Create directed graph
    graph = nx.DiGraph()
    graph.add_nodes_from(range(num_nodes))
    graph.add_edges_from(edges)

    # An edge (u, v) uses the parameters of its source node u, so one
    # prediction per distinct source node covers all of that node's edges.
    senders = sorted({u for u, _ in graph.edges()})
    if senders:
        features = pd.DataFrame(
            [[node_data[u][name] for name in _NODE_FEATURES] for u in senders],
            columns=list(_NODE_FEATURES),
        )
        energy_by_sender = dict(zip(senders, model.predict(features)))
        for u, v in graph.edges():
            graph.edges[u, v]['energy'] = energy_by_sender[u]

    # Find the most energy-efficient path. When shortest_path reports no
    # path, the destination is unreachable, so no simple path exists either.
    try:
        path = nx.shortest_path(graph, source=source_node, target=dest_node,
                                weight='energy')
    except nx.NetworkXNoPath:
        return None, f"No path exists from node {source_node} to node {dest_node}"
    total_energy = sum(
        graph.edges[u, v]['energy'] for u, v in zip(path[:-1], path[1:])
    )

    img = _render_network(graph, path)

    # Prepare output
    report = [
        "\nNetwork Simulation Results:\n"
        f"- Number of nodes: {num_nodes}\n"
        f"- Edges: {edges}\n"
        f"- Optimal path from node {source_node} to node {dest_node}: {path}\n"
        f"- Total energy consumption: {total_energy:.3f} units\n"
        "- Node Parameters:\n"
    ]
    for i, params in enumerate(node_data):
        report.append(
            f"\nNode {i}:\n"
            f"* Active Time: {params['active_time']:.2f} s\n"
            f"* Sleep Time: {params['sleep_time']:.2f} s\n"
            f"* Energy Level: {params['energy_level']:.2f}\n"
            f"* Retransmission Interval: {params['retransmission_interval']:.2f} s\n"
            f"* Packet Loss Rate: {params['packet_loss_rate']:.2f}\n"
            f"* Distance to Next Node: {params['distance_to_next_node']:.2f} m\n"
        )
    return img, "".join(report)
# Create Gradio interface
def create_dynamic_interface(num_nodes=5):
    """Build the full list of Gradio input components for the simulator.

    The list starts with four global controls (node count, edges text,
    source node, destination node) and is followed by the per-node
    components from ``create_node_inputs(num_nodes)``.

    Args:
        num_nodes: Number of nodes to create parameter inputs for; also the
            default and upper bound of the node-count slider.

    Returns:
        A flat list of Gradio components in the order ``simulate_network``
        receives their values.
    """
    # Default topology is a simple chain 0->1->...->(n-1); for five nodes
    # this yields "0->1,1->2,2->3,3->4".
    default_edges = ",".join(f"{i}->{i + 1}" for i in range(num_nodes - 1))

    inputs = [
        # Parameter inputs exist only for ``num_nodes`` nodes, so the slider
        # must not allow selecting more nodes than that.
        gr.Slider(0, max(num_nodes, 1), value=num_nodes, step=1,
                  label="Number of Nodes"),
        gr.Textbox(label="Edges (e.g., '0->1,1->2' or '[(0,1),(1,2)]')",
                   value=default_edges),
        gr.Number(value=0, label="Source Node", precision=0),
        gr.Number(value=num_nodes - 1, label="Destination Node", precision=0),
    ]
    inputs.extend(create_node_inputs(num_nodes))
    return inputs
# Build the input components once, using the default node count.
initial_inputs = create_dynamic_interface()

# Wire the simulation function to its inputs and its two outputs
# (rendered graph image, textual report).
iface = gr.Interface(
    title="Energy-Efficient Network Path Simulation",
    description=(
        "Simulate energy-efficient path selection with custom node "
        "parameters and network topology. "
        "Specify edges as '0->1,1->2' or [(0,1),(1,2)]."
    ),
    fn=simulate_network,
    inputs=initial_inputs,
    outputs=[
        gr.Image(label="Network Visualization"),
        gr.Textbox(label="Simulation Results"),
    ],
)

# Start the web UI only when executed as a script, not on import.
if __name__ == "__main__":
    iface.launch()