8. Examples

PostgreSQL to Pinecone

from vector_etl import create_flow

source = {
    "source_data_type": "database",
    "db_type": "postgres",
    "host": "localhost",
    "database_name": "mydb",
    "username": "user",
    "password": "password",
    "port": 5432,
    "query": "SELECT * FROM mytable WHERE updated_at > :last_updated_at",
    "batch_size": 1000,
    "chunk_size": 1000,
    "chunk_overlap": 0
}

embedding = {
    "embedding_model": "OpenAI",
    "api_key": "your-openai-api-key",
    "model_name": "text-embedding-ada-002"
}

target = {
    "target_database": "Pinecone",
    "pinecone_api_key": "your-pinecone-api-key",
    "index_name": "my-index",
    "dimension": 1536,
    "metric": "cosine",
    "cloud": "aws",
    "region": "us-east-1"
}

embed_columns = [
    "column1",
    "column2",
    "column3"
]

flow = create_flow()
flow.set_source(source)
flow.set_embedding(embedding)
flow.set_target(target)
flow.set_embed_columns(embed_columns)

# Execute the flow
flow.execute()
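
Hard-coding credentials, as in the snippet above, is fine for a quick local test, but for anything shared you may prefer to read the same values from environment variables. Below is a minimal sketch of that pattern; the variable names PG_PASSWORD, OPENAI_API_KEY, and PINECONE_API_KEY are illustrative assumptions, not names the library requires.

import os

from vector_etl import create_flow

# Same flow as above, but with secrets pulled from the environment.
# The variable names are illustrative; use whatever your deployment defines.
source = {
    "source_data_type": "database",
    "db_type": "postgres",
    "host": "localhost",
    "database_name": "mydb",
    "username": "user",
    "password": os.environ["PG_PASSWORD"],
    "port": 5432,
    "query": "SELECT * FROM mytable WHERE updated_at > :last_updated_at",
}

embedding = {
    "embedding_model": "OpenAI",
    "api_key": os.environ["OPENAI_API_KEY"],
    "model_name": "text-embedding-ada-002",
}

target = {
    "target_database": "Pinecone",
    "pinecone_api_key": os.environ["PINECONE_API_KEY"],
    "index_name": "my-index",
}

flow = create_flow()
flow.set_source(source)
flow.set_embedding(embedding)
flow.set_target(target)
flow.set_embed_columns(["column1", "column2", "column3"])
flow.execute()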

Dropbox to Weaviate (using Google Gemini embedding)

from vector_etl import create_flow

source = {
    "source_data_type": "Dropbox",
    "key": "",
    "folder_path": "/root/ContextData/",
    "file_type": "csv",
    "chunk_size": 1000,
    "chunk_overlap": 0
}

embedding = {
    "embedding_model": "Google Gemini",
    "api_key": "my-gemini-api-key",
    "model_name": "embedding-001"
}

target = {
    "target_database": "Weaviate",
    "weaviate_url": "my-clustername.region.gcp.weaviate.cloud",
    "weaviate_api_key": "my-weaviate-api-key",
    "class_name": "my-weaviate-class-name"
}

embed_columns = []  # Empty list: file-based sources do not require embedding columns

flow = create_flow()
flow.set_source(source)
flow.set_embedding(embedding)
flow.set_target(target)
flow.set_embed_columns(embed_columns)

# Execute the flow
flow.execute()

Importing a Python configuration into an existing application workflow

You can also set up the configuration in a single Python file, as shown below:

# my_user_script.py

from vector_etl import create_flow

def run_etl_job():
    # Define configurations
    source = {
        "source_data_type": "Local File",
        "file_path": "/path/to/your/data/",
        "file_type": "csv",
        "chunk_size": 1000,
        "chunk_overlap": 0
    }

    embedding = {
        "embedding_model": "OpenAI",
        "api_key": "your-openai-api-key",
        "model_name": "text-embedding-ada-002"
    }

    target = {
        "target_database": "Pinecone",
        "pinecone_api_key": "your-pinecone-api-key",
        "index_name": "my-index",
    }

    # Create and configure the flow
    flow = create_flow()
    flow.set_source(source)
    flow.set_embedding(embedding)
    flow.set_target(target)

    # Execute the flow
    flow.execute()

if __name__ == "__main__":
    run_etl_job()

and then import it into another Python file, as shown below:

# my_current_app.py
import logging
from my_user_script import run_etl_job

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_etl():
    logger.info("Starting ETL job...")
    try:
        run_etl_job()
        logger.info("ETL job completed successfully.")
    except Exception as e:
        logger.error(f"An error occurred during the ETL job: {str(e)}")
    logger.info("ETL job process ended.")

def rag_query(query):
    # Logic for your RAG query goes here; one possible shape is sketched below.
    pass

run_etl()
results = rag_query("my query")
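
The rag_query stub above is left for your own retrieval logic. As an illustration only, here is one possible shape for it when the target is the Pinecone index populated earlier, calling the OpenAI and Pinecone Python SDKs directly; the index name, model, and environment-variable key handling are assumptions, and none of this is part of vector_etl.

# Illustrative only; not part of vector_etl.
import os

from openai import OpenAI
from pinecone import Pinecone

openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("my-index")  # the index populated by the ETL flow above

def rag_query(query, top_k=5):
    # Embed the query with the same model used during the ETL run...
    embedding = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=query,
    ).data[0].embedding

    # ...then retrieve the most similar chunks from the vector store.
    return index.query(vector=embedding, top_k=top_k, include_metadata=True)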

YAML configuration file examples

PostgreSQL to Pinecone

source:
  source_data_type: "database"
  db_type: "postgres"
  host: "localhost"
  database_name: "mydb"
  username: "user"
  password: "password"
  port: 5432
  query: "SELECT * FROM mytable WHERE updated_at > :last_updated_at"
  batch_size: 1000 #[Optional] Default is 1000
  chunk_size: 1000 #[Optional] Default is 1000
  chunk_overlap: 0 #[Optional] Default is 0

embedding:
  embedding_model: "OpenAI"
  api_key: "your-openai-api-key"
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: "your-pinecone-api-key"
  index_name: "my-index"
  dimension: 1536 #[Optional] Only required if creating a new index
  metric: "cosine" #[Optional] Only required if creating a new index
  cloud: "aws" #[Optional] Only required if creating a new index
  region: "us-east-1" #[Optional] Only required if creating a new index

embed_columns:
  - "column1"
  - "column2"
  - "column3"

Box to Pinecone

source:
  source_data_type: "Box"
  folder_path: "MyFolder"
  file_type: "pdf" #required if folder_path is a directory: Will retrieve all files with filetype
  access_token: "your-developer-token" #developer token from Box App console
  chunk_size: 1000
  chunk_overlap: 200

embedding:
  embedding_model: "OpenAI"
  api_key: "your-openai-api-key"
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: "your-pinecone-api-key"
  index_name: "my-index"
  dimension: 1536 #[Optional] Only required if creating a new index
  metric: "cosine" #[Optional] Only required if creating a new index
  cloud: "aws" #[Optional] Only required if creating a new index
  region: "us-east-1" #[Optional] Pinecone will default to us-east-1

embed_columns: [] # Empty list: file-based sources do not require embedding columns

Dropbox to Weaviate (using Google Gemini embedding)

source:
  source_data_type: "Dropbox"
  key: ''
  folder_path: "/root/ContextData/"
  file_type: "csv"
  chunk_size: 1000
  chunk_overlap: 0

embedding:
  embedding_model: "Google Gemini"
  api_key: "my-gemini-api-key"
  model_name: "embedding-001"

target:
  target_database: "Weaviate"
  weaviate_url: "my-clustername.region.gcp.weaviate.cloud"
  weaviate_api_key: "my-weaviate-api-key"
  class_name: "my-weaviate-class-name"

embed_columns: [] # Empty list: file-based sources do not require embedding columns

Local file to Pinecone

source:
  source_data_type: "Local File"
  file_path: "/root/ContextData/"
  file_type: "csv"
  chunk_size: 1000
  chunk_overlap: 0

embedding:
  embedding_model: "OpenAI"
  api_key: "your-openai-api-key"
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: "your-pinecone-api-key"
  index_name: "my-index"
  dimension: 1536 #[Optional] Only required if creating a new index
  metric: "cosine" #[Optional] Only required if creating a new index
  cloud: "aws" #[Optional] Only required if creating a new index
  region: "us-east-1" #[Optional] Only required if creating a new index

embed_columns: [] # Empty list: file-based sources do not require embedding columns

Google Cloud Storage (GCS) to Qdrant (using Cohere embedding)

source:
  source_data_type: "Google Cloud Storage"
  credentials_path: "/path/to/your/credentials.json"
  bucket_name: "myBucket"
  prefix: "prefix/"
  file_type: "csv" #required if prefix is a directory: Will retrieve all files with filetype
  chunk_size: 1000 #[Optional] Default is 1000
  chunk_overlap: 0 #[Optional] Default is 0

embedding:
  embedding_model: "Cohere"
  api_key: "my-cohere-key"
  model_name: "embed-english-v3.0"

target:
  target_database: "Qdrant"
  qdrant_url: "https://your-qdrant-cluster-url.qdrant.io"
  qdrant_api_key: "your-qdrant-api-key"
  collection_name: "my-collection"

embed_columns: [] # Empty list: file-based sources do not require embedding columns

Amazon S3 to Pinecone

source:
  source_data_type: "Amazon S3"
  bucket_name: "myBucket"
  prefix: "Dir/Subdir/"
  file_type: "csv" #required if prefix is a directory: Will retrieve all files with filetype
  aws_access_key_id: "your-access-key"
  aws_secret_access_key: "your-secret-access-key"

embedding:
  embedding_model: "OpenAI"
  api_key: "your-openai-api-key"
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: "your-pinecone-api-key"
  index_name: "my-index"
  dimension: 1536 #[Optional] Only required if creating a new index
  metric: "cosine" #[Optional] Only required if creating a new index
  cloud: "aws" #[Optional] Only required if creating a new index
  region: "us-east-1" #[Optional] Only required if creating a new index

embed_columns: [] # Empty list: file-based sources do not require embedding columns
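
The YAML files above mirror the Python dictionaries used in the earlier examples, so one way to run them is to parse the file and pass each section to the same create_flow() API. Below is a minimal sketch assuming the configuration has been saved as config.yaml and parsed with PyYAML; the file path and the use of PyYAML are assumptions, so check the project documentation for a built-in YAML loader or CLI entry point if you prefer not to wire this up yourself.

import yaml  # PyYAML

from vector_etl import create_flow

# Load one of the YAML examples above from disk.
with open("config.yaml") as f:
    config = yaml.safe_load(f)

# Wire the parsed sections into the same flow API shown in the Python examples.
flow = create_flow()
flow.set_source(config["source"])
flow.set_embedding(config["embedding"])
flow.set_target(config["target"])
flow.set_embed_columns(config.get("embed_columns", []))

# Execute the flow
flow.execute()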