import tabsdata as td
# Ingesting data from Salesforce
@td.publisher(
source=td.SalesforceSource(
username=td.HashiCorpSecret("secrets","SALESFORCE_USERNAME"),
password=td.HashiCorpSecret("secrets","SALESFORCE_PASSWORD"),
security_token=td.HashiCorpSecret("secrets","SALESFORCE_SECURITY_TOKEN"),
query=["SELECT Id, Name, Country FROM Contact WHERE CreatedDate > 2025-01-01"]),
tables=["contacts"],
)
def read_contacts(contacts: td.TableFrame):
return contacts
import tabsdata as td
# Processing data for further consumption
@td.transformer(
input_tables=["contacts"],
output_tables=["contacts_usa"]
)
def contacts_filter_usa(contacts: td.TableFrame):
contacts_usa = contacts.filter(td.col("Country").str.contains(pattern = "USA", literal = True))
return contacts_usa
import tabsdata as td
# Loading data to AWS Data Lake
@td.subscriber(
tables=["contacts_usa"],
destination=td.S3Destination(
"s3://acme_inc_sales/contacts_usa.csv",
credentials = td.S3AccessKeyCredentials(
td.HashiCorpSecret("secrets","AWS_KEY_ID"),td.HashiCorpSecret("secrets","AWS_ACCESS_KEY")),
region = "us-east-2",
)
)
def contacts_usa(tf: td.TableFrame):
return tf
import tabsdata as td
# Ingesting data from Salesforce
@td.publisher(
source=td.SalesforceSource(
username=td.HashiCorpSecret(....),
password=td.HashiCorpSecret(....),
security_token=td.HashiCorpSecret(....),
query=["SELECT Id, Name, Country FROM Contact"]),
tables=["contacts"],
)
def read_contacts(contacts: td.TableFrame):
return contacts
import tabsdata as td
# Processing data for further consumption
@td.transformer(
input_tables=["contacts"],
output_tables=["contacts_usa"]
)
def contacts_filter_usa(contacts: td.TableFrame):
contacts_usa = contacts.filter(
td.col("Country").str.contains(
pattern = "USA", literal = True))
return contacts_usa
import tabsdata as td
# Loading data to AWS Data Lake
@td.subscriber(
tables=["contacts_usa"],
destination=td.S3Destination(
"s3://acme_inc_sales/contacts_usa.csv",
credentials = td.S3AccessKeyCredentials(
td.HashiCorpSecret("secrets","KEY_ID"),
td.HashiCorpSecret("secrets","ACCESS_KEY")),
region = "us-east-2")
)
def contacts_usa(tf: td.TableFrame):
return tf
$ pip install tabsdata
Copy