# -*- coding: utf-8 -*-
import numpy as np, librosa
from itertools import chain
from math import ceil
from copy import copy
from .base import _Base
from .internals import _Validator
# Public API of this module (some names are defined in other chunks of the file).
__all__ = [
    'Transform', 'Identity',
    'GaussianWhiteNoise',
    'TimeStretch', 'PitchShift',
    'EdgeCrop', 'RandomCrop',
    'LinearFade',
    'Normalize', 'PreEmphasize', 'ExtractLoudestSection',
    'MedianFilter',
    'Reverb', 'ClipDistort',
]
class Identity(Transform):
    """Applies an identity transformation to a signal.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self):
        # The identity always applies, so the probability is pinned to one.
        super().__init__(p=1., random_state=None)

    def __call__(self, X, sr=None):
        # Validate a copy of the input, then flatten — the values are untouched.
        validated = self._val.signal(copy(X))
        return self._flatten(validated)
class GaussianWhiteNoise(Transform):
    # Raw docstring: '\sigma' is an invalid escape sequence in a plain string
    # literal and raises a SyntaxWarning on modern Python versions.
    r"""Applies additive Gaussian white noise to the signal.

    Parameters
    ----------
    scale: float [scale > 0] or (float, float)
        | Amount to scale the value sampled from the standard normal distribution.
        | Essentially the variance :math:`\sigma^2`.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self, scale, p=1., random_state=None):
        super().__init__(p, random_state)
        # Validated into a (min, max) positive range.
        self.scale = self._val.float_value(
            scale, 'scale (scale parameter)',
            lambda a, b: 0. < a <= b, 'positive')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Sample a scale from the configured range.
        scale = self.random_state.uniform(*self.scale)
        # Generate additive Gaussian white noise matching the signal's shape
        noise = self.random_state.normal(loc=0, scale=scale, size=X.shape)
        # Return the signal with added noise
        return X + noise
class TimeStretch(Transform):
    """Stretches the duration or speed of the signal without affecting its pitch.

    Parameters
    ----------
    rate: float [rate > 0] or (float, float)
        Stretch rate.

        - If `rate < 1`, the signal is slowed down.
        - If `rate > 1`, the signal is sped up.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self, rate, p=1., random_state=None):
        super().__init__(p, random_state)
        # Validated into a (min, max) positive range.
        self.rate = self._val.float_value(
            rate, 'rate (stretch rate)',
            lambda a, b: 0. < a <= b, 'positive')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Sample a stretch rate from the configured range.
        rate = self.random_state.uniform(*self.rate)
        # NOTE(review): asfortranarray on the transpose presumably arranges the
        # memory layout librosa expects per channel — confirm before changing.
        arranged = np.asfortranarray(X.T).T
        # Time-stretch each channel (row) independently.
        return np.apply_along_axis(librosa.effects.time_stretch, 1, arranged, rate=rate)
class PitchShift(Transform):
    """Shifts the pitch of the signal without changing its duration or speed.

    Parameters
    ----------
    n_steps: float [-12 ≤ n_steps ≤ 12] or (float, float)
        Number of semitones to shift.

    Notes
    -----
    - A sampling rate **is** required when applying this transformation.
    """

    def __init__(self, n_steps, p=1., random_state=None):
        super().__init__(p, random_state)
        # Validated into a (min, max) range within ±12 semitones (one octave).
        self.n_steps = self._val.float_value(
            n_steps, 'n_steps (number of semitones to shift)',
            lambda a, b: -12. <= a <= b <= 12., 'between -12 and 12')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # A positive sample rate is mandatory for pitch shifting.
        sr = self._val.restricted_integer(
            sr, 'sr (sample rate)',
            lambda x: x > 0, 'positive')
        # Sample a shift amount from the configured range.
        n_steps = self.random_state.uniform(*self.n_steps)
        # NOTE(review): asfortranarray on the transpose presumably arranges the
        # memory layout librosa expects per channel — confirm before changing.
        arranged = np.asfortranarray(X.T).T
        # Pitch-shift each channel (row) independently.
        return np.apply_along_axis(librosa.effects.pitch_shift, 1, arranged, sr=sr, n_steps=n_steps)
class EdgeCrop(Transform):
    """Crops a section from the start or end of the signal.

    Parameters
    ----------
    side: {'start', 'end'}
        The side of the signal to crop.

    crop_size: float [0 < crop_size ≤ 0.5] or (float, float)
        The fraction of the signal duration to crop from the chosen `side`.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self, side, crop_size, p=1., random_state=None):
        super().__init__(p, random_state)
        self.side = self._val.one_of(
            side, 'side (side to crop)',
            ['start', 'end'])
        self.crop_size = self._val.float_value(
            crop_size, 'crop_size (fraction of signal duration)',
            lambda a, b: 0. < a <= b <= 0.5, 'between zero and a half')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Sample a crop fraction from the configured range.
        crop_size = self.random_state.uniform(*self.crop_size)
        # Number of frames to remove — may round down to zero for short signals.
        crop_frames = int(crop_size * X.shape[1])
        if self.side == 'start':
            return X[:, crop_frames:]
        # Use an explicit end index: X[:, :-crop_frames] would wrongly yield an
        # EMPTY signal when crop_frames == 0, since :-0 is equivalent to :0.
        return X[:, :X.shape[1] - crop_frames]
