Using grid index systems in Mosaic
[ ]:
from pyspark.sql.functions import *
from mosaic import enable_mosaic
enable_mosaic(spark, dbutils)
Set operations over big geospatial datasets become very expensive without some form of spatial indexing.
Spatial indexes not only allow operations like point-in-polygon joins to be partitioned but, if only approximate results are required, can be used to reduce these to deterministic SQL joins directly on the indexes.
The workflow for a point-in-poly spatial join might look like the following:
1. Read the source point and polygon datasets.
[ ]:
drop_cols = [
"rate_code_id", "store_and_fwd_flag", "dropoff_longitude",
"dropoff_latitude", "payment_type", "fare_amount",
"extra", "mta_tax", "tip_amount", "tolls_amount",
"total_amount"
]
trips = (
spark.table("delta.`/databricks-datasets/nyctaxi/tables/nyctaxi_yellow`")
.drop(*drop_cols)
.limit(5_000_000)
.repartition(sc.defaultParallelism * 20)
)
trips.show()
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+
vendor_id| pickup_datetime| dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+
CMT|2009-01-01 20:07:33|2009-01-01 20:12:28| 1| 0.8| -74.001041| 40.731|
CMT|2009-01-06 15:29:12|2009-01-06 15:51:57| 2| 3.3| -73.996489| 40.725742|
CMT|2010-02-14 17:42:16|2010-02-14 17:55:03| 1| 3.4| -74.002949| 40.734254|
CMT|2010-02-11 18:19:01|2010-02-11 18:27:54| 1| 1.5| -73.998133| 40.682463|
VTS|2009-04-29 12:26:00|2009-04-29 12:35:00| 3| 2.05| -74.001332| 40.72006|
VTS|2009-04-24 15:03:00|2009-04-24 15:23:00| 2| 2.89| -73.989952| 40.734625|
CMT|2010-02-28 13:55:44|2010-02-28 14:02:37| 1| 1.2| -74.006015| 40.735279|
VTS|2009-09-27 08:46:00|2009-09-27 08:59:00| 1| 3.97| -74.000148| 40.717468|
CMT|2010-02-18 09:48:52|2010-02-18 10:08:38| 1| 3.0| -73.995177| 40.725297|
CMT|2009-04-09 20:33:44|2009-04-09 20:39:33| 2| 0.6| -73.990133| 40.729321|
CMT|2010-02-13 22:41:10|2010-02-13 23:07:04| 1| 4.2| -74.009175| 40.706284|
CMT|2009-01-25 20:06:51|2009-01-25 20:12:37| 1| 1.3| -74.007384| 40.717929|
VTS|2010-02-27 18:19:00|2010-02-27 18:38:00| 1| 4.2| -74.011512| 40.710588|
VTS|2010-02-15 10:17:00|2010-02-15 10:24:00| 1| 1.74| -74.016442| 40.711617|
CMT|2009-12-26 18:45:49|2009-12-26 18:59:08| 1| 4.8| -74.01014| 40.712263|
CMT|2009-12-06 01:00:07|2009-12-06 01:11:41| 2| 4.2| -74.002505| 40.729001|
VTS|2009-10-04 14:36:00|2009-10-04 14:42:00| 1| 1.13| -74.006767| 40.718942|
CMT|2009-01-18 00:20:50|2009-01-18 00:36:29| 3| 2.1| -73.993258| 40.721401|
VTS|2009-05-18 13:24:00|2009-05-18 13:33:00| 1| 1.91| -73.992785| 40.730412|
VTS|2009-11-11 21:51:00|2009-11-11 22:13:00| 5| 4.71| -74.010065| 40.733383|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+
only showing top 20 rows
[ ]:
from mosaic import st_geomfromgeojson
user = spark.sql("select current_user() as user").collect()[0]["user"]
neighbourhoods = (
spark.read.format("json")
.load(f"dbfs:/FileStore/shared_uploads/{user}/NYC_Taxi_Zones.geojson")
.repartition(sc.defaultParallelism)
.withColumn("geometry", st_geomfromgeojson(to_json(col("geometry"))))
.select("properties.*", "geometry")
.drop("shape_area", "shape_leng")
)
neighbourhoods.show()
+-------------+-----------+--------+-------------------+--------------------+
borough|location_id|objectid| zone| geometry|
+-------------+-----------+--------+-------------------+--------------------+
Brooklyn| 123| 123| Homecrest|{6, 4326, [[[-73....|
Manhattan| 153| 153| Marble Hill|{6, 4326, [[[-73....|
Brooklyn| 112| 112| Greenpoint|{6, 4326, [[[-73....|
Manhattan| 233| 233|UN/Turtle Bay South|{6, 4326, [[[-73....|
Manhattan| 43| 43| Central Park|{6, 4326, [[[-73....|
Queens| 201| 201| Rockaway Park|{6, 4326, [[[-73....|
Queens| 131| 131| Jamaica Estates|{6, 4326, [[[-73....|
Brooklyn| 111| 111|Green-Wood Cemetery|{6, 4326, [[[-73....|
Queens| 226| 226| Sunnyside|{6, 4326, [[[-73....|
Queens| 129| 129| Jackson Heights|{6, 4326, [[[-73....|
Manhattan| 120| 120| Highbridge Park|{6, 4326, [[[-73....|
Brooklyn| 76| 76| East New York|{6, 4326, [[[-73....|
Manhattan| 24| 24| Bloomingdale|{6, 4326, [[[-73....|
Manhattan| 202| 202| Roosevelt Island|{6, 4326, [[[-73....|
Manhattan| 100| 100| Garment District|{6, 4326, [[[-73....|
Staten Island| 251| 251| Westerleigh|{6, 4326, [[[-74....|
Manhattan| 74| 74| East Harlem North|{6, 4326, [[[-73....|
Queens| 98| 98| Fresh Meadows|{6, 4326, [[[-73....|
Manhattan| 211| 211| SoHo|{6, 4326, [[[-74....|
Bronx| 174| 174| Norwood|{6, 4326, [[[-73....|
+-------------+-----------+--------+-------------------+--------------------+
only showing top 20 rows
2. Compute the resolution of index required to optimize the join.
[ ]:
from mosaic import MosaicFrame
neighbourhoods_mdf = MosaicFrame(neighbourhoods, "geometry")
help(neighbourhoods_mdf.get_optimal_resolution)
Help on method get_optimal_resolution in module mosaic.core.mosaic_frame:
get_optimal_resolution(sample_rows: Union[int, NoneType] = None, sample_fraction: Union[float, NoneType] = None) -> int method of mosaic.core.mosaic_frame.MosaicFrame instance
Analyzes the geometries in the currently selected geometry column and proposes an optimal
grid-index resolution.
Provide either `sample_rows` or `sample_fraction` parameters to control how much data is passed to the analyzer.
(Providing too little data to the analyzer may result in a `NotEnoughGeometriesException`)
Parameters
----------
sample_rows: int, optional
The number of rows to sample.
sample_fraction: float, optional
The proportion of rows to sample.
Returns
-------
int
The recommended grid-index resolution to apply to this MosaicFrame.
[ ]:
(resolution := neighbourhoods_mdf.get_optimal_resolution(sample_fraction=1.))
Out[15]: 9
3. Apply the index to the set of points in your left-hand dataframe.
This will generate an index value that corresponds to the grid ‘cell’ that this point occupies.
[ ]:
from mosaic import grid_longlatascellid
indexed_trips = trips.withColumn("ix", grid_longlatascellid(lon="pickup_longitude", lat="pickup_latitude", resolution=lit(resolution)))
indexed_trips.show()
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------------+
vendor_id| pickup_datetime| dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude| ix|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------------+
DDS|2009-01-17 18:49:57|2009-01-17 18:56:29| 3| 1.2| -74.004043| 40.733409|617733151092113407|
DDS|2009-12-01 00:47:52|2009-12-01 01:00:16| 1| 3.4| -73.991702| 40.726342|617733151087132671|
CMT|2009-02-09 16:50:21|2009-02-09 17:02:47| 1| 2.6| -73.999673| 40.733586|617733123805806591|
CMT|2009-12-07 07:15:47|2009-12-07 07:32:07| 1| 3.8| -74.01211| 40.716893|617733151084773375|
VTS|2009-10-16 22:02:00|2009-10-16 22:08:00| 1| 1.1| -74.010903| 40.71624|617733151084773375|
VTS|2009-12-23 22:13:00|2009-12-23 22:18:00| 1| 0.37| -74.002343| 40.73366|617733151092113407|
VTS|2009-12-12 01:24:00|2009-12-12 01:38:00| 2| 3.55| -74.002565| 40.728188|617733151091326975|
CMT|2009-12-07 13:10:37|2009-12-07 13:13:45| 1| 0.5| -73.999184| 40.73428|617733123805806591|
CMT|2009-11-08 22:20:44|2009-11-08 22:31:23| 1| 1.9| -74.003029| 40.733385|617733151092113407|
VTS|2009-12-27 20:01:00|2009-12-27 20:04:00| 1| 1.04| -74.000227| 40.732603|617733151092375551|
VTS|2009-02-13 14:33:00|2009-02-13 14:50:00| 3| 1.59| -74.006535| 40.732303|617733151092637695|
CMT|2009-11-15 21:13:32|2009-11-15 21:25:56| 3| 3.0| -73.998795| 40.730621|617733151092375551|
VTS|2009-01-08 18:13:00|2009-01-08 18:33:00| 2| 4.18| -74.0079| 40.712012|617733151021334527|
CMT|2009-11-30 13:30:13|2009-11-30 13:41:55| 1| 1.6| -74.004487| 40.734072|617733151092637695|
CMT|2009-01-11 20:02:22|2009-01-11 20:08:15| 1| 1.0| -74.004493| 40.713349|617733151020810239|
CMT|2009-12-30 18:46:08|2009-12-30 19:02:23| 1| 2.3| -74.010798| 40.716717|617733151084773375|
CMT|2009-11-18 21:50:12|2009-11-18 22:05:19| 1| 5.8| -73.992515| 40.694106|617733151038111743|
VTS|2009-11-21 12:51:00|2009-11-21 13:27:00| 1| 14.18| -73.9923| 40.715218|617733151109414911|
CMT|2009-01-20 09:34:49|2009-01-20 09:37:15| 1| 0.4| -74.0027| 40.733479|617733151092113407|
VTS|2009-01-03 07:07:00|2009-01-03 07:18:00| 1| 7.81| -73.994358| 40.690345|617733151037325311|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------------+
only showing top 20 rows
4. Compute the set of indices that fully covers each polygon in the right-hand dataframe
This is commonly referred to as a polyfill operation.
[ ]:
from mosaic import grid_polyfill
indexed_neighbourhoods = (
neighbourhoods
.select("*", grid_polyfill("geometry", lit(resolution)).alias("ix_set"))
.drop("geometry")
)
indexed_neighbourhoods.show()
+-------------+-----------+--------+-------------------+--------------------+
borough|location_id|objectid| zone| ix_set|
+-------------+-----------+--------+-------------------+--------------------+
Brooklyn| 123| 123| Homecrest|[6177331514226769...|
Manhattan| 153| 153| Marble Hill|[6177331229858201...|
Brooklyn| 112| 112| Greenpoint|[6177331237832622...|
Manhattan| 233| 233|UN/Turtle Bay South|[6177331238679347...|
Manhattan| 43| 43| Central Park|[6177331225792348...|
Queens| 201| 201| Rockaway Park|[6177331357831659...|
Queens| 131| 131| Jamaica Estates|[6177331242658693...|
Brooklyn| 111| 111|Green-Wood Cemetery|[6177331522277212...|
Queens| 226| 226| Sunnyside|[6177331238566625...|
Queens| 129| 129| Jackson Heights|[6177331243222302...|
Manhattan| 120| 120| Highbridge Park|[6177331231976325...|
Brooklyn| 76| 76| East New York|[6177331236938711...|
Manhattan| 24| 24| Bloomingdale|[6177331226458193...|
Manhattan| 202| 202| Roosevelt Island|[6177331237777571...|
Manhattan| 100| 100| Garment District|[6177331509717893...|
Staten Island| 251| 251| Westerleigh|[6177331466128588...|
Manhattan| 74| 74| East Harlem North|[6177331226508001...|
Queens| 98| 98| Fresh Meadows|[6177331242448977...|
Manhattan| 211| 211| SoHo|[6177331510784819...|
Bronx| 174| 174| Norwood|[6177331205497159...|
+-------------+-----------+--------+-------------------+--------------------+
only showing top 20 rows
5. ‘Explode’ the polygon index dataframe, such that each polygon index becomes a row in a new dataframe.
[ ]:
exploded_indexed_neighbourhoods = (
indexed_neighbourhoods
.withColumn("ix", explode("ix_set"))
.drop("ix_set")
)
exploded_indexed_neighbourhoods.show()
+--------+-----------+--------+---------+------------------+
borough|location_id|objectid| zone| ix|
+--------+-----------+--------+---------+------------------+
Brooklyn| 123| 123|Homecrest|617733151422676991|
Brooklyn| 123| 123|Homecrest|617733151503417343|
Brooklyn| 123| 123|Homecrest|617733151502893055|
Brooklyn| 123| 123|Homecrest|617733151502368767|
Brooklyn| 123| 123|Homecrest|617733151492407295|
Brooklyn| 123| 123|Homecrest|617733151488737279|
Brooklyn| 123| 123|Homecrest|617733151484542975|
Brooklyn| 123| 123|Homecrest|617733151484018687|
Brooklyn| 123| 123|Homecrest|617733151483494399|
Brooklyn| 123| 123|Homecrest|617733151425560575|
Brooklyn| 123| 123|Homecrest|617733151424511999|
Brooklyn| 123| 123|Homecrest|617733151423463423|
Brooklyn| 123| 123|Homecrest|617733151511019519|
Brooklyn| 123| 123|Homecrest|617733151505776639|
Brooklyn| 123| 123|Homecrest|617733151505252351|
Brooklyn| 123| 123|Homecrest|617733151504203775|
Brooklyn| 123| 123|Homecrest|617733151503679487|
Brooklyn| 123| 123|Homecrest|617733151503155199|
Brooklyn| 123| 123|Homecrest|617733151502630911|
Brooklyn| 123| 123|Homecrest|617733151502106623|
+--------+-----------+--------+---------+------------------+
only showing top 20 rows
6. Join the new left- and right-hand dataframes directly on the index.
[ ]:
joined_df = (
indexed_trips.alias("t")
.join(exploded_indexed_neighbourhoods.alias("n"), on="ix", how="inner"))
joined_df.count()
Out[25]: 4934937
Final notes
Mosaic provides support for Uber’s H3 spatial indexing library as a core part of the API, but we plan to add support for other index systems, including S2 and British National Grid in due course.