Supabase Integration

Supabase Integration

The Complete Backend-as-a-Service Solution

Supabase serves as the foundational backend infrastructure for AIMatrix Core Platform, providing a unified, scalable, and developer-friendly solution that combines PostgreSQL database, authentication, real-time subscriptions, storage, and Edge Functions. This integration enables AIMatrix to deliver enterprise-grade features while maintaining the flexibility and performance required for AI-powered applications.

Architecture Overview

  graph TB
    subgraph "AIMatrix Platform"
        AGENTS[AI Agents]
        API[Core API]
        UI[Web Interface]
        MCP[MCP Servers]
    end
    
    subgraph "Supabase Infrastructure"
        subgraph "Authentication & Identity"
            AUTH[Supabase Auth]
            RLS[Row Level Security]
            POLICIES[Security Policies]
        end
        
        subgraph "Database Layer"
            PG[(PostgreSQL)]
            VECTOR[pgvector Extension]
            FUNCTIONS[Database Functions]
            TRIGGERS[Triggers & Webhooks]
        end
        
        subgraph "Real-time Engine"
            RT[Real-time Subscriptions]
            CHANNELS[Channels & Broadcasts]
            PRESENCE[Presence System]
        end
        
        subgraph "API Layer"
            POSTGREST[PostgREST API]
            GRAPHQL[GraphQL API]
            CUSTOM[Custom Endpoints]
        end
        
        subgraph "Storage & Compute"
            STORAGE[Supabase Storage]
            EDGE[Edge Functions]
            CDN[Global CDN]
        end
    end
    
    subgraph "AI/ML Services"
        EMBEDDINGS[Vector Embeddings]
        SIMILARITY[Semantic Search]
        ML[ML Model Store]
    end
    
    AGENTS --> API
    UI --> API
    MCP --> API
    
    API --> AUTH
    API --> POSTGREST
    API --> RT
    API --> EDGE
    
    AUTH --> PG
    AUTH --> RLS
    POSTGREST --> PG
    RT --> PG
    
    PG --> VECTOR
    PG --> FUNCTIONS
    PG --> TRIGGERS
    
    STORAGE --> CDN
    EDGE --> PG
    EDGE --> STORAGE
    
    VECTOR --> EMBEDDINGS
    EMBEDDINGS --> SIMILARITY
    SIMILARITY --> ML

Database Schema Design

Core Tables Structure

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "vector";

-- Organizations table
CREATE TABLE organizations (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name TEXT NOT NULL,
    slug TEXT UNIQUE NOT NULL,
    plan TEXT NOT NULL DEFAULT 'starter' CHECK (plan IN ('starter', 'professional', 'enterprise')),
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- User profiles table (extends Supabase auth.users)
CREATE TABLE user_profiles (
    id UUID PRIMARY KEY REFERENCES auth.users ON DELETE CASCADE,
    email TEXT NOT NULL,
    full_name TEXT,
    avatar_url TEXT,
    organization_id UUID REFERENCES organizations(id),
    role TEXT NOT NULL DEFAULT 'member' CHECK (role IN ('owner', 'admin', 'member')),
    metadata JSONB DEFAULT '{}',
    mfa_enabled BOOLEAN DEFAULT FALSE,
    last_login TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- AI Agents table
CREATE TABLE agents (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name TEXT NOT NULL,
    description TEXT,
    owner_id UUID REFERENCES user_profiles(id),
    organization_id UUID REFERENCES organizations(id),
    type TEXT NOT NULL DEFAULT 'custom' CHECK (type IN ('sales', 'support', 'analytics', 'custom')),
    configuration JSONB NOT NULL DEFAULT '{}',
    model_settings JSONB DEFAULT '{}',
    status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'inactive', 'archived')),
    embedding VECTOR(1536), -- OpenAI embedding dimension
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Agent executions table
CREATE TABLE agent_executions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    agent_id UUID REFERENCES agents(id) ON DELETE CASCADE,
    user_id UUID REFERENCES user_profiles(id),
    input_data JSONB NOT NULL,
    output_data JSONB,
    execution_time_ms INTEGER,
    tokens_used INTEGER,
    model_used TEXT,
    status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed')),
    error_message TEXT,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    completed_at TIMESTAMP WITH TIME ZONE
);

-- Documents table for vector search
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    organization_id UUID REFERENCES organizations(id),
    created_by UUID REFERENCES user_profiles(id),
    document_type TEXT NOT NULL DEFAULT 'general',
    tags TEXT[] DEFAULT '{}',
    metadata JSONB DEFAULT '{}',
    embedding VECTOR(1536),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Knowledge base for AI context
CREATE TABLE knowledge_entries (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    organization_id UUID REFERENCES organizations(id),
    category TEXT NOT NULL DEFAULT 'general',
    tags TEXT[] DEFAULT '{}',
    embedding VECTOR(1536),
    usage_count INTEGER DEFAULT 0,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Chat conversations
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES user_profiles(id),
    agent_id UUID REFERENCES agents(id),
    organization_id UUID REFERENCES organizations(id),
    title TEXT,
    status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'archived', 'deleted')),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Chat messages
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    sender_type TEXT NOT NULL CHECK (sender_type IN ('user', 'agent', 'system')),
    sender_id UUID, -- References either user_profiles.id or agents.id
    content TEXT NOT NULL,
    message_type TEXT NOT NULL DEFAULT 'text' CHECK (message_type IN ('text', 'image', 'file', 'function_call', 'function_result')),
    metadata JSONB DEFAULT '{}',
    embedding VECTOR(1536),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- File uploads
CREATE TABLE file_uploads (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    filename TEXT NOT NULL,
    original_filename TEXT NOT NULL,
    file_size BIGINT NOT NULL,
    mime_type TEXT NOT NULL,
    storage_path TEXT NOT NULL,
    uploaded_by UUID REFERENCES user_profiles(id),
    organization_id UUID REFERENCES organizations(id),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- System activities/audit log
CREATE TABLE activities (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES user_profiles(id),
    organization_id UUID REFERENCES organizations(id),
    action TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_id UUID,
    details JSONB DEFAULT '{}',
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes for performance
CREATE INDEX idx_user_profiles_organization ON user_profiles(organization_id);
CREATE INDEX idx_agents_organization ON agents(organization_id);
CREATE INDEX idx_agents_owner ON agents(owner_id);
CREATE INDEX idx_agent_executions_agent ON agent_executions(agent_id);
CREATE INDEX idx_agent_executions_user ON agent_executions(user_id);
CREATE INDEX idx_agent_executions_created ON agent_executions(created_at);
CREATE INDEX idx_documents_organization ON documents(organization_id);
CREATE INDEX idx_documents_embedding ON documents USING ivfflat (embedding vector_cosine_ops);
CREATE INDEX idx_knowledge_entries_embedding ON knowledge_entries USING ivfflat (embedding vector_cosine_ops);
CREATE INDEX idx_messages_conversation ON messages(conversation_id);
CREATE INDEX idx_messages_created ON messages(created_at);
CREATE INDEX idx_activities_user ON activities(user_id);
CREATE INDEX idx_activities_organization ON activities(organization_id);
CREATE INDEX idx_activities_created ON activities(created_at);

Row Level Security (RLS) Policies

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
-- Enable RLS on all tables
ALTER TABLE organizations ENABLE ROW LEVEL SECURITY;
ALTER TABLE user_profiles ENABLE ROW LEVEL SECURITY;
ALTER TABLE agents ENABLE ROW LEVEL SECURITY;
ALTER TABLE agent_executions ENABLE ROW LEVEL SECURITY;
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
ALTER TABLE knowledge_entries ENABLE ROW LEVEL SECURITY;
ALTER TABLE conversations ENABLE ROW LEVEL SECURITY;
ALTER TABLE messages ENABLE ROW LEVEL SECURITY;
ALTER TABLE file_uploads ENABLE ROW LEVEL SECURITY;
ALTER TABLE activities ENABLE ROW LEVEL SECURITY;

-- Organization policies
CREATE POLICY "Users can view their organization" ON organizations
  FOR SELECT USING (
    id IN (SELECT organization_id FROM user_profiles WHERE id = auth.uid())
  );

-- User profile policies
CREATE POLICY "Users can view own profile" ON user_profiles
  FOR SELECT USING (id = auth.uid());

CREATE POLICY "Users can update own profile" ON user_profiles
  FOR UPDATE USING (id = auth.uid());

CREATE POLICY "Organization admins can view all profiles" ON user_profiles
  FOR SELECT USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles 
      WHERE id = auth.uid() AND role IN ('owner', 'admin')
    )
  );

