Spatial aggregation functions
st_asgeojsontile_agg
- st_asgeojsontile_agg(geom, attributes)
Generates a GeoJSON vector tile from a group-by aggregation over a geometry column.
The geom column must be WKB, WKT, or GeoJSON. The attributes column is a Spark struct; it must contain at least an “id” member.
- Parameters:
geom (Column) – A grouped column containing geometries.
attributes (Column(StructType)) – The attributes column to aggregate.
- Return type:
Column
- Example:
df.groupBy()\
.agg(mos.st_asgeojsontile_agg("geom", struct("id"))).limit(1).display()
+----------------------------------------------------------------------------------------------------------------+
| st_asgeojsontile_agg(geom, struct(id)) |
+----------------------------------------------------------------------------------------------------------------+
| {"type": "FeatureCollection", "name": "tiles", "crs": { |
| "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } }, "features": [ ... ] } |
+----------------------------------------------------------------------------------------------------------------+
df.groupBy()
.agg(st_asgeojsontile_agg(col("geom"), struct(col("id")))).limit(1).show
+----------------------------------------------------------------------------------------------------------------+
| st_asgeojsontile_agg(geom, struct(id)) |
+----------------------------------------------------------------------------------------------------------------+
| {"type": "FeatureCollection", "name": "tiles", "crs": { |
| "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } }, "features": [ ... ] } |
+----------------------------------------------------------------------------------------------------------------+
SELECT st_asgeojsontile_agg(geom, struct(id))
FROM table
+----------------------------------------------------------------------------------------------------------------+
| st_asgeojsontile_agg(geom, struct(id)) |
+----------------------------------------------------------------------------------------------------------------+
| {"type": "FeatureCollection", "name": "tiles", "crs": { |
| "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } }, "features": [ ... ] } |
+----------------------------------------------------------------------------------------------------------------+
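For intuition about the shape of the result, the following plain-Python sketch (not Mosaic's implementation, and not using Spark) assembles grouped (geometry, attributes) rows into a FeatureCollection with the same top-level keys and CRS84 `crs` member shown in the output above. It assumes each geometry is already a GeoJSON dict; the helper name `to_feature_collection` and the sample rows are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, Tuple

# CRS member matching the example output above (OGC CRS84 = WGS84 lon/lat).
CRS84 = {"type": "name", "properties": {"name": "urn:ogc:def:crs:OGC:1.3:CRS84"}}


def to_feature_collection(
    rows: Iterable[Tuple[Dict[str, Any], Dict[str, Any]]], name: str = "tiles"
) -> Dict[str, Any]:
    """Build a GeoJSON FeatureCollection from (geometry, attributes) pairs.

    Every attributes dict must contain "id", mirroring the requirement that
    the attributes struct has at least an "id" member.
    """
    features = []
    for geometry, attributes in rows:
        if "id" not in attributes:
            raise ValueError("attributes must contain an 'id' member")
        features.append({
            "type": "Feature",
            "id": attributes["id"],
            "geometry": geometry,
            "properties": dict(attributes),
        })
    return {"type": "FeatureCollection", "name": name, "crs": CRS84, "features": features}


# Hypothetical input: two points, each with the minimal "id" attribute.
rows = [
    ({"type": "Point", "coordinates": [1.0, 2.0]}, {"id": 1}),
    ({"type": "Point", "coordinates": [3.0, 4.0]}, {"id": 2}),
]
fc = to_feature_collection(rows)
geojson_text = json.dumps(fc)  # serialised string, comparable to the column value above
```

A row whose attributes lack "id" is rejected, which is the plain-Python analogue of the struct requirement stated above.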
st_asmvttile_agg
- st_asmvttile_agg(geom, attributes, zxyID)
Generates a Mapbox Vector Tile (MVT) from a group-by aggregation over a geometry column.
- Parameters:
geom (Column) – A grouped column containing geometries.
attributes (Column(StructType)) – The attributes column to aggregate.
zxyID (Column) – The column holding the z/x/y identifier of the target tile.
- Return type:
Column
- Notes:
The geom column must use the Mosaic internal geometry representation, e.g. as produced by ST_GeomFromWKB, ST_GeomFromWKT, or ST_GeomFromGeoJSON.
The geometry must have an SRID set; use e.g. ST_SetSRID or ST_UpdateSRID to achieve this. MVT tiles require the SRID to be EPSG:3857.
The attributes column is a Spark struct; it must contain at least an “id” member.
- Example:
df.groupBy()\
.agg(mos.st_asmvttile_agg("geom_3857", struct("id"), "zxyID")).limit(1).display()
+----------------------------------------------------------------------------------------------------------------+
| st_asmvttile_agg(geom_3857, struct(id), zxyID) |
+----------------------------------------------------------------------------------------------------------------+
| H4sIAAAAAAAAA5Ny5GItycxJLRZSFmJiYJBgVpLmfKXxwySIgYmZg5mJkZGRgYGRiZGFFYgZ+KWYMlOUuDQavk05e+ntl1fCGg0KFUwA... |
+----------------------------------------------------------------------------------------------------------------+
df.groupBy()
.agg(st_asmvttile_agg(col("geom_3857"), struct(col("id")), col("zxyID"))).limit(1).show
+----------------------------------------------------------------------------------------------------------------+
| st_asmvttile_agg(geom_3857, struct(id), zxyID) |
+----------------------------------------------------------------------------------------------------------------+
| H4sIAAAAAAAAA5Ny5GItycxJLRZSFmJiYJBgVpLmfKXxwySIgYmZg5mJkZGRgYGRiZGFFYgZ+KWYMlOUuDQavk05e+ntl1fCGg0KFUwA... |
+----------------------------------------------------------------------------------------------------------------+
SELECT st_asmvttile_agg(geom_3857, struct(id), zxyID)
FROM table
+----------------------------------------------------------------------------------------------------------------+
| st_asmvttile_agg(geom_3857, struct(id), zxyID) |
+----------------------------------------------------------------------------------------------------------------+
| H4sIAAAAAAAAA5Ny5GItycxJLRZSFmJiYJBgVpLmfKXxwySIgYmZg5mJkZGRgYGRiZGFFYgZ+KWYMlOUuDQavk05e+ntl1fCGg0KFUwA... |
+----------------------------------------------------------------------------------------------------------------+
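MVT tiles require EPSG:3857 coordinates; in Mosaic you set or reproject the SRID with ST_SetSRID or ST_UpdateSRID. Purely to illustrate what that reprojection does to WGS84 (EPSG:4326) longitude/latitude, here is a sketch of the standard spherical Web Mercator forward formulas. The function and constant names are hypothetical, and this is not how Mosaic performs the transformation internally.

```python
import math

EARTH_RADIUS_M = 6378137.0      # sphere radius used by EPSG:3857 (WGS84 semi-major axis)
MAX_LAT_DEG = 85.0511287798066  # latitude where the square Web Mercator world ends


def lonlat_to_3857(lon_deg: float, lat_deg: float) -> tuple:
    """Spherical Web Mercator forward projection: WGS84 degrees -> EPSG:3857 metres."""
    # Clamp latitude because y diverges towards the poles.
    lat_deg = max(-MAX_LAT_DEG, min(MAX_LAT_DEG, lat_deg))
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y


x_edge, _ = lonlat_to_3857(180.0, 0.0)  # eastern edge of the projected world
```

The clamp is why EPSG:3857 maps are square: at the clamped latitude, y reaches the same magnitude (about 20037508.34 m) as x at longitude 180.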
rst_combineavg_agg
- rst_combineavg_agg(tile)
Aggregates raster tiles by averaging pixel values.
- Parameters:
tile (Column (RasterTileType)) – A grouped column containing raster tiles.
- Return type:
Column: RasterTileType
- Notes:
Each tile must have the same extent, number of bands, pixel data type, pixel size and coordinate reference system. The output raster will have the same extent, number of bands, pixel data type, pixel size and coordinate reference system as the input tiles.
Also, see the rst_combineavg function.
- Example:
df.groupBy()\
.agg(mos.rst_combineavg_agg("tile")).limit(1).display()
+----------------------------------------------------------------------------------------------------------------+
| rst_combineavg_agg(tile) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
df.groupBy()
.agg(rst_combineavg_agg(col("tile"))).limit(1).show
+----------------------------------------------------------------------------------------------------------------+
| rst_combineavg_agg(tile) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
SELECT rst_combineavg_agg(tile)
FROM table
+----------------------------------------------------------------------------------------------------------------+
| rst_combineavg_agg(tile) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
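Conceptually, rst_combineavg_agg produces each output pixel as the mean of the co-located input pixels. The sketch below shows that per-pixel averaging on tiles modelled as nested Python lists (bands x rows x columns). It is an illustration under simplifying assumptions: it only checks band count and raster shape (not extent, pixel type, pixel size or CRS), it does not handle noData, and `combine_avg` is a hypothetical name.

```python
from typing import List

Band = List[List[float]]  # rows x columns
Tile = List[Band]         # bands x rows x columns


def combine_avg(tiles: List[Tile]) -> Tile:
    """Per-pixel mean of co-registered tiles (noData values are not treated specially)."""
    if not tiles:
        raise ValueError("need at least one tile")
    shape = [(len(band), len(band[0])) for band in tiles[0]]
    for tile in tiles[1:]:
        if [(len(band), len(band[0])) for band in tile] != shape:
            raise ValueError("tiles must have the same number of bands and raster size")
    n = len(tiles)
    return [
        [
            [sum(t[b][r][c] for t in tiles) / n for c in range(cols)]
            for r in range(rows)
        ]
        for b, (rows, cols) in enumerate(shape)
    ]


t1 = [[[1, 2], [3, 4]]]  # one band, 2 x 2 pixels
t2 = [[[3, 4], [5, 6]]]
avg = combine_avg([t1, t2])
```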
rst_derivedband_agg
- rst_derivedband_agg(tile, python_func, func_name)
Combines the raster tiles in each group into a single tile using the provided Python function.
- Parameters:
tile (Column (RasterTileType)) – A grouped column containing raster tile(s).
python_func (Column (StringType)) – The source code of the Python function to evaluate.
func_name (Column (StringType)) – The name of the function in python_func to evaluate.
- Return type:
Column: RasterTileType
- Notes:
Input raster tiles in tile must have the same extent, number of bands, pixel data type, pixel size and coordinate reference system. The output raster will have the same extent, number of bands, pixel data type, pixel size and coordinate reference system as the input raster tiles.
- Example:
from textwrap import dedent
from pyspark.sql import functions as F
import mosaic as mos
df\
.select(
"date", "tile",
F.lit(dedent(
"""
import numpy as np
def average(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize, raster_ysize, buf_radius, gt, **kwargs):
    out_ar[:] = np.sum(in_ar, axis=0) / len(in_ar)
""")).alias("py_func1"),
F.lit("average").alias("func1_name")
)\
.groupBy("date", "py_func1", "func1_name")\
.agg(mos.rst_derivedband_agg("tile", "py_func1", "func1_name")).limit(1).display()
+----------------------------------------------------------------------------------------------------------------+
| rst_derivedband_agg(tile,py_func1,func1_name) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
df
.select(
col("date"), col("tile"),
lit(
"""
|import numpy as np
|def average(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize, raster_ysize, buf_radius, gt, **kwargs):
|    out_ar[:] = np.sum(in_ar, axis=0) / len(in_ar)
|""".stripMargin).as("py_func1"),
lit("average").as("func1_name")
)
.groupBy("date", "py_func1", "func1_name")
.agg(rst_derivedband_agg(col("tile"), col("py_func1"), col("func1_name"))).limit(1).show
+----------------------------------------------------------------------------------------------------------------+
| rst_derivedband_agg(tile,py_func1,func1_name) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
SELECT
date, py_func1, func1_name,
rst_derivedband_agg(tile, py_func1, func1_name)
FROM (
SELECT
date, tile,
'
import numpy as np
def average(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize, raster_ysize, buf_radius, gt, **kwargs):
    out_ar[:] = np.sum(in_ar, axis=0) / len(in_ar)
' AS py_func1,
'average' AS func1_name
FROM table
) AS t
GROUP BY date, py_func1, func1_name
LIMIT 1
+----------------------------------------------------------------------------------------------------------------+
| rst_derivedband_agg(tile,py_func1,func1_name) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
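Because python_func is passed as a string, a mistake in it presumably only surfaces once Spark evaluates the aggregation. One way to catch errors earlier is to parse the string locally before building the DataFrame. The sketch below (a hypothetical helper, `check_pixel_function`, using only the standard library's `ast` module) checks that the source parses and defines a top-level function named func_name; it does not execute the code, so numpy need not be installed. The argument list (in_ar, out_ar, xoff, ...) is the one GDAL uses for VRT Python pixel functions.

```python
import ast
from textwrap import dedent


def check_pixel_function(source: str, func_name: str) -> ast.FunctionDef:
    """Parse `source` and return the top-level definition of `func_name`.

    Raises SyntaxError if the source does not parse, and ValueError if no
    top-level function with that name is defined. Nothing is executed.
    """
    module = ast.parse(dedent(source))
    for node in module.body:
        if isinstance(node, ast.FunctionDef) and node.name == func_name:
            return node
    raise ValueError(f"no top-level function named {func_name!r}")


# Same function source as in the examples above.
py_func1 = dedent(
    """
    import numpy as np
    def average(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize, raster_ysize, buf_radius, gt, **kwargs):
        out_ar[:] = np.sum(in_ar, axis=0) / len(in_ar)
    """
)
fn = check_pixel_function(py_func1, "average")
arg_names = [a.arg for a in fn.args.args]
```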
rst_merge_agg
- rst_merge_agg(tile)
Aggregates raster tiles into a single raster.
- Parameters:
tile (Column (RasterTileType)) – A grouped column containing raster tiles.
- Return type:
Column: RasterTileType
- Notes:
- Input tiles in tile:
are not required to have the same extent;
must have the same coordinate reference system;
must have the same pixel data type;
will be combined using the gdalwarp command;
require a noData value to have been initialised (if this is not the case, the non-valid pixels may introduce artifacts in the output raster); and
will be stacked in the order they are provided. This order is randomized, since this is an aggregation function. If the order of rasters is important, first collect the rasters, sort them by metadata information, and then use the rst_merge function.
- The resulting output raster will have:
an extent that covers all of the input tiles;
the same number of bands as the input tiles;
the same pixel type as the input tiles;
the same pixel size as the highest resolution input tiles; and
the same coordinate reference system as the input tiles.
See also the rst_merge function.
- Example:
df.groupBy("date")\
.agg(mos.rst_merge_agg("tile")).limit(1).display()
+----------------------------------------------------------------------------------------------------------------+
| rst_merge_agg(tile) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
df.groupBy("date")
.agg(rst_merge_agg(col("tile"))).limit(1).show
+----------------------------------------------------------------------------------------------------------------+
| rst_merge_agg(tile) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
SELECT rst_merge_agg(tile)
FROM table
GROUP BY date
+----------------------------------------------------------------------------------------------------------------+
| rst_merge_agg(tile) |
+----------------------------------------------------------------------------------------------------------------+
| {index_id: 593308294097928191, raster: [00 01 10 ... 00], parentPath: "dbfs:/path_to_file", driver: "NetCDF" } |
+----------------------------------------------------------------------------------------------------------------+
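The extent and pixel-size rules listed above can be made concrete with a small sketch. Assuming each tile is described by an axis-aligned bounding box, a square pixel size and a CRS string (the `TileInfo` type and both helpers are hypothetical, and gdalwarp itself is not involved), the merged raster covers the union of the boxes at the finest input pixel size:

```python
from typing import List, NamedTuple, Tuple


class TileInfo(NamedTuple):
    minx: float
    miny: float
    maxx: float
    maxy: float
    pixel_size: float  # square pixels assumed, in CRS units
    crs: str


def merged_grid(tiles: List[TileInfo]) -> Tuple[Tuple[float, float, float, float], float, str]:
    """Extent, pixel size and CRS of a raster merged from `tiles`."""
    crs = {t.crs for t in tiles}
    if len(crs) != 1:
        raise ValueError("tiles must share exactly one coordinate reference system")
    extent = (
        min(t.minx for t in tiles), min(t.miny for t in tiles),
        max(t.maxx for t in tiles), max(t.maxy for t in tiles),
    )
    pixel_size = min(t.pixel_size for t in tiles)  # the highest-resolution input wins
    return extent, pixel_size, crs.pop()


def pixel_dimensions(extent, pixel_size) -> Tuple[int, int]:
    """Width and height in pixels of `extent` at `pixel_size`."""
    minx, miny, maxx, maxy = extent
    return round((maxx - minx) / pixel_size), round((maxy - miny) / pixel_size)


a = TileInfo(0, 0, 10, 10, 1.0, "EPSG:4326")
b = TileInfo(5, -5, 20, 5, 0.5, "EPSG:4326")
extent, pixel_size, crs = merged_grid([a, b])
```

Here the merged raster is 40 x 30 pixels: the coarser tile is effectively resampled to the finer 0.5-unit grid.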
st_intersects_agg
- st_intersects_agg(leftIndex, rightIndex)
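Returns true if any of the leftIndex and rightIndex pairs intersect.
- Parameters:
leftIndex (Column) – Geometry chips (e.g. from grid_tessellateexplode), joined to rightIndex on index_id.
rightIndex (Column) – Geometry chips (e.g. from grid_tessellateexplode), joined to leftIndex on index_id.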
- Return type:
Column
- Example:
left_df = (
spark.createDataFrame([{'geom': 'POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))'}])
.select(grid_tessellateexplode(col("geom"), lit(1)).alias("left_index"))
)
right_df = (
spark.createDataFrame([{'geom': 'POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))'}])
.select(grid_tessellateexplode(col("geom"), lit(1)).alias("right_index"))
)
(
left_df
.join(right_df, col("left_index.index_id") == col("right_index.index_id"))
.groupBy()
.agg(st_intersects_agg(col("left_index"), col("right_index")))
).show(1, False)
+------------------------------------------------+
|st_intersects_agg(left_index, right_index)|
+------------------------------------------------+
|true |
+------------------------------------------------+
val leftDf = List("POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))").toDF("geom")
.select(grid_tessellateexplode($"geom", lit(1)).alias("left_index"))
val rightDf = List("POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))").toDF("geom")
.select(grid_tessellateexplode($"geom", lit(1)).alias("right_index"))
leftDf
.join(rightDf, $"left_index.index_id" === $"right_index.index_id")
.groupBy()
.agg(st_intersects_agg($"left_index", $"right_index"))
.show(false)
+------------------------------------------------+
|st_intersects_agg(left_index, right_index)|
+------------------------------------------------+
|true |
+------------------------------------------------+
WITH l AS (SELECT grid_tessellateexplode("POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))", 1) AS left_index),
r AS (SELECT grid_tessellateexplode("POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))", 1) AS right_index)
SELECT st_intersects_agg(l.left_index, r.right_index)
FROM l INNER JOIN r on l.left_index.index_id = r.right_index.index_id
+------------------------------------------------+
|st_intersects_agg(left_index, right_index)|
+------------------------------------------------+
|true |
+------------------------------------------------+
df.l <- select(
createDataFrame(data.frame(geom = "POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))")),
alias(grid_tessellateexplode(column("geom"), lit(1L)), "left_index")
)
df.r <- select(
createDataFrame(data.frame(geom = "POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))")),
alias(grid_tessellateexplode(column("geom"), lit(1L)), "right_index")
)
showDF(
select(
join(df.l, df.r, df.l$left_index.index_id == df.r$right_index.index_id),
st_intersects_agg(column("left_index"), column("right_index"))
), truncate=F
)
+------------------------------------------------+
|st_intersects_agg(left_index, right_index)|
+------------------------------------------------+
|true |
+------------------------------------------------+
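The examples join the two chip sets on index_id before aggregating, so only chips in the same grid cell are ever compared. The sketch below shows that aggregation logic in plain Python under two stated assumptions: a core chip (is_core = true) covers its entire cell, so any same-cell pair involving one intersects; and non-core chip geometries are simplified to axis-aligned boxes. `Chip`, `boxes_intersect` and `intersects_agg` are hypothetical names, not Mosaic's implementation.

```python
from typing import Iterable, NamedTuple, Optional, Tuple

Box = Tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


class Chip(NamedTuple):
    index_id: int        # grid cell the chip belongs to
    is_core: bool        # assumed: True means the chip covers the whole cell
    geom: Optional[Box]  # clipped geometry; may be None for core chips


def boxes_intersect(a: Box, b: Box) -> bool:
    """Closed-box test: touching edges count as intersecting, as in ST_Intersects."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


def intersects_agg(pairs: Iterable[Tuple[Chip, Chip]]) -> bool:
    """True if any (left, right) chip pair, already joined on index_id, intersects."""
    return any(
        left.is_core or right.is_core or boxes_intersect(left.geom, right.geom)
        for left, right in pairs
    )


# Two non-core chips in cell 7 that do not overlap, and a pair in cell 8 with a core chip.
disjoint = (Chip(7, False, (0, 0, 1, 1)), Chip(7, False, (2, 2, 3, 3)))
with_core = (Chip(8, True, None), Chip(8, False, (5, 5, 6, 6)))
```

Short-circuiting on core chips means the geometric test only runs for boundary chips, which is the usual motivation for carrying an is_core flag.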
st_intersection_agg
- st_intersection_agg(leftIndex, rightIndex)
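Computes the intersections of leftIndex and rightIndex and returns the union of these intersections.
- Parameters:
leftIndex (Column) – Geometry chips (e.g. from grid_tessellateexplode), joined to rightIndex on index_id.
rightIndex (Column) – Geometry chips (e.g. from grid_tessellateexplode), joined to leftIndex on index_id.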
- Return type:
Column
- Example:
left_df = (
spark.createDataFrame([{'geom': 'POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))'}])
.select(grid_tessellateexplode(col("geom"), lit(1)).alias("left_index"))
)
right_df = (
spark.createDataFrame([{'geom': 'POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))'}])
.select(grid_tessellateexplode(col("geom"), lit(1)).alias("right_index"))
)
(
left_df
.join(right_df, col("left_index.index_id") == col("right_index.index_id"))
.groupBy()
.agg(st_astext(st_intersection_agg(col("left_index"), col("right_index"))))
).show(1, False)
+--------------------------------------------------------------+
|convert_to(st_intersection_agg(left_index, right_index))|
+--------------------------------------------------------------+
|POLYGON ((2 2, 3 2, 3 3, 2 3, 2 2)) |
+--------------------------------------------------------------+
val leftDf = List("POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))").toDF("geom")
.select(grid_tessellateexplode($"geom", lit(1)).alias("left_index"))
val rightDf = List("POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))").toDF("geom")
.select(grid_tessellateexplode($"geom", lit(1)).alias("right_index"))
leftDf
.join(rightDf, $"left_index.index_id" === $"right_index.index_id")
.groupBy()
.agg(st_astext(st_intersection_agg($"left_index", $"right_index")))
.show(false)
+--------------------------------------------------------------+
|convert_to(st_intersection_agg(left_index, right_index))|
+--------------------------------------------------------------+
|POLYGON ((2 2, 3 2, 3 3, 2 3, 2 2)) |
+--------------------------------------------------------------+
WITH l AS (SELECT grid_tessellateexplode("POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))", 1) AS left_index),
r AS (SELECT grid_tessellateexplode("POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))", 1) AS right_index)
SELECT st_astext(st_intersection_agg(l.left_index, r.right_index))
FROM l INNER JOIN r on l.left_index.index_id = r.right_index.index_id
+--------------------------------------------------------------+
|convert_to(st_intersection_agg(left_index, right_index))|
+--------------------------------------------------------------+
|POLYGON ((2 2, 3 2, 3 3, 2 3, 2 2)) |
+--------------------------------------------------------------+
df.l <- select(
createDataFrame(data.frame(geom = "POLYGON ((0 0, 0 3, 3 3, 3 0, 0 0))")),
alias(grid_tessellateexplode(column("geom"), lit(1L)), "left_index")
)
df.r <- select(
createDataFrame(data.frame(geom = "POLYGON ((2 2, 2 4, 4 4, 4 2, 2 2))")),
alias(grid_tessellateexplode(column("geom"), lit(1L)), "right_index")
)
showDF(
select(
join(df.l, df.r, df.l$left_index.index_id == df.r$right_index.index_id),
st_astext(st_intersection_agg(column("left_index"), column("right_index")))
), truncate=F
)
+--------------------------------------------------------------+
|convert_to(st_intersection_agg(left_index, right_index))|
+--------------------------------------------------------------+
|POLYGON ((2 2, 3 2, 3 3, 2 3, 2 2)) |
+--------------------------------------------------------------+
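To see why the result above is the unit square from (2 2) to (3 3), here is a plain-Python sketch of the per-cell intersect-then-union idea. It assumes a hypothetical square grid of side 2 in place of the H3 cells used by grid_tessellateexplode, and represents geometries as axis-aligned boxes; `box_intersection`, `chip_box` and `intersection_agg` are illustrative names only.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Box = Tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


def box_intersection(a: Box, b: Box) -> Optional[Box]:
    """Intersection of two boxes, or None if they share no area."""
    minx, miny = max(a[0], b[0]), max(a[1], b[1])
    maxx, maxy = min(a[2], b[2]), min(a[3], b[3])
    if minx >= maxx or miny >= maxy:
        return None
    return (minx, miny, maxx, maxy)


def chip_box(box: Box, cell: float) -> Dict[Tuple[int, int], Box]:
    """Clip a box to a square grid of side `cell` (a stand-in for H3 cells)."""
    chips = {}
    for i in range(int(box[0] // cell), int(-(-box[2] // cell))):
        for j in range(int(box[1] // cell), int(-(-box[3] // cell))):
            cell_box = (i * cell, j * cell, (i + 1) * cell, (j + 1) * cell)
            piece = box_intersection(box, cell_box)
            if piece is not None:
                chips[(i, j)] = piece
    return chips


def intersection_agg(pairs: Iterable[Tuple[Box, Box]]) -> List[Box]:
    """Intersect each same-cell pair; cells are disjoint, so the pieces never overlap."""
    pieces = []
    for left, right in pairs:
        piece = box_intersection(left, right)
        if piece is not None:
            pieces.append(piece)
    return pieces


left = chip_box((0, 0, 3, 3), cell=2)
right = chip_box((2, 2, 4, 4), cell=2)
shared_cells = sorted(left.keys() & right.keys())  # the join on index_id
result = intersection_agg((left[k], right[k]) for k in shared_cells)
```

Only cell (1, 1) is shared, and intersecting its two chips yields the box (2, 2, 3, 3), matching the polygon returned above. Because cells never overlap, the "union" step reduces to collecting the per-cell pieces.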
st_union_agg
- st_union_agg(geom)
Computes the union of the input geometries.
- Parameters:
geom (Column) – Geometry
- Return type:
Column
- Example:
df = spark.createDataFrame([{'geom': 'POLYGON ((10 10, 20 10, 20 20, 10 20, 10 10))'}, {'geom': 'POLYGON ((15 15, 25 15, 25 25, 15 25, 15 15))'}])
df.select(st_astext(st_union_agg(col('geom')))).show()
+-------------------------------------------------------------------------+
| st_union_agg(geom) |
+-------------------------------------------------------------------------+
|POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))|
+-------------------------------------------------------------------------+
val df = List("POLYGON ((10 10, 20 10, 20 20, 10 20, 10 10))", "POLYGON ((15 15, 25 15, 25 25, 15 25, 15 15))").toDF("geom")
df.select(st_astext(st_union_agg(col("geom")))).show()
+-------------------------------------------------------------------------+
| st_union_agg(geom) |
+-------------------------------------------------------------------------+
|POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))|
+-------------------------------------------------------------------------+
WITH geoms AS (SELECT * FROM VALUES ('POLYGON ((10 10, 20 10, 20 20, 10 20, 10 10))'), ('POLYGON ((15 15, 25 15, 25 25, 15 25, 15 15))') AS t(geom))
SELECT st_astext(st_union_agg(geom)) FROM geoms;
+-------------------------------------------------------------------------+
| st_union_agg(geom) |
+-------------------------------------------------------------------------+
|POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))|
+-------------------------------------------------------------------------+
df.geom <- createDataFrame(data.frame(geom = c('POLYGON ((10 10, 20 10, 20 20, 10 20, 10 10))', 'POLYGON ((15 15, 25 15, 25 25, 15 25, 15 15))')))
showDF(select(df.geom, st_astext(st_union_agg(column("geom")))), truncate = FALSE)
+-------------------------------------------------------------------------+
| st_union_agg(geom) |
+-------------------------------------------------------------------------+
|POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))|
+-------------------------------------------------------------------------+
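A quick way to sanity-check the union output, without Spark, is to compare its area with inclusion-exclusion: area(A) + area(B) - area(A ∩ B) = 100 + 100 - 25 = 175. The sketch below parses a single-ring WKT polygon with a minimal, hypothetical parser (it handles only the simple `POLYGON ((x y, ...))` form used here) and applies the shoelace formula.

```python
from typing import List, Tuple


def ring_from_wkt(wkt: str) -> List[Tuple[float, float]]:
    """Exterior ring of a single-ring 'POLYGON ((x y, ...))' string (minimal parser)."""
    body = wkt[wkt.index("((") + 2 : wkt.rindex("))")]
    return [tuple(float(v) for v in point.split()) for point in body.split(",")]


def shoelace_area(ring: List[Tuple[float, float]]) -> float:
    """Unsigned area of a closed ring whose first point is repeated at the end."""
    twice_area = 0.0
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        twice_area += x1 * y2 - x2 * y1
    return abs(twice_area) / 2.0


a = ring_from_wkt("POLYGON ((10 10, 20 10, 20 20, 10 20, 10 10))")
b = ring_from_wkt("POLYGON ((15 15, 25 15, 25 25, 15 25, 15 15))")
union = ring_from_wkt(
    "POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))"
)
overlap = 5 * 5  # A and B share the square from (15 15) to (20 20)
```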
grid_cell_intersection_agg
- grid_cell_intersection_agg(chips)
Computes the chip representing the intersection of the input chips.
- Parameters:
chips (Column) – A grouped column of chips (e.g. from grid_tessellateexplode) that share the same index_id.
- Return type:
Column
- Example:
df = df.withColumn("chip", grid_tessellateexplode(...))
df.groupBy("chip.index_id").agg(grid_cell_intersection_agg("chip").alias("agg_chip")).limit(1).show()
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
val df = other_df.withColumn("chip", grid_tessellateexplode(...))
df.groupBy("chip.index_id").agg(grid_cell_intersection_agg(col("chip")).alias("agg_chip")).limit(1).show()
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
WITH chips AS (SELECT grid_tessellateexplode(wkt) AS chip FROM ...)
SELECT grid_cell_intersection_agg(chip) AS agg_chip FROM chips GROUP BY chip.index_id;
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
df.agg <- agg(groupBy(df, column("chip.index_id")), alias(grid_cell_intersection_agg(column("chip")), "agg_chip"))
showDF(limit(df.agg, 1), truncate = FALSE)
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
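The sketch below models what intersecting the chips of one cell means, under stated assumptions: a core chip covers its whole cell and therefore leaves the other chip unchanged, and non-core geometries are simplified to axis-aligned boxes. Under these assumptions the result is core only if every input chip is core. `Chip` and the helper functions are hypothetical, not Mosaic's implementation.

```python
from functools import reduce
from typing import Iterable, NamedTuple, Optional, Tuple

Box = Tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


class Chip(NamedTuple):
    index_id: int
    is_core: bool        # assumed: chip covers its whole cell
    geom: Optional[Box]  # clipped geometry; the full cell box for a core chip


def box_intersection(a: Box, b: Box) -> Optional[Box]:
    """Overlap of two boxes, or None if they share no area."""
    minx, miny = max(a[0], b[0]), max(a[1], b[1])
    maxx, maxy = min(a[2], b[2]), min(a[3], b[3])
    return (minx, miny, maxx, maxy) if minx < maxx and miny < maxy else None


def intersect_two(a: Chip, b: Chip) -> Chip:
    if a.index_id != b.index_id:
        raise ValueError("chips must belong to the same cell")
    if a.is_core:
        return b  # the whole cell does not shrink the other chip
    if b.is_core:
        return a
    if a.geom is None or b.geom is None:
        return Chip(a.index_id, False, None)  # an empty piece stays empty
    return Chip(a.index_id, False, box_intersection(a.geom, b.geom))


def cell_intersection_agg(chips: Iterable[Chip]) -> Chip:
    """Fold all chips of one cell into their common intersection."""
    return reduce(intersect_two, chips)


cell = (0, 0, 2, 2)
chips = [Chip(7, True, cell), Chip(7, False, (0, 0, 1, 2)), Chip(7, False, (0.5, 0, 2, 1))]
agg_chip = cell_intersection_agg(chips)
```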
grid_cell_union_agg
- grid_cell_union_agg(chips)
Computes the chip representing the union of the input chips.
- Parameters:
chips (Column) – A grouped column of chips (e.g. from grid_tessellateexplode) that share the same index_id.
- Return type:
Column
- Example:
df = df.withColumn("chip", grid_tessellateexplode(...))
df.groupBy("chip.index_id").agg(grid_cell_union_agg("chip").alias("agg_chip")).limit(1).show()
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
val df = other_df.withColumn("chip", grid_tessellateexplode(...))
df.groupBy("chip.index_id").agg(grid_cell_union_agg(col("chip")).alias("agg_chip")).limit(1).show()
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
WITH chips AS (SELECT grid_tessellateexplode(wkt) AS chip FROM ...)
SELECT grid_cell_union_agg(chip) AS agg_chip FROM chips GROUP BY chip.index_id;
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
df.agg <- agg(groupBy(df, column("chip.index_id")), alias(grid_cell_union_agg(column("chip")), "agg_chip"))
showDF(limit(df.agg, 1), truncate = FALSE)
+--------------------------------------------------------+
| agg_chip |
+--------------------------------------------------------+
|{is_core: false, index_id: 590418571381702655, wkb: ...}|
+--------------------------------------------------------+
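A matching sketch for the union case, assuming each chip's geometry is rasterised to a set of fine pixels inside its cell so that union is a plain set union. Any core chip makes the result cover the whole cell. The sketch additionally marks the result as core when non-core pieces together fill the cell; that is a design choice of this illustration, not documented Mosaic behaviour. All names are hypothetical.

```python
from functools import reduce
from typing import FrozenSet, Iterable, NamedTuple, Tuple

Pixel = Tuple[int, int]


class Chip(NamedTuple):
    index_id: int
    is_core: bool
    geom: FrozenSet[Pixel]  # hypothetical: geometry rasterised to fine pixels inside the cell


def cell_union_agg(chips: Iterable[Chip], cell_pixels: FrozenSet[Pixel]) -> Chip:
    """Union the chips of one cell; the result is core once it fills the cell."""
    def union_two(a: Chip, b: Chip) -> Chip:
        if a.index_id != b.index_id:
            raise ValueError("chips must belong to the same cell")
        if a.is_core or b.is_core:
            return Chip(a.index_id, True, cell_pixels)  # a core chip already covers everything
        merged = a.geom | b.geom
        return Chip(a.index_id, merged == cell_pixels, merged)

    return reduce(union_two, chips)


cell = frozenset((x, y) for x in range(2) for y in range(2))
left_half = Chip(7, False, frozenset({(0, 0), (0, 1)}))
right_half = Chip(7, False, frozenset({(1, 0), (1, 1)}))
corner = Chip(7, False, frozenset({(1, 1)}))
```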