Intervals Module¶
- class tempo.intervals.IntervalsDF(df: DataFrame, start_ts: str, end_ts: str, series_ids: Iterable[str] | None = None)[source]¶
Bases: object
This object is the main wrapper over a Spark DataFrame, allowing a user to parallelize computations over snapshots of metrics for intervals of time defined by a start timestamp, an end timestamp, and various dimensions.
The required dimensions are series (list of columns by which to summarize), metrics (list of columns to analyze), start_ts (timestamp column), and end_ts (timestamp column). start_ts and end_ts can be epoch or TimestampType.
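For instance, given a DataFrame that already has one row per interval, an IntervalsDF can be constructed directly. A minimal sketch, assuming an active SparkSession named spark; the column names here are illustrative:

from tempo.intervals import IntervalsDF

df = spark.createDataFrame(
    [["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", 5]],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT",
)
idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"])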
- classmethod fromStackedMetrics(df: DataFrame, start_ts: str, end_ts: str, series: list[str], metrics_name_col: str, metrics_value_col: str, metric_names: list[str] | None = None) IntervalsDF [source]¶
Returns a new IntervalsDF with metrics of the current DataFrame pivoted by start and end timestamp and series.
There are two versions of fromStackedMetrics: one that requires the caller to specify the list of distinct metric names to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct metric names internally.
- Parameters:
df (DataFrame) – DataFrame to wrap with IntervalsDF
start_ts (str) – Name of the column which denotes interval start
end_ts (str) – Name of the column which denotes interval end
series (list[str]) – Names of the columns that identify each series
metrics_name_col (str) – Name of the column containing metric names
metrics_value_col (str) – Name of the column containing metric values
metric_names (list[str], optional) – List of metric names that will be translated to columns in the output IntervalsDF.
- Returns:
A new IntervalsDF with a column and respective values per distinct metric in metrics_name_col.
- Example:
df = spark.createDataFrame(
    [
        ["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", "metric_1", 5],
        ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", "metric_2", 0],
    ],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_name STRING, metric_value INT",
)

# With distinct metric names specified
idf = IntervalsDF.fromStackedMetrics(
    df,
    "start_ts",
    "end_ts",
    ["series_1"],
    "metric_name",
    "metric_value",
    ["metric_1", "metric_2"],
)
idf.df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None),
 Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)]

# Or without specifying metric names (less efficient)
idf = IntervalsDF.fromStackedMetrics(
    df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value"
)
idf.df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None),
 Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)]
- property interval_boundaries: list[str]¶
- make_disjoint() IntervalsDF [source]¶
Returns a new IntervalsDF where metrics of overlapping time intervals are correlated and merged prior to constructing new time interval boundaries (start and end timestamp) so that all intervals are disjoint.
The merge process assumes that two overlapping intervals cannot simultaneously report two different values for the same metric unless recorded in a data type which supports multiple elements (such as ArrayType).
This is often used after fromStackedMetrics() to reduce the number of metrics with null values and to help when constructing filter predicates to retrieve specific metric values across all instances.
- Returns:
A new IntervalsDF containing disjoint time intervals
- Example:
df = spark.createDataFrame(
    [
        ["2020-08-01 00:00:10", "2020-08-01 00:00:14", "v1", 5, None],
        ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", None, 0],
    ],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT, metric_2 INT",
)
idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"])
idf.make_disjoint().df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:10', series_1='v1', metric_1=None, metric_2=0),
 Row(start_ts='2020-08-01 00:00:10', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=5, metric_2=0),
 Row(start_ts='2020-08-01 00:00:11', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None)]
- property metric_columns: list[str]¶
- property observational_columns: list[str]¶
- property structural_columns: list[str]¶
- toDF(stack: bool = False) DataFrame [source]¶
Returns a new Spark DataFrame converted from IntervalsDF.
There are two versions of toDF: one that will output columns as they exist in IntervalsDF, and one that will stack metric columns into metric_names and metric_values columns populated with their respective values. The latter can be thought of as the inverse of fromStackedMetrics().
Based on pyspark.sql.DataFrame.toDF.
- Parameters:
stack (bool, optional) – If True, stack metric columns into metric_names and metric_values columns; if False (the default), output metric columns as they exist in the IntervalsDF
- Returns:
A new Spark DataFrame converted from IntervalsDF
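Both conversions can be sketched as follows, assuming idf was built as in the fromStackedMetrics() example above:

# Columns as they exist in the IntervalsDF
wide_df = idf.toDF()

# Metric columns stacked into metric_names / metric_values
# (the inverse of fromStackedMetrics)
stacked_df = idf.toDF(stack=True)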
- union(other: IntervalsDF) IntervalsDF [source]¶
Returns a new IntervalsDF containing the union of rows in this and another IntervalsDF.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
Also, as standard in SQL, this function resolves columns by position (not by name).
Based on pyspark.sql.DataFrame.union.
- Parameters:
other (
IntervalsDF
) –IntervalsDF
to union- Returns:
A new IntervalsDF containing the union of rows in this and other
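For example, a sketch assuming idf and other_idf are IntervalsDF objects with identical schemas (columns are resolved by position):

combined = idf.union(other_idf)

# SQL-style set union: deduplicate the underlying DataFrame afterwards
deduped = combined.df.distinct()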
- unionByName(other: IntervalsDF) IntervalsDF [source]¶
Returns a new IntervalsDF containing the union of rows in this and another IntervalsDF.
This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
Based on pyspark.sql.DataFrame.unionByName; however, allowMissingColumns is not supported.
- Parameters:
other (
IntervalsDF
) –IntervalsDF
to unionByName- Returns:
A new IntervalsDF containing the union of rows in this and other
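For example, a sketch assuming idf and other_idf share the same column names, possibly in different orders (columns are resolved by name, in contrast to union()):

combined = idf.unionByName(other_idf)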
- property window: WindowSpec¶
- tempo.intervals.add_as_disjoint(interval: Series, disjoint_set: DataFrame | None, interval_boundaries: Iterable[str], series_ids: Iterable[str], metric_columns: Iterable[str]) DataFrame [source]¶
Returns a disjoint set consisting of the given interval, made disjoint with those already in disjoint_set.
- tempo.intervals.check_for_nan_values(to_check: Any) bool [source]¶
Returns True if there are any NaN values in to_check.
- tempo.intervals.identify_interval_overlaps(in_pdf: DataFrame, with_row: Series, interval_start_ts: str, interval_end_ts: str) DataFrame [source]¶
Returns the subset of rows in the DataFrame in_pdf that overlap with the row with_row.
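An illustrative call using the documented signature; the pandas data and the boundary semantics here are assumptions for demonstration only:

import pandas as pd
from tempo.intervals import identify_interval_overlaps

pdf = pd.DataFrame({
    "start_ts": ["2020-08-01 00:00:09", "2020-08-01 00:00:12"],
    "end_ts": ["2020-08-01 00:00:11", "2020-08-01 00:00:14"],
})
row = pd.Series({"start_ts": "2020-08-01 00:00:10", "end_ts": "2020-08-01 00:00:13"})

# Expected to return the rows of pdf whose spans intersect row's span
overlapping = identify_interval_overlaps(pdf, row, "start_ts", "end_ts")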
- tempo.intervals.interval_ends_before(*, interval: Series, other: Series, interval_end_ts: str, other_end_ts: str | None = None) bool [source]¶
Returns True if interval ends before other ends.
- tempo.intervals.interval_is_contained_by(*, interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, other_start_ts: str | None = None, other_end_ts: str | None = None) bool [source]¶
Returns True if interval is contained by other.
- tempo.intervals.interval_starts_before(*, interval: Series, other: Series, interval_start_ts: str, other_start_ts: str | None = None) bool [source]¶
Returns True if interval starts before other starts.
- tempo.intervals.intervals_boundaries_are_equivalent(interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, other_start_ts: str | None = None, other_end_ts: str | None = None) bool [source]¶
Returns True if the boundaries of interval are equivalent to those of other.
- tempo.intervals.intervals_do_not_overlap(*, interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, other_start_ts: str | None = None, other_end_ts: str | None = None) bool [source]¶
Returns True if interval and other do not overlap.
- tempo.intervals.intervals_have_equivalent_metric_columns(interval_a: Series, interval_b: Series, metric_columns: Iterable[str]) bool [source]¶
Returns True if interval_a and interval_b have identical metrics.
- tempo.intervals.intervals_share_end_boundary(*, interval: Series, other: Series, interval_end_ts: str, other_end_ts: str | None = None) bool [source]¶
Returns True if interval and other share an end boundary.
- tempo.intervals.intervals_share_start_boundary(*, interval: Series, other: Series, interval_start_ts: str, other_start_ts: str | None = None) bool [source]¶
Returns True if interval and other share a start boundary.
- tempo.intervals.make_disjoint_wrap(start_ts: str, end_ts: str, series_ids: Iterable[str], metric_columns: Iterable[str]) Callable[[DataFrame], DataFrame] [source]¶
Returns a function that takes a DataFrame of intervals and returns an equivalent DataFrame in which all intervals are disjoint.
- tempo.intervals.merge_metric_columns_of_intervals(*, main_interval: Series, child_interval: Series, metric_columns: Iterable[str], metric_merge_method: bool = False) Series [source]¶
Returns main_interval with its metric columns merged with those of child_interval.
- tempo.intervals.resolve_all_overlaps(with_row: Series, overlaps: DataFrame, with_row_start_ts: str, with_row_end_ts: str, series_ids: Iterable[str], metric_columns: Iterable[str], overlap_start_ts: str | None = None, overlap_end_ts: str | None = None) DataFrame [source]¶
Resolves the interval with_row against all overlapping intervals in overlaps, returning a set of disjoint intervals covering the same spans. See https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.apply.html
- tempo.intervals.resolve_overlap(interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, series_ids: Iterable[str], metric_columns: Iterable[str], other_start_ts: str | None = None, other_end_ts: str | None = None) list[Series] [source]¶
Resolves overlaps between the two given intervals, splitting them as necessary into a set of disjoint intervals.
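As a rough illustration of the splitting involved (generic interval arithmetic, not the library's exact algorithm), a partial overlap resolves into three disjoint spans:

# Hypothetical helper: interval [9, 14) partially overlapping other [11, 16)
# splits into [9, 11), [11, 14) (where metrics would be merged), and [14, 16)
def split_partial_overlap(a_start, a_end, b_start, b_end):
    assert a_start < b_start < a_end < b_end  # partial overlap, a starts first
    return [(a_start, b_start), (b_start, a_end), (a_end, b_end)]

split_partial_overlap(9, 14, 11, 16)
[(9, 11), (11, 14), (14, 16)]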