Documentation Index
Fetch the complete documentation index at: https://docs.sigmic.ai/llms.txt
Use this file to discover all available pages before exploring further.
Python Examples
Complete working examples for integrating the Sigmic AI API into your Python applications.Setup
Install the requests library:pip install requests
import os
import json
import requests
API_KEY = os.environ.get('SIGMIC_API_KEY', 'sigmic_your_key_here')
BASE_URL = 'https://api.sigmic.ai'
SSE Stream Parser
Reusable generator to parse Server-Sent Events from the stream endpoint:def parse_sse_stream(task_id):
"""Connect to stream endpoint and yield SSE events."""
response = requests.get(
f'{BASE_URL}/api/v1/tasks/{task_id}/stream',
headers={'Authorization': f'Bearer {API_KEY}'},
stream=True
)
event_type = None
for line in response.iter_lines():
if not line:
continue
line = line.decode('utf-8')
if line.startswith(':'):
continue # Skip heartbeats
elif line.startswith('event:'):
event_type = line.replace('event: ', '')
elif line.startswith('data:'):
try:
data = json.loads(line.replace('data: ', ''))
yield event_type, data
except json.JSONDecodeError:
pass
def process_sse_stream(task_id, handlers=None):
"""Process SSE stream with optional handlers."""
handlers = handlers or {}
final_response = ''
for event_type, data in parse_sse_stream(task_id):
# Call handler if provided
if event_type in handlers:
handlers[event_type](data)
# Capture final response
if event_type == 'done':
final_response = data.get('finalResponse', '')
return {'final_response': final_response}
Create Task
def create_task(message, system_prompt=None, auto_execute=True, files=None):
"""Create a new task and stream the response."""
headers = {'Authorization': f'Bearer {API_KEY}'}
data = {
'message': message,
'autoExecute': str(auto_execute).lower()
}
if system_prompt:
data['systemPrompt'] = system_prompt
files_payload = []
if files:
for file_path in files:
files_payload.append(('files', open(file_path, 'rb')))
# Create the task (returns JSON immediately)
response = requests.post(
f'{BASE_URL}/api/v1/tasks',
headers=headers,
data=data,
files=files_payload if files_payload else None,
)
result = response.json()
task_id = result['data']['id']
print(f'Task created: {task_id}')
# Close file handles
for _, f in files_payload:
f.close()
# Stream the execution
def print_history(data):
role = data.get('role', '?')
content = data.get('content', '')[:50]
print(f"[History] {role}: {content}...")
def print_history_done(data):
print(f"[History] {data['messageCount']} messages, streaming: {data['isStreaming']}")
def print_content(data):
print(data.get('text', ''), end='', flush=True)
def print_status(data):
print(f"[Status] {data.get('message', data.get('status', ''))}")
def print_thought(data):
print(f"[Thought] {data.get('subject', '')}")
def print_tool(data):
print(f"[Tool] {data.get('toolName', '')}: {data.get('status', '')}")
def print_error(data):
print(f"[Error] {data.get('message', '')}")
def print_done(data):
print('\n[Done]')
stream_result = process_sse_stream(task_id, {
'history_message': print_history,
'history_done': print_history_done,
'status': print_status,
'thought': print_thought,
'content': print_content,
'tool_call': print_tool,
'error': print_error,
'done': print_done
})
return {'task_id': task_id, **stream_result}
Send Follow-up Message
def send_followup(task_id, message):
"""Send a follow-up message (auto-restores expired sessions)."""
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
}
# Send the follow-up (fire-and-forget)
requests.post(
f'{BASE_URL}/api/v1/tasks/{task_id}/messages',
headers=headers,
json={'message': message}
)
# Stream the response
def print_content(data):
print(data.get('text', ''), end='', flush=True)
def print_done(data):
print('\n[Done]')
return process_sse_stream(task_id, {
'content': print_content,
'done': print_done
})
Get Task Status
def get_task(task_id):
"""Get details of a specific task."""
response = requests.get(
f'{BASE_URL}/api/v1/tasks/{task_id}',
headers={'Authorization': f'Bearer {API_KEY}'}
)
result = response.json()
if not result.get('success'):
raise Exception(result.get('error', {}).get('message', 'Unknown error'))
return result['data']
List Tasks
def list_tasks(limit=50):
"""List all tasks for the API key."""
response = requests.get(
f'{BASE_URL}/api/v1/tasks',
headers={'Authorization': f'Bearer {API_KEY}'},
params={'limit': limit}
)
result = response.json()
if not result.get('success'):
raise Exception(result.get('error', {}).get('message', 'Unknown error'))
return result['data']
Cancel Task
def cancel_task(task_id):
"""Cancel a running task."""
response = requests.delete(
f'{BASE_URL}/api/v1/tasks/{task_id}',
headers={'Authorization': f'Bearer {API_KEY}'}
)
result = response.json()
if not result.get('success'):
raise Exception(result.get('error', {}).get('message', 'Unknown error'))
return result['data']
List & Download Artifacts
def list_artifacts(task_id):
"""List workspace files generated by the task."""
response = requests.get(
f'{BASE_URL}/api/v1/tasks/{task_id}/artifacts',
headers={'Authorization': f'Bearer {API_KEY}'}
)
result = response.json()
if not result.get('success'):
raise Exception(result.get('error', {}).get('message', 'Unknown error'))
return result['data']['files']
def download_artifact(task_id, file_path, save_as=None):
"""Download a specific file from the workspace."""
response = requests.get(
f'{BASE_URL}/api/v1/tasks/{task_id}/artifacts/download',
headers={'Authorization': f'Bearer {API_KEY}'},
params={'path': file_path}
)
if response.status_code != 200:
raise Exception(f'Download failed: {response.status_code}')
output_path = save_as or file_path.split('/')[-1]
with open(output_path, 'wb') as f:
f.write(response.content)
print(f'Downloaded {len(response.content)} bytes to {output_path}')
return output_path
Tool Approval Handler
def create_task_with_approval(message, approval_callback):
"""Create a task with manual tool approval."""
headers = {'Authorization': f'Bearer {API_KEY}'}
data = {
'message': message,
'autoExecute': 'false'
}
# Create the task
response = requests.post(
f'{BASE_URL}/api/v1/tasks',
headers=headers,
data=data,
)
task_id = response.json()['data']['id']
print(f'Task created: {task_id}')
final_response = ''
for event_type, data in parse_sse_stream(task_id):
if event_type == 'history_done':
print(f"[History] {data['messageCount']} messages loaded")
elif event_type == 'tool_call' and data.get('status') == 'awaiting_approval':
tool_name = data.get('toolName')
args = data.get('args', {})
print(f'[Tool] {tool_name} requires approval')
print(f' Args: {json.dumps(args, indent=2)}')
# Get approval decision
should_approve = approval_callback(tool_name, args)
# Send approval
requests.post(
f'{BASE_URL}/api/v1/tasks/{task_id}/tools/{data["callId"]}/approve',
headers={
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
},
json={'outcome': 'proceed_once' if should_approve else 'cancel'}
)
elif event_type == 'content':
print(data.get('text', ''), end='', flush=True)
elif event_type == 'done':
final_response = data.get('finalResponse', '')
print('\n[Done]')
return {'task_id': task_id, 'final_response': final_response}
Complete Workflow Example
def main():
print('=== Sigmic AI API Demo ===\n')
# 1. Create a task
print('1. Creating task...')
result = create_task('What are the three laws of robotics?')
task_id = result['task_id']
print(f'Task ID: {task_id}')
print(f'Response: {result["final_response"]}')
# 2. Send follow-up (auto-restores expired sessions)
print('\n2. Sending follow-up...')
followup = send_followup(task_id, 'Who created these laws?')
print(f'Follow-up response: {followup["final_response"]}')
# 3. Check task status
print('\n3. Checking task status...')
task = get_task(task_id)
print(f'Status: {task["status"]}')
print(f'Created: {task["createdAt"]}')
print(f'Completed: {task["completedAt"]}')
# 4. List artifacts
print('\n4. Listing artifacts...')
artifacts = list_artifacts(task_id)
print(f'Found {len(artifacts)} files:')
for f in artifacts:
print(f' - {f["name"]} ({f["size"]} bytes)')
# 5. List recent tasks
print('\n5. Listing recent tasks...')
tasks = list_tasks(5)
print(f'Found {len(tasks)} tasks:')
for t in tasks:
print(f' - {t["id"]}: {t["status"]}')
if __name__ == '__main__':
main()
File Upload Example
def analyze_file(file_path, question):
"""Upload a file and ask a question about it."""
headers = {'Authorization': f'Bearer {API_KEY}'}
data = {'message': question}
with open(file_path, 'rb') as f:
response = requests.post(
f'{BASE_URL}/api/v1/tasks',
headers=headers,
data=data,
files={'files': f},
)
task_id = response.json()['data']['id']
return process_sse_stream(task_id, {
'content': lambda d: print(d.get('text', ''), end='', flush=True),
'done': lambda d: print('\n')
})
# Usage
result = analyze_file('./data.csv', 'Summarize the data in this CSV file')
print(f'Analysis: {result["final_response"]}')
Error Handling
class APIError(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
super().__init__(f'{code}: {message}')
def make_request(method, endpoint, **kwargs):
"""Make an API request with error handling."""
headers = kwargs.pop('headers', {})
headers['Authorization'] = f'Bearer {API_KEY}'
response = requests.request(
method,
f'{BASE_URL}{endpoint}',
headers=headers,
**kwargs
)
result = response.json()
if not result.get('success'):
error = result.get('error', {})
raise APIError(
error.get('code', 'UNKNOWN'),
error.get('message', 'Unknown error')
)
return result
# Usage with error handling
try:
task = get_task('invalid-id')
except APIError as e:
if e.code == 'NOT_FOUND':
print('Task not found')
elif e.code == 'INVALID_API_KEY':
print('Check your API key')
else:
raise
Async Example (with aiohttp)
import aiohttp
import asyncio
async def create_task_async(message):
"""Create a task and stream using async/await."""
async with aiohttp.ClientSession() as session:
# 1. Create the task
data = aiohttp.FormData()
data.add_field('message', message)
async with session.post(
f'{BASE_URL}/api/v1/tasks',
headers={'Authorization': f'Bearer {API_KEY}'},
data=data
) as response:
result = await response.json()
task_id = result['data']['id']
print(f'Task created: {task_id}')
# 2. Stream the execution
async with session.get(
f'{BASE_URL}/api/v1/tasks/{task_id}/stream',
headers={'Authorization': f'Bearer {API_KEY}'}
) as response:
final_response = ''
event_type = None
async for line in response.content:
line = line.decode('utf-8').strip()
if line.startswith('event:'):
event_type = line.replace('event: ', '')
elif line.startswith('data:'):
try:
data = json.loads(line[6:])
if event_type == 'history_done':
print(f"History: {data['messageCount']} messages")
if event_type == 'content':
print(data.get('text', ''), end='', flush=True)
if event_type == 'done':
final_response = data.get('finalResponse', '')
print('\n')
except json.JSONDecodeError:
pass
return {'task_id': task_id, 'final_response': final_response}
# Usage
asyncio.run(create_task_async('What is Python?'))