-- Agent policies
CREATE POLICY "Organization members can view agents" ON agents
  FOR SELECT USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles WHERE id = auth.uid()
    )
  );

CREATE POLICY "Agent owners can manage their agents" ON agents
  FOR ALL USING (owner_id = auth.uid());

CREATE POLICY "Organization admins can manage all agents" ON agents
  FOR ALL USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles 
      WHERE id = auth.uid() AND role IN ('owner', 'admin')
    )
  );

-- Agent execution policies
CREATE POLICY "Users can view their executions" ON agent_executions
  FOR SELECT USING (user_id = auth.uid());

CREATE POLICY "Agent owners can view executions" ON agent_executions
  FOR SELECT USING (
    agent_id IN (SELECT id FROM agents WHERE owner_id = auth.uid())
  );

-- Document policies
CREATE POLICY "Organization members can access documents" ON documents
  FOR SELECT USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles WHERE id = auth.uid()
    )
  );

CREATE POLICY "Users can create documents" ON documents
  FOR INSERT WITH CHECK (
    organization_id IN (
      SELECT organization_id FROM user_profiles WHERE id = auth.uid()
    )
    AND created_by = auth.uid()
  );

-- Knowledge entry policies  
CREATE POLICY "Organization members can access knowledge" ON knowledge_entries
  FOR SELECT USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles WHERE id = auth.uid()
    )
  );

-- Conversation policies
CREATE POLICY "Users can access their conversations" ON conversations
  FOR ALL USING (user_id = auth.uid());

-- Message policies
CREATE POLICY "Users can access messages from their conversations" ON messages
  FOR ALL USING (
    conversation_id IN (
      SELECT id FROM conversations WHERE user_id = auth.uid()
    )
  );

-- File upload policies
CREATE POLICY "Users can access their files" ON file_uploads
  FOR ALL USING (uploaded_by = auth.uid());

CREATE POLICY "Organization members can view files" ON file_uploads
  FOR SELECT USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles WHERE id = auth.uid()
    )
  );

-- Activity policies
CREATE POLICY "Users can view their activities" ON activities
  FOR SELECT USING (user_id = auth.uid());

CREATE POLICY "Organization admins can view all activities" ON activities
  FOR SELECT USING (
    organization_id IN (
      SELECT organization_id FROM user_profiles 
      WHERE id = auth.uid() AND role IN ('owner', 'admin')
    )
  );

Authentication Implementation

Supabase Auth Configuration

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# Supabase authentication service
import os
from supabase import create_client, Client
from typing import Dict, Optional, List
import jwt
import asyncio

class AIMatrixSupabaseAuth:
    def __init__(self):
        self.url = os.getenv("SUPABASE_URL")
        self.anon_key = os.getenv("SUPABASE_ANON_KEY")
        self.service_role_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
        
        if not all([self.url, self.anon_key, self.service_role_key]):
            raise ValueError("Missing required Supabase environment variables")
        
        self.client = create_client(self.url, self.anon_key)
        self.admin_client = create_client(self.url, self.service_role_key)
    
    async def sign_up_with_email(self, email: str, password: str, metadata: Dict = None):
        """Register new user with email and password"""
        
        try:
            response = self.client.auth.sign_up({
                "email": email,
                "password": password,
                "options": {
                    "data": metadata or {}
                }
            })
            
            if response.user:
                # Create user profile
                await self._create_user_profile(response.user, metadata)
                
                return {
                    "success": True,
                    "user": response.user,
                    "session": response.session,
                    "message": "User registered successfully. Please check your email for verification."
                }
            else:
                return {"success": False, "error": "Registration failed"}
                
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def sign_in_with_email(self, email: str, password: str):
        """Authenticate user with email and password"""
        
        try:
            response = self.client.auth.sign_in_with_password({
                "email": email,
                "password": password
            })
            
            if response.user:
                # Update last login
                await self._update_last_login(response.user.id)
                
                # Get user profile with organization data
                profile = await self._get_user_profile_with_org(response.user.id)
                
                return {
                    "success": True,
                    "user": response.user,
                    "session": response.session,
                    "profile": profile,
                    "access_token": response.session.access_token,
                    "refresh_token": response.session.refresh_token
                }
            else:
                return {"success": False, "error": "Invalid credentials"}
                
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def sign_in_with_oauth(self, provider: str, redirect_url: str):
        """Authenticate with OAuth providers"""
        
        supported_providers = ['google', 'github', 'azure', 'linkedin', 'discord']
        
        if provider not in supported_providers:
            return {"success": False, "error": f"Provider {provider} not supported"}
        
        try:
            response = self.client.auth.sign_in_with_oauth({
                "provider": provider,
                "options": {
                    "redirect_to": redirect_url,
                    "scopes": "email profile"
                }
            })
            
            return {"success": True, "url": response.url}
            
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def send_magic_link(self, email: str, redirect_url: str):
        """Send magic link for passwordless authentication"""
        
        try:
            response = self.client.auth.sign_in_with_otp({
                "email": email,
                "options": {
                    "email_redirect_to": redirect_url
                }
            })
            
            return {
                "success": True,
                "message": "Magic link sent to your email"
            }
            
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def setup_mfa(self, user_id: str):
        """Setup multi-factor authentication"""
        
        try:
            # Enroll TOTP factor
            response = self.client.auth.mfa.enroll({
                "factor_type": "totp",
                "friendly_name": "AIMatrix TOTP"
            })
            
            if response.get("id"):
                # Update user profile
                await self._update_mfa_status(user_id, True)
                
                return {
                    "success": True,
                    "factor_id": response["id"],
                    "qr_code": response.get("qr_code"),
                    "secret": response.get("secret")
                }
            else:
                return {"success": False, "error": "MFA enrollment failed"}
                
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def verify_mfa(self, factor_id: str, challenge_id: str, code: str):
        """Verify MFA code"""
        
        try:
            response = self.client.auth.mfa.verify({
                "factor_id": factor_id,
                "challenge_id": challenge_id,
                "code": code
            })
            
            return {"success": True, "session": response}
            
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def refresh_session(self, refresh_token: str):
        """Refresh user session"""
        
        try:
            response = self.client.auth.refresh_session(refresh_token)
            
            return {
                "success": True,
                "session": response.session,
                "access_token": response.session.access_token,
                "refresh_token": response.session.refresh_token
            }
            
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def sign_out(self):
        """Sign out current user"""
        
        try:
            response = self.client.auth.sign_out()
            return {"success": True, "message": "Signed out successfully"}
            
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def get_user(self, access_token: str):
        """Get current user from access token"""
        
        try:
            response = self.client.auth.get_user(access_token)
            
            if response.user:
                profile = await self._get_user_profile_with_org(response.user.id)
                return {
                    "success": True,
                    "user": response.user,
                    "profile": profile
                }
            else:
                return {"success": False, "error": "Invalid token"}
                
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def _create_user_profile(self, user, metadata: Dict = None):
        """Create user profile after registration"""
        
        try:
            profile_data = {
                "id": user.id,
                "email": user.email,
                "full_name": metadata.get("full_name") if metadata else None,
                "metadata": metadata or {},
                "created_at": "now()",
                "updated_at": "now()"
            }
            
            # If organization_id is provided in metadata, add it
            if metadata and metadata.get("organization_id"):
                profile_data["organization_id"] = metadata["organization_id"]
            
            response = self.admin_client.table('user_profiles').insert(profile_data).execute()
            return response
            
        except Exception as e:
            print(f"Error creating user profile: {str(e)}")
    
    async def _update_last_login(self, user_id: str):
        """Update user's last login timestamp"""
        
        try:
            self.admin_client.table('user_profiles').update({
                "last_login": "now()",
                "updated_at": "now()"
            }).eq('id', user_id).execute()
            
        except Exception as e:
            print(f"Error updating last login: {str(e)}")
    
    async def _update_mfa_status(self, user_id: str, enabled: bool):
        """Update user's MFA status"""
        
        try:
            self.admin_client.table('user_profiles').update({
                "mfa_enabled": enabled,
                "updated_at": "now()"
            }).eq('id', user_id).execute()
            
        except Exception as e:
            print(f"Error updating MFA status: {str(e)}")
    
    async def _get_user_profile_with_org(self, user_id: str):
        """Get user profile with organization data"""
        
        try:
            response = self.admin_client.table('user_profiles').select(
                '*, organizations(*)'
            ).eq('id', user_id).single().execute()
            
            return response.data
            
        except Exception as e:
            print(f"Error getting user profile: {str(e)}")
            return None
    
    def decode_jwt(self, token: str) -> Dict:
        """Decode JWT token and extract claims"""
        
        try:
            # Note: In production, you should verify the signature
            decoded = jwt.decode(token, options={"verify_signature": False})
            
            return {
                "user_id": decoded.get("sub"),
                "email": decoded.get("email"),
                "role": decoded.get("role"),
                "user_metadata": decoded.get("user_metadata", {}),
                "app_metadata": decoded.get("app_metadata", {}),
                "exp": decoded.get("exp"),
                "iat": decoded.get("iat")
            }
            
        except Exception as e:
            raise ValueError(f"Invalid token: {str(e)}")

