TSDF Module

class tempo.tsdf.Comparable[source]

Bases: object

For typing functions generated by operator_dict

class tempo.tsdf.TSDF(df: DataFrame, ts_col: str = 'event_ts', partition_cols: list[str] | None = None, sequence_col: str | None = None)[source]

Bases: object

This object is the main wrapper around a Spark DataFrame. It lets a user parallelize time series computations on the DataFrame along various dimensions. The two required dimensions are partition_cols (the list of columns by which to summarize) and ts_col (the timestamp column, which can be epoch or TimestampType).

EMA(colName: str, window: int = 30, exp_factor: float = 0.2) TSDF[source]

Constructs an approximate EMA as a weighted sum of lagged values, truncated at window terms, where e is exp_factor:

EMA = e * lag(col, 0) + e * (1 - e) * lag(col, 1) + e * (1 - e)^2 * lag(col, 2) + ...

TODO: replace case when statement with coalesce

TODO: add in time partitions functionality (what is the overlap fraction?)
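
The formula can be sketched in plain Python on a single ordered series. This is only an illustration of the arithmetic, not the library's Spark implementation; the helper name `truncated_ema`, the choice of lags 0 through window - 1, and the treatment of missing lags as 0 are assumptions.

```python
from typing import List, Optional


def truncated_ema(
    values: List[Optional[float]], window: int = 30, exp_factor: float = 0.2
) -> List[float]:
    """Weighted sum of the current value and up to window - 1 lagged values.

    Assumption: lags that do not exist (start of the series) or are None
    contribute 0 to the sum.
    """
    result = []
    for t in range(len(values)):
        total = 0.0
        for lag in range(window):
            if t - lag < 0:
                break  # no more history available
            value = values[t - lag]
            if value is not None:
                total += exp_factor * (1 - exp_factor) ** lag * value
        result.append(total)
    return result


ema = truncated_ema([10.0, 10.0, 10.0], window=2, exp_factor=0.5)
```

Because the sum is truncated, the weights do not add up to 1, so the result underestimates a constant series. That is one reason the method is described as approximate.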

after(ts: str | int) TSDF[source]

Select only records after a given time

Parameters:

ts – timestamp on which to filter records

Returns:

a TSDF object containing just the records after the given time

asofJoin(right_tsdf: TSDF, left_prefix: str | None = None, right_prefix: str = 'right', tsPartitionVal: int | None = None, fraction: float = 0.5, skipNulls: bool = True, sql_join_opt: bool = False, suppress_null_warning: bool = False, tolerance: int | None = None) TSDF[source]

Performs an as-of join between two time-series. If a tsPartitionVal is specified, it will do this partitioned by time brackets, which can help alleviate skew.

NOTE: Partition columns must be the same for both DataFrames. Stats are also collected when the WARNING log level is enabled.

Parameters:
  • right_tsdf – right-hand data frame containing columns to merge in

  • left_prefix – optional prefix for base data frame

  • right_prefix – optional prefix for right-hand data frame

  • tsPartitionVal – value to break up each partition into time brackets

  • fraction – overlap fraction

  • skipNulls – whether to skip nulls when joining in values

  • sql_join_opt – if set to True, will use standard Spark SQL join if it is estimated to be efficient

  • suppress_null_warning – when tsPartitionVal is specified, will collect min of each column and raise warnings about null values, set to True to avoid

  • tolerance – only join values within this tolerance range (inclusive), expressed in number of seconds as a double
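
To make the as-of semantics concrete, here is a minimal pure-Python sketch for a single series: each left record picks up the most recent right record at or before its timestamp, optionally limited by a tolerance. The helper `asof_join_sketch` and the sample data are hypothetical; partition columns, prefixes, null skipping and time bracketing are left out.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


def asof_join_sketch(
    left: List[Tuple[int, str]],
    right: List[Tuple[int, float]],
    tolerance: Optional[int] = None,
) -> List[Tuple[int, str, Optional[float]]]:
    """Attach to each left record the latest right value with right_ts <= left_ts."""
    right_sorted = sorted(right)
    right_ts = [ts for ts, _ in right_sorted]
    joined = []
    for ts, left_val in sorted(left):
        idx = bisect_right(right_ts, ts) - 1  # last right record at or before ts
        match = None
        if idx >= 0:
            r_ts, r_val = right_sorted[idx]
            # Inclusive tolerance check, in the same units as the timestamps
            if tolerance is None or ts - r_ts <= tolerance:
                match = r_val
        joined.append((ts, left_val, match))
    return joined


trades = [(3, "t1"), (7, "t2"), (20, "t3")]
quotes = [(1, 100.0), (5, 101.0), (6, 102.0)]
result = asof_join_sketch(trades, quotes, tolerance=10)
```

The last trade gets no match because the nearest earlier quote is 14 seconds old, which exceeds the tolerance of 10.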

at(ts: str | int) TSDF[source]

Select only records at a given time

Parameters:

ts – timestamp of the records to select

Returns:

a TSDF object containing just the records at the given time

atOrAfter(ts: str | int) TSDF[source]

Select only records at or after a given time

Parameters:

ts – timestamp on which to filter records

Returns:

a TSDF object containing just the records at or after the given time

atOrBefore(ts: str | int) TSDF[source]

Select only records at or before a given time

Parameters:

ts – timestamp on which to filter records

Returns:

a TSDF object containing just the records at or before the given time

before(ts: str | int) TSDF[source]

Select only records before a given time

Parameters:

ts – timestamp on which to filter records

Returns:

a TSDF object containing just the records before the given time

between(start_ts: str | int, end_ts: str | int, inclusive: bool = True) TSDF[source]

Select only records in a given range

Parameters:
  • start_ts – starting time of the range to select

  • end_ts – ending time of the range to select

  • inclusive (bool) – whether the range is inclusive of the endpoints or not, defaults to True

Returns:

a TSDF object containing just the records within the range specified
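
The effect of the inclusive flag can be illustrated on a plain list of (timestamp, value) pairs. This is a sketch of the filter semantics only; the helper name `between_sketch` is hypothetical, and treating inclusive=False as excluding both endpoints is an assumption based on the parameter description.

```python
from typing import List, Tuple


def between_sketch(
    records: List[Tuple[int, str]], start_ts: int, end_ts: int, inclusive: bool = True
) -> List[Tuple[int, str]]:
    """Keep records whose timestamp lies in the range, with or without the endpoints."""
    if inclusive:
        return [r for r in records if start_ts <= r[0] <= end_ts]
    return [r for r in records if start_ts < r[0] < end_ts]


records = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
closed_range = between_sketch(records, 2, 4)
open_range = between_sketch(records, 2, 4, inclusive=False)
```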

calc_bars(freq: str, metricCols: List[str] | None = None, fill: bool | None = None) TSDF[source]

describe() DataFrame[source]

Describe a TSDF object using a global summary across all time series (anywhere from 10 to millions) as well as the standard Spark data frame stats. The summary contains:
  • global – unique time series based on partition columns, min/max times, and granularity (lowest precision in the time series timestamp column)

  • count / mean / stddev / min / max – standard Spark data frame describe() output

  • missing_vals_pct – percentage (from 0 to 100) of missing values

earliest(n: int = 1) TSDF[source]

Select the earliest n records for each series

Parameters:

n – number of records to select (default is 1)

Returns:

a TSDF object containing the earliest n records for each series

extractStateIntervals(*metric_cols: str, state_definition: str | Callable[[Column, Column], Column] = '=') DataFrame[source]

Extracts intervals from a TSDF based on some notion of “state”, as defined by the state_definition parameter. The state definition consists of a comparison operation between the current and previous values of a metric. If the comparison operation evaluates to true across all metric columns, then we consider both points to be in the same “state”. Changes of state occur when the comparison operator returns false for any given metric column. So, the default state definition (‘=’) extracts intervals of time wherein the metrics all remained constant. A state definition of ‘>=’ would extract intervals wherein the metrics were all monotonically increasing.

Parameters:
  • metric_cols – the set of metric columns to evaluate for state changes

  • state_definition – the comparison function used to evaluate individual metrics for state changes. Either a string, giving a standard PySpark column comparison operation, or a binary function with the signature (x1: Column, x2: Column) -> Column, where the returned column expression evaluates to a BooleanType

Returns:

a DataFrame object containing the resulting intervals
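
The state logic described above can be sketched on an in-memory series. The helper `state_intervals_sketch` is hypothetical, it assumes the comparison is applied as (current, previous), and reporting each interval as (first timestamp, last timestamp) of a run is an assumption about the output shape rather than the library's actual schema.

```python
import operator
from typing import Callable, Dict, List, Sequence, Tuple

Row = Tuple[int, Dict[str, float]]  # (timestamp, metric values)


def state_intervals_sketch(
    rows: List[Row],
    metric_cols: Sequence[str],
    same_state: Callable[[float, float], bool] = operator.eq,
) -> List[Tuple[int, int]]:
    """Group consecutive rows while same_state(current, previous) holds for every metric."""
    intervals: List[Tuple[int, int]] = []
    if not rows:
        return intervals
    start = rows[0][0]
    prev = rows[0]
    for cur in rows[1:]:
        unchanged = all(same_state(cur[1][c], prev[1][c]) for c in metric_cols)
        if not unchanged:
            # State changed: close the running interval and open a new one
            intervals.append((start, prev[0]))
            start = cur[0]
        prev = cur
    intervals.append((start, prev[0]))
    return intervals


rows = [(1, {"t": 5.0}), (2, {"t": 5.0}), (3, {"t": 6.0}), (4, {"t": 6.0}), (5, {"t": 7.0})]
constant_runs = state_intervals_sketch(rows, ["t"])
rising_runs = state_intervals_sketch(rows, ["t"], same_state=operator.ge)
```

With the default equality check the series splits into three intervals, while the `>=` comparison treats the whole series as one state because the metric never decreases.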

fourier_transform(timestep: int | float | complex, valueCol: str) TSDF[source]

Transforms the time series into its frequency-domain representation using a Fourier transform.

Parameters:
  • timestep – timestep value to be used for getting the frequency scale

  • valueCol – name of the time domain data column which will be transformed
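
As an illustration of how a timestep turns bin indices into a frequency scale, the following standard-library sketch computes a naive discrete Fourier transform. The helper `dft_with_freqs` is hypothetical, and the frequency layout (non-negative bins first, then negative ones, as in common FFT libraries) is an assumption about what the method returns.

```python
import cmath
from typing import List, Tuple


def dft_with_freqs(values: List[float], timestep: float) -> List[Tuple[float, complex]]:
    """Naive O(n^2) DFT returning (frequency, coefficient) pairs."""
    n = len(values)
    result = []
    for k in range(n):
        coeff = sum(
            values[t] * cmath.exp(-2j * cmath.pi * k * t / n) for t in range(n)
        )
        # Bins in the upper half of the spectrum represent negative frequencies
        signed_k = k if k < (n + 1) // 2 else k - n
        result.append((signed_k / (n * timestep), coeff))
    return result


# One full cosine cycle sampled at 4 points, 1 second apart
spectrum = dft_with_freqs([1.0, 0.0, -1.0, 0.0], timestep=1.0)
freqs = [f for f, _ in spectrum]
```

The energy of this signal lands in the bins at plus and minus 0.25 cycles per second, which matches a period of four samples.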

interpolate(method: str, freq: str | None = None, func: Callable | str | None = None, target_cols: List[str] | None = None, ts_col: str | None = None, partition_cols: List[str] | None = None, show_interpolated: bool = False, perform_checks: bool = True) TSDF[source]

Function to interpolate based on frequency, aggregation, and fill similar to pandas. Data will first be aggregated using resample, then missing values will be filled based on the fill calculation.

Parameters:
  • freq – frequency for upsample - valid inputs are “hr”, “min”, “sec” corresponding to hour, minute, or second

  • func – function used to aggregate input

  • method – function used to fill missing values e.g. linear, null, zero, bfill, ffill

  • target_cols [optional] – columns that should be interpolated, by default interpolates all numeric columns

  • ts_col [optional] – specify other ts_col, by default this uses the ts_col within the TSDF object

  • partition_cols [optional] – specify other partition_cols, by default this uses the partition_cols within the TSDF object

  • show_interpolated [optional] – if true will include an additional column to show which rows have been fully interpolated.

  • perform_checks – calculate time horizon and warnings if True (default is True)

Returns:

new TSDF object containing interpolated data
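
Two of the fill methods named above, linear and ffill, can be sketched on an evenly spaced series in which None marks a missing value after resampling. The helper names are hypothetical, and leaving leading or trailing gaps unfilled in the linear case is an assumption of this sketch.

```python
from typing import List, Optional


def linear_fill(values: List[Optional[float]]) -> List[Optional[float]]:
    """Fill gaps by linear interpolation between the nearest known neighbours."""
    filled = list(values)
    known = [i for i, v in enumerate(values) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (values[right] - values[left]) / (right - left)
        for i in range(left + 1, right):
            filled[i] = values[left] + step * (i - left)
    return filled


def forward_fill(values: List[Optional[float]]) -> List[Optional[float]]:
    """Fill gaps with the last known value."""
    filled, last = [], None
    for v in values:
        if v is not None:
            last = v
        filled.append(last)
    return filled


series = [1.0, None, None, 4.0, None]
linear = linear_fill(series)
ffilled = forward_fill(series)
```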

latest(n: int = 1) TSDF[source]

Select the latest n records for each series

Parameters:

n – number of records to select (default is 1)

Returns:

a TSDF object containing the latest n records for each series

static parse_nanos_timestamp(df: DataFrame, str_ts_col: str, ts_fmt: str = 'yyyy-MM-dd HH:mm:ss', double_ts_col: str | None = None, parsed_ts_col: str | None = None) DataFrame[source]

Parse a string timestamp column with nanosecond precision into a double timestamp column.

Parameters:
  • df – DataFrame containing the string timestamp column

  • str_ts_col – Name of the string timestamp column

  • ts_fmt – Format of the string timestamp column (default: “yyyy-MM-dd HH:mm:ss”)

  • double_ts_col – Name of the double timestamp column to create, if None the source string column will be overwritten

  • parsed_ts_col – Name of the parsed timestamp column to create, if None no parsed timestamp column will be kept

Returns:

DataFrame with the double timestamp column
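
The idea of turning a nanosecond-precision string into a double can be sketched with the standard library: parse the whole seconds, then add the fractional part. The helper `parse_nanos` is hypothetical, it uses Python format codes rather than the ‘yyyy-MM-dd HH:mm:ss’ pattern shown above, and it assumes UTC and a dot before the fractional digits.

```python
from datetime import datetime, timezone


def parse_nanos(ts_str: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> float:
    """Convert 'YYYY-MM-DD HH:MM:SS[.fffffffff]' into epoch seconds as a float."""
    whole, _, frac = ts_str.partition(".")
    # Assumption: the string is in UTC
    seconds = datetime.strptime(whole, fmt).replace(tzinfo=timezone.utc).timestamp()
    # Pad or trim the fractional digits to exactly nine (nanoseconds)
    nanos = int(frac.ljust(9, "0")[:9]) if frac else 0
    return seconds + nanos / 1e9


value = parse_nanos("1970-01-01 00:00:01.250000000")
```

Note that a 64-bit double cannot represent every nanosecond for present-day epoch values, so the double column is best suited to ordering rather than exact round-tripping.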

priorTo(ts: str | int, n: int = 1) TSDF[source]

Select the n most recent records prior to a given time. You can think of this like an ‘asOf’ select: it selects the records as of a particular time.

Parameters:
  • ts – timestamp on which to filter records

  • n – number of records to select (default is 1)

Returns:

a TSDF object containing the n records prior to the given time
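
A rough sketch of the ‘asOf’ select on a sorted list of timestamps follows. The helper `prior_to_sketch` is hypothetical, and including a record that falls exactly on ts is an assumption drawn from the as-of reading; the description does not state whether the boundary is inclusive.

```python
from bisect import bisect_right
from typing import List


def prior_to_sketch(timestamps: List[int], ts: int, n: int = 1) -> List[int]:
    """Return the n most recent timestamps at or before ts (input must be sorted)."""
    end = bisect_right(timestamps, ts)  # index just past the last timestamp <= ts
    return timestamps[max(0, end - n):end]


series = [1, 3, 5, 7]
last_two = prior_to_sketch(series, 5, n=2)
as_of_six = prior_to_sketch(series, 6)
```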

resample(freq: str, func: Callable | str, metricCols: List[str] | None = None, prefix: str | None = None, fill: bool | None = None, perform_checks: bool = True) TSDF[source]

Function to upsample based on frequency and aggregate function, similar to pandas.

Parameters:
  • freq – frequency for upsample - valid inputs are “hr”, “min”, “sec” corresponding to hour, minute, or second

  • func – function used to aggregate input

  • metricCols – supply a smaller list of numeric columns if the entire set of numeric columns should not be returned for the resample function

  • prefix – supply a prefix for the newly sampled columns

  • fill – Boolean, set to True if the desired output should contain filled in gaps (with 0s currently)

  • perform_checks – calculate time horizon and warnings if True (default is True)

Returns:

TSDF object with sample data using aggregate function
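
The bucketing step behind this kind of resampling can be sketched as flooring each timestamp to its frequency bucket and aggregating per bucket. The helper `resample_sketch`, the `FREQ_SECONDS` mapping and the use of epoch seconds are assumptions made for illustration; the library operates on Spark columns and supports more options.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable, Dict, List, Tuple

# Hypothetical mapping of the documented frequency strings to bucket widths in seconds
FREQ_SECONDS = {"sec": 1, "min": 60, "hr": 3600}


def resample_sketch(
    records: List[Tuple[int, float]],
    freq: str,
    func: Callable[[List[float]], float] = mean,
) -> List[Tuple[int, float]]:
    """Floor each epoch-second timestamp to its bucket and aggregate the values."""
    width = FREQ_SECONDS[freq]
    buckets: Dict[int, List[float]] = defaultdict(list)
    for ts, value in records:
        buckets[ts - ts % width].append(value)
    return [(bucket, func(vals)) for bucket, vals in sorted(buckets.items())]


records = [(0, 1.0), (30, 3.0), (60, 5.0), (125, 7.0)]
per_minute = resample_sketch(records, "min")
```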

select(*cols: str | List[str]) TSDF[source]

pyspark.sql.DataFrame.select() method’s equivalent for TSDF objects

Parameters:

cols – column names, given as strings or as a list of strings. If one of the column names is ‘*’, that column is expanded to include all columns in the current TSDF.

Examples:

```python
tsdf.select('*').collect()
# [Row(age=2, name='Alice'), Row(age=5, name='Bob')]

tsdf.select('name', 'age').collect()
# [Row(name='Alice', age=2), Row(name='Bob', age=5)]
```

show(n: int = 20, k: int = 5, truncate: bool = True, vertical: bool = False) None[source]

pyspark.sql.DataFrame.show() method’s equivalent for TSDF objects

Parameters:
  • n – Number of rows to show. (default: 20)

  • truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right.

  • vertical – If set to True, print output rows vertically (one line per column value).

Example to show usage:

```python
from pyspark.sql.functions import col

from tempo import TSDF

# Assumes an active SparkSession named `spark` (e.g. in a Databricks notebook)
phone_accel_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("dbfs:/home/tempo/Phones_accelerometer")
    .withColumn("event_ts", (col("Arrival_Time").cast("double") / 1000).cast("timestamp"))
    .withColumn("x", col("x").cast("double"))
    .withColumn("y", col("y").cast("double"))
    .withColumn("z", col("z").cast("double"))
    .withColumn("event_ts_dbl", col("event_ts").cast("double"))
)

phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols=["User"])

# Call show method here
phone_accel_tsdf.show()
```

subsequentTo(ts: str | int, n: int = 1) TSDF[source]

Select the n records subsequent to a given time

Parameters:
  • ts – timestamp on which to filter records

  • n – number of records to select (default is 1)

Returns:

a TSDF object containing the n records subsequent to the given time

vwap(frequency: str = 'm', volume_col: str = 'volume', price_col: str = 'price') TSDF[source]

withGroupedStats(metricCols: List[str] | None = None, freq: str | None = None) TSDF[source]

Create a wider set of stats based on all numeric columns by default. Users can also choose which columns they want to summarize. These stats are: mean/count/min/max/sum/std deviation.

Parameters:
  • metricCols – list of user-supplied columns to compute stats for. All numeric columns are used if no list is provided

  • freq – frequency, given as a string of the form ‘1 min’ or ‘30 seconds’, which is interpreted as the window to use for aggregation

withLookbackFeatures(featureCols: List[str], lookbackWindowSize: int, exactSize: bool = True, featureColName: str = 'features') DataFrame | 'TSDF'[source]

Creates a 2-D feature tensor suitable for training an ML model to predict current values from the history of some set of features. This function creates a new column containing, for each observation, a 2-D array of the values of some number of other columns over a trailing “lookback” window from the previous observation up to some maximum number of past observations.

Parameters:
  • featureCols – the names of one or more feature columns to be aggregated into the feature column

  • lookbackWindowSize – The size of lookback window (in terms of past observations). Must be an integer >= 1

  • exactSize – If True (the default), then the resulting DataFrame will only include observations where the generated feature column contains arrays of length lookbackWindowSize. This implies that it will truncate observations that occurred less than lookbackWindowSize from the start of the timeseries. If False, no truncation occurs, and the column may contain arrays less than lookbackWindowSize in length.

  • featureColName – The name of the feature column to be generated. Defaults to “features”

Returns:

a DataFrame with a feature column named featureColName containing the lookback feature tensor
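
The shape of the generated feature column can be illustrated with plain lists: for each observation, collect the feature vectors of the preceding observations, excluding the current one. The helper `lookback_features_sketch` is hypothetical and works on one ordered series in memory rather than on a Spark DataFrame.

```python
from typing import List, Tuple


def lookback_features_sketch(
    rows: List[List[float]], lookback_window_size: int, exact_size: bool = True
) -> List[Tuple[int, List[List[float]]]]:
    """Pair each row index with a 2-D array of the previous rows' feature values."""
    result = []
    for i in range(len(rows)):
        # Trailing window: previous observations only, at most lookback_window_size of them
        window = rows[max(0, i - lookback_window_size):i]
        if exact_size and len(window) < lookback_window_size:
            continue  # drop observations without a full history
        result.append((i, window))
    return result


rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0], [4.0, 40.0]]
exact = lookback_features_sketch(rows, 2)
ragged = lookback_features_sketch(rows, 2, exact_size=False)
```

With exact_size=True the first two observations are dropped because they have fewer than two predecessors. With exact_size=False they are kept, with shorter (or empty) arrays.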

withPartitionCols(partitionCols: list[str]) TSDF[source]

Sets certain columns of the TSDF as partition columns. Partition columns are those that differentiate distinct timeseries from each other.

Parameters:

partitionCols – a list of columns used to partition distinct timeseries

Returns:

a TSDF object with the given partition columns

withRangeStats(type: str = 'range', colsToSummarize: List[Column] | None = None, rangeBackWindowSecs: int = 1000) TSDF[source]

Create a wider set of stats based on all numeric columns by default. Users can also choose which columns they want to summarize. These stats are: mean/count/min/max/sum/std deviation/zscore.

Parameters:
  • type – this is created in case we want to extend these stats to lookback over a fixed number of rows instead of ranging over column values

  • colsToSummarize – list of user-supplied columns to compute stats for. All numeric columns are used if no list is provided

  • rangeBackWindowSecs – lookback this many seconds in time to summarize all stats. Note this will look back from the floor of the base event timestamp (as opposed to the exact time, since we cast to long)

Assumptions:

  1. The features are summarized over a rolling window that ranges back

  2. The range back window can be specified by the user

  3. Sequence numbers are not yet supported for the sort

  4. There is a cast to long from timestamp, so timestamps with microsecond or finer precision likely break down. This could be more easily handled with a string timestamp or by sorting on the timestamp itself. If using a ‘rows preceding’ window, this wouldn’t be a problem
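
The rolling range window described in these assumptions can be sketched for a single metric as follows. The helper `range_stats_sketch` is hypothetical; it assumes the window includes the current record, floors timestamps to whole seconds to mirror the cast to long, and uses the sample standard deviation for the z-score.

```python
from statistics import mean, stdev
from typing import Dict, List, Optional, Tuple


def range_stats_sketch(
    records: List[Tuple[float, float]], range_back_window_secs: int = 1000
) -> List[Dict[str, Optional[float]]]:
    """Compute rolling stats over the records within the lookback range of each record."""
    stats = []
    for ts, value in records:
        ts_floor = int(ts)  # mirrors the cast to long: sub-second detail is dropped
        window = [
            v for t, v in records
            if ts_floor - range_back_window_secs <= int(t) <= ts_floor
        ]
        mu = mean(window)
        sd = stdev(window) if len(window) > 1 else None
        stats.append({
            "count": len(window),
            "mean": mu,
            "min": min(window),
            "max": max(window),
            "sum": sum(window),
            "stddev": sd,
            # z-score is undefined with a single point or zero spread
            "zscore": (value - mu) / sd if sd else None,
        })
    return stats


records = [(0.0, 1.0), (5.0, 3.0), (20.0, 5.0)]
stats = range_stats_sketch(records, range_back_window_secs=10)
```

The third record sees only itself because the earlier records are more than 10 seconds old, so its z-score is left undefined.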

write(spark: SparkSession, tabName: str, optimizationCols: List[str] | None = None) None[source]