PVDcal / src /pvd_consolidation.py
Sompote's picture
Upload 2 files
1843b43 verified
"""
Multilayer Prefabricated Vertical Drain (PVD) Consolidation Analysis
Calculates settlement vs time using finite difference method
"""
import numpy as np
import matplotlib.pyplot as plt
import yaml
import argparse
import os
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any
@dataclass
class LoadingStage:
"""Loading stage definition for staged loading"""
start_time: float # Time when this stage starts (years)
surcharge: float # Surcharge load (kPa)
vacuum: float # Vacuum pressure (kPa, positive value, will be applied as negative)
def __post_init__(self):
"""Validate inputs"""
if self.vacuum < 0:
raise ValueError(
"Vacuum should be specified as positive value (e.g., 80 for -80 kPa)"
)
@dataclass
class SoilLayer:
"""Soil layer properties"""
thickness: float # Layer thickness (m)
Cv: float # Vertical coefficient of consolidation (m²/year)
Ch: float # Horizontal coefficient of consolidation (m²/year)
RR: float # Recompression ratio
CR: float # Compression ratio
sigma_ini: float # Initial effective stress (kPa)
sigma_p: float # Preconsolidation pressure (kPa)
kh: float # Horizontal permeability (m/year)
ks: float # Smear zone permeability (m/year)
@dataclass
class PVDProperties:
"""PVD installation properties"""
dw: float # Equivalent diameter of drain (m)
ds: float # Smear zone diameter (m)
De: float # Equivalent diameter of unit cell (m)
L_drain: float # Total drain spacing for two-way drainage (m)
qw: float # Well discharge capacity (m³/year)
r_influence: float = None # Vacuum influence radius (m), default = De/4
vacuum_depth_loss: bool = True # Use vacuum loss with depth (default: True)
def __post_init__(self):
"""Set default vacuum influence radius"""
if self.r_influence is None:
self.r_influence = self.De / 4
class PVDConsolidation:
"""
Multilayer PVD consolidation analysis using finite difference method
"""
def __init__(
self,
soil_layers: List[SoilLayer],
pvd: PVDProperties,
surcharge: float = 0.0,
vacuum: float = 0.0,
loading_stages: List[LoadingStage] = None,
dt: float = 0.01,
):
"""
Initialize PVD consolidation analysis
Parameters:
-----------
soil_layers : List[SoilLayer]
List of soil layers from top to bottom
pvd : PVDProperties
PVD installation properties
surcharge : float
Applied surcharge load (kPa) - for single-stage loading
vacuum : float
Applied vacuum pressure (kPa, positive value) - for single-stage loading
loading_stages : List[LoadingStage]
List of loading stages for multi-stage loading (overrides surcharge/vacuum)
dt : float
Time step for finite difference (years)
"""
self.layers = soil_layers
self.pvd = pvd
self.dt = dt
# Setup loading
if loading_stages is not None:
self.loading_stages = sorted(loading_stages, key=lambda x: x.start_time)
self.use_staged_loading = True
else:
# Single stage loading
self.loading_stages = [
LoadingStage(start_time=0.0, surcharge=surcharge, vacuum=vacuum)
]
self.use_staged_loading = False
# For compatibility
self.surcharge = surcharge
self.vacuum = vacuum
# Calculate total thickness
self.total_thickness = sum(layer.thickness for layer in soil_layers)
# Initialize mesh
self._setup_mesh()
def _setup_mesh(self, nodes_per_meter: int = 10):
"""Setup finite difference mesh"""
self.nodes_per_meter = nodes_per_meter
# Create mesh for each layer
self.z_coords = []
self.layer_indices = []
self.Cv_profile = []
self.Ch_profile = []
self.kh_profile = []
self.ks_profile = []
z = 0
for i, layer in enumerate(self.layers):
n_nodes = max(int(layer.thickness * nodes_per_meter), 2)
z_layer = np.linspace(z, z + layer.thickness, n_nodes, endpoint=False)
self.z_coords.extend(z_layer)
self.layer_indices.extend([i] * len(z_layer))
self.Cv_profile.extend([layer.Cv] * len(z_layer))
self.Ch_profile.extend([layer.Ch] * len(z_layer))
self.kh_profile.extend([layer.kh] * len(z_layer))
self.ks_profile.extend([layer.ks] * len(z_layer))
z += layer.thickness
# Add final node
self.z_coords.append(self.total_thickness)
self.layer_indices.append(len(self.layers) - 1)
self.Cv_profile.append(self.layers[-1].Cv)
self.Ch_profile.append(self.layers[-1].Ch)
self.kh_profile.append(self.layers[-1].kh)
self.ks_profile.append(self.layers[-1].ks)
self.z_coords = np.array(self.z_coords)
self.layer_indices = np.array(self.layer_indices)
self.Cv_profile = np.array(self.Cv_profile)
self.Ch_profile = np.array(self.Ch_profile)
self.kh_profile = np.array(self.kh_profile)
self.ks_profile = np.array(self.ks_profile)
self.n_nodes = len(self.z_coords)
self.dz = np.diff(self.z_coords)
def get_current_loading(self, t: float) -> Tuple[float, float]:
"""
Get current surcharge and vacuum at time t
Parameters:
-----------
t : float
Current time (years)
Returns:
--------
surcharge, vacuum : float, float
Current surcharge and vacuum values (kPa)
"""
# Find the active loading stage
active_stage = self.loading_stages[0]
for stage in self.loading_stages:
if t >= stage.start_time:
active_stage = stage
else:
break
return active_stage.surcharge, active_stage.vacuum
def calculate_vacuum_at_radius(self, r: float, vacuum_magnitude: float) -> float:
"""
Calculate vacuum pressure at radial distance r from drain
u(r) = u_drain × exp(-r/r_influence)
Parameters:
-----------
r : float
Radial distance from drain (m)
vacuum_magnitude : float
Vacuum at drain (kPa, positive value)
Returns:
--------
vacuum_at_r : float
Vacuum pressure at distance r (kPa, positive value)
"""
if vacuum_magnitude == 0:
return 0.0
# Vacuum decreases exponentially with distance
vacuum_at_r = vacuum_magnitude * np.exp(-r / self.pvd.r_influence)
return vacuum_at_r
def calculate_layer_average_vacuum(
self, layer_idx: int, vacuum_surface: float
) -> float:
"""
Calculate average vacuum pressure for a specific soil layer
If vacuum_depth_loss = True:
Vacuum is full at surface and decreases with depth
u(z) = u_surface × exp(-z/L_drain)
If vacuum_depth_loss = False:
Uniform vacuum throughout (full vacuum at all depths within L_drain)
Parameters:
-----------
layer_idx : int
Layer index
vacuum_surface : float
Vacuum at surface (kPa, positive value)
Returns:
--------
avg_vacuum : float
Average vacuum for this layer (kPa)
"""
if vacuum_surface == 0:
return 0.0
layer = self.layers[layer_idx]
# Find layer depth range
depth_top = sum(self.layers[i].thickness for i in range(layer_idx))
depth_bottom = depth_top + layer.thickness
# Check if layer is within drain length
if depth_top >= self.pvd.L_drain:
# Layer is completely below drain length - no vacuum effect
return 0.0
# Option 1: Uniform vacuum (no depth loss)
if not self.pvd.vacuum_depth_loss:
# Full vacuum throughout the layer (within drain length)
return vacuum_surface
# Option 2: Vacuum loss with depth
# Adjust bottom depth if it exceeds drain length
if depth_bottom > self.pvd.L_drain:
depth_bottom = self.pvd.L_drain
effective_thickness = depth_bottom - depth_top
else:
effective_thickness = layer.thickness
# Integrate vacuum over layer thickness
# u(z) = u_surface × exp(-z/L_drain)
# Average = (1/h) ∫[z_top to z_bottom] u_surface × exp(-z/L_drain) dz
L = self.pvd.L_drain
# Analytical integration:
# ∫ u_surface × exp(-z/L) dz = -u_surface × L × exp(-z/L) + C
integral_bottom = -vacuum_surface * L * np.exp(-depth_bottom / L)
integral_top = -vacuum_surface * L * np.exp(-depth_top / L)
integral = integral_bottom - integral_top
# Average over effective layer thickness
avg_vacuum = integral / effective_thickness
return avg_vacuum
def calculate_pvd_factors_layer(self, layer_idx: int) -> Tuple[float, float, float]:
"""
Calculate PVD influence factors for a specific layer
Parameters:
-----------
layer_idx : int
Layer index
Returns:
--------
Fn, Fs, Fr : float
Geometric, smear, and well resistance factors for the layer
"""
layer = self.layers[layer_idx]
# Geometric factor (n ratio) - same for all layers
n = self.pvd.De / self.pvd.dw
Fn = (n**2 / (n**2 - 1)) * np.log(n) - 3 / 4 + 1 / n**2
# Fs - Smear effect factor (layer-specific)
s = self.pvd.ds / self.pvd.dw
Fs = ((layer.kh / layer.ks) - 1) * np.log(s)
# Fr - Well resistance factor (layer-specific)
L = self.pvd.L_drain
if self.pvd.qw > 1e10: # If qw is very large, assume negligible well resistance
Fr = 0.0
else:
Fr = (np.pi * L**2 * layer.kh) / (8 * self.pvd.qw)
return Fn, Fs, Fr
def calculate_pvd_factors(self) -> Tuple[float, float, float]:
"""
Calculate PVD influence factors using weighted average permeabilities
Returns:
--------
Fn, Fs, Fr : float
Geometric, smear, and well resistance factors
"""
# Use thickness-weighted average kh and ks
kh_avg = np.average(self.kh_profile, weights=np.ones(len(self.kh_profile)))
ks_avg = np.average(self.ks_profile, weights=np.ones(len(self.ks_profile)))
# Geometric factor (n ratio)
n = self.pvd.De / self.pvd.dw
# Fn - Geometric factor
Fn = (n**2 / (n**2 - 1)) * np.log(n) - 3 / 4 + 1 / n**2
# Fs - Smear effect factor
s = self.pvd.ds / self.pvd.dw
Fs = ((kh_avg / ks_avg) - 1) * np.log(s)
# Fr - Well resistance factor
# For typical band drains: Fr = π(2L-l)l/(qw·kh) where l = L/2
# Simplified for two-way drainage
L = self.pvd.L_drain
if self.pvd.qw > 1e10: # If qw is very large, assume negligible well resistance
Fr = 0.0
else:
# Using Hansbo formula: Fr = (L²/(8·qw))·(kh)
# More typical: Fr = π·L²·kh/(8·qw) for two-way drainage
Fr = (np.pi * L**2 * kh_avg) / (8 * self.pvd.qw)
return Fn, Fs, Fr
def calculate_Uh(self, t: float) -> np.ndarray:
"""
Calculate degree of consolidation in horizontal direction
using finite difference method with layer-specific PVD factors
Only applies to layers within drain length L
Parameters:
-----------
t : float
Time (years)
Returns:
--------
Uh : ndarray
Degree of horizontal consolidation at each node
(0 for nodes beyond drain length)
"""
# Pre-calculate F_total for each layer
F_total_layers = []
for layer_idx in range(len(self.layers)):
Fn, Fs, Fr = self.calculate_pvd_factors_layer(layer_idx)
F_total_layers.append(Fn + Fs + Fr)
# Initialize excess pore pressure (normalized)
u = np.ones(self.n_nodes) # Initially all excess pore pressure = surcharge
u_history = [u.copy()]
# Time stepping
n_steps = int(t / self.dt)
for step in range(n_steps):
u_new = u.copy()
# Finite difference for radial consolidation
# ∂u/∂t = Ch * [∂²u/∂r² + (1/r)(∂u/∂r)]
# Simplified for radial drainage with PVD
for i in range(1, self.n_nodes - 1):
# Check if this node is within drain length
depth = self.z_coords[i]
if depth <= self.pvd.L_drain:
# Within drain length - apply horizontal consolidation
Ch = self.Ch_profile[i]
layer_idx = self.layer_indices[i]
F_total = F_total_layers[layer_idx]
# Radial drainage term (simplified)
# Using equivalent time factor approach
Th = Ch * self.dt / (self.pvd.De**2 / 4)
# Update excess pore pressure
decay_rate = 8 * Th / F_total
u_new[i] = u[i] * np.exp(-decay_rate)
else:
# Beyond drain length - no horizontal consolidation
# u remains unchanged (Uh = 0)
pass
# Boundary conditions
u_new[0] = 0 # Top drainage
u_new[-1] = 0 # Bottom drainage (if two-way)
u = u_new
if step % max(1, n_steps // 100) == 0:
u_history.append(u.copy())
# Calculate Uh (degree of consolidation)
Uh = 1 - u
return Uh
def calculate_Uv(self, t: float) -> np.ndarray:
"""
Calculate degree of consolidation in vertical direction
For layers below PVD: drainage path is from layer midpoint to bottom of PVD
For layers within/above PVD: normal two-way drainage
Parameters:
-----------
t : float
Time (years)
Returns:
--------
Uv : ndarray
Degree of vertical consolidation at each node
"""
Uv = np.zeros(self.n_nodes)
for i in range(self.n_nodes):
depth = self.z_coords[i]
Cv = self.Cv_profile[i]
layer_idx = self.layer_indices[i]
# Determine drainage path length H
if depth > self.pvd.L_drain:
# Below PVD: drainage from this point UP to bottom of PVD
# For layer below PVD, use distance from layer midpoint to PVD bottom
# Find layer boundaries
cumulative_depth = 0
for idx in range(layer_idx):
cumulative_depth += self.layers[idx].thickness
layer_top = cumulative_depth
layer_bottom = cumulative_depth + self.layers[layer_idx].thickness
layer_midpoint = (layer_top + layer_bottom) / 2
# Drainage path: from layer midpoint to PVD bottom
H = abs(layer_midpoint - self.pvd.L_drain)
# One-way drainage (upward only to PVD)
drainage_type = "one-way"
else:
# Within or above PVD: normal two-way drainage
H = self.total_thickness
drainage_type = "two-way"
# Time factor
if H > 0:
Tv = Cv * t / H**2
else:
Tv = 1e10 # Very large, instant drainage
# Calculate Uv using Terzaghi's solution
if drainage_type == "one-way":
# One-way drainage (for layers below PVD)
if Tv < 0.05:
Uv[i] = np.sqrt(4 * Tv / np.pi)
else:
Uv[i] = 1 - (8 / np.pi**2) * np.exp(-(np.pi**2) * Tv / 4)
else:
# Two-way drainage (for layers within PVD zone)
if Tv < 0.217:
Uv[i] = np.sqrt(4 * Tv / np.pi)
else:
Uv[i] = 1 - (8 / np.pi**2) * np.exp(-(np.pi**2) * Tv / 4)
Uv[i] = min(Uv[i], 1.0)
return Uv
def calculate_total_U(self, t: float) -> np.ndarray:
"""
Calculate total degree of consolidation (combined vertical and horizontal)
For nodes within drain length: U = 1 - (1 - Uh)(1 - Uv)
For nodes beyond drain length: U = Uv (only vertical consolidation)
Parameters:
-----------
t : float
Time (years)
Returns:
--------
U : ndarray
Total degree of consolidation at each node
"""
Uh = self.calculate_Uh(t)
Uv = self.calculate_Uv(t)
U = np.zeros(self.n_nodes)
for i in range(self.n_nodes):
depth = self.z_coords[i]
if depth <= self.pvd.L_drain:
# Within drain length - combined vertical and horizontal
U[i] = 1 - (1 - Uh[i]) * (1 - Uv[i])
else:
# Beyond drain length - only vertical consolidation
U[i] = Uv[i]
return U
def calculate_settlement(self, t: float) -> Tuple[float, np.ndarray]:
"""
Calculate total settlement at time t
For staged loading: calculates settlement contribution from each stage
Parameters:
-----------
t : float
Time (years)
Returns:
--------
total_settlement : float
Total settlement (m)
layer_settlements : ndarray
Settlement for each layer (m)
"""
layer_settlements = np.zeros(len(self.layers))
# For each loading stage, calculate its contribution to settlement
for stage_idx, stage in enumerate(self.loading_stages):
if t < stage.start_time:
# This stage hasn't started yet
continue
# Time since this stage started
t_stage = t - stage.start_time
# Calculate consolidation degree for this stage's time duration
U_stage = self.calculate_total_U(t_stage)
# Calculate settlement for each layer from this load increment
for i, layer in enumerate(self.layers):
# Find nodes in this layer
layer_mask = self.layer_indices == i
U_layer = np.mean(U_stage[layer_mask])
# Get layer-specific vacuum (decreases with depth)
# Previous stage vacuum for this layer
if stage_idx == 0:
avg_vacuum_prev_layer = 0.0
else:
prev_stage = self.loading_stages[stage_idx - 1]
avg_vacuum_prev_layer = self.calculate_layer_average_vacuum(
i, prev_stage.vacuum
)
# Current stage vacuum for this layer
avg_vacuum_current_layer = self.calculate_layer_average_vacuum(
i, stage.vacuum
)
# Previous stage load for this layer
if stage_idx == 0:
prev_surcharge = 0.0
sigma_prev_layer = 0.0
else:
prev_surcharge = self.loading_stages[stage_idx - 1].surcharge
sigma_prev_layer = prev_surcharge + avg_vacuum_prev_layer
# Current stage load for this layer
sigma_current_layer = stage.surcharge + avg_vacuum_current_layer
# Load increment for this layer
delta_sigma_layer = sigma_current_layer - sigma_prev_layer
if abs(delta_sigma_layer) < 1e-6:
# No significant load change in this layer
continue
# Stress before this stage
sigma_before = layer.sigma_ini + sigma_prev_layer
# Stress after this stage (if fully consolidated)
sigma_after = layer.sigma_ini + sigma_current_layer
# Calculate ultimate settlement for this load increment
Sc_increment = 0.0
if delta_sigma_layer > 0:
# Loading increment
if sigma_after <= layer.sigma_p:
# All in recompression range
Sc_increment = (
layer.RR
* np.log10(sigma_after / sigma_before)
* layer.thickness
)
elif sigma_before >= layer.sigma_p:
# All in virgin compression range
Sc_increment = (
layer.CR
* np.log10(sigma_after / sigma_before)
* layer.thickness
)
else:
# Crosses from recompression to virgin compression
Sc_recomp = (
layer.RR
* np.log10(layer.sigma_p / sigma_before)
* layer.thickness
)
Sc_virgin = (
layer.CR
* np.log10(sigma_after / layer.sigma_p)
* layer.thickness
)
Sc_increment = Sc_recomp + Sc_virgin
else:
# Unloading (e.g., vacuum removal) - use recompression/swelling
Sc_increment = (
layer.RR
* np.log10(sigma_after / sigma_before)
* layer.thickness
)
# Add this stage's contribution (with consolidation degree)
layer_settlements[i] += U_layer * Sc_increment
total_settlement = np.sum(layer_settlements)
return total_settlement, layer_settlements
def settlement_vs_time(
self, t_max: float, n_points: int = 100
) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculate settlement vs time curve
Parameters:
-----------
t_max : float
Maximum time (years)
n_points : int
Number of time points
Returns:
--------
time : ndarray
Time array (years)
settlement : ndarray
Settlement array (m)
"""
time = np.linspace(0, t_max, n_points)
settlement = np.zeros(n_points)
for i, t in enumerate(time):
if t == 0:
settlement[i] = 0
else:
settlement[i], _ = self.calculate_settlement(t)
return time, settlement
def plot_settlement_vs_time(
self, t_max: float, n_points: int = 100, save_path: str = None
):
"""
Plot settlement vs time curve
Parameters:
-----------
t_max : float
Maximum time (years)
n_points : int
Number of time points
save_path : str, optional
Path to save the plot
"""
time, settlement = self.settlement_vs_time(t_max, n_points)
# Convert to mm
settlement_mm = settlement * 1000
plt.figure(figsize=(10, 6))
plt.plot(time, settlement_mm, "b-", linewidth=2)
plt.xlabel("Time (years)", fontsize=12)
plt.ylabel("Settlement (mm)", fontsize=12)
plt.title(
"Settlement vs Time - PVD Consolidation", fontsize=14, fontweight="bold"
)
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.show()
return time, settlement_mm
def plot_degree_of_consolidation(self, t: float, save_path: str = None):
"""
Plot degree of consolidation profile at time t
Parameters:
-----------
t : float
Time (years)
save_path : str, optional
Path to save the plot
"""
Uh = self.calculate_Uh(t)
Uv = self.calculate_Uv(t)
U = self.calculate_total_U(t)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
# Plot 1: Degree of consolidation profiles
ax1.plot(Uh * 100, self.z_coords, "r-", linewidth=2, label="Horizontal (Uh)")
ax1.plot(Uv * 100, self.z_coords, "b-", linewidth=2, label="Vertical (Uv)")
ax1.plot(U * 100, self.z_coords, "g-", linewidth=2, label="Total (U)")
# Add drain length line
ax1.axhline(
y=self.pvd.L_drain,
color="orange",
linestyle="--",
linewidth=1.5,
alpha=0.7,
label=f"Drain Length = {self.pvd.L_drain:.1f} m",
)
ax1.set_xlabel("Degree of Consolidation (%)", fontsize=12)
ax1.set_ylabel("Depth (m)", fontsize=12)
ax1.set_title(
f"Consolidation Profile at t = {t:.2f} years",
fontsize=12,
fontweight="bold",
)
ax1.invert_yaxis()
ax1.grid(True, alpha=0.3)
ax1.legend()
# Plot 2: Excess pore pressure
u = 1 - U
ax2.plot(u * self.surcharge, self.z_coords, "k-", linewidth=2)
ax2.set_xlabel("Excess Pore Pressure (kPa)", fontsize=12)
ax2.set_ylabel("Depth (m)", fontsize=12)
ax2.set_title(
f"Excess Pore Pressure at t = {t:.2f} years", fontsize=12, fontweight="bold"
)
ax2.invert_yaxis()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.show()
def get_summary_report(self, t_check: List[float]) -> str:
"""
Generate summary report for specified time points
Parameters:
-----------
t_check : List[float]
Time points to check (years)
Returns:
--------
report : str
Summary report
"""
Fn, Fs, Fr = self.calculate_pvd_factors()
report = "=" * 70 + "\n"
report += "PVD CONSOLIDATION ANALYSIS SUMMARY\n"
report += "=" * 70 + "\n\n"
report += "PVD PARAMETERS:\n"
report += f" Equivalent drain diameter (dw): {self.pvd.dw:.3f} m\n"
report += f" Smear zone diameter (ds): {self.pvd.ds:.3f} m\n"
report += f" Unit cell diameter (De): {self.pvd.De:.3f} m\n"
report += f" Drain length (L): {self.pvd.L_drain:.2f} m\n"
report += (
f" Drain spacing ratio (n = De/dw): {self.pvd.De / self.pvd.dw:.2f}\n"
)
report += f" Geometric factor (Fn): {Fn:.4f}\n"
report += f" Smear factor (Fs - avg): {Fs:.4f}\n"
report += f" Well resistance (Fr - avg): {Fr:.4f}\n"
report += f" Total resistance (F - avg): {Fn + Fs + Fr:.4f}\n\n"
report += "SOIL PROFILE:\n"
cumulative_depth = 0
for i, layer in enumerate(self.layers):
depth_top = cumulative_depth
depth_bottom = cumulative_depth + layer.thickness
# Check if layer is within drain length
if depth_bottom <= self.pvd.L_drain:
drain_status = "Full PVD effect (Uh + Uv)"
elif depth_top >= self.pvd.L_drain:
drain_status = "No PVD effect (Uv only)"
else:
drain_status = "Partial PVD effect"
report += f" Layer {i + 1} ({depth_top:.1f}m - {depth_bottom:.1f}m): {drain_status}\n"
report += f" Thickness: {layer.thickness:.2f} m\n"
report += f" Ch: {layer.Ch:.4f} m²/year\n"
report += f" Cv: {layer.Cv:.4f} m²/year\n"
report += f" kh: {layer.kh:.4f} m/year\n"
report += f" ks: {layer.ks:.4f} m/year\n"
report += f" RR: {layer.RR:.4f}\n"
report += f" CR: {layer.CR:.4f}\n"
report += f" σ'ini: {layer.sigma_ini:.1f} kPa\n"
report += f" σ'p: {layer.sigma_p:.1f} kPa\n\n"
cumulative_depth = depth_bottom
report += f"Applied surcharge: {self.surcharge:.1f} kPa\n\n"
report += "=" * 70 + "\n"
report += "SETTLEMENT vs TIME:\n"
report += "=" * 70 + "\n"
report += f"{'Time (years)':<15} {'Settlement (mm)':<20} {'U (%)':<15}\n"
report += "-" * 70 + "\n"
for t in t_check:
settlement, _ = self.calculate_settlement(t)
U = self.calculate_total_U(t)
U_avg = np.mean(U)
report += f"{t:<15.2f} {settlement * 1000:<20.2f} {U_avg * 100:<15.1f}\n"
report += "=" * 70 + "\n"
return report
def example_usage():
"""Example usage of PVD consolidation analysis"""
# Define soil layers (from top to bottom)
layers = [
SoilLayer(
thickness=5.0, # 5 m thick
Cv=0.5, # 0.5 m²/year vertical consolidation
Ch=1.5, # 1.5 m²/year horizontal consolidation
RR=0.05, # Recompression ratio
CR=0.30, # Compression ratio
sigma_ini=50.0, # Initial effective stress 50 kPa
sigma_p=80.0, # Preconsolidation pressure 80 kPa
kh=2.0, # 2 m/year horizontal permeability
ks=1.0, # 1 m/year smear zone permeability
),
SoilLayer(
thickness=8.0, # 8 m thick
Cv=0.3,
Ch=1.0,
RR=0.04,
CR=0.35,
sigma_ini=90.0,
sigma_p=90.0,
kh=1.5, # Lower permeability
ks=0.75,
),
SoilLayer(
thickness=7.0, # 7 m thick
Cv=0.4,
Ch=1.2,
RR=0.045,
CR=0.32,
sigma_ini=140.0,
sigma_p=150.0,
kh=1.8,
ks=0.9,
),
]
# Define PVD properties
pvd = PVDProperties(
dw=0.05, # 50 mm equivalent drain diameter
ds=0.15, # 150 mm smear zone diameter
De=1.5, # 1.5 m equivalent unit cell diameter (triangular spacing)
L_drain=20.0, # 20 m total drain length (two-way drainage)
qw=100.0, # 100 m³/year well discharge capacity
)
# Applied surcharge
surcharge = 100.0 # 100 kPa
# Create analysis object
analysis = PVDConsolidation(layers, pvd, surcharge, dt=0.01)
# Generate summary report
t_check = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]
print(analysis.get_summary_report(t_check))
# Plot settlement vs time
print("\nGenerating settlement vs time plot...")
time, settlement = analysis.plot_settlement_vs_time(
t_max=20.0, n_points=200, save_path="settlement_vs_time.png"
)
# Plot consolidation profiles at different times
print("Generating consolidation profile plots...")
for t in [0.5, 2.0, 10.0]:
analysis.plot_degree_of_consolidation(
t, save_path=f"consolidation_profile_t{t:.1f}.png"
)
print("\nAnalysis complete!")
def load_yaml_data(yaml_file: str) -> Dict[str, Any]:
"""
Load PVD analysis data from YAML file
Parameters:
-----------
yaml_file : str
Path to YAML file
Returns:
--------
data : dict
Dictionary containing analysis parameters
"""
with open(yaml_file, "r") as f:
data = yaml.safe_load(f)
return data
def create_analysis_from_yaml(yaml_file: str) -> PVDConsolidation:
"""
Create PVDConsolidation object from YAML file
Parameters:
-----------
yaml_file : str
Path to YAML file
Returns:
--------
analysis : PVDConsolidation
PVD consolidation analysis object
"""
data = load_yaml_data(yaml_file)
# Create soil layers
layers = []
for layer_data in data["soil_layers"]:
layer = SoilLayer(
thickness=layer_data["thickness"],
Cv=layer_data["Cv"],
Ch=layer_data["Ch"],
RR=layer_data["RR"],
CR=layer_data["CR"],
sigma_ini=layer_data["sigma_ini"],
sigma_p=layer_data["sigma_p"],
kh=layer_data["kh"],
ks=layer_data["ks"],
)
layers.append(layer)
# Create PVD properties
pvd_data = data["pvd"]
pvd = PVDProperties(
dw=pvd_data["dw"],
ds=pvd_data["ds"],
De=pvd_data["De"],
L_drain=pvd_data["L_drain"],
qw=pvd_data["qw"],
)
# Get analysis parameters
surcharge = data["analysis"]["surcharge"]
dt = data["analysis"].get("dt", 0.01)
# Create analysis object
analysis = PVDConsolidation(layers, pvd, surcharge, dt)
return analysis, data
def run_analysis_from_yaml(yaml_file: str, output_dir: str = None):
"""
Run complete PVD analysis from YAML file
Parameters:
-----------
yaml_file : str
Path to YAML input file
output_dir : str, optional
Directory to save output files
"""
print(f"Loading data from: {yaml_file}")
analysis, data = create_analysis_from_yaml(yaml_file)
# Create output directory
if output_dir is None:
output_dir = os.path.dirname(yaml_file) or "."
os.makedirs(output_dir, exist_ok=True)
# Get analysis parameters
analysis_params = data["analysis"]
t_max = analysis_params.get("t_max", 20.0)
n_points = analysis_params.get("n_points", 200)
t_check = analysis_params.get("t_check", [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0])
t_profiles = analysis_params.get("t_profiles", [0.5, 2.0, 10.0])
# Generate summary report
print("\n" + "=" * 70)
print("RUNNING PVD CONSOLIDATION ANALYSIS")
print("=" * 70 + "\n")
report = analysis.get_summary_report(t_check)
print(report)
# Save report to file
report_file = os.path.join(output_dir, "pvd_analysis_report.txt")
with open(report_file, "w") as f:
f.write(report)
print(f"\nReport saved to: {report_file}")
# Plot settlement vs time
print("\nGenerating settlement vs time plot...")
settlement_plot = os.path.join(output_dir, "settlement_vs_time.png")
time, settlement = analysis.plot_settlement_vs_time(
t_max=t_max, n_points=n_points, save_path=settlement_plot
)
print(f"Plot saved to: {settlement_plot}")
# Save settlement data to CSV
csv_file = os.path.join(output_dir, "settlement_data.csv")
np.savetxt(
csv_file,
np.column_stack((time, settlement * 1000)),
delimiter=",",
header="Time (years),Settlement (mm)",
comments="",
)
print(f"Data saved to: {csv_file}")
# Plot consolidation profiles at different times
print("\nGenerating consolidation profile plots...")
for t in t_profiles:
profile_plot = os.path.join(output_dir, f"consolidation_profile_t{t:.1f}y.png")
analysis.plot_degree_of_consolidation(t, save_path=profile_plot)
print(f"Profile at t={t:.1f} years saved to: {profile_plot}")
print("\n" + "=" * 70)
print("ANALYSIS COMPLETE!")
print("=" * 70)
def main():
"""Main CLI interface"""
parser = argparse.ArgumentParser(
description="PVD Consolidation Analysis - Settlement vs Time Calculator",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Run analysis from YAML file
python pvd_consolidation.py --data input.yaml
# Specify output directory
python pvd_consolidation.py --data input.yaml --output results/
# Run example analysis
python pvd_consolidation.py --example
""",
)
parser.add_argument("--data", type=str, help="Path to YAML input file")
parser.add_argument(
"--output",
type=str,
default=None,
help="Output directory for results (default: same as input file)",
)
parser.add_argument(
"--example",
action="store_true",
help="Run example analysis with default parameters",
)
args = parser.parse_args()
if args.example:
print("Running example analysis...")
example_usage()
elif args.data:
if not os.path.exists(args.data):
print(f"Error: Input file '{args.data}' not found!")
return
run_analysis_from_yaml(args.data, args.output)
else:
parser.print_help()
if __name__ == "__main__":
main()