⚡ Updated .gitignore to include additional Python, IDE, OS, and application-specific artifacts for better project cleanliness.
- Added a startup event in `api_server.py` to verify database connectivity and provide warnings if no workflows are found. - Implemented new API endpoints for fetching service categories and searching workflows by category, improving the API's usability and functionality. - Removed deprecated scripts and files to streamline the codebase and focus on the new FastAPI system.
This commit is contained in:
283
workflow_db.py
283
workflow_db.py
@@ -16,7 +16,10 @@ from pathlib import Path
|
||||
class WorkflowDatabase:
|
||||
"""High-performance SQLite database for workflow metadata and search."""
|
||||
|
||||
def __init__(self, db_path: str = "workflows.db"):
|
||||
def __init__(self, db_path: str = None):
|
||||
# Use environment variable if no path provided
|
||||
if db_path is None:
|
||||
db_path = os.environ.get('WORKFLOW_DB_PATH', 'workflows.db')
|
||||
self.db_path = db_path
|
||||
self.workflows_dir = "workflows"
|
||||
self.init_database()
|
||||
@@ -106,6 +109,44 @@ class WorkflowDatabase:
|
||||
hash_md5.update(chunk)
|
||||
return hash_md5.hexdigest()
|
||||
|
||||
def format_workflow_name(self, filename: str) -> str:
|
||||
"""Convert filename to readable workflow name."""
|
||||
# Remove .json extension
|
||||
name = filename.replace('.json', '')
|
||||
|
||||
# Split by underscores
|
||||
parts = name.split('_')
|
||||
|
||||
# Skip the first part if it's just a number
|
||||
if len(parts) > 1 and parts[0].isdigit():
|
||||
parts = parts[1:]
|
||||
|
||||
# Convert parts to title case and join with spaces
|
||||
readable_parts = []
|
||||
for part in parts:
|
||||
# Special handling for common terms
|
||||
if part.lower() == 'http':
|
||||
readable_parts.append('HTTP')
|
||||
elif part.lower() == 'api':
|
||||
readable_parts.append('API')
|
||||
elif part.lower() == 'webhook':
|
||||
readable_parts.append('Webhook')
|
||||
elif part.lower() == 'automation':
|
||||
readable_parts.append('Automation')
|
||||
elif part.lower() == 'automate':
|
||||
readable_parts.append('Automate')
|
||||
elif part.lower() == 'scheduled':
|
||||
readable_parts.append('Scheduled')
|
||||
elif part.lower() == 'triggered':
|
||||
readable_parts.append('Triggered')
|
||||
elif part.lower() == 'manual':
|
||||
readable_parts.append('Manual')
|
||||
else:
|
||||
# Capitalize first letter
|
||||
readable_parts.append(part.capitalize())
|
||||
|
||||
return ' '.join(readable_parts)
|
||||
|
||||
def analyze_workflow_file(self, file_path: str) -> Optional[Dict[str, Any]]:
|
||||
"""Analyze a single workflow file and extract metadata."""
|
||||
try:
|
||||
@@ -122,7 +163,7 @@ class WorkflowDatabase:
|
||||
# Extract basic metadata
|
||||
workflow = {
|
||||
'filename': filename,
|
||||
'name': data.get('name', filename.replace('.json', '')),
|
||||
'name': self.format_workflow_name(filename),
|
||||
'workflow_id': data.get('id', ''),
|
||||
'active': data.get('active', False),
|
||||
'nodes': data.get('nodes', []),
|
||||
@@ -134,6 +175,12 @@ class WorkflowDatabase:
|
||||
'file_size': file_size
|
||||
}
|
||||
|
||||
# Use JSON name if available and meaningful, otherwise use formatted filename
|
||||
json_name = data.get('name', '').strip()
|
||||
if json_name and json_name != filename.replace('.json', '') and not json_name.startswith('My workflow'):
|
||||
workflow['name'] = json_name
|
||||
# If no meaningful JSON name, use formatted filename (already set above)
|
||||
|
||||
# Analyze nodes
|
||||
node_count = len(workflow['nodes'])
|
||||
workflow['node_count'] = node_count
|
||||
@@ -162,12 +209,127 @@ class WorkflowDatabase:
|
||||
trigger_type = 'Manual'
|
||||
integrations = set()
|
||||
|
||||
# Enhanced service mapping for better recognition
|
||||
service_mappings = {
|
||||
# Messaging & Communication
|
||||
'telegram': 'Telegram',
|
||||
'telegramTrigger': 'Telegram',
|
||||
'discord': 'Discord',
|
||||
'slack': 'Slack',
|
||||
'whatsapp': 'WhatsApp',
|
||||
'mattermost': 'Mattermost',
|
||||
'teams': 'Microsoft Teams',
|
||||
'rocketchat': 'Rocket.Chat',
|
||||
|
||||
# Email
|
||||
'gmail': 'Gmail',
|
||||
'mailjet': 'Mailjet',
|
||||
'emailreadimap': 'Email (IMAP)',
|
||||
'emailsendsmt': 'Email (SMTP)',
|
||||
'outlook': 'Outlook',
|
||||
|
||||
# Cloud Storage
|
||||
'googledrive': 'Google Drive',
|
||||
'googledocs': 'Google Docs',
|
||||
'googlesheets': 'Google Sheets',
|
||||
'dropbox': 'Dropbox',
|
||||
'onedrive': 'OneDrive',
|
||||
'box': 'Box',
|
||||
|
||||
# Databases
|
||||
'postgres': 'PostgreSQL',
|
||||
'mysql': 'MySQL',
|
||||
'mongodb': 'MongoDB',
|
||||
'redis': 'Redis',
|
||||
'airtable': 'Airtable',
|
||||
'notion': 'Notion',
|
||||
|
||||
# Project Management
|
||||
'jira': 'Jira',
|
||||
'github': 'GitHub',
|
||||
'gitlab': 'GitLab',
|
||||
'trello': 'Trello',
|
||||
'asana': 'Asana',
|
||||
'mondaycom': 'Monday.com',
|
||||
|
||||
# AI/ML Services
|
||||
'openai': 'OpenAI',
|
||||
'anthropic': 'Anthropic',
|
||||
'huggingface': 'Hugging Face',
|
||||
|
||||
# Social Media
|
||||
'linkedin': 'LinkedIn',
|
||||
'twitter': 'Twitter/X',
|
||||
'facebook': 'Facebook',
|
||||
'instagram': 'Instagram',
|
||||
|
||||
# E-commerce
|
||||
'shopify': 'Shopify',
|
||||
'stripe': 'Stripe',
|
||||
'paypal': 'PayPal',
|
||||
|
||||
# Analytics
|
||||
'googleanalytics': 'Google Analytics',
|
||||
'mixpanel': 'Mixpanel',
|
||||
|
||||
# Calendar & Tasks
|
||||
'googlecalendar': 'Google Calendar',
|
||||
'googletasks': 'Google Tasks',
|
||||
'cal': 'Cal.com',
|
||||
'calendly': 'Calendly',
|
||||
|
||||
# Forms & Surveys
|
||||
'typeform': 'Typeform',
|
||||
'googleforms': 'Google Forms',
|
||||
'form': 'Form Trigger',
|
||||
|
||||
# Development Tools
|
||||
'webhook': 'Webhook',
|
||||
'httpRequest': 'HTTP Request',
|
||||
'graphql': 'GraphQL',
|
||||
'sse': 'Server-Sent Events',
|
||||
|
||||
# Utility nodes (exclude from integrations)
|
||||
'set': None,
|
||||
'function': None,
|
||||
'code': None,
|
||||
'if': None,
|
||||
'switch': None,
|
||||
'merge': None,
|
||||
'split': None,
|
||||
'stickynote': None,
|
||||
'stickyNote': None,
|
||||
'wait': None,
|
||||
'schedule': None,
|
||||
'cron': None,
|
||||
'manual': None,
|
||||
'stopanderror': None,
|
||||
'noop': None,
|
||||
'noOp': None,
|
||||
'error': None,
|
||||
'limit': None,
|
||||
'aggregate': None,
|
||||
'summarize': None,
|
||||
'filter': None,
|
||||
'sort': None,
|
||||
'removeDuplicates': None,
|
||||
'dateTime': None,
|
||||
'extractFromFile': None,
|
||||
'convertToFile': None,
|
||||
'readBinaryFile': None,
|
||||
'readBinaryFiles': None,
|
||||
'executionData': None,
|
||||
'executeWorkflow': None,
|
||||
'executeCommand': None,
|
||||
'respondToWebhook': None,
|
||||
}
|
||||
|
||||
for node in nodes:
|
||||
node_type = node.get('type', '')
|
||||
node_name = node.get('name', '')
|
||||
node_name = node.get('name', '').lower()
|
||||
|
||||
# Determine trigger type
|
||||
if 'webhook' in node_type.lower() or 'webhook' in node_name.lower():
|
||||
if 'webhook' in node_type.lower() or 'webhook' in node_name:
|
||||
trigger_type = 'Webhook'
|
||||
elif 'cron' in node_type.lower() or 'schedule' in node_type.lower():
|
||||
trigger_type = 'Scheduled'
|
||||
@@ -175,12 +337,45 @@ class WorkflowDatabase:
|
||||
if 'manual' not in node_type.lower():
|
||||
trigger_type = 'Webhook'
|
||||
|
||||
# Extract integrations
|
||||
# Extract integrations with enhanced mapping
|
||||
service_name = None
|
||||
|
||||
# Handle n8n-nodes-base nodes
|
||||
if node_type.startswith('n8n-nodes-base.'):
|
||||
service = node_type.replace('n8n-nodes-base.', '')
|
||||
service = service.replace('Trigger', '').replace('trigger', '')
|
||||
if service and service not in ['set', 'function', 'if', 'switch', 'merge', 'stickyNote']:
|
||||
integrations.add(service.title())
|
||||
raw_service = node_type.replace('n8n-nodes-base.', '').lower()
|
||||
raw_service = raw_service.replace('trigger', '')
|
||||
service_name = service_mappings.get(raw_service, raw_service.title() if raw_service else None)
|
||||
|
||||
# Handle @n8n/ namespaced nodes
|
||||
elif node_type.startswith('@n8n/'):
|
||||
raw_service = node_type.split('.')[-1].lower() if '.' in node_type else node_type.lower()
|
||||
raw_service = raw_service.replace('trigger', '')
|
||||
service_name = service_mappings.get(raw_service, raw_service.title() if raw_service else None)
|
||||
|
||||
# Handle custom nodes
|
||||
elif '-' in node_type:
|
||||
# Try to extract service name from custom node names like "n8n-nodes-youtube-transcription-kasha.youtubeTranscripter"
|
||||
parts = node_type.lower().split('.')
|
||||
for part in parts:
|
||||
if 'youtube' in part:
|
||||
service_name = 'YouTube'
|
||||
break
|
||||
elif 'telegram' in part:
|
||||
service_name = 'Telegram'
|
||||
break
|
||||
elif 'discord' in part:
|
||||
service_name = 'Discord'
|
||||
break
|
||||
|
||||
# Also check node names for service hints
|
||||
for service_key, service_value in service_mappings.items():
|
||||
if service_key in node_name and service_value:
|
||||
service_name = service_value
|
||||
break
|
||||
|
||||
# Add to integrations if valid service found
|
||||
if service_name and service_name not in ['None', None]:
|
||||
integrations.add(service_name)
|
||||
|
||||
# Determine if complex based on node variety and count
|
||||
if len(nodes) > 10 and len(integrations) > 3:
|
||||
@@ -445,6 +640,76 @@ class WorkflowDatabase:
|
||||
'last_indexed': datetime.datetime.now().isoformat()
|
||||
}
|
||||
|
||||
def get_service_categories(self) -> Dict[str, List[str]]:
|
||||
"""Get service categories for enhanced filtering."""
|
||||
return {
|
||||
'messaging': ['Telegram', 'Discord', 'Slack', 'WhatsApp', 'Mattermost', 'Microsoft Teams', 'Rocket.Chat'],
|
||||
'email': ['Gmail', 'Mailjet', 'Email (IMAP)', 'Email (SMTP)', 'Outlook'],
|
||||
'cloud_storage': ['Google Drive', 'Google Docs', 'Google Sheets', 'Dropbox', 'OneDrive', 'Box'],
|
||||
'database': ['PostgreSQL', 'MySQL', 'MongoDB', 'Redis', 'Airtable', 'Notion'],
|
||||
'project_management': ['Jira', 'GitHub', 'GitLab', 'Trello', 'Asana', 'Monday.com'],
|
||||
'ai_ml': ['OpenAI', 'Anthropic', 'Hugging Face'],
|
||||
'social_media': ['LinkedIn', 'Twitter/X', 'Facebook', 'Instagram'],
|
||||
'ecommerce': ['Shopify', 'Stripe', 'PayPal'],
|
||||
'analytics': ['Google Analytics', 'Mixpanel'],
|
||||
'calendar_tasks': ['Google Calendar', 'Google Tasks', 'Cal.com', 'Calendly'],
|
||||
'forms': ['Typeform', 'Google Forms', 'Form Trigger'],
|
||||
'development': ['Webhook', 'HTTP Request', 'GraphQL', 'Server-Sent Events', 'YouTube']
|
||||
}
|
||||
|
||||
def search_by_category(self, category: str, limit: int = 50, offset: int = 0) -> Tuple[List[Dict], int]:
|
||||
"""Search workflows by service category."""
|
||||
categories = self.get_service_categories()
|
||||
if category not in categories:
|
||||
return [], 0
|
||||
|
||||
services = categories[category]
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Build OR conditions for all services in category
|
||||
service_conditions = []
|
||||
params = []
|
||||
for service in services:
|
||||
service_conditions.append("integrations LIKE ?")
|
||||
params.append(f'%"{service}"%')
|
||||
|
||||
where_clause = " OR ".join(service_conditions)
|
||||
|
||||
# Count total results
|
||||
count_query = f"SELECT COUNT(*) as total FROM workflows WHERE {where_clause}"
|
||||
cursor = conn.execute(count_query, params)
|
||||
total = cursor.fetchone()['total']
|
||||
|
||||
# Get paginated results
|
||||
query = f"""
|
||||
SELECT * FROM workflows
|
||||
WHERE {where_clause}
|
||||
ORDER BY analyzed_at DESC
|
||||
LIMIT {limit} OFFSET {offset}
|
||||
"""
|
||||
|
||||
cursor = conn.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Convert to dictionaries and parse JSON fields
|
||||
results = []
|
||||
for row in rows:
|
||||
workflow = dict(row)
|
||||
workflow['integrations'] = json.loads(workflow['integrations'] or '[]')
|
||||
raw_tags = json.loads(workflow['tags'] or '[]')
|
||||
clean_tags = []
|
||||
for tag in raw_tags:
|
||||
if isinstance(tag, dict):
|
||||
clean_tags.append(tag.get('name', str(tag.get('id', 'tag'))))
|
||||
else:
|
||||
clean_tags.append(str(tag))
|
||||
workflow['tags'] = clean_tags
|
||||
results.append(workflow)
|
||||
|
||||
conn.close()
|
||||
return results, total
|
||||
|
||||
|
||||
def main():
|
||||
"""Command-line interface for workflow database."""
|
||||
|
||||
Reference in New Issue
Block a user