class RandomCrop(Transform):
    """Randomly crops multiple sections from the signal.

    Parameters
    ----------
    crop_size: float [0 < crop_size < 1] or (float, float)
        The fraction of the signal duration to crop.

    n_crops: int [n_crops > 0] or (int, int)
        The number of random crops of size `crop_size` to make.

    Notes
    -----
    - Chunking is done according to the algorithm defined at [1]_.
    - `crop_size` :math:`\\times` `n_crops` must not exceed 1.
    - A sampling rate **is not** required when applying this transformation.

    References
    ----------
    .. [1] https://stackoverflow.com/a/49944026
    """

    def __init__(self, crop_size, n_crops, p=1., random_state=None):
        super().__init__(p, random_state)
        self.crop_size = self._val.float_value(
            crop_size, 'crop_size (fraction of signal duration)',
            lambda a, b: 0. < a <= b < 1., 'between zero and one')
        self.n_crops = self._val.integer_value(
            n_crops, 'n_crops (number of crops)',
            lambda a, b: b >= a > 0, 'positive')
        # Guard against the worst case removing (almost) the entire signal.
        if self.crop_size[1] * self.n_crops[1] >= 1.:
            raise ValueError('Expected maximum possible crop_size * n_crops to be less than one')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Number of chunks to remove, sampled inclusively from the configured range.
        n_crops = self.random_state.randint(self.n_crops[0], self.n_crops[1] + 1)
        # Convert crop_size fraction to number of frames
        length = X.shape[1]
        lower_crop_size, upper_crop_size = int(self.crop_size[0] * length), int(self.crop_size[1] * length)
        # NOTE(review): if crop_size[0] * length < 1, lower_crop_size is zero and the
        # size computation below raises ZeroDivisionError — presumably signals are long
        # enough in practice that this never happens; confirm upstream validation.
        # Get at least enough random chunk sizes in the specified range (i.e. lower <= n <= upper)
        ns = self.random_state.randint(lower_crop_size, upper_crop_size + 1, size=length//lower_crop_size)
        # Add up the chunk sizes to get the indices at which we'll slice up the input array
        idxs = np.add.accumulate(ns)
        # Truncate idxs so that its contents are all valid indices with respect to signal
        idxs = idxs[:np.searchsorted(idxs, length)]
        # Retrieve chunks from the signal using idxs
        # (zip of [None]+idxs with idxs+[None] pairs consecutive slice boundaries)
        chunks = [X[:, start:end] for start, end in zip(chain([None], idxs), chain(idxs, [None]))]
        # Return signal with chunks removed
        # NOTE(review): choice(..., replace=False) raises if n_crops > len(chunks);
        # the constructor constraint presumably prevents this — verify.
        remove_idxs = self.random_state.choice(range(len(chunks)), n_crops, replace=False)
        return np.hstack([c for i, c in enumerate(chunks) if i not in remove_idxs])
class LinearFade(Transform):
    """Linearly fades the signal in or out.

    Parameters
    ----------
    direction: {'in', 'out'}
        The direction to fade the signal.

    fade_size: float [0 < fade_size ≤ 0.5] or (float, float)
        The fraction of the signal to fade in the chosen `direction`.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self, direction, fade_size, p=1., random_state=None):
        super().__init__(p, random_state)
        self.direction = self._val.one_of(
            direction, 'direction (direction to fade)',
            ['in', 'out'])
        self.fade_size = self._val.float_value(
            fade_size, 'fade_size (fraction of signal duration)',
            lambda a, b: 0. < a <= b <= 0.5, 'between zero and a half')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Sample a fade fraction from the configured range.
        fade_size = self.random_state.uniform(*self.fade_size)
        # Number of frames to fade — may round down to zero for short signals.
        fade_frames = int(fade_size * X.shape[1])
        if fade_frames == 0:
            # Nothing to fade. Falling through would break for direction='out':
            # X[:, -0:] selects the WHOLE signal and fails to broadcast against
            # the empty (1, 0) scalar array.
            return X
        # Work on a copy so the caller's array is not mutated in place.
        X = X.copy()
        # Linearly increasing scalars in (0, 1] for fading
        scalars = np.arange(1, fade_frames + 1).reshape(1, -1) / float(fade_frames)
        # Fade the signal from the start or end with the scalars
        if self.direction == 'in':
            X[:, :fade_frames] *= scalars
        else:
            X[:, -fade_frames:] *= np.flip(scalars)
        # Return the faded signal
        return X
class Normalize(Transform):
    """Normalizes the signal by dividing each sample by the maximum absolute sample amplitude.

    Parameters
    ----------
    independent: bool
        Whether or not to normalize each channel independently.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self, independent=True, p=1., random_state=None):
        super().__init__(p, random_state)
        self.independent = self._val.boolean(
            independent, 'independent (whether to independently normalize channels)')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        magnitudes = np.abs(X)
        # Peak amplitude per channel (kept as a column for broadcasting),
        # or a single global peak across all channels.
        if self.independent:
            peak = np.max(magnitudes, axis=1, keepdims=True)
        else:
            peak = np.max(magnitudes)
        # Return the normalized signal
        return X / peak
