Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.sigmic.ai/llms.txt

Use this file to discover all available pages before exploring further.

Python Examples

Complete working examples for integrating the Sigmic AI API into your Python applications.

Setup

Install the requests library:
pip install requests
import os
import json
import requests

# Connection settings shared by every example on this page.
# The key is read from the SIGMIC_API_KEY environment variable; the
# placeholder value is used only when that variable is not set.
BASE_URL = 'https://api.sigmic.ai'
API_KEY = os.getenv('SIGMIC_API_KEY', 'sigmic_your_key_here')

SSE Stream Parser

Reusable generator to parse Server-Sent Events from the stream endpoint:
def parse_sse_stream(task_id):
    """Connect to the task's stream endpoint and yield parsed SSE events.

    Args:
        task_id: ID of the task whose stream should be read.

    Yields:
        ``(event_type, data)`` tuples. ``event_type`` is the value of the most
        recent ``event:`` line (``None`` if none has been seen yet) and
        ``data`` is the JSON-decoded payload of a ``data:`` line.

    Raises:
        requests.HTTPError: If the stream endpoint answers with a 4xx/5xx
            status instead of an event stream.
    """
    # Using the response as a context manager releases the connection even
    # when the consumer stops iterating early or an exception propagates.
    with requests.get(
        f'{BASE_URL}/api/v1/tasks/{task_id}/stream',
        headers={'Authorization': f'Bearer {API_KEY}'},
        stream=True,
    ) as response:
        # Fail loudly on auth/not-found errors rather than yielding nothing
        response.raise_for_status()

        event_type = None
        for raw_line in response.iter_lines():
            if not raw_line:
                continue

            line = raw_line.decode('utf-8')
            if line.startswith(':'):
                continue  # Skip heartbeats

            # Split on the FIRST colon only, so payloads that themselves
            # contain "data: " or "event: " are left untouched.
            field, _, value = line.partition(':')
            # Both "field:value" and "field: value" are valid SSE; drop at
            # most one leading space from the value.
            if value.startswith(' '):
                value = value[1:]

            if field == 'event':
                event_type = value
            elif field == 'data':
                try:
                    payload = json.loads(value)
                except json.JSONDecodeError:
                    # Best-effort: payloads that are not valid JSON are skipped
                    continue
                yield event_type, payload


def process_sse_stream(task_id, handlers=None):
    """Consume a task's SSE stream, dispatching events to optional callbacks.

    Args:
        task_id: ID of the task whose stream is read via ``parse_sse_stream``.
        handlers: Optional mapping of event-type name to a one-argument
            callable that receives the event's decoded data.

    Returns:
        A dict with a single ``final_response`` key holding the
        ``finalResponse`` field of the last ``done`` event seen, or an empty
        string when no ``done`` event arrived or the field was absent.
    """
    callbacks = handlers if handlers else {}
    outcome = {'final_response': ''}

    for kind, payload in parse_sse_stream(task_id):
        # Dispatch to the callback registered for this event type, if any
        if kind in callbacks:
            callbacks[kind](payload)

        # A 'done' event carries the final response text
        if kind == 'done':
            outcome['final_response'] = payload.get('finalResponse', '')

    return outcome

Create Task

def create_task(message, system_prompt=None, auto_execute=True, files=None):
    """Create a new task and stream the response.

    Args:
        message: The message sent as the task's ``message`` field.
        system_prompt: Optional value for the ``systemPrompt`` field; omitted
            from the request when falsy.
        auto_execute: Sent as the ``autoExecute`` field, rendered as the
            lowercase string ``'true'`` or ``'false'``.
        files: Optional iterable of local file paths to upload under the
            ``files`` form field.

    Returns:
        A dict with ``task_id`` and the ``final_response`` collected from
        the stream.

    Raises:
        OSError: If one of the given files cannot be opened.
        KeyError: If the creation response has no ``data.id``.
    """
    headers = {'Authorization': f'Bearer {API_KEY}'}
    data = {
        'message': message,
        'autoExecute': str(auto_execute).lower(),
    }

    if system_prompt:
        data['systemPrompt'] = system_prompt

    files_payload = []
    try:
        for file_path in files or []:
            files_payload.append(('files', open(file_path, 'rb')))

        # Create the task (returns JSON immediately)
        response = requests.post(
            f'{BASE_URL}/api/v1/tasks',
            headers=headers,
            data=data,
            files=files_payload or None,
        )
    finally:
        # Always release the file handles, including when a later open()
        # or the request itself raises.
        for _, handle in files_payload:
            handle.close()

    result = response.json()
    task_id = result['data']['id']
    print(f'Task created: {task_id}')

    # Stream the execution
    def print_history(data):
        role = data.get('role', '?')
        content = data.get('content', '')[:50]  # preview only
        print(f"[History] {role}: {content}...")

    def print_history_done(data):
        print(f"[History] {data['messageCount']} messages, streaming: {data['isStreaming']}")

    def print_content(data):
        # No newline, so consecutive chunks join into one continuous text
        print(data.get('text', ''), end='', flush=True)

    def print_status(data):
        print(f"[Status] {data.get('message', data.get('status', ''))}")

    def print_thought(data):
        print(f"[Thought] {data.get('subject', '')}")

    def print_tool(data):
        print(f"[Tool] {data.get('toolName', '')}: {data.get('status', '')}")

    def print_error(data):
        print(f"[Error] {data.get('message', '')}")

    def print_done(data):
        print('\n[Done]')

    stream_result = process_sse_stream(task_id, {
        'history_message': print_history,
        'history_done': print_history_done,
        'status': print_status,
        'thought': print_thought,
        'content': print_content,
        'tool_call': print_tool,
        'error': print_error,
        'done': print_done,
    })

    return {'task_id': task_id, **stream_result}

Send Follow-up Message

def send_followup(task_id, message):
    """Post a follow-up message to a task, then stream the reply.

    The original example notes that expired sessions are auto-restored.

    Args:
        task_id: ID of the task to continue.
        message: Text of the follow-up message.

    Returns:
        The dict produced by ``process_sse_stream`` for the task's stream.
    """
    endpoint = f'{BASE_URL}/api/v1/tasks/{task_id}/messages'
    request_headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }

    # Fire-and-forget: the POST response is intentionally not inspected
    requests.post(endpoint, headers=request_headers, json={'message': message})

    def on_content(event):
        print(event.get('text', ''), end='', flush=True)

    def on_done(event):
        print('\n[Done]')

    # Stream the response, echoing text chunks as they arrive
    callbacks = {'content': on_content, 'done': on_done}
    return process_sse_stream(task_id, callbacks)

Get Task Status

def get_task(task_id):
    """Fetch one task and return the ``data`` member of the response.

    Raises:
        Exception: With the API's error message (or ``'Unknown error'``)
            when the response's ``success`` flag is missing or falsy.
    """
    url = f'{BASE_URL}/api/v1/tasks/{task_id}'
    auth = {'Authorization': f'Bearer {API_KEY}'}
    payload = requests.get(url, headers=auth).json()

    if payload.get('success'):
        return payload['data']

    failure = payload.get('error', {})
    raise Exception(failure.get('message', 'Unknown error'))

List Tasks

def list_tasks(limit=50):
    """Return the ``data`` member of the task-listing response.

    Args:
        limit: Sent as the ``limit`` query parameter.

    Raises:
        Exception: With the API's error message (or ``'Unknown error'``)
            when the response's ``success`` flag is missing or falsy.
    """
    listing = requests.get(
        f'{BASE_URL}/api/v1/tasks',
        headers={'Authorization': f'Bearer {API_KEY}'},
        params={'limit': limit},
    )
    body = listing.json()

    succeeded = body.get('success')
    if not succeeded:
        details = body.get('error', {})
        raise Exception(details.get('message', 'Unknown error'))

    return body['data']

Cancel Task

def cancel_task(task_id):
    """Send a DELETE for the task and return the response's ``data`` member.

    Raises:
        Exception: With the API's error message (or ``'Unknown error'``)
            when the response's ``success`` flag is missing or falsy.
    """
    target = f'{BASE_URL}/api/v1/tasks/{task_id}'
    reply = requests.delete(target, headers={'Authorization': f'Bearer {API_KEY}'})
    outcome = reply.json()

    if outcome.get('success'):
        return outcome['data']

    raise Exception(outcome.get('error', {}).get('message', 'Unknown error'))

List & Download Artifacts

def list_artifacts(task_id):
    """Return the ``files`` list from the task's artifacts response.

    Raises:
        Exception: With the API's error message (or ``'Unknown error'``)
            when the response's ``success`` flag is missing or falsy.
    """
    auth_headers = {'Authorization': f'Bearer {API_KEY}'}
    artifacts_url = f'{BASE_URL}/api/v1/tasks/{task_id}/artifacts'

    parsed = requests.get(artifacts_url, headers=auth_headers).json()
    if not parsed.get('success'):
        problem = parsed.get('error', {})
        raise Exception(problem.get('message', 'Unknown error'))

    data = parsed['data']
    return data['files']


def download_artifact(task_id, file_path, save_as=None):
    """Download a specific file from the workspace to local disk.

    The body is streamed to disk in chunks rather than being held fully in
    memory, so large artifacts do not have to fit in RAM.

    Args:
        task_id: ID of the task that owns the artifact.
        file_path: Path of the file, sent as the ``path`` query parameter.
        save_as: Optional local destination. Defaults to the last
            ``/``-separated component of ``file_path``.

    Returns:
        The local path the file was written to.

    Raises:
        Exception: If the server responds with a status other than 200.
    """
    # The context manager returns the connection to the pool on every path
    with requests.get(
        f'{BASE_URL}/api/v1/tasks/{task_id}/artifacts/download',
        headers={'Authorization': f'Bearer {API_KEY}'},
        params={'path': file_path},
        stream=True,
    ) as response:
        if response.status_code != 200:
            raise Exception(f'Download failed: {response.status_code}')

        output_path = save_as or file_path.split('/')[-1]
        bytes_written = 0
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=65536):
                f.write(chunk)
                bytes_written += len(chunk)

    print(f'Downloaded {bytes_written} bytes to {output_path}')
    return output_path

Tool Approval Handler

def create_task_with_approval(message, approval_callback):
    """Create a task with ``autoExecute`` off and approve tools by callback.

    Args:
        message: The message sent as the task's ``message`` field.
        approval_callback: Called as ``approval_callback(tool_name, args)``
            for each ``tool_call`` event whose status is
            ``awaiting_approval``. A truthy result sends the outcome
            ``proceed_once``, a falsy one sends ``cancel``.

    Returns:
        A dict with ``task_id`` and ``final_response``.
    """
    # Create the task
    creation = requests.post(
        f'{BASE_URL}/api/v1/tasks',
        headers={'Authorization': f'Bearer {API_KEY}'},
        data={
            'message': message,
            'autoExecute': 'false'
        },
    )

    task_id = creation.json()['data']['id']
    print(f'Task created: {task_id}')
    final_response = ''

    for kind, event in parse_sse_stream(task_id):
        if kind == 'history_done':
            print(f"[History] {event['messageCount']} messages loaded")
            continue

        if kind == 'tool_call' and event.get('status') == 'awaiting_approval':
            tool_name = event.get('toolName')
            args = event.get('args', {})

            print(f'[Tool] {tool_name} requires approval')
            print(f'  Args: {json.dumps(args, indent=2)}')

            # Ask the caller, then translate the decision into an outcome
            if approval_callback(tool_name, args):
                outcome = 'proceed_once'
            else:
                outcome = 'cancel'

            # Send the decision; the response is not inspected
            approve_url = f'{BASE_URL}/api/v1/tasks/{task_id}/tools/{event["callId"]}/approve'
            approve_headers = {
                'Authorization': f'Bearer {API_KEY}',
                'Content-Type': 'application/json'
            }
            requests.post(approve_url, headers=approve_headers, json={'outcome': outcome})
            continue

        if kind == 'content':
            print(event.get('text', ''), end='', flush=True)
        elif kind == 'done':
            final_response = event.get('finalResponse', '')
            print('\n[Done]')

    return {'task_id': task_id, 'final_response': final_response}

Complete Workflow Example

def main():
    """Run the demo: create a task, follow up, then inspect and list."""
    print('=== Sigmic AI API Demo ===\n')

    # Step 1: create a task and stream its answer
    print('1. Creating task...')
    created = create_task('What are the three laws of robotics?')
    task_id = created['task_id']
    print(f'Task ID: {task_id}')
    print(f'Response: {created["final_response"]}')

    # Step 2: continue the same task (auto-restores expired sessions)
    print('\n2. Sending follow-up...')
    reply = send_followup(task_id, 'Who created these laws?')
    print(f'Follow-up response: {reply["final_response"]}')

    # Step 3: read back the task record
    print('\n3. Checking task status...')
    details = get_task(task_id)
    print(f'Status: {details["status"]}')
    print(f'Created: {details["createdAt"]}')
    print(f'Completed: {details["completedAt"]}')

    # Step 4: show the files the task produced
    print('\n4. Listing artifacts...')
    produced = list_artifacts(task_id)
    print(f'Found {len(produced)} files:')
    for artifact in produced:
        print(f'  - {artifact["name"]} ({artifact["size"]} bytes)')

    # Step 5: show the five most recent tasks
    print('\n5. Listing recent tasks...')
    recent = list_tasks(5)
    print(f'Found {len(recent)} tasks:')
    for entry in recent:
        print(f'  - {entry["id"]}: {entry["status"]}')


# Run the demo only when executed as a script, not on import
if __name__ == '__main__':
    main()

File Upload Example

def analyze_file(file_path, question):
    """Upload one file with a question and stream the task's answer.

    Args:
        file_path: Local path of the file, uploaded under the ``files`` field.
        question: Text sent as the task's ``message`` field.

    Returns:
        The dict produced by ``process_sse_stream`` for the created task.
    """
    def show_text(event):
        print(event.get('text', ''), end='', flush=True)

    def show_done(event):
        print('\n')

    # The file only needs to stay open for the duration of the upload
    with open(file_path, 'rb') as upload:
        created = requests.post(
            f'{BASE_URL}/api/v1/tasks',
            headers={'Authorization': f'Bearer {API_KEY}'},
            data={'message': question},
            files={'files': upload},
        )

    task_id = created.json()['data']['id']
    callbacks = {'content': show_text, 'done': show_done}
    return process_sse_stream(task_id, callbacks)


# Usage
result = analyze_file(
    './data.csv',
    'Summarize the data in this CSV file',
)
print(f'Analysis: {result["final_response"]}')

Error Handling

class APIError(Exception):
    """Error reported by the API, carrying its code and message.

    Both values stay available as the ``code`` and ``message`` attributes;
    ``str(error)`` renders as ``"<code>: <message>"``.
    """

    def __init__(self, code, message):
        summary = f'{code}: {message}'
        super().__init__(summary)
        self.message = message
        self.code = code


def make_request(method, endpoint, **kwargs):
    """Make an authenticated API request and raise ``APIError`` on failure.

    Args:
        method: HTTP method name passed to ``requests.request``.
        endpoint: Path appended to ``BASE_URL`` (e.g. ``'/api/v1/tasks'``).
        **kwargs: Forwarded to ``requests.request``. A ``headers`` entry is
            merged with the Authorization header, which always wins.

    Returns:
        The decoded JSON response body.

    Raises:
        APIError: When the body's ``success`` flag is missing or falsy. The
            code and message come from the body's ``error`` object and fall
            back to ``'UNKNOWN'`` / ``'Unknown error'``.
    """
    # Work on a copy so the caller's dict is never mutated (it may be reused
    # across calls); this also tolerates an explicit headers=None.
    headers = dict(kwargs.pop('headers', None) or {})
    headers['Authorization'] = f'Bearer {API_KEY}'

    response = requests.request(
        method,
        f'{BASE_URL}{endpoint}',
        headers=headers,
        **kwargs
    )

    result = response.json()

    if not result.get('success'):
        error = result.get('error', {})
        raise APIError(
            error.get('code', 'UNKNOWN'),
            error.get('message', 'Unknown error')
        )

    return result


# Usage with error handling
try:
    # Go through make_request(), which raises APIError. get_task() raises a
    # plain Exception, which the `except APIError` clause would never catch.
    task = make_request('GET', '/api/v1/tasks/invalid-id')['data']
except APIError as e:
    if e.code == 'NOT_FOUND':
        print('Task not found')
    elif e.code == 'INVALID_API_KEY':
        print('Check your API key')
    else:
        # Unrecognized API error: let it propagate
        raise

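Async Example (with aiohttp)

This example needs the aiohttp package, which the Setup step above does not install: pip install aiohttp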

import aiohttp
import asyncio

async def create_task_async(message):
    """Create a task and stream its execution using async/await.

    Args:
        message: Text sent as the task's ``message`` form field.

    Returns:
        A dict with ``task_id`` and ``final_response``, taken from the
        ``finalResponse`` field of the last ``done`` event (empty string if
        none arrived).
    """
    async with aiohttp.ClientSession() as session:
        # 1. Create the task
        form = aiohttp.FormData()
        form.add_field('message', message)

        async with session.post(
            f'{BASE_URL}/api/v1/tasks',
            headers={'Authorization': f'Bearer {API_KEY}'},
            data=form
        ) as response:
            result = await response.json()
            task_id = result['data']['id']
            print(f'Task created: {task_id}')

        # 2. Stream the execution
        async with session.get(
            f'{BASE_URL}/api/v1/tasks/{task_id}/stream',
            headers={'Authorization': f'Bearer {API_KEY}'}
        ) as response:
            final_response = ''
            event_type = None

            async for raw_line in response.content:
                line = raw_line.decode('utf-8').strip()

                # Split on the FIRST colon only. Blank lines and ':' comment
                # lines produce a field that matches neither branch below.
                field, _, value = line.partition(':')
                # Both "field:value" and "field: value" are valid SSE; drop
                # at most one leading space from the value.
                if value.startswith(' '):
                    value = value[1:]

                if field == 'event':
                    event_type = value
                    continue
                if field != 'data':
                    continue

                try:
                    payload = json.loads(value)
                except json.JSONDecodeError:
                    # Best-effort: payloads that are not valid JSON are skipped
                    continue

                if event_type == 'history_done':
                    print(f"History: {payload['messageCount']} messages")

                if event_type == 'content':
                    print(payload.get('text', ''), end='', flush=True)

                if event_type == 'done':
                    final_response = payload.get('finalResponse', '')
                    print('\n')

            return {'task_id': task_id, 'final_response': final_response}


# Usage
asyncio.run(create_task_async('What is Python?'))