Skip to content

Usage Guide

This guide covers common usage patterns and best practices for BioEPIC Skills.

API Client Initialization

Basic Initialization

from bioepic_skills.api_search import APISearch

# Initialize with a collection name
api_client = APISearch(collection_name="samples")

Environment Configuration

You can specify different environments (production or development):

# Use production environment (default)
api_client = APISearch(collection_name="samples", env="prod")

# Use development environment
api_client = APISearch(collection_name="samples", env="dev")

Querying Data

Get Records

Retrieve records from a collection:

# Get first 100 records
records = api_client.get_records(max_page_size=100)

# Get all records (with pagination)
all_records = api_client.get_records(max_page_size=100, all_pages=True)

# Get specific fields only
records = api_client.get_records(
    max_page_size=50,
    fields="id,name,description"
)

Filter Records

Use MongoDB-style filters:

# Simple filter
filter_str = '{"type": "sample"}'
records = api_client.get_record_by_filter(filter_str)

# Complex filter with multiple conditions
filter_str = '{"type": "sample", "status": "active"}'
records = api_client.get_record_by_filter(
    filter=filter_str,
    max_page_size=50,
    all_pages=True
)

Search by Attribute

Search for records matching specific attributes:

# Partial match (default)
results = api_client.get_record_by_attribute(
    attribute_name="name",
    attribute_value="test",
    exact_match=False
)

# Exact match
results = api_client.get_record_by_attribute(
    attribute_name="id",
    attribute_value="sample-12345",
    exact_match=True
)

Get Single Record

Retrieve a specific record by ID:

record = api_client.get_record_by_id("sample-12345")
print(f"Record name: {record['name']}")

Working with Results

Convert to DataFrame

from bioepic_skills.data_processing import DataProcessing

dp = DataProcessing()

# Convert list of dicts to pandas DataFrame
df = dp.convert_to_df(records)

# View first few rows
print(df.head())

# Get summary statistics
print(df.describe())

Extract Specific Fields

# Extract a single field from all records
ids = dp.extract_field(records, "id")
names = dp.extract_field(records, "name")

print(f"Found {len(ids)} IDs")

Process Large Datasets

# Split large lists into manageable chunks
large_id_list = [...]  # Your list of IDs
chunks = dp.split_list(large_id_list, chunk_size=100)

# Process each chunk
for i, chunk in enumerate(chunks):
    print(f"Processing chunk {i+1}/{len(chunks)}")
    # Process chunk...

Building Filters

Simple Filters

# Build a regex-based filter
filter_dict = dp.build_filter(
    {"name": "sample", "type": "biological"},
    exact_match=False
)

results = api_client.get_record_by_filter(filter_dict)

Exact Match Filters

# Build an exact match filter
filter_dict = dp.build_filter(
    {"id": "sample-12345", "status": "active"},
    exact_match=True
)

Data Transformation

Merge DataFrames

# Simple merge on a common column
merged_df = dp.merge_dataframes("id", df1, df2)

# Advanced merge with different keys
merged_df = dp.merge_df(
    df1=samples_df,
    df2=metadata_df,
    key1="sample_id",
    key2="id"
)

Rename Columns

# Rename all columns: the list must contain exactly one name per column,
# in the same order as the existing columns
new_names = ["ID", "Name", "Description"]
df_renamed = dp.rename_columns(df, new_names)

Error Handling

Always include proper error handling:

try:
    records = api_client.get_records(max_page_size=100)
    df = dp.convert_to_df(records)
except RuntimeError as e:
    print(f"API request failed: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Logging and Debugging

Enable Debug Logging

import logging

# Set logging level
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Now your API calls will show detailed debug information

Custom Logger

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Use in your code
logger.info("Starting data retrieval...")
records = api_client.get_records(max_page_size=100)
logger.info(f"Retrieved {len(records)} records")

Best Practices

1. Use Environment Variables

Always store credentials in environment variables:

import os
from dotenv import load_dotenv

load_dotenv()

client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")

2. Handle Pagination Efficiently

For large datasets:

# Process data in chunks instead of loading all at once
def process_in_batches():
    page_size = 100
    records = api_client.get_records(max_page_size=page_size)

    while records:
        # Process current batch
        df = dp.convert_to_df(records)
        # ... do something with df

        # Fetch the next batch here (the details depend on your API's
        # pagination scheme). This placeholder stops after one batch —
        # replace the break with your next-page request.
        break

3. Validate Data

Always validate your data:

# Check for required fields
required_fields = ["id", "name", "type"]
if all(field in record for field in required_fields):
    # Process record
    pass
else:
    print("Missing required fields")

4. Cache Results

For expensive queries:

import pickle

# Save results
with open("cached_results.pkl", "wb") as f:
    pickle.dump(records, f)

# Load cached results
# Warning: only unpickle files you created yourself. pickle.load can execute
# arbitrary code, so never load cache files from an untrusted source.
with open("cached_results.pkl", "rb") as f:
    records = pickle.load(f)

Example Workflows

Complete Data Retrieval and Analysis

from bioepic_skills.api_search import APISearch
from bioepic_skills.data_processing import DataProcessing
import pandas as pd

# Initialize
api_client = APISearch(collection_name="samples")
dp = DataProcessing()

# Retrieve data
records = api_client.get_record_by_attribute(
    attribute_name="category",
    attribute_value="research",
    all_pages=True
)

# Convert and process
df = dp.convert_to_df(records)

# Filter and transform
# Use .copy() so the new column assignment below acts on an independent
# DataFrame rather than a view of df (avoids pandas' SettingWithCopyWarning).
df_filtered = df[df["status"] == "active"].copy()
df_filtered["date"] = pd.to_datetime(df_filtered["date"])

# Export results
df_filtered.to_csv("research_samples.csv", index=False)
print(f"Exported {len(df_filtered)} records")

Next Steps