Source code for sigkit.impairments.phase_shift
"""Phase Shift Module utilized for impairments."""
from typing import Tuple
import numpy as np
from sigkit.core.base import SigKitError, Signal
from sigkit.impairments.base import Impairment
[docs]
class PhaseShift(Impairment):
"""Apply a constant or random phase offset to a baseband Signal.
Args:
phase_offset:
- If float: apply a fixed phase (radians).
- If tuple of two numbers (min_phase, max_phase):
pick a random phase (per call) uniformly in [min_phase, max_phase].
"""
def __init__(self, phase_offset: float | Tuple[float, float] = (-np.pi, np.pi)):
if isinstance(phase_offset, (int, float)):
self.phase_range = (float(phase_offset), float(phase_offset))
elif (
isinstance(phase_offset, (tuple, list))
and len(phase_offset) == 2
and all(isinstance(p, (int, float)) for p in phase_offset)
):
self.phase_range = (float(phase_offset[0]), float(phase_offset[1]))
else:
raise SigKitError(
"phase_offset must be a number or a 2‐tuple/list of numbers, "
f"got {phase_offset!r}"
)
[docs]
def apply(self, signal: Signal) -> Signal:
x: np.ndarray = signal.samples
if x.dtype != np.complex64:
raise SigKitError(
"PhaseShift impairment expects samples to be np.complex64."
)
min_ph, max_ph = self.phase_range
if min_ph == max_ph: # fixed offset
phi = min_ph
else:
phi = float(np.random.uniform(min_ph, max_ph))
phase_factor = np.exp(1j * phi).astype(np.complex64)
shifted = (x * phase_factor).astype(np.complex64)
return Signal(
samples=shifted,
sample_rate=signal.sample_rate,
carrier_frequency=signal.carrier_frequency,
)