Intervals Module

class tempo.intervals.IntervalsDF(df: DataFrame, start_ts: str, end_ts: str, series_ids: Iterable[str] | None = None)[source]

Bases: object

This object is the main wrapper over a Spark DataFrame that allows a user to parallelize computations over snapshots of metrics for intervals of time defined by a start and end timestamp and various dimensions.

The required dimensions are series (a list of columns by which to summarize), metrics (a list of columns to analyze), start_ts (the interval start timestamp column), and end_ts (the interval end timestamp column). start_ts and end_ts can be epoch or TimestampType.
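
A minimal construction sketch (assuming an active SparkSession named spark; the data and column names are illustrative):

from pyspark.sql import SparkSession
from tempo.intervals import IntervalsDF

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", 5]],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT",
)

# series_ids is optional; here it names the single series column
idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"])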

classmethod fromStackedMetrics(df: DataFrame, start_ts: str, end_ts: str, series: list[str], metrics_name_col: str, metrics_value_col: str, metric_names: list[str] | None = None) IntervalsDF[source]

Returns a new IntervalsDF with metrics of the current DataFrame pivoted by start and end timestamp and series.

There are two versions of fromStackedMetrics: one that requires the caller to specify the list of distinct metric names to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct metric names internally.

Parameters:
  • df (DataFrame) – DataFrame to wrap with IntervalsDF

  • start_ts (str) – Name of the column which denotes interval start

  • end_ts (str) – Name of the column which denotes interval end

  • series (list[str]) – Names of the columns that identify each series

  • metrics_name_col (str) – Name of the column containing metric names

  • metrics_value_col (str) – Name of the column containing metric values

  • metric_names (list[str], optional) – List of metric names that will be translated to columns in the output IntervalsDF.

Returns:

A new IntervalsDF with a column and respective values per distinct metric in metrics_name_col.

Example:

df = spark.createDataFrame(
    [["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", "metric_1", 5],
     ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", "metric_2", 0]],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_name STRING, metric_value INT",
)

# With distinct metric names specified

idf = IntervalsDF.fromStackedMetrics(
    df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value", ["metric_1", "metric_2"],
)
idf.df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None),
 Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)]

# Or without specifying metric names (less efficient)

idf = IntervalsDF.fromStackedMetrics(df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value")
idf.df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None),
 Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)]

property interval_boundaries: list[str]

make_disjoint() IntervalsDF[source]

Returns a new IntervalsDF where the metrics of overlapping time intervals are correlated and merged prior to constructing new time interval boundaries (start and end timestamp) so that all intervals are disjoint.

The merge process assumes that two overlapping intervals cannot simultaneously report two different values for the same metric unless recorded in a data type that supports multiple elements (such as ArrayType).

This is often used after fromStackedMetrics() to reduce the number of null metric values and to simplify filter predicates that retrieve specific metric values across all instances.

Returns:

A new IntervalsDF containing disjoint time intervals

Example:

df = spark.createDataFrame(
    [["2020-08-01 00:00:10", "2020-08-01 00:00:14", "v1", 5, None],
     ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", None, 0]],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT, metric_2 INT",
)
idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"])
idf.make_disjoint().df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:10', series_1='v1', metric_1=None, metric_2=0),
 Row(start_ts='2020-08-01 00:00:10', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=5, metric_2=0),
 Row(start_ts='2020-08-01 00:00:11', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None)]

property metric_columns: list[str]

property observational_columns: list[str]

property structural_columns: list[str]

toDF(stack: bool = False) DataFrame[source]

Returns a new Spark DataFrame converted from IntervalsDF.

There are two versions of toDF: one that outputs columns as they exist in the IntervalsDF, and one that stacks the metric columns into metric_names and metric_values columns populated with their respective values. The latter can be thought of as the inverse of fromStackedMetrics().

Based on pyspark.sql.DataFrame.toDF.

Parameters:

stack (bool, optional) – If True, stack the metric columns into metric_names and metric_values columns; if False (the default), output metric columns as they exist

Returns:

A new Spark DataFrame converted from this IntervalsDF
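
Example (a sketch; idf is an existing IntervalsDF as constructed above):

# metric columns output as they exist in the IntervalsDF
flat_df = idf.toDF()

# metric columns stacked into metric_names and metric_values columns,
# the inverse of fromStackedMetrics()
stacked_df = idf.toDF(stack=True)
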
union(other: IntervalsDF) IntervalsDF[source]

Returns a new IntervalsDF containing union of rows in this and another IntervalsDF.

This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().

Also, as standard in SQL, this function resolves columns by position (not by name).

Based on pyspark.sql.DataFrame.union.

Parameters:

other (IntervalsDF) – IntervalsDF to union

Returns:

A new IntervalsDF containing union of rows in this and other
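
Example (a sketch; other_idf is a second IntervalsDF whose columns appear in the same order as idf's):

combined = idf.union(other_idf)  # appends rows positionally, like UNION ALL

# SQL-style set union: deduplicate the wrapped DataFrame afterwards
deduped_df = combined.df.distinct()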

unionByName(other: IntervalsDF) IntervalsDF[source]

Returns a new IntervalsDF containing union of rows in this and another IntervalsDF.

This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().

Based on pyspark.sql.DataFrame.unionByName; however, allowMissingColumns is not supported.

Parameters:

other (IntervalsDF) – IntervalsDF to unionByName

Returns:

A new IntervalsDF containing union of rows in this and other
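
Example (a sketch; other_idf may order its columns differently, but must have the same set of columns since allowMissingColumns is not supported):

combined = idf.unionByName(other_idf)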

property window: WindowSpec

tempo.intervals.add_as_disjoint(interval: Series, disjoint_set: DataFrame | None, interval_boundaries: Iterable[str], series_ids: Iterable[str], metric_columns: Iterable[str]) DataFrame[source]

Returns a disjoint set consisting of the given interval made disjoint with those already in disjoint_set.

tempo.intervals.check_for_nan_values(to_check: Any) bool[source]

Returns True if there are any NaN values in to_check.

tempo.intervals.identify_interval_overlaps(in_pdf: DataFrame, with_row: Series, interval_start_ts: str, interval_end_ts: str) DataFrame[source]

Returns the subset of rows in the DataFrame in_pdf that overlap with the row with_row.
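
A sketch, assuming these row-level helpers operate on pandas objects keyed by column name (data is illustrative):

import pandas as pd
from tempo.intervals import identify_interval_overlaps

in_pdf = pd.DataFrame({
    "start_ts": ["2020-08-01 00:00:09", "2020-08-01 00:00:12"],
    "end_ts": ["2020-08-01 00:00:11", "2020-08-01 00:00:14"],
})
with_row = pd.Series({"start_ts": "2020-08-01 00:00:10", "end_ts": "2020-08-01 00:00:13"})

# both rows of in_pdf intersect with_row's span in this example
overlapping = identify_interval_overlaps(in_pdf, with_row, "start_ts", "end_ts")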

tempo.intervals.interval_ends_before(*, interval: Series, other: Series, interval_end_ts: str, other_end_ts: str | None = None) bool[source]

Returns True if interval ends before other ends.
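
For instance (a sketch; when other_end_ts is omitted, the same column name is presumably used for both rows):

import pandas as pd
from tempo.intervals import interval_ends_before

a = pd.Series({"start_ts": "2020-08-01 00:00:09", "end_ts": "2020-08-01 00:00:11"})
b = pd.Series({"start_ts": "2020-08-01 00:00:10", "end_ts": "2020-08-01 00:00:14"})

interval_ends_before(interval=a, other=b, interval_end_ts="end_ts")  # True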

tempo.intervals.interval_is_contained_by(*, interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, other_start_ts: str | None = None, other_end_ts: str | None = None) bool[source]

Returns True if interval is contained in other.

tempo.intervals.interval_starts_before(*, interval: Series, other: Series, interval_start_ts: str, other_start_ts: str | None = None) bool[source]

Returns True if interval starts before other starts.

tempo.intervals.intervals_boundaries_are_equivalent(interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, other_start_ts: str | None = None, other_end_ts: str | None = None) bool[source]

Returns True if the boundaries of interval and other are equivalent.

tempo.intervals.intervals_do_not_overlap(*, interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, other_start_ts: str | None = None, other_end_ts: str | None = None) bool[source]

Returns True if interval and other do not overlap.

tempo.intervals.intervals_have_equivalent_metric_columns(interval_a: Series, interval_b: Series, metric_columns: Iterable[str]) bool[source]

Returns True if interval_a and interval_b have identical metrics.

tempo.intervals.intervals_share_end_boundary(interval: Series, other: Series, interval_end_ts: str, other_end_ts: str | None = None) bool[source]

Returns True if interval and other share an end boundary.

tempo.intervals.intervals_share_start_boundary(interval: Series, other: Series, interval_start_ts: str, other_start_ts: str | None = None) bool[source]

Returns True if interval and other share a start boundary.

tempo.intervals.is_metric_col(col: StructField) bool[source]

Returns True if col should be treated as a metric column.

tempo.intervals.make_disjoint_wrap(start_ts: str, end_ts: str, series_ids: Iterable[str], metric_columns: Iterable[str]) Callable[[DataFrame], DataFrame][source]

Returns a function that maps a DataFrame of possibly overlapping intervals to an equivalent DataFrame of disjoint intervals, given the boundary, series, and metric column names.

tempo.intervals.merge_metric_columns_of_intervals(*, main_interval: Series, child_interval: Series, metric_columns: Iterable[str], metric_merge_method: bool = False) Series[source]

Returns the merged metrics of main_interval and child_interval.
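
A sketch with illustrative data (metric_merge_method is left at its default here; the exact merge semantics follow the make_disjoint() merge assumption above):

import pandas as pd
from tempo.intervals import merge_metric_columns_of_intervals

main = pd.Series({"start_ts": "2020-08-01 00:00:10", "end_ts": "2020-08-01 00:00:11",
                  "metric_1": 5, "metric_2": None})
child = pd.Series({"start_ts": "2020-08-01 00:00:10", "end_ts": "2020-08-01 00:00:11",
                   "metric_1": None, "metric_2": 0})

# combine the metric values of the two intervals into a single Series
merged = merge_metric_columns_of_intervals(
    main_interval=main,
    child_interval=child,
    metric_columns=["metric_1", "metric_2"],
)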

tempo.intervals.resolve_all_overlaps(with_row: Series, overlaps: DataFrame, with_row_start_ts: str, with_row_end_ts: str, series_ids: Iterable[str], metric_columns: Iterable[str], overlap_start_ts: str | None = None, overlap_end_ts: str | None = None) DataFrame[source]

Resolves the interval with_row against all overlapping intervals in overlaps, returning a set of disjoint intervals covering the same spans. See pandas.DataFrame.apply: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.apply.html

tempo.intervals.resolve_overlap(interval: Series, other: Series, interval_start_ts: str, interval_end_ts: str, series_ids: Iterable[str], metric_columns: Iterable[str], other_start_ts: str | None = None, other_end_ts: str | None = None) list[Series][source]

Resolves overlaps between the two given intervals, splitting them as necessary into a set of disjoint intervals.

tempo.intervals.update_interval_boundary(*, interval: Series, boundary_to_update: str, update_value: str) Series[source]

Returns a new copy of interval with its start or end time updated to update_value.
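
For example (a sketch; data is illustrative):

import pandas as pd
from tempo.intervals import update_interval_boundary

interval = pd.Series({"start_ts": "2020-08-01 00:00:09", "end_ts": "2020-08-01 00:00:14"})

# returns a modified copy; the original Series is left unchanged
shortened = update_interval_boundary(
    interval=interval,
    boundary_to_update="end_ts",
    update_value="2020-08-01 00:00:11",
)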