Data Processing¶
The DataProcessing class provides utilities for transforming and analyzing data retrieved from APIs.
Overview¶
Data processing capabilities include:
- Converting data to pandas DataFrames
- Building MongoDB-style filters
- Merging and transforming data
- Extracting specific fields
- Chunking large datasets
Converting to DataFrames¶
Basic Conversion¶
from bioepic_skills.data_processing import DataProcessing
dp = DataProcessing()
# Convert list of dictionaries to DataFrame
records = [
{"id": "1", "name": "Sample 1", "value": 10},
{"id": "2", "name": "Sample 2", "value": 20},
]
df = dp.convert_to_df(records)
print(df)
Output:
  id      name  value
0  1  Sample 1     10
1  2  Sample 2     20
Working with API Results¶
from bioepic_skills.api_search import APISearch
api_client = APISearch(collection_name="samples")
records = api_client.get_records(max_page_size=100)
# Convert to DataFrame for analysis
df = dp.convert_to_df(records)
# Now use pandas operations
print(df.describe())
print(df.info())
Building Filters¶
Regex Filters (Partial Match)¶
# Build a filter for partial matches
filter_dict = dp.build_filter(
{
"name": "sample",
"type": "biological"
},
exact_match=False
)
# Use with API
api_client = APISearch(collection_name="samples")
results = api_client.get_record_by_filter(filter_dict)
The filter will match records where:
- name contains "sample" (case-insensitive)
- type contains "biological" (case-insensitive)
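The exact dictionary that build_filter returns depends on the library version, but for partial matches it presumably follows the usual MongoDB-style case-insensitive regex pattern. A rough sketch of the expected shape (an assumption, not verified output):
# Hypothetical shape of the partial-match filter built above (assumed, not verified)
expected_shape = {
    "name": {"$regex": "sample", "$options": "i"},
    "type": {"$regex": "biological", "$options": "i"},
}
print(filter_dict)  # compare with expected_shape for your version of the library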
Exact Match Filters¶
# Build a filter for exact matches
filter_dict = dp.build_filter(
{
"id": "sample-12345",
"status": "active"
},
exact_match=True
)
Special Characters in Filters¶
Special characters are automatically escaped:
# This works correctly with special characters
filter_dict = dp.build_filter(
{
"title": "GC-MS (2009)", # Parentheses will be escaped
},
exact_match=False
)
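Under the hood this presumably relies on standard regex escaping (for example Python's re.escape); the snippet below only illustrates the idea and is not the library's verified implementation:
import re

# Metacharacters such as parentheses are backslash-escaped before being used
# as a regex pattern, so they match literally rather than acting as groups
pattern = re.escape("GC-MS (2009)")
print(pattern)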
Extracting Fields¶
Extract Single Field¶
# Extract IDs from records
ids = dp.extract_field(records, "id")
print(f"Found {len(ids)} IDs: {ids[:5]}")
# Extract names
names = dp.extract_field(records, "name")
Extract Multiple Fields¶
# Extract different fields
ids = dp.extract_field(records, "id")
names = dp.extract_field(records, "name")
values = dp.extract_field(records, "value")
# Create a new DataFrame with selected fields
import pandas as pd
df_selected = pd.DataFrame({
"id": ids,
"name": names,
"value": values
})
Merging Data¶
Simple Merge¶
Merge two DataFrames on a common column:
# Two DataFrames with common 'id' column
df1 = dp.convert_to_df(samples)
df2 = dp.convert_to_df(metadata)
# Merge on 'id'
merged = dp.merge_dataframes("id", df1, df2)
Advanced Merge¶
Merge with different column names:
# df1 has 'sample_id', df2 has 'id'
merged = dp.merge_df(
df1=samples_df,
df2=metadata_df,
key1="sample_id",
key2="id"
)
This method also:
- Automatically handles list-type columns by exploding them
- Removes duplicate rows
- Performs an inner join by default
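The snippet below approximates that behavior with plain pandas so you can see what exploding a list column and the inner join imply; it is illustrative only and may differ from merge_df's internals. The sample_id, tags, and site columns are made up for the example:
import pandas as pd

# Made-up example frames: one has a list-valued 'tags' column, the other carries metadata
samples_df = pd.DataFrame({"sample_id": ["s1", "s2"], "tags": [["a", "b"], ["c"]]})
metadata_df = pd.DataFrame({"id": ["s1", "s2"], "site": ["lake", "river"]})

exploded = samples_df.explode("tags")  # list column becomes one row per element
merged = exploded.merge(metadata_df, left_on="sample_id", right_on="id", how="inner")
merged = merged.drop_duplicates()  # duplicate rows are removed
print(merged)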
List Operations¶
Split into Chunks¶
# Split a large list into smaller chunks
large_list = list(range(250))
chunks = dp.split_list(large_list, chunk_size=100)
print(f"Split into {len(chunks)} chunks")
# Output: Split into 3 chunks
# Process each chunk
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)} items")
Use case - batch processing:
# Get a large list of IDs
all_ids = dp.extract_field(records, "id")
# Process in batches of 50
for batch in dp.split_list(all_ids, chunk_size=50):
    # Process this batch
    batch_data = api_client.get_batch_records(batch)
    # ... process batch_data
Renaming Columns¶
# Original DataFrame
df = dp.convert_to_df(records)
print(df.columns)
# Output: Index(['id', 'name', 'type'], dtype='object')
# Rename all columns
new_names = ["ID", "Sample Name", "Sample Type"]
df_renamed = dp.rename_columns(df, new_names)
print(df_renamed.columns)
# Output: Index(['ID', 'Sample Name', 'Sample Type'], dtype='object')
Column Count Must Match
The number of new column names must exactly match the number of columns in the DataFrame.
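If the name list is built dynamically, a quick length check before renaming makes the failure mode explicit (a defensive pattern added here for illustration, not part of the library):
# Guard against a mismatch before calling rename_columns
assert len(new_names) == len(df.columns), (
    f"Expected {len(df.columns)} column names, got {len(new_names)}"
)
df_renamed = dp.rename_columns(df, new_names)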
Complete Workflow Example¶
Data Retrieval and Processing¶
from bioepic_skills.api_search import APISearch
from bioepic_skills.data_processing import DataProcessing
import pandas as pd
# Initialize
api_client = APISearch(collection_name="samples")
dp = DataProcessing()
# Step 1: Retrieve data with filter
filter_dict = dp.build_filter(
{"category": "research", "status": "active"},
exact_match=False
)
records = api_client.get_record_by_filter(
filter=filter_dict,
all_pages=True
)
print(f"Retrieved {len(records)} records")
# Step 2: Convert to DataFrame
df = dp.convert_to_df(records)
# Step 3: Extract and process specific fields
sample_ids = dp.extract_field(records, "id")
print(f"Sample IDs: {len(sample_ids)}")
# Step 4: Get related data
related_records = []
for chunk in dp.split_list(sample_ids, chunk_size=50):
    # Get related data for this chunk
    chunk_filter = dp.build_filter(
        {"sample_id": chunk[0]},  # Simplified example
        exact_match=True
    )
    batch = api_client.get_record_by_filter(chunk_filter)
    related_records.extend(batch)
# Step 5: Merge datasets
related_df = dp.convert_to_df(related_records)
final_df = dp.merge_df(
df1=df,
df2=related_df,
key1="id",
key2="sample_id"
)
# Step 6: Process and export
final_df["date"] = pd.to_datetime(final_df["date"])
final_df = final_df.sort_values("date")
final_df.to_csv("research_samples_processed.csv", index=False)
print(f"Exported {len(final_df)} processed records")
Batch Processing Large Datasets¶
import pandas as pd

def process_large_dataset(api_client, dp, total_records):
    """Process a large dataset in manageable chunks"""
    chunk_size = 100
    all_results = []
    for offset in range(0, total_records, chunk_size):
        # Get chunk
        records = api_client.get_records(
            max_page_size=chunk_size,
            # Implement pagination based on your API
        )
        # Process chunk
        df_chunk = dp.convert_to_df(records)
        # Filter and transform
        df_chunk = df_chunk[df_chunk["status"] == "active"]
        df_chunk["processed"] = True
        # Store results
        all_results.append(df_chunk)
        print(f"Processed {offset + len(records)}/{total_records}")
    # Combine all chunks
    final_df = pd.concat(all_results, ignore_index=True)
    return final_df
Performance Tips¶
1. Use Chunking for Large Datasets¶
# Process a large list in manageable chunks instead of all at once
big_list = list(range(10000))
for chunk in dp.split_list(big_list, chunk_size=100):
    # Process one manageable chunk at a time
    pass
2. Select Only Needed Fields¶
# Get only required fields from API
records = api_client.get_records(
max_page_size=100,
fields="id,name,date" # Only get what you need
)
3. Filter Early¶
# Filter at the API level, not in pandas
filter_dict = dp.build_filter({"status": "active"})
records = api_client.get_record_by_filter(filter_dict)
# Better than:
# all_records = api_client.get_records()
# df = dp.convert_to_df(all_records)
# df = df[df["status"] == "active"] # Slower!
API Reference¶
For detailed API documentation, see: