March 13, 2025
May 5, 2025
In this blog, we’ll explore how Tabsdata enables Pub/Sub for Tables, with both the source and the destination being a locally hosted PostgreSQL database.
We will start by setting up Tabsdata and PostgreSQL. Then we will register and run a publisher that reads data from PostgreSQL and publishes it as a table to Tabsdata. After that, we will register a subscriber that subscribes to this published table, filters some of the data, and exports it to PostgreSQL. Finally, we will demonstrate that when the publisher is re-run to load new data, the subscriber automatically writes the refreshed data to PostgreSQL.
In a real-world scenario, your data source could be any other database or storage location, while the subscriber could write data to various endpoints such as a database or file system. You can check the list of source and destination connectors in the Tabsdata documentation.
Let’s dive in!
If you haven't already, clone the GitHub repo to your system.
Using SSH:
Using GitHub CLI:
In this tutorial, both our data source and destination are PostgreSQL. The data to load into PostgreSQL is provided as SQL files in the tutorial folder, and the Tabsdata functions to publish and subscribe to the data are in the same folder.
For convenience, we will use an environment variable called TDX to reference the folder containing the Tabsdata functions and SQL files. Set this variable to point to the base directory of this tutorial, using export on Linux or macOS, or set in the Windows Command Prompt.
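For example, assuming you cloned the repository into your home directory (the path below is only a placeholder; point it at the actual tutorial folder on your machine):

```bash
# Linux or macOS
export TDX=~/path/to/tutorial-folder

# Windows Command Prompt
set TDX=C:\path\to\tutorial-folder
```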
If you run ls (for Linux or macOS) or dir (for Windows) on t02_csv_pub_sub, you will see the following files and folders:
For this tutorial, we are using PostgreSQL as our input and output, and below are the steps to set that up.
You can download PostgreSQL for your OS from the PostgreSQL website or use the official Docker image. Once installed, you can follow the PostgreSQL documentation to get started.
Assuming that you have successfully installed and set up PostgreSQL, you can run the following command to create the database to hold the input and output tables for this tutorial.
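For instance, using the psql client (the postgres superuser here is an assumption; use whatever role you set up):

```bash
# Create the "customers" database used throughout this tutorial
psql -U postgres -c "CREATE DATABASE customers;"
```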
Run the following query to create the input table “customer_leads” in the "customers" database with the relevant data and columns:
Output:
The above output indicates that the input table has been created.
Tabsdata supports different ways to store credentials, which are detailed here in our documentation.
For the purposes of this tutorial, I am storing all the credentials in a Hashicorp Vault to demonstrate one of the more secure ways Tabsdata can handle credentials. You can see more details on how to set up Hashicorp here in the Tabsdata documentation.
I have stored the PostgreSQL username and password under the variable names PG_USERNAME and PG_PASSWORD, respectively, in the Hashicorp Vault, in the key store "td-pg". The same names are used in the Python code for the Tabsdata functions. If you use different names, make sure you change them in both places: Hashicorp and the Python code.
To set up Hashicorp, you will need to set the environment variables highlighted in the documentation, using your _hashicorp_vault_url_, _hashicorp_vault_token_, and _hashicorp_namespace_ as the values.
Alternatively, you can store the PostgreSQL username and password as environment variables on your local system before starting the Tabsdata server, as highlighted here in the documentation.
Important: If you use the environment variables, you will need to modify the Python code accordingly.
To install or update the Tabsdata and PostgreSQL dependencies, run this command in your CLI:
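At minimum this means installing or upgrading the tabsdata package; any additional PostgreSQL dependency is listed in the tutorial itself, so treat the single line below as a starting point rather than the complete list:

```bash
# Install or upgrade the Tabsdata Python package (v0.9.5 or later is needed for this tutorial)
pip install --upgrade tabsdata
```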
Note: You can use a virtual environment to run this command, or run it in your system directly.
Please note that you need Python v3.12 or later to install the package. Additionally, you need the Tabsdata Python package v0.9.5 or later to successfully run the functions from this article.
If you have started a Tabsdata server before, it is suggested that you remove the older Tabsdata instance when moving to a newer version (v0.9.5 and above in this case). This lets you start from scratch and reduces the possibility of errors or conflicts.
Run the following commands in your CLI to stop the Tabsdata server and clear the instance:
For Linux or macOS:
For Windows:
To start the Tabsdata server, use the following command:
To verify that the Tabsdata server instance is running:
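Taken together, the two steps look roughly like this (the tdserver entry point follows the Tabsdata getting-started guide; if your version uses a different command, follow the documentation instead):

```bash
# Start the Tabsdata server
tdserver start

# Check that the server processes are up
tdserver status
```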
Both supervisor and apiserver need to become active for the server to be completely up and running.
Output:
The presence of supervisor and apiserver confirms that the server is running.
Before you can use Tabsdata, you must log in to the server, which can be done as follows:
When prompted for the password, enter:
Output:
Now, with all the relevant pieces set up, we can proceed to create our first publisher. A publisher is a simple Python function that reads data from one or more external sources and maps it to one or more Tabsdata tables. It can connect to external systems using built-in connectors provided by Tabsdata, or you can create your own connectors. A few things to note before we proceed:
In order to register our first publisher, we must create a collection. By default there are no collections within a Tabsdata server until you create one.
You can see this by running the following command:
For this tutorial, we will create a collection called "customers" to register our functions and hold the output Tabsdata tables. To create this collection use the following command:
This should have created the collection, which you can verify by running the previous list command. You can also see more details about this collection using the info command, as follows:
We will now register a publisher function that reads data from PostgreSQL and publishes some selected columns of this data to a table. For convenience, we have this function ready to use in the file publisher.py, and the name of this function is publish_customers. Here is what this function looks like:
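The authoritative version is in publisher.py in the tutorial folder; the sketch below only illustrates its shape. The class names, connection URI, and selected columns are assumptions made for illustration, patterned on the Tabsdata documentation, so defer to the file in the repo:

```python
import tabsdata as td

# Illustrative sketch only - see publisher.py in the tutorial folder for the actual function
@td.publisher(
    source=td.PostgresSource(
        uri="postgresql://127.0.0.1:5432/customers",      # local "customers" database (assumed URI)
        query="SELECT * FROM customer_leads",             # input table created earlier
        credentials=td.UserPasswordCredentials(
            td.HashiCorpSecret("td-pg", "PG_USERNAME"),   # username stored in the "td-pg" key store
            td.HashiCorpSecret("td-pg", "PG_PASSWORD"),   # password stored in the "td-pg" key store
        ),
    ),
    tables=["customer_leads"],                            # Tabsdata table to publish
)
def publish_customers(tf: td.TableFrame):
    # Keep only the columns we want to publish (column names are illustrative)
    return tf.select(["name", "email", "deal_value"])
```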
Here the @td.publisher decorator defines the following metadata:

- The source: the customer_leads table in the PostgreSQL database "customers" hosted locally. pg_username and pg_password are the PostgreSQL credentials used to access the database, retrieved from the Hashicorp Vault.
- The output: a Tabsdata table called customer_leads.

The function definition is very simple in this case, with the following details:

- A Python function called publish_customers that takes a TableFrame called tf. Note that a TableFrame is an API similar to a traditional DataFrame. Note also that when this function executes, this input TableFrame will be populated by the data read from the PostgreSQL table specified in the decorator.

That is all there is to a publisher. In a real-world scenario, your publisher function can have many more inputs and may produce many more outputs. Moreover, the body of the function may do more complex operations on the data before publishing it to output tables. You can read more about that in the Tabsdata documentation. Register this publisher function to the "customers" collection using the following command:
You can now verify that the function was registered successfully by running the following command:
Output:
This output confirms that the function publish_customers has been registered within the collection customers.
As a reminder, registering a function in a collection does not execute it; it must be invoked by a trigger. If a publisher function has never been triggered, its corresponding output tables will not be initialized in the system.
Let's trigger our publisher. This can be done using the following command:
You can check whether the functions have finished executing by using the following command:
Output:
If the function has finished executing, you will see Published in the status.
The Tabsdata table "customer_leads" has been created in the "customers" collection. This table can now be subscribed to by various stakeholders within the organization.
To check the schema of the table in Tabsdata, run this command in your CLI:
Output:
The columns $td.id and $td.src are columns created by Tabsdata to track row-level provenance of data.
To check a sample of the table in Tabsdata, run this command in your CLI:
We’ll now create a subscriber to send data to PostgreSQL. This subscriber will create or update the high_value_customer_leads table in the customers database in PostgreSQL.
We will now register a subscriber function that reads data from the "customer_leads" table created by our publisher function in the prior steps, and externalizes this data to PostgreSQL. For convenience, we have this function ready to use in the file subscriber.py, and the name of the function is subscribe_customers. Here is what this function looks like:
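As with the publisher, the authoritative code is in subscriber.py; the sketch below is only meant to show its shape. The class names, connection URI, parameter names, and filter expression are assumptions for illustration, patterned on the Tabsdata documentation:

```python
import tabsdata as td

# Illustrative sketch only - see subscriber.py in the tutorial folder for the actual function
@td.subscriber(
    tables=["customer_leads"],                             # Tabsdata table to subscribe to
    destination=td.PostgresDestination(
        uri="postgresql://127.0.0.1:5432/customers",       # local "customers" database (assumed URI)
        destination_table="high_value_customer_leads",     # table to create/update in PostgreSQL
        credentials=td.UserPasswordCredentials(
            td.HashiCorpSecret("td-pg", "PG_USERNAME"),
            td.HashiCorpSecret("td-pg", "PG_PASSWORD"),
        ),
        if_table_exists="replace",                         # overwrite the output table on each run
    ),
)
def subscribe_customers(tf: td.TableFrame):
    # Keep only the high-value leads (deal value of $4,000 or more; column name is illustrative)
    return tf.filter(td.col("deal_value") >= 4000)
```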
Here the @td.subscriber decorator defines the following metadata:

- The input: the Tabsdata table customer_leads that was published earlier.
- The destination: the high_value_customer_leads table in the PostgreSQL database "customers". If the table is not present it will be created, and if the table exists it will be replaced. You can set if_table_exists to append if you want the new rows to be appended to the existing table; it defaults to append.

The function definition is very simple, with the following details:

- A Python function called subscribe_customers that takes a TableFrame as input. When executed, this input will be populated by the data coming from the "customer_leads" table.
- The function filters the data and writes the result to the high_value_customer_leads table in the "customers" database in PostgreSQL.

In a real-world scenario, your subscriber function may take input data from multiple tables, process it, and create a derived output that is then sent to an external system.
Register this subscriber function to the "customers" collection using the following command:
You can verify that the function was registered successfully by running the following command:
Output:
As is the case with publisher functions, registering the subscriber function does not execute it. It must be executed by a trigger. In this step we will manually trigger the subscriber function and verify the generated output.
Let's now trigger our subscriber function using the following command:
Remember that you can check whether the functions have finished executing by using the following command:
If the function has finished executing, you will see Published in the status.
Once executed, the subscriber would have generated the high_value_customer_leads table with the relevant details in the customers database in PostgreSQL.
You can run the following command to check the output in PostgreSQL:
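For example, with psql (substitute your own PostgreSQL user):

```bash
# Inspect the table the subscriber wrote to PostgreSQL
psql -U postgres -d customers -c "SELECT * FROM high_value_customer_leads;"
```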
What happens when there is an update in your input data? How do you update the data used by the downstream users?
Let’s say there is an update in your input PostgreSQL table, and new customers get added to it. The customers_02.sql file in the input directory presents one such scenario. This file has 20 new customers to be added to the customer_leads table.
Before we can demonstrate the automatic execution of this workflow, we must add the new data to the customer_leads table in the customers database of PostgreSQL.
This can be done using the following command:
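One way to load the file is with psql (the PostgreSQL user and the exact path to customers_02.sql are assumptions; adjust them to your setup):

```bash
# Load the 20 new customer rows into the customer_leads table
psql -U postgres -d customers -f $TDX/input/customers_02.sql
```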
Output:
When this new workflow executes, the subscriber will overwrite the output table in PostgreSQL with new data. Hence, let's create a copy of the PostgreSQL table created by the subscriber for later comparison, using the following command:
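One simple way to do this is directly in SQL (the name of the copy, high_value_customer_leads_old, is just an illustrative choice):

```sql
-- Keep a copy of the current output for later comparison
CREATE TABLE high_value_customer_leads_old AS
SELECT * FROM high_value_customer_leads;
```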
The publisher function that we registered earlier creates a table called customer_leads, which in turn has a registered subscriber. Together, the publisher and subscriber make a simple data engineering workflow. When the publisher activates and updates the table, it will automatically trigger the subscriber to update its output.
To demonstrate this, we will trigger our publisher function. This should automatically trigger the subscriber function, which in turn should update the high_value_customer_leads table in PostgreSQL.
Use the following command to trigger the publisher to read the new input data:
Remember that you can check whether the functions have finished executing by using the following command:
If the function has finished executing, you will see Published in the status.
In this example, only one subscriber was executed on refresh of the published table. However, this works for any number of subscribers that are registered and have their input tables associated with the publisher.
The high_value_customer_leads table in PostgreSQL would have been updated with the new data. You can run the command below to check the difference between the old and new tables.
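For example, if you created the copy as high_value_customer_leads_old as sketched above, a set difference shows the newly added rows:

```sql
-- Rows present in the refreshed table but not in the earlier copy
SELECT * FROM high_value_customer_leads
EXCEPT
SELECT * FROM high_value_customer_leads_old;
```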
Output:
Out of the 20 new customers, 12 have been added. These are the ones with a deal value of $4,000 or more.
We have successfully implemented Pub/Sub for Tables using Tabsdata. We published data from a PostgreSQL database to Tabsdata after selecting certain columns from it. We then subscribed to the published table, filtered it based on certain criteria, and exported the data to PostgreSQL. We also demonstrated automatic execution of the entire workflow when the data source was refreshed.
For the next steps, you can try adding a Tabsdata transformer in the mix. Perform complex transformations on the "customer_leads" table using a Tabsdata transformer, and connect the output table from the transformer to various subscribers. You can also experiment with different sources and destinations, beyond PostgreSQL.
I hope this gave you a better understanding of the Tabsdata system! I'd love to hear your thoughts—let us know how we can improve, what use cases you'd like us to cover in future blogs, or any other questions or feedback you have. Join the conversation on Discord, Slack, GitHub Discussions or reach out to us here.