Using grid index systems in Mosaic
[ ]:
from pyspark.sql.functions import *
from mosaic import enable_mosaic
enable_mosaic(spark, dbutils)
Set operations over big geospatial datasets become very expensive without some form of spatial indexing.
Spatial indexes not only allow operations like point-in-polygon joins to be partitioned but, if only approximate results are required, can be used to reduce these to deterministic SQL joins directly on the indexes.

The workflow for a point-in-poly spatial join might look like the following:
1. Read the source point and polygon datasets.
[ ]:
drop_cols = [
  "rate_code_id", "store_and_fwd_flag", "dropoff_longitude",
  "dropoff_latitude", "payment_type", "fare_amount",
  "extra", "mta_tax", "tip_amount", "tolls_amount",
  "total_amount"
]
trips = (
  spark.table("delta.`/databricks-datasets/nyctaxi/tables/nyctaxi_yellow`")
  .drop(*drop_cols)
  .limit(5_000_000)
  .repartition(sc.defaultParallelism * 20)
)
trips.show()
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+
vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+
      CMT|2009-01-01 20:07:33|2009-01-01 20:12:28|              1|          0.8|      -74.001041|         40.731|
      CMT|2009-01-06 15:29:12|2009-01-06 15:51:57|              2|          3.3|      -73.996489|      40.725742|
      CMT|2010-02-14 17:42:16|2010-02-14 17:55:03|              1|          3.4|      -74.002949|      40.734254|
      CMT|2010-02-11 18:19:01|2010-02-11 18:27:54|              1|          1.5|      -73.998133|      40.682463|
      VTS|2009-04-29 12:26:00|2009-04-29 12:35:00|              3|         2.05|      -74.001332|       40.72006|
      VTS|2009-04-24 15:03:00|2009-04-24 15:23:00|              2|         2.89|      -73.989952|      40.734625|
      CMT|2010-02-28 13:55:44|2010-02-28 14:02:37|              1|          1.2|      -74.006015|      40.735279|
      VTS|2009-09-27 08:46:00|2009-09-27 08:59:00|              1|         3.97|      -74.000148|      40.717468|
      CMT|2010-02-18 09:48:52|2010-02-18 10:08:38|              1|          3.0|      -73.995177|      40.725297|
      CMT|2009-04-09 20:33:44|2009-04-09 20:39:33|              2|          0.6|      -73.990133|      40.729321|
      CMT|2010-02-13 22:41:10|2010-02-13 23:07:04|              1|          4.2|      -74.009175|      40.706284|
      CMT|2009-01-25 20:06:51|2009-01-25 20:12:37|              1|          1.3|      -74.007384|      40.717929|
      VTS|2010-02-27 18:19:00|2010-02-27 18:38:00|              1|          4.2|      -74.011512|      40.710588|
      VTS|2010-02-15 10:17:00|2010-02-15 10:24:00|              1|         1.74|      -74.016442|      40.711617|
      CMT|2009-12-26 18:45:49|2009-12-26 18:59:08|              1|          4.8|       -74.01014|      40.712263|
      CMT|2009-12-06 01:00:07|2009-12-06 01:11:41|              2|          4.2|      -74.002505|      40.729001|
      VTS|2009-10-04 14:36:00|2009-10-04 14:42:00|              1|         1.13|      -74.006767|      40.718942|
      CMT|2009-01-18 00:20:50|2009-01-18 00:36:29|              3|          2.1|      -73.993258|      40.721401|
      VTS|2009-05-18 13:24:00|2009-05-18 13:33:00|              1|         1.91|      -73.992785|      40.730412|
      VTS|2009-11-11 21:51:00|2009-11-11 22:13:00|              5|         4.71|      -74.010065|      40.733383|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+