class PreEmphasize(Transform):
    """Pre-emphasizes the signal by applying a first-order high-pass filter.

    .. math::
        x'[t] = \\begin{cases}
            x[t] & \\text{if $t=0$} \\\\
            x[t] - \\alpha x[t-1] & \\text{otherwise}
        \\end{cases}

    Parameters
    ----------
    alpha: float [0 < alpha ≤ 1] or (float, float)
        Pre-emphasis coefficient.

    Notes
    -----
    - A sampling rate **is not** required when applying this transformation.
    """

    def __init__(self, alpha=0.95, p=1., random_state=None):
        super().__init__(p, random_state)
        # Validated into a (min, max) range in (0, 1].
        self.alpha = self._val.float_value(
            alpha, 'alpha (pre-emphasis coefficient)',
            lambda a, b: 0. < a <= b <= 1., 'between zero and one')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Sample a coefficient from the configured range.
        alpha = self.random_state.uniform(*self.alpha)
        # First frame is kept unchanged (as a column so it concatenates on axis 1).
        first = X[:, 0][:, None]
        # Every later frame has the alpha-scaled previous frame subtracted.
        emphasized = X[:, 1:] - alpha * X[:, :-1]
        return np.concatenate([first, emphasized], axis=1)
class Reverb(Transform):
    """Applies reverb to the signal.

    Parameters
    ----------
    delay: float [0 < delay ≤ 1] or (float, float)
        Fraction of signal duration to delay reverberated samples by.

    decay: float [0 < decay ≤ 1] or (float, float)
        Scalar to decay reverberated samples by.

    Notes
    -----
    - See [3]_ for more details on the implementation.

    References
    ----------
    .. [3] https://stackoverflow.com/a/1117249
    """

    def __init__(self, delay, decay, p=1., random_state=None):
        super().__init__(p, random_state)
        self.delay = self._val.float_value(
            delay, 'delay (fraction of signal duration)',
            lambda a, b: 0. < a <= b <= 1., 'between zero and one')
        self.decay = self._val.float_value(
            decay, 'decay (scalar to decay samples by)',
            lambda a, b: 0. < a <= b <= 1., 'between zero and one')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Sample delay/decay amounts from the configured ranges.
        delay = self.random_state.uniform(*self.delay)
        decay = self.random_state.uniform(*self.decay)
        # Calculate the number of frames to delay the echo by
        C, T = X.shape
        delay_frames = int(delay * T)
        # Start from a float64 copy of the signal (np.zeros keeps the original's
        # output dtype behavior), then add a single decayed, delayed echo.
        out = np.zeros((C, T))
        out[:] = X
        # Vectorized equivalent of the per-frame loop:
        # out[:, t + delay_frames] += decay * X[:, t] for all t < T - delay_frames.
        if delay_frames < T:
            out[:, delay_frames:] += X[:, :T - delay_frames] * decay
        # Return the reverberated signal
        return out
class ClipDistort(Transform):
    """Applies clipping distortion to the signal according to a percentile clipping threshold.

    Parameters
    ----------
    percentile: int [0 < percentile ≤ 100]
        Percentile of sample amplitudes to use as a clipping threshold.

    independent: boolean
        Whether or not to independently distort channels by calculating individual percentiles.
    """

    def __init__(self, percentile, independent=False, p=1., random_state=None):
        super().__init__(p, random_state)
        # Validated into a (min, max) integer range in (0, 100].
        self.percentile = self._val.integer_value(
            percentile, 'percentile (clipping threshold)',
            lambda a, b: 0 < a <= b <= 100, 'between zero and 100')
        self.independent = self._val.boolean(
            independent, 'independent (whether to independently distort channels)')

    def _transform(self, X, sr):
        X = self._val.signal(X)
        # Draw a percentile from the configured (inclusive) range.
        percentile = self.random_state.randint(self.percentile[0], self.percentile[1] + 1)

        def clip(signal, q):
            # Clip sample amplitudes from above at the q-th percentile threshold.
            return signal.clip(max=np.percentile(signal, q))

        if self.independent:
            # Compute a separate clipping threshold for each channel (row).
            return np.apply_along_axis(clip, 1, X, percentile)
        # Single threshold computed over the whole signal.
        return clip(X, percentile)