Source code for kolzur_filter

# -*- coding: utf-8 -*-

# kolzur_filter module - NumPy implementation of the Kolmogorov-Zurbenko filter
# Copyright (C) 2017  Mathieu Schopfer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


"""Numpy implementation of the Kolmogorov-Zurbenko filter

https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Zurbenko_filter

.. todo:: Implement the KZ adaptive filter.
"""

import numpy as np

__author__ = 'Mathieu Schopfer'
__version__ = '2017-03-31'


[docs]def sliding_window(arr, window): """Apply a sliding window on a numpy array. :param numpy.ndarray arr: An array of shape `(n1, ..., nN)` :param int window: Window size. :return: A :class:`numpy.ndarray` of shape `(n1, ..., nN-window+1, window)`. .. seealso:: Source http://stackoverflow.com/a/6811241/3849212 Usage (1D): .. doctest:: >>> arr = np.arange(10) >>> arrs = sliding_window(arr, 5) >>> arrs.shape (6, 5) >>> print(arrs[0]) [0 1 2 3 4] >>> print(arrs[1]) [1 2 3 4 5] Usage (2D): .. doctest:: >>> arr = np.arange(20).reshape(2, 10) >>> arrs = sliding_window(arr, 5) >>> arrs.shape (2, 6, 5) >>> print(arrs[0, 0]) [0 1 2 3 4] >>> print(arrs[0, 1]) [1 2 3 4 5] """ # Advanced numpy tricks shape = arr.shape[:-1] + (arr.shape[-1]-window+1, window) strides = arr.strides + (arr.strides[-1],) return np.lib.stride_tricks.as_strided(arr, shape=shape, strides=strides)
[docs]def _kz_coeffs(m, k): """Calculate coefficients of the Kolmogorov–Zurbenko filter :return: A :class:`numpy.ndarray` of size `k*(m-1)+1` This functions returns the normlalised coefficients :math:`a_s^{m,k}/m^k`. .. rubric:: Coefficients definition A definition of the Kolmogorov–Zurbenko filter coefficients is provided in `this article <http://onlinelibrary.wiley.com/doi/10.1002/wics.71/pdf>`_. Coefficients :math:`a_s^{m,k}` are the coefficients of the polynomial function: .. math:: (1 + z + \\cdots + z^{m-1})^k = \\sum_{s=-k(m-1)/2}^{k(m-1)/2} a_s^{m,k} \\cdot z^{s+k(m-1)/2} The :math:`a_s^{m,k}` coefficients are calculated by iterating over :math:`k`. .. rubric:: Calculation example for m=5 and k=3 Let us define the polynomial function .. math:: P(z) = 1 + z + z^2 + z^3 + z^4. At :math:`k=1`, the coefficients :math:`a_s^{m,k}=a_s^{1,5}` are that of :math:`P(z)`, .. math:: \\left(\\begin{matrix} 1 & 1 & 1 & 1 & 1 \\end{matrix}\\right). At :math:`k=2`, we want to calculate the coefficients of polynomial function :math:`P(z)\\cdot P(z)`, of degree 8. First, we calculate the polynomial functions :math:`P(z)`, :math:`zP(z)`, :math:`z^2P(z)` and :math:`z^3P(z)` and then sum them. Let us represent the coefficients of these functions in a table, with monomial elements in columns: .. math:: \\begin{array}{r|ccccccccc} & z^0 & z^1 & z^2 & z^3 & z^4 & z^5 & z^6 & z^7 & z^8 \\\\ \\hline P(z) & 1 & 1 & 1 & 1 & 1 & 0 & 0 & 0 & 0 \\\\ zP(z) & 0 & 1 & 1 & 1 & 1 & 1 & 0 & 0 & 0 \\\\ z^2P(z) & 0 & 0 & 1 & 1 & 1 & 1 & 1 & 0 & 0 \\\\ z^3P(z) & 0 & 0 & 1 & 1 & 1 & 1 & 1 & 1 & 0 \\\\ z^4P(z) & 0 & 0 & 0 & 1 & 1 & 1 & 1 & 1 & 1 \\\\ \\hline \\mathrm{Sum} & 1 & 2 & 3 & 4 & 5 & 4 & 3 & 2 & 1 \\end{array} At :math:`k=3`, we want to calculate the coefficients of polynomial function :math:`P(z)\\cdot P(z)^2`, of degree 12. We use the same representation: .. math:: \\begin{array}{r|ccccccccccccc} & z^0 & z^1 & z^2 & z^3 & z^4 & z^5 & z^6 & z^7 & z^8 & z^9 & z^{10} & z^{11} & z^{12} \\\\ \\hline P(z)^2 & 1 & 2 & 3 & 4 & 5 & 4 & 3 & 2 & 1 & 0 & 0 & 0 & 0 \\\\ zP(z)^2 & 0 & 1 & 2 & 3 & 4 & 5 & 4 & 3 & 2 & 1 & 0 & 0 & 0 \\\\ z^2P(z)^2 & 0 & 0 & 1 & 2 & 3 & 4 & 5 & 4 & 3 & 2 & 1 & 0 & 0 \\\\ z^3P(z)^2 & 0 & 0 & 0 & 1 & 2 & 3 & 4 & 5 & 4 & 3 & 2 & 1 & 0 \\\\ z^4P(z)^2 & 0 & 0 & 0 & 0 & 1 & 2 & 3 & 4 & 5 & 4 & 3 & 2 & 1 \\\\ \\hline \\mathrm{Sum} & 1 & 3 & 6 & 10 & 15 & 18 & 19 & 18 & 15 & 10 & 6 & 3 & 1 \\end{array} .. doctest:: >>> c = _kz_coeffs(3, 1) >>> print(c) [ 0.33333333 0.33333333 0.33333333] >>> c = _kz_coeffs(3, 2) >>> print(c*3**2) [ 1. 2. 3. 2. 1.] >>> c = _kz_coeffs(5, 3) >>> print(c*5**3) [ 1. 3. 6. 10. 15. 18. 19. 18. 15. 10. 6. 3. 1.] """ # Coefficients at degree one coef = np.ones(m) # Iterate k-1 times over coefficients for i in range(1, k): t = np.zeros((m, m+i*(m-1))) for km in range(m): t[km, km:km+coef.size] = coef coef = np.sum(t, axis=0) assert coef.size == k*(m-1)+1 return coef/m**k
[docs]def _kz_prod(data, coef, m, k, t=None): n = data.size data = sliding_window(data, k*(m-1)+1) assert data.shape == (n-k*(m-1), len(coef)) # Restrict KZ product calculation to provided indices if t is not None: data = data[t] assert data.shape == (len(t), len(coef)) return data*coef
[docs]def _kz_sum(data, coef): knan = np.isnan(data) # Handle missing values if any if np.any(knan): coef = np.ma.MaskedArray(np.broadcast_to(coef[np.newaxis, :], data.shape), mask=knan) coef = np.sum(coef, axis=-1) data = np.nansum(data, axis=-1) # Restore nan were data are missing data[coef.mask] = np.nan # Divide by coefficients sum, which may not be 1 k = np.logical_not(coef.mask) data[k] = data[k]/coef[k] return data else: return np.sum(data, axis=-1)
[docs]def kz_filter(data, m, k): """Kolmogorov-Zurbenko fitler :param numpy.ndarray data: A 1-dimensional numpy array of size `N`. Any missing value should be set to ``np.nan``. :param int m: Filter window width. :param int k: Filter degree. :return: A :class:`numpy.ndarray` of size `N-k*(m-1)` Given a time series :math:`X_t, t \\in \\{0, 1, \\cdots, N-1\\}`, the Kolmogorov-Zurbenko fitler is defined for :math:`t \\in \\{\\frac{k(m-1)}{2}, \\cdots, N-1-\\frac{k(m-1)}{2}\\}` by .. math:: KZ_{m,k}[X_t] = \\sum_{s=-k(m-1)/2}^{k(m-1)/2} \\frac{a_s^{m,k}}{m^k} \\cdot X_{t+s} Definition of coefficients :math:`a_s^{m,k}` is given in :func:`_kz_coeffs`. """ coef = _kz_coeffs(m, k) data = _kz_prod(data, coef, m, k) return _kz_sum(data, coef)
[docs]def kzft(data, nu, m, k, t=None, dt=1.): """Kolmogorov-Zurbenko Fourier transform filter :param numpy.ndarray data: A 1-dimensional numpy array of size `N`. Any missing value should be set to ``np.nan``. :param list-like nu: Frequencies, length `Nnu`. :param int m: Filter window width. :param int k: Filter degree. :param list-like t: Calculation indices, of length `Nt`. If provided, KZFT filter will be calculated only for values ``data[t]``. Note that the KZFT filter can only be calculated for indices in the range [k(m-1)/2, (N-1)-k(m-1)/2]. Trying to calculate the KZFT out of this range will raise an `IndexError`. `None`, calculation will happen over the whole calculable range. :param float dt: Time step, if not 1. :return: A :class:`numpy.ndarray` of shape `(Nnu, Nt)` or `(Nnu, N-k(m-1))` if `t` is `None`. :raise IndexError: If `t` contains one or more indices out of the calculation range. See documentation of keyword argument `t`. Given a time series :math:`X_t, t \\in \\{0, 1, \\cdots, N-1\\}`, the Kolmogorov-Zurbenko Fourier transform filter is defined for :math:`t \\in \\{\\frac{k(m-1)}{2}, \\cdots, N-1-\\frac{k(m-1)}{2}\\}` by .. math:: KZFT_{m,k,\\nu}[X_t] = \\sum_{s=-k(m-1)/2}^{k(m-1)/2} \\frac{a_s^{m,k}}{m^k} \\cdot X_{t+s} \\cdot e^{-2\\pi i\\nu s} """ if not dt == 1.: nu = np.asarray(nu)*dt m = int((m-1)/dt+1) if not m%2: m += 1 if t is not None: w = int(k*(m-1)/2) t = np.asarray(t)-w if np.any(t < 0) or np.any(t > (data.size-1-2*w)): raise IndexError('Inpunt calculation indices are out of range. Calculation indices should be in the range ' '[k*(m-1)/2, (N-1)-k*(m-1)/2], hence [{}, {}] in the present case.' .format(w, data.size-1-w)) coef = _kz_coeffs(m, k) data = _kz_prod(data, coef, m, k, t=t) nu = np.asarray(nu) s = k*(m-1)/2 s = np.arange(-s, s+1) s = np.exp(-1j*2*np.pi*nu[:, np.newaxis]*s) data = data[np.newaxis]*s[:, np.newaxis] return _kz_sum(data, coef)
[docs]def kzp(data, nu, m, k, dt=1.): """Kolmogorov-Zurbenko periodogram :param numpy.ndarray data: A 1-dimensional numpy array of size `N`. Any missing value should be set to ``np.nan``. :param list-like nu: Frequencies, length `Nnu`. :param int m: Filter window width. :param int k: Filter degree. :param float dt: Time step, if not 1. :return: A :class:`numpy.ndarray` os size `Nnu`. Given a time series :math:`X_t, t \\in \\{0, 1, \\cdots, N-1\\}`, the Kolmogorov-Zurbenko periodogram is defined by .. math:: KZP_{m,k}(\\nu) = \\sqrt{\\sum_{h=0}^{T-1} \\lvert 2 \\cdot KZFT_{m,k,\\nu}[X_{hL+k(m-1)/2}] \\rvert ^2} where :math:`L=(N-w)/(T-1)` is the distance between the beginnings of two successive intervals, :math:`w` being the calculation window width of the :func:`kzft` and :math:`T` the number of intervals. The assumption was made that :math:`L \\ll w \\ll N`, implying that the intervals overlap. """ if not dt == 1.: nu = nu*dt m = int((m-1)/dt+1) if not m%2: m += 1 # w is the width of the KZFT. As m is odd, k*(m-1) is always even, so w is always odd. w = k*(m-1)+1 # Distance between two successve intervals l = int(m/10) nt = int((data.size-w)/l+1) # Calculation indices l = np.arange(nt-1)*l+k*(m-1)/2 l = np.floor(l).astype(int) return np.sqrt(np.nanmean(np.square(2*np.abs(kzft(data, nu, m, k, t=l))), axis=-1))