November 28, 2025

Hands-on with Tabsdata: Publishing and Subscribing Salesforce Reports

All code for this post can be found here: Publishing and Subscribing Salesforce Reports

Having worked in Marketing Operations for a few years before joining Tabsdata, I have quite a bit of experience with Salesforce. Because it sits at the intersection of so many different teams (Sales, Marketing, Customer Success, Sales Engineering, Finance, and so on), one thing you notice is that even though everyone shares the same database, the ways people use that data diverge massively.

Data Engineering typically prefers some sort of ELT into a data warehouse or data lake, while Sales manages everything within Salesforce Reports. That's not a problem per se, but it gets difficult when the data in your BI tool starts diverging from the data that Sales or Marketing sees in their Salesforce reports. Then you have to rebuild the Salesforce report manually with SOQL and constantly monitor whether the report filters have changed.

Salesforce is also not great at tracking changes to your data on a macro scale. Sure, you can see value changes on an individual record, but only if you enable field history tracking, which is limited to a small set of fields. So answering business-level questions like "how many MQLs did I have in this report six days ago?" is difficult unless you're explicitly snapshotting your data every day.

With Tabsdata's 1.3.0 release, we introduced new connectors for Salesforce reports that let you easily publish Salesforce report data. All you need is your Salesforce report name, and you can propagate the report data while Tabsdata automatically snapshots and versions it as well.

In this blog post and tutorial, I not only publish data from Salesforce, but also generate a daily lead count by status and a delta showing how these counts differ day-to-day.

Overview

The pipeline consists of three parts:

  1. Publisher: reads Salesforce report data and publishes it into Tabsdata.
  2. Transformer: calculates the breakdown of leads by status.
  3. Subscriber: writes Tabsdata table data into Snowflake.
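The functions below reference a few configuration values (report name and Salesforce credentials). A minimal setup sketch, assuming these are read from environment variables; the variable names and the use of environment variables are my own convention, not part of the Tabsdata API:

```python
import os

# Hypothetical configuration: the report name and Salesforce credentials are
# read from environment variables so they are not hard-coded in the pipeline.
SALESFORCE_REPORT = os.environ.get("SALESFORCE_REPORT", "Lead Status Report")
SALESFORCE_USER = os.environ.get("SALESFORCE_USER", "")
SALESFORCE_PASSWORD = os.environ.get("SALESFORCE_PASSWORD", "")
SALESFORCE_TOKEN = os.environ.get("SALESFORCE_TOKEN", "")
```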

Publishing from Salesforce

I start by registering a publisher that queries my Salesforce report and publishes it into a Tabsdata table called sf_snapshot.


@td.publisher(
    source=td.SalesforceReportSource(
        report=SALESFORCE_REPORT,
        find_report_by="name",
        column_name_strategy="columnName",
        credentials=td.SalesforceTokenCredentials(
            SALESFORCE_USER,
            SALESFORCE_PASSWORD,
            SALESFORCE_TOKEN,
        ),
    ),
    tables="sf_snapshot",
)
def salesforce_pub(
    tf: td.TableFrame,
):
    return tf
  

Aggregating Data by Status

I then build a transformer that does two things:

  1. Calculates the count of leads in each lead status.
  2. Generates a delta of how each status's lead count compares to the previous version of the report.

@td.transformer(
    # "sf_snapshot@HEAD^" refers to the previous version of the sf_snapshot table
    input_tables=["sf_snapshot", "sf_snapshot@HEAD^"],
    output_tables=["status_agg"],
)
def agg_statuses(leads: td.TableFrame, old_leads: td.TableFrame):
    # old_leads is None on the first run, when no previous version exists yet

    status_agg = leads.group_by(td.col("STATUS")).agg(
        td.col("STATUS").alias("STATUS_COUNT").count()
    )
    if old_leads is None:
        old_leads = status_agg.rename({"STATUS_COUNT": "OLD_STATUS_COUNT"})
    else:
        old_leads = old_leads.group_by(td.col("STATUS")).agg(
            td.col("STATUS").alias("OLD_STATUS_COUNT").count()
        )

    status_agg = status_agg.join(
        old_leads,
        how="left",
        on="STATUS",
    )
    status_agg = (
        status_agg.with_columns(
            td.col("STATUS_COUNT")
            .sub(td.col("OLD_STATUS_COUNT").cast(td.Int32))
            .alias("DELTA")
        )
        .drop("OLD_STATUS_COUNT")
        .sort(td.col("STATUS"), descending=True)
    )
    return status_agg
  

This makes it very easy to view the state of your leads at a specific point in time and to pinpoint when large fluctuations in statuses are occurring.
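To make the count-and-delta logic concrete outside of the Tabsdata API, here is the same computation sketched in plain Python. The lead records and status values are hypothetical sample data, invented purely for illustration:

```python
from collections import Counter

# Hypothetical lead statuses from two consecutive report versions.
current_leads = ["MQL", "MQL", "SQL", "Open", "MQL", "SQL"]
previous_leads = ["MQL", "SQL", "Open", "Open"]

current_counts = Counter(current_leads)    # status -> count in latest version
previous_counts = Counter(previous_leads)  # status -> count in prior version

# Left join on status: keep every status present in the latest version and
# compute its delta against the previous version. Statuses that did not
# exist before get a delta equal to their full current count.
status_agg = {
    status: {
        "STATUS_COUNT": count,
        "DELTA": count - previous_counts.get(status, 0),
    }
    for status, count in current_counts.items()
}

print(status_agg["MQL"])  # {'STATUS_COUNT': 3, 'DELTA': 2}
```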

Subscribing to Snowflake

I then subscribe my two tables to Snowflake.


@td.subscriber(
    tables=["sf_snapshot", "status_agg"],
    destination=td.SnowflakeDestination(
        CONNECTION_PARAMETERS,
        destination_table=["sf_snapshot", "status_agg"],
        if_table_exists="replace",
    ),
)
def snowflake_sub(tf, tf1):
    return tf, tf1
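CONNECTION_PARAMETERS above is a Snowflake connection dictionary. A hypothetical example, assuming the usual Snowflake connection fields; every value is a placeholder you would replace with your own account details:

```python
# Hypothetical Snowflake connection parameters; all values are placeholders.
CONNECTION_PARAMETERS = {
    "account": "my_account_identifier",
    "user": "my_user",
    "password": "my_password",
    "database": "MY_DATABASE",
    "schema": "PUBLIC",
    "warehouse": "MY_WAREHOUSE",
    "role": "MY_ROLE",
}
```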
  

Results

With three functions, I was able to build a basic ELT workflow between Salesforce and Snowflake. After I invoked my publisher a few times, I was able to dive into the UI to see my execution history and versioned lead data.