|
|
""" |
|
|
Multilayer Prefabricated Vertical Drain (PVD) Consolidation Analysis |
|
|
Calculates settlement vs time using finite difference method |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
import yaml |
|
|
import argparse |
|
|
import os |
|
|
from dataclasses import dataclass |
|
|
from typing import List, Tuple, Dict, Any |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class LoadingStage: |
|
|
"""Loading stage definition for staged loading""" |
|
|
|
|
|
start_time: float |
|
|
surcharge: float |
|
|
vacuum: float |
|
|
|
|
|
def __post_init__(self): |
|
|
"""Validate inputs""" |
|
|
if self.vacuum < 0: |
|
|
raise ValueError( |
|
|
"Vacuum should be specified as positive value (e.g., 80 for -80 kPa)" |
|
|
) |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class SoilLayer: |
|
|
"""Soil layer properties""" |
|
|
|
|
|
thickness: float |
|
|
Cv: float |
|
|
Ch: float |
|
|
RR: float |
|
|
CR: float |
|
|
sigma_ini: float |
|
|
sigma_p: float |
|
|
kh: float |
|
|
ks: float |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class PVDProperties: |
|
|
"""PVD installation properties""" |
|
|
|
|
|
dw: float |
|
|
ds: float |
|
|
De: float |
|
|
L_drain: float |
|
|
qw: float |
|
|
r_influence: float = None |
|
|
vacuum_depth_loss: bool = True |
|
|
|
|
|
def __post_init__(self): |
|
|
"""Set default vacuum influence radius""" |
|
|
if self.r_influence is None: |
|
|
self.r_influence = self.De / 4 |
|
|
|
|
|
|
|
|
class PVDConsolidation: |
|
|
""" |
|
|
Multilayer PVD consolidation analysis using finite difference method |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
soil_layers: List[SoilLayer], |
|
|
pvd: PVDProperties, |
|
|
surcharge: float = 0.0, |
|
|
vacuum: float = 0.0, |
|
|
loading_stages: List[LoadingStage] = None, |
|
|
dt: float = 0.01, |
|
|
): |
|
|
""" |
|
|
Initialize PVD consolidation analysis |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
soil_layers : List[SoilLayer] |
|
|
List of soil layers from top to bottom |
|
|
pvd : PVDProperties |
|
|
PVD installation properties |
|
|
surcharge : float |
|
|
Applied surcharge load (kPa) - for single-stage loading |
|
|
vacuum : float |
|
|
Applied vacuum pressure (kPa, positive value) - for single-stage loading |
|
|
loading_stages : List[LoadingStage] |
|
|
List of loading stages for multi-stage loading (overrides surcharge/vacuum) |
|
|
dt : float |
|
|
Time step for finite difference (years) |
|
|
""" |
|
|
self.layers = soil_layers |
|
|
self.pvd = pvd |
|
|
self.dt = dt |
|
|
|
|
|
|
|
|
if loading_stages is not None: |
|
|
self.loading_stages = sorted(loading_stages, key=lambda x: x.start_time) |
|
|
self.use_staged_loading = True |
|
|
else: |
|
|
|
|
|
self.loading_stages = [ |
|
|
LoadingStage(start_time=0.0, surcharge=surcharge, vacuum=vacuum) |
|
|
] |
|
|
self.use_staged_loading = False |
|
|
|
|
|
|
|
|
self.surcharge = surcharge |
|
|
self.vacuum = vacuum |
|
|
|
|
|
|
|
|
self.total_thickness = sum(layer.thickness for layer in soil_layers) |
|
|
|
|
|
|
|
|
self._setup_mesh() |
|
|
|
|
|
def _setup_mesh(self, nodes_per_meter: int = 10): |
|
|
"""Setup finite difference mesh""" |
|
|
self.nodes_per_meter = nodes_per_meter |
|
|
|
|
|
|
|
|
self.z_coords = [] |
|
|
self.layer_indices = [] |
|
|
self.Cv_profile = [] |
|
|
self.Ch_profile = [] |
|
|
self.kh_profile = [] |
|
|
self.ks_profile = [] |
|
|
|
|
|
z = 0 |
|
|
for i, layer in enumerate(self.layers): |
|
|
n_nodes = max(int(layer.thickness * nodes_per_meter), 2) |
|
|
z_layer = np.linspace(z, z + layer.thickness, n_nodes, endpoint=False) |
|
|
|
|
|
self.z_coords.extend(z_layer) |
|
|
self.layer_indices.extend([i] * len(z_layer)) |
|
|
self.Cv_profile.extend([layer.Cv] * len(z_layer)) |
|
|
self.Ch_profile.extend([layer.Ch] * len(z_layer)) |
|
|
self.kh_profile.extend([layer.kh] * len(z_layer)) |
|
|
self.ks_profile.extend([layer.ks] * len(z_layer)) |
|
|
|
|
|
z += layer.thickness |
|
|
|
|
|
|
|
|
self.z_coords.append(self.total_thickness) |
|
|
self.layer_indices.append(len(self.layers) - 1) |
|
|
self.Cv_profile.append(self.layers[-1].Cv) |
|
|
self.Ch_profile.append(self.layers[-1].Ch) |
|
|
self.kh_profile.append(self.layers[-1].kh) |
|
|
self.ks_profile.append(self.layers[-1].ks) |
|
|
|
|
|
self.z_coords = np.array(self.z_coords) |
|
|
self.layer_indices = np.array(self.layer_indices) |
|
|
self.Cv_profile = np.array(self.Cv_profile) |
|
|
self.Ch_profile = np.array(self.Ch_profile) |
|
|
self.kh_profile = np.array(self.kh_profile) |
|
|
self.ks_profile = np.array(self.ks_profile) |
|
|
|
|
|
self.n_nodes = len(self.z_coords) |
|
|
self.dz = np.diff(self.z_coords) |
|
|
|
|
|
def get_current_loading(self, t: float) -> Tuple[float, float]: |
|
|
""" |
|
|
Get current surcharge and vacuum at time t |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t : float |
|
|
Current time (years) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
surcharge, vacuum : float, float |
|
|
Current surcharge and vacuum values (kPa) |
|
|
""" |
|
|
|
|
|
active_stage = self.loading_stages[0] |
|
|
for stage in self.loading_stages: |
|
|
if t >= stage.start_time: |
|
|
active_stage = stage |
|
|
else: |
|
|
break |
|
|
|
|
|
return active_stage.surcharge, active_stage.vacuum |
|
|
|
|
|
def calculate_vacuum_at_radius(self, r: float, vacuum_magnitude: float) -> float: |
|
|
""" |
|
|
Calculate vacuum pressure at radial distance r from drain |
|
|
|
|
|
u(r) = u_drain × exp(-r/r_influence) |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
r : float |
|
|
Radial distance from drain (m) |
|
|
vacuum_magnitude : float |
|
|
Vacuum at drain (kPa, positive value) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
vacuum_at_r : float |
|
|
Vacuum pressure at distance r (kPa, positive value) |
|
|
""" |
|
|
if vacuum_magnitude == 0: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
vacuum_at_r = vacuum_magnitude * np.exp(-r / self.pvd.r_influence) |
|
|
|
|
|
return vacuum_at_r |
|
|
|
|
|
def calculate_layer_average_vacuum( |
|
|
self, layer_idx: int, vacuum_surface: float |
|
|
) -> float: |
|
|
""" |
|
|
Calculate average vacuum pressure for a specific soil layer |
|
|
|
|
|
If vacuum_depth_loss = True: |
|
|
Vacuum is full at surface and decreases with depth |
|
|
u(z) = u_surface × exp(-z/L_drain) |
|
|
|
|
|
If vacuum_depth_loss = False: |
|
|
Uniform vacuum throughout (full vacuum at all depths within L_drain) |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
layer_idx : int |
|
|
Layer index |
|
|
vacuum_surface : float |
|
|
Vacuum at surface (kPa, positive value) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
avg_vacuum : float |
|
|
Average vacuum for this layer (kPa) |
|
|
""" |
|
|
if vacuum_surface == 0: |
|
|
return 0.0 |
|
|
|
|
|
layer = self.layers[layer_idx] |
|
|
|
|
|
|
|
|
depth_top = sum(self.layers[i].thickness for i in range(layer_idx)) |
|
|
depth_bottom = depth_top + layer.thickness |
|
|
|
|
|
|
|
|
if depth_top >= self.pvd.L_drain: |
|
|
|
|
|
return 0.0 |
|
|
|
|
|
|
|
|
if not self.pvd.vacuum_depth_loss: |
|
|
|
|
|
return vacuum_surface |
|
|
|
|
|
|
|
|
|
|
|
if depth_bottom > self.pvd.L_drain: |
|
|
depth_bottom = self.pvd.L_drain |
|
|
effective_thickness = depth_bottom - depth_top |
|
|
else: |
|
|
effective_thickness = layer.thickness |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
L = self.pvd.L_drain |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
integral_bottom = -vacuum_surface * L * np.exp(-depth_bottom / L) |
|
|
integral_top = -vacuum_surface * L * np.exp(-depth_top / L) |
|
|
|
|
|
integral = integral_bottom - integral_top |
|
|
|
|
|
|
|
|
avg_vacuum = integral / effective_thickness |
|
|
|
|
|
return avg_vacuum |
|
|
|
|
|
def calculate_pvd_factors_layer(self, layer_idx: int) -> Tuple[float, float, float]: |
|
|
""" |
|
|
Calculate PVD influence factors for a specific layer |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
layer_idx : int |
|
|
Layer index |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Fn, Fs, Fr : float |
|
|
Geometric, smear, and well resistance factors for the layer |
|
|
""" |
|
|
layer = self.layers[layer_idx] |
|
|
|
|
|
|
|
|
n = self.pvd.De / self.pvd.dw |
|
|
Fn = (n**2 / (n**2 - 1)) * np.log(n) - 3 / 4 + 1 / n**2 |
|
|
|
|
|
|
|
|
s = self.pvd.ds / self.pvd.dw |
|
|
Fs = ((layer.kh / layer.ks) - 1) * np.log(s) |
|
|
|
|
|
|
|
|
L = self.pvd.L_drain |
|
|
if self.pvd.qw > 1e10: |
|
|
Fr = 0.0 |
|
|
else: |
|
|
Fr = (np.pi * L**2 * layer.kh) / (8 * self.pvd.qw) |
|
|
|
|
|
return Fn, Fs, Fr |
|
|
|
|
|
def calculate_pvd_factors(self) -> Tuple[float, float, float]: |
|
|
""" |
|
|
Calculate PVD influence factors using weighted average permeabilities |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Fn, Fs, Fr : float |
|
|
Geometric, smear, and well resistance factors |
|
|
""" |
|
|
|
|
|
kh_avg = np.average(self.kh_profile, weights=np.ones(len(self.kh_profile))) |
|
|
ks_avg = np.average(self.ks_profile, weights=np.ones(len(self.ks_profile))) |
|
|
|
|
|
|
|
|
n = self.pvd.De / self.pvd.dw |
|
|
|
|
|
|
|
|
Fn = (n**2 / (n**2 - 1)) * np.log(n) - 3 / 4 + 1 / n**2 |
|
|
|
|
|
|
|
|
s = self.pvd.ds / self.pvd.dw |
|
|
Fs = ((kh_avg / ks_avg) - 1) * np.log(s) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
L = self.pvd.L_drain |
|
|
if self.pvd.qw > 1e10: |
|
|
Fr = 0.0 |
|
|
else: |
|
|
|
|
|
|
|
|
Fr = (np.pi * L**2 * kh_avg) / (8 * self.pvd.qw) |
|
|
|
|
|
return Fn, Fs, Fr |
|
|
|
|
|
def calculate_Uh(self, t: float) -> np.ndarray: |
|
|
""" |
|
|
Calculate degree of consolidation in horizontal direction |
|
|
using finite difference method with layer-specific PVD factors |
|
|
Only applies to layers within drain length L |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t : float |
|
|
Time (years) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Uh : ndarray |
|
|
Degree of horizontal consolidation at each node |
|
|
(0 for nodes beyond drain length) |
|
|
""" |
|
|
|
|
|
F_total_layers = [] |
|
|
for layer_idx in range(len(self.layers)): |
|
|
Fn, Fs, Fr = self.calculate_pvd_factors_layer(layer_idx) |
|
|
F_total_layers.append(Fn + Fs + Fr) |
|
|
|
|
|
|
|
|
u = np.ones(self.n_nodes) |
|
|
u_history = [u.copy()] |
|
|
|
|
|
|
|
|
n_steps = int(t / self.dt) |
|
|
|
|
|
for step in range(n_steps): |
|
|
u_new = u.copy() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i in range(1, self.n_nodes - 1): |
|
|
|
|
|
depth = self.z_coords[i] |
|
|
|
|
|
if depth <= self.pvd.L_drain: |
|
|
|
|
|
Ch = self.Ch_profile[i] |
|
|
layer_idx = self.layer_indices[i] |
|
|
F_total = F_total_layers[layer_idx] |
|
|
|
|
|
|
|
|
|
|
|
Th = Ch * self.dt / (self.pvd.De**2 / 4) |
|
|
|
|
|
|
|
|
decay_rate = 8 * Th / F_total |
|
|
u_new[i] = u[i] * np.exp(-decay_rate) |
|
|
else: |
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
u_new[0] = 0 |
|
|
u_new[-1] = 0 |
|
|
|
|
|
u = u_new |
|
|
|
|
|
if step % max(1, n_steps // 100) == 0: |
|
|
u_history.append(u.copy()) |
|
|
|
|
|
|
|
|
Uh = 1 - u |
|
|
|
|
|
return Uh |
|
|
|
|
|
def calculate_Uv(self, t: float) -> np.ndarray: |
|
|
""" |
|
|
Calculate degree of consolidation in vertical direction |
|
|
|
|
|
For layers below PVD: drainage path is from layer midpoint to bottom of PVD |
|
|
For layers within/above PVD: normal two-way drainage |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t : float |
|
|
Time (years) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Uv : ndarray |
|
|
Degree of vertical consolidation at each node |
|
|
""" |
|
|
Uv = np.zeros(self.n_nodes) |
|
|
|
|
|
for i in range(self.n_nodes): |
|
|
depth = self.z_coords[i] |
|
|
Cv = self.Cv_profile[i] |
|
|
layer_idx = self.layer_indices[i] |
|
|
|
|
|
|
|
|
if depth > self.pvd.L_drain: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cumulative_depth = 0 |
|
|
for idx in range(layer_idx): |
|
|
cumulative_depth += self.layers[idx].thickness |
|
|
layer_top = cumulative_depth |
|
|
layer_bottom = cumulative_depth + self.layers[layer_idx].thickness |
|
|
layer_midpoint = (layer_top + layer_bottom) / 2 |
|
|
|
|
|
|
|
|
H = abs(layer_midpoint - self.pvd.L_drain) |
|
|
|
|
|
|
|
|
drainage_type = "one-way" |
|
|
else: |
|
|
|
|
|
H = self.total_thickness |
|
|
drainage_type = "two-way" |
|
|
|
|
|
|
|
|
if H > 0: |
|
|
Tv = Cv * t / H**2 |
|
|
else: |
|
|
Tv = 1e10 |
|
|
|
|
|
|
|
|
if drainage_type == "one-way": |
|
|
|
|
|
if Tv < 0.05: |
|
|
Uv[i] = np.sqrt(4 * Tv / np.pi) |
|
|
else: |
|
|
Uv[i] = 1 - (8 / np.pi**2) * np.exp(-(np.pi**2) * Tv / 4) |
|
|
else: |
|
|
|
|
|
if Tv < 0.217: |
|
|
Uv[i] = np.sqrt(4 * Tv / np.pi) |
|
|
else: |
|
|
Uv[i] = 1 - (8 / np.pi**2) * np.exp(-(np.pi**2) * Tv / 4) |
|
|
|
|
|
Uv[i] = min(Uv[i], 1.0) |
|
|
|
|
|
return Uv |
|
|
|
|
|
def calculate_total_U(self, t: float) -> np.ndarray: |
|
|
""" |
|
|
Calculate total degree of consolidation (combined vertical and horizontal) |
|
|
|
|
|
For nodes within drain length: U = 1 - (1 - Uh)(1 - Uv) |
|
|
For nodes beyond drain length: U = Uv (only vertical consolidation) |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t : float |
|
|
Time (years) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
U : ndarray |
|
|
Total degree of consolidation at each node |
|
|
""" |
|
|
Uh = self.calculate_Uh(t) |
|
|
Uv = self.calculate_Uv(t) |
|
|
|
|
|
U = np.zeros(self.n_nodes) |
|
|
|
|
|
for i in range(self.n_nodes): |
|
|
depth = self.z_coords[i] |
|
|
|
|
|
if depth <= self.pvd.L_drain: |
|
|
|
|
|
U[i] = 1 - (1 - Uh[i]) * (1 - Uv[i]) |
|
|
else: |
|
|
|
|
|
U[i] = Uv[i] |
|
|
|
|
|
return U |
|
|
|
|
|
def calculate_settlement(self, t: float) -> Tuple[float, np.ndarray]: |
|
|
""" |
|
|
Calculate total settlement at time t |
|
|
For staged loading: calculates settlement contribution from each stage |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t : float |
|
|
Time (years) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
total_settlement : float |
|
|
Total settlement (m) |
|
|
layer_settlements : ndarray |
|
|
Settlement for each layer (m) |
|
|
""" |
|
|
layer_settlements = np.zeros(len(self.layers)) |
|
|
|
|
|
|
|
|
for stage_idx, stage in enumerate(self.loading_stages): |
|
|
if t < stage.start_time: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
t_stage = t - stage.start_time |
|
|
|
|
|
|
|
|
U_stage = self.calculate_total_U(t_stage) |
|
|
|
|
|
|
|
|
for i, layer in enumerate(self.layers): |
|
|
|
|
|
layer_mask = self.layer_indices == i |
|
|
U_layer = np.mean(U_stage[layer_mask]) |
|
|
|
|
|
|
|
|
|
|
|
if stage_idx == 0: |
|
|
avg_vacuum_prev_layer = 0.0 |
|
|
else: |
|
|
prev_stage = self.loading_stages[stage_idx - 1] |
|
|
avg_vacuum_prev_layer = self.calculate_layer_average_vacuum( |
|
|
i, prev_stage.vacuum |
|
|
) |
|
|
|
|
|
|
|
|
avg_vacuum_current_layer = self.calculate_layer_average_vacuum( |
|
|
i, stage.vacuum |
|
|
) |
|
|
|
|
|
|
|
|
if stage_idx == 0: |
|
|
prev_surcharge = 0.0 |
|
|
sigma_prev_layer = 0.0 |
|
|
else: |
|
|
prev_surcharge = self.loading_stages[stage_idx - 1].surcharge |
|
|
sigma_prev_layer = prev_surcharge + avg_vacuum_prev_layer |
|
|
|
|
|
|
|
|
sigma_current_layer = stage.surcharge + avg_vacuum_current_layer |
|
|
|
|
|
|
|
|
delta_sigma_layer = sigma_current_layer - sigma_prev_layer |
|
|
|
|
|
if abs(delta_sigma_layer) < 1e-6: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
sigma_before = layer.sigma_ini + sigma_prev_layer |
|
|
|
|
|
|
|
|
sigma_after = layer.sigma_ini + sigma_current_layer |
|
|
|
|
|
|
|
|
Sc_increment = 0.0 |
|
|
|
|
|
if delta_sigma_layer > 0: |
|
|
|
|
|
if sigma_after <= layer.sigma_p: |
|
|
|
|
|
Sc_increment = ( |
|
|
layer.RR |
|
|
* np.log10(sigma_after / sigma_before) |
|
|
* layer.thickness |
|
|
) |
|
|
elif sigma_before >= layer.sigma_p: |
|
|
|
|
|
Sc_increment = ( |
|
|
layer.CR |
|
|
* np.log10(sigma_after / sigma_before) |
|
|
* layer.thickness |
|
|
) |
|
|
else: |
|
|
|
|
|
Sc_recomp = ( |
|
|
layer.RR |
|
|
* np.log10(layer.sigma_p / sigma_before) |
|
|
* layer.thickness |
|
|
) |
|
|
Sc_virgin = ( |
|
|
layer.CR |
|
|
* np.log10(sigma_after / layer.sigma_p) |
|
|
* layer.thickness |
|
|
) |
|
|
Sc_increment = Sc_recomp + Sc_virgin |
|
|
else: |
|
|
|
|
|
Sc_increment = ( |
|
|
layer.RR |
|
|
* np.log10(sigma_after / sigma_before) |
|
|
* layer.thickness |
|
|
) |
|
|
|
|
|
|
|
|
layer_settlements[i] += U_layer * Sc_increment |
|
|
|
|
|
total_settlement = np.sum(layer_settlements) |
|
|
|
|
|
return total_settlement, layer_settlements |
|
|
|
|
|
def settlement_vs_time( |
|
|
self, t_max: float, n_points: int = 100 |
|
|
) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Calculate settlement vs time curve |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t_max : float |
|
|
Maximum time (years) |
|
|
n_points : int |
|
|
Number of time points |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
time : ndarray |
|
|
Time array (years) |
|
|
settlement : ndarray |
|
|
Settlement array (m) |
|
|
""" |
|
|
time = np.linspace(0, t_max, n_points) |
|
|
settlement = np.zeros(n_points) |
|
|
|
|
|
for i, t in enumerate(time): |
|
|
if t == 0: |
|
|
settlement[i] = 0 |
|
|
else: |
|
|
settlement[i], _ = self.calculate_settlement(t) |
|
|
|
|
|
return time, settlement |
|
|
|
|
|
def plot_settlement_vs_time( |
|
|
self, t_max: float, n_points: int = 100, save_path: str = None |
|
|
): |
|
|
""" |
|
|
Plot settlement vs time curve |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t_max : float |
|
|
Maximum time (years) |
|
|
n_points : int |
|
|
Number of time points |
|
|
save_path : str, optional |
|
|
Path to save the plot |
|
|
""" |
|
|
time, settlement = self.settlement_vs_time(t_max, n_points) |
|
|
|
|
|
|
|
|
settlement_mm = settlement * 1000 |
|
|
|
|
|
plt.figure(figsize=(10, 6)) |
|
|
plt.plot(time, settlement_mm, "b-", linewidth=2) |
|
|
plt.xlabel("Time (years)", fontsize=12) |
|
|
plt.ylabel("Settlement (mm)", fontsize=12) |
|
|
plt.title( |
|
|
"Settlement vs Time - PVD Consolidation", fontsize=14, fontweight="bold" |
|
|
) |
|
|
plt.grid(True, alpha=0.3) |
|
|
plt.tight_layout() |
|
|
|
|
|
if save_path: |
|
|
plt.savefig(save_path, dpi=300, bbox_inches="tight") |
|
|
|
|
|
plt.show() |
|
|
|
|
|
return time, settlement_mm |
|
|
|
|
|
def plot_degree_of_consolidation(self, t: float, save_path: str = None): |
|
|
""" |
|
|
Plot degree of consolidation profile at time t |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t : float |
|
|
Time (years) |
|
|
save_path : str, optional |
|
|
Path to save the plot |
|
|
""" |
|
|
Uh = self.calculate_Uh(t) |
|
|
Uv = self.calculate_Uv(t) |
|
|
U = self.calculate_total_U(t) |
|
|
|
|
|
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) |
|
|
|
|
|
|
|
|
ax1.plot(Uh * 100, self.z_coords, "r-", linewidth=2, label="Horizontal (Uh)") |
|
|
ax1.plot(Uv * 100, self.z_coords, "b-", linewidth=2, label="Vertical (Uv)") |
|
|
ax1.plot(U * 100, self.z_coords, "g-", linewidth=2, label="Total (U)") |
|
|
|
|
|
|
|
|
ax1.axhline( |
|
|
y=self.pvd.L_drain, |
|
|
color="orange", |
|
|
linestyle="--", |
|
|
linewidth=1.5, |
|
|
alpha=0.7, |
|
|
label=f"Drain Length = {self.pvd.L_drain:.1f} m", |
|
|
) |
|
|
|
|
|
ax1.set_xlabel("Degree of Consolidation (%)", fontsize=12) |
|
|
ax1.set_ylabel("Depth (m)", fontsize=12) |
|
|
ax1.set_title( |
|
|
f"Consolidation Profile at t = {t:.2f} years", |
|
|
fontsize=12, |
|
|
fontweight="bold", |
|
|
) |
|
|
ax1.invert_yaxis() |
|
|
ax1.grid(True, alpha=0.3) |
|
|
ax1.legend() |
|
|
|
|
|
|
|
|
u = 1 - U |
|
|
ax2.plot(u * self.surcharge, self.z_coords, "k-", linewidth=2) |
|
|
ax2.set_xlabel("Excess Pore Pressure (kPa)", fontsize=12) |
|
|
ax2.set_ylabel("Depth (m)", fontsize=12) |
|
|
ax2.set_title( |
|
|
f"Excess Pore Pressure at t = {t:.2f} years", fontsize=12, fontweight="bold" |
|
|
) |
|
|
ax2.invert_yaxis() |
|
|
ax2.grid(True, alpha=0.3) |
|
|
|
|
|
plt.tight_layout() |
|
|
|
|
|
if save_path: |
|
|
plt.savefig(save_path, dpi=300, bbox_inches="tight") |
|
|
|
|
|
plt.show() |
|
|
|
|
|
def get_summary_report(self, t_check: List[float]) -> str: |
|
|
""" |
|
|
Generate summary report for specified time points |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
t_check : List[float] |
|
|
Time points to check (years) |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
report : str |
|
|
Summary report |
|
|
""" |
|
|
Fn, Fs, Fr = self.calculate_pvd_factors() |
|
|
|
|
|
report = "=" * 70 + "\n" |
|
|
report += "PVD CONSOLIDATION ANALYSIS SUMMARY\n" |
|
|
report += "=" * 70 + "\n\n" |
|
|
|
|
|
report += "PVD PARAMETERS:\n" |
|
|
report += f" Equivalent drain diameter (dw): {self.pvd.dw:.3f} m\n" |
|
|
report += f" Smear zone diameter (ds): {self.pvd.ds:.3f} m\n" |
|
|
report += f" Unit cell diameter (De): {self.pvd.De:.3f} m\n" |
|
|
report += f" Drain length (L): {self.pvd.L_drain:.2f} m\n" |
|
|
report += ( |
|
|
f" Drain spacing ratio (n = De/dw): {self.pvd.De / self.pvd.dw:.2f}\n" |
|
|
) |
|
|
report += f" Geometric factor (Fn): {Fn:.4f}\n" |
|
|
report += f" Smear factor (Fs - avg): {Fs:.4f}\n" |
|
|
report += f" Well resistance (Fr - avg): {Fr:.4f}\n" |
|
|
report += f" Total resistance (F - avg): {Fn + Fs + Fr:.4f}\n\n" |
|
|
|
|
|
report += "SOIL PROFILE:\n" |
|
|
cumulative_depth = 0 |
|
|
for i, layer in enumerate(self.layers): |
|
|
depth_top = cumulative_depth |
|
|
depth_bottom = cumulative_depth + layer.thickness |
|
|
|
|
|
|
|
|
if depth_bottom <= self.pvd.L_drain: |
|
|
drain_status = "Full PVD effect (Uh + Uv)" |
|
|
elif depth_top >= self.pvd.L_drain: |
|
|
drain_status = "No PVD effect (Uv only)" |
|
|
else: |
|
|
drain_status = "Partial PVD effect" |
|
|
|
|
|
report += f" Layer {i + 1} ({depth_top:.1f}m - {depth_bottom:.1f}m): {drain_status}\n" |
|
|
report += f" Thickness: {layer.thickness:.2f} m\n" |
|
|
report += f" Ch: {layer.Ch:.4f} m²/year\n" |
|
|
report += f" Cv: {layer.Cv:.4f} m²/year\n" |
|
|
report += f" kh: {layer.kh:.4f} m/year\n" |
|
|
report += f" ks: {layer.ks:.4f} m/year\n" |
|
|
report += f" RR: {layer.RR:.4f}\n" |
|
|
report += f" CR: {layer.CR:.4f}\n" |
|
|
report += f" σ'ini: {layer.sigma_ini:.1f} kPa\n" |
|
|
report += f" σ'p: {layer.sigma_p:.1f} kPa\n\n" |
|
|
|
|
|
cumulative_depth = depth_bottom |
|
|
|
|
|
report += f"Applied surcharge: {self.surcharge:.1f} kPa\n\n" |
|
|
report += "=" * 70 + "\n" |
|
|
report += "SETTLEMENT vs TIME:\n" |
|
|
report += "=" * 70 + "\n" |
|
|
report += f"{'Time (years)':<15} {'Settlement (mm)':<20} {'U (%)':<15}\n" |
|
|
report += "-" * 70 + "\n" |
|
|
|
|
|
for t in t_check: |
|
|
settlement, _ = self.calculate_settlement(t) |
|
|
U = self.calculate_total_U(t) |
|
|
U_avg = np.mean(U) |
|
|
report += f"{t:<15.2f} {settlement * 1000:<20.2f} {U_avg * 100:<15.1f}\n" |
|
|
|
|
|
report += "=" * 70 + "\n" |
|
|
|
|
|
return report |
|
|
|
|
|
|
|
|
def example_usage(): |
|
|
"""Example usage of PVD consolidation analysis""" |
|
|
|
|
|
|
|
|
layers = [ |
|
|
SoilLayer( |
|
|
thickness=5.0, |
|
|
Cv=0.5, |
|
|
Ch=1.5, |
|
|
RR=0.05, |
|
|
CR=0.30, |
|
|
sigma_ini=50.0, |
|
|
sigma_p=80.0, |
|
|
kh=2.0, |
|
|
ks=1.0, |
|
|
), |
|
|
SoilLayer( |
|
|
thickness=8.0, |
|
|
Cv=0.3, |
|
|
Ch=1.0, |
|
|
RR=0.04, |
|
|
CR=0.35, |
|
|
sigma_ini=90.0, |
|
|
sigma_p=90.0, |
|
|
kh=1.5, |
|
|
ks=0.75, |
|
|
), |
|
|
SoilLayer( |
|
|
thickness=7.0, |
|
|
Cv=0.4, |
|
|
Ch=1.2, |
|
|
RR=0.045, |
|
|
CR=0.32, |
|
|
sigma_ini=140.0, |
|
|
sigma_p=150.0, |
|
|
kh=1.8, |
|
|
ks=0.9, |
|
|
), |
|
|
] |
|
|
|
|
|
|
|
|
pvd = PVDProperties( |
|
|
dw=0.05, |
|
|
ds=0.15, |
|
|
De=1.5, |
|
|
L_drain=20.0, |
|
|
qw=100.0, |
|
|
) |
|
|
|
|
|
|
|
|
surcharge = 100.0 |
|
|
|
|
|
|
|
|
analysis = PVDConsolidation(layers, pvd, surcharge, dt=0.01) |
|
|
|
|
|
|
|
|
t_check = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0] |
|
|
print(analysis.get_summary_report(t_check)) |
|
|
|
|
|
|
|
|
print("\nGenerating settlement vs time plot...") |
|
|
time, settlement = analysis.plot_settlement_vs_time( |
|
|
t_max=20.0, n_points=200, save_path="settlement_vs_time.png" |
|
|
) |
|
|
|
|
|
|
|
|
print("Generating consolidation profile plots...") |
|
|
for t in [0.5, 2.0, 10.0]: |
|
|
analysis.plot_degree_of_consolidation( |
|
|
t, save_path=f"consolidation_profile_t{t:.1f}.png" |
|
|
) |
|
|
|
|
|
print("\nAnalysis complete!") |
|
|
|
|
|
|
|
|
def load_yaml_data(yaml_file: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Load PVD analysis data from YAML file |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
yaml_file : str |
|
|
Path to YAML file |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
data : dict |
|
|
Dictionary containing analysis parameters |
|
|
""" |
|
|
with open(yaml_file, "r") as f: |
|
|
data = yaml.safe_load(f) |
|
|
return data |
|
|
|
|
|
|
|
|
def create_analysis_from_yaml(yaml_file: str) -> PVDConsolidation: |
|
|
""" |
|
|
Create PVDConsolidation object from YAML file |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
yaml_file : str |
|
|
Path to YAML file |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
analysis : PVDConsolidation |
|
|
PVD consolidation analysis object |
|
|
""" |
|
|
data = load_yaml_data(yaml_file) |
|
|
|
|
|
|
|
|
layers = [] |
|
|
for layer_data in data["soil_layers"]: |
|
|
layer = SoilLayer( |
|
|
thickness=layer_data["thickness"], |
|
|
Cv=layer_data["Cv"], |
|
|
Ch=layer_data["Ch"], |
|
|
RR=layer_data["RR"], |
|
|
CR=layer_data["CR"], |
|
|
sigma_ini=layer_data["sigma_ini"], |
|
|
sigma_p=layer_data["sigma_p"], |
|
|
kh=layer_data["kh"], |
|
|
ks=layer_data["ks"], |
|
|
) |
|
|
layers.append(layer) |
|
|
|
|
|
|
|
|
pvd_data = data["pvd"] |
|
|
pvd = PVDProperties( |
|
|
dw=pvd_data["dw"], |
|
|
ds=pvd_data["ds"], |
|
|
De=pvd_data["De"], |
|
|
L_drain=pvd_data["L_drain"], |
|
|
qw=pvd_data["qw"], |
|
|
) |
|
|
|
|
|
|
|
|
surcharge = data["analysis"]["surcharge"] |
|
|
dt = data["analysis"].get("dt", 0.01) |
|
|
|
|
|
|
|
|
analysis = PVDConsolidation(layers, pvd, surcharge, dt) |
|
|
|
|
|
return analysis, data |
|
|
|
|
|
|
|
|
def run_analysis_from_yaml(yaml_file: str, output_dir: str = None): |
|
|
""" |
|
|
Run complete PVD analysis from YAML file |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
yaml_file : str |
|
|
Path to YAML input file |
|
|
output_dir : str, optional |
|
|
Directory to save output files |
|
|
""" |
|
|
print(f"Loading data from: {yaml_file}") |
|
|
analysis, data = create_analysis_from_yaml(yaml_file) |
|
|
|
|
|
|
|
|
if output_dir is None: |
|
|
output_dir = os.path.dirname(yaml_file) or "." |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
analysis_params = data["analysis"] |
|
|
t_max = analysis_params.get("t_max", 20.0) |
|
|
n_points = analysis_params.get("n_points", 200) |
|
|
t_check = analysis_params.get("t_check", [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]) |
|
|
t_profiles = analysis_params.get("t_profiles", [0.5, 2.0, 10.0]) |
|
|
|
|
|
|
|
|
print("\n" + "=" * 70) |
|
|
print("RUNNING PVD CONSOLIDATION ANALYSIS") |
|
|
print("=" * 70 + "\n") |
|
|
|
|
|
report = analysis.get_summary_report(t_check) |
|
|
print(report) |
|
|
|
|
|
|
|
|
report_file = os.path.join(output_dir, "pvd_analysis_report.txt") |
|
|
with open(report_file, "w") as f: |
|
|
f.write(report) |
|
|
print(f"\nReport saved to: {report_file}") |
|
|
|
|
|
|
|
|
print("\nGenerating settlement vs time plot...") |
|
|
settlement_plot = os.path.join(output_dir, "settlement_vs_time.png") |
|
|
time, settlement = analysis.plot_settlement_vs_time( |
|
|
t_max=t_max, n_points=n_points, save_path=settlement_plot |
|
|
) |
|
|
print(f"Plot saved to: {settlement_plot}") |
|
|
|
|
|
|
|
|
csv_file = os.path.join(output_dir, "settlement_data.csv") |
|
|
np.savetxt( |
|
|
csv_file, |
|
|
np.column_stack((time, settlement * 1000)), |
|
|
delimiter=",", |
|
|
header="Time (years),Settlement (mm)", |
|
|
comments="", |
|
|
) |
|
|
print(f"Data saved to: {csv_file}") |
|
|
|
|
|
|
|
|
print("\nGenerating consolidation profile plots...") |
|
|
for t in t_profiles: |
|
|
profile_plot = os.path.join(output_dir, f"consolidation_profile_t{t:.1f}y.png") |
|
|
analysis.plot_degree_of_consolidation(t, save_path=profile_plot) |
|
|
print(f"Profile at t={t:.1f} years saved to: {profile_plot}") |
|
|
|
|
|
print("\n" + "=" * 70) |
|
|
print("ANALYSIS COMPLETE!") |
|
|
print("=" * 70) |
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Main CLI interface""" |
|
|
parser = argparse.ArgumentParser( |
|
|
description="PVD Consolidation Analysis - Settlement vs Time Calculator", |
|
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
|
epilog=""" |
|
|
Examples: |
|
|
# Run analysis from YAML file |
|
|
python pvd_consolidation.py --data input.yaml |
|
|
|
|
|
# Specify output directory |
|
|
python pvd_consolidation.py --data input.yaml --output results/ |
|
|
|
|
|
# Run example analysis |
|
|
python pvd_consolidation.py --example |
|
|
""", |
|
|
) |
|
|
|
|
|
parser.add_argument("--data", type=str, help="Path to YAML input file") |
|
|
parser.add_argument( |
|
|
"--output", |
|
|
type=str, |
|
|
default=None, |
|
|
help="Output directory for results (default: same as input file)", |
|
|
) |
|
|
parser.add_argument( |
|
|
"--example", |
|
|
action="store_true", |
|
|
help="Run example analysis with default parameters", |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
if args.example: |
|
|
print("Running example analysis...") |
|
|
example_usage() |
|
|
elif args.data: |
|
|
if not os.path.exists(args.data): |
|
|
print(f"Error: Input file '{args.data}' not found!") |
|
|
return |
|
|
run_analysis_from_yaml(args.data, args.output) |
|
|
else: |
|
|
parser.print_help() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|