"""Stochastic ground motion model."""
import json
from functools import cached_property
import numpy as np
from scipy.fft import irfft
from . import engine
from .domain import Domain
from ..motion import signal
from ..motion.ground_motion import GroundMotion
from ..optimization.fit_eval import goodness_of_fit
from . import functions
[docs]
class StochasticModel(Domain):
"""
Stochastic ground motion model.
Parameters
----------
npts : int
Number of time points.
dt : float
Time step.
modulating : np.ndarray
Time-varying modulating function.
upper_frequency : np.ndarray
Upper frequency function in Hz.
upper_damping : np.ndarray
Upper damping function.
lower_frequency : np.ndarray
Lower frequency function in Hz.
lower_damping : np.ndarray
Lower damping function.
params : dict, optional
Original parameters dictionary used to create the model.
Examples
--------
>>> params = {
... 'modulating': {'type': 'BetaBasic', 'params': [0.1, 10.0, 1.0, 25.0]},
... 'upper_frequency': {'type': 'Linear', 'params': [8.0, 1.5]},
... 'upper_damping': {'type': 'Constant', 'params': [0.5]},
... 'lower_frequency': {'type': 'Linear', 'params': [1.0, 0.5]},
... 'lower_damping': {'type': 'Constant', 'params': [0.3]},
... }
>>> model = StochasticModel.load_from(params, npts, dt)
>>> model.fas.shape
(1000,)
>>> import matplotlib.pyplot as plt
>>> plt.plot(model.t, model.ac)
>>> plt.show()
>>> plt.plot(model.freq / (2 * np.pi), model.fas)
>>> plt.show()
"""
def __init__(self, npts: int, dt: float, modulating: np.ndarray,
upper_frequency: np.ndarray, upper_damping: np.ndarray,
lower_frequency: np.ndarray, lower_damping: np.ndarray,
params: dict = None):
super().__init__(npts, dt)
self.q = modulating
self.wu = upper_frequency
self.zu = upper_damping
self.wl = lower_frequency
self.zl = lower_damping
self.params = params
[docs]
@classmethod
def load_from(cls, params: dict, npts: int, dt: float):
"""
Create StochasticModel from a parameters dictionary.
Parameters
----------
params : dict
Dictionary containing model parameters (usually from ModelFitter.fit()).
npts : int
Number of time points.
dt : float
Time step.
Returns
-------
StochasticModel
Initialized stochastic model.
"""
t = signal.time(npts, dt)
def compute_array(param_group: dict):
name = param_group['type']
fn_class = functions.REGISTRY.get(name)
if fn_class is None:
raise ValueError(f"Unknown function type: '{name}'. Available: {list(functions.REGISTRY.keys())}")
time_shift = param_group.get('time_shift', 0.0)
return fn_class.compute(t - time_shift, **param_group['params'])
modulating = compute_array(params['modulating'])
upper_frequency = compute_array(params['upper_frequency'])
upper_damping = compute_array(params['upper_damping'])
lower_frequency = compute_array(params['lower_frequency'])
lower_damping = compute_array(params['lower_damping'])
return cls(npts, dt, modulating, upper_frequency, upper_damping, lower_frequency, lower_damping, params)
[docs]
def save(self, filename):
"""
Save model parameters to a JSON file.
Parameters
----------
filename : str
Path to the output JSON file (Recommeded to use .json extension).
Examples
--------
>>> model.save("my_model.json")
"""
with open(filename, "w") as f:
json.dump(self.params, f, indent=2)
# =========================================================================
# Core Statistics (cached tuple unpacking)
# =========================================================================
@cached_property
def _stats(self):
"""Variance statistics."""
return engine.stats(self.wu * 2 * np.pi, self.zu, self.wl * 2 * np.pi, self.zl,
self.freq_p2, self.freq_p4, self.freq_n2, self.freq_n4, self.df)
@property
def _variance(self):
return self._stats[0]
@property
def _variance_dot(self):
return self._stats[1]
@property
def _variance_2dot(self):
return self._stats[2]
@property
def _variance_bar(self):
return self._stats[3]
@property
def _variance_2bar(self):
return self._stats[4]
# =========================================================================
# Fourier Amplitude Spectra (cached tuple unpacking)
# =========================================================================
@cached_property
def _fas_all(self):
"""FAS for acceleration, velocity, and displacement."""
return engine.fas(self.q, self.wu * 2 * np.pi, self.zu, self.wl * 2 * np.pi, self.zl,
self.freq_p2, self.freq_p4, self._variance, self.dt)
@property
def fas(self):
"""
Fourier amplitude spectrum (FAS) of acceleration.
Returns
-------
ndarray
FAS computed using the model's PSD.
"""
return self._fas_all[0]
@property
def fas_vel(self):
"""
Fourier amplitude spectrum (FAS) of velocity.
Returns
-------
ndarray
FAS computed using the model's PSD.
"""
return self._fas_all[1]
@property
def fas_disp(self):
"""
Fourier amplitude spectrum (FAS) of displacement.
Returns
-------
ndarray
FAS computed using the model's PSD.
"""
return self._fas_all[2]
# =========================================================================
# Cumulative Energy
# =========================================================================
[docs]
@cached_property
def ce(self):
"""
Cumulative energy of the stochastic model.
Returns
-------
ndarray
Cumulative energy time history.
"""
return signal.ce(self.dt, self.q)
# =========================================================================
# Local Extrema Counts
# =========================================================================
[docs]
@cached_property
def le_ac(self):
"""
Mean cumulative local extrema count of acceleration.
Returns
-------
ndarray
Cumulative count of acceleration local extrema.
"""
return engine.cumulative_rate(self.dt, self._variance_2dot, self._variance_dot)
[docs]
@cached_property
def le_vel(self):
"""
Mean cumulative local extrema count of velocity.
Returns
-------
ndarray
Cumulative count of velocity local extrema.
"""
return engine.cumulative_rate(self.dt, self._variance_dot, self._variance)
[docs]
@cached_property
def le_disp(self):
"""
Mean cumulative local extrema count of displacement.
Returns
-------
ndarray
Cumulative count of displacement local extrema.
"""
return engine.cumulative_rate(self.dt, self._variance, self._variance_bar)
# =========================================================================
# Zero Crossing Counts
# =========================================================================
[docs]
@cached_property
def zc_ac(self):
"""
Mean cumulative zero crossing count of acceleration.
Returns
-------
ndarray
Cumulative count of acceleration zero crossings.
"""
return engine.cumulative_rate(self.dt, self._variance_dot, self._variance)
[docs]
@cached_property
def zc_vel(self):
"""
Mean cumulative zero crossing count of velocity.
Returns
-------
ndarray
Cumulative count of velocity zero crossings.
"""
return engine.cumulative_rate(self.dt, self._variance, self._variance_bar)
[docs]
@cached_property
def zc_disp(self):
"""
Mean cumulative zero crossing count of displacement.
Returns
-------
ndarray
Cumulative count of displacement zero crossings.
"""
return engine.cumulative_rate(self.dt, self._variance_bar, self._variance_2bar)
# =========================================================================
# Positive-Minima / Negative-Maxima Counts
# =========================================================================
[docs]
@cached_property
def pmnm_ac(self):
"""
Mean cumulative PMNM count of acceleration.
Returns
-------
ndarray
Cumulative count of acceleration positive-minima and negative-maxima.
"""
return engine.pmnm_rate(self.dt, self._variance_2dot, self._variance_dot, self._variance)
[docs]
@cached_property
def pmnm_vel(self):
"""
Mean cumulative PMNM count of velocity.
Returns
-------
ndarray
Cumulative count of velocity positive-minima and negative-maxima.
"""
return engine.pmnm_rate(self.dt, self._variance_dot, self._variance, self._variance_bar)
[docs]
@cached_property
def pmnm_disp(self):
"""
Mean cumulative PMNM count of displacement.
Returns
-------
ndarray
Cumulative count of displacement positive-minima and negative-maxima.
"""
return engine.pmnm_rate(self.dt, self._variance, self._variance_bar, self._variance_2bar)
# =========================================================================
# Simulation Methods
# =========================================================================
[docs]
def simulate(self, n: int, tag=None, seed: int = None) -> GroundMotion:
"""
Simulate ground motions using the stochastic model.
Parameters
----------
n : int
Number of simulations to generate.
tag : any, optional
Identifier for the simulation batch.
seed : int, optional
Random seed for reproducibility.
Returns
-------
GroundMotion
Simulated ground motions with acceleration, velocity, and displacement.
"""
n = int(n)
rng = np.random.default_rng(seed)
white_noise = rng.standard_normal((n, self.npts))
fourier = engine.fourier_series(n, self.npts, self.t,
self.freq_sim, self.freq_sim_p2,
self.q,
self.wu * 2 * np.pi, self.zu,
self.wl * 2 * np.pi, self.zl,
self._variance, white_noise, self.dt)
# Using default backward 1/N scaling and manual anti-aliasing
ac = irfft(fourier, workers=-1)[..., :self.npts]
# FT(w)/jw + pi*delta(w)*FT(0) integration in freq domain
fourier[..., 1:] /= (1j * self.freq_sim[1:])
vel = irfft(fourier, workers=-1)[..., :self.npts]
fourier[..., 1:] /= (1j * self.freq_sim[1:])
disp = irfft(fourier, workers=-1)[..., :self.npts]
return GroundMotion(self.npts, self.dt, ac, vel, disp, tag=tag)
[docs]
def simulate_conditional(self, n: int, target: GroundMotion, metrics: dict, max_iter: int = 100) -> GroundMotion:
"""
Conditionally simulate ground motions until GoF metrics are met.
Parameters
----------
n : int
Number of simulations to generate.
target : GroundMotion
Target ground motion to compare against.
metrics : dict
Conditioning metrics with GoF thresholds, e.g., {'sa': 0.9}.
max_iter : int, optional
Maximum attempts per required simulation.
Returns
-------
GroundMotion
Simulated ground motions meeting all GoF thresholds.
Raises
------
RuntimeError
If not enough simulations meet thresholds within max_iter * n attempts.
"""
successful: list[GroundMotion] = []
attempts = 0
while len(successful) < n and attempts < max_iter * n:
simulated = self.simulate(1, tag=attempts)
gof_scores = {metric: goodness_of_fit(getattr(simulated, metric), getattr(target, metric)) for metric in metrics}
if all(gof_scores[m] >= metrics[m] for m in metrics):
successful.append(simulated)
attempts += 1
if len(successful) < n:
raise RuntimeError(f"Only {len(successful)} simulations met thresholds after {attempts} attempts.")
ac = np.concatenate([gm.ac for gm in successful], axis=0)
vel = np.concatenate([gm.vel for gm in successful], axis=0)
disp = np.concatenate([gm.disp for gm in successful], axis=0)
return GroundMotion(self.npts, self.dt, ac, vel, disp, tag=len(successful))
[docs]
def summary(self):
"""
Print model parameters.
Returns
-------
Model
Self for method chaining.
"""
def _format_fn(fn):
params = ', '.join(f"{k}={v:.3f}" if isinstance(v, float) else f"{k}={v}" for k, v in fn['params'].items())
return f"{fn['type']}({params})"
title = "Model Summary " + "=" * 40
print(title)
print(f"{'Time Step (dt)':<25} : {self.dt}")
print(f"{'Number of Points (npts)':<25} : {self.npts}")
print("-" * len(title))
print(f"{'Modulating':<25} : {_format_fn(self.params['modulating'])}")
print(f"{'Upper Frequency':<25} : {_format_fn(self.params['upper_frequency'])}")
print(f"{'Lower Frequency':<25} : {_format_fn(self.params['lower_frequency'])}")
print(f"{'Upper Damping':<25} : {_format_fn(self.params['upper_damping'])}")
print(f"{'Lower Damping':<25} : {_format_fn(self.params['lower_damping'])}")
print("-" * len(title))
return self