You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

105 lines
2.7 KiB

from __future__ import absolute_import
from dataclasses import dataclass
import pandas as pd
from util.constants import (
NO_DRIFT,
DRIFTED,
DRIFT_PART,
PART_OR_WHOLE,
DRIFT_TYPE,
THRESHOLD_TIME,
START_TIME,
END_TIME,
PERCENT_DRIFT
)
from util.data_prepare import percent_change
from six.moves import range
# String labels used when reporting a segment's drift state.
DRIFTED_STR = 'DRIFTED'
NO_DRIFT_STR = 'NO_DRIFT'
DRIFT_PART_STR = 'DRIFT_PART'
# Maps the drift-state constants imported from util.constants to the string
# labels above; SegmentInfo.format_drift_output looks up part_or_whole here.
DRIFT_STR_DICT = {
NO_DRIFT : NO_DRIFT_STR,
DRIFTED : DRIFTED_STR,
DRIFT_PART : DRIFT_PART_STR,
}
# Drift type labels.
# NOTE(review): not referenced in the visible part of this file; presumably
# the values stored in SegmentInfo.drift_type -- confirm against callers.
TREND_DRIFT_TYPE = 'TREND'
LEVEL_DRIFT_TYPE = 'LEVEL'
# NOTE(review): these look like status labels for inputs that cannot be
# analysed (constant, too short, empty); usage is not visible here -- confirm.
CONSTANT_INPUT = 'CONSTANT_INPUT'
SHORT_INPUT = 'SHORT_INPUT'
EMPTY_INPUT = 'EMPTY_INPUT'
# NOTE(review): presumably the minimum number of data points and the minimum
# time span an input must cover; where they are enforced is not visible here.
INPUT_MIN_DATA_POINT = 85
INPUT_MIN_TIME_LENGTH = pd.Timedelta(days=85)
@dataclass
class SegmentInfo:
    """A segment described by its index span and its boundary values.

    The drift-related fields start out as "no drift" / unset defaults; the
    code that fills them in is not part of this class.
    """

    # Index span of the segment; length() is idx_end - idx_start.
    idx_start: int
    idx_end: int
    # Values at the two ends of the segment; percent_drift() compares them.
    val_start: float
    val_end: float
    # Drift state; must be a key of DRIFT_STR_DICT when the segment is formatted.
    part_or_whole: int = NO_DRIFT
    idx_threshold: int = -1
    drift_type: str = None
    # Timestamps reported by format_drift_output(); None means "not set".
    start_time: pd.Timestamp = None
    end_time: pd.Timestamp = None
    threshold_time: pd.Timestamp = None

    def length(self):
        """Return the index distance between the segment's end and start."""
        span = self.idx_end - self.idx_start
        return span

    def percent_drift(self):
        """Return percent_change() applied to the start and end values."""
        first, last = self.val_start, self.val_end
        return percent_change(first, last)

    def __str__(self):
        """Render the segment as '(<length>, <percent drift>%)'."""
        span = self.length()
        change = self.percent_drift()
        return f'({span}, {change:.1f}%)'

    def format_drift_output(self, output_epoch_time):
        """Build the output dictionary describing this segment's drift.

        Args:
            output_epoch_time: when truthy, timestamps are emitted as integer
                epoch seconds with -1 standing in for an unset timestamp;
                otherwise each timestamp is passed through str(), so an
                unset timestamp is rendered as 'None'.

        Returns:
            A dict keyed by the PART_OR_WHOLE, DRIFT_TYPE, PERCENT_DRIFT,
            START_TIME, END_TIME and THRESHOLD_TIME constants.

        Raises:
            KeyError: if part_or_whole is not a key of DRIFT_STR_DICT.
        """
        def _render(stamp):
            # String mode is handled first; it does not special-case None.
            if not output_epoch_time:
                return str(stamp)
            if stamp is None:
                return -1
            return int(stamp.timestamp())

        # Values are computed in the same order as the keys they fill.
        state_label = DRIFT_STR_DICT[self.part_or_whole]
        kind = self.drift_type
        whole_percent = int(self.percent_drift())
        started = _render(self.start_time)
        ended = _render(self.end_time)
        crossed = _render(self.threshold_time)
        return {
            PART_OR_WHOLE: state_label,
            DRIFT_TYPE: kind,
            PERCENT_DRIFT: whole_percent,
            START_TIME: started,
            END_TIME: ended,
            THRESHOLD_TIME: crossed,
        }
def summarize_drift_result(segments):
    """Summarize each drift as its sequence of drift types and index length.

    For every segment marked DRIFTED, walk backwards through the segments
    that precede it, stopping at a NO_DRIFT segment or at the previous
    DRIFTED segment, and collect the drift types met on the way.

    Args:
        segments: indexable sequence of objects exposing part_or_whole,
            drift_type, idx_start and idx_end.

    Returns:
        A list with one (drift_types, length) tuple per DRIFTED segment, in
        input order. drift_types is listed oldest segment first, and length
        is the DRIFTED segment's idx_end minus the idx_start of the earliest
        segment collected. The list is empty when nothing is DRIFTED.
    """
    drifted_positions = [
        pos for pos, segment in enumerate(segments)
        if segment.part_or_whole == DRIFTED
    ]

    summary = []
    lower_bound = -1  # position of the previous DRIFTED segment (exclusive)
    for pos in drifted_positions:
        last_index = segments[pos].idx_end
        kinds = []
        cursor = pos
        # Scan backwards from the drifted segment down to lower_bound + 1.
        while cursor > lower_bound:
            current = segments[cursor]
            if current.part_or_whole == NO_DRIFT:
                break
            kinds.append(current.drift_type)
            first_index = current.idx_start
            cursor -= 1
        # Collected newest-first; report oldest-first.
        kinds.reverse()
        summary.append((kinds, last_index - first_index))
        lower_bound = pos
    return summary