Real-time Features Implementation

Real-time Subscriptions

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Real-time subscription manager
import asyncio
import json
from typing import Callable, Dict, List
from supabase import Client

class SupabaseRealtimeManager:
    def __init__(self, supabase_client: Client):
        self.supabase = supabase_client
        self.subscriptions = {}
        self.event_handlers = {}
    
    async def subscribe_to_table(self, 
                                table_name: str, 
                                events: List[str] = None,
                                filter_condition: str = None,
                                callback: Callable = None):
        """Subscribe to real-time changes on a table"""
        
        if events is None:
            events = ['*']  # All events
        
        subscription_id = f"{table_name}_{hash(str([events, filter_condition]))}"
        
        # Create channel
        channel = self.supabase.channel(f"table-{subscription_id}")
        
        # Configure postgres changes
        postgres_config = {
            "event": events,
            "schema": "public",
            "table": table_name
        }
        
        if filter_condition:
            postgres_config["filter"] = filter_condition
        
        # Setup callback
        def handle_changes(payload):
            if callback:
                asyncio.create_task(callback(payload))
            
            # Store in event handlers for processing
            self._handle_realtime_event(table_name, payload)
        
        # Setup subscription
        channel.on_postgres_changes(
            **postgres_config,
            callback=handle_changes
        )
        
        # Subscribe
        channel.subscribe()
        
        # Store subscription
        self.subscriptions[subscription_id] = {
            "channel": channel,
            "table": table_name,
            "events": events,
            "filter": filter_condition
        }
        
        return subscription_id
    
    async def subscribe_to_user_activities(self, user_id: str, callback: Callable):
        """Subscribe to activities for specific user"""
        
        return await self.subscribe_to_table(
            table_name="activities",
            events=["INSERT"],
            filter_condition=f"user_id=eq.{user_id}",
            callback=callback
        )
    
    async def subscribe_to_agent_executions(self, agent_id: str, callback: Callable):
        """Subscribe to agent execution updates"""
        
        return await self.subscribe_to_table(
            table_name="agent_executions",
            events=["INSERT", "UPDATE"],
            filter_condition=f"agent_id=eq.{agent_id}",
            callback=callback
        )
    
    async def subscribe_to_conversations(self, user_id: str, callback: Callable):
        """Subscribe to conversation updates for user"""
        
        return await self.subscribe_to_table(
            table_name="conversations",
            events=["INSERT", "UPDATE"],
            filter_condition=f"user_id=eq.{user_id}",
            callback=callback
        )
    
    async def subscribe_to_messages(self, conversation_id: str, callback: Callable):
        """Subscribe to new messages in conversation"""
        
        return await self.subscribe_to_table(
            table_name="messages",
            events=["INSERT"],
            filter_condition=f"conversation_id=eq.{conversation_id}",
            callback=callback
        )
    
    async def unsubscribe(self, subscription_id: str):
        """Unsubscribe from real-time updates"""
        
        if subscription_id in self.subscriptions:
            subscription = self.subscriptions[subscription_id]
            subscription["channel"].unsubscribe()
            del self.subscriptions[subscription_id]
            return True
        
        return False
    
    async def unsubscribe_all(self):
        """Unsubscribe from all real-time updates"""
        
        for subscription_id in list(self.subscriptions.keys()):
            await self.unsubscribe(subscription_id)
    
    def _handle_realtime_event(self, table_name: str, payload: Dict):
        """Internal event handler for processing real-time events"""
        
        event_type = payload.get("eventType")
        new_record = payload.get("new", {})
        old_record = payload.get("old", {})
        
        print(f"Real-time event: {table_name}.{event_type}")
        
        # Process based on table and event type
        if table_name == "agent_executions":
            self._handle_agent_execution_event(event_type, new_record, old_record)
        elif table_name == "conversations":
            self._handle_conversation_event(event_type, new_record, old_record)
        elif table_name == "messages":
            self._handle_message_event(event_type, new_record, old_record)
    
    def _handle_agent_execution_event(self, event_type: str, new_record: Dict, old_record: Dict):
        """Handle agent execution real-time events"""
        
        if event_type == "INSERT":
            print(f"New agent execution started: {new_record.get('id')}")
        elif event_type == "UPDATE":
            old_status = old_record.get("status")
            new_status = new_record.get("status")
            
            if old_status != new_status:
                print(f"Agent execution status changed: {old_status} -> {new_status}")
                
                if new_status == "completed":
                    print(f"Agent execution completed in {new_record.get('execution_time_ms')}ms")
                elif new_status == "failed":
                    print(f"Agent execution failed: {new_record.get('error_message')}")
    
    def _handle_conversation_event(self, event_type: str, new_record: Dict, old_record: Dict):
        """Handle conversation real-time events"""
        
        if event_type == "INSERT":
            print(f"New conversation created: {new_record.get('title', 'Untitled')}")
    
    def _handle_message_event(self, event_type: str, new_record: Dict, old_record: Dict):
        """Handle message real-time events"""
        
        if event_type == "INSERT":
            sender_type = new_record.get("sender_type")
            content_preview = new_record.get("content", "")[:50] + "..."
            print(f"New {sender_type} message: {content_preview}")

# Usage example
async def setup_realtime_subscriptions(user_id: str, supabase_client: Client):
    """Setup real-time subscriptions for a user session"""
    
    rt_manager = SupabaseRealtimeManager(supabase_client)
    
    # Subscribe to user activities
    async def activity_callback(payload):
        activity = payload.get("new", {})
        print(f"User activity: {activity.get('action')} on {activity.get('resource_type')}")
    
    await rt_manager.subscribe_to_user_activities(user_id, activity_callback)
    
    # Subscribe to conversations
    async def conversation_callback(payload):
        conversation = payload.get("new", {})
        if payload.get("eventType") == "INSERT":
            print(f"New conversation: {conversation.get('title', 'Untitled')}")
    
    await rt_manager.subscribe_to_conversations(user_id, conversation_callback)
    
    return rt_manager

Vector Embeddings & Semantic Search

Vector Operations Implementation

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# Vector operations and semantic search
import numpy as np
from typing import List, Dict, Optional, Tuple
from openai import OpenAI
import asyncio

