Build a common schema
Suppose your application integrates with multiple third-party Providers in one category, such as Salesforce, HubSpot, and Pipedrive in CRM. You will likely want to read their data in a unified way: for example, storing contacts from all three CRMs in one table in your database instead of three.
This tutorial will teach you how to structure your transformation code and how to build a common schema using Salesforce as an example.
Prerequisites
This tutorial assumes you have already gone through Supaglue's Quickstart, have read our documentation on listening for webhooks, and have read our documentation on pagination.
We will also use the following technologies:
- Next.js
- Prisma
- Postgres
Scenario
Our example application has the concepts of users, opportunities, leads, accounts, and contacts, and we want one table for each to store data from Salesforce, HubSpot, and Pipedrive. The schemas for these models (using Prisma syntax) look like the following:
- User
- Opportunity
- Lead
- Account
- Contact
model User {
  id String @id
  customerId String @map("_supaglue_customer_id")
  providerName String @map("_supaglue_provider_name")
  name String
  email String
  isActive Boolean @map("is_active")
  createdAt DateTime? @map("created_at")
  updatedAt DateTime? @map("updated_at")

  @@unique([customerId, id, providerName])
  @@map("users")
}
A User is a person who can log into your customer's third-party Provider account.
model Opportunity {
  id String @id
  customerId String @map("_supaglue_customer_id")
  providerName String @map("_supaglue_provider_name")
  name String
  description String?
  ownerId String? @map("owner_id")
  status String?
  stage String?
  closeDate DateTime? @map("close_date")
  accountId String @map("account_id")
  pipeline String?
  amount BigInt?
  lastActivityAt DateTime? @map("last_activity_at")
  createdAt DateTime? @map("created_at")
  updatedAt DateTime? @map("updated_at")

  @@unique([customerId, id, providerName])
  @@map("opportunities")
}
model Lead {
  id String @id
  customerId String @map("_supaglue_customer_id")
  providerName String @map("_supaglue_provider_name")
  firstName String @map("first_name")
  lastName String @map("last_name")
  ownerId String? @map("owner_id")
  title String?
  company String?
  convertedDate DateTime? @map("converted_date")
  leadSource String? @map("lead_source")
  convertedAccountId String? @map("converted_account_id")
  convertedContactId String? @map("converted_contact_id")
  /// in the format: [{ "addressType": "shipping", "street1": "1234 Main St.", "street2": "Suite 123", "city": "San Francisco", "state": "CA", "postalCode": "94123", "country": "USA" }, ...]
  addresses Json? @map("addresses")
  /// in the format: [{ "emailAddress": "john@doe.com", "emailAddressType": "primary" }]
  emailAddresses Json? @map("email_addresses")
  /// in the format: [{ "phoneNumber": "555-555-5555", "phoneNumberType": "mobile" }]
  phoneNumbers Json? @map("phone_numbers")
  createdAt DateTime? @map("created_at")
  updatedAt DateTime? @map("updated_at")

  @@unique([customerId, id, providerName])
  @@map("leads")
}
model Account {
  id String @id
  customerId String @map("_supaglue_customer_id")
  providerName String @map("_supaglue_provider_name")
  name String
  description String?
  ownerId String? @map("owner_id")
  industry String?
  website String?
  numberOfEmployees Int? @map("number_of_employees")
  /// in the format: [{ "addressType": "shipping", "street1": "1234 Main St.", "street2": "Suite 123", "city": "San Francisco", "state": "CA", "postalCode": "94123", "country": "USA" }, ...]
  addresses Json? @map("addresses")
  /// in the format: [{ "phoneNumber": "555-555-5555", "phoneNumberType": "mobile" }]
  phoneNumbers Json? @map("phone_numbers")
  lifecycleStage String? @map("lifecycle_stage")
  lastActivityAt DateTime? @map("last_activity_at")
  createdAt DateTime? @map("created_at")
  updatedAt DateTime? @map("updated_at")

  @@unique([customerId, id, providerName])
  @@map("accounts")
}
model Contact {
  id String @id
  customerId String @map("_supaglue_customer_id")
  providerName String @map("_supaglue_provider_name")
  accountId String? @map("account_id")
  ownerId String? @map("owner_id")
  firstName String? @map("first_name")
  lastName String? @map("last_name")
  /// in the format: [{ "addressType": "shipping", "street1": "1234 Main St.", "street2": "Suite 123", "city": "San Francisco", "state": "CA", "postalCode": "94123", "country": "USA" }, ...]
  addresses Json? @map("addresses")
  /// in the format: [{ "emailAddress": "john@doe.com", "emailAddressType": "primary" }]
  emailAddresses Json? @map("email_addresses")
  /// in the format: [{ "phoneNumber": "555-555-5555", "phoneNumberType": "mobile" }]
  phoneNumbers Json? @map("phone_numbers")
  lifecycleStage String? @map("lifecycle_stage")
  lastActivityAt DateTime? @map("last_activity_at")
  createdAt DateTime? @map("created_at")
  updatedAt DateTime? @map("updated_at")

  @@unique([customerId, id, providerName])
  @@map("contacts")
}
Setup
Trigger your transformation code after data is synced to Supaglue-managed tables by listening for the sync.complete webhook. For more information, please see our documentation on listening for webhooks.
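One way to organize this is a small dispatcher that routes each sync.complete event to the transformation for that provider and object. The sketch below is illustrative only: the event shape is an assumption (only customer_id and provider_name appear in this tutorial; eventType and objectName are hypothetical field names), and handleWebhookEvent and registerTransformer are made-up helpers, not part of Supaglue. Check the webhook documentation for the actual payload.

```typescript
// Hypothetical event shape. Only customer_id and provider_name are used by the
// queries in this tutorial; eventType and objectName are illustrative assumptions.
type SyncEvent = {
  eventType: string;
  customer_id: string;
  provider_name: string;
  objectName: string;
};

type Transformer = (event: SyncEvent) => void;

// Registry keyed by "provider:object", for example "salesforce:User".
const transformers = new Map<string, Transformer>();

function registerTransformer(providerName: string, objectName: string, transformer: Transformer): void {
  transformers.set(`${providerName}:${objectName}`, transformer);
}

// Returns true when a transformer ran and false when the event was ignored.
function handleWebhookEvent(event: SyncEvent): boolean {
  if (event.eventType !== 'sync.complete') {
    return false; // not a sync completion, nothing to transform
  }
  const transformer = transformers.get(`${event.provider_name}:${event.objectName}`);
  if (!transformer) {
    return false; // no transformation registered for this provider/object
  }
  transformer(event);
  return true;
}
```

In a Next.js route handler you would parse the request body, pass it to a function like handleWebhookEvent, and register asynchronous transformers that run queries like the ones in the examples below.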
Transforming data
You can map data from each provider to your common schema using two approaches:
- SQL selects, aliasing, and functions for simple transformations like renaming fields and format conversion
- Code for more complex transformations like operating on objects and arrays
In the examples below, we rename fields using SQL aliasing and use code for more complex transformations.
- Algorithm
- Simple Example (User)
- Complex Example (Contact)
Upon a sync.complete webhook event:
- Read all customer/provider/object records since the last high watermark
- For each record, transform it using the mapper
- Upsert the transformed record into your database table
- Set the new high watermark
Please refer to the Pagination tutorial for more details on pagination over newly-synced records.
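The four steps above can be sketched independently of any database. This is a minimal in-memory illustration, not the tutorial's implementation: RawRecord, CommonRecord, and applyChanges are hypothetical names, the array stands in for a Supaglue-managed table, and the Map stands in for your own table.

```typescript
// Hypothetical stand-ins for a synced row and a row in your common schema.
type RawRecord = { id: string; lastModifiedAt: Date; isDeleted: boolean; fields: Record<string, string> };
type CommonRecord = { id: string; name: string };

// Applies every change newer than the watermark and returns the new watermark.
// The caller is responsible for persisting the returned value.
function applyChanges(
  synced: RawRecord[],
  table: Map<string, CommonRecord>,
  watermark: Date | null,
  mapper: (record: RawRecord) => CommonRecord,
): Date | null {
  // 1. Read all records modified since the last high watermark, oldest first.
  const pending = synced
    .filter((r) => watermark === null || r.lastModifiedAt.getTime() > watermark.getTime())
    .sort((a, b) => a.lastModifiedAt.getTime() - b.lastModifiedAt.getTime());

  let newWatermark = watermark;
  for (const record of pending) {
    if (record.isDeleted) {
      table.delete(record.id); // deletions are applied too
    } else {
      // 2. Transform with the mapper, 3. upsert into the table.
      table.set(record.id, mapper(record));
    }
    // 4. Advance the watermark for every processed record, including deletions.
    if (newWatermark === null || record.lastModifiedAt.getTime() > newWatermark.getTime()) {
      newWatermark = record.lastModifiedAt;
    }
  }
  return newWatermark;
}
```

Running applyChanges a second time with the returned watermark and no new changes leaves the table and the watermark unchanged, which is what makes the job safe to trigger on every sync.complete event.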
This example maps Salesforce's User fields to our User model fields using SQL:
import { PrismaClient } from '@prisma/client';
import { sql } from '@vercel/postgres';

const prisma = new PrismaClient();

// `data` is assumed to be the payload of the sync.complete webhook event
// fetch the lastMaxModifiedAt for this customer/provider/object from the persistent store

// on the first run there is no watermark yet, so start from the epoch to read every record
const since: Date = lastMaxModifiedAt ?? new Date(0);

// aliases are double-quoted because Postgres folds unquoted identifiers to lowercase,
// which would return customerid instead of customerId on each row
const { rows } = await sql`
  SELECT
    _supaglue_raw_data->>'Id' AS "id",
    ${data.customer_id} AS "customerId",
    ${data.provider_name} AS "providerName",
    _supaglue_raw_data->>'Name' AS "name",
    _supaglue_raw_data->>'Email' AS "email",
    _supaglue_raw_data->>'IsActive' AS "isActive",
    _supaglue_raw_data->>'CreatedDate' AS "createdAt",
    _supaglue_raw_data->>'SystemModstamp' AS "updatedAt",
    _supaglue_is_deleted AS "isDeleted",
    _supaglue_last_modified_at AS "lastModifiedAt"
  FROM supaglue.salesforce_user
  WHERE _supaglue_last_modified_at > ${since.toISOString()}
  ORDER BY _supaglue_last_modified_at ASC`;

// this is to keep track of the latest modified date we see so we can persist it later
let newMaxLastModifiedAt: Date = since;

for (const row of rows) {
  // advance the watermark for every row, including deleted ones
  const rowLastModifiedAt = new Date(row.lastModifiedAt);
  if (rowLastModifiedAt > newMaxLastModifiedAt) {
    newMaxLastModifiedAt = rowLastModifiedAt;
  }

  if (row.isDeleted) {
    // deleteMany does not throw if the record was never written to our table
    await prisma.user.deleteMany({
      where: {
        id: row.id,
        customerId: row.customerId,
        providerName: row.providerName,
      },
    });
    continue;
  }

  // ->> returns text, so booleans and dates are converted here
  const fields = {
    name: row.name,
    email: row.email,
    isActive: row.isActive === 'true',
    createdAt: new Date(row.createdAt),
    updatedAt: new Date(row.updatedAt),
  };

  await prisma.user.upsert({
    create: {
      id: row.id,
      customerId: row.customerId,
      providerName: row.providerName,
      ...fields,
    },
    update: fields,
    where: {
      id: row.id,
      customerId: row.customerId,
      providerName: row.providerName,
    },
  });
}

// persist the newMaxLastModifiedAt high watermark
This example maps Salesforce's Contact fields using SQL for field renaming and then code for mapping phone numbers, addresses, and email addresses:
import { PrismaClient } from '@prisma/client';
import { sql } from '@vercel/postgres';

const prisma = new PrismaClient();

// `data` is assumed to be the payload of the sync.complete webhook event
// fetch the lastMaxModifiedAt for this customer/provider/object from the persistent store

// on the first run there is no watermark yet, so start from the epoch to read every record
const since: Date = lastMaxModifiedAt ?? new Date(0);

// aliases are double-quoted because Postgres folds unquoted identifiers to lowercase,
// which would return customerid instead of customerId on each row
const { rows } = await sql`
  SELECT
    _supaglue_raw_data->>'Id' AS "id",
    ${data.customer_id} AS "customerId",
    ${data.provider_name} AS "providerName",
    _supaglue_raw_data->>'FirstName' AS "firstName",
    _supaglue_raw_data->>'LastName' AS "lastName",
    _supaglue_raw_data->>'AccountId' AS "accountId",
    _supaglue_raw_data->>'OwnerId' AS "ownerId",
    _supaglue_raw_data->>'Email' AS "email",
    _supaglue_raw_data->>'Phone' AS "phone",
    _supaglue_raw_data->>'MobilePhone' AS "mobilePhone",
    _supaglue_raw_data->>'Fax' AS "faxPhone",
    _supaglue_raw_data->>'MailingCity' AS "mailingCity",
    _supaglue_raw_data->>'MailingCountry' AS "mailingCountry",
    _supaglue_raw_data->>'MailingPostalCode' AS "mailingPostalCode",
    _supaglue_raw_data->>'MailingState' AS "mailingState",
    _supaglue_raw_data->>'MailingStreet' AS "mailingStreet",
    _supaglue_raw_data->>'OtherCity' AS "otherCity",
    _supaglue_raw_data->>'OtherCountry' AS "otherCountry",
    _supaglue_raw_data->>'OtherPostalCode' AS "otherPostalCode",
    _supaglue_raw_data->>'OtherState' AS "otherState",
    _supaglue_raw_data->>'OtherStreet' AS "otherStreet",
    _supaglue_raw_data->>'CreatedDate' AS "createdAt",
    _supaglue_raw_data->>'SystemModstamp' AS "updatedAt",
    _supaglue_raw_data->>'LastActivityDate' AS "lastActivityAt",
    _supaglue_is_deleted AS "isDeleted",
    _supaglue_last_modified_at AS "lastModifiedAt"
  FROM supaglue.salesforce_contact
  WHERE _supaglue_last_modified_at > ${since.toISOString()}
  ORDER BY _supaglue_last_modified_at ASC`;

// this is to keep track of the latest modified date we see so we can persist it later
let newMaxLastModifiedAt: Date = since;

for (const row of rows) {
  // advance the watermark for every row, including deleted ones
  const rowLastModifiedAt = new Date(row.lastModifiedAt);
  if (rowLastModifiedAt > newMaxLastModifiedAt) {
    newMaxLastModifiedAt = rowLastModifiedAt;
  }

  if (row.isDeleted) {
    // deleteMany does not throw if the record was never written to our table
    await prisma.contact.deleteMany({
      where: {
        id: row.id,
        customerId: row.customerId,
        providerName: row.providerName,
      },
    });
    continue;
  }

  // build one address entry per Salesforce address that has at least one field set
  const addresses = [
    row.mailingCity || row.mailingCountry || row.mailingPostalCode || row.mailingState || row.mailingStreet
      ? {
          addressType: 'mailing',
          street1: row.mailingStreet,
          street2: null,
          city: row.mailingCity,
          state: row.mailingState,
          postalCode: row.mailingPostalCode,
          country: row.mailingCountry,
        }
      : null,
    row.otherCity || row.otherCountry || row.otherPostalCode || row.otherState || row.otherStreet
      ? {
          addressType: 'other',
          street1: row.otherStreet,
          street2: null,
          city: row.otherCity,
          state: row.otherState,
          postalCode: row.otherPostalCode,
          country: row.otherCountry,
        }
      : null,
  ].filter(Boolean);

  const phoneNumbers = [
    row.phone ? { phoneNumber: row.phone, phoneNumberType: 'primary' } : null,
    row.mobilePhone ? { phoneNumber: row.mobilePhone, phoneNumberType: 'mobile' } : null,
    row.faxPhone ? { phoneNumber: row.faxPhone, phoneNumberType: 'fax' } : null,
  ].filter(Boolean);

  // the same mapped fields are written on both create and update
  const fields = {
    accountId: row.accountId,
    ownerId: row.ownerId,
    firstName: row.firstName,
    lastName: row.lastName,
    emailAddresses: row.email ? [{ emailAddress: row.email, emailAddressType: 'primary' }] : [],
    phoneNumbers,
    addresses,
    lastActivityAt: row.lastActivityAt ? new Date(row.lastActivityAt) : undefined,
    createdAt: new Date(row.createdAt),
    updatedAt: new Date(row.updatedAt),
  };

  await prisma.contact.upsert({
    create: {
      id: row.id,
      customerId: row.customerId,
      providerName: row.providerName,
      ...fields,
    },
    update: fields,
    where: {
      id: row.id,
      customerId: row.customerId,
      providerName: row.providerName,
    },
  });
}

// persist the newMaxLastModifiedAt high watermark
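Because the address and phone number mapping is plain object manipulation, it can also be pulled out into pure functions and unit tested without a database. The sketch below is one possible shape, under assumptions: SalesforceContactRow, mapAddresses, and mapPhoneNumbers are hypothetical names, and missing address parts are normalized to null, which the example above does not do.

```typescript
type Maybe = string | null | undefined;

// Hypothetical row type mirroring the aliases selected in the Contact query.
type SalesforceContactRow = {
  phone?: Maybe; mobilePhone?: Maybe; faxPhone?: Maybe;
  mailingStreet?: Maybe; mailingCity?: Maybe; mailingState?: Maybe;
  mailingPostalCode?: Maybe; mailingCountry?: Maybe;
  otherStreet?: Maybe; otherCity?: Maybe; otherState?: Maybe;
  otherPostalCode?: Maybe; otherCountry?: Maybe;
};

type Address = {
  addressType: 'mailing' | 'other';
  street1: string | null; street2: string | null; city: string | null;
  state: string | null; postalCode: string | null; country: string | null;
};

type PhoneNumber = { phoneNumber: string; phoneNumberType: 'primary' | 'mobile' | 'fax' };

// Returns null when every part of the address is empty.
function buildAddress(
  addressType: Address['addressType'],
  street: Maybe, city: Maybe, state: Maybe, postalCode: Maybe, country: Maybe,
): Address | null {
  if (!street && !city && !state && !postalCode && !country) {
    return null;
  }
  return {
    addressType,
    street1: street ?? null,
    street2: null, // only a single street field is selected per address
    city: city ?? null,
    state: state ?? null,
    postalCode: postalCode ?? null,
    country: country ?? null,
  };
}

function mapAddresses(row: SalesforceContactRow): Address[] {
  return [
    buildAddress('mailing', row.mailingStreet, row.mailingCity, row.mailingState, row.mailingPostalCode, row.mailingCountry),
    buildAddress('other', row.otherStreet, row.otherCity, row.otherState, row.otherPostalCode, row.otherCountry),
  ].filter((address): address is Address => address !== null);
}

function mapPhoneNumbers(row: SalesforceContactRow): PhoneNumber[] {
  const candidates: Array<[PhoneNumber['phoneNumberType'], Maybe]> = [
    ['primary', row.phone],
    ['mobile', row.mobilePhone],
    ['fax', row.faxPhone],
  ];
  const phoneNumbers: PhoneNumber[] = [];
  for (const [phoneNumberType, phoneNumber] of candidates) {
    if (phoneNumber) {
      phoneNumbers.push({ phoneNumber, phoneNumberType });
    }
  }
  return phoneNumbers;
}
```

Keeping one such mapper per provider means the upsert code stays the same for every provider, and only the mapper changes.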
More information
You can try out a working example of this tutorial by cloning the common-model-example repository and following the instructions in the README.
You'll want to customize the code from this tutorial to fit your specific application data model, use case, performance, and reliability requirements.