This step-by-step tutorial shows how to use the Stitch Connect API to select a source’s streams and fields for replication.

Note: While this guide walks you through creating, configuring, and advancing a new source to its field_selection connection step, the steps also apply to existing fully_configured sources whose Report Cards have a field_selection step. You can only select streams and fields when a source’s current_step is field_selection or fully_configured.


Prerequisites


Step 1: Create and configure the source

Create and configure a source. Refer to steps 1-3 of the Create and configure a source using the Connect API guide for instructions.


Step 2: Wait for a successful connection check and discovery

After the Source API reports that the source’s current_step is the discover_schema connection step, Stitch automatically kicks off a connection check. This test verifies the source’s connection parameters and discovers the streams and fields available for the source.

| Step | Action | Endpoint |
|---|---|---|
| 1 | Get the source's last connection check | GET /v4/sources/{source_id}/last-connection-check |
| 2 | Verify the current connection step | GET /v4/sources/{source_id} |

Step 2.1: Get the source's last connection check

To view the results of the source’s last connection check, make a request to GET /v4/sources/{source_id}/last-connection-check, replacing {source_id} with the source’s ID:

GET /v4/sources/{source_id}/last-connection-check
curl "https://api.stitchdata.com/v4/sources/122635/last-connection-check" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json'
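The curl requests in this guide can also be issued from code. The following is a minimal sketch, assuming Python 3 and only the standard library's urllib; build_get_request and get_json are hypothetical helper names, not part of any Stitch SDK. It sends the same Authorization and Content-Type headers as the curl example:

```python
import json
import urllib.request

# Base URL taken from the curl examples in this guide.
API_BASE = "https://api.stitchdata.com/v4"


def build_get_request(path: str, access_token: str) -> urllib.request.Request:
    """Build (but do not send) an authenticated GET request.

    `path` is relative to /v4, e.g. "/sources/122635/last-connection-check".
    """
    return urllib.request.Request(
        API_BASE + path,
        method="GET",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
    )


def get_json(path: str, access_token: str):
    """Send the GET request and decode the JSON response body."""
    with urllib.request.urlopen(build_get_request(path, access_token)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, get_json("/sources/122635/last-connection-check", token) would return the connection check object as a Python dict.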

A successful connection check and discovery will have a status of succeeded and a discovery_exit_status of 0:

Response for GET /v4/sources/{source_id}/last-connection-check
{
  "target_exit_status": null,
  "tap_error_message": null,
  "check_exit_status": 0,
  "name": "116078.122635.check.c5e705e2-1b62-11e9-b0e4-0e61abdd375a",
  "start_time": "2019-01-18T20:51:05Z",
  "mode": "check",
  "tap_exit_status": null,
  "target_error_message": null,
  "discovery_exit_status": 0,
  "status": "succeeded",
  "completion_time": "2019-01-18T20:51:07Z",
  "error": false,
  "discovery_error_message": null
}
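The success criteria above can be expressed as a small predicate. This is a minimal Python sketch; connection_check_succeeded is a hypothetical helper name, and it encodes only the two conditions stated in this guide. It does not try to interpret other status values, which this guide does not document.

```python
def connection_check_succeeded(check: dict) -> bool:
    """Return True when the last connection check and discovery both succeeded.

    Mirrors this guide's criteria: status is "succeeded" and
    discovery_exit_status is 0.
    """
    return (
        check.get("status") == "succeeded"
        and check.get("discovery_exit_status") == 0
    )
```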

When the connection check completes successfully, the source’s current_step advances to field_selection.

Step 2.2: Verify the current connection step

Next, you’ll verify that the source has advanced to the field_selection step. This step indicates that available streams and fields can be selected for replication.

To get the source’s current_step, make a request to GET /v4/sources/{source_id}, replacing {source_id} with the source’s ID:

GET /v4/sources/{source_id}
curl "https://api.stitchdata.com/v4/sources/122635" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json'

The response is the source object, which includes the source’s report_card. In this example, report_card.current_step is 4, which corresponds to the fourth entry in report_card.steps: the field_selection step.

Response for GET /v4/sources/{source_id}
{
  "properties": {
    "anchor_time": "2019-01-22T21:00:00.000Z",
    "cron_expression": null,
    "frequency_in_minutes": "60",
    "image_version": "1.latest",
    "product": "pipeline",
    "shop": "<SHOP>",
    "start_date": "2018-01-10T19:38:04Z",
    "token": null
  },
  "updated_at": "2019-01-23T13:08:52Z",
  "name": "shopify",
  "type": "platform.shopify",
  "deleted_at": null,
  "system_paused_at": "2019-01-23T00:00:00Z",
  "stitch_client_id": 116078,
  "paused_at": null,
  "id": 122635,
  "display_name": "Shopify",
  "created_at": "2019-01-10T19:38:18Z",
  "report_card": {
    "type": "platform.shopify",
    "current_step": 4,
    "steps": [
      {
        "type": "form",
        "properties": [
          {
            "name": "image_version",
            "is_required": true,
            "provided": true,
            "is_credential": false,
            "system_provided": true,
            "json_schema": null,
            "tap_mutable": false
          },
          {
            "name": "frequency_in_minutes",
            "is_required": true,
            "provided": true,
            "is_credential": false,
            "system_provided": false,
            "json_schema": {
              "type": "string",
              "pattern": "^1$|^30$|^60$|^360$|^720$|^1440$"
            },
            "tap_mutable": false
          },
          {
            "name": "anchor_time",
            "is_required": false,
            "provided": true,
            "is_credential": false,
            "system_provided": false,
            "json_schema": {
              "type": "string",
              "format": "date-time"
            },
            "tap_mutable": false
          },
          {
            "name": "date_window_size",
            "is_required": false,
            "provided": false,
            "is_credential": false,
            "system_provided": false,
            "json_schema": {
              "type": "integer"
            },
            "tap_mutable": false
          },
          {
            "name": "shop",
            "is_required": true,
            "provided": true,
            "is_credential": false,
            "system_provided": false,
            "json_schema": {
              "type": "string"
            },
            "tap_mutable": false
          },
          {
            "name": "start_date",
            "is_required": true,
            "provided": true,
            "is_credential": false,
            "system_provided": false,
            "json_schema": {
              "type": "string",
              "pattern": "^\\d{4}-\\d{2}-\\d{2}T00:00:00Z$"
            },
            "tap_mutable": false
          }
        ]
      },
      {
        "type": "oauth",
        "properties": [
          {
            "name": "api_key",
            "is_required": true,
            "provided": true,
            "is_credential": true,
            "system_provided": true,
            "json_schema": {
              "type": "string"
            },
            "tap_mutable": false
          }
        ]
      },
      {
        "type": "discover_schema",
        "properties": []
      },
      {
        "type": "field_selection",
        "properties": []
      },
      {
        "type": "fully_configured",
        "properties": []
      }
    ]
  }
}
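To check the current step programmatically, look up current_step in report_card.steps. This Python sketch assumes current_step is a 1-based position in the steps array, which is consistent with the example above (4 maps to field_selection) but is inferred rather than documented here; current_step_type is a hypothetical helper name.

```python
def current_step_type(source: dict) -> str:
    """Return the type of the source's current connection step.

    Assumption: current_step is a 1-based index into report_card["steps"].
    """
    report_card = source["report_card"]
    steps = report_card["steps"]
    return steps[report_card["current_step"] - 1]["type"]
```

With the response above, current_step_type(source) would return "field_selection".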

Step 3: Get the source's available streams

When the Source API reports that the source’s current_step is equal to field_selection, you can retrieve a list of the streams available for the source.

In general, a stream is:

  • A unique table or database view in a data source, or
  • An API endpoint in a data source

To return the streams available for selection, make a request to GET /v4/sources/{source_id}/streams, replacing {source_id} with the source’s ID:

GET /v4/sources/{source_id}/streams
curl "https://api.stitchdata.com/v4/sources/122635/streams" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json'

The response will be an array of Stream objects, each object corresponding to a stream available for selection:

Response for GET /v4/sources/{source_id}/streams
[
  {
    "selected": null,
    "stream_id": 2288757,
    "tap_stream_id": "abandoned_checkouts",
    "stream_name": "abandoned_checkouts",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288759,
    "tap_stream_id": "collects",
    "stream_name": "collects",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288758,
    "tap_stream_id": "custom_collections",
    "stream_name": "custom_collections",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288756,
    "tap_stream_id": "customers",
    "stream_name": "customers",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288754,
    "tap_stream_id": "metafields",
    "stream_name": "metafields",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288751,
    "tap_stream_id": "order_refunds",
    "stream_name": "order_refunds",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "created_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288753,
    "tap_stream_id": "orders",
    "stream_name": "orders",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288755,
    "tap_stream_id": "products",
    "stream_name": "products",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "updated_at"
      ]
    }
  },
  {
    "selected": null,
    "stream_id": 2288752,
    "tap_stream_id": "transactions",
    "stream_name": "transactions",
    "metadata": {
      "forced-replication-method": "INCREMENTAL",
      "selected": null,
      "table-key-properties": [
        "id"
      ],
      "valid-replication-keys": [
        "created_at"
      ]
    }
  }
]
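Later steps need a stream’s numeric stream_id (to fetch its schema) and its tap_stream_id (to select it). As a minimal sketch, a hypothetical helper like the one below builds a lookup from one to the other using the Stream objects returned above:

```python
def stream_ids_by_name(streams: list[dict]) -> dict[str, int]:
    """Map each stream's tap_stream_id to its numeric stream_id."""
    return {stream["tap_stream_id"]: stream["stream_id"] for stream in streams}
```

For the response above, stream_ids_by_name(streams)["custom_collections"] would be 2288758.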

Step 4: Understand and retrieve the stream's schema

| Step | Action | Endpoint |
|---|---|---|
| 1 | Understand field metadata | - |
| 2 | Get the stream's schema | GET /v4/sources/{source_id}/streams/{stream_id} |

Step 4.1: Understand field metadata

Before you retrieve the stream’s schema, it helps to understand the properties the Stream Schema object contains. You’ll use this data to select streams and fields and, if applicable, to configure the stream’s Replication Method.

The Stream Schema object contains three root properties:

  • schema - The JSON schema describing the stream’s fields.
  • metadata - An array of Metadata objects, each referring either to the stream as a whole or to one of its fields.
  • non-discoverable-metadata-keys - A list of metadata keys that can be modified.

Each Metadata object is identified by a breadcrumb: a path into the schema that identifies the part of the schema, either the stream itself or one of its fields, that the metadata describes.

Consider this schema:

{
  "schema": {
    "properties": {
      "id": {"type": ["null", "integer"]},
      "name": {"type": ["null", "string"]},
      "updated": {"format": "date-time", "type": ["null", "string"]}
    }
  }
}

For this example, there would be four breadcrumb values:

  1. [] - Refers to the entire schema, or stream
  2. ["properties", "id"] - Refers to properties.id, or a field named id
  3. ["properties", "name"] - Refers to properties.name, or a field named name
  4. ["properties", "updated"] - Refers to properties.updated, or a field named updated
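The breadcrumb list above can be derived mechanically from a schema. This Python sketch covers only the stream itself and its top-level fields, which matches the examples in this guide; nested fields (such as those inside an object-typed field) are not handled. top_level_breadcrumbs is a hypothetical helper name.

```python
def top_level_breadcrumbs(schema: dict) -> list[list[str]]:
    """List the stream breadcrumb plus one breadcrumb per top-level field."""
    crumbs: list[list[str]] = [[]]  # [] refers to the whole schema, i.e. the stream
    for field_name in schema.get("properties", {}):
        crumbs.append(["properties", field_name])
    return crumbs
```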

Below is what the Stream Schema object for this stream might look like:

{
    "schema": "{\"properties\":{\"id\":{\"type\":[\"null\",\"integer\"]},\"name\":{\"type\":[\"null\",\"string\"]},\"updated\":{\"format\":\"date-time\",\"type\":[\"null\",\"string\"]}},\"type\":[\"null\",\"object\"]}",
    "metadata": [
      {
        "breadcrumb": [],
        "metadata": {
          "forced-replication-method": "INCREMENTAL",
          "valid-replication-keys": [
            "updated"
          ],
          "table-key-properties": [
            "id"
          ]
        }
      },
      {
        "breadcrumb": [
          "properties",
          "id"
        ],
        "metadata": {
          "inclusion": "automatic"
        }
      },
      {
        "breadcrumb": [
          "properties",
          "name"
        ],
        "metadata": {
          "inclusion": "available"
        }
      },
      {
        "breadcrumb": [
          "properties",
          "updated"
        ],
        "metadata": {
          "inclusion": "automatic"
        }
      }
    ],
    "non-discoverable-metadata-keys": [
      "selected",
      "replication-method",
      "replication-key",
      "view-key-properties"
    ]
  }

Step 4.2: Get the stream's schema

Next, you’ll retrieve the schema for each stream you want to select for replication. A stream’s schema describes the fields the stream contains.

To retrieve a stream’s schema, make a request to GET /v4/sources/{source_id}/streams/{stream_id}, replacing {source_id} and {stream_id} with the source ID and stream ID, respectively.

In this example, we’ll get the schema for the custom_collections stream (stream_id: 2288758):

GET /v4/sources/{source_id}/streams/{stream_id}
curl "https://api.stitchdata.com/v4/sources/122635/streams/2288758" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json'

The response will be a single Stream Schema object:

Response for GET /v4/sources/{source_id}/streams/{stream_id}
{
  "schema": "{\"type\":\"object\",\"properties\":{\"handle\":{\"type\":[\"null\",\"string\"]},\"sort_order\":{\"type\":[\"null\",\"string\"]},\"published_at\":{\"type\":[\"null\",\"string\"]},\"published_scope\":{\"type\":[\"null\",\"string\"]},\"image\":{\"type\":[\"null\",\"object\"],\"properties\":{\"width\":{\"type\":[\"null\",\"integer\"]},\"created_at\":{\"type\":[\"null\",\"string\"]},\"alt\":{\"type\":[\"null\",\"string\"]},\"src\":{\"type\":[\"null\",\"string\"]},\"height\":{\"type\":[\"null\",\"integer\"]}}},\"id\":{\"type\":[\"null\",\"integer\"]},\"template_suffix\":{\"type\":[\"null\",\"string\"]},\"updated_at\":{\"type\":[\"null\",\"string\"]},\"admin_graphql_api_id\":{\"type\":[\"null\",\"string\"]},\"title\":{\"type\":[\"null\",\"string\"]},\"body_html\":{\"type\":[\"null\",\"string\"]}}}",
  "metadata": [
    {
      "breadcrumb": [
        "properties",
        "handle"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [],
      "metadata": {
        "table-key-properties": [
          "id"
        ],
        "forced-replication-method": "INCREMENTAL",
        "valid-replication-keys": [
          "updated_at"
        ]
      }
    },
    {
      "breadcrumb": [
        "properties",
        "template_suffix"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "body_html"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "published_at"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "sort_order"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "title"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "updated_at"
      ],
      "metadata": {
        "inclusion": "automatic"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "published_scope"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "id"
      ],
      "metadata": {
        "inclusion": "automatic"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "admin_graphql_api_id"
      ],
      "metadata": {
        "inclusion": "available"
      }
    },
    {
      "breadcrumb": [
        "properties",
        "image"
      ],
      "metadata": {
        "inclusion": "available"
      }
    }
  ],
  "non-discoverable-metadata-keys": [
    "selected",
    "replication-method",
    "replication-key",
    "view-key-properties"
  ]
}
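Note that the schema property in the response is a JSON-encoded string, so it must be decoded before use, while metadata is already a regular array. This Python sketch (fields_by_inclusion and schema_field_names are hypothetical helper names) decodes the schema and groups the top-level fields by their inclusion value, skipping the stream-level entry whose breadcrumb is []:

```python
import json


def fields_by_inclusion(stream_schema: dict) -> dict[str, list[str]]:
    """Group a stream's top-level fields by their "inclusion" metadata value."""
    groups: dict[str, list[str]] = {}
    for entry in stream_schema["metadata"]:
        crumb = entry["breadcrumb"]
        if len(crumb) != 2 or crumb[0] != "properties":
            continue  # skip the stream-level entry (breadcrumb [])
        inclusion = entry["metadata"].get("inclusion", "unknown")
        groups.setdefault(inclusion, []).append(crumb[1])
    return groups


def schema_field_names(stream_schema: dict) -> list[str]:
    """Decode the JSON-encoded "schema" string and list its top-level fields."""
    return list(json.loads(stream_schema["schema"]).get("properties", {}))
```

For the custom_collections response above, fields_by_inclusion would place id and updated_at under "automatic" and the remaining fields under "available".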

Step 5: Select and configure a stream

| Step | Action | Endpoint |
|---|---|---|
| 1 | Create the request body | - |
| 2 | Configure stream replication | - |
| 3 | Submit the request | PUT /v4/sources/{source_id}/streams/metadata |

Step 5.1: Create the request body

To select a stream, you’ll make a request to PUT /v4/sources/{source_id}/streams/metadata with a request body that contains:

  1. The stream’s tap_stream_id. Note: This is different from the stream_id, which is always numeric.

    For example: In this guide, the stream_id for the custom_collections stream is 2288758, while its tap_stream_id is custom_collections.

  2. A Metadata object whose breadcrumb is an empty array ([]), which refers to the entire schema (the stream itself), and
  3. In that stream-level Metadata object, a selected property set to true. This is what ultimately selects the stream.

This is an example of what the request body will look like:

Example request body to update a stream's metadata
'{
  "streams": [
    {
      "tap_stream_id": "<TAP_STREAM_ID>",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected": true
          }
        }
      ]
    }
  ]
}'

Note: Multiple streams in a source can be updated in a single request, but for clarity, this guide will focus on selecting a single stream. Refer to the Update a Stream endpoint documentation for examples.
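As a sketch, the request body can be generated instead of written by hand. The helper below is hypothetical, and it assumes that selecting several streams in one request means adding one entry per stream to the streams array; check the Update a Stream endpoint documentation to confirm.

```python
def stream_selection_body(tap_stream_ids: list[str]) -> dict:
    """Build a request body that selects each stream in tap_stream_ids."""
    return {
        "streams": [
            {
                "tap_stream_id": tap_stream_id,
                "metadata": [
                    # breadcrumb [] targets the stream itself
                    {"breadcrumb": [], "metadata": {"selected": True}}
                ],
            }
            for tap_stream_id in tap_stream_ids
        ]
    }
```

Passing the result to json.dumps produces the same JSON as the example body above when given a single tap_stream_id.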

Step 5.2: Configure stream replication

Stitch uses one of three Replication Methods to replicate data from selected streams:

  • Full Table Replication - All rows in a table, including new, updated, and existing rows, are replicated during every replication job.
  • Key-based Incremental Replication - Stitch identifies new and updated data using a column called a Replication Key.
  • Log-based Incremental Replication - Stitch identifies modifications to records, including inserts, updates, and deletes, using a database’s binary log files. Note: This Replication Method is only available for select database integrations and requires additional configuration steps when setting up the source. Refer to the documentation for the database for more info.

Streams with configurable Replication Methods

For some sources, mainly databases and Salesforce, you can configure how Stitch replicates a stream by setting the replication-method metadata property. Accepted values are FULL_TABLE, INCREMENTAL, and LOG_BASED.

In this request body example, the demni2mf59dt10-public-customers stream is set to use INCREMENTAL replication with updated_at as the replication-key:

Example request body for a stream with a configurable Replication Method
'{
  "streams": [
    {
      "tap_stream_id": "demni2mf59dt10-public-customers",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected": true,
            "replication-method": "INCREMENTAL",
            "replication-key": "updated_at"
          }
        }
      ]
    }
  ]
}'

Note: When replication-method is set to INCREMENTAL, the value of the replication-key property must be one of the following:

  1. One of the fields in the valid-replication-keys property, if provided. Note: If you use this option, you must still set replication-key explicitly. Selecting a field listed in valid-replication-keys for replication does not automatically make it the stream’s Replication Key.
  2. The name of an integer, date-time, or timestamp field in the stream. Refer to the Replication Keys documentation for more info.
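The rules above can be checked before submitting a request. This Python sketch reads the two options literally, as alternatives, and assumes that "integer, date-time, or timestamp" corresponds to a JSON Schema type of "integer" or a "format" of "date-time" in the decoded stream schema. Database timestamp columns may be described differently, so treat the type check as an assumption. is_valid_replication_key is a hypothetical helper name.

```python
def is_valid_replication_key(key: str, stream_metadata: dict, schema: dict) -> bool:
    """Check a proposed replication-key against the rules above.

    stream_metadata: the metadata for breadcrumb [] (the stream itself).
    schema: the decoded JSON schema of the stream.
    """
    # Option 1: a field listed in valid-replication-keys, if provided.
    if key in (stream_metadata.get("valid-replication-keys") or []):
        return True
    # Option 2 (assumed mapping): an integer or date-time field in the stream.
    field = schema.get("properties", {}).get(key)
    if field is None:
        return False
    types = field.get("type", [])
    if isinstance(types, str):
        types = [types]
    return "integer" in types or field.get("format") == "date-time"
```

If your source only accepts keys listed in valid-replication-keys, return early with key in that list instead of falling through to the type check.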

Streams with forced Replication Methods

In cases where a stream can only be replicated using one method, the stream’s metadata may indicate the method it will use via the forced-replication-method property:

Example metadata for a stream with a forced Replication Method
{
  "selected": null,
  "stream_id": 2288758,
  "tap_stream_id": "custom_collections",
  "stream_name": "custom_collections",
  "metadata": {
    "forced-replication-method": "INCREMENTAL",
    "selected": null,
    "table-key-properties": [
      "id"
    ],
    "valid-replication-keys": [
      "updated_at"
    ]
  }
}

When a stream’s metadata contains the forced-replication-method property, its Replication Method cannot be changed. If the stream is selected, Stitch uses the forced-replication-method and, if applicable, the field in valid-replication-keys as the Replication Key.

Your request to select the stream will not need to include a replication-method property:

Example request body for a stream with a forced Replication Method
'{
  "streams": [
    {
      "tap_stream_id": "custom_collections",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected": true
          }
        }
      ]
    }
  ]
}'
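To see in advance how a forced stream will be replicated, you can read its forced method and key from the Stream object returned in Step 3. In this Python sketch, forced_replication is a hypothetical helper. It returns the key only when valid-replication-keys has exactly one entry, because this guide only shows single-key examples and does not say which key is used otherwise.

```python
def forced_replication(stream: dict):
    """Return (method, replication_key) for a stream with a forced method, else None."""
    metadata = stream.get("metadata", {})
    method = metadata.get("forced-replication-method")
    if method is None:
        return None  # the Replication Method is not forced
    keys = metadata.get("valid-replication-keys") or []
    # Assumption: only an unambiguous single key is reported.
    key = keys[0] if len(keys) == 1 else None
    return method, key
```

For the custom_collections stream above, this returns ("INCREMENTAL", "updated_at").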

Step 5.3: Submit the request

To select a stream, make a request to PUT /v4/sources/{source_id}/streams/metadata, replacing {source_id} with the source ID. The request body must contain the appropriate metadata properties:

PUT /v4/sources/{source_id}/streams/metadata
curl -X "PUT" "https://api.stitchdata.com/v4/sources/122635/streams/metadata" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json' \
     -d \
'{
  "streams": [
    {
      "tap_stream_id": "custom_collections",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected": true
          }
        }
      ]
    }
  ]
}'
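The same PUT request can be built in code. This is a minimal sketch assuming Python 3 and the standard library's urllib; build_metadata_put is a hypothetical helper name. It serializes the body with json.dumps and sets the same headers as the curl example:

```python
import json
import urllib.request

# Base URL taken from the curl examples in this guide.
API_BASE = "https://api.stitchdata.com/v4"


def build_metadata_put(source_id: int, body: dict,
                       access_token: str) -> urllib.request.Request:
    """Build (but do not send) PUT /v4/sources/{source_id}/streams/metadata."""
    return urllib.request.Request(
        f"{API_BASE}/sources/{source_id}/streams/metadata",
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
    )
```

Passing the returned request to urllib.request.urlopen would send it.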

Step 6: Select fields in a stream

After a stream is selected, you can use field selection to choose which of its fields are replicated. The request to select a field mirrors the request to select a stream, except that the breadcrumb points to the field’s path in the schema.

For example: This request selects the id field in the custom_collections stream:

Selecting a single field via PUT /v4/sources/{source_id}/streams/metadata
curl -X "PUT" "https://api.stitchdata.com/v4/sources/122635/streams/metadata" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json' \
     -d \
'{
  "streams": [
    {
      "tap_stream_id": "custom_collections",
      "metadata": [
        {
          "breadcrumb": [
            "properties",
            "id"
          ],
          "metadata": {
            "selected": true
          }
        }
      ]
    }
  ]
}'

Multiple fields in a stream can be submitted as part of the same request. For each field included in the request body, include a metadata object referencing the field.

For example: This request selects the id, published_at, title, and handle fields in the custom_collections stream:

Selecting multiple fields via PUT /v4/sources/{source_id}/streams/metadata
curl -X "PUT" "https://api.stitchdata.com/v4/sources/122635/streams/metadata" \
     -H 'Authorization: Bearer [ACCESS_TOKEN]' \
     -H 'Content-Type: application/json' \
     -d \
'{
  "streams": [
    {
      "tap_stream_id": "custom_collections",
      "metadata": [
        {
          "breadcrumb": [
            "properties",
            "id"
          ],
          "metadata": {
            "selected": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "published_at"
          ],
          "metadata": {
            "selected": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "title"
          ],
          "metadata": {
            "selected": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "handle"
          ],
          "metadata": {
            "selected": true
          }
        }
      ]
    }
  ]
}'
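Because each field needs its own Metadata object, a body like the one above is easy to generate from a list of field names. This Python sketch (field_selection_body is a hypothetical name) handles top-level fields only, using breadcrumbs of the form ["properties", field_name]:

```python
def field_selection_body(tap_stream_id: str, field_names: list[str]) -> dict:
    """Build a request body that selects the given top-level fields of one stream."""
    return {
        "streams": [
            {
                "tap_stream_id": tap_stream_id,
                "metadata": [
                    {"breadcrumb": ["properties", name], "metadata": {"selected": True}}
                    for name in field_names
                ],
            }
        ]
    }
```

field_selection_body("custom_collections", ["id", "published_at", "title", "handle"]) produces the same body as the request above.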

Note: Fields with metadata properties of inclusion: automatic or selected-by-default: true don’t need to be explicitly selected through a request. These fields will be automatically selected for replication regardless of their selected value. Refer to the Field selection and compatibility rules guide for more info.
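Applying this rule, you can drop automatically selected fields from a request. This Python sketch (fields_to_request is a hypothetical helper) reads the metadata array from a Stream Schema object and removes any wanted field marked inclusion: automatic or selected-by-default: true. Sending those fields anyway should be harmless according to the note above; the filter only keeps requests shorter.

```python
def fields_to_request(stream_schema_metadata: list[dict], wanted: list[str]) -> list[str]:
    """Return the wanted fields that still need to be selected explicitly."""
    auto_selected = set()
    for entry in stream_schema_metadata:
        crumb = entry["breadcrumb"]
        meta = entry["metadata"]
        is_field = len(crumb) == 2 and crumb[0] == "properties"
        if is_field and (
            meta.get("inclusion") == "automatic"
            or meta.get("selected-by-default") is True
        ):
            auto_selected.add(crumb[1])
    return [name for name in wanted if name not in auto_selected]
```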

Next steps

Stream and field selection can occur at any time while a source’s current_step is field_selection or fully_configured, as long as the source’s report card has a field_selection step. To select additional streams and fields, follow steps 3-6 of this guide.
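The conditions above can be checked against the source object returned by GET /v4/sources/{source_id}. This Python sketch (can_select_streams is a hypothetical helper) assumes, as in Step 2.2, that current_step is a 1-based index into report_card.steps:

```python
def can_select_streams(source: dict) -> bool:
    """True if streams and fields can currently be selected for this source."""
    report_card = source["report_card"]
    step_types = [step["type"] for step in report_card["steps"]]
    if "field_selection" not in step_types:
        return False  # report card has no field_selection step
    current = step_types[report_card["current_step"] - 1]  # assumed 1-based
    return current in ("field_selection", "fully_configured")
```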