Source code for sgsim.motion.ground_motion

from functools import cached_property
import numpy as np
import csv
from . import signal_tools
from ..file_reading.record_reader import RecordReader
from ..core.domain_config import DomainConfig

[docs] class GroundMotion(DomainConfig): """ Ground motion data container Parameters ---------- npts : int Number of time points in the record. dt : float Time step interval in seconds. ac : ndarray Acceleration time series. vel : ndarray Velocity time series. disp : ndarray Displacement time series. tag : str, optional Identifier for the ground motion record. """ _CORE_ATTRS = DomainConfig._CORE_ATTRS | frozenset({'ac', 'vel', 'disp', 'tag'}) def __init__(self, npts, dt, ac, vel, disp, tag=None): super().__init__(npts, dt) self.ac = ac self.vel = vel self.disp = disp self.tag = tag
[docs] def trim(self, method: str, value: tuple[float, float] | int | slice): """ Trim ground motion time series. Parameters ---------- method : {'energy', 'npts', 'slice'} Trimming approach. 'energy' trims by cumulative energy fraction, 'npts' keeps first N points, 'slice' applies custom indexing. value : tuple of float, int, or slice Trim parameters. For 'energy': (start, end) fractions (e.g., 0.05, 0.95). For 'npts': number of points. For 'slice': slice object. Returns ------- self Modified GroundMotion instance for method chaining. Examples -------- >>> motion.trim('energy', (0.05, 0.95)) >>> motion.trim('npts', 1000) >>> motion.trim('slice', slice(100, 500)) """ if method.lower() == 'energy': if not isinstance(value, tuple) or len(value) != 2: raise ValueError("Energy trimming requires a tuple of (start_fraction, end_fraction)") self.energy_slicer = value slicer = self.energy_slicer elif method.lower() == 'npts': if not isinstance(value, int) or value <= 0 or value > self.npts: raise ValueError("Number of points must be a positive integer less than the current number of points") slicer = slice(0, value) elif method.lower() == 'slice': if not isinstance(value, slice): raise ValueError("Slice method requires a Python slice object") slicer = value else: raise ValueError(f"Unsupported trim method: '{method}'. Use 'energy', 'npts', or 'slice'") self.ac = self.ac[slicer] self.vel = self.vel[slicer] self.disp = self.disp[slicer] self.npts = len(self.ac) # auto clear cache return self
[docs] def filter(self, bandpass_freqs: tuple[float, float]): """ Apply bandpass filter to ground motion. Parameters ---------- bandpass_freqs : tuple of float Lower and upper cutoff frequencies (Hz) as (f_low, f_high). Returns ------- self Modified GroundMotion instance for method chaining. """ self.ac = signal_tools.bandpass_filter(self.dt, self.ac, bandpass_freqs[0], bandpass_freqs[1]) self.vel = signal_tools.get_integral(self.dt, self.ac) self.disp = signal_tools.get_integral(self.dt, self.vel) self._clear_cache() return self
[docs] def resample(self, dt: float): """ Resample ground motion to new time step. Parameters ---------- dt : float Target time step in seconds. Returns ------- self Modified GroundMotion instance for method chaining. """ npts_new, dt_new, ac_new = signal_tools.resample(self.dt, dt, self.ac) self.ac = ac_new self.vel = signal_tools.get_integral(dt_new, self.ac) self.disp = signal_tools.get_integral(dt_new, self.vel) self.npts = npts_new # auto clear cache self.dt = dt_new return self
@property def fas(self): """ Fourier amplitude spectrum of acceleration. Returns ------- ndarray Fourier amplitude spectrum. """ return signal_tools.get_fas(self.npts, self.ac)
[docs] def smooth_fas(self, window: int = 9): """ Smoothed Fourier amplitude spectrum. Parameters ---------- window : int, optional Moving average window size. Default is 9. Returns ------- ndarray Smoothed Fourier amplitude spectrum. """ return signal_tools.moving_average(self.fas, window)
@property def ce(self): """ Cumulative energy of acceleration time series. Returns ------- ndarray Cumulative energy array. """ return signal_tools.get_ce(self.dt, self.ac) @property def mle_ac(self): """ Mean local extrema of acceleration. Returns ------- float Average of local peak values. """ return signal_tools.get_mle(self.ac) @property def mle_vel(self): """ Mean local extrema of velocity. Returns ------- float Average of local peak values. """ return signal_tools.get_mle(self.vel) @property def mle_disp(self): """ Mean local extrema of displacement. Returns ------- float Average of local peak values. """ return signal_tools.get_mle(self.disp) @property def mzc_ac(self): """ Mean zero-crossing of acceleration. Returns ------- float Zero-crossings per unit time. """ return signal_tools.get_mzc(self.ac) @property def mzc_vel(self): """ Mean zero-crossing of velocity. Returns ------- float Zero-crossings per unit time. """ return signal_tools.get_mzc(self.vel) @property def mzc_disp(self): """ Mean zero-crossing of displacement. Returns ------- float Zero-crossings per unit time. """ return signal_tools.get_mzc(self.disp) @property def pmnm_ac(self): """ Positive-minima and negative-maxima of acceleration. Returns ------- float Ratio of peak to mean absolute value. """ return signal_tools.get_pmnm(self.ac) @property def pmnm_vel(self): """ Positive-minima and negative-maxima of velocity. Returns ------- float Ratio of peak to mean absolute value. """ return signal_tools.get_pmnm(self.vel) @property def pmnm_disp(self): """ Positive-minima and negative-maxima of displacement. Returns ------- float Ratio of peak to mean absolute value. """ return signal_tools.get_pmnm(self.disp)
[docs] @cached_property def spectra(self): """ Response spectra at 5% damping. Returns ------- ndarray Array of shape (3, n_periods) with [Sd, Sv, Sa]. """ if not hasattr(self, 'tp'): raise AttributeError("Set 'tp' attribute (periods) before accessing spectra") return signal_tools.get_spectra(self.dt, self.ac if self.ac.ndim == 2 else self.ac[None, :], period=self.tp, zeta=0.05)
@property def sa(self): """ Spectral acceleration response. Returns ------- ndarray Sa values at periods defined by tp. """ return self.spectra[2] @property def sv(self): """ Spectral velocity response. Returns ------- ndarray Sv values at periods defined by tp. """ return self.spectra[1] @property def sd(self): """ Spectral displacement response. Returns ------- ndarray Sd values at periods defined by tp. """ return self.spectra[0] @property def pga(self): """ Peak ground acceleration. Returns ------- float Maximum absolute acceleration value. """ return signal_tools.get_peak_param(self.ac) @property def pgv(self): """ Peak ground velocity. Returns ------- float Maximum absolute velocity value. """ return signal_tools.get_peak_param(self.vel) @property def pgd(self): """ Peak ground displacement. Returns ------- float Maximum absolute displacement value. """ return signal_tools.get_peak_param(self.disp) @property def energy_slicer(self): """ Slice indices for cumulative energy range. Returns ------- slice Index slice for energy-based trimming. """ return self._energy_slicer @energy_slicer.setter def energy_slicer(self, energy_range: tuple[float, float]): """ Set energy slice range. Parameters ---------- energy_range : tuple of float Start and end fractions of cumulative energy (e.g., 0.05, 0.95). """ self._energy_slicer = signal_tools.slice_energy(self.ce, energy_range)
[docs] def to_csv(self, filename: str, features: list[str]): """ Export selected features to CSV. Parameters ---------- filename : str Output CSV file path. features : list of str List of feature names to export. """ header = [] row = [] for feature in features: feature_l = feature.lower() attr = getattr(self, feature_l) # Spectral arrays (sa, sv, sd) if feature_l in ("sa", "sv", "sd"): if not hasattr(self, "tp"): raise AttributeError("Set 'tp' attribute (periods) before accessing spectra.") for i, val in enumerate(attr.T): header.append(f"{feature_l}_{self.tp[i]:.3f}") row.append(val) # FAS (Fourier amplitude spectrum) elif feature_l == "fas": if not hasattr(self, "freq"): raise AttributeError("Set 'freq' attribute (frequencies) before accessing spectra") for i, val in enumerate(attr.T): header.append(f"fas_{self.freq[i] / (2*np.pi):.3f}") row.append(val) else: header.append(feature_l) row.append(attr) with open(filename, "w", newline="") as f: writer = csv.writer(f) writer.writerow(header) writer.writerow(row)
[docs] @classmethod def load_from(cls, source: str, tag=None, **kwargs): """ Load ground motion from file or array. Parameters ---------- source : str Data source format: 'NGA', 'ESM', 'COL', 'RAW', 'COR', or 'Array'. tag : str, optional Record identifier. **kwargs Source-specific arguments: For file sources: file, filename, zip_file, skiprows, scale For 'Array' source: dt (float), ac (ndarray) Returns ------- GroundMotion Loaded ground motion instance. Examples -------- >>> gm = GroundMotion.load_from('NGA', file='record.AT2') >>> gm = GroundMotion.load_from('Array', dt=0.01, ac=acc_array) """ record = RecordReader(source, **kwargs) return cls(npts=record.npts, dt=record.dt, ac=record.ac, vel=record.vel, disp=record.disp, tag=tag)