5. Data Sources
VectorETL supports a wide range of data sources, allowing you to extract data from various systems and formats.
Overview of Supported Sources
Amazon S3
Box
Local files
Databases (PostgreSQL, MySQL, Snowflake, Salesforce)
Dropbox
Stripe
Zendesk
Google Drive
Google Cloud Storage
Amazon S3
Python variable (as JSON)
{
    "source_data_type": "Amazon S3",
    "bucket_name": "myBucket",
    "prefix": "Dir/Subdir/",
    "file_type": "csv",
    "aws_access_key_id": "your-access-key",
    "aws_secret_access_key": "your-secret-access-key",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "Amazon S3"
  bucket_name: "myBucket"
  prefix: "Dir/Subdir/"
  file_type: "csv"
  aws_access_key_id: "your-access-key"
  aws_secret_access_key: "your-secret-access-key"
  chunk_size: 1000
  chunk_overlap: 0
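Every source config accepts chunk_size and chunk_overlap, which control how extracted text is split into overlapping windows before embedding. The function below is an illustrative sketch of that sliding-window behavior, not VectorETL's internal splitter:

```python
def chunk_text(text, chunk_size, chunk_overlap):
    """Split text into windows of chunk_size characters, each overlapping
    the previous window by chunk_overlap characters (requires overlap < size)."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=2)
# -> ["abcd", "cdef", "efgh", "ghij", "ij"]
```

With chunk_overlap set to 0 (as in the CSV examples), the windows simply tile the text; a non-zero overlap, as in the Box PDF example, repeats context across chunk boundaries so sentences are not cut off between embeddings.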
Box
Python variable (as JSON)
{
    "source_data_type": "Box",
    "folder_path": "MyFolder",
    "file_type": "pdf",
    "access_token": "your-developer-token",
    "chunk_size": 1000,
    "chunk_overlap": 200
}
YAML
source:
  source_data_type: "Box"
  folder_path: "MyFolder"
  file_type: "pdf"
  access_token: "your-developer-token"
  chunk_size: 1000
  chunk_overlap: 200
Local Files
Python variable (as JSON)
{
    "source_data_type": "Local File",
    "file_path": "/path/to/your/data/",
    "file_type": "csv",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "Local File"
  file_path: "/path/to/your/data/"
  file_type: "csv"
  chunk_size: 1000
  chunk_overlap: 0
Database (PostgreSQL example)
Python variable (as JSON)
{
    "source_data_type": "database",
    "db_type": "postgres",
    "host": "localhost",
    "database_name": "mydb",
    "username": "user",
    "password": "password",
    "port": 5432,
    "query": "SELECT * FROM mytable WHERE updated_at > :last_updated_at",
    "batch_size": 1000,
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "database"
  db_type: "postgres"
  host: "localhost"
  database_name: "mydb"
  username: "user"
  password: "password"
  port: 5432
  query: "SELECT * FROM mytable WHERE updated_at > :last_updated_at"
  batch_size: 1000
  chunk_size: 1000
  chunk_overlap: 0
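The query above uses a named bind parameter, :last_updated_at, which makes incremental extraction possible: only rows changed since the last run are pulled. The snippet below demonstrates that named-parameter style with Python's stdlib sqlite3 purely for illustration; how VectorETL stores and supplies the last_updated_at watermark between runs is framework-specific and assumed here:

```python
import sqlite3

# In-memory table standing in for "mytable" from the config above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mytable (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO mytable VALUES (?, ?)",
    [(1, "2024-01-01"), (2, "2024-06-01")],
)

# Same named-parameter shape as the config's query; the watermark value
# here is hard-coded, whereas the framework would track it per run.
rows = conn.execute(
    "SELECT * FROM mytable WHERE updated_at > :last_updated_at",
    {"last_updated_at": "2024-03-01"},
).fetchall()
# -> [(2, "2024-06-01")]
```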
Dropbox
Python variable (as JSON)
{
    "source_data_type": "Dropbox",
    "key": "your-dropbox-key",
    "folder_path": "/path/to/folder/",
    "file_type": "csv",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "Dropbox"
  key: "your-dropbox-key"
  folder_path: "/path/to/folder/"
  file_type: "csv"
  chunk_size: 1000
  chunk_overlap: 0
Stripe
Python variable (as JSON)
{
    "source_data_type": "stripe",
    "access_token": "your-stripe-access-token",
    "table": "charges",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "stripe"
  access_token: "your-stripe-access-token"
  table: "charges"
  chunk_size: 1000
  chunk_overlap: 0
Zendesk
Python variable (as JSON)
{
    "source_data_type": "zendesk",
    "user_email": "[email protected]",
    "access_token": "your-zendesk-access-token",
    "subdomain": "your-subdomain",
    "table": "tickets",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "zendesk"
  user_email: "[email protected]"
  access_token: "your-zendesk-access-token"
  subdomain: "your-subdomain"
  table: "tickets"
  chunk_size: 1000
  chunk_overlap: 0
Google Drive
Python variable (as JSON)
{
    "source_data_type": "Google Drive",
    "credentials_path": "/path/to/your/credentials.json",
    "folder_id": "your-folder-id",
    "file_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "Google Drive"
  credentials_path: "/path/to/your/credentials.json"
  folder_id: "your-folder-id"
  file_type: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  chunk_size: 1000
  chunk_overlap: 0
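Note that for Google Drive, file_type takes a full MIME type rather than a file extension (the example above selects XLSX spreadsheets). A few common extension-to-MIME-type pairs, using standard IANA/Office MIME strings (not an exhaustive list):

```python
# Common extension -> MIME type pairs for Google Drive's file_type field.
DRIVE_MIME_TYPES = {
    "pdf": "application/pdf",
    "csv": "text/csv",
    "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}
```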
Google Cloud Storage
Python variable (as JSON)
{
    "source_data_type": "Google Cloud Storage",
    "credentials_path": "/path/to/your/credentials.json",
    "bucket_name": "myBucket",
    "prefix": "prefix/",
    "file_type": "csv",
    "chunk_size": 1000,
    "chunk_overlap": 0
}
YAML
source:
  source_data_type: "Google Cloud Storage"
  credentials_path: "/path/to/your/credentials.json"
  bucket_name: "myBucket"
  prefix: "prefix/"
  file_type: "csv"
  chunk_size: 1000
  chunk_overlap: 0
Using Unstructured to process files
Starting with version 0.1.6.3, you can add Unstructured as a file-processing API, utilizing Unstructured's Serverless API to efficiently extract data from a variety of file-based sources.
This is limited to PDF, DOCX, DOC, and TXT files.
To use Unstructured, you will need three additional parameters:
use_unstructured: (True/False) indicator telling the framework to use the Unstructured API
unstructured_api_key: your Unstructured API key
unstructured_url: the API URL from your Unstructured dashboard
# Example using Local file
from vector_etl import create_flow

source = {
    "source_data_type": "Local File",
    "file_path": "/path/to/file.docx",
    "file_type": "docx",
    "use_unstructured": True,
    "unstructured_api_key": "my-unstructured-key",
    "unstructured_url": "https://my-domain.api.unstructuredapp.io"
}
# Example using Amazon S3
from vector_etl import create_flow

source = {
    "source_data_type": "Amazon S3",
    "bucket_name": "myBucket",
    "prefix": "Dir/Subdir/",
    "file_type": "pdf",
    "aws_access_key_id": "your-access-key",
    "aws_secret_access_key": "your-secret-access-key",
    "use_unstructured": True,
    "unstructured_api_key": "my-unstructured-key",
    "unstructured_url": "https://my-domain.api.unstructuredapp.io"
}
# Example using Google Cloud Storage
from vector_etl import create_flow

source = {
    "source_data_type": "Google Cloud Storage",
    "credentials_path": "/path/to/your/credentials.json",
    "bucket_name": "myBucket",
    "prefix": "prefix/",
    "file_type": "pdf",
    "use_unstructured": True,
    "unstructured_api_key": "my-unstructured-key",
    "unstructured_url": "https://my-domain.api.unstructuredapp.io"
}
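Because Unstructured processing is limited to PDF, DOCX, DOC, and TXT, it can be worth checking a source dict before starting a flow. The helper below is a hypothetical sketch, not part of VectorETL:

```python
# Supported file types per the Unstructured limitation noted above.
UNSTRUCTURED_FILE_TYPES = {"pdf", "docx", "doc", "txt"}

def validate_unstructured_source(source):
    """Raise if use_unstructured is set but file_type is unsupported.
    Hypothetical helper; VectorETL may perform its own validation."""
    file_type = source.get("file_type", "").lower()
    if source.get("use_unstructured") and file_type not in UNSTRUCTURED_FILE_TYPES:
        raise ValueError(
            f"Unstructured cannot process file_type={file_type!r}; "
            f"supported types: {sorted(UNSTRUCTURED_FILE_TYPES)}"
        )
    return source
```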
Adding Custom Data Sources
To add a custom data source:
1. Create a new file in the source_mods directory.
2. Implement a new class that inherits from BaseSource.
3. Implement the required methods: connect() and fetch_data().
4. Update the get_source_class() function in source_mods/__init__.py to include your new source.
Example of a custom source class:
from .base import BaseSource

class MyCustomSource(BaseSource):
    def __init__(self, config):
        self.config = config

    def connect(self):
        # Implement connection logic here
        pass

    def fetch_data(self):
        # Implement data fetching logic here
        pass
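The registration step above can be sketched as a simple lookup keyed on source_data_type. The registry shape and the "My Custom Source" label below are assumptions for illustration; match whatever convention get_source_class() in source_mods/__init__.py actually uses. Stub classes are included so the sketch is self-contained:

```python
# Minimal stand-ins; in VectorETL these live in source_mods/base.py
# and your new source module.
class BaseSource:
    def __init__(self, config):
        self.config = config

class MyCustomSource(BaseSource):
    def connect(self):
        pass

    def fetch_data(self):
        return []

def get_source_class(config):
    # Map source_data_type values to source classes; add your class here.
    # "My Custom Source" is an assumed label for illustration.
    registry = {
        "My Custom Source": MyCustomSource,
    }
    return registry[config["source_data_type"]](config)

src = get_source_class({"source_data_type": "My Custom Source"})
```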