only showing top 20 rows
[ ]:
from mosaic import st_geomfromgeojson
user = spark.sql("select current_user() as user").collect()[0]["user"]
neighbourhoods = (
  spark.read.format("json")
  .load(f"dbfs:/FileStore/shared_uploads/{user}/NYC_Taxi_Zones.geojson")
  .repartition(sc.defaultParallelism)
  .withColumn("geometry", st_geomfromgeojson(to_json(col("geometry"))))
  .select("properties.*", "geometry")
  .drop("shape_area", "shape_leng")
)
neighbourhoods.show()
+-------------+-----------+--------+-------------------+--------------------+
      borough|location_id|objectid|               zone|            geometry|
+-------------+-----------+--------+-------------------+--------------------+
     Brooklyn|        123|     123|          Homecrest|{6, 4326, [[[-73....|
    Manhattan|        153|     153|        Marble Hill|{6, 4326, [[[-73....|
     Brooklyn|        112|     112|         Greenpoint|{6, 4326, [[[-73....|
    Manhattan|        233|     233|UN/Turtle Bay South|{6, 4326, [[[-73....|
    Manhattan|         43|      43|       Central Park|{6, 4326, [[[-73....|
       Queens|        201|     201|      Rockaway Park|{6, 4326, [[[-73....|
       Queens|        131|     131|    Jamaica Estates|{6, 4326, [[[-73....|
     Brooklyn|        111|     111|Green-Wood Cemetery|{6, 4326, [[[-73....|
       Queens|        226|     226|          Sunnyside|{6, 4326, [[[-73....|
       Queens|        129|     129|    Jackson Heights|{6, 4326, [[[-73....|
    Manhattan|        120|     120|    Highbridge Park|{6, 4326, [[[-73....|
     Brooklyn|         76|      76|      East New York|{6, 4326, [[[-73....|
    Manhattan|         24|      24|       Bloomingdale|{6, 4326, [[[-73....|
    Manhattan|        202|     202|   Roosevelt Island|{6, 4326, [[[-73....|
    Manhattan|        100|     100|   Garment District|{6, 4326, [[[-73....|
Staten Island|        251|     251|        Westerleigh|{6, 4326, [[[-74....|
    Manhattan|         74|      74|  East Harlem North|{6, 4326, [[[-73....|
       Queens|         98|      98|      Fresh Meadows|{6, 4326, [[[-73....|
    Manhattan|        211|     211|               SoHo|{6, 4326, [[[-74....|
        Bronx|        174|     174|            Norwood|{6, 4326, [[[-73....|
+-------------+-----------+--------+-------------------+--------------------+
only showing top 20 rows
2. Compute the resolution of index required to optimize the join.
[ ]:
from mosaic import MosaicFrame
neighbourhoods_mdf = MosaicFrame(neighbourhoods, "geometry")
help(neighbourhoods_mdf.get_optimal_resolution)
Help on method get_optimal_resolution in module mosaic.core.mosaic_frame:
get_optimal_resolution(sample_rows: Union[int, NoneType] = None, sample_fraction: Union[float, NoneType] = None) -> int method of mosaic.core.mosaic_frame.MosaicFrame instance
    Analyzes the geometries in the currently selected geometry column and proposes an optimal
    grid-index resolution.
    Provide either `sample_rows` or `sample_fraction` parameters to control how much data is passed to the analyzer.
    (Providing too little data to the analyzer may result in a `NotEnoughGeometriesException`)
    Parameters
    ----------
    sample_rows: int, optional
        The number of rows to sample.
    sample_fraction: float, optional
        The proportion of rows to sample.
    Returns
    -------
    int
        The recommended grid-index resolution to apply to this MosaicFrame.
[ ]:
(resolution := neighbourhoods_mdf.get_optimal_resolution(sample_fraction=1.))
Out[15]: 9
3. Apply the index to the set of points in your left-hand dataframe.
This will generate an index value that corresponds to the grid ‘cell’ that this point occupies.
[ ]:
from mosaic import grid_longlatascellid
indexed_trips = trips.withColumn("ix", grid_longlatascellid(lon="pickup_longitude", lat="pickup_latitude", resolution=lit(resolution)))
indexed_trips.show()
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------------+
vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|                ix|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------------+
      DDS|2009-01-17 18:49:57|2009-01-17 18:56:29|              3|          1.2|      -74.004043|      40.733409|617733151092113407|
      DDS|2009-12-01 00:47:52|2009-12-01 01:00:16|              1|          3.4|      -73.991702|      40.726342|617733151087132671|
      CMT|2009-02-09 16:50:21|2009-02-09 17:02:47|              1|          2.6|      -73.999673|      40.733586|617733123805806591|
      CMT|2009-12-07 07:15:47|2009-12-07 07:32:07|              1|          3.8|       -74.01211|      40.716893|617733151084773375|
      VTS|2009-10-16 22:02:00|2009-10-16 22:08:00|              1|          1.1|      -74.010903|       40.71624|617733151084773375|
      VTS|2009-12-23 22:13:00|2009-12-23 22:18:00|              1|         0.37|      -74.002343|       40.73366|617733151092113407|
      VTS|2009-12-12 01:24:00|2009-12-12 01:38:00|              2|         3.55|      -74.002565|      40.728188|617733151091326975|
      CMT|2009-12-07 13:10:37|2009-12-07 13:13:45|              1|          0.5|      -73.999184|       40.73428|617733123805806591|
      CMT|2009-11-08 22:20:44|2009-11-08 22:31:23|              1|          1.9|      -74.003029|      40.733385|617733151092113407|
      VTS|2009-12-27 20:01:00|2009-12-27 20:04:00|              1|         1.04|      -74.000227|      40.732603|617733151092375551|
      VTS|2009-02-13 14:33:00|2009-02-13 14:50:00|              3|         1.59|      -74.006535|      40.732303|617733151092637695|
      CMT|2009-11-15 21:13:32|2009-11-15 21:25:56|              3|          3.0|      -73.998795|      40.730621|617733151092375551|
      VTS|2009-01-08 18:13:00|2009-01-08 18:33:00|              2|         4.18|        -74.0079|      40.712012|617733151021334527|
      CMT|2009-11-30 13:30:13|2009-11-30 13:41:55|              1|          1.6|      -74.004487|      40.734072|617733151092637695|
      CMT|2009-01-11 20:02:22|2009-01-11 20:08:15|              1|          1.0|      -74.004493|      40.713349|617733151020810239|
      CMT|2009-12-30 18:46:08|2009-12-30 19:02:23|              1|          2.3|      -74.010798|      40.716717|617733151084773375|
      CMT|2009-11-18 21:50:12|2009-11-18 22:05:19|              1|          5.8|      -73.992515|      40.694106|617733151038111743|
      VTS|2009-11-21 12:51:00|2009-11-21 13:27:00|              1|        14.18|        -73.9923|      40.715218|617733151109414911|
      CMT|2009-01-20 09:34:49|2009-01-20 09:37:15|              1|          0.4|        -74.0027|      40.733479|617733151092113407|
      VTS|2009-01-03 07:07:00|2009-01-03 07:18:00|              1|         7.81|      -73.994358|      40.690345|617733151037325311|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------------+
only showing top 20 rows
4. Compute the set of indices that fully covers each polygon in the right-hand dataframe
This is commonly referred to as a polyfill operation.
[ ]:
from mosaic import grid_polyfill
indexed_neighbourhoods = (
  neighbourhoods
  .select("*", grid_polyfill("geometry", lit(resolution)).alias("ix_set"))
  .drop("geometry")
)
indexed_neighbourhoods.show()
+-------------+-----------+--------+-------------------+--------------------+
      borough|location_id|objectid|               zone|              ix_set|
+-------------+-----------+--------+-------------------+--------------------+
     Brooklyn|        123|     123|          Homecrest|[6177331514226769...|
    Manhattan|        153|     153|        Marble Hill|[6177331229858201...|
     Brooklyn|        112|     112|         Greenpoint|[6177331237832622...|
    Manhattan|        233|     233|UN/Turtle Bay South|[6177331238679347...|
    Manhattan|         43|      43|       Central Park|[6177331225792348...|
       Queens|        201|     201|      Rockaway Park|[6177331357831659...|
       Queens|        131|     131|    Jamaica Estates|[6177331242658693...|
     Brooklyn|        111|     111|Green-Wood Cemetery|[6177331522277212...|
       Queens|        226|     226|          Sunnyside|[6177331238566625...|
       Queens|        129|     129|    Jackson Heights|[6177331243222302...|
    Manhattan|        120|     120|    Highbridge Park|[6177331231976325...|
     Brooklyn|         76|      76|      East New York|[6177331236938711...|
    Manhattan|         24|      24|       Bloomingdale|[6177331226458193...|
    Manhattan|        202|     202|   Roosevelt Island|[6177331237777571...|
    Manhattan|        100|     100|   Garment District|[6177331509717893...|
Staten Island|        251|     251|        Westerleigh|[6177331466128588...|
    Manhattan|         74|      74|  East Harlem North|[6177331226508001...|
       Queens|         98|      98|      Fresh Meadows|[6177331242448977...|
    Manhattan|        211|     211|               SoHo|[6177331510784819...|
        Bronx|        174|     174|            Norwood|[6177331205497159...|
+-------------+-----------+--------+-------------------+--------------------+
only showing top 20 rows
5. ‘Explode’ the polygon index dataframe, such that each polygon index becomes a row in a new dataframe.
[ ]:
exploded_indexed_neighbourhoods = (
  indexed_neighbourhoods
  .withColumn("ix", explode("ix_set"))
  .drop("ix_set")
)
exploded_indexed_neighbourhoods.show()
+--------+-----------+--------+---------+------------------+
 borough|location_id|objectid|     zone|                ix|
+--------+-----------+--------+---------+------------------+
Brooklyn|        123|     123|Homecrest|617733151422676991|
Brooklyn|        123|     123|Homecrest|617733151503417343|
Brooklyn|        123|     123|Homecrest|617733151502893055|
Brooklyn|        123|     123|Homecrest|617733151502368767|
Brooklyn|        123|     123|Homecrest|617733151492407295|
Brooklyn|        123|     123|Homecrest|617733151488737279|
Brooklyn|        123|     123|Homecrest|617733151484542975|
Brooklyn|        123|     123|Homecrest|617733151484018687|
Brooklyn|        123|     123|Homecrest|617733151483494399|
Brooklyn|        123|     123|Homecrest|617733151425560575|
Brooklyn|        123|     123|Homecrest|617733151424511999|
Brooklyn|        123|     123|Homecrest|617733151423463423|
Brooklyn|        123|     123|Homecrest|617733151511019519|
Brooklyn|        123|     123|Homecrest|617733151505776639|
Brooklyn|        123|     123|Homecrest|617733151505252351|
Brooklyn|        123|     123|Homecrest|617733151504203775|
Brooklyn|        123|     123|Homecrest|617733151503679487|
Brooklyn|        123|     123|Homecrest|617733151503155199|
Brooklyn|        123|     123|Homecrest|617733151502630911|
Brooklyn|        123|     123|Homecrest|617733151502106623|
+--------+-----------+--------+---------+------------------+
only showing top 20 rows
6. Join the new left- and right-hand dataframes directly on the index.
[ ]:
joined_df = (
  indexed_trips.alias("t")
  .join(exploded_indexed_neighbourhoods.alias("n"), on="ix", how="inner"))
joined_df.count()
Out[25]: 4934937
Final notes
Mosaic provides support for Uber’s H3 spatial indexing library as a core part of the API, but we plan to add support for other index systems, including S2 and British National Grid in due course.