August 29, 2025

Hands-on with Tabsdata: Publishing and Subscribing NYC Taxi Data

All code for this post can be found here: Publishing and Subscribing NYC Taxi Data into AWS Glue

Working with public datasets is one of the best ways to practice data engineering. The New York City Taxi & Limousine Commission publishes detailed trip records each month, and these files have become a classic playground for testing data integration pipelines.

While building data integration projects is fun, stitching together boilerplate code and managing orchestration is not. That’s why I built my workflow with Tabsdata.

In this post, I create an end-to-end integration that pulls all NYC yellow taxi trips since January 1, 2025. I then aggregate the data and compute daily metrics such as total trips completed, total driving time, total revenue earned, and average trip duration.

I was curious whether there was a relationship between weather and taxi use, so I created a parallel workflow that ingests historical weather data and enriches the daily taxi metrics with additional datapoints including temperature, precipitation, wind speed, and hours of sunshine. Finally, I subscribe the results to AWS Glue Iceberg Tables.

Why Tabsdata?

In traditional data integration setups, executions run in isolated, stateless environments. Within each execution, you define steps/nodes that take inputs, do some type of work on those inputs, and produce outputs. These nodes are then manually chained together so that when one node finishes its execution, it both passes its output to the next node and triggers that node to run as the next step in the workflow. It's kind of like a game of hot potato or telephone.

When this workflow runs, any data generated only persists until the execution ends, unless it is explicitly cached within the execution. This makes the workflow itself a black box where observability is limited and debugging is difficult.

Tabsdata’s architecture is designed to solve this exact problem. When you spin up a Tabsdata server, it becomes a repository that permanently stores tabular data in objects called Tables. Inside this server, you still build nodes called Functions that take inputs and produce outputs, but these inputs and outputs are executed against Tables, not other functions. When a function runs, it writes its output into a Table, and any function that depends on that Table will detect the new data and run automatically. This makes Tables the central hub where all data flows through.

Through this design, Tables serve a dual purpose. First, they retain full state and version history, so you can access the schema and data of a table at any point in time. Second, they act as breakpoints between functions. Because functions only interact with Tables, orchestration happens naturally: a function that depends on a Table listens for new commits and automatically triggers when new data is available. To create a new function, you don’t need to manually wire it into your workflow. Just specify the Table it should read from, and it will automatically run whenever new data is committed to that Table.
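
To make this concrete, here is a minimal, hypothetical transformer (not part of this project) that reads from an imaginary orders Table. Registering it is the only wiring needed; Tabsdata runs it whenever orders receives a new commit.

import tabsdata as td

#hypothetical Table names: "orders" and "orders_by_day" are placeholders
@td.transformer(
    input_tables=["orders"],          #the only dependency declaration needed
    output_tables=["orders_by_day"],  #the Table this function commits its output to
)
def orders_by_day_trf(orders: td.TableFrame):
    #any function that publishes new data into "orders" automatically triggers this one
    return orders.group_by(td.col("order_day")).agg(td.col("order_day").count().alias("total_orders"))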

Getting Started

I first install the Tabsdata library through pip: 

$ pip install tabsdata --upgrade

Then I start my server:

$ tdserver start

Step 1. Extract Yellow Taxi Data

The TLC publishes its taxi data through monthly Parquet files. I first need to build a function that pulls all Parquet files containing 2025 taxi data and publishes them into a Tabsdata Table.

Doing so will provide three huge benefits:

  1. The taxi data will become permanently stored as a Table in the Tabsdata server and can be sampled at any time
  2. Regardless of how many files the TLC breaks the data into, the publisher will concatenate them into a single Table for downstream functions to use
  3. Downstream functions will reference the Table, not the raw data on the TLC website

If this data was in something like MySQL or S3, I could just use a standard Tabsdata connector to ingest the data into my Tabsdata server. However, since I’m extracting this data from the TLC website, I'll need to build my own custom connector through the Source Plugin functionality.

In the plugin code, I first identify which months of taxi data are available. Since the URL pattern for each month’s Parquet file is predictable, I dynamically construct the endpoint URL for each month. I then use the requests library to fetch the data directly from those endpoints.

The td.SourcePlugin class uses an internal working directory called working_dir, where the plugin is expected to write data. Since I only register the function into the Tabsdata server, and Tabsdata is what actually runs the function, all I need to do is write the Parquet files into the working directory and return the file names. Tabsdata takes care of the rest.

import os
from datetime import datetime
from typing import List

import requests
import tabsdata as td


class NycTaxiStatsSource(td.SourcePlugin):
    def chunk(self, working_dir: str) -> str:
        filenames = []

        #get current datetime info
        now = datetime.now()
        year = now.year
        month = now.month
        timestamp = now.strftime("%Y%m%d%H%M%S")

        #create endpoints to append to base url for data
        months = [f"{year}-{m:02d}" for m in range(1, month)]

        #request to nyc taxi site and write parquet data into the working directory
        for i in months:
            endpoint = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{i}.parquet"
            try:
                payload = requests.get(endpoint)
                payload.raise_for_status()
            except requests.exceptions.RequestException:
                #skip months that are not published yet or fail to download
                continue
            filename = f"yellow_tripdata_{i}"
            destination_file = f"{timestamp}_{filename}.parquet"
            destination_path = os.path.join(working_dir, destination_file)
            with open(destination_path, "wb") as f:
                f.write(payload.content)
            filenames.append(destination_file)
        return [filenames]

My plugin returns [filenames], a nested list where each item in the inner list filenames is the name of one Parquet file pulled from the TLC website and written into the working directory.

The shape of what I return is important, since each element in my return expression maps to a parameter in my function body. For example, by returning [filenames], I'm passing all of those files into the first positional argument of my function body.

Using the list of data as input, my function body concatenates the data together into a single dataframe and commits the output into the nyc_taxi_stats Table.

@td.publisher(
    source=NycTaxiStatsSource(),
    tables="nyc_taxi_stats",
)
def nyc_taxi_pub(tf: List[td.TableFrame]):
    return td.concat(tf)

After running this function, the nyc_taxi_stats Table now contains about 27 million rows. The dataset is highly detailed, perhaps more detailed than I need for my purposes. What I really want is to view aggregated metrics by day, so the next step will be to run some aggregations on this data.

Step 2. Aggregate Daily Taxi Metrics

Now that I have the raw trip data, I want to roll it up into daily stats like total fares, trip counts, and average duration. This is where Tabsdata’s transformer function comes in. In my transformer code, I grab data from the nyc_taxi_stats Table, apply some renaming, type casting, and aggregation, and load the transformed data into a new Table called daily_taxi_metrics.

@td.transformer(
    input_tables=["nyc_taxi_stats"],
    output_tables=["daily_taxi_metrics"],
)
def agg_taxi_metrics_trf(nyc_taxi_stats: td.TableFrame):
    #rename some columns, calculate minute duration of each trip, and cast pickup_datetime into Date format
    nyc_taxi_stats = nyc_taxi_stats.rename({'tpep_pickup_datetime': 'pickup_datetime', 'tpep_dropoff_datetime': 'dropoff_datetime'})
    nyc_taxi_stats = nyc_taxi_stats.with_columns([
        td.col("dropoff_datetime").sub(td.col("pickup_datetime")).dt.total_minutes().alias("trip_duration"),
        td.col("pickup_datetime").cast(td.Date).alias("pickup_day")
    ])
    
    #aggregate by day and calculate revenue/day, trips/day, driving time/day, and average trip duration/day
    tf_daily = nyc_taxi_stats.group_by(
        td.col("pickup_day")).agg(
            td.col("total_amount").sum(),
            td.col("pickup_day").count().alias("total_trips"),
            td.col("trip_duration").sum().alias('total_driving_time'),
            td.col("trip_duration").mean().alias('average_trip_time')
            )
    
    return tf_daily

Step 3. Publish Daily Weather Metrics into Tabsdata

In order to find relationships between taxi usage and weather, I first need to publish weather metrics into Tabsdata. To do this, I build another SourcePlugin Publisher that connects Tabsdata to OpenMeteo, a free weather API.

Before publishing the data into the nyc_weather Table, I do some light preparation: renaming a few columns, converting the sunshine_duration column from seconds to hours, and casting the time column from a datetime to a date.

import os
from datetime import datetime, timedelta

import polars as pl
import requests
import tabsdata as td


class WeatherNYCSource(td.SourcePlugin):
    def chunk(self, working_dir: str) -> str:
        now = datetime.now()
        #use yesterday as the end date of the historical range
        today = now.date() - timedelta(days=1)
        timestamp = now.strftime("%Y%m%d%H%M%S")

        payload = requests.get(f'https://archive-api.open-meteo.com/v1/archive?latitude=40.7143&longitude=-74.006&start_date=2025-01-01&end_date={today}&daily=temperature_2m_mean,wind_speed_10m_max,precipitation_hours,sunshine_duration&timezone=America%2FNew_York').json()
        payload = pl.DataFrame(payload["daily"])

        filename = "nyc_weather"
        destination_file = f"{timestamp}_{filename}.parquet"
        destination_path = os.path.join(working_dir, destination_file)
        payload.write_parquet(destination_path)

        return destination_file
      
@td.publisher(
    source=WeatherNYCSource(),
    tables="nyc_weather",
    trigger_by=["nyc_taxi_stats"],
)
def weather_pub(tf: td.TableFrame):
    tf = tf.rename({"temperature_2m_mean":"temperature_celsius", "wind_speed_10m_max": "wind_speed", "precipitation_hours": "hours_precipitation"})
    tf = tf.with_columns(td.col("sunshine_duration").truediv(3600).round_sig_figs(3).alias('hours_sunshine'))
    tf = tf.drop('sunshine_duration')
    tf = tf.with_columns(td.col('time').cast(td.Date).alias('date'))

    return tf

In the decorator, I add an additional trigger_by parameter, which tells Tabsdata to automatically run this function whenever new data is committed to the nyc_taxi_stats Table.

Step 4. Join Weather with Taxi Metrics

With both metrics and weather available, I create another transformer to join the two tables, nyc_weather and daily_taxi_metrics, then store the result in a new Table called taxi_metrics_with_weather.

@td.transformer(
    input_tables=["daily_taxi_metrics", "nyc_weather"],
    output_tables=[
        "taxi_metrics_with_weather"
    ],
)
def join_weather_trf(daily_taxi_metrics: td.TableFrame, nyc_weather: td.TableFrame): 
    daily_taxi_metrics = daily_taxi_metrics.join(nyc_weather, left_on="pickup_day", right_on="date", how="left")
    return daily_taxi_metrics

Step 5. Subscribe Data to AWS Glue Iceberg Tables

Now that I’m happy with my data, I create a subscriber that writes the final Table out to AWS Glue.

aws_access_key = td.EnvironmentSecret("AWS_ACCESS_KEY_ID")
aws_secret_key = td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY")
AWS_REGION = td.EnvironmentSecret("AWS_REGION").secret_value
AWS_BUCKET = td.EnvironmentSecret("AWS_BUCKET").secret_value
AWS_GLUE_DATABASE = td.EnvironmentSecret("AWS_GLUE_DATABASE").secret_value

s3_credentials = td.S3AccessKeyCredentials(aws_access_key, aws_secret_key)

@td.subscriber(
    tables=["taxi_metrics_with_weather"],
    destination=td.S3Destination(
        uri=[f"{AWS_BUCKET}/daily_metrics/daily_metrics-$EXPORT_TIMESTAMP.parquet"],
        region=AWS_REGION,
        credentials=s3_credentials,
        catalog=td.AWSGlue(
            definition={
                "name": "default",
                "type": "glue",
                "client.region": AWS_REGION,
            },
            tables=[f"{AWS_GLUE_DATABASE}.daily_metrics"],
            auto_create_at=[AWS_BUCKET],
            if_table_exists="replace",
            credentials=s3_credentials,
        ),
    ),
)
def sub_s3_iceberg(taxi_metrics_with_weather: td.TableFrame):
    return taxi_metrics_with_weather

Now the data is available and queryable via Athena or any Glue-integrated tool.
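
For a quick sanity check, here is a rough sketch of querying the new Glue table from Athena with boto3. This isn't part of the project; the region, Glue database, and results bucket below are placeholders, and the column names follow the transformers above.

import time

import boto3

#placeholder values: swap in your own region, Glue database, and results bucket
athena = boto3.client("athena", region_name="us-east-1")

run = athena.start_query_execution(
    QueryString="""
        SELECT pickup_day, total_trips, total_amount, temperature_celsius, hours_precipitation
        FROM daily_metrics
        ORDER BY pickup_day
    """,
    QueryExecutionContext={"Database": "my_glue_database"},
    ResultConfiguration={"OutputLocation": "s3://my-results-bucket/athena-output/"},
)

#poll until Athena finishes, then pull the first page of results
while True:
    status = athena.get_query_execution(QueryExecutionId=run["QueryExecutionId"])
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=run["QueryExecutionId"])["ResultSet"]["Rows"]
    print(rows[:5])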

Results

With just a few lines of code, I was able to build an end-to-end integration where I ingested data from two different sources, transformed each of the ingested datasets, joined them together, and subscribed the resulting data into S3. 

From here, I would be interested in running statistical analysis and visualization on the data to find relationships and correlations between weather and usage of taxi services. 

Closing Thoughts

The TLC taxi data was tricky to handle because each month came in its own Parquet file. Once I loaded everything into a Tabsdata Table, all my downstream functions could pull directly from that table. This simplified things by abstracting away the raw file format.

Orchestration was another big win. Without Tabsdata, I would have had to manually sequence or orchestrate my two publishers. Since the weather data was much smaller than the taxi data, I would also have needed to manage race conditions before joining them. Tabsdata’s orchestrator solved this automatically by holding the join function until all upstream data had finished processing.