Object and field mapping
There are at least three approaches to object and field mapping, each suited to different situations. In increasing order of effort and complexity:
- SQL-based mapping directly in the database
  - Gives you the declarative power of SQL. The mapped result is visible to all database clients, with no additional jobs to manage.
- Runtime mapping in application code
  - Less declarative than SQL, and the mapped result is visible only in the calling application. However, any change to the mapping logic takes effect immediately, with no backfills or workloads to manage.
- Post-sync transformation triggered by webhooks
  - The most flexible approach, but you need to manage the additional workloads that perform the transformation. Transformation results can be visible in the database, but any change to the mapping logic requires re-running the transforms.
We recommend starting with the simplest approach and graduating to a more complex one only when necessary.
SQL-based mapping directly in the database
This approach assumes that Supaglue lands data in your Postgres database or a similar one.
Field mapping
Let’s start with the simplest case of mapping: field mapping. Since Supaglue lands all of the data in a _supaglue_unified_data jsonb column, the easiest way is to add a generated column that pulls the data out. For example:
ALTER TABLE engagement_users ADD COLUMN is_locked boolean
GENERATED ALWAYS AS ((_supaglue_unified_data->> 'is_locked')::boolean) STORED;
In addition to renaming, you can use the full power of SQL to transform the field in the GENERATED ALWAYS AS expression.
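When several fields need the same treatment, the ALTER TABLE statement can be generated instead of written by hand. The following is a minimal sketch, not part of Supaglue: `generatedColumnSql` and its parameters are hypothetical names, and it assumes the data lives in the `_supaglue_unified_data` column as in the example above.

```typescript
// Hypothetical helper (not a Supaglue API): builds the DDL for a generated
// column that extracts one key from the _supaglue_unified_data jsonb column.
type PgCastType = 'text' | 'boolean' | 'integer' | 'numeric';

const IDENTIFIER = /^[a-z_][a-z0-9_]*$/;

function generatedColumnSql(
  table: string,
  column: string,
  jsonKey: string,
  type: PgCastType,
): string {
  // Accept only plain lowercase identifiers so nothing unexpected reaches the DDL.
  for (const name of [table, column]) {
    if (!IDENTIFIER.test(name)) {
      throw new Error(`Invalid identifier: ${name}`);
    }
  }
  // The JSON key is emitted as a SQL string literal, so escape single quotes.
  const key = jsonKey.replace(/'/g, "''");
  return (
    `ALTER TABLE ${table} ADD COLUMN ${column} ${type}\n` +
    `GENERATED ALWAYS AS ((_supaglue_unified_data->>'${key}')::${type}) STORED;`
  );
}

// Reproduces the statement from the example above.
console.log(generatedColumnSql('engagement_users', 'is_locked', 'is_locked', 'boolean'));
```

Restricting table and column names to a simple pattern is a deliberate choice here: DDL cannot use bind parameters, so validation is the main safeguard.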
Object Mapping
Suppose you want both contacts and accounts from Hubspot to be mapped into a new entity called hubspot_objects. You can accomplish this with a union query.
WITH hubspot_objects as (
SELECT
'hubspot_accounts' AS entity_name,
_supaglue_customer_id AS customer_id,
raw_data
FROM
hubspot_accounts
UNION
SELECT
'hubspot_contacts' AS entity_name,
_supaglue_customer_id AS customer_id,
raw_data
FROM
hubspot_contacts
)
SELECT * from hubspot_objects;
Of course, you can combine object mapping and field mapping together into a single query as well.
Per-customer object & field mapping
Now things get a bit more complex. Suppose your customers store product ideas in their CRM and you need to pull them out to run some analysis. However, each customer may have a different workflow and therefore store their ideas in a different place. customer_1 uses Salesforce and stores them on opportunities as notes. customer_2 uses Hubspot and stores them in a custom object called custom_ideas inside a field called content. customer_3 doesn’t even use a CRM and stores them directly in their engagement tool, Outreach, on contact in a field called ideas. At the end of the day, you just want a single table called product_ideas to feed into your ML system. What then?
Go into the same database where Supaglue lands your data, and create a mapping table to capture this.
CREATE TABLE product_ideas_mapping (
customer_id varchar,
object_name varchar,
field_name varchar
);
INSERT INTO product_ideas_mapping (customer_id, object_name, field_name)
VALUES
('customer_1', 'salesforce_opportunities', 'notes'),
('customer_2', 'hubspot_ideas', 'content'),
('customer_3', 'outreach_contacts', 'ideas');
Then combine the previous field and object mapping approaches, joining against the mapping table to pull in the right field for each customer.
--- This can be either a (materialized) view or a simple select query.
CREATE VIEW product_ideas AS (
WITH raw_table AS (
SELECT
'salesforce_opportunities' AS object_name,
_supaglue_customer_id AS customer_id,
raw_data
FROM
salesforce_opportunities
UNION
SELECT
name AS object_name,
_supaglue_customer_id AS customer_id,
raw_data
FROM
custom_objects
UNION
SELECT
'outreach_contacts' AS object_name,
_supaglue_customer_id AS customer_id,
entity_data as raw_data
FROM
outreach_contacts
)
SELECT
p.customer_id,
(r.raw_data ->> p.field_name) AS idea
FROM
raw_table r
JOIN product_ideas_mapping p ON r.customer_id = p.customer_id
AND r.object_name = p.object_name
);
By the way, you may also want to specify, on a per-customer basis, what should be synced in the first place so that it is available for mapping. To do that, make sure you call the update connection sync config API when a customer changes their settings in your app.
Runtime mapping in application code
Sometimes it is inconvenient or not possible to modify the database. Let’s take the product idea mapping example from the previous section. Perhaps your primary application database, where you store the per-customer mapping, is MongoDB, while Supaglue lands data in a Postgres database. In this case you can first retrieve the mapping for a given customer from Mongo, then use it to construct a dynamic SQL query (with or without the help of an ORM) to retrieve the objects you care about.
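for (const customerId of ['customer_1', 'customer_2', 'customer_3']) {
  // Look up this customer's mapping (stored in MongoDB in this example)
  const {object_name, field_name} = await mongo.productIdeaMapping.findOne({customerId});
  let ideas;
  if (object_name.startsWith('hubspot_custom')) {
    // Custom objects share one table and are distinguished by name
    ideas = await sql`
      SELECT _supaglue_customer_id AS customer_id, raw_data->>${field_name} AS idea
      FROM custom_objects
      WHERE name = ${object_name}`;
  } else {
    // A table name cannot be a bind parameter. sql(object_name) assumes a
    // client such as postgres.js that escapes identifiers this way.
    ideas = await sql`
      SELECT _supaglue_customer_id AS customer_id, raw_data->>${field_name} AS idea
      FROM ${sql(object_name)}`;
  }
}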
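Because the table name in the query above comes from stored mapping data, it is worth validating before it reaches SQL. One possible approach is an allowlist, sketched below. The `ALLOWED_TABLES` set and the `resolveTable` helper are hypothetical names introduced for illustration; the table names are the ones used in this section.

```typescript
// Hypothetical allowlist of tables that a stored mapping may reference.
const ALLOWED_TABLES = new Set([
  'salesforce_opportunities',
  'custom_objects',
  'outreach_contacts',
]);

// Returns the table to query for a mapping's object_name, or throws if the
// name is not one we expect. Names starting with 'hubspot_custom' are routed
// to the shared custom_objects table, mirroring the branch in the loop above.
function resolveTable(objectName: string): string {
  const table = objectName.startsWith('hubspot_custom') ? 'custom_objects' : objectName;
  if (!ALLOWED_TABLES.has(table)) {
    throw new Error(`Unexpected object_name in mapping: ${objectName}`);
  }
  return table;
}

console.log(resolveTable('salesforce_opportunities'));
```

Calling `resolveTable(object_name)` before building the query means a malformed or malicious mapping row fails fast instead of producing arbitrary SQL.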
If you are using TypeORM, you can accomplish the same thing with the following:
const repos = {
  salesforce_opportunities: SalesforceOpportunityRepo,
  custom_objects: HubspotCustomObjectsRepo,
  outreach_contacts: OutreachContactsRepo,
};
for (const customerId of ['customer_1', 'customer_2', 'customer_3']) {
  const {object_name, field_name} = await mongo.productIdeaMapping.findOne({customerId});
  // field_name is interpolated into SQL below, so it must come from a trusted mapping
  let ideas;
  if (object_name.startsWith('hubspot_custom')) {
    ideas = await repos.custom_objects.createQueryBuilder('ideas')
      .select()
      .addSelect(`raw_data->>'${field_name}'`, 'idea')
      .where('ideas.name = :name', {name: object_name})
      .getRawMany();
    // NOTE: You can also
  } else {
    const repo = repos[object_name];
    ideas = await repo.createQueryBuilder('ideas')
      .select()
      .addSelect(`raw_data->>'${field_name}'`, 'idea')
      .getRawMany();
  }
}
As with SQL-based mapping, you may need to call the update connection sync config API to configure, on a per-customer basis, what data should be synced in the first place.
Post-sync transformation triggered by webhooks
This tutorial will use Supaglue to sync your customers' CRM records into existing tables in your database using your schema.
Prerequisites
This tutorial assumes you have already gone through Supaglue's Quickstart and will use the following technologies:
- Next.js
- Prisma
- Postgres
Scenario
Suppose that your application has two database tables, contact and opportunity, that you want to populate with your customers' CRM data:
Table "production.contact"
Column | Type | Collation | Nullable | Default
--------------------------+--------------------------------+-----------+----------+---------
first_name | text | | not null |
last_name | text | | not null |
id | text | | not null |
Table "production.opportunity"
Column | Type | Collation | Nullable | Default
--------------------------+--------------------------------+-----------+----------+---------
name | text | | not null |
description | text | | not null |
probability | text | | not null |
id | text | | not null |
In this tutorial, we'll use three example customers, user-1, user-2, and user-3, who use the following CRMs and need the following objects mapped to your contact and opportunity tables:
Your database table | user-1 | user-2 | user-3 |
---|---|---|---|
contact | Hubspot contact | Salesforce Lead | Salesforce Contact |
opportunity | Hubspot deal | Salesforce Opportunity | Salesforce Opportunity |
Write mapped records to your database
We will go over how to do the following:
- Listen for Supaglue's sync.complete webhook event to trigger a transformation function
- Define the transformation function that will map per-provider/customer/object records to your application entities
- Paginate over the newly-synced records and run the transformation function to convert the raw CRM data provided by Supaglue into your existing database tables
"Entities" is used to refer to your application data models.
1. Listen for Supaglue's webhook event
Supaglue emits different kinds of notification webhooks. For object and field mapping, we want to process the sync.complete event, which Supaglue emits after a full or incremental sync completes.
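import { NextRequest, NextResponse } from 'next/server';

export async function POST(request: NextRequest) {
  const data = await request.json();
  // Ignore every event type other than sync.complete
  if (data.webhook_event_type !== 'sync.complete') {
    return NextResponse.json({});
  }
  // Only transform records after a successful sync
  if (data.result !== 'SUCCESS') {
    return NextResponse.json({});
  }
  // call your transformation function here
  return NextResponse.json({});
}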
Please refer to the consuming webhooks tutorial for more details on how to do it.
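The handler above reads fields from an untyped JSON body. One way to make that safer is a type guard that checks the fields this tutorial relies on before any transformation runs. This is a minimal sketch: `SyncCompleteEvent` and `isSuccessfulSyncComplete` are hypothetical names, and the interface lists only the fields used in this tutorial, so the real payload may carry more.

```typescript
// Fields read from the webhook payload in this tutorial. The actual payload
// may include additional fields, so treat this shape as an assumption.
interface SyncCompleteEvent {
  webhook_event_type: 'sync.complete';
  result: string;
  provider_name: string;
  customer_id: string;
  object: string;
}

// True only for a sync.complete event with result SUCCESS whose
// provider_name, customer_id and object fields are all strings.
function isSuccessfulSyncComplete(data: unknown): data is SyncCompleteEvent {
  if (typeof data !== 'object' || data === null) {
    return false;
  }
  const d = data as Record<string, unknown>;
  return (
    d['webhook_event_type'] === 'sync.complete' &&
    d['result'] === 'SUCCESS' &&
    typeof d['provider_name'] === 'string' &&
    typeof d['customer_id'] === 'string' &&
    typeof d['object'] === 'string'
  );
}

const sample: unknown = {
  webhook_event_type: 'sync.complete',
  result: 'SUCCESS',
  provider_name: 'hubspot',
  customer_id: 'user-1',
  object: 'contact',
};
console.log(isSuccessfulSyncComplete(sample));
```

Inside the handler, the two early-return checks could then collapse into a single `if (!isSuccessfulSyncComplete(data))` branch, after which `data.provider_name`, `data.customer_id`, and `data.object` are typed as strings.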
2. Define the transformation function
When a sync completes for a specific provider/customer/object, we need to determine whether it maps to a contact or an opportunity in your application.
Define a per-customer mapper file
First, create a per-customer mapper file containing the mapping configuration from each customer's records to your database tables. The following are examples for user-1, user-2, and user-3 from the Scenario section above:
// user1 mapping:
// hubspot's contact.firstname --> contact.firstName
// hubspot's contact.lastname --> contact.lastName
// and:
// hubspot's deal.hs_deal_stage_probability --> opportunity.probability (a float)
// hubspot's deal.dealname --> opportunity.name
// hubspot's deal.description --> opportunity.description
const user1: UserConfig = {
providerName: 'hubspot',
entityMappings: {
contact: {
object: 'contact',
mappingFn: (record: Record<string, string | null>): Contact => ({
firstName: record['firstname'],
lastName: record['lastname'],
}),
},
opportunity: {
object: 'deal',
mappingFn: (record: Record<string, string | null>): Opportunity => {
const hs_deal_stage_probability = record['hs_deal_stage_probability'];
const probability = hs_deal_stage_probability !== null ? parseFloat(hs_deal_stage_probability) : null;
return {
name: record['dealname'],
description: record['description'],
probability,
};
},
},
},
};
// user2 mapping:
// salesforce's Lead.FirstName --> Contact.firstName
// salesforce's Lead.LastName --> Contact.lastName
// and:
// salesforce's Opportunity.Name --> opportunity.name
// salesforce's Opportunity.Description --> opportunity.description
// salesforce's Opportunity.ProbabilityV2__c --> opportunity.probability (a float)
const user2: UserConfig = {
providerName: 'salesforce',
entityMappings: {
contact: {
object: 'Lead',
mappingFn: (record: Record<string, string>): Contact => ({
firstName: record['FirstName'],
lastName: record['LastName'],
}),
},
opportunity: {
object: 'Opportunity',
mappingFn: (record: Record<string, string>): Opportunity => ({
name: record['Name'],
description: record['Description'],
probability: parseInt(record['ProbabilityV2__c'], 10) / 100, // ProbabilityV2__c is a picklist field with values [10, 20, ..., 100]
}),
},
},
};
// user3 mapping:
// salesforce's Contact.FirstName --> Contact.firstName
// salesforce's Contact.LastName --> Contact.lastName
// and:
// salesforce's Opportunity.Name --> opportunity.name
// salesforce's Opportunity.Description --> opportunity.description
// salesforce's Opportunity.Probability --> opportunity.probability (a float)
const user3: UserConfig = {
providerName: 'salesforce',
entityMappings: {
contact: {
object: 'Contact',
mappingFn: (record: Record<string, string>): Contact => ({
firstName: record['FirstName'],
lastName: record['LastName'],
}),
},
opportunity: {
object: 'Opportunity',
mappingFn: (record: Record<string, string>): Opportunity => ({
name: record['Name'],
description: record['Description'],
probability: parseFloat(record['Probability']) / 100,
}),
},
},
};
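The configurations above reference the UserConfig, Contact, and Opportunity types, and the helper in the next section references MapperAny, none of which are shown in this tutorial. The following is a minimal sketch of what they might look like, inferred only from how the configs use them; the definitions in the example repository may differ. `EntityMapping` and `exampleConfig` are hypothetical names introduced for illustration.

```typescript
// Application entities, inferred from the fields the mappers return.
// Fields are nullable because the HubSpot mapper may pass null through.
interface Contact {
  firstName: string | null;
  lastName: string | null;
}

interface Opportunity {
  name: string | null;
  description: string | null;
  probability: number | null;
}

// The record parameter is typed as any because each provider's mapper
// declares its own record shape.
type EntityMapping<TEntity> = {
  object: string;
  mappingFn: (record: any) => TEntity;
};

interface UserConfig {
  providerName: string;
  entityMappings: {
    contact: EntityMapping<Contact>;
    opportunity: EntityMapping<Opportunity>;
  };
}

// A resolved mapper, tagged with the application entity it produces.
type MapperAny =
  | ({ entityName: 'contact' } & EntityMapping<Contact>)
  | ({ entityName: 'opportunity' } & EntityMapping<Opportunity>);

// A config in the same style as user3, checked against the types above.
const exampleConfig: UserConfig = {
  providerName: 'salesforce',
  entityMappings: {
    contact: {
      object: 'Contact',
      mappingFn: (record: Record<string, string>): Contact => ({
        firstName: record['FirstName'] ?? null,
        lastName: record['LastName'] ?? null,
      }),
    },
    opportunity: {
      object: 'Opportunity',
      mappingFn: (record: Record<string, string>): Opportunity => ({
        name: record['Name'] ?? null,
        description: record['Description'] ?? null,
        probability: parseFloat(record['Probability'] ?? '') / 100,
      }),
    },
  },
};

const exampleMapper: MapperAny = { entityName: 'contact', ...exampleConfig.entityMappings.contact };
console.log(exampleMapper.mappingFn({ FirstName: 'Ada', LastName: 'Lovelace' }));
```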
Create a helper to retrieve the right mapper based on {providerName, customerId, object}
The following code uses the per-customer mapper files from above:
const userConfigs: Record<string, UserConfig> = {
'user-1': user1,
'user-2': user2,
'user-3': user3,
};
// this function can be called to get the mapper (to go from Supaglue object to your application entity) for the given user
export const getMapper = (providerName: string, customerId: string, object: string): MapperAny | null => {
const config = userConfigs[customerId];
if (!config) {
return null;
}
if (config.providerName !== providerName) {
return null;
}
// Find the corresponding entity name and mapper
for (const [entityName, entityMappingConfig] of Object.entries(config.entityMappings)) {
if (entityMappingConfig.object === object) {
return {
object,
entityName,
mappingFn: entityMappingConfig.mappingFn,
} as MapperAny;
}
}
// No entity is mapped to this object for this customer
return null;
};
In this sample code, you, the developer, define the object and field mappings. You could also evolve this code to allow your customers to configure the object and field mappings.
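If customers configure their own mappings, the mapping can no longer be a hand-written function; it has to be data that you store and interpret. A minimal sketch of that idea follows, assuming a flat mapping from entity field name to source field name. `FieldMap` and `buildMappingFn` are hypothetical names, and type conversions such as the probability parsing above are left out.

```typescript
// Hypothetical declarative mapping: entity field name -> source field name.
// This is plain data, so it can be stored per customer and edited in a UI.
type FieldMap = Record<string, string>;

type SourceRecord = Record<string, string | null>;

// Turns a stored field map into a mapping function with the same calling
// convention as the hand-written mappingFn examples.
function buildMappingFn(fieldMap: FieldMap): (record: SourceRecord) => SourceRecord {
  return (record) => {
    const mapped: SourceRecord = {};
    for (const [entityField, sourceField] of Object.entries(fieldMap)) {
      // Source fields that are absent become null rather than undefined.
      mapped[entityField] = record[sourceField] ?? null;
    }
    return mapped;
  };
}

// Example: a mapping a customer might save through a settings page.
const customerFieldMap: FieldMap = { firstName: 'firstname', lastName: 'lastname' };
const mapContact = buildMappingFn(customerFieldMap);
console.log(mapContact({ firstname: 'Ada', lastname: 'Lovelace', email: 'ada@example.com' }));
```

Only the fields named in the map are copied, so unmapped source fields such as `email` in the example are dropped.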
Use the helper to call the mapper
Now, let's use the helper from your transformation function to get the right mapper:
// For different customers, we want to map things differently
const mapper = getMapper(data.provider_name, data.customer_id, data.object);
3. Paginate over and transform newly-synced records
Now, we want to transform the newly-synced records to write into your existing database tables. We'll use the mapper that we wrote earlier to do this.
Please refer to the Pagination tutorial for more details on pagination over newly-synced records.
Algorithm
Upon a sync.complete webhook event:
- Read all provider/customer/object records since the last high-water mark
- For each record:
  - Transform it using the mapper
  - Upsert the transformed record into your database table
- Set the new high-water mark
Example code
const lastMaxModifiedAt = lastMaxLastModifiedAtMs ? new Date(lastMaxLastModifiedAtMs) : undefined;
const newMaxLastModifiedAtMs = await step.run('Update records', async () => {
async function getSupaglueRecords(providerName: string, object: string) {
const params = {
where: {
supaglue_provider_name: data.provider_name,
supaglue_customer_id: data.customer_id,
supaglue_last_modified_at: {
gt: lastMaxModifiedAt,
},
},
};
switch (providerName) {
case 'salesforce': {
switch (object) {
case 'Contact':
return prismaSupaglue.supaglueSalesforceContact.findMany(params);
case 'Lead':
return prismaSupaglue.supaglueSalesforceLead.findMany(params);
case 'Opportunity':
return prismaSupaglue.supaglueSalesforceOpportunity.findMany(params);
default:
throw new Error(`Unsupported Salesforce object: ${object}`);
}
}
case 'hubspot': {
switch (object) {
case 'contact':
return prismaSupaglue.supaglueHubSpotContact.findMany(params);
case 'deal':
return prismaSupaglue.supaglueHubSpotDeal.findMany(params);
default:
throw new Error(`Unsupported HubSpot object: ${object}`);
}
}
default:
throw new Error(`Unsupported provider: ${providerName}`);
}
}
// Read from staging table
const records = await getSupaglueRecords(data.provider_name, data.object);
if (!records.length) {
return undefined;
}
let maxLastModifiedAtMs = 0;
// TODO: don't iterate one by one
for (const record of records) {
const lastModifiedAtMs = record.supaglue_last_modified_at.getTime();
if (lastModifiedAtMs > maxLastModifiedAtMs) {
maxLastModifiedAtMs = lastModifiedAtMs;
}
if (record.supaglue_is_deleted) {
// Delete
const params = {
where: {
providerName: data.provider_name,
customerId: data.customer_id,
originalId: record.supaglue_id,
},
};
switch (mapper.entityName) {
case 'contact':
await prismaApp.contact.deleteMany(params);
break;
case 'opportunity':
await prismaApp.opportunity.deleteMany(params);
break;
}
} else {
// Upsert
switch (mapper.entityName) {
case 'contact': {
const mappedRecord = mapper.mappingFn(record.supaglue_mapped_data as Record<string, unknown>);
const decoratedData = {
providerName: data.provider_name,
customerId: data.customer_id,
originalId: record.supaglue_id,
...mappedRecord,
};
await prismaApp.contact.upsert({
create: decoratedData,
update: decoratedData,
where: {
providerName_customerId_originalId: {
providerName: data.provider_name,
customerId: data.customer_id,
originalId: record.supaglue_id,
},
},
});
break;
}
case 'opportunity': {
const mappedRecord = mapper.mappingFn(record.supaglue_mapped_data as Record<string, unknown>);
const decoratedData = {
providerName: data.provider_name,
customerId: data.customer_id,
originalId: record.supaglue_id,
...mappedRecord,
};
await prismaApp.opportunity.upsert({
create: decoratedData,
update: decoratedData,
where: {
providerName_customerId_originalId: {
providerName: data.provider_name,
customerId: data.customer_id,
originalId: record.supaglue_id,
},
},
});
break;
}
}
}
}
return maxLastModifiedAtMs;
});
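The TODO in the code above notes that records should not be processed one at a time. One possible first step is to split the batch in a single pass, so that deletions can be issued together and the new high-water mark is computed once. The sketch below is an assumption about how that might look, not code from the example repository: `SyncedRecord` and `planBatch` are hypothetical names, and only the three record fields already used above are assumed.

```typescript
// The subset of staging-table fields this step needs.
interface SyncedRecord {
  supaglue_id: string;
  supaglue_last_modified_at: Date;
  supaglue_is_deleted: boolean;
}

interface BatchPlan<T> {
  toDelete: string[]; // supaglue_id values of deleted records
  toUpsert: T[]; // records to transform and upsert
  maxLastModifiedAtMs: number | undefined; // new high-water mark, if any
}

// Splits a batch into deletes and upserts in one pass and tracks the
// largest modification timestamp seen.
function planBatch<T extends SyncedRecord>(records: T[]): BatchPlan<T> {
  const toDelete: string[] = [];
  const toUpsert: T[] = [];
  let maxLastModifiedAtMs: number | undefined;
  for (const record of records) {
    const ms = record.supaglue_last_modified_at.getTime();
    if (maxLastModifiedAtMs === undefined || ms > maxLastModifiedAtMs) {
      maxLastModifiedAtMs = ms;
    }
    if (record.supaglue_is_deleted) {
      toDelete.push(record.supaglue_id);
    } else {
      toUpsert.push(record);
    }
  }
  return { toDelete, toUpsert, maxLastModifiedAtMs };
}

const plan = planBatch([
  { supaglue_id: 'a', supaglue_last_modified_at: new Date(1000), supaglue_is_deleted: false },
  { supaglue_id: 'b', supaglue_last_modified_at: new Date(3000), supaglue_is_deleted: true },
  { supaglue_id: 'c', supaglue_last_modified_at: new Date(2000), supaglue_is_deleted: false },
]);
console.log(plan.toDelete, plan.toUpsert.length, plan.maxLastModifiedAtMs);
```

With the IDs collected, all deletions for an entity could go into one `deleteMany` call using an `in` filter on `originalId`, and the upserts could be grouped into a transaction. Whether that is worthwhile depends on your batch sizes and reliability requirements.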
More information
You can try out a working example of this tutorial by cloning the object-field-mapping-example repository and following the instructions in the README.
You'll want to customize the code from this tutorial to fit your specific application data model, use case, performance, and reliability requirements.