tempo.intervals#

class tempo.intervals.IntervalsDF(df: DataFrame, start_ts: str, end_ts: str, series_ids: list[str] = None)[source]#

This object is the main wrapper over a Spark DataFrame that allows a user to parallelize computations over snapshots of metrics for time intervals defined by a start timestamp, an end timestamp, and various dimensions.

The dimensions involved are series (a list of columns by which to summarize), metrics (the columns to analyze), start_ts (timestamp column), and end_ts (timestamp column). start_ts and end_ts may contain epoch values or TimestampType.

disjoint() IntervalsDF[source]#

Returns a new IntervalsDF where metrics of overlapping time intervals are correlated and merged prior to constructing new time interval boundaries (start and end timestamps) so that all intervals are disjoint.

The merge process assumes that two overlapping intervals cannot simultaneously report two different values for the same metric unless recorded in a data type which supports multiple elements (such as ArrayType, etc.).

This is often used after fromStackedMetrics() to reduce the number of metrics with null values and helps when constructing filter predicates to retrieve specific metric values across all instances.

Returns

A new IntervalsDF containing disjoint time intervals

Example

df = spark.createDataFrame(
    [["2020-08-01 00:00:10", "2020-08-01 00:00:14", "v1", 5, None],
     ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", None, 0]],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT, metric_2 INT",
)
idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"])
idf.disjoint().df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:10', series_1='v1', metric_1=None, metric_2=0),
 Row(start_ts='2020-08-01 00:00:10', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=5, metric_2=0),
 Row(start_ts='2020-08-01 00:00:11', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None)]
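The boundary-splitting logic above can be illustrated with a simplified pure-Python sketch (not the tempo implementation): every interval start or end becomes a candidate boundary, and each resulting slice merges the non-null metrics of the intervals that cover it.

```python
def make_disjoint(intervals):
    """Split overlapping intervals into disjoint slices, merging metrics.

    Each interval is a (start, end, metrics_dict) tuple. As disjoint()
    requires, overlapping intervals are assumed never to report two
    different non-null values for the same metric.
    """
    # Every start/end timestamp is a candidate boundary.
    bounds = sorted({t for start, end, _ in intervals for t in (start, end)})
    result = []
    for lo, hi in zip(bounds, bounds[1:]):
        merged = {}
        for start, end, metrics in intervals:
            if start < hi and lo < end:  # interval covers the slice (lo, hi)
                merged.update({k: v for k, v in metrics.items() if v is not None})
        if merged:
            result.append((lo, hi, merged))
    return result
```

Running this on the intervals from the example (using integer seconds for brevity) reproduces the three disjoint slices shown above.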
classmethod fromStackedMetrics(df: DataFrame, start_ts: str, end_ts: str, series: list[str], metrics_name_col: str, metrics_value_col: str, metric_names: Optional[list[str]] = None) IntervalsDF[source]#

Returns a new IntervalsDF with metrics of the current DataFrame pivoted by start and end timestamp and series.

There are two versions of fromStackedMetrics. One that requires the caller to specify the list of distinct metric names to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct metric names internally.

Parameters
  • df (DataFrame) – DataFrame to wrap with IntervalsDF

  • start_ts (str) – Name of the column which denotes interval start

  • end_ts (str) – Name of the column which denotes interval end

  • series (list[str]) – Names of the columns that identify a series

  • metrics_name_col (str) – Name of the column containing metric names

  • metrics_value_col (str) – Name of the column containing metric values

  • metric_names (list[str], optional) – List of metric names that will be translated to columns in the output IntervalsDF.

Returns

A new IntervalsDF with a column and respective values per distinct metric in metrics_name_col.

Example

df = spark.createDataFrame(
    [["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", "metric_1", 5],
     ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", "metric_2", 0]],
    "start_ts STRING, end_ts STRING, series_1 STRING, metric_name STRING, metric_value INT",
)

# With distinct metric names specified

idf = IntervalsDF.fromStackedMetrics(
    df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value", ["metric_1", "metric_2"],
)
idf.df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None),
 Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)]

# Or without specifying metric names (less efficient)

idf = IntervalsDF.fromStackedMetrics(df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value")
idf.df.collect()
[Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None),
 Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)]
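The pivot performed by fromStackedMetrics() can be sketched in plain Python (an illustration, not the tempo implementation): group rows by the interval bounds and series columns, then turn each distinct metric name into its own column.

```python
def pivot_stacked(rows, keys, name_col, value_col, metric_names=None):
    """Pivot stacked (name, value) metric rows into wide rows, one per key.

    rows is a list of dicts; keys lists the grouping columns
    (interval bounds plus series columns).
    """
    if metric_names is None:
        # Without explicit names, the distinct metric names must first be
        # discovered with an extra pass over the data -- mirroring why the
        # implicit form of fromStackedMetrics is less efficient in Spark.
        metric_names = sorted({r[name_col] for r in rows})
    out = {}
    for r in rows:
        key = tuple(r[c] for c in keys)
        if key not in out:
            # Start a wide row with all metric columns set to None.
            out[key] = {c: r[c] for c in keys}
            out[key].update({m: None for m in metric_names})
        out[key][r[name_col]] = r[value_col]
    return list(out.values())
```

Applied to the stacked rows in the example above, each (start_ts, end_ts, series_1) group becomes one wide row with metric_1 and metric_2 columns, null where a metric was not reported.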
toDF(stack: bool = False) pyspark.sql.dataframe.DataFrame[source]#

Returns a new Spark DataFrame converted from IntervalsDF.

There are two versions of toDF. One will output columns as they exist in IntervalsDF, and one will stack metric columns into metric_names and metric_values columns populated with their respective values. The latter can be thought of as the inverse of fromStackedMetrics().

Based on pyspark.sql.DataFrame.toDF.

Parameters

stack (bool, optional) – If True, stack metric columns into metric_names and metric_values columns; if False (the default), output metric columns as they exist in the IntervalsDF

Returns

A new Spark DataFrame converted from this IntervalsDF

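As a rough illustration of what stack=True produces, here is a pure-Python sketch of the stacking transformation (not the tempo implementation; it assumes, per the description above, output columns named metric_names and metric_values, and that null metric values are dropped from the stacked form):

```python
def stack_metrics(rows, keys, metric_cols):
    """Stack wide metric columns into (metric_names, metric_values) rows --
    conceptually the inverse of the pivot done by fromStackedMetrics().

    rows is a list of dicts; keys lists the interval-bound and series
    columns to carry through unchanged.
    """
    stacked = []
    for r in rows:
        for m in metric_cols:
            if r[m] is not None:  # assume null metrics are dropped
                row = {c: r[c] for c in keys}
                row["metric_names"] = m
                row["metric_values"] = r[m]
                stacked.append(row)
    return stacked
```

Each wide row thus yields one stacked row per non-null metric, which is why disjoint() followed by stacking tends to reduce the number of null-valued metric rows.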
union(other: IntervalsDF) IntervalsDF[source]#

Returns a new IntervalsDF containing union of rows in this and another IntervalsDF.

This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().

Also, as standard in SQL, this function resolves columns by position (not by name).

Based on pyspark.sql.DataFrame.union.

Parameters

other (IntervalsDF) – IntervalsDF to union

Returns

A new IntervalsDF containing union of rows in this and other
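Because union() resolves columns by position, two frames that list the same columns in different orders will silently mix values. A plain-Python sketch of the hazard (with hypothetical column names, purely for illustration):

```python
# Frame A's column order, and a row from frame B with columns reordered.
cols_a = ["start_ts", "end_ts", "metric_1"]
row_b = {"end_ts": 4, "start_ts": 3, "metric_1": 0}

# Position-based resolution (what union does): pair frame A's column
# names with frame B's values in the order they appear.
by_position = dict(zip(cols_a, row_b.values()))  # start_ts wrongly gets 4

# Name-based resolution (what unionByName does): look each column up by name.
by_name = {c: row_b[c] for c in cols_a}          # start_ts correctly gets 3
```

When the two IntervalsDF objects may differ in column order, unionByName() is the safer choice.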

unionByName(other: IntervalsDF) IntervalsDF[source]#

Returns a new IntervalsDF containing union of rows in this and another IntervalsDF.

This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().

Based on pyspark.sql.DataFrame.unionByName; however, allowMissingColumns is not supported.

Parameters

other (IntervalsDF) – IntervalsDF to unionByName

Returns

A new IntervalsDF containing union of rows in this and other