Source code for dbldatagen.column_generation_spec

# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file defines the `ColumnGenerationSpec` class
"""

import copy
import logging

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.functions import lit, concat, rand, round as sql_round, array, expr, when, udf, \
    format_string

import pyspark.sql.functions as F

from pyspark.sql.types import FloatType, IntegerType, StringType, DoubleType, BooleanType, \
    TimestampType, DataType, DateType, ArrayType, MapType, StructType

from .column_spec_options import ColumnSpecOptions
from .datagen_constants import RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, RANDOM_SEED_RANDOM, \
    DEFAULT_SEED_COLUMN, OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, INFER_DATATYPE

from .daterange import DateRange
from .distributions import Normal, DataDistribution
from .nrange import NRange
from .text_generators import TemplateGenerator
from .utils import ensure, coalesce_values
from .schema_parser import SchemaParser

HASH_COMPUTE_METHOD = "hash"
VALUES_COMPUTE_METHOD = "values"
RAW_VALUES_COMPUTE_METHOD = "raw_values"
AUTO_COMPUTE_METHOD = "auto"
EXPR_OPTION = "expr"
COMPUTE_METHOD_VALID_VALUES = [HASH_COMPUTE_METHOD,
                               AUTO_COMPUTE_METHOD,
                               VALUES_COMPUTE_METHOD,
                               RAW_VALUES_COMPUTE_METHOD]


class ColumnGenerationSpec(object):
    """ Column generation spec object - specifies how a column is to be generated

    Each column to be output will have a corresponding ColumnGenerationSpec object.
    This is added explicitly using the DataGenerator's `withColumnSpec` or `withColumn` methods.
    If none is explicitly added, a default one will be generated.

    The full set of arguments accepted by the class is larger than the explicitly called out
    parameters, as any arguments that are not explicitly called out can still be passed via the
    `**kwargs` expression.

    This class is meant for internal use only.

    :param name: Name of column (string).
    :param colType: Spark SQL datatype instance, representing the type of the column.
    :param minValue: minimum value of column
    :param maxValue: maximum value of the column
    :param step: numeric step used in column data generation
    :param prefix: string used as prefix to the column's underlying value to produce a string value
    :param random: Boolean, if True, will generate random values
    :param distribution: Instance of distribution that will control the distribution of the generated values
    :param baseColumn: String or list of strings representing columns used as basis for generating the column data
    :param randomSeed: random seed value used to generate the random value, if column data is random
    :param randomSeedMethod: method for computing random values from the random seed.
                             It may take the values `fixed`, `hash_fieldname` or None
    :param implicit: If True, the specification for the column can be replaced by a later definition.
                     If not, a later attempt to replace the definition will flag an error.
                     Typically used when generating definitions automatically from a schema, or when
                     using wildcards in the specification

    :param omit: if True, omit from the final output.
    :param nullable: If True, column may be null - defaults to True.
    :param debug: If True, output debugging log statements. Defaults to False.
    :param verbose: If True, output logging statements at the info level. If False (the default),
                    only output warning and error logging statements.
    :param seedColumnName: if supplied, specifies seed column name

    For full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
    """

    #: maximum values for each column type, only where the value is intentionally restricted
    _max_type_range = {
        'byte': 256,
        'short': 65536
    }

    _ARRAY_STRUCT_TYPE = "array"

    # set up logging

    # restrict spurious messages from java gateway
    logging.getLogger("py4j").setLevel(logging.WARNING)

    def __init__(self, name, colType=None, minValue=0, maxValue=None, step=1, prefix='', random=False,
                 distribution=None, baseColumn=None, randomSeed=None, randomSeedMethod=None,
                 implicit=False, omit=False, nullable=True, debug=False, verbose=False,
                 seedColumnName=DEFAULT_SEED_COLUMN,
                 **kwargs):

        # set up logging
        self.verbose = verbose
        self.debug = debug

        self._setup_logger()

        # set up default range and type for column
        self._dataRange = NRange(None, None, None)  # by default, range of values for column is unconstrained

        self._inferDataType = False

        if colType is None:  # default to integer field if none specified
            colType = IntegerType()
        elif colType == INFER_DATATYPE:
            colType = StringType()  # default inferred data type to string until exact type is known
            self._inferDataType = True

            if EXPR_OPTION not in kwargs:
                raise ValueError("Column generation spec must have `expr` attribute specified if datatype is inferred")
        elif type(colType) == str:
            colType = SchemaParser.columnTypeFromString(colType)

        assert isinstance(colType, DataType), f"colType `{colType}` is not instance of DataType"

        self._initialBuildPlan = []  # the build plan for the column - descriptive only
        self.executionHistory = []  # the execution history for the column

        self._seedColumnName = seedColumnName

        # if no base column is specified, assume it's dependent on the seed column
        if baseColumn is None:
            baseColumn = self._seedColumnName

        # to allow for open ended extension of many column attributes, we use a few specific
        # parameters and pass the rest as keyword arguments
        supplied_options = {'name': name, 'minValue': minValue, 'type': colType,
                            'maxValue': maxValue, 'step': step,
                            'prefix': prefix, 'baseColumn': baseColumn,
                            OPTION_RANDOM: random, 'distribution': distribution,
                            OPTION_RANDOM_SEED_METHOD: randomSeedMethod, OPTION_RANDOM_SEED: randomSeed,
                            'omit': omit, 'nullable': nullable, 'implicit': implicit
                            }

        supplied_options.update(kwargs)

        self._csOptions = ColumnSpecOptions(supplied_options)

        self._csOptions.checkValidColumnProperties(supplied_options)

        # only allow `template` or `text`
        self._csOptions.checkExclusiveOptions(["template", "text"])

        # only allow `weights` or `distribution`
        self._csOptions.checkExclusiveOptions(["distribution", "weights"])

        # check for alternative forms of specifying range
        # column_spec_options._checkExclusiveOptions(["minValue", "minValue", "begin", "dataRange"])
        # column_spec_options._checkExclusiveOptions(["maxValue", "maxValue", "end", "dataRange"])
        # column_spec_options._checkExclusiveOptions(["step", "interval", "dataRange"])

        # we want to assign each of the properties to the appropriate instance variables
        # but compute sensible defaults in the process as needed
        # in particular, we want to ensure that things like values and weights match
        # and that minValue and maxValue are not inconsistent with distributions, ranges etc

        # if a column spec is implicit, it can be overwritten
        # by default, column specs added by wild cards or inferred from schemas are implicit
        self._csOptions.checkBoolOption(implicit, name="implicit")
        self.implicit = implicit

        # if true, omit the column from the final output
        self._csOptions.checkBoolOption(omit, name="omit")
        self.omit = omit

        # the column name
        self.name = name

        # the base column data types
        self._baseColumnDatatypes = []

        # not used for much other than to validate against option to generate nulls
        self._csOptions.checkBoolOption(nullable, name="nullable")
        self.nullable = nullable

        # should be either a literal or None
        # use of a random seed method will ensure that we have repeatability of data generation
        assert randomSeed is None or type(randomSeed) in [int, float], "seed should be None or numeric"
        assert randomSeedMethod is None or randomSeedMethod in [RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME], \
            f"`randomSeedMethod` should be none or `{RANDOM_SEED_FIXED}` or `{RANDOM_SEED_HASH_FIELD_NAME}`"

        self._randomSeedMethod = self[OPTION_RANDOM_SEED_METHOD]
        self.random = self[OPTION_RANDOM]

        if self._randomSeedMethod == RANDOM_SEED_HASH_FIELD_NAME:
            assert self.name is not None, "field name cannot be None"
            self._randomSeed = abs(hash(self.name))
        else:
            self._randomSeed = self[OPTION_RANDOM_SEED]

        # random seed method should be "fixed" or "hash_fieldname"
        if self._randomSeed is not None and self._randomSeedMethod is None:
            self._randomSeedMethod = RANDOM_SEED_FIXED

        # compute dependencies
        self.dependencies = self._computeBasicDependencies()

        # value of `base_column_type` must be `None`, "values", "raw_values", "auto", or "hash"
        # this is the method of computing the current column value from the base column,
        # not the data type of the base column
        allowed_compute_methods = [AUTO_COMPUTE_METHOD, VALUES_COMPUTE_METHOD, HASH_COMPUTE_METHOD,
                                   RAW_VALUES_COMPUTE_METHOD, None]
        self._csOptions.checkOptionValues("baseColumnType", allowed_compute_methods)
        self._baseColumnComputeMethod = self['baseColumnType']

        # handle text generation templates
        if self['template'] is not None:
            assert isinstance(self['template'], str), "template must be a string"
            escapeSpecialChars = self['escapeSpecialChars'] if self['escapeSpecialChars'] is not None else False
            self._textGenerator = TemplateGenerator(self['template'], escapeSpecialChars)
        elif self['text'] is not None:
            self._textGenerator = copy.deepcopy(self['text'])
        else:
            self._textGenerator = None

        # specify random seed for text generator if one is in effect
        if self._textGenerator is not None and self._randomSeed is not None:
            self._textGenerator = self._textGenerator.withRandomSeed(self._randomSeed)

        # compute required temporary values
        self.temporaryColumns = []

        data_range = self["dataRange"]

        unique_values = self["uniqueValues"]

        c_min, c_max, c_step = (self["minValue"], self["maxValue"], self["step"])
        c_begin, c_end, c_interval = self['begin'], self['end'], self['interval']

        # handle weights / values and distributions
        self.weights, self.values = (self["weights"], self["values"])

        self.distribution = self["distribution"]

        # if distribution is just specified as `normal`, use standard normal distribution
        if self.distribution == "normal":
            self.distribution = Normal.standardNormal()

        # specify random seed for distribution if one is in effect
        if self.distribution is not None and self._randomSeed is not None:
            self.distribution = self.distribution.withRandomSeed(self._randomSeed)

        # force weights and values to list
        if self.weights is not None:
            # coerce to list - this will allow for pandas series, numpy arrays and tuples to be used
            self.weights = list(self.weights)

        if self.values is not None:
            # coerce to list - this will allow for pandas series, numpy arrays and tuples to be used
            self.values = list(self.values)

        # handle default method of computing the base column value
        # if we have text manipulation, use 'values' as default for format but 'hash' as default if
        # it's a column with multiple values
        if self._baseColumnComputeMethod in [None, AUTO_COMPUTE_METHOD] \
                and (self.textGenerator is not None or self['format'] is not None
                     or self['prefix'] is not None or self['suffix'] is not None):
            if self.values is not None:
                self.logger.info("""Column [%s] has no `base_column_type` attribute and uses discrete values
                                    => Assuming `hash` for attribute `base_column_type`.
                                    => Use explicit value for `base_column_type` if alternate interpretation needed
                                 """, self.name)
                self._baseColumnComputeMethod = HASH_COMPUTE_METHOD
            else:
                self.logger.info("""Column [%s] has no `base_column_type` attribute specified for formatted text
                                    => Assuming `values` for attribute `base_column_type`.
                                    => Use explicit value for `base_column_type` if alternate interpretation needed
                                 """, self.name)
                self._baseColumnComputeMethod = VALUES_COMPUTE_METHOD

        # adjust the range by merging type and range information
        self._dataRange = self._computeAdjustedRangeForColumn(colType=colType,
                                                              c_min=c_min, c_max=c_max, c_step=c_step,
                                                              c_begin=c_begin, c_end=c_end,
                                                              c_interval=c_interval,
                                                              c_unique=unique_values, c_range=data_range)

        if self.distribution is not None:
            ensure((self._dataRange is not None and self._dataRange.isFullyPopulated())
                   or self.values is not None,
                   """When using an explicit distribution, provide a fully populated range or a set of values""")

        # set up the temporary columns needed for data generation
        self._setupTemporaryColumns()

    def _temporaryRename(self, tmpName):
        """ Create enter / exit object to support temporary renaming of column spec

        This is to support the functionality:

        ```
        with columnSpec._temporaryRename("test") as modifiedColumn:
            modifiedColumn.doSomething()
        ```

        When building array or multi-column valued columns, we rename the column definition temporarily
        to ensure that logging, saving messages to the execution plan and random number generation
        based on the field name work correctly.

        :param tmpName: temporary name for the column.
        :return: object supporting `with columnSpec._temporaryRename("test") as modifiedColumn:` semantics

        .. note:: This does not create a copy of the column spec object

        """
        # create class for temporary rename enter / exit
        assert tmpName is not None and len(tmpName) > 0, "method expects valid temporary name"

        class RenameEnterExit:
            def __init__(self, columnSpec, newName):
                """ Save column spec and old name to support enter / exit semantics """
                self._cs = columnSpec
                self._oldName = columnSpec.name
                self._newName = newName
                self._randomSeed = columnSpec._randomSeed

            def __enter__(self):
                """ Return the inner column spec object """
                self._cs.name = self._newName
                if self._cs._randomSeedMethod == RANDOM_SEED_HASH_FIELD_NAME:
                    self._cs._randomSeed = abs(hash(self._cs.name))
                    if self._cs._textGenerator is not None and self._cs._randomSeed is not None:
                        self._cs._textGenerator = self._cs._textGenerator.withRandomSeed(self._cs._randomSeed)
                return self._cs

            def __exit__(self, exc_type, exc_value, tb):
                # restore old name
                self._cs.name = self._oldName

                # restore old random seed
                self._cs._randomSeed = self._randomSeed
                if self._cs._randomSeedMethod == RANDOM_SEED_HASH_FIELD_NAME:
                    if self._cs._textGenerator is not None and self._cs._randomSeed is not None:
                        self._cs._textGenerator = self._cs._textGenerator.withRandomSeed(self._cs._randomSeed)

                if exc_type is not None:
                    # uncomment for traceback
                    # import traceback
                    # traceback.print_exception(exc_type, exc_value, tb)
                    return False
                return True

        return RenameEnterExit(self, tmpName)

    @property
    def specOptions(self):
        """ get column spec options for spec

        .. note:: This is intended for testing use only. Option values set directly through
                  the options dict are not supported.

        :return: underlying options object
        """
        return self._csOptions.options

    def __deepcopy__(self, memo):
        """Custom deep copy method that resets the logger to avoid trying to copy the logger

        :see https://docs.python.org/3/library/copy.html
        """
        self.logger = None  # pylint: disable=attribute-defined-outside-init
        result = None

        try:
            cls = self.__class__
            result = cls.__new__(cls)
            memo[id(self)] = result
            for k, v in self.__dict__.items():
                setattr(result, k, copy.deepcopy(v, memo))
        finally:
            self._setup_logger()
            if result is not None:
                result._setup_logger()
        return result

    @property
    def randomSeed(self):
        """ get random seed for column spec """
        return self._randomSeed

    @property
    def isRandom(self):
        """ returns True if column will be randomly generated """
        return self[OPTION_RANDOM]

    @property
    def textGenerator(self):
        """ Get the text generator for the column spec """
        return self._textGenerator

    @property
    def inferDatatype(self):
        """ If True, indicates that the datatype should be inferred as the result of computing
            the SQL expression
        """
        return self._inferDataType

    @property
    def baseColumns(self):
        """ Return base columns as list of strings """

        # if base column is a string and contains multiple columns, split them;
        # otherwise build a list of columns if needed
        if type(self.baseColumn) is str and "," in self.baseColumn:
            return [x.strip() for x in self.baseColumn.split(",")]
        elif type(self.baseColumn) is list:
            return self.baseColumn
        else:
            return [self.baseColumn]

    def _computeBasicDependencies(self):
        """ get set of basic column dependencies.

        These are used to compute the order of field evaluation

        :return: base columns as list with dependency on seed column added
        """
        if self.baseColumn != self._seedColumnName:
            return list(set(self.baseColumns + [self._seedColumnName]))
        else:
            return [self._seedColumnName]
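
    # Illustrative sketch (added; hypothetical values - this class is internal-only):
    # `baseColumns` normalizes the `baseColumn` option into a list of column names, and
    # `_computeBasicDependencies` adds the seed column to that list.
    #
    #   spec = ColumnGenerationSpec("sku", baseColumn="city,state")
    #   spec.baseColumns    # -> ["city", "state"]
    #   spec.dependencies   # -> contains "city", "state" and the seed column
    #                       #    (DEFAULT_SEED_COLUMN unless overridden; order not guaranteed)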
    def setBaseColumnDatatypes(self, columnDatatypes):
        """ Set the data types for the base columns

        :param columnDatatypes: list of data types for the base columns
        """
        assert type(columnDatatypes) is list, "`columnDatatypes` parameter must be list"
        ensure(len(columnDatatypes) == len(self.baseColumns),
               "number of base column datatypes must match number of base columns")
        # copy the list; note that `[].append(columnDatatypes)` would evaluate to None
        self._baseColumnDatatypes = list(columnDatatypes)
    def _setupTemporaryColumns(self):
        """ Set up any temporary columns needed for test data generation.

        For some types of test data, intermediate columns are used in the data generation process
        but dropped from the final output
        """
        if self.isWeightedValuesColumn:
            # if it's a weighted values column, then create a temporary column for it
            # not supported for feature / array columns for now
            min_num_columns, max_num_columns, struct_type = self._getMultiColumnDetails(validate=False)

            ensure(max_num_columns is None or max_num_columns <= 1,
                   "weighted columns not supported for multi-column or multi-feature values")
            if self.random:
                temp_name = f"_rnd_{self.name}"
                self.dependencies.append(temp_name)
                desc = f"adding temporary column {temp_name} required by {self.name}"
                self._initialBuildPlan.append(desc)
                sql_random_generator = self._getUniformRandomSQLExpression(self.name)
                self.temporaryColumns.append((temp_name, DoubleType(), {'expr': sql_random_generator, 'omit': True,
                                                                        'description': desc}))
                self._weightedBaseColumn = temp_name
            else:
                # create temporary expression mapping values to range of weights
                temp_name = f"_scaled_{self.name}"
                self.dependencies.append(temp_name)
                desc = f"adding temporary column {temp_name} required by {self.name}"
                self._initialBuildPlan.append(desc)

                # use a base expression based on mapping base column to size of data
                sql_scaled_generator = self._getScaledIntSQLExpression(self.name,
                                                                       scale=sum(self.weights),
                                                                       base_columns=self.baseColumns,
                                                                       base_datatypes=self._baseColumnDatatypes,
                                                                       compute_method=self._baseColumnComputeMethod,
                                                                       normalize=True)

                self.logger.debug("""building scaled sql expression : '%s'
                                     with base column: %s, dependencies: %s""",
                                  sql_scaled_generator,
                                  self.baseColumn,
                                  self.dependencies)

                self.temporaryColumns.append((temp_name, DoubleType(), {'expr': sql_scaled_generator, 'omit': True,
                                                                        'baseColumn': self.baseColumn,
                                                                        'description': desc}))
                self._weightedBaseColumn = temp_name

    def _setup_logger(self):
        """Set up logging

        This will set the logger at warning, info or debug levels depending on the instance construction parameters
        """
        self.logger = logging.getLogger("DataGenerator")
        if self.debug:
            self.logger.setLevel(logging.DEBUG)
        elif self.verbose:
            self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.WARNING)

    def _computeAdjustedRangeForColumn(self, colType, c_min, c_max, c_step, c_begin, c_end, c_interval,
                                       c_range, c_unique):
        """Determine adjusted range for data column """
        assert colType is not None, "`colType` must be non-None instance"

        if type(colType) is DateType or type(colType) is TimestampType:
            return self._computeAdjustedDateTimeRangeForColumn(colType, c_begin, c_end, c_interval, c_range, c_unique)
        else:
            return self._computeAdjustedNumericRangeForColumn(colType, c_min, c_max, c_step, c_range, c_unique)

    def _computeAdjustedNumericRangeForColumn(self, colType, c_min, c_max, c_step, c_range, c_unique):
        """Determine adjusted range for data column

        Rules:
        - if a datarange is specified, use that range
        - if begin and end are specified or minValue and maxValue are specified, use that
        - if unique values is specified, compute minValue and maxValue depending on type
        """
        if c_unique is not None:
            assert type(c_unique) is int, "unique_values must be integer"
            assert c_unique >= 1, "if supplied, unique values must be > 0"
            # TODO: set maxValue to unique_values + minValue & add unit test

            effective_min, effective_max, effective_step = None, None, None
            if c_range is not None and type(c_range) is NRange:
                effective_min = c_range.minValue
                effective_step = c_range.step
                effective_max = c_range.maxValue
            effective_min = coalesce_values(effective_min, c_min, 1)
            effective_step = coalesce_values(effective_step, c_step, 1)
            effective_max = coalesce_values(effective_max, c_max)

            # due to floating point errors in some Python floating point calculations, we need to apply rounding
            # if any of the components are float
            if type(effective_min) is float or type(effective_step) is float:
                unique_max = round(c_unique * effective_step + effective_min - effective_step, 9)
            else:
                unique_max = c_unique * effective_step + effective_min - effective_step
            result = NRange(effective_min, unique_max, effective_step)

            if result.maxValue is not None and effective_max is not None and result.maxValue > effective_max:
                self.logger.warning("Computed maxValue for column [%s] of %s is greater than specified maxValue %s",
                                    self.name,
                                    result.maxValue,
                                    effective_max)
        elif c_range is not None:
            result = c_range
        elif c_range is None:
            effective_min, effective_max, effective_step = None, None, None
            effective_min = coalesce_values(c_min, 0)
            effective_step = coalesce_values(c_step, 1)
            result = NRange(effective_min, c_max, effective_step)
        else:
            result = NRange(0, None, None)  # assume numeric range of 0 to x, if no range specified

        self.logger.debug("Computing adjusted range for column: %s - %s", self.name, result)

        return result

    def _computeAdjustedDateTimeRangeForColumn(self, colType, c_begin, c_end, c_interval, c_range, c_unique):
        """Determine adjusted range for Date or Timestamp data column """
        effective_begin, effective_end, effective_interval = None, None, None
        if c_range is not None and type(c_range) is DateRange:
            effective_begin = c_range.begin
            effective_end = c_range.end
            effective_interval = c_range.interval

        effective_interval = coalesce_values(effective_interval, c_interval)
        effective_end = coalesce_values(effective_end, c_end)
        effective_begin = coalesce_values(effective_begin, c_begin)

        if type(colType) is DateType:
            result = DateRange.computeDateRange(effective_begin, effective_end, effective_interval, c_unique)
        else:
            result = DateRange.computeTimestampRange(effective_begin, effective_end, effective_interval, c_unique)

        self.logger.debug("Computing adjusted range for column: %s - %s", self.name, result)
        return result

    def _getUniformRandomExpression(self, col_name):
        """ Get random expression accounting for seed method

        :returns: expression of ColDef form - i.e `lit`, `expr` etc

        The value returned will be a number between 0 and 1 inclusive
        """
        assert col_name is not None, "`col_name` must not be None"
        if self._randomSeedMethod == RANDOM_SEED_FIXED and self._randomSeed != RANDOM_SEED_RANDOM:
            return expr(f"rand({self._randomSeed})")
        elif self._randomSeedMethod == RANDOM_SEED_HASH_FIELD_NAME:
            assert self.name is not None, "`self.name` must not be None"
            return expr(f"rand(hash('{self.name}'))")
        else:
            return rand()

    def _getRandomExpressionForDistribution(self, col_name, col_distribution):
        """ Get random expression accounting for seed method

        :returns: expression of ColDef form - i.e `lit`, `expr` etc

        The value returned will be a number between 0 and 1 inclusive
        """
        assert col_name is not None and len(col_name) > 0, "`col_name` must not be None and must be non-empty"
        assert col_distribution is not None, "`col_distribution` must not be None"
        assert isinstance(col_distribution, DataDistribution), \
            "`distribution` object must be an instance of data distribution"

        self.executionHistory.append(f".. random number generation via distribution `{col_distribution}`")
        return col_distribution.generateNormalizedDistributionSample()

    def _getUniformRandomSQLExpression(self, col_name):
        """ Get random SQL expression accounting for seed method

        :returns: expression as a SQL string
        """
        assert col_name is not None, "`col_name` must not be None"
        if self._randomSeedMethod == RANDOM_SEED_FIXED and self._randomSeed != RANDOM_SEED_RANDOM:
            assert self._randomSeed is not None, "`randomSeed` must not be None"
            return f"rand({self._randomSeed})"
        elif self._randomSeedMethod == RANDOM_SEED_HASH_FIELD_NAME:
            assert self.name is not None, "`self.name` must not be None"
            return f"rand(hash('{self.name}'))"
        else:
            return "rand()"

    def _getScaledIntSQLExpression(self, col_name, scale, base_columns, base_datatypes=None, compute_method=None,
                                   normalize=False):
        """ Get scaled numeric expression

        This will produce a scaled SQL expression from the base columns

        :param col_name: Column name used for error messages and debugging
        :param normalize: If True, will normalize to the range 0 .. 1 inclusive
        :param scale: Numeric value indicating scaling factor - will scale via modulo arithmetic
        :param base_columns: list of base columns
        :param base_datatypes: list of Spark SQL datatypes for columns
        :param compute_method: indicates how the value is derived from base columns - i.e 'hash' or 'values'
                               - treated as hint only
        :returns: scaled expression as a SQL string
        """
        assert col_name is not None, "`col_name` must not be None"
        assert self.name is not None, "`self.name` must not be None"
        assert scale is not None, "`scale` must not be None"
        assert (compute_method is None or
                compute_method in COMPUTE_METHOD_VALID_VALUES), "`compute_method` must be valid value"
        assert (base_columns is not None and
                type(base_columns) is list
                and len(base_columns) > 0), "Base columns must be a non-empty list"

        effective_compute_method = compute_method

        # if we have multiple columns, effective compute method is always the hash of the base values
        if len(base_columns) > 1:
            if compute_method == VALUES_COMPUTE_METHOD:
                self.logger.warning(
                    "For column generation with values and multiple base columns, data will be computed with `hash`")
            effective_compute_method = HASH_COMPUTE_METHOD

        if effective_compute_method is None or effective_compute_method == AUTO_COMPUTE_METHOD:
            effective_compute_method = VALUES_COMPUTE_METHOD

        column_set = ",".join(base_columns)

        if effective_compute_method == HASH_COMPUTE_METHOD:
            result = f"cast( floor((hash({column_set}) % {scale}) + {scale}) % {scale} as double)"
        else:
            result = f"cast( ( floor(({column_set} % {scale}) + {scale}) % {scale}) as double) "

        if normalize:
            result = f"({result} / {(scale * 1.0) - 1.0})"

        self.logger.debug("computing scaled field [%s] as expression [%s]", col_name, result)
        return result

    @property
    def isWeightedValuesColumn(self):
        """ check if column is a weighted values column """
        return self['weights'] is not None and self.values is not None
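
    # Illustrative sketch (added; hypothetical values): for scale=10, base_columns=["x"] and
    # compute_method="hash", `_getScaledIntSQLExpression` builds a SQL string of the shape:
    #
    #   cast( floor((hash(x) % 10) + 10) % 10 as double)
    #
    # With normalize=True the result is divided by (scale - 1), yielding values in [0.0, 1.0]:
    #
    #   (cast( floor((hash(x) % 10) + 10) % 10 as double) / 9.0)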
    def getNames(self):
        """ get column names as list of strings """
        min_num_columns, max_num_columns, struct_type = self._getMultiColumnDetails(validate=False)

        if max_num_columns > 1 and struct_type is None:
            return [f"{self.name}_{x}" for x in range(0, max_num_columns)]
        else:
            return [self.name]
    def getNamesAndTypes(self):
        """ get column names as list of tuples `(name, datatype)` """
        min_num_columns, max_num_columns, struct_type = self._getMultiColumnDetails(validate=False)

        if max_num_columns > 1 and struct_type is None:
            return [(f"{self.name}_{x}", self.datatype) for x in range(0, max_num_columns)]
        else:
            return [(self.name, self.datatype)]
    def keys(self):
        """ Get the keys as list of strings """
        assert self._csOptions is not None, "self._csOptions should be non-empty"
        return self._csOptions.keys()
    def __getitem__(self, key):
        """ implement the built-in dereference-by-key behavior """
        assert key is not None, "key should be non-empty"
        return self._csOptions.getOrElse(key, None)

    @property
    def isFieldOmitted(self):
        """ check if this field should be omitted from the output

        If the field is omitted from the output, the field is available for use in expressions etc.
        but dropped from the final set of fields
        """
        return self.omit

    @property
    def baseColumn(self):
        """get the base column used to generate values for this column"""
        return self['baseColumn']

    @property
    def datatype(self):
        """get the Spark SQL data type used to generate values for this column"""
        return self['type']

    @property
    def prefix(self):
        """get the string prefix used to generate values for this column

        When a string field is generated from this spec, the prefix is prepended to the generated string
        """
        return self['prefix']

    @property
    def suffix(self):
        """get the string suffix used to generate values for this column

        When a string field is generated from this spec, the suffix is appended to the generated string
        """
        return self['suffix']

    @property
    def min(self):
        """get the column generation `minValue` value used to generate values for this column"""
        return self._dataRange.minValue

    @property
    def max(self):
        """get the column generation `maxValue` value used to generate values for this column"""
        return self['maxValue']

    @property
    def step(self):
        """get the column generation `step` value used to generate values for this column"""
        return self['step']

    @property
    def exprs(self):
        """get the column generation `exprs` attribute used to generate values for this column.
        """
        return self['exprs']

    @property
    def expr(self):
        """get the `expr` attribute used to generate values for this column"""
        return self['expr']

    @property
    def text_separator(self):
        """get the `text_separator` attribute used to generate values for this column"""
        return self['text_separator']

    @property
    def begin(self):
        """get the `begin` attribute used to generate values for this column

        For numeric columns, the range (minValue, maxValue, step) is used to control data generation.
        For date and time columns, the range (begin, end, interval) is used to control data generation
        """
        return self['begin']

    @property
    def end(self):
        """get the `end` attribute used to generate values for this column

        For numeric columns, the range (minValue, maxValue, step) is used to control data generation.
        For date and time columns, the range (begin, end, interval) is used to control data generation
        """
        return self['end']

    @property
    def interval(self):
        """get the `interval` attribute used to generate values for this column

        For numeric columns, the range (minValue, maxValue, step) is used to control data generation.
        For date and time columns, the range (begin, end, interval) is used to control data generation
        """
        return self['interval']

    @property
    def numColumns(self):
        """get the `numColumns` attribute used to generate values for this column

        if a column is specified with the `numColumns` attribute, this is used to create multiple
        copies of the column, named `colName1` .. `colNameN`
        """
        return self['numColumns']

    @property
    def numFeatures(self):
        """get the `numFeatures` attribute used to generate values for this column

        if a column is specified with the `numFeatures` attribute, this is used to create multiple
        copies of the column, combined into an array or feature vector
        """
        return self['numFeatures']
    def structType(self):
        """get the `structType` attribute used to generate values for this column

        When a column spec is specified to generate multiple copies of the column, this controls
        whether these are combined into an array etc
        """
        return self['structType']
    def getOrElse(self, key, default=None):
        """ Get value for option key if it exists or else return default

        :param key: key name for option
        :param default: default value if option was not provided
        :return: option value or default
        """
        return self._csOptions.getOrElse(key, default)
    def getPlanEntry(self):
        """ Get execution plan entry for object

        :returns: String representation of plan entry
        """
        desc = self['description']
        if desc is not None:
            return " |-- " + desc
        else:
            return f" |-- building column generator for column {self.name}"
    def _makeWeightedColumnValuesExpression(self, values, weights, seed_column_name):
        """make SQL expression to compute the weighted values expression

        :returns: Spark SQL expr
        """
        from .function_builder import ColumnGeneratorBuilder
        assert values is not None, "`values` expression must be supplied as list of values"
        assert weights is not None, "`weights` expression must be list of weights"
        assert len(values) == len(weights), "`weights` and `values` lists must be of equal length"
        assert seed_column_name is not None, "`seed_column_name` must be explicit column name"
        expr_str = ColumnGeneratorBuilder.mkExprChoicesFn(values, weights, seed_column_name, self.datatype)
        return expr(expr_str).astype(self.datatype)

    def _isRealValuedColumn(self):
        """ determine if column is real valued

        :returns: Boolean - True if condition is true
        """
        col_type_name = self['type'].typeName()

        return col_type_name in ['double', 'float', 'decimal']

    def _isDecimalColumn(self):
        """ determine if column is decimal column

        :returns: Boolean - True if condition is true
        """
        col_type_name = self['type'].typeName()

        return col_type_name == 'decimal'

    def _isContinuousValuedColumn(self):
        """ determine if column generates continuous values

        :returns: Boolean - True if condition is true
        """
        is_continuous = self['continuous']

        return is_continuous

    def _getSeedExpression(self, base_column):
        """ Get seed expression for column generation

        This is used to generate the base value for every column.

        If using a single base column, then simply use that; otherwise use either
        a SQL hash of multiple columns, or an array of the base column values converted to strings

        :returns: Spark SQL `col` or `expr` object
        """
        if type(base_column) is list:
            assert len(base_column) > 0, "`baseColumn` must be list of column names"
            if len(base_column) == 1:
                if self._baseColumnComputeMethod == HASH_COMPUTE_METHOD:
                    return expr(f"hash({base_column[0]})")
                else:
                    return col(base_column[0])
            elif self._baseColumnComputeMethod == VALUES_COMPUTE_METHOD:
                base_values = [f"string(ifnull(`{x}`, 'null'))" for x in base_column]
                return expr(f"array({','.join(base_values)})")
            else:
                return expr(f"hash({','.join(base_column)})")
        else:
            if self._baseColumnComputeMethod == HASH_COMPUTE_METHOD:
                return expr(f"hash({base_column})")
            else:
                return col(base_column)

    def _isStringField(self):
        return type(self.datatype) is StringType

    def _computeRangedColumn(self, datarange, base_column, is_random):
        """ compute a ranged column

        maxValue is the maximum actual value

        :returns: spark sql `column` or expression that can be used to generate a column
        """
        assert base_column is not None, "`baseColumn` must be specified"
        assert datarange is not None, "`datarange` must be specified"
        assert datarange.isFullyPopulated(), "`datarange` must be fully populated (minValue, maxValue, step)"

        if is_random:
            if self.distribution is not None:
                random_generator = self._getRandomExpressionForDistribution(self.name, self.distribution)
            else:
                random_generator = self._getUniformRandomExpression(self.name)
        else:
            random_generator = None

        if self._isContinuousValuedColumn() and self._isRealValuedColumn() and is_random:
            crange = datarange.getContinuousRange()
            baseval = random_generator * lit(crange)
        else:
            crange = datarange.getDiscreteRange()
            modulo_factor = lit(crange + 1)
            # following expression is needed as spark sql modulo of negative number is negative
            modulo_exp = ((self._getSeedExpression(base_column) % modulo_factor) + modulo_factor) % modulo_factor
            baseval = (modulo_exp * lit(datarange.step)) if not is_random else (
                sql_round(random_generator * lit(crange)) * lit(datarange.step))

        if self._baseColumnComputeMethod == VALUES_COMPUTE_METHOD:
            new_def = self._adjustForMinValue(baseval, datarange)
        elif self._baseColumnComputeMethod == RAW_VALUES_COMPUTE_METHOD:
            new_def = baseval
        else:
            new_def = self._adjustForMinValue(baseval, datarange, force=True)

        # for ranged values in strings, use type of minValue, maxValue and step as output type
        if type(self.datatype) is StringType:
            if type(datarange.min) is float or type(datarange.max) is float or type(datarange.step) is float:
                if datarange.getScale() > 0:
                    new_def = sql_round(new_def.astype(FloatType()), datarange.getScale())
                else:
                    new_def = new_def.astype(DoubleType())
            else:
                new_def = new_def.astype(IntegerType())

        return new_def

    def _adjustForMinValue(self, baseval, datarange, force=False):
        """ Adjust for minimum value of data range

        :param baseval: base expression
        :param datarange: data range to conform to
        :param force: always adjust (possibly for implicit cast reasons)
        """
        if force and datarange is not None:
            new_def = baseval + lit(datarange.minValue)
        elif (datarange is not None) and (datarange.minValue != 0) and (datarange.minValue != 0.0):
            new_def = baseval + lit(datarange.minValue)
        else:
            new_def = baseval
        return new_def

    def _makeSingleGenerationExpression(self, index=None, use_pandas_optimizations=True):
        """ generate column data for a single column value via Spark SQL expression

        :param index: for multi column generation, specifies index of column being generated
        :param use_pandas_optimizations: if True, uses Pandas vectorized optimizations. Defaults to `True`
        :returns: spark sql `column` or expression that can be used to generate a column
        """
        self.logger.debug("building column : %s", self.name)

        # get key column specification properties
        col_is_rand, cdistribution = self[OPTION_RANDOM], self['distribution']
        percent_nulls = self['percentNulls']
        sformat = self['format']

        if self._dataRange is not None:
            self._dataRange.adjustForColumnDatatype(self.datatype)
            self.executionHistory.append(f".. using effective range: {self._dataRange}")

        new_def = None

        # generate expression
        if type(self.datatype) in [ArrayType, MapType, StructType] and self.expr is None:
            self.logger.warning("Array, Map or Struct type column with no SQL `expr` will result in NULL value")
            self.executionHistory.append(".. WARNING: Array, Map or Struct type column with no SQL `expr` ")

        # handle weighted values for weighted value columns
        # a weighted values column will use a base value denoted by `self._weightedBaseColumn`
        if self.isWeightedValuesColumn:
            self.executionHistory.append(".. building weighted column values expression")
            new_def = self._makeWeightedColumnValuesExpression(self.values, self.weights, self._weightedBaseColumn)

            if type(self.datatype) is StringType and self.textGenerator is not None:
                self.logger.warning("Template generation / text generation not supported for weighted columns")
                self.executionHistory.append(".. WARNING: Template & text generation not supported for weights")

            if type(self.datatype) is StringType and sformat is not None:
                self.logger.warning("Formatting not supported for weighted columns")
                self.executionHistory.append(".. WARNING: Formatting not supported for weighted columns")
        else:
            # rs: initialize the begin, end and interval if not initialized for date computations
            # defaults are start of day, now, and 1 minute respectively

            # for array, struct and map types, either value is provided via `expr` or via values
            if not type(self.datatype) in [ArrayType, MapType, StructType] or self.values is not None:
                self._computeImpliedRangeIfNeeded(self.datatype)

            # TODO: add full support for date value generation
            if self.expr is not None:
                # note use of SQL expression ignores range specifications
                new_def = expr(self.expr)
                self.executionHistory.append(f".. using SQL expression `{self.expr}` as base")

                if not self._inferDataType:
                    new_def = new_def.astype(self.datatype)
                    self.executionHistory.append(f".. casting to `{self.datatype}`")
            elif type(self.datatype) in [ArrayType, MapType, StructType] and self.values is None:
                new_def = expr("NULL")
            elif self._dataRange is not None and self._dataRange.isFullyPopulated():
                self.executionHistory.append(f".. computing ranged value: {self._dataRange}")
                new_def = self._computeRangedColumn(base_column=self.baseColumn, datarange=self._dataRange,
                                                    is_random=col_is_rand)
            elif type(self.datatype) is DateType:
                # TODO: fixup for date generation

                # record execution history
                self.executionHistory.append(".. using random date expression")
                sql_random_generator = self._getUniformRandomSQLExpression(self.name)
                new_def = expr(f"date_sub(current_date, round({sql_random_generator}*1024))").astype(
                    self.datatype)
            else:
                if self._baseColumnComputeMethod == VALUES_COMPUTE_METHOD:
                    self.executionHistory.append(".. using values compute expression for seed")
                    new_def = self._getSeedExpression(self.baseColumn)
                elif self._baseColumnComputeMethod == RAW_VALUES_COMPUTE_METHOD:
                    self.executionHistory.append(".. using raw values compute expression for seed")
                    new_def = self._getSeedExpression(self.baseColumn)
                # TODO: resolve issues with hash when using templates
                # elif self._baseColumnComputeMethod == HASH_COMPUTE_METHOD:
                #     newDef = self._getSeedExpression(self.baseColumn)
                else:
                    self.logger.info("Assuming a seeded base expression with minimum value for column %s", self.name)
                    self.executionHistory.append(f".. seeding with minimum `{self._dataRange.minValue}`")

                    new_def = ((self._getSeedExpression(self.baseColumn) + lit(self._dataRange.minValue))
                               .astype(self.datatype))

            if self.values is not None:
                new_def = array([lit(x) for x in self.values])[new_def.astype(IntegerType())]
            elif type(self.datatype) is StringType and self.expr is None:
                new_def = self._applyPrefixSuffixExpressions(self.prefix, self.suffix, new_def)

            # use string generation template if available, passing in what was generated to date
            if type(self.datatype) is StringType and self.textGenerator is not None:
                new_def = self._applyTextGenerationExpression(new_def, use_pandas_optimizations)

        if type(self.datatype) is StringType and sformat is not None:
            new_def = self._applyTextFormatExpression(new_def, sformat)

        new_def = self._applyFinalCastExpression(self.datatype, new_def)

        if percent_nulls is not None:
            new_def = self._applyComputePercentNullsExpression(new_def, percent_nulls)
        return new_def

    def _onSelect(self, df):
        """ The _onSelect method is called when the column specification's expression, as produced by the
        method ``_makeSingleGenerationExpression``, is used in a select statement.

        :param df: Dataframe in which expression is used
        :return: nothing

        .. note:: The purpose of this method is to allow for introspection of information such as datatype,
                  which can only be determined when the column specification's expression is used.
        """
        if self._inferDataType:
            inferred_type = df.schema[self.name].dataType
            self.logger.info("Inferred datatype for column %s as %s", self.name, str(inferred_type))
            self._csOptions.options['type'] = inferred_type

    def _applyTextFormatExpression(self, new_def, sformat):
        # note :
        # while it seems like this could use a shared instance, this does not work if initialized
        # in a class method
        self.executionHistory.append(f".. applying column format `{sformat}`")
        new_def = format_string(sformat, new_def)
        return new_def

    def _applyPrefixSuffixExpressions(self, cprefix, csuffix, new_def):
        # string value generation is simply handled by combining with a suffix or prefix
        # TODO: prefix and suffix only apply to base columns that are numeric types
        text_separator = self.text_separator if self.text_separator is not None else '_'
        if cprefix is not None and csuffix is not None:
            self.executionHistory.append(".. applying column prefix and suffix")
            new_def = concat(lit(cprefix), lit(text_separator), new_def.astype(IntegerType()),
                             lit(text_separator), lit(csuffix))
        elif cprefix is not None:
            self.executionHistory.append(".. applying column prefix")
            new_def = concat(lit(cprefix), lit(text_separator), new_def.astype(IntegerType()))
        elif csuffix is not None:
            self.executionHistory.append(".. applying column suffix")
            new_def = concat(new_def.astype(IntegerType()), lit(text_separator), lit(csuffix))
        return new_def

    def _applyTextGenerationExpression(self, new_def, use_pandas_optimizations):
        """Apply text generation expression to column expression

        :param new_def: column definition being created
        :param use_pandas_optimizations: Whether Pandas optimizations should be applied
        :returns: new column definition
        """
        # note :
        # while it seems like this could use a shared instance, this does not work if initialized
        # in a class method
        tg = self.textGenerator
        if use_pandas_optimizations:
            self.executionHistory.append(f".. text generation via pandas scalar udf `{tg}`")
            u_value_from_generator = pandas_udf(tg.pandasGenerateText,
                                                returnType=StringType()).asNondeterministic()
        else:
            self.executionHistory.append(f".. text generation via udf `{tg}`")
            u_value_from_generator = udf(tg.classicGenerateText,
                                         StringType()).asNondeterministic()
        new_def = u_value_from_generator(new_def)
        return new_def

    def _applyFinalCastExpression(self, col_type, new_def):
        """ Apply final cast expression for column data

        :param col_type: final column type
        :param new_def: column definition being created
        :returns: new column definition
        """
        self.executionHistory.append(f".. casting column [{self.name}] to `{col_type}`")

        # cast the result to the appropriate type. For dates, cast first to timestamp, then to date
        if type(col_type) is DateType:
            new_def = new_def.astype(TimestampType()).astype(col_type)
        elif self._inferDataType:
            # don't apply cast when column has an inferred data type
            pass
        else:
            new_def = new_def.astype(col_type)

        return new_def

    def _applyComputePercentNullsExpression(self, newDef, probabilityNulls):
        """Compute percentage nulls for column being generated

        :param newDef: Column definition being created
        :param probabilityNulls: Probability of nulls to be generated for particular column.
                                 Values can be 0.0 - 1.0
        :returns: new column definition with probability of nulls applied
        """
        assert self.nullable, f"Column `{self.name}` must be nullable for `percent_nulls` option"
        self.executionHistory.append(".. applying null generator - `when rnd > prob then value - else null`")

        assert probabilityNulls is not None, "option 'percent_nulls' must not be null value or None"
        assert type(probabilityNulls) in [int, float], "option 'percent_nulls' must be int or float"
        assert 0.0 <= probabilityNulls <= 1.0, "option 'percent_nulls' must be in the range [0.0 .. 1.0]"
        prob_nulls = probabilityNulls * 1.0  # for edge case where int was passed

        random_generator = self._getUniformRandomExpression(self.name)
        newDef = when(random_generator > lit(prob_nulls), newDef).otherwise(lit(None))
        return newDef

    def _computeImpliedRangeIfNeeded(self, col_type):
        """ Compute implied range if necessary

        :param col_type: Column type
        :returns: nothing
        """
        # check for implied ranges
        if self.values is not None:
            self._dataRange = NRange(0, len(self.values) - 1, 1)
        elif type(col_type) is BooleanType:
            self._dataRange = NRange(0, 1, 1)
        self.executionHistory.append(f".. using adjusted effective range: {self._dataRange}")

    def _getMultiColumnDetails(self, validate):
        """ Determine min and max number of columns to generate, along with the `structType`,
        for columns with multiple columns / features

        :param validate: If true, raises ValueError if there are bad option entries
        :return: tuple of (min_columns, max_columns, structType)
        """
        num_columns = self['numColumns']
        struct_type = self['structType']

        if num_columns is None:
            num_columns = self['numFeatures']

        if num_columns is None:
            min_num_columns, max_num_columns = 1, 1
        elif isinstance(num_columns, int):
            min_num_columns, max_num_columns = int(num_columns), int(num_columns)
        elif isinstance(num_columns, tuple):
            if validate and ((len(num_columns) != 2) or not all(isinstance(c, int) for c in num_columns)):
                raise ValueError(f"Bad value [{num_columns}] for `numColumns` / `numFeatures` attribute")
            min_num_columns, max_num_columns = int(num_columns[0]), int(num_columns[1])
            if validate and (min_num_columns > max_num_columns):
                raise ValueError(f"Bad value [{num_columns}] for `numColumns` / `numFeatures` attribute")
        else:
            if validate:
                raise ValueError(f"Bad value [{num_columns}] for `numColumns` / `numFeatures` attribute")
            min_num_columns, max_num_columns = 1, 1

        if validate and (min_num_columns != max_num_columns) and (struct_type != self._ARRAY_STRUCT_TYPE):
            self.logger.warning(
                f"Varying number of features / columns specified for non-array column [{self.name}]")
            self.logger.warning(
                f"Lower bound for number of features / columns ignored for [{self.name}]")
            min_num_columns = max_num_columns

        return min_num_columns, max_num_columns, struct_type
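
    # Illustrative sketch (added; hypothetical values): `_getMultiColumnDetails` interprets
    # the `numColumns` / `numFeatures` options as follows:
    #
    #   numColumns=3                          -> (3, 3, struct_type)
    #   numColumns=(2, 4), structType="array" -> (2, 4, "array"); generated arrays hold 2..4 values
    #   numColumns=(2, 4), structType=None    -> lower bound ignored with a warning -> (4, 4, None)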
    def makeGenerationExpressions(self):
        """ Generate structured column if multiple columns or features are specified

        if there are multiple columns / features specified using a single definition, it will generate
        a set of columns conforming to the same definition,
        renaming them as appropriate and combining them into an array if necessary
        (depending on the structure combination instructions)

        :param self: is ColumnGenerationSpec for column
        :returns: spark sql `column` or expression that can be used to generate a column
        """
        min_num_columns, max_num_columns, struct_type = self._getMultiColumnDetails(validate=True)

        self.executionHistory = []
        if (min_num_columns == 1) and (max_num_columns == 1) and struct_type != self._ARRAY_STRUCT_TYPE:
            # record execution history for troubleshooting
            self.executionHistory.append(f"generating single column - `{self.name}` having type `{self.datatype}`")

            retval = self._makeSingleGenerationExpression(use_pandas_optimizations=True)

            # record how column was generated
            exec_step_history = ".. computed from base values - "
            exec_step_history += f"`{self.baseColumn}`, method: `{self._baseColumnComputeMethod}`"
            self.executionHistory.append(exec_step_history)
        else:
            self.executionHistory.append(f"generating multiple columns {max_num_columns} - `{self.name}`")

            retval = []

            for ix in range(max_num_columns):
                with self._temporaryRename(f"{self.name}_{ix}") as renamed_cs:
                    retval.append(renamed_cs._makeSingleGenerationExpression(ix, use_pandas_optimizations=True))

            if struct_type == 'array':
                self.executionHistory.append(".. converting multiple columns to array")
                retval = array(retval)

                if min_num_columns != max_num_columns:
                    column_set = ",".join(self.baseColumns)
                    diff = max_num_columns - min_num_columns
                    expr_str = f"{min_num_columns} + (abs(hash({column_set})) % {diff + 1})"
                    retval = F.slice(retval, F.lit(1), F.expr(expr_str))

        return retval
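
# Illustrative usage sketch (added; assumes a running Spark session named `spark`):
# ColumnGenerationSpec objects are normally created indirectly through the DataGenerator
# builder API rather than instantiated directly, e.g.:
#
#   import dbldatagen as dg
#   from pyspark.sql.types import IntegerType
#
#   ds = (dg.DataGenerator(sparkSession=spark, name="test_data", rows=1000)
#         .withColumn("code", IntegerType(), minValue=1, maxValue=20, random=True)
#         .withColumn("code_str", "string", prefix="code", baseColumn="code"))
#   df = ds.build()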