from dataclasses import dataclass
import time

import numpy as np
import pandas as pd
import stumpy
from scipy.signal import argrelmin
from statsmodels.nonparametric.smoothers_lowess import lowess

from util import setup_logging
from util.data_prepare import COL_VALUE, down_sample, get_resolution

logger = setup_logging.get_logger()


@dataclass(frozen=True)
class Params_PatternSwitchDetector:
    """Tuning knobs for pattern switch detection (time spans are wall-clock)."""
    MP_WINDOW_LENGTH = pd.Timedelta(days=1)    # matrix-profile subsequence length
    COUNT_CROSS_EDGE = pd.Timedelta(days=1)    # span skipped at each end of the series
    COUNT_CROSS_STEP = pd.Timedelta(hours=1)   # stride between count-cross evaluations
    SMOOTH_LENGTH = pd.Timedelta(weeks=1)      # LOWESS smoothing span
    MINIMA_WEIGHT_THRESHOLD = 0.1              # minimum relative depth for a significant minimum


PARAM_PSD = Params_PatternSwitchDetector()


def detect_pattern_switch(df):
    """Detect pattern switch points in a time series based on the matrix profile.

    How the matrix profile is used for pattern switch detection:
    For a given point in the time series, the matrix profile API computes the
    minimum distance between the segment starting from the current point and
    all other segments of the same length in the time series. Additionally, it
    provides the position of the segment with the minimal distance. This
    position info can be treated as a pointer from the initial segment to the
    minimal-distance segment.

    From the concept of pointer to minimal-distance segment, we can define a
    test statistic: count of cross. The count of cross for a data point in the
    time series is defined as the count of those pointers that go across it.
    Intuitively, when encountering a pattern switch point within the time
    series, the segments before the switch resemble other segments before the
    switch point, and similarly for segments after the switch point. The count
    of cross on the switch point should therefore be very small. Consequently,
    pattern switch detection is implemented by identifying significant local
    minima of the 'count of cross' test statistic.

    Args:
        df (pandas DataFrame): dataframe containing a time series

    Returns:
        list of pandas Timestamp: pattern switch points, could be an empty list
    """
    time0 = time.time()
    df = down_sample(df)
    df_resolution = get_resolution(df)
    # convert the wall-clock parameter spans into sample counts
    window_length = int(PARAM_PSD.MP_WINDOW_LENGTH / df_resolution)
    edge_length = int(PARAM_PSD.COUNT_CROSS_EDGE / df_resolution)
    step_size = int(PARAM_PSD.COUNT_CROSS_STEP / df_resolution)
    smooth_length = int(PARAM_PSD.SMOOTH_LENGTH / df_resolution)
    mp = stumpy.stump(df[COL_VALUE], m=window_length, normalize=True)
    counts = count_cross(
        mp[:, 1],  # pointers to the minimal-distance segments
        edge_length,
        step_size
    )
    if len(counts) == 0:
        return []

    # smooth the raw count-cross, to remove small local random variations
    frac = smooth_length / len(df)
    counts_s = np.array(smooth(counts, frac))

    # get the indexes of local minima
    min_idx_list = argrelmin(counts_s)[0]

    # Calculate a weight for each local minimum of the counts_s array.
    # When counts_s contains local maxima: for each local minimum, use its
    # closest local maximum as the reference point and compute the weight as
    # the relative difference between the two.
    # When counts_s contains no local maxima: use the median of counts_s as
    # the reference for the weight calculation instead.
    min_weights = []
    if len(min_idx_list) > 0:
        # argrelmin on the negated series yields the local maxima
        max_idx_list = argrelmin(-counts_s)[0]
        if len(max_idx_list) > 0:
            min_closest_neighbor = [max_idx_list[np.argmin(abs(min_idx - max_idx_list))]
                                    for min_idx in min_idx_list]
            min_weights = [(counts_s[min_neighbor] - counts_s[min_idx]) / counts_s[min_idx]
                           for (min_idx, min_neighbor) in zip(min_idx_list, min_closest_neighbor)]
        else:
            reference_value = np.median(counts_s)
            min_weights = [(reference_value - counts_s[min_idx]) / counts_s[min_idx]
                           for min_idx in min_idx_list]
    min_weights = np.array(min_weights)

    # convert indexes of local minima to timestamps; index 0 of the count-cross
    # series corresponds to the first point after the skipped leading edge
    switch_time0 = df.index[0] + PARAM_PSD.COUNT_CROSS_EDGE
    switch_time = np.array([switch_time0 + pd.Timedelta(idx * PARAM_PSD.COUNT_CROSS_STEP)
                            for idx in min_idx_list])

    # only return significant local minima, filtered by the weights
    rslt = switch_time[min_weights > PARAM_PSD.MINIMA_WEIGHT_THRESHOLD]
    # NOTE(review): this branch returns an ndarray while the empty branch above
    # returns a list — callers should treat the result as a generic sequence.
    rslt_str = 'no pattern switch detected' if len(rslt) == 0 else f'pattern switch detected: {rslt}'
    logger.debug(f'pattern switch detection time spent: {time.time() - time0:.2f}; {rslt_str}')
    return rslt


def count_cross(mp_idx, edge, step):
    """Calculate the test statistic: count-cross.

    Two technical details regarding the calculation of count-cross:
    1. Points at the beginning and the ending of the time series naturally
       have smaller count-cross. Instead of applying some normalization
       method, this implementation simply skips the two edges. This choice
       seems to work fine, and is faster.
    2. To reduce the latency of pattern switch detection, instead of
       calculating count-cross for all data points, it is calculated every
       'step' points.

    Args:
        mp_idx (numpy array): array of pointers to minimal-distance segments,
            returned from the matrix profile API
        edge (int): the size of the beginning and ending of the time series
            that should be skipped
        step (int): the size of the stride to calculate count-cross

    Returns:
        numpy array: the positive count-cross values of the evaluated points
            (evaluated points whose count is zero are dropped as well), or an
            empty list if the series is too short for the requested edges
    """
    if len(mp_idx) <= 2 * edge:
        return []
    counts = np.zeros_like(mp_idx, dtype=int)
    for i in range(edge, len(mp_idx) - edge, step):
        # a pointer crosses i when its source and target lie on opposite sides
        counts[i] = np.count_nonzero(mp_idx[:i] > i) + np.count_nonzero(mp_idx[i:] < i)
    return counts[counts > 0]


def smooth(xs, frac):
    """LOWESS-smooth xs (it=0, pre-sorted x axis); return the smoothed values."""
    filtered = lowess(xs, list(range(len(xs))), is_sorted=True, frac=frac, it=0)
    return [x[1] for x in filtered]