class SupabaseVectorOperations:
    def __init__(self, supabase_client: Client, openai_api_key: str):
        self.supabase = supabase_client
        self.openai_client = OpenAI(api_key=openai_api_key)
        self.embedding_model = "text-embedding-3-small"  # OpenAI's latest model
        self.embedding_dimension = 1536
    
    async def generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for text using OpenAI"""
        
        try:
            response = self.openai_client.embeddings.create(
                model=self.embedding_model,
                input=text,
                encoding_format="float"
            )
            
            return response.data[0].embedding
        
        except Exception as e:
            raise Exception(f"Failed to generate embedding: {str(e)}")
    
    async def store_document_with_embedding(self, 
                                          title: str, 
                                          content: str, 
                                          organization_id: str,
                                          created_by: str,
                                          document_type: str = "general",
                                          tags: List[str] = None,
                                          metadata: Dict = None):
        """Store document with generated embedding"""
        
        # Generate embedding
        embedding = await self.generate_embedding(f"{title}\n\n{content}")
        
        # Store document
        document_data = {
            "title": title,
            "content": content,
            "organization_id": organization_id,
            "created_by": created_by,
            "document_type": document_type,
            "tags": tags or [],
            "metadata": metadata or {},
            "embedding": embedding
        }
        
        response = self.supabase.table('documents').insert(document_data).execute()
        return response.data[0] if response.data else None
    
    async def store_knowledge_entry(self, 
                                  title: str, 
                                  content: str, 
                                  organization_id: str,
                                  category: str = "general",
                                  tags: List[str] = None):
        """Store knowledge entry with embedding"""
        
        # Generate embedding
        embedding = await self.generate_embedding(f"{title}\n\n{content}")
        
        # Store knowledge entry
        knowledge_data = {
            "title": title,
            "content": content,
            "organization_id": organization_id,
            "category": category,
            "tags": tags or [],
            "embedding": embedding
        }
        
        response = self.supabase.table('knowledge_entries').insert(knowledge_data).execute()
        return response.data[0] if response.data else None
    
    async def semantic_search_documents(self, 
                                      query: str, 
                                      organization_id: str,
                                      similarity_threshold: float = 0.7,
                                      limit: int = 10,
                                      document_type: str = None) -> List[Dict]:
        """Perform semantic search on documents"""
        
        # Generate query embedding
        query_embedding = await self.generate_embedding(query)
        
        # Build search parameters
        search_params = {
            "query_embedding": query_embedding,
            "similarity_threshold": similarity_threshold,
            "match_count": limit,
            "organization_id": organization_id
        }
        
        if document_type:
            search_params["document_type_filter"] = document_type
        
        # Call stored procedure for vector search
        response = self.supabase.rpc(
            'search_documents',
            search_params
        ).execute()
        
        return response.data or []
    
    async def semantic_search_knowledge(self, 
                                      query: str, 
                                      organization_id: str,
                                      similarity_threshold: float = 0.7,
                                      limit: int = 10,
                                      category: str = None) -> List[Dict]:
        """Perform semantic search on knowledge entries"""
        
        # Generate query embedding
        query_embedding = await self.generate_embedding(query)
        
        # Build search parameters
        search_params = {
            "query_embedding": query_embedding,
            "similarity_threshold": similarity_threshold,
            "match_count": limit,
            "organization_id": organization_id
        }
        
        if category:
            search_params["category_filter"] = category
        
        # Call stored procedure
        response = self.supabase.rpc(
            'search_knowledge',
            search_params
        ).execute()
        
        # Update usage count for matched entries
        if response.data:
            matched_ids = [entry['id'] for entry in response.data]
            await self._increment_usage_count(matched_ids)
        
        return response.data or []
    
    async def find_similar_agents(self, 
                                agent_id: str, 
                                organization_id: str,
                                similarity_threshold: float = 0.8,
                                limit: int = 5) -> List[Dict]:
        """Find similar agents based on configuration embeddings"""
        
        # Get agent embedding
        agent_response = self.supabase.table('agents').select(
            'embedding'
        ).eq('id', agent_id).single().execute()
        
        if not agent_response.data or not agent_response.data['embedding']:
            return []
        
        agent_embedding = agent_response.data['embedding']
        
        # Search for similar agents
        response = self.supabase.rpc(
            'find_similar_agents',
            {
                "query_embedding": agent_embedding,
                "similarity_threshold": similarity_threshold,
                "match_count": limit,
                "organization_id": organization_id,
                "exclude_agent_id": agent_id
            }
        ).execute()
        
        return response.data or []
    
    async def get_contextual_knowledge(self, 
                                     conversation_id: str,
                                     query: str,
                                     limit: int = 5) -> List[Dict]:
        """Get contextual knowledge based on conversation history"""
        
        # Get conversation messages for context
        messages_response = self.supabase.table('messages').select(
            'content, embedding'
        ).eq('conversation_id', conversation_id).order(
            'created_at', desc=True
        ).limit(10).execute()
        
        if not messages_response.data:
            # Fall back to simple query search
            return await self.semantic_search_knowledge(query, limit=limit)
        
        # Combine recent messages for context
        context_text = " ".join([msg['content'] for msg in messages_response.data[:5]])
        combined_query = f"{query}\n\nContext: {context_text}"
        
        # Generate embedding for enhanced query
        query_embedding = await self.generate_embedding(combined_query)
        
        # Search knowledge with context
        response = self.supabase.rpc(
            'search_contextual_knowledge',
            {
                "query_embedding": query_embedding,
                "conversation_embeddings": [msg['embedding'] for msg in messages_response.data if msg['embedding']],
                "similarity_threshold": 0.6,
                "match_count": limit
            }
        ).execute()
        
        return response.data or []
    
    async def _increment_usage_count(self, knowledge_ids: List[str]):
        """Increment usage count for knowledge entries"""
        
        try:
            for knowledge_id in knowledge_ids:
                self.supabase.table('knowledge_entries').update({
                    "usage_count": self.supabase.table('knowledge_entries').select(
                        'usage_count'
                    ).eq('id', knowledge_id).single().execute().data['usage_count'] + 1,
                    "updated_at": "now()"
                }).eq('id', knowledge_id).execute()
        
        except Exception as e:
            print(f"Error incrementing usage count: {str(e)}")

# PostgreSQL functions for vector operations
vector_search_functions = """
-- Document search function
CREATE OR REPLACE FUNCTION search_documents(
    query_embedding VECTOR(1536),
    similarity_threshold FLOAT DEFAULT 0.7,
    match_count INT DEFAULT 10,
    organization_id UUID DEFAULT NULL,
    document_type_filter TEXT DEFAULT NULL
)
RETURNS TABLE (
    id UUID,
    title TEXT,
    content TEXT,
    document_type TEXT,
    tags TEXT[],
    similarity FLOAT,
    created_at TIMESTAMPTZ
)
LANGUAGE SQL
AS $$
  SELECT 
    d.id,
    d.title,
    d.content,
    d.document_type,
    d.tags,
    (d.embedding <=> query_embedding) * -1 + 1 as similarity,
    d.created_at
  FROM documents d
  WHERE 
    (d.embedding <=> query_embedding) * -1 + 1 > similarity_threshold
    AND ($4::UUID IS NULL OR d.organization_id = $4)
    AND ($5::TEXT IS NULL OR d.document_type = $5)
  ORDER BY d.embedding <=> query_embedding
  LIMIT match_count;
$$;

-- Knowledge search function
CREATE OR REPLACE FUNCTION search_knowledge(
    query_embedding VECTOR(1536),
    similarity_threshold FLOAT DEFAULT 0.7,
    match_count INT DEFAULT 10,
    organization_id UUID DEFAULT NULL,
    category_filter TEXT DEFAULT NULL
)
RETURNS TABLE (
    id UUID,
    title TEXT,
    content TEXT,
    category TEXT,
    tags TEXT[],
    usage_count INT,
    similarity FLOAT,
    created_at TIMESTAMPTZ
)
LANGUAGE SQL
AS $$
  SELECT 
    k.id,
    k.title,
    k.content,
    k.category,
    k.tags,
    k.usage_count,
    (k.embedding <=> query_embedding) * -1 + 1 as similarity,
    k.created_at
  FROM knowledge_entries k
  WHERE 
    (k.embedding <=> query_embedding) * -1 + 1 > similarity_threshold
    AND ($4::UUID IS NULL OR k.organization_id = $4)
    AND ($5::TEXT IS NULL OR k.category = $5)
  ORDER BY k.embedding <=> query_embedding
  LIMIT match_count;
$$;

