Source code for dbldatagen.data_generator

# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file defines the `DataGenerator` class; the `DataGenError` exception it raises is imported from `.utils`
"""
import copy
import logging
import re

from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType

from ._version import _get_spark_version
from .column_generation_spec import ColumnGenerationSpec
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
    DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
    OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
    INFER_DATATYPE
from .html_utils import HtmlUtils
from .schema_parser import SchemaParser
from .spark_singleton import SparkSingleton
from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition

_OLD_MIN_OPTION = 'min'
_OLD_MAX_OPTION = 'max'

_STREAMING_TIMESTAMP_COLUMN = "_source_timestamp"


class DataGenerator:
    """ Main class for test data set generation

    This class acts as the entry point to all test data generation activities.

    :param sparkSession: Spark session object to use
    :param name: name of the data set
    :param randomSeedMethod: seed method for random numbers - either None, 'fixed', 'hash_fieldname'
    :param rows: number of rows to generate
    :param startingId: starting value for the generated seed column
    :param randomSeed: seed for the random number generator
    :param partitions: number of partitions to generate; if not provided, uses `spark.sparkContext.defaultParallelism`
    :param verbose: if `True`, generate verbose output
    :param batchSize: UDF batch number of rows to pass via Apache Arrow to Pandas UDFs
    :param debug: if set to `True`, output debug level of information
    :param seedColumnName: if set, this should be the name of the `seed` or logical `id` column. Defaults to `id`
    :param random: if set, specifies the default value of the `random` attribute for all columns where not set

    By default the seed column is named `id`. If you need to use this column name in your generated data,
    it is recommended that you use a different name for the seed column - for example `_id`.
    This may be specified by setting the `seedColumnName` attribute to `_id`.
    """

    # class vars
    _nextNameIndex = 0
    _untitledNamePrefix = "Untitled"
    _randomSeed = DEFAULT_RANDOM_SEED

    _allowed_keys = ["startingId", "rowCount", "output_id"]

    # set up logging

    # restrict spurious messages from java gateway
    logging.getLogger("py4j").setLevel(logging.WARNING)

    def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
                 rows=1000000, startingId=0, randomSeed=None, partitions=None,
                 verbose=False, batchSize=None, debug=False, seedColumnName=DEFAULT_SEED_COLUMN,
                 random=False, **kwargs):
        """ Constructor for data generator object """

        # set up logging
        self.verbose = verbose
        self.debug = debug

        self._setupLogger()

        self._seedColumnName = seedColumnName
        self._outputStreamingFields = False

        if seedColumnName != DEFAULT_SEED_COLUMN:
            self.logger.info(f"Using '{self._seedColumnName}' for seed column in place of '{DEFAULT_SEED_COLUMN}'")

        self.name = name if name is not None else self.generateName()
        self._rowCount = rows
        self.starting_id = startingId
        self.__schema__ = None

        if sparkSession is None:
            sparkSession = SparkSingleton.getLocalInstance()

        self.sparkSession = sparkSession

        # if the active Spark session is stopped, you may end up with a valid SparkSession object,
        # but the underlying SparkContext will be invalid
        assert sparkSession is not None, "Spark session not initialized"
        assert sparkSession.sparkContext is not None, "Expecting spark session to have valid sparkContext"

        self.partitions = partitions if partitions is not None else sparkSession.sparkContext.defaultParallelism

        # check for old versions of args
        if "starting_id" in kwargs:
            self.logger.warning("starting_id is deprecated - use option 'startingId' instead")
            startingId = kwargs["starting_id"]

        if "seed" in kwargs:
            self.logger.warning("seed is deprecated - use option 'randomSeed' instead")
            randomSeed = kwargs["seed"]

        if "seed_method" in kwargs:
            self.logger.warning("seed_method is deprecated - use option 'seedMethod' instead")
            seedMethod = kwargs["seed_method"]

        if "batch_size" in kwargs:
            self.logger.warning("batch_size is deprecated - use option 'batchSize' instead")
            batchSize = kwargs["batch_size"]

        if "use_pandas" in kwargs or "usePandas" in kwargs:
            self.logger.warning("option 'usePandas' is deprecated - Pandas will always be used")

        if "generateWithSelects" in kwargs:
            self.logger.warning("option 'generateWithSelects' switch is deprecated - selects will always be used")

        self._seedMethod = randomSeedMethod

        # set default random setting
        self._defaultRandom = random if random is not None else False

        if randomSeed is None:
            self._instanceRandomSeed = self._randomSeed

            if randomSeedMethod is None:
                self._seedMethod = RANDOM_SEED_HASH_FIELD_NAME
            else:
                self._seedMethod = randomSeedMethod
        else:
            self._instanceRandomSeed = randomSeed

            # if a valid random seed was supplied but no seed method was supplied, make the seed method "fixed"
            if randomSeedMethod is None:
                self._seedMethod = "fixed"

        if self._seedMethod not in [None, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME]:
            msg = f"seedMethod should be None, '{RANDOM_SEED_FIXED}' or '{RANDOM_SEED_HASH_FIELD_NAME}'"
            raise DataGenError(msg)

        self._columnSpecsByName = {}
        self._allColumnSpecs = []
        self._buildPlan = []
        self.executionHistory = []
        self._options = {}
        self._buildOrder = []
        self._inferredSchemaFields = []
        self.buildPlanComputed = False

        # add the seed column
        self.withColumn(self._seedColumnName, LongType(), nullable=False, implicit=True, omit=True, noWarn=True)

        self._batchSize = batchSize

        # set up spark session
        self._setupSparkSession(sparkSession)

        # set up use of pandas udfs
        self._setupPandas(batchSize)

    @property
    def seedColumnName(self):
        """ Return the name of the data generation seed column"""
        return self._seedColumnName

    @classmethod
    def _checkSparkVersion(cls, sparkVersion, minSparkVersion):
        """ Check spark version

        :param sparkVersion: spark version string
        :param minSparkVersion: min spark version as tuple
        :return: True if version passes minVersion

        Layout of version string must be compatible with "xx.xx.xx.patch"
        """
        sparkVersionInfo = _get_spark_version(sparkVersion)

        if sparkVersionInfo < minSparkVersion:
            logging.warning("*** Minimum version of Spark supported is %s - found version %s",
                            minSparkVersion, sparkVersionInfo)
            return False

        return True

    def _setupSparkSession(self, sparkSession):
        """ Set up spark session

        :param sparkSession: spark session to use
        :return: nothing
        """
        if sparkSession is None:
            sparkSession = SparkSingleton.getInstance()

        assert sparkSession is not None, "Spark session not initialized"

        self.sparkSession = sparkSession

        # check if the spark version meets the minimum requirements and warn if not
        sparkVersion = sparkSession.version
        self._checkSparkVersion(sparkVersion, MIN_SPARK_VERSION)

    def _setupPandas(self, pandasBatchSize):
        """ Set up pandas

        :param pandasBatchSize: batch size for pandas, may be None
        :return: nothing
        """
        assert pandasBatchSize is None or type(pandasBatchSize) is int, \
            "If pandas_batch_size is specified, it must be an integer"
        self.logger.info("*** using pandas udf for custom functions ***")
        self.logger.info("Spark version: %s", self.sparkSession.version)
        if str(self.sparkSession.version).startswith("3"):
            self.logger.info("Using spark 3.x")
            self.sparkSession.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
        else:
            self.sparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")

        if self._batchSize is not None:
            self.sparkSession.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", self._batchSize)

    def _setupLogger(self):
        """ Set up logging

        This will set the logger at warning, info or debug levels depending on the instance construction parameters
        """
        self.logger = logging.getLogger("DataGenerator")
        if self.debug:
            self.logger.setLevel(logging.DEBUG)
        elif self.verbose:
            self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.WARNING)
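    # --- Editor's illustrative sketch (not part of the library source) ---
    # A minimal construction example, assuming an active SparkSession named `spark`; the dataset
    # name, row count, and partition count are arbitrary illustrative values.
    #
    #   dataSpec = DataGenerator(sparkSession=spark, name="example_data", rows=100000,
    #                            partitions=4, randomSeedMethod="hash_fieldname")
    #   df = dataSpec.withIdOutput().withColumn("code", "int", minValue=1, maxValue=20).build()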
    @classmethod
    def useSeed(cls, seedVal):
        """ Set seed for random number generation

        :param seedVal: new value for the random number seed
        """
        cls._randomSeed = seedVal

    @deprecated("Use `useSeed` instead")
    @classmethod
    def use_seed(cls, seedVal):
        """ Set seed for random number generation

        :param seedVal: new value for the random number seed
        """
        cls._randomSeed = seedVal

    @classmethod
    def reset(cls):
        """ Reset any state associated with the data generation """
        cls._nextNameIndex = 0

    @classmethod
    def generateName(cls):
        """ Get a name for the data set

        Uses the untitled name prefix and nextNameIndex to generate a dummy dataset name

        :returns: string containing generated name
        """
        cls._nextNameIndex += 1
        new_name = cls._untitledNamePrefix + '_' + str(cls._nextNameIndex)
        return new_name
    # noinspection PyAttributeOutsideInit
    def clone(self):
        """ Make a clone of the data spec via deep copy, preserving the same spark session

        :returns: deep copy of test data generator definition
        """
        old_spark_session = self.sparkSession
        old_logger = self.logger
        new_copy = None
        try:
            # temporarily set the spark session to null
            self.sparkSession = None

            # set logger to None before copy, disable pylint warning to ensure not triggered for this statement
            self.logger = None  # pylint: disable=attribute-defined-outside-init
            new_copy = copy.deepcopy(self)
            new_copy.sparkSession = old_spark_session
            new_copy.buildPlanComputed = False
            new_copy._setupLogger()
        finally:
            # now set it back
            self.sparkSession = old_spark_session
            # set logger to old value, disable pylint warning to ensure not triggered for this statement
            self.logger = old_logger  # pylint: disable=attribute-defined-outside-init
        return new_copy
    @property
    def randomSeed(self):
        """ Return the data generation spec random seed"""
        return self._instanceRandomSeed

    @property
    def random(self):
        """ Return the data generation spec default random setting for columns,
        to be used when an explicit `random` attribute setting is not supplied
        """
        return self._defaultRandom

    def _markForPlanRegen(self):
        """ Mark that the build plan needs to be regenerated

        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        self.buildPlanComputed = False
        return self
    def explain(self, suppressOutput=False):
        """ Explain the test data generation process

        :param suppressOutput: If True, suppress display of build plan
        :returns: String containing explanation of test data generation for this specification
        """
        if not self.buildPlanComputed:
            self.computeBuildPlan()

        rc = self._rowCount
        tasks = self.partitions
        output = ["", "Data generation plan", "====================",
                  f"spec=DataGenerator(name={self.name}, rows={rc}, startingId={self.starting_id}, partitions={tasks})",
                  "",
                  f"seed column: {self._seedColumnName}", "",
                  f"column build order: {self._buildOrder}", "", "build plan:"]

        for plan_action in self._buildPlan:
            output.append(" ==> " + plan_action)
        output.extend(["", "execution history:", ""])
        for build_action in self.executionHistory:
            output.append(" ==> " + build_action)
        output.append("")
        output.append("====================")
        output.append("")

        explain_results = "\n".join(output)
        if not suppressOutput:
            print(explain_results)

        return explain_results
    def withRowCount(self, rc):
        """ Modify the row count - useful when starting a new spec from a clone

        :param rc: The count of rows to generate
        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        self._rowCount = rc
        return self

    @deprecated('Use `withRowCount` instead')
    def setRowCount(self, rc):
        """ Modify the row count - useful when starting a new spec from a clone

        .. warning::
           Method is deprecated - use `withRowCount` instead

        :param rc: The count of rows to generate
        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        self.logger.warning("method `setRowCount` is deprecated, use `withRowCount` instead")
        return self.withRowCount(rc)

    @property
    def rowCount(self):
        """ Return the row count

        This may differ from the originally specified row count, if counts need to be adjusted for purposes of
        keeping the ratio of rows to unique keys correct or other heuristics
        """
        return self._rowCount

    def withIdOutput(self):
        """ Output the seed column field (defaults to `id`) as a column in the generated data set

        If this is not called, the seed column field is omitted from the final generated data set

        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        self._columnSpecsByName[self._seedColumnName].omit = False
        self._markForPlanRegen()

        return self

    def option(self, optionKey, optionValue):
        """ Set option to option value for later processing

        :param optionKey: key for option
        :param optionValue: value for option
        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        ensure(optionKey in self._allowed_keys)
        self._options[optionKey] = optionValue
        self._markForPlanRegen()
        return self

    def options(self, **kwargs):
        """ Set options in bulk

        Allows for multiple options with option=optionValue style of option passing

        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        for key, value in kwargs.items():
            self.option(key, value)
        self._markForPlanRegen()
        return self
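    # --- Editor's illustrative sketch (not part of the library source) ---
    # Builder-style chaining of the methods above, assuming `dataSpec` is an existing DataGenerator
    # instance; the values are illustrative only.
    #
    #   dataSpec = (dataSpec.clone()
    #               .withRowCount(5000)
    #               .withIdOutput()
    #               .options(startingId=1000))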
    def _processOptions(self):
        """ Process options to give effect to the options supplied earlier

        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        self.logger.info("options: %s", str(self._options))

        for key, value in self._options.items():
            if key == "startingId":
                self.starting_id = value
            elif key in ["rowCount", "row_count"]:
                self._rowCount = value
        return self

    def describe(self):
        """ Return description of the dataset generation spec

        :returns: Dict object containing key attributes of test data generator instance
        """
        return {
            'name': self.name,
            'rowCount': self._rowCount,
            'schema': self.schema,
            'randomSeed': self._instanceRandomSeed,
            'partitions': self.partitions,
            'columnDefinitions': self._columnSpecsByName,
            'debug': self.debug,
            'verbose': self.verbose
        }

    def __repr__(self):
        """ Return the repr string for the class"""
        name = getattr(self, "name", "None")
        rows = getattr(self, "_rowCount", "None")
        partitions = getattr(self, "partitions", "None")
        return f"DataGenerator(name='{name}', rows={rows}, partitions={partitions})"

    def _checkFieldList(self):
        """ Check field list for common errors

        Does not return anything but will assert / raise exceptions if errors occur
        """
        ensure(self._inferredSchemaFields is not None, "schemaFields should be non-empty")
        ensure(type(self._inferredSchemaFields) is list, "schemaFields should be list")

    @property
    def schemaFields(self):
        """ Get the list of schema fields for the final output schema

        :returns: list of fields in schema
        """
        self._checkFieldList()
        return [fd for fd in self._inferredSchemaFields if not self._columnSpecsByName[fd.name].isFieldOmitted]

    @property
    def schema(self):
        """ Infer spark output schema definition from the field specifications

        :returns: Spark SQL `StructType` for schema

        .. note::
           If the data generation specification contains columns for which the datatype is inferred, the schema type
           for inferred columns may not be correct until the build command has completed.
        """
        return StructType(self.schemaFields)

    @property
    def inferredSchema(self):
        """ Infer spark interim schema definition from the field specifications

        .. note::
           If the data generation specification contains columns for which the datatype is inferred, the schema type
           for inferred columns may not be correct until the build command has completed.
        """
        self._checkFieldList()
        return StructType(self._inferredSchemaFields)

    def __getitem__(self, key):
        """ Implement the built-in dereference-by-key behavior """
        ensure(key is not None, "key should be non-empty")
        return self._columnSpecsByName[key]
    def getColumnType(self, colName):
        """ Get column Spark SQL datatype for specified column

        :param colName: name of column as string
        :returns: Spark SQL datatype for named column
        """
        ct = self._columnSpecsByName[colName].datatype
        return ct if ct is not None else IntegerType()

    def isFieldExplicitlyDefined(self, colName):
        """ Return True if a column generation spec has been explicitly defined for the column, else False

        .. note::
           A column is not considered explicitly defined if it was inferred from a schema or added
           with a wildcard statement. This impacts whether the column can be redefined.
        """
        ensure(colName is not None, "colName should be non-empty")
        col_def = self._columnSpecsByName.get(colName, None)
        return not col_def.implicit if col_def is not None else False

    def getInferredColumnNames(self):
        """ Get list of output columns """
        return [fd.name for fd in self._inferredSchemaFields]

    @staticmethod
    def flatten(lst):
        """ Flatten list

        :param lst: list to flatten
        """
        return [item for sublist in lst for item in sublist]

    def getColumnSpec(self, name):
        """ Get column spec for the column having the name supplied

        :param name: name of column to find spec for
        :return: column spec for named column if any
        """
        assert name is not None and len(name.strip()) > 0, "column name must be non empty string"
        return self._columnSpecsByName[name]

    def getOutputColumnNames(self):
        """ Get list of output columns by flattening list of lists of column names

        Normal columns will have a single column name but column definitions that result in
        multiple columns will produce a list of multiple names

        :returns: list of column names to be output in generated data set
        """
        return self.flatten([self._columnSpecsByName[fd.name].getNames()
                             for fd in self._inferredSchemaFields
                             if not self._columnSpecsByName[fd.name].isFieldOmitted])

    def getOutputColumnNamesAndTypes(self):
        """ Get list of output columns by flattening list of lists of column names and types

        Normal columns will have a single column name but column definitions that result in
        multiple columns will produce a list of multiple names
        """
        return self.flatten([self._columnSpecsByName[fd.name].getNamesAndTypes()
                             for fd in self._inferredSchemaFields
                             if not self._columnSpecsByName[fd.name].isFieldOmitted])

    def withSchema(self, sch):
        """ Populate column definitions and specifications for each of the columns in the schema

        :param sch: Spark SQL schema, from which fields are added
        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        ensure(sch is not None, "schema sch should be non-empty")
        self.__schema__ = sch

        for fs in sch.fields:
            self.withColumn(fs.name, fs.dataType, implicit=True, omit=False, nullable=fs.nullable)
        return self

    def _computeRange(self, dataRange, minValue, maxValue, step):
        """ Compute merged range based on parameters

        :returns: effective minValue, maxValue, step as tuple
        """
        # TODO: may also need to check for instance of DataRange
        if dataRange is not None and isinstance(dataRange, range):
            if maxValue is not None or minValue != 0 or step != 1:
                raise ValueError("You cannot specify both a range and minValue, maxValue or step values")

            return dataRange.start, dataRange.stop, dataRange.step
        else:
            return minValue, maxValue, step
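    # --- Editor's illustrative sketch (not part of the library source) ---
    # Using `withSchema` to seed column definitions from an existing Spark schema; the schema and
    # spec names are assumptions for illustration.
    #
    #   from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    #   tableSchema = StructType([StructField("customer_id", IntegerType(), False),
    #                             StructField("name", StringType(), True)])
    #   dataSpec = DataGenerator(sparkSession=spark, rows=10000).withSchema(tableSchema)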
    def withColumnSpecs(self, patterns=None, fields=None, matchTypes=None, **kwargs):
        """ Add column specs for columns matching
            a) a list of field names,
            b) one or more regex patterns,
            c) type (as in pyspark.sql.types)

        :param patterns: patterns may specify a single pattern as a string or a list of patterns
                         that match the column names. May be omitted.
        :param fields: a string specifying an explicit field to match, or a list of strings specifying
                       explicit fields to match. May be omitted.
        :param matchTypes: a single Spark SQL datatype or list of Spark SQL data types to match. May be omitted.
        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern

        .. note::
           matchTypes may also take SQL type strings or a list of SQL type strings such as "array<integer>".
           However, you may not use ``INFER_DATATYPE`` as part of the matchTypes list.

        You may also add a variety of options to further control the test data generation process.
        For full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
        """
        if fields is not None and type(fields) is str:
            fields = [fields]

        if OPTION_RANDOM not in kwargs:
            kwargs[OPTION_RANDOM] = self._defaultRandom

        # add support for deprecated legacy names
        if "match_types" in kwargs:
            assert matchTypes is None, "Argument 'match_types' is deprecated, use 'matchTypes' instead"
            matchTypes = kwargs["match_types"]
            del kwargs["match_types"]  # remove the legacy option from keyword args as they will be used later

        if matchTypes is not None and type(matchTypes) is not list:
            matchTypes = [matchTypes]  # if only one match type, make a list of that match type

        if patterns is not None and type(patterns) is str:
            patterns = ["^" + patterns + "$"]
        elif type(patterns) is list:
            patterns = ["^" + pat + "$" for pat in patterns]

        all_fields = self.getInferredColumnNames()
        effective_fields = [x for x in all_fields if (fields is None or x in fields)
                            and x != self._seedColumnName]

        if patterns is not None:
            effective_fields = [x for x in effective_fields for y in patterns if re.search(y, x) is not None]

        if matchTypes is not None:
            effective_types = []

            for typ in matchTypes:
                if isinstance(typ, str):
                    if typ == INFER_DATATYPE:
                        raise ValueError("You cannot use INFER_DATATYPE with the method `withColumnSpecs`")

                    effective_types.append(SchemaParser.columnTypeFromString(typ))
                else:
                    effective_types.append(typ)

            effective_fields = [x for x in effective_fields for y in effective_types
                                if self.getColumnType(x) == y]

        for f in effective_fields:
            self.withColumnSpec(f, implicit=True, **kwargs)
        return self
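    # --- Editor's illustrative sketch (not part of the library source) ---
    # Applying a column spec to all columns whose names match a pattern and whose type is string;
    # the pattern and option values are illustrative only.
    #
    #   dataSpec = dataSpec.withColumnSpecs(patterns="name.*", matchTypes=StringType(),
    #                                       prefix="customer", random=True)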
    def _checkColumnOrColumnList(self, columns, allowId=False):
        """ Check if column or columns refer to existing columns

        :param columns: a single column or list of columns as strings
        :param allowId: If True, allows the specialized seed column (which defaults to `id`) to be present in columns
        :returns: True if test passes
        """
        inferred_columns = self.getInferredColumnNames()
        if allowId and columns == self._seedColumnName:
            return True

        if type(columns) is list:
            for column in columns:
                ensure(column in inferred_columns,
                       f" column `{column}` must refer to defined column")
        else:
            ensure(columns in inferred_columns,
                   f" column `{columns}` must refer to defined column")
        return True

    def withColumnSpec(self, colName, minValue=None, maxValue=None, step=1, prefix=None,
                       random=None, distribution=None,
                       implicit=False, dataRange=None, omit=False, baseColumn=None, **kwargs):
        """ Add a column specification for an existing column

        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern

        You may also add a variety of options to further control the test data generation process.
        For full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
        """
        ensure(colName is not None, "Must specify column name for column")
        ensure(colName in self.getInferredColumnNames(), f" column `{colName}` must refer to defined column")
        if baseColumn is not None:
            self._checkColumnOrColumnList(baseColumn)
        ensure(not self.isFieldExplicitlyDefined(colName), f"duplicate column spec for column `{colName}`")

        ensure(not isinstance(minValue, DataType),
               f"""unnecessary `datatype` argument specified for `withColumnSpec` for column `{colName}` -
                    Datatype parameter is only needed for `withColumn` and not permitted for `withColumnSpec`
               """)

        if random is None:
            random = self._defaultRandom

        # handle migration of old `min` and `max` options
        if _OLD_MIN_OPTION in kwargs:
            assert minValue is None, \
                "Only one of `minValue` and `min` can be specified. Use of `minValue` is preferred"
            minValue = kwargs[_OLD_MIN_OPTION]
            kwargs.pop(_OLD_MIN_OPTION, None)

        if _OLD_MAX_OPTION in kwargs:
            assert maxValue is None, \
                "Only one of `maxValue` and `max` can be specified. Use of `maxValue` is preferred"
            maxValue = kwargs[_OLD_MAX_OPTION]
            kwargs.pop(_OLD_MAX_OPTION, None)

        new_props = {}
        new_props.update(kwargs)

        self.logger.info("adding column spec - `%s` with baseColumn : `%s`, implicit : %s , omit %s",
                         colName, baseColumn, implicit, omit)

        self._generateColumnDefinition(colName, self.getColumnType(colName), minValue=minValue, maxValue=maxValue,
                                       step=step, prefix=prefix, random=random, dataRange=dataRange,
                                       distribution=distribution, baseColumn=baseColumn,
                                       implicit=implicit, omit=omit, **new_props)
        return self

    def hasColumnSpec(self, colName):
        """ Returns True if there is a column spec for the column

        :param colName: name of column to check for
        :returns: True if column has spec, False otherwise
        """
        return colName in self._columnSpecsByName
    def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None, step=1,
                   dataRange=None, prefix=None, random=None, distribution=None,
                   baseColumn=None, nullable=True,
                   omit=False, implicit=False, noWarn=False,
                   **kwargs):
        """ Add a new column to the synthetic data generation specification

        :param colName: Name of column to add. If this conflicts with the underlying seed column (`id`), it is
                        recommended that the seed column name is customized during the construction of the data
                        generator spec.
        :param colType: Data type for column. This may be specified as either a type from one of the possible
                        pyspark.sql.types (e.g. `StringType`, `DecimalType(10, 3)` etc.) or as a string containing
                        a Spark SQL type definition (i.e. `String`, `array<Integer>`, `map<String, Float>`)
        :param omit: if True, the column will be omitted from the final set of columns in the generated data.
                     Used to create columns that are used by other columns as intermediate results.
                     Defaults to False
        :param expr: Specifies SQL expression used to create column value. If specified, overrides the default rules
                     for creating column value. Defaults to None
        :param baseColumn: String or list of columns to control order of generation of columns. If not specified,
                           column is dependent on base seed column (which defaults to `id`)
        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern

        .. note::
           If the value ``None`` is used for the ``colType`` parameter, the method will try to use the underlying
           datatype derived from the base columns.

           If the value ``INFER_DATATYPE`` is used for the ``colType`` parameter and a SQL expression has been
           supplied via the ``expr`` parameter, the method will try to infer the column datatype from the SQL
           expression when the ``build()`` method is called.

           Inferred data types can only be used if the ``expr`` parameter is specified.

           Note that properties which return a schema based on the specification may not be accurate until the
           ``build()`` method is called. Prior to this, the schema may indicate a default column type for those
           fields.

        You may also add a variety of additional options to further control the test data generation process.
        For full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
        """
        ensure(colName is not None, "Must specify column name for column")
        ensure(colType is not None, f"Must specify column type for column `{colName}`")
        if baseColumn is not None:
            self._checkColumnOrColumnList(baseColumn, allowId=True)

        if not noWarn and colName == self._seedColumnName:
            self.logger.warning(
                f"Adding a new column named '{colName}' overrides seed column '{self._seedColumnName}'")
            self.logger.warning("Use `seedColumnName` option on DataGenerator construction for different seed column")

        # handle migration of old `min` and `max` options
        if _OLD_MIN_OPTION in kwargs:
            assert minValue is None, \
                "Only one of `minValue` and `min` can be specified. Use of `minValue` is preferred"
            minValue = kwargs[_OLD_MIN_OPTION]
            kwargs.pop(_OLD_MIN_OPTION, None)

        if _OLD_MAX_OPTION in kwargs:
            assert maxValue is None, \
                "Only one of `maxValue` and `max` can be specified. Use of `maxValue` is preferred"
            maxValue = kwargs[_OLD_MAX_OPTION]
            kwargs.pop(_OLD_MAX_OPTION, None)

        if random is None:
            random = self._defaultRandom

        new_props = {}
        new_props.update(kwargs)

        self.logger.info("effective range: %s, %s, %s args: %s", minValue, maxValue, step, kwargs)
        self.logger.info("adding column - `%s` with baseColumn : `%s`, implicit : %s , omit %s",
                         colName, baseColumn, implicit, omit)
        newColumn = self._generateColumnDefinition(colName, colType, minValue=minValue, maxValue=maxValue,
                                                   step=step, prefix=prefix, random=random,
                                                   distribution=distribution, baseColumn=baseColumn,
                                                   dataRange=dataRange,
                                                   implicit=implicit, omit=omit, **new_props)

        # note: for inferred columns, the column type is initially set to a StringType but may be superseded later
        self._inferredSchemaFields.append(StructField(colName, newColumn.datatype, nullable))
        return self
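    # --- Editor's illustrative sketch (not part of the library source) ---
    # Typical `withColumn` usage, including a SQL-expression column with an inferred datatype;
    # the column names and value ranges are illustrative only.
    #
    #   dataSpec = (dataSpec
    #               .withColumn("amount", "decimal(10,2)", minValue=1.0, maxValue=1000.0, random=True)
    #               .withColumn("amount_with_tax", INFER_DATATYPE, expr="amount * 1.1", baseColumn="amount"))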
    def _mkSqlStructFromList(self, fields):
        """ Create a SQL struct expression from a list of fields

        :param fields: a list of elements that make up the SQL struct expression (each being a string or tuple)
        :returns: SQL expression to generate the struct

        .. note::
           This method is used internally when creating struct columns. It is not intended for general use.

        Each element of the list may be a simple string, or a tuple.
        When the element is specified as a simple string, it must be the name of a previously defined column which
        will be used as both the field name within the struct and the SQL expression to generate the field value.

        When the element is specified as a tuple, it must be a tuple of two elements. The first element must be the
        name of the field within the struct. The second element must be a SQL expression that will be used to
        generate the field value, and may reference previously defined columns.
        """
        assert fields is not None and isinstance(fields, list), \
            "Fields must be a non-empty list of fields that make up the struct elements"
        assert len(fields) >= 1, "Fields must be a non-empty list of fields that make up the struct elements"

        struct_expressions = []

        for fieldSpec in fields:
            if isinstance(fieldSpec, str):
                struct_expressions.append(f"'{fieldSpec}'")
                struct_expressions.append(fieldSpec)
            elif isinstance(fieldSpec, tuple):
                assert len(fieldSpec) == 2, "tuple must be field name and SQL expression strings"
                assert isinstance(fieldSpec[0], str), "First element must be field name string"
                assert isinstance(fieldSpec[1], str), "Second element must be field value SQL string"
                struct_expressions.append(f"'{fieldSpec[0]}'")
                struct_expressions.append(fieldSpec[1])

        struct_expression = f"named_struct({','.join(struct_expressions)})"

        return struct_expression

    def _mkStructFromDict(self, fields):
        assert fields is not None and isinstance(fields, dict), \
            "Fields must be a non-empty dict of fields that make up the struct elements"
        struct_expressions = []

        for key, value in fields.items():
            struct_expressions.append(f"'{key}'")
            if isinstance(value, str):
                struct_expressions.append(str(value))
            elif isinstance(value, dict):
                struct_expressions.append(self._mkStructFromDict(value))
            elif isinstance(value, list):
                array_expressions = ",".join([str(x) for x in value])
                struct_expressions.append(f"array({array_expressions})")
            else:
                raise ValueError(f"Invalid field element for field `{key}`")

        struct_expression = f"named_struct({','.join(struct_expressions)})"

        return struct_expression
    def withStructColumn(self, colName, fields=None, asJson=False, **kwargs):
        """ Add a struct column to the synthetic data generation specification. This will add a new column composed
        of a struct of the specified fields.

        :param colName: name of column
        :param fields: list of elements to compose as a struct valued column (each being a string or tuple), or a
                       dict outlining the structure of the struct column
        :param asJson: If False, generate a struct valued column. If True, generate a JSON string column
        :param kwargs: keyword arguments to pass to the underlying column generators as per `withColumn`
        :return: A modified in-place instance of data generator allowing for chaining of calls
                 following the Builder pattern

        .. note::
           Additional options for the field specification may be specified as keyword arguments.

           The fields specification specified by the `fields` argument may be:

           - A list of field references (`strings`) which will be used as both the field name and the SQL expression
           - A list of tuples of the form **(field_name, field_expression)** where `field_name` is the name of the
             field. In that case, the `field_expression` string should be a SQL expression to generate the field
             value
           - A Python dict outlining the structure of the struct column. The keys of the dict are the field names

           When using the `dict` form of the field specifications, a field whose value is a list will be treated
           as creating a SQL array literal.
        """
        assert fields is not None and isinstance(fields, (list, dict)), \
            "Fields argument must be a list of field specifications or a dict outlining the target structure"
        assert type(colName) is str and len(colName) > 0, "Must specify a column name"

        if isinstance(fields, list):
            assert len(fields) > 0, \
                "Must specify at least one field for struct column"
            struct_expr = self._mkSqlStructFromList(fields)
        elif isinstance(fields, dict):
            struct_expr = self._mkStructFromDict(fields)
        else:
            raise ValueError(f"Invalid field specification for struct column `{colName}`")

        if asJson:
            output_expr = f"to_json({struct_expr})"
            newDf = self.withColumn(colName, StringType(), expr=output_expr, **kwargs)
        else:
            newDf = self.withColumn(colName, INFER_DATATYPE, expr=struct_expr, **kwargs)

        return newDf
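    # --- Editor's illustrative sketch (not part of the library source) ---
    # Composing previously defined columns into a struct (or JSON) column; the field and column
    # names are illustrative and assume those columns already exist in the spec.
    #
    #   dataSpec = dataSpec.withStructColumn("customer", fields={"id": "customer_id", "name": "name"})
    #   dataSpec = dataSpec.withStructColumn("customer_json", fields=["customer_id", "name"], asJson=True)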
    def _generateColumnDefinition(self, colName, colType=None, baseColumn=None,
                                  implicit=False, omit=False, nullable=True, **kwargs):
        """ Generate field definition and column spec

        .. note::
           Any time that a new column definition is added, we'll mark that the build plan needs to be regenerated.
           For our purposes, the build plan determines the order of column generation etc.

        :returns: Newly added column_spec
        """
        if colType is None:
            colType = self.getColumnType(baseColumn)

            if colType == INFER_DATATYPE:
                raise ValueError("When base column(s) have inferred datatype, you must specify the column type")

        new_props = {}
        new_props.update(kwargs)

        # if the column has the option `random` set to true
        # then use the instance level random seed
        # otherwise use the default random seed for the class
        if OPTION_RANDOM_SEED in new_props:
            effective_random_seed = new_props[OPTION_RANDOM_SEED]
            new_props.pop(OPTION_RANDOM_SEED)
            new_props[OPTION_RANDOM] = True

            # if random seed has an override but randomSeedMethod does not, set it to fixed
            if OPTION_RANDOM_SEED_METHOD not in new_props:
                new_props[OPTION_RANDOM_SEED_METHOD] = RANDOM_SEED_FIXED
        elif OPTION_RANDOM in new_props and new_props[OPTION_RANDOM]:
            effective_random_seed = self._instanceRandomSeed
        else:
            effective_random_seed = self._randomSeed

        # handle column level override
        if OPTION_RANDOM_SEED_METHOD in new_props:
            effective_random_seed_method = new_props[OPTION_RANDOM_SEED_METHOD]
            new_props.pop(OPTION_RANDOM_SEED_METHOD)
        else:
            effective_random_seed_method = self._seedMethod

        column_spec = ColumnGenerationSpec(colName, colType,
                                           baseColumn=baseColumn,
                                           implicit=implicit,
                                           omit=omit,
                                           randomSeed=effective_random_seed,
                                           randomSeedMethod=effective_random_seed_method,
                                           nullable=nullable,
                                           verbose=self.verbose,
                                           debug=self.debug,
                                           seedColumnName=self._seedColumnName,
                                           **new_props)

        self._columnSpecsByName[colName] = column_spec

        # if a column spec for the column already exists - remove it
        items_to_remove = [x for x in self._allColumnSpecs if x.name == colName]
        for x in items_to_remove:
            self._allColumnSpecs.remove(x)

        self._allColumnSpecs.append(column_spec)

        # mark that the build plan needs to be regenerated
        self._markForPlanRegen()

        return column_spec

    def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
        """ Generate the base data frame and seed column (which defaults to `id`), partitioning the data if necessary

        This is used when generating the test data.

        A base data frame is created and then each of the additional columns is generated, according to
        base column dependency order, and added to the base data frame using expressions or withColumn statements.

        :returns: Spark data frame for base data that drives the data generation
        """
        end_id = self._rowCount + startId
        id_partitions = self.partitions if self.partitions is not None else 4

        if not streaming:
            status = f"Generating data frame with ids from {startId} to {end_id} with {id_partitions} partitions"
            self.logger.info(status)
            self.executionHistory.append(status)

            df1 = self.sparkSession.range(start=startId,
                                          end=end_id,
                                          numPartitions=id_partitions)

            # spark.range generates a dataframe with the column `id`, so rename it if it's not our seed column
            if SPARK_RANGE_COLUMN != self._seedColumnName:
                df1 = df1.withColumnRenamed(SPARK_RANGE_COLUMN, self._seedColumnName)
        else:
            status = (
                f"Generating streaming data frame with ids from {startId} to {end_id} with {id_partitions} partitions")
            self.logger.info(status)
            self.executionHistory.append(status)

            df1 = (self.sparkSession.readStream
                   .format("rate"))
            if options is not None:
                if "rowsPerSecond" not in options:
                    options['rowsPerSecond'] = 1
                if "numPartitions" not in options:
                    options['numPartitions'] = id_partitions

                for k, v in options.items():
                    df1 = df1.option(k, v)
                df1 = (df1.load()
                       .withColumnRenamed("value", self._seedColumnName)
                       )
            else:
                df1 = (df1.option("rowsPerSecond", 1)
                       .option("numPartitions", id_partitions)
                       .load()
                       .withColumnRenamed("value", self._seedColumnName)
                       )

        return df1

    def _computeColumnBuildOrder(self):
        """ Compute the build ordering using a topological sort on dependencies

        In order to avoid references to columns that have not yet been generated, the test data generation process
        sorts the columns according to the order they need to be built.

        This determines which columns are built first.

        The test generation process will select the columns in the correct order at the end so that the columns
        appear in the correct order in the final output.

        :returns: the build ordering
        """
        dependency_ordering = [(x.name, set(x.dependencies)) if x.name != self._seedColumnName
                               else (self._seedColumnName, set())
                               for x in self._allColumnSpecs]

        self.logger.info("dependency list: %s", str(dependency_ordering))

        self._buildOrder = list(
            topologicalSort(dependency_ordering, flatten=False, initial_columns=[self._seedColumnName]))

        self.logger.info("columnBuildOrder: %s", str(self._buildOrder))

        self._buildOrder = self._adjustBuildOrderForSqlDependencies(self._buildOrder, self._columnSpecsByName)

        return self._buildOrder

    def _adjustBuildOrderForSqlDependencies(self, buildOrder, columnSpecsByName):
        """ Adjust column build order according to the following heuristic:

        If the column being built in a specific build order phase has a SQL expression and it references
        other columns in the same build phase (or potentially references them, as the expression parsing is
        primitive), separate that phase into multiple phases.

        It will also issue a warning if the SQL expression appears to reference a column built later.

        :param buildOrder: list of lists of ids - each sublist represents a phase of the build
        :param columnSpecsByName: dictionary to map column names to column specs
        :returns: revised build order
        """
        new_build_order = []

        all_columns = set([item for sublist in buildOrder for item in sublist])
        built_columns = []
        prior_phase_built_columns = []

        # for each phase, evaluate it to see if it needs to be split
        for current_phase in buildOrder:
            separate_phase_columns = []

            for columnBeingBuilt in current_phase:
                if columnBeingBuilt in columnSpecsByName:
                    cs = columnSpecsByName[columnBeingBuilt]

                    if cs.expr is not None:
                        sql_references = SchemaParser.columnsReferencesFromSQLString(cs.expr, filterItems=all_columns)

                        # determine references to columns not yet built
                        forward_references = set(sql_references) - set(built_columns)
                        if len(forward_references) > 0:
                            msg = f"Column '{columnBeingBuilt}' may have forward references to {forward_references}."
                            self.logger.warning(msg)
                            self.logger.warning("Use `baseColumn` attribute to correct build ordering if necessary")

                        references_not_yet_built = set(sql_references) - set(prior_phase_built_columns)

                        if len(references_not_yet_built.intersection(set(current_phase))) > 0:
                            separate_phase_columns.append(columnBeingBuilt)

                # for each column, get the set of sql references and filter against column names
                built_columns.append(columnBeingBuilt)

            if len(separate_phase_columns) > 0:
                # split phase based on columns in the separate_phase_columns set
                revised_phase = split_list_matching_condition(current_phase,
                                                              lambda el, cols=separate_phase_columns: el in cols)
                new_build_order.extend(revised_phase)
            else:
                # no change to phase
                new_build_order.append(current_phase)

            prior_phase_built_columns.extend(current_phase)

        return new_build_order

    @property
    def build_order(self):
        """ Return the build order minus the seed column (which defaults to `id`)

        The build order will be a list of lists - each list specifying columns that can be built at the same time
        """
        if not self.buildPlanComputed:
            self.computeBuildPlan()

        return [x for x in self._buildOrder if x != [self._seedColumnName]]

    def _getColumnDataTypes(self, columns):
        """ Get data types for columns

        :param columns: list of columns to retrieve data types for
        """
        return [self._columnSpecsByName[colspec].datatype for colspec in columns]
    def computeBuildPlan(self):
        """ Prepare for building by computing a pseudo build plan

        The build plan is not a true build plan - it is only used for debugging purposes, but does not actually
        drive the column generation order.

        :returns: modified in-place instance of test data generator allowing for chaining of calls
                  following Builder pattern
        """
        self._buildPlan = []
        self.executionHistory = []
        self._processOptions()
        self._buildPlan.append(f"Build Spark data frame with seed column: '{self._seedColumnName}'")

        # add temporary columns
        for cs in self._allColumnSpecs:
            # extend overall build plan with build plan of each column spec
            self._buildPlan.extend(cs._initialBuildPlan)

            # handle generation of any temporary columns
            for tmp_col in cs.temporaryColumns:
                # create column spec for temporary column if it's not already present
                if not self.hasColumnSpec(tmp_col[0]):
                    self._buildPlan.append(f"materializing temporary column {tmp_col[0]}")
                    self.withColumn(tmp_col[0], tmp_col[1], **tmp_col[2])

        # TODO: set up the base column data type information
        for cs in self._allColumnSpecs:
            base_column_datatypes = self._getColumnDataTypes(cs.baseColumns)
            cs.setBaseColumnDatatypes(base_column_datatypes)

        self._computeColumnBuildOrder()

        for x1 in self._buildOrder:
            for x in x1:
                cs = self._columnSpecsByName[x]
                self._buildPlan.append(cs.getPlanEntry())

        self.buildPlanComputed = True
        return self
    def build(self, withTempView=False, withView=False, withStreaming=False, options=None):
        """ Build the test data set from the column definitions and return a dataframe for it

        If `withStreaming` is True, generates a streaming data set.
        Use options to control the rate of generation of test data if streaming is used.

        For example:

        `dfTestData = testDataSpec.build(withStreaming=True, options={'rowsPerSecond': 5000})`

        :param withTempView: if True, automatically creates temporary view for generated data set
        :param withView: If True, automatically creates global view for data set
        :param withStreaming: If True, generates data using Spark Structured Streaming Rate source suitable
                              for writing with `writeStream`
        :param options: optional Dict of options to control generating of streaming data
        :returns: Spark SQL dataframe of generated test data
        """
        self.logger.debug("starting build ... withStreaming [%s]", withStreaming)
        self.executionHistory = []
        self.computeBuildPlan()

        output_columns = self.getOutputColumnNames()
        ensure(output_columns is not None and len(output_columns) > 0,
               """
                | You must specify at least one column for output
                | - use withIdOutput() to output base seed column
               """)

        df1 = self._getBaseDataFrame(self.starting_id, streaming=withStreaming, options=options)

        self.executionHistory.append("Using Pandas Optimizations: True")

        # build columns
        df1 = self._buildColumnExpressionsWithSelects(df1)

        df1 = df1.select(*self.getOutputColumnNames())

        self.executionHistory.append(f"selecting columns: {self.getOutputColumnNames()}")

        # register temporary or global views if necessary
        if withView:
            self.executionHistory.append("registering view")
            self.logger.info("Registering global view [%s]", self.name)
            df1.createGlobalTempView(self.name)
            self.logger.info("Registered!")
        elif withTempView:
            self.executionHistory.append("registering temp view")
            self.logger.info("Registering temporary view [%s]", self.name)
            df1.createOrReplaceTempView(self.name)
            self.logger.info("Registered!")

        return df1
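    # --- Editor's illustrative sketch (not part of the library source) ---
    # Building a batch dataframe and a rate-limited streaming dataframe from the same spec;
    # the option values are illustrative.
    #
    #   dfBatch = dataSpec.build(withTempView=True)
    #   dfStream = dataSpec.build(withStreaming=True, options={"rowsPerSecond": 500})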
    # noinspection PyProtectedMember
    def _buildColumnExpressionsWithSelects(self, df1):
        """ Build column generation expressions with selects

        :param df1: dataframe for base data generator
        :return: new dataframe

        The data generator build plan is separated into `rounds` of expressions.
        Each round consists of expressions that are generated using a single `select` operation.
        """
        self.executionHistory.append("Generating data with selects")

        # generation with selects may be more efficient as fewer intermediate data frames
        # are generated, resulting in shorter lineage
        for colNames in self.build_order:
            build_round = ["*"]
            column_specs_applied = []

            inx_col = 0
            self.executionHistory.append(f"building stage for columns: {colNames}")
            for colName in colNames:
                col1 = self._columnSpecsByName[colName]
                column_generators = col1.makeGenerationExpressions()
                self.executionHistory.extend(col1.executionHistory)

                if type(column_generators) is list and len(column_generators) == 1:
                    build_round.append(column_generators[0].alias(colName))
                elif type(column_generators) is list and len(column_generators) > 1:
                    i = 0
                    for cg in column_generators:
                        build_round.append(cg.alias(f'{colName}_{i}'))
                        i += 1
                else:
                    build_round.append(column_generators.alias(colName))

                column_specs_applied.append(col1)
                inx_col = inx_col + 1

            df1 = df1.select(*build_round)

            # apply any post select processing
            for cs in column_specs_applied:
                cs._onSelect(df1)

        return df1

    def _sqlTypeFromSparkType(self, dt):
        """ Get sql type for spark type

        :param dt: instance of Spark SQL type such as IntegerType()
        """
        return dt.simpleString()

    def _mkInsertOrUpdateStatement(self, columns, srcAlias, substitutions, isUpdate=True):
        if substitutions is None:
            substitutions = []
        results = []
        subs = {}
        for x in columns:
            subs[x] = f"{srcAlias}.{x}"
        for x in substitutions:
            subs[x[0]] = x[1]

        for substitution_col in columns:
            new_val = subs[substitution_col]
            if isUpdate:
                results.append(f"{substitution_col}={new_val}")
            else:
                results.append(f"{new_val}")

        return ", ".join(results)
    def scriptTable(self, name=None, location=None, tableFormat="delta", asHtml=False):
        """ Generate a create table script suitable for the format of the test data set

        :param name: name of table to use in generated script
        :param location: path to location of data. If specified (default is None), will generate an
                         external table definition.
        :param tableFormat: table format for table
        :param asHtml: if true, generate output suitable for use with `displayHTML` method in notebook environment
        :returns: SQL string for scripted table
        """
        assert name is not None, "`name` must be specified"

        self.computeBuildPlan()

        output_columns = self.getOutputColumnNamesAndTypes()

        results = [f"CREATE TABLE IF NOT EXISTS {name} ("]
        ensure(output_columns is not None and len(output_columns) > 0,
               """
                | You must specify at least one column for output
                | - use withIdOutput() to output base seed column
               """)

        col_expressions = []
        for col_to_output in output_columns:
            col_expressions.append(f"    {col_to_output[0]} {self._sqlTypeFromSparkType(col_to_output[1])}")
        results.append(",\n".join(col_expressions))
        results.append(")")
        results.append(f"using {tableFormat}")

        if location is not None:
            results.append(f"location '{location}'")

        results = "\n".join(results)

        if asHtml:
            results = HtmlUtils.formatCodeAsHtml(results)

        return results
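    # --- Editor's illustrative sketch (not part of the library source) ---
    # Generating a CREATE TABLE statement matching the spec's output schema; the table name and
    # location are illustrative.
    #
    #   ddl = dataSpec.scriptTable(name="test_data", location="/tmp/test_data", tableFormat="delta")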
    def scriptMerge(self, tgtName=None, srcName=None, updateExpr=None, delExpr=None, joinExpr=None, timeExpr=None,
                    insertExpr=None,
                    useExplicitNames=True,
                    updateColumns=None, updateColumnExprs=None,
                    insertColumns=None, insertColumnExprs=None,
                    srcAlias="src", tgtAlias="tgt",
                    asHtml=False):
        """ Generate a merge table script suitable for the format of the test data set

        :param tgtName: name of target table to use in generated script
        :param tgtAlias: alias for target table - defaults to `tgt`
        :param srcName: name of source table to use in generated script
        :param srcAlias: alias for source table - defaults to `src`
        :param updateExpr: optional string representing update condition. If not present, then any row that does
                           not match the join condition is considered an update
        :param delExpr: optional string representing delete condition - for example `src.action='DEL'`.
                        If not present, no delete clause is generated
        :param insertExpr: optional string representing insert condition - if not present, there is no condition
                           on insert other than no match
        :param joinExpr: string representing join condition. For example, `tgt.id=src.id`
        :param timeExpr: optional time travel expression - for example: `TIMESTAMP AS OF timestamp_expression`
                         or `VERSION AS OF version`
        :param insertColumns: Optional list of strings designating columns to insert.
                              If not supplied, uses all columns defined in spec
        :param insertColumnExprs: Optional list of strings designating column expressions for insert.
                                  By default, will use the src column as the insert value into the target table.
                                  This should have the form [ ("insert_column_name", "insert column expr"), ...]
        :param updateColumns: List of strings designating columns to update.
                              If not supplied, uses all columns defined in spec
        :param updateColumnExprs: Optional list of strings designating column expressions for update.
                                  By default, will use the src column as the update value for the target table.
                                  This should have the form [ ("update_column_name", "update column expr"), ...]
        :param useExplicitNames: If True, generate explicit column names in insert and update statements
        :param asHtml: if true, generate output suitable for use with `displayHTML` method in notebook environment
        :returns: SQL string for scripted merge statement
        """
        assert tgtName is not None, "you must specify a target table"
        assert srcName is not None, "you must specify a source table"
        assert joinExpr is not None, "you must specify a join expression"

        self.computeBuildPlan()

        # get list of column names
        output_columns = [x[0] for x in self.getOutputColumnNamesAndTypes()]

        ensure(output_columns is not None and len(output_columns) > 0,
               """
                | You must specify at least one column for output
                | - use withIdOutput() to output base seed column
               """)

        # use list of column names if not supplied
        if insertColumns is None:
            insertColumns = output_columns

        if updateColumns is None:
            updateColumns = output_columns

        # build merge statement
        results = [f"MERGE INTO `{tgtName}` as {tgtAlias}"]

        # use time expression if supplied
        if timeExpr is None:
            results.append(f"USING `{srcName}` as {srcAlias}")
        else:
            results.append(f"USING `{srcName}` {timeExpr} as {srcAlias}")

        # add join condition
        results.append(f"ON {joinExpr}")

        # generate update clause
        update_clause = None
        if updateExpr is not None:
            update_clause = f"WHEN MATCHED and {updateExpr} THEN UPDATE "
        else:
            update_clause = "WHEN MATCHED THEN UPDATE "

        if not useExplicitNames:
            update_clause = update_clause + " SET *"
        else:
            update_clause = (update_clause + " SET " +
                             self._mkInsertOrUpdateStatement(columns=updateColumns, srcAlias=srcAlias,
                                                             substitutions=updateColumnExprs))

        results.append(update_clause)

        # generate delete clause
        if delExpr is not None:
            results.append(f"WHEN MATCHED and {delExpr} THEN DELETE")

        if insertExpr is not None:
            ins_clause = f"WHEN NOT MATCHED and {insertExpr} THEN INSERT "
        else:
            ins_clause = "WHEN NOT MATCHED THEN INSERT "

        if not useExplicitNames:
            ins_clause = ins_clause + " *"
        else:
            ins_clause = (ins_clause + "(" + ",".join(insertColumns) + ") VALUES (" +
                          self._mkInsertOrUpdateStatement(columns=insertColumns, srcAlias=srcAlias,
                                                          substitutions=insertColumnExprs, isUpdate=False) + ")")

        results.append(ins_clause)

        result = "\n".join(results)

        if asHtml:
            result = HtmlUtils.formatCodeAsHtml(result)

        return result
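    # --- Editor's illustrative sketch (not part of the library source) ---
    # Generating a MERGE statement against a hypothetical target table; the table names and the
    # join expression are illustrative.
    #
    #   mergeSql = dataSpec.scriptMerge(tgtName="customers", srcName="customer_updates",
    #                                   joinExpr="tgt.customer_id = src.customer_id",
    #                                   delExpr="src.action = 'DEL'")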