"""Signal processing and spectral analysis tools for ground motion data."""
import numpy as np
from numba import njit, prange
from scipy.fft import rfft, rfftfreq
from scipy.ndimage import uniform_filter1d
from scipy.signal import butter, resample as sp_resample, sosfilt
from scipy.signal.windows import tukey
__all__ = [
"butterworth_filter",
"baseline_correction",
"taper",
"resample",
"smooth",
"response_spectra",
"sdof_response",
"fas",
"fps",
"frequency",
"time",
"principal_angle",
"rotate",
"slice_energy",
"slice_amplitude",
"slice_freq",
"integrate",
"integrate_detrend",
"ce",
"cav",
"le",
"zc",
"pmnm",
"peak_abs_value",
]
# Signal Processing =============================================================
[docs]
def butterworth_filter(dt, rec, lowcut=0.1, highcut=25.0, order=4):
"""
Apply bandpass Butterworth filter using second-order sections.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input signal.
lowcut : float, optional
Low cutoff frequency (default is 0.1).
highcut : float, optional
High cutoff frequency (default is 25.0).
order : int, optional
Filter order (default is 4).
Returns
-------
ndarray
Filtered signal.
"""
nyquist = 0.5 / dt # Nyquist frequency
low = lowcut / nyquist
highcut = min(highcut, nyquist * 0.99)
high = highcut / nyquist
sos = butter(order, [low, high], btype='band', output='sos')
filtered_rec = sosfilt(sos, rec, axis=-1)
return filtered_rec
[docs]
def baseline_correction(rec, degree=1):
"""
Remove baseline drift using polynomial fitting.
Parameters
----------
rec : ndarray
Input signal.
degree : int, optional
Polynomial degree (default is 1).
Returns
-------
ndarray
Corrected signal.
"""
rec = np.atleast_2d(rec)
x = np.arange(rec.shape[-1])
corrected = np.empty_like(rec)
for i, signal in enumerate(rec):
p = np.polynomial.Polynomial.fit(x, signal, deg=degree)
corrected[i] = signal - p(x)
return corrected.squeeze()
[docs]
def smooth(rec: np.ndarray, window_size: int = 9) -> np.ndarray:
"""
Apply moving average smoothing.
Parameters
----------
rec : ndarray
Input signal.
window_size : int, optional
Window size (default is 9).
Returns
-------
ndarray
Smoothed signal.
"""
return uniform_filter1d(rec, size=window_size, axis=-1, mode='constant', cval=0.0)
[docs]
def taper(rec: np.ndarray, alpha: float = 0.05) -> np.ndarray:
"""
Apply Tukey window tapering to signal ends.
Parameters
----------
rec : ndarray
Input signal.
alpha : float, optional
Taper fraction (default is 0.05).
Returns
-------
ndarray
Tapered signal.
"""
window = tukey(rec.shape[-1], alpha=alpha)
return rec * window
[docs]
def resample(dt, dt_new, rec):
"""
Resample signal to new time step.
Parameters
----------
dt : float
Original time step.
dt_new : float
Target time step.
rec : ndarray
Input signal.
Returns
-------
npts_new : int
Number of points after resampling.
dt_new : float
New time step.
resampled : ndarray
Resampled signal.
"""
npts = rec.shape[-1]
duration = (npts - 1) * dt
npts_new = int(np.floor(duration / dt_new)) + 1
ac_new = sp_resample(rec, npts_new, axis=-1)
return npts_new, dt_new, ac_new
# Signal Analysis ============================================================
@njit('float64[:, :, :, :](float64, float64[:, :], float64[:], float64, float64)', fastmath=True, parallel=True, cache=True)
def _sdof_response_kernel(dt, rec, period, zeta, mass):
"""
Compute SDOF response time histories using Newmark method.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input acceleration (2D).
period : ndarray
SDOF natural periods.
zeta : float
Damping ratio.
mass : float
SDOF mass.
Returns
-------
ndarray
Response array (4, n_rec, npts, n_period): [Disp, Vel, Acc, Acc_total].
"""
n_rec = rec.shape[0]
npts = rec.shape[1]
n_sdf = len(period)
# 4D output: (response_type, n_rec, npts, n_period)
# response_type indices: 0=disp, 1=vel, 2=acc, 3=acc_tot
out_responses = np.empty((4, n_rec, npts, n_sdf))
# Newmark Constants (Linear Acceleration Method)
gamma = 0.5
beta = 1.0 / 6.0
MIN_STEPS_PER_CYCLE = 20.0
for j in prange(n_sdf):
T = period[j]
# Safety for T=0
if T <= 1e-6:
# Rigid body: Disp=0, Acc = Ground Acc
out_responses[0, :, :, j] = 0.0 # Disp
out_responses[1, :, :, j] = 0.0 # Vel
# Acc relative = -Ground
out_responses[2, :, :, j] = -rec
# Acc total = Acc relative + Ground = 0 relative to inertial frame?
# Actually Acc Total = Ground Acc for rigid structure.
# Let's just output trivial zeros for disp/vel and handle acc:
for r in range(n_rec):
out_responses[3, r, :, j] = rec[r, :] # Total Acc = Ground Acc
continue
wn = 2 * np.pi / T
k = mass * wn**2
c = 2 * mass * wn * zeta
# Sub-stepping Logic
if dt > (T / MIN_STEPS_PER_CYCLE):
n_sub = int(np.ceil(dt / (T / MIN_STEPS_PER_CYCLE)))
else:
n_sub = 1
dt_sub = dt / n_sub
# Newmark Coefficients
a1 = mass / (beta * dt_sub**2) + c * gamma / (beta * dt_sub)
a2 = mass / (beta * dt_sub) + c * (gamma / beta - 1)
a3 = mass * (1 / (2 * beta) - 1) + c * dt_sub * (gamma / (2 * beta) - 1)
k_hat = k + a1
for r in range(n_rec):
# Views for cleaner indexing
disp = out_responses[0, r, :, j]
vel = out_responses[1, r, :, j]
acc = out_responses[2, r, :, j]
acc_tot = out_responses[3, r, :, j]
# --- CRITICAL FIX START ---
# Explicitly zero out the initial state in the output array
disp[0] = 0.0
vel[0] = 0.0
# Initial acceleration: ma + cv + kd = p => ma = p => a = -rec[0]
acc[0] = -rec[r, 0]
acc_tot[0] = acc[0] + rec[r, 0] # Should be 0
# Init Temp variables from these explicit zeros
d_curr = 0.0
v_curr = 0.0
a_curr = acc[0]
# --- CRITICAL FIX END ---
for i in range(npts - 1):
ug_start = rec[r, i]
ug_end = rec[r, i+1]
for sub in range(n_sub):
alpha = (sub + 1) / n_sub
ug_now = ug_start + (ug_end - ug_start) * alpha
p_eff = -mass * ug_now
dp = p_eff + a1 * d_curr + a2 * v_curr + a3 * a_curr
d_next = dp / k_hat
v_next = ((gamma / (beta * dt_sub)) * (d_next - d_curr) +
(1 - gamma / beta) * v_curr +
dt_sub * a_curr * (1 - gamma / (2 * beta)))
a_next = ((d_next - d_curr) / (beta * dt_sub**2) -
v_curr / (beta * dt_sub) -
a_curr * (1 / (2 * beta) - 1))
d_curr = d_next
v_curr = v_next
a_curr = a_next
# Save State
disp[i+1] = d_curr
vel[i+1] = v_curr
acc[i+1] = a_curr
acc_tot[i+1] = a_curr + ug_end
return out_responses
[docs]
def sdof_response(dt: float, rec: np.ndarray, period: np.ndarray, zeta: float = 0.05, mass: float = 1.0):
"""
Compute SDOF response time histories.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input acceleration (1D or 2D).
period : ndarray
SDOF natural periods.
zeta : float, optional
Damping ratio (default is 0.05).
mass : float, optional
SDOF mass (default is 1.0).
Returns
-------
disp : ndarray
Displacement time histories.
vel : ndarray
Velocity time histories.
acc : ndarray
Relative acceleration time histories.
acc_total : ndarray
Total acceleration time histories.
"""
if rec.ndim == 1:
n = 1
rec = rec[None, :]
else:
n = rec.shape[0]
resp = _sdof_response_kernel(dt, rec, period, zeta, mass)
# Unpack: (4, n_rec, npts, n_sdf) -> disp, vel, acc, acc_tot
d, v, a, at = resp[0], resp[1], resp[2], resp[3]
if n == 1:
return d[0], v[0], a[0], at[0]
return d, v, a, at
@njit('float64[:, :, :](float64, float64[:, :], float64[:], float64, float64)', fastmath=True, parallel=True, cache=True)
def _spectra_kernel(dt, rec, period, zeta, mass):
"""
Compute response spectra using Newmark method.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input acceleration (2D).
period : ndarray
Spectral periods.
zeta : float
Damping ratio.
mass : float
SDOF mass.
Returns
-------
ndarray
Spectra array (3, n_rec, n_period): [SD, SV, SA].
"""
n_rec = rec.shape[0]
npts = rec.shape[1]
n_sdf = period.shape[-1]
# Output: (SD, SV, SA)
spectra_vals = np.zeros((3, n_rec, n_sdf))
# Constants
gamma = 0.5
beta = 1.0 / 6.0
MIN_STEPS_PER_CYCLE = 20.0
for j in prange(n_sdf):
T = period[j]
# SAFETY: Handle T=0 or negative periods
if T <= 1e-6:
# For T=0 (Rigid), Response = Ground Motion
# SD=0, SV=0, SA = Max Ground Acc (PGA)
for r in range(n_rec):
pga = 0.0
for i in range(npts):
val = abs(rec[r, i])
if val > pga: pga = val
spectra_vals[2, r, j] = pga
continue # Skip to next period
wn = 2 * np.pi / T
k = mass * wn**2
c = 2 * mass * wn * zeta
# Sub-stepping Logic
if dt > (T / MIN_STEPS_PER_CYCLE):
n_sub = int(np.ceil(dt / (T / MIN_STEPS_PER_CYCLE)))
else:
n_sub = 1
dt_sub = dt / n_sub
# Newmark Coefficients (Linear Acceleration)
# use dt_sub for all dynamic stiffness calculations
a1 = mass / (beta * dt_sub**2) + c * gamma / (beta * dt_sub)
a2 = mass / (beta * dt_sub) + c * (gamma / beta - 1)
a3 = mass * (1 / (2 * beta) - 1) + c * dt_sub * (gamma / (2 * beta) - 1)
k_hat = k + a1
for r in range(n_rec):
# We must ensure previous state is exactly 0.0
disp_prev = 0.0
vel_prev = 0.0
# Initial acceleration (assuming starting from rest)
# ma + cv + kd = p -> ma = p -> a = p/m = -ug
acc_prev = -rec[r, 0]
# Initialize Max Trackers
sd_max = 0.0
sv_max = 0.0
# SA is Total Acceleration: a_rel + a_ground
# At t=0: -rec[0] + rec[0] = 0.0
sa_max = 0.0
for i in range(npts - 1):
ug_start = rec[r, i]
ug_end = rec[r, i+1]
# Temp variables for sub-stepping
d_curr = disp_prev
v_curr = vel_prev
a_curr = acc_prev
# Sub-step Loop
for sub in range(n_sub):
# Interpolate Ground Motion
alpha = (sub + 1) / n_sub
ug_now = ug_start + (ug_end - ug_start) * alpha
p_eff = -mass * ug_now
# Newmark Step
dp = p_eff + a1 * d_curr + a2 * v_curr + a3 * a_curr
d_next = dp / k_hat
v_next = ((gamma / (beta * dt_sub)) * (d_next - d_curr) +
(1 - gamma / beta) * v_curr +
dt_sub * a_curr * (1 - gamma / (2 * beta)))
a_next = ((d_next - d_curr) / (beta * dt_sub**2) -
v_curr / (beta * dt_sub) -
a_curr * (1 / (2 * beta) - 1))
# Update state
d_curr = d_next
v_curr = v_next
a_curr = a_next
# Track Maxima (inside sub-steps for precision)
if abs(d_curr) > sd_max: sd_max = abs(d_curr)
if abs(v_curr) > sv_max: sv_max = abs(v_curr)
# Total Acceleration = Relative Acc + Ground Acc
val_sa = abs(a_curr + ug_now)
if val_sa > sa_max: sa_max = val_sa
# End of Sub-loop
disp_prev = d_curr
vel_prev = v_curr
acc_prev = a_curr
# Save final spectra values
spectra_vals[0, r, j] = sd_max
spectra_vals[1, r, j] = sv_max
spectra_vals[2, r, j] = sa_max
return spectra_vals
[docs]
def response_spectra(dt: float, rec: np.ndarray, period: np.ndarray, zeta: float = 0.05):
"""
Calculate response spectra.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input acceleration (1D or 2D).
period : ndarray
Spectral periods.
zeta : float, optional
Damping ratio (default is 0.05).
Returns
-------
sd : ndarray
Spectral displacement.
sv : ndarray
Spectral velocity.
sa : ndarray
Spectral acceleration.
"""
if rec.ndim == 1:
n = 1
rec = rec[None, :]
else:
n = rec.shape[0]
specs = _spectra_kernel(dt, rec, period, zeta, 1.0)
sd, sv, sa = specs[0], specs[1], specs[2]
if n == 1:
sd = sd[0]
sv = sv[0]
sa = sa[0]
return sd, sv, sa
[docs]
def slice_energy(ce: np.ndarray, target_range: tuple[float, float] = (0.001, 0.999)):
"""
Create slice for cumulative energy range.
Parameters
----------
ce : ndarray
Cumulative energy array.
target_range : tuple of float, optional
Energy fraction range (default is (0.001, 0.999)).
Returns
-------
slice
Slice object.
"""
total_energy = ce[-1]
start_idx = np.searchsorted(ce, target_range[0] * total_energy)
end_idx = np.searchsorted(ce, target_range[1] * total_energy)
return slice(start_idx, end_idx + 1)
[docs]
def slice_amplitude(rec: np.ndarray, threshold: float):
"""
Create slice based on amplitude threshold.
Parameters
----------
rec : ndarray
Input signal.
threshold : float
Amplitude threshold.
Returns
-------
slice
Slice object.
Raises
------
ValueError
If no values exceed threshold.
"""
indices = np.nonzero(np.abs(rec) > threshold)[0]
if len(indices) == 0:
raise ValueError("No values exceed the threshold. Consider using a lower threshold value.")
return slice(indices[0], indices[-1] + 1)
[docs]
def slice_freq(freq: np.ndarray, target_range: tuple[float, float] = (0.1, 25.0)):
"""
Create slice for frequency range.
Parameters
----------
freq : ndarray
Frequency array.
target_range : tuple of float, optional
Frequency range (default is (0.1, 25.0)).
Returns
-------
slice
Slice object.
"""
start_idx = np.searchsorted(freq, target_range[0])
end_idx = np.searchsorted(freq, target_range[1])
return slice(start_idx, end_idx + 1)
[docs]
def fas(dt: float, rec: np.ndarray):
"""
Calculate Fourier amplitude spectrum.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input signal.
Returns
-------
ndarray
Fourier amplitude spectrum.
Notes
-----
Scaled by dt for seismological convention. Units are input units × time
(e.g., g·s for acceleration in g, or m/s for acceleration in m/s²).
"""
return np.abs(rfft(rec)) * dt
[docs]
def fps(rec: np.ndarray):
"""
Calculate Fourier phase spectrum (unwrapped).
Parameters
----------
rec : ndarray
Input signal.
Returns
-------
ndarray
Unwrapped phase in radians.
Notes
-----
Unlike fas(), phase spectrum is independent of time step.
"""
complex_coeffs = rfft(rec)
return np.unwrap(np.angle(complex_coeffs))
[docs]
def frequency(npts, dt):
"""
Generate frequency array.
Parameters
----------
npts : int
Number of points.
dt : float
Time step.
Returns
-------
ndarray
Frequency array.
"""
return rfftfreq(npts, dt)
[docs]
def time(npts, dt):
"""
Generate time array.
Parameters
----------
npts : int
Number of points.
dt : float
Time step.
Returns
-------
ndarray
Time array.
"""
return np.linspace(0, (npts - 1) * dt, npts, dtype=np.float64)
[docs]
def zc(rec):
"""
Calculate zero-crossing rate.
Parameters
----------
rec : ndarray
Input signal.
Returns
-------
ndarray
Cumulative zero crossings.
"""
cross_mask = rec[..., :-1] * rec[..., 1:] < 0
cross_vec = np.empty_like(rec, dtype=np.float64)
cross_vec[..., :-1] = cross_mask * 0.5
cross_vec[..., -1] = cross_vec[..., -2]
return np.cumsum(cross_vec, axis=-1)
[docs]
def pmnm(rec):
"""
Calculate positive-minima and negative-maxima count.
Parameters
----------
rec : ndarray
Input signal.
Returns
-------
ndarray
Cumulative PMNM count.
"""
pmnm_mask =((rec[..., :-2] < rec[..., 1:-1]) & (rec[..., 1:-1] > rec[..., 2:]) & (rec[..., 1:-1] < 0) |
(rec[..., :-2] > rec[..., 1:-1]) & (rec[..., 1:-1] < rec[..., 2:]) & (rec[..., 1:-1] > 0))
pmnm_vec = np.empty_like(rec, dtype=np.float64)
pmnm_vec[..., 1:-1] = pmnm_mask * 0.5
pmnm_vec[..., 0] = pmnm_vec[..., 1]
pmnm_vec[..., -1] = pmnm_vec[..., -2]
return np.cumsum(pmnm_vec, axis=-1)
[docs]
def le(rec):
"""
Calculate local extrema count.
Parameters
----------
rec : ndarray
Input signal.
Returns
-------
ndarray
Cumulative local extrema.
"""
mle_mask = ((rec[..., :-2] < rec[..., 1:-1]) & (rec[..., 1:-1] > rec[..., 2:]) |
(rec[..., :-2] > rec[..., 1:-1]) & (rec[..., 1:-1] < rec[..., 2:]))
mle_vec = np.empty_like(rec, dtype=np.float64)
mle_vec[..., 1:-1] = mle_mask * 0.5
mle_vec[..., 0] = mle_vec[..., 1]
mle_vec[..., -1] = mle_vec[..., -2]
return np.cumsum(mle_vec, axis=-1)
[docs]
def ce(dt: float, rec: np.ndarray):
"""
Compute cumulative energy.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input signal.
Returns
-------
ndarray
Cumulative energy array.
"""
return np.cumsum(rec ** 2, axis=-1) * dt
[docs]
def integrate(dt: float, rec: np.ndarray):
"""
Integrate signal using cumulative sum.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input signal.
Returns
-------
ndarray
Integrated signal.
"""
return np.cumsum(rec, axis=-1) * dt
[docs]
def integrate_detrend(dt: float, rec: np.ndarray):
"""
Integrate signal with linear detrending.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input signal.
Returns
-------
ndarray
Integrated and detrended signal.
"""
uvec = integrate(dt, rec)
return uvec - np.linspace(0.0, uvec[-1], len(uvec))
[docs]
def peak_abs_value(rec: np.ndarray):
"""
Calculate peak absolute value.
Parameters
----------
rec : ndarray
Input signal.
Returns
-------
float or ndarray
Peak value.
"""
return np.max(np.abs(rec), axis=-1)
[docs]
def cav(dt: float, rec: np.ndarray):
"""
Calculate cumulative absolute velocity.
Parameters
----------
dt : float
Time step.
rec : ndarray
Input signal.
Returns
-------
float or ndarray
CAV value.
"""
return np.sum(np.abs(rec), axis=-1) * dt
[docs]
def rotate(rec1, rec2, angle):
"""
Rotate two-component signal.
Parameters
----------
rec1 : ndarray
First component.
rec2 : ndarray
Second component.
angle : float
Rotation angle.
Returns
-------
rotated_1 : ndarray
Rotated first component.
rotated_2 : ndarray
Rotated second component.
"""
xr = rec1 * np.cos(angle) - rec2 * np.sin(angle)
yr = rec1 * np.sin(angle) + rec2 * np.cos(angle)
return xr, yr
[docs]
def principal_angle(rec1: np.ndarray, rec2: np.ndarray) -> float:
"""
Find rotation angle for principal axes (max/min energy).
Computes the angle that rotates two orthogonal components to their
principal directions, where one component has maximum energy and
the other has minimum energy.
Parameters
----------
rec1 : ndarray
First orthogonal component.
rec2 : ndarray
Second orthogonal component.
Returns
-------
float
Rotation angle in radians.
Examples
--------
>>> theta = principal_angle(rec_ns, rec_ew)
>>> rec_major, rec_minor = rotate(rec_ns, rec_ew, theta)
"""
cross = 2 * np.sum(rec1 * rec2)
diff = np.sum(rec1**2) - np.sum(rec2**2)
return 0.5 * np.arctan2(cross, diff)