-- Agent similarity function
CREATE OR REPLACE FUNCTION find_similar_agents(
    query_embedding VECTOR(1536),
    similarity_threshold FLOAT DEFAULT 0.8,
    match_count INT DEFAULT 5,
    organization_id UUID DEFAULT NULL,
    exclude_agent_id UUID DEFAULT NULL
)
RETURNS TABLE (
    id UUID,
    name TEXT,
    description TEXT,
    type TEXT,
    similarity FLOAT,
    created_at TIMESTAMPTZ
)
LANGUAGE SQL
AS $$
  SELECT 
    a.id,
    a.name,
    a.description,
    a.type,
    (a.embedding <=> query_embedding) * -1 + 1 as similarity,
    a.created_at
  FROM agents a
  WHERE 
    a.embedding IS NOT NULL
    AND (a.embedding <=> query_embedding) * -1 + 1 > similarity_threshold
    AND ($4::UUID IS NULL OR a.organization_id = $4)
    AND ($5::UUID IS NULL OR a.id != $5)
  ORDER BY a.embedding <=> query_embedding
  LIMIT match_count;
$$;
"""

Storage Implementation

Supabase Storage Integration

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# Supabase Storage operations
import base64
import mimetypes
from pathlib import Path
from typing import Dict, List, Optional, BinaryIO
import uuid

class SupabaseStorageManager:
    def __init__(self, supabase_client: Client):
        self.supabase = supabase_client
        self.default_bucket = "aimatrix-files"
        self.avatar_bucket = "avatars"
        self.document_bucket = "documents"
        
        # Initialize buckets
        self._ensure_buckets_exist()
    
    def _ensure_buckets_exist(self):
        """Ensure required storage buckets exist"""
        
        buckets_to_create = [
            {
                "name": self.default_bucket,
                "public": False
            },
            {
                "name": self.avatar_bucket,
                "public": True
            },
            {
                "name": self.document_bucket,
                "public": False
            }
        ]
        
        for bucket_config in buckets_to_create:
            try:
                self.supabase.storage.create_bucket(
                    bucket_config["name"],
                    options={"public": bucket_config["public"]}
                )
            except Exception:
                # Bucket might already exist, continue
                pass
    
    async def upload_file(self, 
                         file_data: bytes, 
                         filename: str,
                         bucket: str = None,
                         folder: str = None,
                         user_id: str = None,
                         organization_id: str = None,
                         content_type: str = None) -> Dict:
        """Upload file to Supabase Storage"""
        
        if bucket is None:
            bucket = self.default_bucket
        
        # Generate unique filename to avoid conflicts
        file_extension = Path(filename).suffix
        unique_filename = f"{uuid.uuid4()}{file_extension}"
        
        # Build file path
        file_path = unique_filename
        if folder:
            file_path = f"{folder}/{unique_filename}"
        
        # Detect content type if not provided
        if content_type is None:
            content_type, _ = mimetypes.guess_type(filename)
            if content_type is None:
                content_type = "application/octet-stream"
        
        try:
            # Upload file
            upload_response = self.supabase.storage.from_(bucket).upload(
                path=file_path,
                file=file_data,
                file_options={
                    "content-type": content_type,
                    "upsert": False
                }
            )
            
            if upload_response.get('error'):
                raise Exception(upload_response['error']['message'])
            
            # Get file URL
            if bucket == self.avatar_bucket:
                # Public bucket
                file_url = self.supabase.storage.from_(bucket).get_public_url(file_path)
            else:
                # Private bucket - create signed URL (valid for 1 hour)
                file_url = self.supabase.storage.from_(bucket).create_signed_url(
                    file_path, 3600
                )
            
            # Store file metadata in database
            file_metadata = {
                "filename": unique_filename,
                "original_filename": filename,
                "file_size": len(file_data),
                "mime_type": content_type,
                "storage_path": f"{bucket}/{file_path}",
                "uploaded_by": user_id,
                "organization_id": organization_id,
                "metadata": {
                    "bucket": bucket,
                    "folder": folder,
                    "content_type": content_type
                }
            }
            
            db_response = self.supabase.table('file_uploads').insert(file_metadata).execute()
            
            return {
                "success": True,
                "file_id": db_response.data[0]['id'] if db_response.data else None,
                "filename": unique_filename,
                "original_filename": filename,
                "file_size": len(file_data),
                "file_url": file_url,
                "storage_path": file_path,
                "content_type": content_type
            }
            
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    async def upload_avatar(self, 
                           file_data: bytes, 
                           filename: str,
                           user_id: str) -> Dict:
        """Upload user avatar"""
        
        result = await self.upload_file(
            file_data=file_data,
            filename=filename,
            bucket=self.avatar_bucket,
            folder=f"users/{user_id}",
            user_id=user_id,
            content_type="image/jpeg"  # Assuming processed avatar
        )
        
        if result["success"]:
            # Update user profile with avatar URL
            self.supabase.table('user_profiles').update({
                "avatar_url": result["file_url"],
                "updated_at": "now()"
            }).eq('id', user_id).execute()
        
        return result
    
    async def upload_document(self, 
                             file_data: bytes, 
                             filename: str,
                             user_id: str,
                             organization_id: str,
                             document_type: str = "general") -> Dict:
        """Upload document file"""
        
        result = await self.upload_file(
            file_data=file_data,
            filename=filename,
            bucket=self.document_bucket,
            folder=f"organizations/{organization_id}/documents",
            user_id=user_id,
            organization_id=organization_id
        )
        
        if result["success"] and filename.lower().endswith(('.txt', '.md', '.pdf')):
            # Extract text content for vector embedding
            try:
                content = await self._extract_text_content(file_data, filename)
                if content:
                    # Store as searchable document
                    vector_ops = SupabaseVectorOperations(self.supabase, "openai_key")
                    await vector_ops.store_document_with_embedding(
                        title=filename,
                        content=content,
                        organization_id=organization_id,
                        created_by=user_id,
                        document_type=document_type,
                        metadata={"file_id": result["file_id"]}
                    )
            except Exception as e:
                print(f"Error processing document content: {str(e)}")
        
        return result
    
    async def download_file(self, file_path: str, bucket: str = None) -> bytes:
        """Download file from storage"""
        
        if bucket is None:
            bucket = self.default_bucket
        
        try:
            response = self.supabase.storage.from_(bucket).download(file_path)
            return response
            
        except Exception as e:
            raise Exception(f"Failed to download file: {str(e)}")
    
    async def delete_file(self, file_id: str) -> Dict:
        """Delete file from storage and database"""
        
        try:
            # Get file metadata
            file_response = self.supabase.table('file_uploads').select(
                '*'
            ).eq('id', file_id).single().execute()
            
            if not file_response.data:
                return {"success": False, "error": "File not found"}
            
            file_data = file_response.data
            storage_path = file_data['storage_path']
            
            # Extract bucket and path
            bucket, file_path = storage_path.split('/', 1)
            
            # Delete from storage
            storage_response = self.supabase.storage.from_(bucket).remove([file_path])
            
            if storage_response.get('error'):
                return {"success": False, "error": storage_response['error']['message']}
            
            # Delete from database
            self.supabase.table('file_uploads').delete().eq('id', file_id).execute()
            
            return {"success": True, "message": "File deleted successfully"}
            
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def get_file_info(self, file_id: str) -> Dict:
        """Get file information"""
        
        try:
            response = self.supabase.table('file_uploads').select(
                '*'
            ).eq('id', file_id).single().execute()
            
            if response.data:
                file_data = response.data
                
                # Generate signed URL if private
                bucket, file_path = file_data['storage_path'].split('/', 1)
                
                if bucket != self.avatar_bucket:
                    # Private file - create signed URL
                    signed_url = self.supabase.storage.from_(bucket).create_signed_url(
                        file_path, 3600
                    )
                    file_data['download_url'] = signed_url
                else:
                    # Public file
                    public_url = self.supabase.storage.from_(bucket).get_public_url(file_path)
                    file_data['download_url'] = public_url
                
                return {"success": True, "file": file_data}
            else:
                return {"success": False, "error": "File not found"}
                
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def list_user_files(self, user_id: str, limit: int = 50) -> List[Dict]:
        """List files uploaded by user"""
        
        try:
            response = self.supabase.table('file_uploads').select(
                '*'
            ).eq('uploaded_by', user_id).order(
                'created_at', desc=True
            ).limit(limit).execute()
            
            return response.data or []
            
        except Exception as e:
            print(f"Error listing user files: {str(e)}")
            return []
    
    async def _extract_text_content(self, file_data: bytes, filename: str) -> str:
        """Extract text content from file for indexing"""
        
        try:
            if filename.lower().endswith('.txt'):
                return file_data.decode('utf-8')
            elif filename.lower().endswith('.md'):
                return file_data.decode('utf-8')
            elif filename.lower().endswith('.pdf'):
                # Would need PDF processing library like PyPDF2
                # For now, return empty string
                return ""
            else:
                return ""
                
        except Exception:
            return ""

Edge Functions

Supabase Edge Functions Implementation

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Edge Function for AI agent execution
import { serve } from "https://deno.land/std@0.168.0/http/server.ts"
import { createClient } from "https://esm.sh/@supabase/supabase-js@2"

const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
}

serve(async (req) => {
  // Handle CORS
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders })
  }

  try {
    const supabase = createClient(
      Deno.env.get('SUPABASE_URL') ?? '',
      Deno.env.get('SUPABASE_SERVICE_ROLE_KEY') ?? ''
    )

    // Get request data
    const { agent_id, user_input, context } = await req.json()

    // Get agent configuration
    const { data: agent, error: agentError } = await supabase
      .from('agents')
      .select('*')
      .eq('id', agent_id)
      .single()

    if (agentError || !agent) {
      throw new Error('Agent not found')
    }

    // Create execution record
    const { data: execution, error: executionError } = await supabase
      .from('agent_executions')
      .insert({
        agent_id: agent_id,
        user_id: context.user_id,
        input_data: { user_input, context },
        status: 'running'
      })
      .select()
      .single()

    if (executionError) {
      throw new Error('Failed to create execution record')
    }

    // Execute AI model
    const startTime = Date.now()
    
    const openaiResponse = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${Deno.env.get('OPENAI_API_KEY')}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: agent.model_settings?.model || 'gpt-4',
        messages: [
          {
            role: 'system',
            content: agent.configuration.system_prompt || 'You are a helpful AI assistant.'
          },
          {
            role: 'user',
            content: user_input
          }
        ],
        temperature: agent.model_settings?.temperature || 0.7,
        max_tokens: agent.model_settings?.max_tokens || 1000
      })
    })

    const openaiData = await openaiResponse.json()
    const executionTime = Date.now() - startTime

    if (!openaiResponse.ok) {
      throw new Error(`OpenAI API error: ${openaiData.error?.message}`)
    }

    const response = openaiData.choices[0]?.message?.content || 'No response generated'
    const tokensUsed = openaiData.usage?.total_tokens || 0

    // Update execution record
    await supabase
      .from('agent_executions')
      .update({
        output_data: { response },
        execution_time_ms: executionTime,
        tokens_used: tokensUsed,
        model_used: agent.model_settings?.model || 'gpt-4',
        status: 'completed',
        completed_at: new Date().toISOString()
      })
      .eq('id', execution.id)

    // Log activity
    await supabase
      .from('activities')
      .insert({
        user_id: context.user_id,
        organization_id: agent.organization_id,
        action: 'agent_execution',
        resource_type: 'agent',
        resource_id: agent_id,
        details: {
          execution_id: execution.id,
          tokens_used: tokensUsed,
          execution_time_ms: executionTime
        }
      })

    return new Response(
      JSON.stringify({
        success: true,
        execution_id: execution.id,
        response: response,
        tokens_used: tokensUsed,
        execution_time_ms: executionTime
      }),
      {
        headers: { ...corsHeaders, 'Content-Type': 'application/json' },
        status: 200,
      },
    )

  } catch (error) {
    return new Response(
      JSON.stringify({
        success: false,
        error: error.message
      }),
      {
        headers: { ...corsHeaders, 'Content-Type': 'application/json' },
        status: 400,
      },
    )
  }
})

Python Edge Function Alternative

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# Python-based edge function using Supabase client
import os
import json
import asyncio
from supabase import create_client, Client
import openai

class AIMatrixEdgeFunctions:
    def __init__(self):
        self.supabase = create_client(
            os.getenv("SUPABASE_URL"),
            os.getenv("SUPABASE_SERVICE_ROLE_KEY")
        )
        self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    
    async def execute_agent(self, agent_id: str, user_input: str, context: dict):
        """Execute AI agent with given input"""
        
        try:
            # Get agent configuration
            agent_response = self.supabase.table('agents').select('*').eq('id', agent_id).single().execute()
            
            if not agent_response.data:
                raise Exception("Agent not found")
            
            agent = agent_response.data
            
            # Create execution record
            execution_data = {
                "agent_id": agent_id,
                "user_id": context.get('user_id'),
                "input_data": {"user_input": user_input, "context": context},
                "status": "running"
            }
            
            execution_response = self.supabase.table('agent_executions').insert(execution_data).execute()
            execution = execution_response.data[0]
            
            # Execute AI model
            start_time = asyncio.get_event_loop().time()
            
            messages = [
                {
                    "role": "system",
                    "content": agent.get('configuration', {}).get('system_prompt', 'You are a helpful AI assistant.')
                },
                {
                    "role": "user",
                    "content": user_input
                }
            ]
            
            # Add context if available
            if context.get('conversation_history'):
                messages = context['conversation_history'] + messages
            
            # Call OpenAI
            response = self.openai_client.chat.completions.create(
                model=agent.get('model_settings', {}).get('model', 'gpt-4'),
                messages=messages,
                temperature=agent.get('model_settings', {}).get('temperature', 0.7),
                max_tokens=agent.get('model_settings', {}).get('max_tokens', 1000)
            )
            
            execution_time = int((asyncio.get_event_loop().time() - start_time) * 1000)
            ai_response = response.choices[0].message.content
            tokens_used = response.usage.total_tokens
            
            # Update execution record
            self.supabase.table('agent_executions').update({
                "output_data": {"response": ai_response},
                "execution_time_ms": execution_time,
                "tokens_used": tokens_used,
                "model_used": agent.get('model_settings', {}).get('model', 'gpt-4'),
                "status": "completed",
                "completed_at": "now()"
            }).eq('id', execution['id']).execute()
            
            # Log activity
            self.supabase.table('activities').insert({
                "user_id": context.get('user_id'),
                "organization_id": agent['organization_id'],
                "action": "agent_execution",
                "resource_type": "agent",
                "resource_id": agent_id,
                "details": {
                    "execution_id": execution['id'],
                    "tokens_used": tokens_used,
                    "execution_time_ms": execution_time
                }
            }).execute()
            
            return {
                "success": True,
                "execution_id": execution['id'],
                "response": ai_response,
                "tokens_used": tokens_used,
                "execution_time_ms": execution_time
            }
            
        except Exception as e:
            # Update execution record with error
            if 'execution' in locals():
                self.supabase.table('agent_executions').update({
                    "status": "failed",
                    "error_message": str(e),
                    "completed_at": "now()"
                }).eq('id', execution['id']).execute()
            
            return {
                "success": False,
                "error": str(e)
            }
    
    async def process_document_upload(self, file_id: str):
        """Process uploaded document for vector indexing"""
        
        try:
            # Get file info
            file_response = self.supabase.table('file_uploads').select('*').eq('id', file_id).single().execute()
            
            if not file_response.data:
                raise Exception("File not found")
            
            file_data = file_response.data
            
            # Download file content
            bucket, file_path = file_data['storage_path'].split('/', 1)
            file_content = self.supabase.storage.from_(bucket).download(file_path)
            
            # Extract text content
            text_content = self._extract_text_from_file(file_content, file_data['original_filename'])
            
            if text_content:
                # Generate embedding
                embedding_response = self.openai_client.embeddings.create(
                    model="text-embedding-3-small",
                    input=text_content
                )
                
                embedding = embedding_response.data[0].embedding
                
                # Store as searchable document
                document_data = {
                    "title": file_data['original_filename'],
                    "content": text_content,
                    "organization_id": file_data['organization_id'],
                    "created_by": file_data['uploaded_by'],
                    "document_type": "uploaded_file",
                    "embedding": embedding,
                    "metadata": {"file_id": file_id}
                }
                
                self.supabase.table('documents').insert(document_data).execute()
                
                return {"success": True, "message": "Document processed and indexed"}
            else:
                return {"success": False, "error": "Unable to extract text content"}
                
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    def _extract_text_from_file(self, file_content: bytes, filename: str) -> str:
        """Extract text content from various file types"""
        
        try:
            if filename.lower().endswith(('.txt', '.md')):
                return file_content.decode('utf-8')
            elif filename.lower().endswith('.pdf'):
                # Would use PyPDF2 or similar library
                return ""
            else:
                return ""
        except:
            return ""

Client Integration Examples

JavaScript/TypeScript Client

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// AIMatrix Supabase client integration
import { createClient, SupabaseClient } from '@supabase/supabase-js'

export class AIMatrixClient {
  private supabase: SupabaseClient
  private realtimeSubscriptions: Map<string, any> = new Map()

  constructor(supabaseUrl: string, supabaseKey: string) {
    this.supabase = createClient(supabaseUrl, supabaseKey)
  }

  // Authentication methods
  async signUp(email: string, password: string, metadata?: any) {
    const { data, error } = await this.supabase.auth.signUp({
      email,
      password,
      options: {
        data: metadata
      }
    })

    if (error) throw error

    return data
  }

  async signIn(email: string, password: string) {
    const { data, error } = await this.supabase.auth.signInWithPassword({
      email,
      password
    })

    if (error) throw error

    // Get user profile
    const profile = await this.getUserProfile(data.user.id)

    return { ...data, profile }
  }

  async signInWithOAuth(provider: 'google' | 'github' | 'azure', redirectTo: string) {
    const { data, error } = await this.supabase.auth.signInWithOAuth({
      provider,
      options: {
        redirectTo
      }
    })

    if (error) throw error
    return data
  }

  async signOut() {
    const { error } = await this.supabase.auth.signOut()
    if (error) throw error
  }

  // User profile methods
  async getUserProfile(userId: string) {
    const { data, error } = await this.supabase
      .from('user_profiles')
      .select('*, organizations(*)')
      .eq('id', userId)
      .single()

    if (error) throw error
    return data
  }

  async updateUserProfile(userId: string, updates: any) {
    const { data, error } = await this.supabase
      .from('user_profiles')
      .update(updates)
      .eq('id', userId)
      .select()
      .single()

    if (error) throw error
    return data
  }

  // Agent methods
  async createAgent(agentData: any) {
    const { data, error } = await this.supabase
      .from('agents')
      .insert(agentData)
      .select()
      .single()

    if (error) throw error
    return data
  }

  async getAgents(organizationId: string) {
    const { data, error } = await this.supabase
      .from('agents')
      .select('*')
      .eq('organization_id', organizationId)
      .eq('status', 'active')

    if (error) throw error
    return data
  }

  async executeAgent(agentId: string, userInput: string, context?: any) {
    const { data, error } = await this.supabase.functions.invoke('execute-agent', {
      body: {
        agent_id: agentId,
        user_input: userInput,
        context: context || {}
      }
    })

    if (error) throw error
    return data
  }

  // Real-time methods
  async subscribeToAgentExecutions(agentId: string, callback: (payload: any) => void) {
    const subscription = this.supabase
      .channel(`agent-executions-${agentId}`)
      .on('postgres_changes', {
        event: '*',
        schema: 'public',
        table: 'agent_executions',
        filter: `agent_id=eq.${agentId}`
      }, callback)
      .subscribe()

    this.realtimeSubscriptions.set(`agent-executions-${agentId}`, subscription)
    return subscription
  }

  async subscribeToConversation(conversationId: string, callback: (payload: any) => void) {
    const subscription = this.supabase
      .channel(`conversation-${conversationId}`)
      .on('postgres_changes', {
        event: 'INSERT',
        schema: 'public',
        table: 'messages',
        filter: `conversation_id=eq.${conversationId}`
      }, callback)
      .subscribe()

    this.realtimeSubscriptions.set(`conversation-${conversationId}`, subscription)
    return subscription
  }

  async unsubscribe(subscriptionKey: string) {
    const subscription = this.realtimeSubscriptions.get(subscriptionKey)
    if (subscription) {
      await this.supabase.removeChannel(subscription)
      this.realtimeSubscriptions.delete(subscriptionKey)
    }
  }

  // Vector search methods
  async searchDocuments(query: string, organizationId: string, options?: any) {
    // First, generate embedding for the query (would call your embedding service)
    const embedding = await this.generateEmbedding(query)

    const { data, error } = await this.supabase.rpc('search_documents', {
      query_embedding: embedding,
      similarity_threshold: options?.threshold || 0.7,
      match_count: options?.limit || 10,
      organization_id: organizationId
    })

    if (error) throw error
    return data
  }

  async searchKnowledge(query: string, organizationId: string, options?: any) {
    const embedding = await this.generateEmbedding(query)

    const { data, error } = await this.supabase.rpc('search_knowledge', {
      query_embedding: embedding,
      similarity_threshold: options?.threshold || 0.7,
      match_count: options?.limit || 10,
      organization_id: organizationId
    })

    if (error) throw error
    return data
  }

  // Storage methods
  async uploadFile(file: File, bucket: string = 'aimatrix-files', folder?: string) {
    const fileExt = file.name.split('.').pop()
    const fileName = `${crypto.randomUUID()}.${fileExt}`
    const filePath = folder ? `${folder}/${fileName}` : fileName

    const { data, error } = await this.supabase.storage
      .from(bucket)
      .upload(filePath, file)

    if (error) throw error

    // Store metadata in database
    const { data: fileRecord, error: dbError } = await this.supabase
      .from('file_uploads')
      .insert({
        filename: fileName,
        original_filename: file.name,
        file_size: file.size,
        mime_type: file.type,
        storage_path: `${bucket}/${filePath}`
      })
      .select()
      .single()

    if (dbError) throw dbError

    return { storage: data, record: fileRecord }
  }

  async downloadFile(filePath: string, bucket: string = 'aimatrix-files') {
    const { data, error } = await this.supabase.storage
      .from(bucket)
      .download(filePath)

    if (error) throw error
    return data
  }

  private async generateEmbedding(text: string): Promise<number[]> {
    // This would call your embedding generation service
    // For now, return a placeholder
    return Array(1536).fill(0).map(() => Math.random())
  }
}

// Usage example
const client = new AIMatrixClient(
  process.env.NEXT_PUBLIC_SUPABASE_URL!,
  process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY!
)

// Sign in and setup real-time subscriptions
client.signIn('user@example.com', 'password').then(async (authData) => {
  console.log('Signed in:', authData)

  // Subscribe to agent executions
  await client.subscribeToAgentExecutions('agent-123', (payload) => {
    console.log('Agent execution update:', payload)
  })

  // Execute an agent
  const result = await client.executeAgent('agent-123', 'Hello, how can you help me?')
  console.log('Agent response:', result)
})

Monitoring and Analytics

Performance Monitoring

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# Supabase performance monitoring
import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, List
import psutil

class SupabaseMonitoring:
    def __init__(self, supabase_client: Client):
        self.supabase = supabase_client
        self.metrics_table = 'performance_metrics'
    
    async def log_query_performance(self, 
                                   query_type: str,
                                   table_name: str, 
                                   execution_time_ms: int,
                                   row_count: int = None,
                                   user_id: str = None):
        """Log query performance metrics"""
        
        try:
            metrics_data = {
                "metric_type": "query_performance",
                "query_type": query_type,  # SELECT, INSERT, UPDATE, DELETE
                "table_name": table_name,
                "execution_time_ms": execution_time_ms,
                "row_count": row_count,
                "user_id": user_id,
                "timestamp": datetime.utcnow().isoformat()
            }
            
            await self.supabase.table(self.metrics_table).insert(metrics_data).execute()
            
        except Exception as e:
            print(f"Error logging performance metrics: {str(e)}")
    
    async def log_realtime_metrics(self, 
                                  channel_count: int,
                                  active_subscriptions: int,
                                  messages_per_second: float):
        """Log real-time performance metrics"""
        
        try:
            metrics_data = {
                "metric_type": "realtime_performance",
                "channel_count": channel_count,
                "active_subscriptions": active_subscriptions,
                "messages_per_second": messages_per_second,
                "timestamp": datetime.utcnow().isoformat()
            }
            
            await self.supabase.table(self.metrics_table).insert(metrics_data).execute()
            
        except Exception as e:
            print(f"Error logging realtime metrics: {str(e)}")
    
    async def log_vector_search_metrics(self, 
                                       search_time_ms: int,
                                       result_count: int,
                                       similarity_threshold: float,
                                       embedding_dimension: int):
        """Log vector search performance"""
        
        try:
            metrics_data = {
                "metric_type": "vector_search",
                "search_time_ms": search_time_ms,
                "result_count": result_count,
                "similarity_threshold": similarity_threshold,
                "embedding_dimension": embedding_dimension,
                "timestamp": datetime.utcnow().isoformat()
            }
            
            await self.supabase.table(self.metrics_table).insert(metrics_data).execute()
            
        except Exception as e:
            print(f"Error logging vector search metrics: {str(e)}")
    
    async def get_performance_summary(self, hours: int = 24) -> Dict:
        """Get performance summary for the last N hours"""
        
        try:
            cutoff_time = datetime.utcnow() - timedelta(hours=hours)
            
            # Query performance summary
            query_response = await self.supabase.rpc('get_performance_summary', {
                'start_time': cutoff_time.isoformat(),
                'end_time': datetime.utcnow().isoformat()
            }).execute()
            
            return query_response.data
            
        except Exception as e:
            print(f"Error getting performance summary: {str(e)}")
            return {}
    
    async def detect_performance_issues(self) -> List[Dict]:
        """Detect potential performance issues"""
        
        issues = []
        
        try:
            # Check for slow queries (>5 seconds)
            slow_queries = await self.supabase.table(self.metrics_table).select(
                '*'
            ).eq('metric_type', 'query_performance').gt(
                'execution_time_ms', 5000
            ).gte('timestamp', (datetime.utcnow() - timedelta(hours=1)).isoformat()).execute()
            
            if slow_queries.data:
                issues.append({
                    "type": "slow_queries",
                    "count": len(slow_queries.data),
                    "description": f"Found {len(slow_queries.data)} slow queries in the last hour",
                    "severity": "high" if len(slow_queries.data) > 10 else "medium"
                })
            
            # Check for high error rates
            error_response = await self.supabase.rpc('get_error_rate', {
                'hours': 1
            }).execute()
            
            if error_response.data and error_response.data.get('error_rate', 0) > 0.05:
                issues.append({
                    "type": "high_error_rate",
                    "error_rate": error_response.data['error_rate'],
                    "description": f"Error rate is {error_response.data['error_rate']*100:.1f}%",
                    "severity": "critical"
                })
            
            # Check vector search performance
            vector_metrics = await self.supabase.table(self.metrics_table).select(
                'search_time_ms'
            ).eq('metric_type', 'vector_search').gte(
                'timestamp', (datetime.utcnow() - timedelta(hours=1)).isoformat()
            ).execute()
            
            if vector_metrics.data:
                avg_search_time = sum(m['search_time_ms'] for m in vector_metrics.data) / len(vector_metrics.data)
                
                if avg_search_time > 2000:  # 2 seconds
                    issues.append({
                        "type": "slow_vector_search",
                        "avg_time_ms": avg_search_time,
                        "description": f"Vector search averaging {avg_search_time:.0f}ms",
                        "severity": "medium"
                    })
            
            return issues
            
        except Exception as e:
            print(f"Error detecting performance issues: {str(e)}")
            return []

# PostgreSQL monitoring functions
monitoring_functions = """
-- Performance summary function
CREATE OR REPLACE FUNCTION get_performance_summary(
    start_time TIMESTAMPTZ,
    end_time TIMESTAMPTZ
)
RETURNS TABLE (
    metric_type TEXT,
    avg_execution_time FLOAT,
    max_execution_time FLOAT,
    total_queries BIGINT,
    avg_row_count FLOAT
)
LANGUAGE SQL
AS $$
  SELECT 
    pm.metric_type,
    AVG(pm.execution_time_ms) as avg_execution_time,
    MAX(pm.execution_time_ms) as max_execution_time,
    COUNT(*) as total_queries,
    AVG(pm.row_count) as avg_row_count
  FROM performance_metrics pm
  WHERE pm.timestamp BETWEEN $1 AND $2
    AND pm.metric_type = 'query_performance'
  GROUP BY pm.metric_type;
