SDK Cheat Sheet
One-page reference for common Onyx SDK calls. Examples below use the Python SDK.
Onyx Environment Variables
Set these to control how the SDK scopes requests and where it fetches credentials.
| Env Var | Description | Default |
|---|---|---|
| ONYX_DATABASE_ID | Database UUID used to scope requests; required. | none |
| ONYX_DATABASE_BASE_URL | HTTP base for DB API. | https://api.onyx.dev |
| ONYX_DATABASE_API_KEY | API key for the database; required. | none |
| ONYX_DATABASE_API_SECRET | API secret for the database; required. | none |
| ONYX_AI_BASE_URL | Base URL for AI/chat endpoints. | https://ai.onyx.dev |
| ONYX_DEFAULT_MODEL | Model used by db.chat() shorthand. | onyx |
| ONYX_CONFIG_PATH | Path to JSON credentials file (Node only; ignored on edge). Falls back to project/home files after env vars. | unset |
| ONYX_DEBUG | When "true", enables request/response logging and config debug output. | false |
| ONYX_STREAM_DEBUG | When "true"/"1", logs streaming connection details. | false |
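As an illustration, a minimal sketch of setting these variables from Python before initializing; the values are placeholders, and this assumes the SDK reads the process environment at init time.
import os
from onyx_database import onyx
# Placeholder credentials for illustration only.
os.environ["ONYX_DATABASE_ID"] = "00000000-0000-0000-0000-000000000000"
os.environ["ONYX_DATABASE_API_KEY"] = "YOUR_API_KEY"
os.environ["ONYX_DATABASE_API_SECRET"] = "YOUR_API_SECRET"
os.environ["ONYX_DEBUG"] = "true"  # enable request/response logging
db = onyx.init()  # credentials resolved from the environment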
Initialization Patterns
Initialize the SDK with typed configuration.
Auto resolution
Resolution order: env vars → ONYX_CONFIG_PATH file → ./onyx-database.json (default locations).
from onyx_database import onyx
db = onyx.init()  # picks up env vars or onyx-database.json automatically
Initialize with config object
Pass credentials directly when you can’t rely on env/file resolution.
from onyx_database import onyx
db = onyx.init({
"apiKey": "YOUR_API_KEY",
"apiSecret": "YOUR_API_SECRET",
})
Optional config fields
| Attribute | Description | Default |
|---|---|---|
| baseUrl | REST base URL for database operations. | https://api.onyx.dev |
| databaseId | Database ID. Usually inferred from DB-scoped API keys; set for org-wide keys. | resolved from API key |
| aiBaseUrl | Base URL for AI endpoints. | https://ai.onyx.dev |
| fetch | Custom fetch implementation (useful in non-Node runtimes). | global fetch |
| defaultModel | Fallback AI model when using shorthand chat calls. | onyx |
| partition | Default partition for queries, findById, and deletes. | none (use entity partition) |
| requestLoggingEnabled | Log HTTP requests and bodies to console. | false |
| responseLoggingEnabled | Log HTTP responses and bodies to console. | false |
| ttl | Milliseconds to cache resolved credentials. | 300000 (5 minutes) |
| retry | Retry configuration for idempotent GET requests (honors Retry-After). | enabled; 3 retries; backoff 300/600/1200ms |
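For example, a hedged sketch passing a few of these optional fields to onyx.init(); the field names come from the table above, and "tenant_a" is an illustrative partition name.
from onyx_database import onyx
db = onyx.init({
    "apiKey": "YOUR_API_KEY",
    "apiSecret": "YOUR_API_SECRET",
    "baseUrl": "https://api.onyx.dev",  # explicit REST base URL
    "defaultModel": "onyx",  # used by db.chat() shorthand
    "partition": "tenant_a",  # default partition for queries, findById, and deletes
    "ttl": 60000,  # cache resolved credentials for one minute
    "requestLoggingEnabled": True,  # log outgoing requests and bodies
})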
Core CRUD
Upsert, fetch, update, and delete entities by primary key.
from onyx_database import onyx
db = onyx.init()
account = {
"id": "acct_1",
"name": "Checking",
"balance": 1250,
}
db.save("Account", account)
fetched = db.from_table("Account").find("acct_1")
db.save("Account", { **account, "balance": 1400 })
db.delete("Account", "acct_1")Atomic Saving
Persist a parent plus related records in one call—no explicit transactions needed; the graph commits all-or-none.
Cascade save
Tell Onyx which related collection to save with the parent. Format: field:RelatedTable(targetField, sourceField).
from onyx_database import onyx
db = onyx.init()
account_with_transactions = {
"id": "acct_1",
"name": "Primary",
"balance": 0,
"transactions": [
{ "id": "txn_1", "accountId": "acct_1", "amount": 250, "currency": "USD" },
{ "id": "txn_2", "accountId": "acct_1", "amount": 125, "currency": "USD" },
],
}
saved_account = db.cascade("transactions:Transaction(accountId, id)").save("Account", account_with_transactions)
Cascade builder
Fluent builder emits the same cascade string; helpful when composing or reusing mappings.
from onyx_database import onyx
db = onyx.init()
tx_rel = (
db.cascade_builder()
.graph("transactions")
.graph_type("Transaction")
.target_field("accountId")
.source_field("id")
)
print(f"cascade string: {tx_rel}") # transactions:Transaction(accountId, id)Cascade string syntax
<field>:<RelatedTable>(<targetField>, <sourceField>)
- field: property on the parent holding the related rows.
- RelatedTable: table the related rows are upserted into.
- targetField: foreign-key field on the related table.
- sourceField: parent field whose value is copied into targetField.
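Because the builder emits the same string, it can be reused wherever a cascade string is expected; a small sketch under that assumption, reusing tx_rel and account_with_transactions from the examples above.
# str(tx_rel) yields "transactions:Transaction(accountId, id)",
# so the builder output can be passed straight to db.cascade().
saved = db.cascade(str(tx_rel)).save("Account", account_with_transactions)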
Resolvers
Hydrate schema-defined relations inline to avoid extra round trips; resolvers are declared in your generated schema.
Fetch by id with resolver
Load a record and its related resolver in one call.
tx = db.find_by_id("Transaction", "txn_123", resolvers=["account"])
Query with resolver
Include related data while filtering the base table.
from onyx_database import eq
transactions_with_accounts = (
db.from_table("Transaction")
.where(eq("status", "posted"))
.resolve("account")
.list()
)
Filter by resolver field
Filter using fields exposed by the resolver.
from onyx_database import eq
primary_account_transactions = (
db.from_table("Transaction")
.where(eq("account.name", "Primary"))
.resolve("account")
.list()
)
Bulk Save
Insert many records efficiently by passing an array to save.
transactions = [
{ "id": "txn_batch_1", "accountId": "acct_batch", "amount": 1.99, "currency": "USD" },
{ "id": "txn_batch_2", "accountId": "acct_batch", "amount": 12.5, "currency": "USD" },
{ "id": "txn_batch_3", "accountId": "acct_batch", "amount": 45.0, "currency": "USD" },
{ "id": "txn_batch_4", "accountId": "acct_batch", "amount": 7.25, "currency": "USD" },
]
db.save("Transaction", transactions)Query & Filter
Boolean filters, ranges, text contains, ordering, and limits.
from onyx_database import onyx, eq, gt, gte, contains, desc
db = onyx.init()
transactions = (
db.from_table("Transaction")
.where(
eq("status", "posted")
.and_(gt("amount", 100))
.and_(gte("createdAt", "2025-01-01"))
.and_(contains("merchant", "aws"))
)
.order_by(desc("createdAt"))
.limit(25)
.list()
)
Defaults: if you omit limit() and page_size(), the API uses its server-side default page size (capped at 1000); responses include nextPage when more rows remain.
Query clauses
| Clause | Signature | What it does |
|---|---|---|
| where | where(condition) | Adds a boolean filter to the query. |
| and_ | where(...).and_(condition) | Chain additional AND conditions. |
| or_ | where(...).or_(condition) | Combine conditions with OR. |
| order_by | order_by(asc|desc("field")) | Sort the result set. |
| group_by | group_by(*fields) | Group results for aggregates. |
| distinct | distinct() | Return unique rows for selected fields. |
| limit | limit(n) | Caps number of records returned. |
| page_size | page_size(n) | Sets page size for streamed pagination. |
| in_partition | in_partition(partition) | Filter to a single partition. |
| page | page(next_page=None) | Fetch a page (cursor-based). |
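To show a few clauses not used in the example above, a hedged sketch combining or_, in_partition, and distinct; the table, fields, and partition name are illustrative.
from onyx_database import onyx, eq, gte
db = onyx.init()
recent_or_flagged_merchants = (
    db.select("merchant")
    .from_table("Transaction")
    .where(eq("status", "flagged").or_(gte("createdAt", "2025-06-01")))
    .in_partition("tenant_a")  # restrict to a single partition
    .distinct()  # unique merchants only
    .list()
)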
Filters
| Helper | Signature | What it does |
|---|---|---|
| eq | eq(field, value) | Field equals value. |
| ne | ne(field, value) | Field not equal value. |
| gt | gt(field, value) | Field greater than value. |
| gte | gte(field, value) | Field greater or equal value. |
| lt | lt(field, value) | Field less than value. |
| lte | lte(field, value) | Field less or equal value. |
| between | between(field, low, high) | Field between two values (inclusive). |
| contains | contains(field, substring) | Case-insensitive substring match. |
| starts_with | starts_with(field, prefix) | Field starts with prefix. |
| ends_with | ends_with(field, suffix) | Field ends with suffix. |
| in_op | in_op(field, list) | Field is in list of values. |
| not_in | not_in(field, list) | Field not in list of values. |
| is_null | is_null(field) | Field is null. |
| not_null | not_null(field) | Field is not null. |
| within | within(field, subquery) | Field is in results of subquery. |
| search | search(lucene_query, min_score=0) | Lucene text search condition. |
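A hedged sketch exercising several of these helpers together; the field values are illustrative.
from onyx_database import onyx, between, in_op, not_null, starts_with
db = onyx.init()
filtered = (
    db.from_table("Transaction")
    .where(
        between("amount", 10, 500)
        .and_(in_op("currency", ["USD", "EUR"]))
        .and_(not_null("merchant"))
        .and_(starts_with("merchant", "A"))
    )
    .list()
)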
Select Query
Pick specific columns to return from a table.
rows = (
db.select("id", "accountId", "amount", "createdAt")
.from_table("Transaction")
.list()
)
First or Null
When you expect a single row, use first_or_none() (the Python equivalent of firstOrNull() and its alias one()).
from onyx_database import onyx, eq
db = onyx.init()
tx = (
db.from_table("Transaction")
.where(eq("id", "txn_123"))
.first_or_none()
)
tx_alt = tx  # first_or_none() returns the record or None
Inner Queries
Filter a table using sub-selects returned from another query.
from onyx_database import onyx, within, gt
db = onyx.init()
accounts_with_large_tx = (
db.from_table("Account")
.where(
within(
"id",
db
.select("accountId")
.from_table("Transaction")
.where(gt("amount", 1000))
)
)
.list()
)
Update Query
Set partial updates for all rows matching a condition.
from onyx_database import onyx, eq, gt
db = onyx.init()
updated = (
db.from_table("Transaction")
.where(eq("status", "pending").and_(gt("amount", 100)))
.set_updates({ "status": "posted" })
.update()
)
Delete Query
Delete all rows that match a filter.
from onyx_database import onyx, eq
db = onyx.init()
deleted_count = (
db.from_table("Transaction")
.where(eq("status", "archived"))
.delete()
)
Group By
Group results by one or more fields to segment metrics.
from onyx_database import onyx, eq
db = onyx.init()
by_merchant = (
db.select("merchant", "currency")
.from_table("Transaction")
.where(eq("status", "posted"))
.group_by("merchant", "currency")
.list()
)
Aggregations
Server-side rollups for dashboards and billing summaries.
from onyx_database import onyx, sum, eq
db = onyx.init()
totals = (
db.select(sum("amount"), "merchant")
.from_table("Transaction")
.where(eq("status", "posted"))
.list()
)
Server-side aggregate helpers
| Function | Signature | What it does |
|---|---|---|
| sum | sum("field") | Numeric sum of a column. |
| count | count("field" | "*") | Row count; accepts a field or "*". |
| avg | avg("field") | Arithmetic mean. |
| min | min("field") | Smallest value. |
| max | max("field") | Largest value. |
| median | median("field") | 50th percentile. |
| percentile | percentile("field", p) | p-th percentile; p is 0–100. |
| std | std("field") | Sample standard deviation. |
| variance | variance("field") | Sample variance. |
| upper | upper("field") | Uppercases text for grouping/aggregation. |
| lower | lower("field") | Lowercases text for grouping/aggregation. |
| format | format("field", "pattern") | Apply a Java-style date/number format before grouping (e.g., yyyy-MM). |
| substring | substring("field", from, length) | Substring of text (0-based offset). |
| replace | replace("field", pattern, repl) | Regex/substring replacement prior to grouping. |
| groupBy | groupBy(...fields) | Groups rows by one or more fields before aggregating. |
| select | select(...fields | aggregates) | Choose which columns or aggregate expressions to return. |
| distinct | distinct() | Deduplicate rows on the selected fields before aggregation. |
| resolve | resolve(...relations) | Resolve related values prior to grouping/aggregating. |
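As a further illustration, a hedged sketch grouping by merchant with count and avg; it assumes count and avg are importable from onyx_database in the same way as sum above.
from onyx_database import onyx, count, avg, eq
db = onyx.init()
merchant_stats = (
    db.select("merchant", count("*"), avg("amount"))
    .from_table("Transaction")
    .where(eq("status", "posted"))
    .group_by("merchant")
    .list()
)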
Streaming Queries
Consume large result sets incrementally and react to live query responses.
from onyx_database import onyx, eq
import time
db = onyx.init()
events = []
handle = (
db.from_table("Transaction")
.where(eq("status", "posted"))
.page_size(100)
.on_item(lambda tx, action: events.append((action, tx)))
.stream(include_query_results=True, keep_alive=False)
)
time.sleep(1)
handle["cancel"]()Stream behavior
- Wire format: newline-delimited JSON; the Python client calls stream(include_query_results=True, keep_alive=True|False) on a query builder.
- include_query_results sends the initial rows as QUERY_RESPONSE events; set it to False to receive only live CREATE/UPDATE/DELETE events.
- keep_alive emits KEEP_ALIVE heartbeats roughly every 10 seconds; False closes the stream after the initial batch.
- Actions delivered: QUERY_RESPONSE, CREATE, UPDATE, DELETE, KEEP_ALIVE. Cancel with handle["cancel"]().
Full-Text Search
Lucene-style search across tables marked SEARCHABLE in your schema.
All searchable tables
Runs across every table marked SEARCHABLE in your schema (optionally set a minimum score).
hits = (
db.search("error AND status:active", 4.0)
.limit(50)
.list()
)
Table-scoped search with minScore
Scopes search to a single table and applies a score threshold.
hits = (
db.from_table("Transaction")
.search('status:posted AND merchant:"AWS"', 4.4)
.list()
)
Using from_table("Transaction") scopes Lucene search to that table only, rather than the system-wide cross-table search. The second example uses a typical query with a score threshold of 4.4.
Pagination
Page through ordered results with stable cursors (offset-free).
from onyx_database import onyx, asc, eq
db = onyx.init()
page1 = (
db.from_table("Transaction")
.order_by(asc("createdAt"))
.limit(50)
.page()
)
records = page1.get("records", [])
next_page = page1.get("nextPage") or page1.get("next_page")
if next_page:
page2 = (
db.from_table("Transaction")
.order_by(asc("createdAt"))
.page(next_page=next_page)
)
print("page2 size", len(page2.get("records", [])), "next token", page2.get("nextPage"))AI Chat
Use the AI endpoint to answer questions that reference your data.
Chat completions
Control model, messages, streaming/raw response options.
response = db.ai.chat(
{
"model": "onyx",
"messages": [
{ "role": "system", "content": "You are a finance assistant. Respond with 3 concise bullet points." },
{ "role": "assistant", "content": "I summarize trends and call out anomalies each Friday." },
{ "role": "user", "content": "Draft a status update for finance stakeholders." },
],
"stream": False,
}
)
print(response)
Streaming chat
Stream tokens as they arrive; remember to consume the async iterator.
stream = db.ai.chat(
{
"model": "onyx",
"messages": [{ "role": "user", "content": "List the top 3 spend categories this week." }],
"stream": True,
}
)
for chunk in stream:
delta = chunk.get("choices", [{}])[0].get("delta", {})
if delta.get("content"):
print(delta["content"], end="")Shorthand chat (string in, string out)
Quick, typed call that returns the first message content.
answer = db.chat("Summarize yesterday's transactions")
List available models
Discover model IDs before picking one.
models = db.ai.get_models()
print(models)
Get model details
Inspect a specific model's capabilities and limits.
model = db.ai.get_model("onyx")
Request script approval
Validate a mutation script before execution.
approval = db.ai.request_script_approval({
"script": "db.save({ id: 'acct_1', name: 'Checking' })",
})
Documents
Upload binary files with metadata, fetch with optional resizing, or delete stored documents.
Save a document
Stores metadata in the DB and writes bytes to the database’s _documents path.
import base64
from onyx_database import onyx
db = onyx.init()
payload = base64.b64encode(b"hello").decode("ascii")
doc_id = db.save_document({
"documentId": "hello.txt",
"path": "/docs/hello.txt",
"mimeType": "text/plain",
"content": payload,
})
Get a document (optional resize for images)
Fetches by documentId; width/height resize images on the fly.
doc = db.get_document("hello.txt")
Delete a document
Removes the file and its metadata.
db.delete_document("hello.txt")Storage & search
Files are written to your database’s filesystem under _documents/; only metadata (path, mimeType, timestamps) is stored in the Document record. If you provide plaintext content, it is chunked into Lucene-indexed parts so full-text search can find the document.
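Returning to the image resizing mentioned under "Get a document": a hedged sketch, assuming get_document accepts width and height keyword arguments (parameter names inferred from that description, not confirmed here).
# Hypothetical keyword arguments based on the width/height resize note above.
thumbnail = db.get_document("logo.png", width=128, height=128)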
Secrets
Manage encrypted secrets stored per database; client calls never persist plaintext.
List secrets (metadata only)
Returns keys, purposes, and timestamps—no values.
from onyx_database import onyx
db = onyx.init()
secrets = db.list_secrets()
print(secrets)
Fetch a secret value
Decrypts and returns the plaintext for a single key.
secret = db.get_secret("stripe_api_key")
Create or rotate a secret
Stores the value encrypted at rest; re-encrypts on each update.
db.put_secret(
"stripe_api_key",
{
"value": "sk_live_...",
"purpose": "Stripe server key",
},
)Delete a secret
Removes the record from the database.
db.delete_secret("stripe_api_key")
How Onyx secures secrets
- Each secret is encrypted with a random AES-256-GCM key; that key is wrapped using the database’s 4096-bit RSA public key. Only ciphertext and wrapped keys are stored.
- Per-database RSA private keys are themselves encrypted with a master key.
- API routes require valid database credentials; plaintext values are only returned on authenticated GET /secret/{key} requests and are never persisted decrypted.
- Back up the keystore directory (secret-keystore) together with the master key to preserve decryption ability.
Schema
Fetch, diff, validate, and publish schema revisions.
from onyx_database import onyx
db = onyx.init()
current = db.get_schema() or {}
entities = current.get("entities", [])
temp_table = {
"name": "TempTable",
"identifier": { "name": "id", "generator": "UUID", "type": "String" },
"attributes": [
{ "name": "id", "type": "String", "isNullable": False },
{ "name": "name", "type": "String", "isNullable": False },
],
}
next_schema = {
**current,
"entities": [*entities, temp_table],
"revisionDescription": "Add TempTable via SDK",
}
diff = db.diff_schema(next_schema)
validation = db.validate_schema(next_schema)
if validation.get("valid"):
db.update_schema(next_schema, publish=True)
Need the JSON shape for entities, attributes, and resolvers? See Schema Example.