March 13, 2025
(This article is part of the Hands-on with Tabsdata series, where we work through examples to understand the Tabsdata system and explore its potential. If you would just like to work with the code, you can go to the tutorials section of Tabsdata GitHub repo.)
Data engineers are no strangers to Pub/Sub systems. They power real-time event streaming, messaging queues, and distributed processing across modern data architectures. But here’s the thing—Pub/Sub is great for events, but what about tables? Data engineers primarily work with dataframes, not just event streams. Yet, today’s solutions force them to awkwardly fit structured data into event-driven paradigms, introducing significant overheads.
Imagine a world where you can publish and subscribe to entire tables, not just event streams. You won’t have to implement event stream based change-data-capture to track the updates to tables, and every tuple in the table comes with a detailed lineage and provenance. That’s what we enable with Tabsdata: Pub/Sub for Tables.
--
In this tutorial, we’ll explore how Pub/Sub for Tables solves a common data engineering challenge: manually processing an input file for each new stakeholder, or when there is an update to the input data.
Imagine you’re handling a CSV file containing the customer data that needs to be shared across various teams—such as sales, manufacturing, and logistics.
Here is an example of such a file:
To enforce good data governance practices, only certain relevant columns from the CSV, such as FIRST_NAME, LAST_NAME, COMPANY_NAME, EMAIL, and WEB, should be shared with the rest of the teams. A typical approach to achieve this may involve using a local Python script to process the CSV and generate a sanitized version for sharing. However, this process must be repeated for each new stakeholder and every time the customer data is updated, making it inefficient and time-consuming. Additionally, different teams may end up working with outdated or inconsistent versions of the data, leading to discrepancies and reconciliation issues.
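To make the contrast concrete, the manual step usually amounts to a small script like the sketch below (this assumes pandas is installed and that the CSV uses the column names listed above; it is an illustration, not part of the tutorial code):

# A minimal, illustrative version of the manual sanitization script.
import pandas as pd

SHARED_COLUMNS = ["FIRST_NAME", "LAST_NAME", "COMPANY_NAME", "EMAIL", "WEB"]

df = pd.read_csv("input/customers.csv")                              # raw customer data
df[SHARED_COLUMNS].to_csv("customers_sanitized.csv", index=False)    # share only these columns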
A Pub/Sub for Tables approach can automate this workflow and streamline data distribution. The key advantage lies in integrating data transformations directly into the publishing step. In this approach, a data engineer defines a publisher to process the raw CSV file and publish the resultant data as a table. Stakeholders from different teams can then subscribe to the resultant table in their preferred format, delivered to their chosen system. Every time the input changes, it is automatically transformed and published as a new version of the same table, which is pushed to every subscriber.
This approach eliminates the need for manual file processing for each stakeholder and ensures that all teams receive a single, consistent, and up-to-date version of the data.
--
In the rest of the article, we’ll explore how to achieve this workflow automation using Tabsdata.
We will start by setting up Tabsdata and registering a publisher that reads data from a CSV file, selects some columns from it, and publishes it as a table within the system. Following that, we will register a subscriber that subscribes to this published table and exports it to the file system in JSON Lines format. We will then demonstrate that when the publisher is rerun to load new data, the subscriber automatically writes it to the external system.
In a real-world scenario, your data source could be a database, an S3 bucket, or another storage location, while the subscriber could write data to various endpoints such as a database or file system.
Let’s dive in!
To install/update the Tabsdata Python package, run this command in your CLI:
Please note that you need Python 3.12 or later to install the package. Additionally, you need the Tabsdata Python package 0.9.2 or later to successfully run the functions from this article.
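If you are not sure which interpreter your environment is using, a quick check from Python itself looks like this (a minimal sketch):

import sys

# Tabsdata requires Python 3.12 or later (see the note above).
assert sys.version_info >= (3, 12), f"Python 3.12+ required, found {sys.version.split()[0]}"
print("Python version OK:", sys.version.split()[0])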
To start the Tabsdata server, use the following command:
To verify that the Tabsdata server instance is running:
Output:
The presence of supervisor and apiserv confirms that the server is running.
Before you can use Tabsdata, you must log in to the server, which can be done as follows:
When prompted for the password, enter:
If you haven't already, clone the GitHub repo to your system.
Using SSH:
Using GitHub CLI:
Remember that the data source for our publisher is a directory on the file system, and our subscriber’s output system is also a directory on the file system for this example. To define these input and output directories we will use an environment variable called TDX. You can choose to define the absolute working directory of your input and output files in any other manner of your choice.
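For context, the publisher and subscriber functions you will register later resolve TDX with os.getenv and build paths relative to it, roughly like this sketch:

import os

# TDX must point at the tutorial directory; the functions build paths relative to it.
tdx = os.getenv("TDX")
if tdx is None:
    raise RuntimeError("Set the TDX environment variable before running the functions.")
print(os.path.join(tdx, "input", "customers.csv"))
print(os.path.join(tdx, "output", "customer_leads.jsonl"))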
To store the value of your working directory in the variable TDX, run the following command in your CLI from the directory where you have cloned the GitHub repo:
For Linux or macOS:
For Windows:
If you run ls (on Linux or macOS) or dir (on Windows) on t01_csv_pub_sub, you will see the following files and folders:
README.md
input/
|__ customers.csv
|__ customers_02.csv
publisher.py
subscriber.py
assets/
|__ table_sample.png
|__ table_schema.png
Here the folder input contains two files, customers.csv and customers_02.csv, that serve as input files. To begin with, we'll use customers.csv within the input folder in the publisher function. Later we will replace the customers.csv file with customers_02.csv by overwriting it, to demonstrate another core functionality of the system.
There are two Python source files - publisher.py and subscriber.py - which contain the publisher and subscriber functions. Feel free to take a peek at them - they are pretty straightforward. The assets folder has images for this README file.
Now that the Tabsdata server is up and running, we can proceed to create our first publisher. A publisher is a simple Python function that uses built-in connectors provided by Tabsdata to read data from external source(s) and map it to one or more Tabsdata tables. A few things of note before we proceed:
In order to register our first publisher, we must create a collection. By default there are no collections within a Tabsdata server until you create one. You can see this by running the following command:
For this article, we will create a collection called CUSTOMERS where we will register our publisher function. To create this collection use the following command:
This should have created the collection, which you can verify by running the previous list command. You can also see more details about this collection using the info command as follows:
Output:
We will now register a publisher function that reads data from a CSV file in a specific input directory and publishes some selected columns of this data to a table. For convenience, we have this function ready to use in the file publisher.py, and the name of this function is publish_customers. Here is what this function looks like:
import os
import tabsdata as td

@td.publisher(
    # Read the raw customer data from $TDX/input/customers.csv
    source = td.LocalFileSource(os.path.join(os.getenv("TDX"), "input", "customers.csv")),
    # Publish the result as the CUSTOMER_LEADS table
    tables = ["CUSTOMER_LEADS"],
)
def publish_customers(tf: td.TableFrame):
    # Keep only the columns that are safe to share with other teams
    output_tf = tf.select(["FIRST_NAME", "LAST_NAME", "COMPANY_NAME", "EMAIL", "WEB"])
    return output_tf
Here the @td.publisher decorator defines the following metadata:
- The input source to read from: the file $TDX/input/customers.csv
- The Tabsdata table to publish to: CUSTOMER_LEADS
The function definition is very simple in this case, with the following details:
- It is a Python function called publish_customers that takes a TableFrame called tf. Note that a TableFrame is the API similar to a traditional DataFrame for use with Tabsdata. Note also that when this function executes, this input TableFrame will be populated by the data read from the $TDX/input/customers.csv file, as specified in the decorator.
- It selects certain columns from the input TableFrame and returns it as an output. Note that this output TableFrame will be mapped to a table called CUSTOMER_LEADS, as specified in the decorator.

That is all there is to a publisher. In a real-world scenario, your publisher function can have many more inputs and may produce many more outputs. Moreover, the body of the function may do more complex operations on the data before publishing them to output tables.
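As an illustration of that, here is a hypothetical sketch of a publisher that maps one input file to two output tables. The second table name, and the assumption that returned TableFrames map positionally onto the names in tables, are ours for illustration only; only select and the decorator shape come from the example above.

import os
import tabsdata as td

@td.publisher(
    source = td.LocalFileSource(os.path.join(os.getenv("TDX"), "input", "customers.csv")),
    tables = ["CUSTOMER_LEADS", "CUSTOMER_CONTACTS"],  # hypothetical second table
)
def publish_customer_tables(tf: td.TableFrame):
    # Assumption: returned TableFrames map positionally onto the names in `tables`.
    leads = tf.select(["FIRST_NAME", "LAST_NAME", "COMPANY_NAME"])
    contacts = tf.select(["FIRST_NAME", "LAST_NAME", "EMAIL", "WEB"])
    return leads, contacts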
Register this publisher function to the CUSTOMERS collection using the following command:
You can now verify that the function was registered successfully by running the following command:
This output confirms that the function publish_customers has been registered within the collection CUSTOMERS.
As a reminder, registering a function in a collection does not execute it; it must be invoked by a trigger. If a publisher function has never been triggered, its corresponding output tables will not be initialized in the system.
Let's trigger our publisher. This can be done using the following command:
You can check whether the functions have finished executing by using the following command:
Output:
If the function has finished executing, you will see Published in the status.
The Tabsdata table CUSTOMER_LEADS has been created in the CUSTOMERS collection. This table can now be subscribed to by various stakeholders within the organization.
To check the schema of the table in Tabsdata, run this command in your CLI:
Output:
The columns $td.id and $td.src are internal columns created by Tabsdata to track row-level lineage and provenance of data.
To see a sample of the table in Tabsdata, run this command in your CLI:
Output:
Now that the customer data is available in the Tabsdata system as a table, it’s ready for subscription. To demonstrate this, we will create our first subscriber. A subscriber is a simple Python function that reads data from tables published within Tabsdata and uses built-in connectors provided by Tabsdata to send the data out to an external system. A few things of note before we proceed:
- We will register this subscriber in the same collection that we created earlier, CUSTOMERS.

We will now register a subscriber function that reads data from the CUSTOMER_LEADS table created by our publisher function in the prior steps, and externalizes this data in JSON Lines format to a specific output directory. For convenience, we have this function ready to use in the file subscriber.py, and the name of the function is subscribe_customers. Here is what this function looks like:
import os
import tabsdata as td

@td.subscriber(
    tables = ["CUSTOMER_LEADS"],
    # Write the table out as JSON Lines to $TDX/output/customer_leads.jsonl
    destination = td.LocalFileDestination(os.path.join(os.getenv("TDX"), "output", "customer_leads.jsonl")),
)
def subscribe_customers(tf: td.TableFrame):
    # Return the table unchanged; the destination handles the export
    return tf
Here the @td.subscriber decorator defines the following metadata:
- The Tabsdata table to subscribe to: CUSTOMER_LEADS
- The destination to write to: the file $TDX/output/customer_leads.jsonl
The function definition is very simple, with the following details:
- It is a Python function called subscribe_customers that takes a TableFrame as input. When executed, this input will be populated by the data coming from the CUSTOMER_LEADS table.
- It returns the input TableFrame unchanged, which is then written to the destination specified in the decorator.

That is all there is to a subscriber. In a real-world scenario, your subscriber function may take input data from multiple tables, process it, and create a derived output that is then sent to an external system.
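As a purely hypothetical sketch of that idea: the second table name, the multi-table decorator usage, and the join call below are assumptions for illustration and are not shown in this tutorial; only select and the decorator shape appear above. Check the Tabsdata documentation for the actual multi-table signature.

import os
import tabsdata as td

@td.subscriber(
    # Assumption: multiple input tables can be listed and arrive as one TableFrame each
    tables = ["CUSTOMER_LEADS", "CUSTOMER_ORDERS"],
    destination = td.LocalFileDestination(os.path.join(os.getenv("TDX"), "output", "leads_with_orders.jsonl")),
)
def subscribe_leads_with_orders(leads: td.TableFrame, orders: td.TableFrame):
    # Assumption: TableFrame exposes a DataFrame-style join; check the Tabsdata docs
    combined = leads.join(orders, on="EMAIL")
    return combined.select(["FIRST_NAME", "LAST_NAME", "COMPANY_NAME", "EMAIL"])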
Register this subscriber function to the CUSTOMERS collection using the following command:
You can now verify that the function was registered successfully by running the following command:
This should confirm that subscribe_customers has been registered within the collection CUSTOMERS.
As is the case with publisher functions, registering the subscriber function does not execute it. It must be executed by a trigger. In this step we will manually trigger the subscriber function for the first time and verify the generated output.
We begin by making sure that there is no output directory present on our system. The following command should error out:
For Linux or macOS:
For Windows:
If this directory exists, go ahead and delete it. When the subscriber function is triggered, it will create the necessary output directory to store the output file.
Let's now manually trigger our subscriber function using the following command:
Remember that you can check whether the functions have finished executing by using the following command:
If the function has finished executing, you will see Published in the status.
Once executed, the subscriber would have generated the output file customer_leads.jsonl in the $TDX/output directory.
Here is some sample data from customer_leads.jsonl:
{"FIRST_NAME":"Peter","LAST_NAME":"Gutierres","COMPANY_NAME":"Niagara Custombuilt Mfg Co","EMAIL":"peter_gutierres@yahoo.com","WEB":"https://www.niagaracustombuiltmfgco.co.uk"}
{"FIRST_NAME":"Octavio","LAST_NAME":"Salvadore","COMPANY_NAME":"Practical Periphrals","EMAIL":"octavio.salvadore@yahoo.com","WEB":"https://www.practicalperiphrals.co.uk"}
{"FIRST_NAME":"Martha","LAST_NAME":"Teplica","COMPANY_NAME":"Curtin, Patricia M Esq","EMAIL":"mteplica@teplica.co.uk","WEB":"https://www.curtinpatriciamesq.co.uk"}
As you can see from the output file, only the columns selected from customers.csv, as defined in publisher.py, have been exported, and the jsonl file is ready for consumption.
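If you want to double-check that only the selected columns made it through, a small Python sketch (assuming TDX is still set) can inspect the exported file:

import json
import os

path = os.path.join(os.getenv("TDX"), "output", "customer_leads.jsonl")
with open(path) as f:
    rows = [json.loads(line) for line in f]
print(len(rows), "records exported")
print("columns:", sorted(rows[0].keys()))   # expect only the five selected columns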
What happens when there is an update in your input data? How do you update the data used by the downstream users?
Let’s say there is an update to your CSV file, and 20 new customers get added to it. The customers_02.csv file in the input folder presents one such scenario. It has 20 new customers in addition to the customers present in the customers.csv file in the input folder.
Here are the details of 3 of the 20 newly added customers:
Before we can demonstrate the automatic execution of this workflow, we must provision the new input file in the correct location for the publisher to read and publish it accordingly. This can be done using the following command:
For Linux or macOS:
For Windows:
This will overwrite the customers.csv file that was used for our first execution.
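If you prefer to do this step from Python rather than the shell, the standard library gives the same result (a sketch, assuming TDX is set as above):

import os
import shutil

input_dir = os.path.join(os.getenv("TDX"), "input")
# Overwrite customers.csv with the version that contains the 20 extra customers.
shutil.copyfile(os.path.join(input_dir, "customers_02.csv"),
                os.path.join(input_dir, "customers.csv"))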
When this new workflow executes, the subscriber will overwrite the output file $TDX/output/customer_leads.jsonl with new data. Hence, let's create a backup of this file for later comparison using the following command:
For Linux or macOS:
For Windows:
The publisher function that we registered earlier creates a table called CUSTOMER_LEADS. This table in turn has a registered subscriber. Together, this publisher/subscriber pair makes a simple data engineering workflow. When the publisher activates and updates the table, it will automatically trigger any subscribers for the updated tables.
To demonstrate this, we will trigger our publisher function manually. This should automatically trigger the subscriber function which in turn should overwrite our expected output file. Since the new input file has 20 more customer records, we expect that the output file will also have 20 more customer records available.
Use the following command to trigger the publisher to read new input file:
Remember that you can check whether the functions have finished executing by using the following command:
If the function has finished executing, you will see Published in the status.
In this example, there is only one subscriber that was executed on refresh of the published table. However, it will work for any number of subscribers that are registered and have their input tables associated with the publisher.
Once the publisher has been executed, you can check the customer_leads.jsonl file in the output folder to see whether the changes are reflected.
Here is some sample data from the new customer_leads.jsonl:
{"FIRST_NAME":"Aleshia","LAST_NAME":"Tomkiewicz","COMPANY_NAME":"Alan D Rosenburg Cpa Pc","EMAIL":"atomkiewicz@hotmail.com","WEB":"https://www.alandrosenburgcpapc.co.uk"}
{"FIRST_NAME":"Evan","LAST_NAME":"Zigomalas","COMPANY_NAME":"Cap Gemini America","EMAIL":"evan.zigomalas@gmail.com","WEB":"https://www.capgeminiamerica.co.uk"}
{"FIRST_NAME":"France","LAST_NAME":"Andrade","COMPANY_NAME":"Elliott, John W Esq","EMAIL":"france.andrade@hotmail.com","WEB":"https://www.elliottjohnwesq.co.uk"}
The above users were not present in the JSON file before, and have been added after the publisher was triggered with the new customers.csv file. You can verify this by comparing the customer_leads.jsonl file with the customer_leads_01.jsonl file that we saved for comparison earlier.
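One quick way to verify the difference is to count the records in both files, for example with this small sketch (assuming you created the customer_leads_01.jsonl backup earlier):

import os

output_dir = os.path.join(os.getenv("TDX"), "output")
for name in ("customer_leads_01.jsonl", "customer_leads.jsonl"):
    with open(os.path.join(output_dir, name)) as f:
        print(name, "->", sum(1 for _ in f), "records")
# The new file should have 20 more records than the backup.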
We have successfully implemented a Pub/Sub for Tables using Tabsdata. We published the data from a CSV file as a table after selecting certain columns from it. We then subscribed to the published table. We also demonstrated automatic execution of the entire workflow when a data source was refreshed.
For the next steps, here are a couple of experiments you can try:
- Transform the data in the CUSTOMER_LEADS table using a Tabsdata transformer, and connect the output table from the transformer to a subscriber.

I hope this gave you a good introduction to the Tabsdata system! I'd love to hear your thoughts—let us know how we can improve, what use cases you'd like us to cover in future blogs, or any other questions or feedback you have. Join the conversation on Discord, GitHub Discussions, or reach out to us here.