$$;

-- Error rate function
CREATE OR REPLACE FUNCTION get_error_rate(hours INT)
RETURNS TABLE (error_rate FLOAT, total_requests BIGINT, failed_requests BIGINT)
LANGUAGE SQL
AS $$
  WITH request_stats AS (
    SELECT 
      COUNT(*) as total,
      COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed
    FROM agent_executions
    WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
  )
  SELECT 
    CASE 
      WHEN total > 0 THEN failed::FLOAT / total::FLOAT 
      ELSE 0 
    END as error_rate,
    total as total_requests,
    failed as failed_requests
  FROM request_stats;
$$;
"""

Deployment and Configuration

Production Deployment Guide

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# docker-compose.yml for local development
version: '3.8'
services:
  # Your application services would connect to hosted Supabase
  aimatrix-api:
    build: .
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_ANON_KEY=${SUPABASE_ANON_KEY}
      - SUPABASE_SERVICE_ROLE_KEY=${SUPABASE_SERVICE_ROLE_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    ports:
      - "8000:8000"
    depends_on:
      - redis
      
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  # For self-hosted Supabase (optional)
  postgres:
    image: supabase/postgres:15.1.0.117
    environment:
      POSTGRES_PASSWORD: your-super-secret-and-long-postgres-password
      POSTGRES_DB: postgres
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  postgres_data:

Environment Configuration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# .env file for production
SUPABASE_URL=https://your-project-id.supabase.co
SUPABASE_ANON_KEY=your-anon-key
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key

# OpenAI for embeddings and AI features
OPENAI_API_KEY=your-openai-key

# Optional: Self-hosted Supabase
POSTGRES_URL=postgresql://postgres:password@localhost:5432/postgres
REDIS_URL=redis://localhost:6379

# Security
JWT_SECRET=your-jwt-secret
WEBHOOK_SECRET=your-webhook-secret

# Monitoring
MONITORING_ENABLED=true
LOG_LEVEL=info

Key Advantages of Supabase Integration

1. Unified Backend Stack

  • Single platform for database, authentication, real-time, storage, and Edge Functions
  • Reduces infrastructure complexity and maintenance overhead
  • Built-in scalability and performance optimization

2. Real-time by Default

  • Native real-time subscriptions for live data updates
  • Perfect for AI agent status updates, chat interfaces, and collaborative features
  • Automatic WebSocket management and connection handling

3. PostgreSQL Power

  • Full SQL database with ACID compliance
  • Advanced features like triggers, functions, and extensions
  • pgvector extension for native vector operations and semantic search

4. Security Built-in

  • Row Level Security (RLS) for fine-grained access control
  • JWT-based authentication with automatic token management
  • Built-in user management and social login support

5. Developer Experience

  • Auto-generated APIs from database schema
  • Real-time collaboration features
  • Comprehensive client libraries for all major platforms

6. Scalability & Performance

  • Global edge network for low latency
  • Automatic connection pooling and query optimization
  • Built-in caching and CDN integration

7. Open Source & Self-hostable

  • Full control over data and infrastructure when needed
  • Active community and transparent development
  • No vendor lock-in with self-hosting options

This comprehensive Supabase integration provides AIMatrix Core Platform with enterprise-grade backend infrastructure while maintaining the flexibility and developer experience needed for rapid AI application development.


Note

Production Ready: This integration is designed for production use with proper security, monitoring, and scalability considerations built-in.

Tip

Start with Hosted: Begin with Supabase’s hosted service for fastest development, then consider self-hosting for specific compliance or control requirements.


Supabase Integration - The complete backend foundation for modern AI applications