Add external data sources and indicators for backtesting

External Sources:
- New ExternalSources.py module for real-time API data fetching
- Configurable refresh intervals, JSONPath extraction, and auth headers
- External sources can be used in signals for live strategy conditions

External Indicators:
- New ExternalIndicators.py module for historical API data
- Supports backtesting by fetching and caching historical data points
- Timestamps are matched to candle times during backtest execution
- Fear & Greed index and similar APIs now work in backtests

Signal System Enhancements:
- Added signal Blockly blocks for use in strategy builder
- New signal_blocks.js for dynamic signal block generation
- Signals category added to strategy toolbox
- PythonGenerator updated to handle signal_* block types
- process_signal() method added to StrategyInstance classes

notify_user Block Enhancement:
- Now supports dynamic values (variables, indicators, expressions)
- Can display indicator values and calculations in notifications

Bug Fixes:
- Fixed Socket.IO parse error causing strategies not to load on refresh
- Enhanced sanitize_for_json to handle pandas, datetime, bytes types
- Fixed external indicator name matching (underscore/space conversion)
- Fixed PRAGMA query missing fetch_all=True in ExternalIndicators

Dependencies:
- Added jsonpath-ng for JSONPath extraction from API responses

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rob 2026-03-09 21:25:36 -03:00
parent a4422dc556
commit b4bf6c5c8e
26 changed files with 3289 additions and 93 deletions

View File

@ -14,6 +14,7 @@ email_validator~=2.2.0
aiohttp>=3.9.0
websockets>=12.0
requests>=2.31.0
jsonpath-ng>=1.6.0
# Bitcoin wallet and encryption
bit>=0.8.0
cryptography>=41.0.0

View File

@ -11,9 +11,12 @@ from Configuration import Configuration
from ExchangeInterface import ExchangeInterface
from indicators import Indicators
from Signals import Signals
from ExternalSources import ExternalSources
from ExternalIndicators import ExternalIndicatorsManager
from trade import Trades
from edm_client import EdmClient, EdmWebSocketClient
from wallet import WalletManager
from utils import sanitize_for_json
# Configure logging
logger = logging.getLogger(__name__)
@ -51,6 +54,12 @@ class BrighterTrades:
# Object that maintains signals.
self.signals = Signals(self.data)
# Object that maintains external data sources (custom signal types).
self.external_sources = ExternalSources(self.data)
# Object that maintains external indicators (API-based indicators with historical data).
self.external_indicators = ExternalIndicatorsManager(self.data)
# Object that maintains candlestick and price data.
self.candles = Candles(users=self.users, exchanges=self.exchanges, datacache=self.data,
config=self.config, edm_client=self.edm_client)
@ -69,7 +78,8 @@ class BrighterTrades:
# Object responsible for testing trade and strategies data.
self.backtester = Backtester(data_cache=self.data, strategies=self.strategies,
indicators=self.indicators, socketio=socketio,
edm_client=self.edm_client)
edm_client=self.edm_client,
external_indicators=self.external_indicators)
self.backtests = {} # In-memory storage for backtests (replace with DB access in production)
# Wallet manager for Bitcoin wallets and credits ledger
@ -423,7 +433,12 @@ class BrighterTrades:
self.candles.set_new_candle(cdata)
# i_updates = self.indicators.update_indicators()
state_changes = self.signals.process_all_signals(self.indicators)
state_changes = self.signals.process_all_signals(
self.indicators, self.external_sources, self.external_indicators
)
# Refresh external sources if needed
self.external_sources.refresh_all_sources()
# Build price updates dict: trades.update expects {symbol: price}
symbol = cdata.get('symbol', cdata.get('market', 'BTC/USDT'))
@ -1554,8 +1569,8 @@ class BrighterTrades:
logger.info(f"[SOCKET] Received message type: {msg_type}")
def standard_reply(reply_msg: str, reply_data: Any) -> dict:
""" Formats a standard reply message. """
return {"reply": reply_msg, "data": reply_data}
""" Formats a standard reply message with JSON-safe data. """
return {"reply": reply_msg, "data": sanitize_for_json(reply_data)}
# Use authenticated_user_id if provided (from secure socket mapping)
# Otherwise fall back to resolving from msg_data (for backwards compatibility)

718
src/ExternalIndicators.py Normal file
View File

@ -0,0 +1,718 @@
"""
External Indicators Manager - Handles custom API-based indicators with historical data.
Allows users to define external data sources that provide historical data,
enabling their use in backtesting alongside standard technical indicators.
"""
import json
import logging
import uuid
import datetime as dt
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from datetime import date, datetime
import requests
from jsonpath_ng import parse as jsonpath_parse
import pandas as pd
from DataCache_v3 import DataCache
logger = logging.getLogger(__name__)
@dataclass
class ExternalIndicatorConfig:
"""Configuration for an external API-based indicator."""
name: str
historical_url: str # URL template with {start_date}, {end_date}, {limit} placeholders
auth_header: str = "" # e.g., "X-CMC_PRO_API_KEY"
auth_key: str = "" # The actual API key
value_jsonpath: str = "$.data[*].value" # JSONPath to extract values array
timestamp_jsonpath: str = "$.data[*].timestamp" # JSONPath to extract timestamps
date_format: str = "ISO" # "ISO" (2024-01-15) or "UNIX" (timestamp)
date_param_format: str = "ISO" # Format for URL date parameters
creator_id: int = None
enabled: bool = True
tbl_key: str = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
'name': self.name,
'historical_url': self.historical_url,
'auth_header': self.auth_header,
'value_jsonpath': self.value_jsonpath,
'timestamp_jsonpath': self.timestamp_jsonpath,
'date_format': self.date_format,
'date_param_format': self.date_param_format,
'creator_id': self.creator_id,
'enabled': self.enabled,
'tbl_key': self.tbl_key,
# Note: auth_key intentionally excluded for security
}
class ExternalIndicator:
"""
Represents an external indicator instance with cached historical data.
"""
def __init__(self, config: ExternalIndicatorConfig, data_cache: DataCache):
self.config = config
self.data_cache = data_cache
self._cache: Dict[date, float] = {} # In-memory cache: date -> value
self._cache_loaded = False
def _load_cache_from_db(self) -> None:
"""Load cached historical data from database."""
if self._cache_loaded:
return
try:
rows = self.data_cache.db.execute_sql(
"""SELECT data_date, value FROM external_indicator_data
WHERE indicator_key = ? ORDER BY data_date""",
params=(self.config.tbl_key,),
fetch_all=True
)
if rows:
for row in rows:
data_date = datetime.strptime(row[0], '%Y-%m-%d').date()
self._cache[data_date] = float(row[1])
logger.debug(f"Loaded {len(self._cache)} cached values for {self.config.name}")
self._cache_loaded = True
except Exception as e:
logger.error(f"Error loading indicator cache: {e}")
self._cache_loaded = True
def _save_to_cache(self, data_date: date, value: float) -> None:
"""Save a single data point to database cache."""
try:
self.data_cache.db.execute_sql(
"""INSERT OR REPLACE INTO external_indicator_data
(indicator_key, data_date, value) VALUES (?, ?, ?)""",
params=(self.config.tbl_key, data_date.strftime('%Y-%m-%d'), value)
)
except Exception as e:
logger.error(f"Error saving to indicator cache: {e}")
def _format_date_param(self, d: date) -> str:
"""Format a date for URL parameter based on config."""
if self.config.date_param_format == "UNIX":
return str(int(datetime.combine(d, datetime.min.time()).timestamp()))
else: # ISO
return d.strftime('%Y-%m-%d')
def _parse_timestamp(self, ts_value: Any) -> Optional[date]:
"""Parse a timestamp from API response."""
try:
if self.config.date_format == "UNIX":
# Unix timestamp (seconds or milliseconds)
ts = float(ts_value)
if ts > 1e11: # Milliseconds
ts = ts / 1000
return datetime.fromtimestamp(ts).date()
else: # ISO format
if isinstance(ts_value, str):
# Try common formats
for fmt in ['%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']:
try:
return datetime.strptime(ts_value[:19], fmt[:len(ts_value)]).date()
except ValueError:
continue
# Last resort: just take first 10 chars as date
return datetime.strptime(ts_value[:10], '%Y-%m-%d').date()
return None
except Exception as e:
logger.debug(f"Error parsing timestamp {ts_value}: {e}")
return None
def fetch_historical(self, start_date: date, end_date: date) -> Dict[date, float]:
"""
Fetch historical data from API for the given date range.
:param start_date: Start of date range.
:param end_date: End of date range.
:return: Dict mapping dates to values.
"""
self._load_cache_from_db()
# Check what we already have cached
missing_start = start_date
missing_end = end_date
# Find gaps in cache
cached_dates = set(self._cache.keys())
requested_dates = set()
current = start_date
while current <= end_date:
requested_dates.add(current)
current += dt.timedelta(days=1)
missing_dates = requested_dates - cached_dates
if not missing_dates:
# All data is cached
return {d: self._cache[d] for d in requested_dates if d in self._cache}
# Need to fetch missing data
missing_start = min(missing_dates)
missing_end = max(missing_dates)
try:
# Build URL with date parameters
url = self.config.historical_url
url = url.replace('{start_date}', self._format_date_param(missing_start))
url = url.replace('{end_date}', self._format_date_param(missing_end))
# Calculate limit (days between dates)
limit = (missing_end - missing_start).days + 1
url = url.replace('{limit}', str(limit))
headers = {'Accept': 'application/json'}
if self.config.auth_header and self.config.auth_key:
headers[self.config.auth_header] = self.config.auth_key
response = requests.get(url, headers=headers, timeout=30)
if not response.ok:
logger.error(f"API error fetching {self.config.name}: {response.status_code}")
return {d: self._cache.get(d) for d in requested_dates if d in self._cache}
data = response.json()
logger.info(f"[ExternalIndicator] API Response keys: {list(data.keys()) if isinstance(data, dict) else 'not a dict'}")
# Extract values and timestamps using JSONPath
value_expr = jsonpath_parse(self.config.value_jsonpath)
timestamp_expr = jsonpath_parse(self.config.timestamp_jsonpath)
values = [match.value for match in value_expr.find(data)]
timestamps = [match.value for match in timestamp_expr.find(data)]
logger.info(f"[ExternalIndicator] Extracted {len(values)} values and {len(timestamps)} timestamps")
if values:
logger.info(f"[ExternalIndicator] Sample values: {values[:3]}")
if timestamps:
logger.info(f"[ExternalIndicator] Sample timestamps: {timestamps[:3]}")
if len(values) != len(timestamps):
logger.error(f"Mismatch: {len(values)} values, {len(timestamps)} timestamps")
return {d: self._cache.get(d) for d in requested_dates if d in self._cache}
# Parse and cache the data
for ts, val in zip(timestamps, values):
parsed_date = self._parse_timestamp(ts)
if parsed_date:
try:
numeric_val = float(val)
self._cache[parsed_date] = numeric_val
self._save_to_cache(parsed_date, numeric_val)
except (TypeError, ValueError):
logger.debug(f"Non-numeric value for {parsed_date}: {val}")
logger.info(f"Fetched {len(values)} data points for {self.config.name}")
except requests.exceptions.RequestException as e:
logger.error(f"Request error fetching {self.config.name}: {e}")
except Exception as e:
logger.error(f"Error fetching {self.config.name}: {e}", exc_info=True)
# Return requested data from cache
return {d: self._cache.get(d) for d in requested_dates if d in self._cache}
def get_value_for_timestamp(self, ts: datetime) -> Optional[float]:
"""
Get the indicator value for a specific timestamp.
Uses the value for that day (handles granularity mismatch).
:param ts: The timestamp to look up.
:return: The indicator value or None if not available.
"""
self._load_cache_from_db()
target_date = ts.date() if isinstance(ts, datetime) else ts
return self._cache.get(target_date)
def calculate(self, candles: pd.DataFrame) -> pd.Series:
"""
Calculate indicator values for a DataFrame of candles.
:param candles: DataFrame with 'time', 'timestamp', or 'date' column.
:return: Series of indicator values aligned to candles.
"""
if candles.empty:
return pd.Series(dtype=float)
# Determine date range from candles
# Check various column names used by different data sources
if 'time' in candles.columns:
# 'time' column is typically Unix timestamp in milliseconds
time_col = candles['time']
if time_col.dtype in ['int64', 'float64'] and time_col.iloc[0] > 1e9:
# Convert from milliseconds or seconds
if time_col.iloc[0] > 1e12:
dates = pd.to_datetime(time_col, unit='ms')
else:
dates = pd.to_datetime(time_col, unit='s')
else:
dates = pd.to_datetime(time_col)
elif 'timestamp' in candles.columns:
dates = pd.to_datetime(candles['timestamp'])
elif 'date' in candles.columns:
dates = pd.to_datetime(candles['date'])
elif 'datetime' in candles.columns:
dates = pd.to_datetime(candles['datetime'])
else:
# Try index
dates = pd.to_datetime(candles.index)
start_date = dates.min().date()
end_date = dates.max().date()
logger.info(f"[ExternalIndicator] Fetching data for {self.config.name} from {start_date} to {end_date}")
# Fetch historical data for this range
historical_data = self.fetch_historical(start_date, end_date)
logger.info(f"[ExternalIndicator] Got {len(historical_data)} data points from API")
# Map each candle to its date's value
values = []
for d in dates:
candle_date = d.date()
val = historical_data.get(candle_date)
values.append(val)
# Log some stats
non_null = sum(1 for v in values if v is not None)
logger.info(f"[ExternalIndicator] Mapped {non_null}/{len(values)} candles to values")
return pd.Series(values, index=candles.index, name=self.config.name)
class ExternalIndicatorsManager:
"""Manages external API-based indicators."""
def __init__(self, data_cache: DataCache):
self.data_cache = data_cache
self._ensure_tables_exist()
# In-memory storage of indicator configs
self.configs: Dict[str, ExternalIndicatorConfig] = {}
# Cached ExternalIndicator instances
self._indicators: Dict[str, ExternalIndicator] = {}
# Load existing configs from database
self._load_configs_from_db()
def _ensure_tables_exist(self) -> None:
"""Create necessary database tables."""
try:
# Table for indicator configurations
if not self.data_cache.db.table_exists('external_indicators'):
self.data_cache.db.execute_sql("""
CREATE TABLE IF NOT EXISTS external_indicators (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tbl_key TEXT UNIQUE NOT NULL,
creator_id INTEGER,
name TEXT NOT NULL,
historical_url TEXT NOT NULL,
auth_header TEXT,
auth_key TEXT,
value_jsonpath TEXT DEFAULT '$.data[*].value',
timestamp_jsonpath TEXT DEFAULT '$.data[*].timestamp',
date_format TEXT DEFAULT 'ISO',
date_param_format TEXT DEFAULT 'ISO',
enabled INTEGER DEFAULT 1
)
""", params=[])
logger.info("Created external_indicators table")
# Table for cached historical data
if not self.data_cache.db.table_exists('external_indicator_data'):
self.data_cache.db.execute_sql("""
CREATE TABLE IF NOT EXISTS external_indicator_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
indicator_key TEXT NOT NULL,
data_date TEXT NOT NULL,
value REAL NOT NULL,
UNIQUE(indicator_key, data_date)
)
""", params=[])
self.data_cache.db.execute_sql(
"CREATE INDEX IF NOT EXISTS idx_ext_ind_data ON external_indicator_data(indicator_key, data_date)",
params=[]
)
logger.info("Created external_indicator_data table")
except Exception as e:
logger.error(f"Error creating external indicator tables: {e}", exc_info=True)
def _load_configs_from_db(self) -> None:
"""Load all indicator configurations from database."""
try:
rows = self.data_cache.db.execute_sql(
"SELECT * FROM external_indicators",
params=[],
fetch_all=True
)
if rows:
# Get column names
column_info = self.data_cache.db.execute_sql(
"PRAGMA table_info(external_indicators)",
params=[],
fetch_all=True
)
columns = [row[1] for row in (column_info or [])]
for row in rows:
row_dict = dict(zip(columns, row))
config = ExternalIndicatorConfig(
name=row_dict.get('name', ''),
historical_url=row_dict.get('historical_url', ''),
auth_header=row_dict.get('auth_header', ''),
auth_key=row_dict.get('auth_key', ''),
value_jsonpath=row_dict.get('value_jsonpath', '$.data[*].value'),
timestamp_jsonpath=row_dict.get('timestamp_jsonpath', '$.data[*].timestamp'),
date_format=row_dict.get('date_format', 'ISO'),
date_param_format=row_dict.get('date_param_format', 'ISO'),
creator_id=row_dict.get('creator_id'),
enabled=bool(row_dict.get('enabled', 1)),
tbl_key=row_dict.get('tbl_key')
)
if config.tbl_key:
self.configs[config.tbl_key] = config
logger.info(f"Loaded {len(self.configs)} external indicator configs")
except Exception as e:
logger.error(f"Error loading external indicator configs: {e}", exc_info=True)
def _save_config_to_db(self, config: ExternalIndicatorConfig) -> None:
"""Save an indicator configuration to database."""
existing = self.data_cache.db.execute_sql(
"SELECT id FROM external_indicators WHERE tbl_key = ?",
params=(config.tbl_key,),
fetch_one=True
)
if existing:
self.data_cache.db.execute_sql("""
UPDATE external_indicators SET
creator_id = ?, name = ?, historical_url = ?, auth_header = ?,
auth_key = ?, value_jsonpath = ?, timestamp_jsonpath = ?,
date_format = ?, date_param_format = ?, enabled = ?
WHERE tbl_key = ?
""", params=(
config.creator_id, config.name, config.historical_url,
config.auth_header, config.auth_key, config.value_jsonpath,
config.timestamp_jsonpath, config.date_format, config.date_param_format,
1 if config.enabled else 0, config.tbl_key
))
else:
self.data_cache.db.execute_sql("""
INSERT INTO external_indicators (
tbl_key, creator_id, name, historical_url, auth_header, auth_key,
value_jsonpath, timestamp_jsonpath, date_format, date_param_format, enabled
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", params=(
config.tbl_key, config.creator_id, config.name, config.historical_url,
config.auth_header, config.auth_key, config.value_jsonpath,
config.timestamp_jsonpath, config.date_format, config.date_param_format,
1 if config.enabled else 0
))
def test_indicator(self, historical_url: str, auth_header: str, auth_key: str,
value_jsonpath: str, timestamp_jsonpath: str,
date_format: str, date_param_format: str) -> Dict[str, Any]:
"""
Test an external indicator configuration by fetching sample data.
:return: Dict with success status, sample values, and any errors.
"""
try:
# Build URL with sample date range (last 7 days)
end_date = date.today()
start_date = end_date - dt.timedelta(days=7)
url = historical_url
# Format dates based on config
if date_param_format == "UNIX":
start_str = str(int(datetime.combine(start_date, datetime.min.time()).timestamp()))
end_str = str(int(datetime.combine(end_date, datetime.min.time()).timestamp()))
else:
start_str = start_date.strftime('%Y-%m-%d')
end_str = end_date.strftime('%Y-%m-%d')
url = url.replace('{start_date}', start_str)
url = url.replace('{end_date}', end_str)
url = url.replace('{limit}', '7')
headers = {'Accept': 'application/json'}
if auth_header and auth_key:
headers[auth_header] = auth_key
response = requests.get(url, headers=headers, timeout=15)
# Try to parse JSON for error messages
try:
data = response.json()
except json.JSONDecodeError:
return {
'success': False,
'error': f'Invalid JSON response (HTTP {response.status_code})'
}
if not response.ok:
# Try to extract API error message
error_msg = self._extract_api_error(data)
if error_msg:
return {'success': False, 'error': f'API Error: {error_msg}'}
return {'success': False, 'error': f'HTTP {response.status_code}: {response.reason}'}
# Extract values using JSONPath
value_expr = jsonpath_parse(value_jsonpath)
timestamp_expr = jsonpath_parse(timestamp_jsonpath)
values = [match.value for match in value_expr.find(data)]
timestamps = [match.value for match in timestamp_expr.find(data)]
if not values:
return {
'success': False,
'error': f"Value JSONPath '{value_jsonpath}' found no matches",
'response_preview': str(data)[:300]
}
if not timestamps:
return {
'success': False,
'error': f"Timestamp JSONPath '{timestamp_jsonpath}' found no matches",
'response_preview': str(data)[:300]
}
if len(values) != len(timestamps):
return {
'success': False,
'error': f"Mismatch: {len(values)} values but {len(timestamps)} timestamps"
}
# Verify values are numeric
sample_values = []
for val in values[:5]:
try:
sample_values.append(float(val))
except (TypeError, ValueError):
return {
'success': False,
'error': f"Non-numeric value found: {val}"
}
# Verify timestamps can be parsed
parsed_dates = []
for ts in timestamps[:5]:
parsed = self._parse_timestamp_static(ts, date_format)
if parsed:
parsed_dates.append(parsed.strftime('%Y-%m-%d'))
else:
return {
'success': False,
'error': f"Could not parse timestamp: {ts}"
}
return {
'success': True,
'data_points': len(values),
'sample_values': sample_values,
'sample_dates': parsed_dates,
'message': f"Found {len(values)} data points"
}
except requests.exceptions.Timeout:
return {'success': False, 'error': 'Request timed out (15s)'}
except requests.exceptions.ConnectionError:
return {'success': False, 'error': 'Connection failed - check URL'}
except Exception as e:
logger.error(f"Error testing external indicator: {e}", exc_info=True)
return {'success': False, 'error': str(e)}
def _extract_api_error(self, data: Any) -> Optional[str]:
"""Extract error message from common API error formats."""
if not isinstance(data, dict):
return None
# CoinMarketCap format
if 'status' in data and isinstance(data['status'], dict):
return data['status'].get('error_message')
# Generic formats
for key in ['error', 'message', 'error_message', 'msg']:
if key in data:
return str(data[key])
return None
@staticmethod
def _parse_timestamp_static(ts_value: Any, date_format: str) -> Optional[date]:
"""Static method to parse timestamp for testing."""
try:
if date_format == "UNIX":
ts = float(ts_value)
if ts > 1e11:
ts = ts / 1000
return datetime.fromtimestamp(ts).date()
else:
if isinstance(ts_value, str):
for fmt in ['%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']:
try:
return datetime.strptime(ts_value[:19], fmt[:len(ts_value)]).date()
except ValueError:
continue
return datetime.strptime(ts_value[:10], '%Y-%m-%d').date()
return None
except Exception:
return None
def create_indicator(self, name: str, historical_url: str, auth_header: str,
auth_key: str, value_jsonpath: str, timestamp_jsonpath: str,
date_format: str, date_param_format: str,
creator_id: int) -> Dict[str, Any]:
"""
Create a new external indicator after validating the configuration.
:return: Dict with success status and indicator data.
"""
# First test the configuration
test_result = self.test_indicator(
historical_url, auth_header, auth_key,
value_jsonpath, timestamp_jsonpath,
date_format, date_param_format
)
if not test_result['success']:
return {
'success': False,
'message': f"Validation failed: {test_result['error']}"
}
try:
tbl_key = str(uuid.uuid4())
config = ExternalIndicatorConfig(
name=name,
historical_url=historical_url,
auth_header=auth_header,
auth_key=auth_key,
value_jsonpath=value_jsonpath,
timestamp_jsonpath=timestamp_jsonpath,
date_format=date_format,
date_param_format=date_param_format,
creator_id=creator_id,
enabled=True,
tbl_key=tbl_key
)
# Save to database
self._save_config_to_db(config)
# Add to memory
self.configs[tbl_key] = config
logger.info(f"Created external indicator: {name}")
return {
'success': True,
'indicator': config.to_dict(),
'test_data': test_result,
'message': f"Created indicator '{name}' with {test_result['data_points']} data points available"
}
except Exception as e:
logger.error(f"Error creating external indicator: {e}", exc_info=True)
return {'success': False, 'message': str(e)}
def get_indicator(self, tbl_key: str) -> Optional[ExternalIndicator]:
"""Get an ExternalIndicator instance by tbl_key."""
if tbl_key not in self.configs:
return None
if tbl_key not in self._indicators:
self._indicators[tbl_key] = ExternalIndicator(
self.configs[tbl_key], self.data_cache
)
return self._indicators[tbl_key]
def get_indicator_by_name(self, name: str, user_id: int) -> Optional[ExternalIndicator]:
"""Get an ExternalIndicator by name for a specific user."""
for config in self.configs.values():
if config.name == name and config.creator_id == user_id:
return self.get_indicator(config.tbl_key)
return None
def get_indicators_for_user(self, user_id: int) -> List[Dict[str, Any]]:
"""Get all indicator configs for a user."""
return [
config.to_dict()
for config in self.configs.values()
if config.creator_id == user_id
]
def get_all_indicators(self) -> List[Dict[str, Any]]:
"""Get all indicator configs."""
return [config.to_dict() for config in self.configs.values()]
def delete_indicator(self, tbl_key: str, user_id: int) -> Dict[str, Any]:
"""Delete an external indicator."""
config = self.configs.get(tbl_key)
if not config:
return {'success': False, 'message': 'Indicator not found'}
if config.creator_id != user_id:
return {'success': False, 'message': 'Not authorized to delete this indicator'}
try:
# Delete cached data
self.data_cache.db.execute_sql(
"DELETE FROM external_indicator_data WHERE indicator_key = ?",
params=(tbl_key,)
)
# Delete config
self.data_cache.db.execute_sql(
"DELETE FROM external_indicators WHERE tbl_key = ?",
params=(tbl_key,)
)
# Remove from memory
del self.configs[tbl_key]
if tbl_key in self._indicators:
del self._indicators[tbl_key]
logger.info(f"Deleted external indicator: {config.name}")
return {'success': True, 'message': f"Deleted indicator '{config.name}'"}
except Exception as e:
logger.error(f"Error deleting external indicator: {e}", exc_info=True)
return {'success': False, 'message': str(e)}
def get_indicators_as_indicator_list(self, user_id: int) -> Dict[str, Dict[str, Any]]:
"""
Get external indicators formatted for the indicator dropdown.
:param user_id: The user ID.
:return: Dict mapping indicator names to indicator-like data.
"""
result = {}
for config in self.configs.values():
if config.creator_id == user_id and config.enabled:
result[config.name] = {
'type': 'EXTERNAL_HISTORICAL',
'source_type': 'external_indicator',
'tbl_key': config.tbl_key,
'value': None # Will be filled during calculation
}
return result

468
src/ExternalSources.py Normal file
View File

@ -0,0 +1,468 @@
"""
External Sources Manager - Handles custom API-based signal sources.
Allows users to define external data sources (like CoinMarketCap Fear & Greed Index)
that can be used as signal sources alongside technical indicators.
"""
import json
import logging
import uuid
import datetime as dt
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional
import requests
from jsonpath_ng import parse as jsonpath_parse
from DataCache_v3 import DataCache
logger = logging.getLogger(__name__)
@dataclass
class ExternalSource:
"""Represents an external API data source."""
name: str
url: str
auth_header: str = "" # e.g., "X-CMC_PRO_API_KEY"
auth_key: str = "" # The actual API key (stored encrypted ideally)
json_path: str = "$.data[0].value" # JSONPath to extract value
refresh_interval: int = 300 # Seconds between fetches (default 5 min)
last_value: float = None
last_fetch_time: float = None # Unix timestamp
creator_id: int = None
enabled: bool = True
tbl_key: str = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
'name': self.name,
'url': self.url,
'auth_header': self.auth_header,
'json_path': self.json_path,
'refresh_interval': self.refresh_interval,
'last_value': self.last_value,
'last_fetch_time': self.last_fetch_time,
'creator_id': self.creator_id,
'enabled': self.enabled,
'tbl_key': self.tbl_key,
# Note: auth_key intentionally excluded for security
}
class ExternalSources:
"""Manages external API data sources for signals."""
def __init__(self, data_cache: DataCache):
"""
Initialize the ExternalSources manager.
:param data_cache: DataCache instance for database operations.
"""
self.data_cache = data_cache
self._ensure_table_exists()
# Create cache for external sources
self.data_cache.create_cache(
name='external_sources',
cache_type='table',
size_limit=100,
eviction_policy='deny',
default_expiration=dt.timedelta(hours=24),
columns=[
"creator_id",
"name",
"url",
"auth_header",
"auth_key",
"json_path",
"refresh_interval",
"last_value",
"last_fetch_time",
"enabled",
"tbl_key"
]
)
# In-memory cache of ExternalSource objects
self.sources: Dict[str, ExternalSource] = {}
# Load existing sources
self._load_sources_from_db()
def _ensure_table_exists(self) -> None:
"""Create the external_sources table if it doesn't exist."""
try:
if not self.data_cache.db.table_exists('external_sources'):
create_sql = """
CREATE TABLE IF NOT EXISTS external_sources (
id INTEGER PRIMARY KEY AUTOINCREMENT,
creator_id INTEGER,
name TEXT NOT NULL,
url TEXT NOT NULL,
auth_header TEXT,
auth_key TEXT,
json_path TEXT DEFAULT '$.data[0].value',
refresh_interval INTEGER DEFAULT 300,
last_value REAL,
last_fetch_time REAL,
enabled INTEGER DEFAULT 1,
tbl_key TEXT UNIQUE
)
"""
self.data_cache.db.execute_sql(create_sql, params=[])
logger.info("Created external_sources table in database")
except Exception as e:
logger.error(f"Error creating external_sources table: {e}", exc_info=True)
def _load_sources_from_db(self) -> None:
"""Load all external sources from database into memory."""
try:
sources_df = self.data_cache.get_all_rows_from_datacache(cache_name='external_sources')
if sources_df is not None and not sources_df.empty:
for _, row in sources_df.iterrows():
source = ExternalSource(
name=row.get('name', ''),
url=row.get('url', ''),
auth_header=row.get('auth_header', ''),
auth_key=row.get('auth_key', ''),
json_path=row.get('json_path', '$.data[0].value'),
refresh_interval=int(row.get('refresh_interval', 300)),
last_value=row.get('last_value'),
last_fetch_time=row.get('last_fetch_time'),
creator_id=row.get('creator_id'),
enabled=bool(row.get('enabled', True)),
tbl_key=row.get('tbl_key')
)
if source.tbl_key:
self.sources[source.tbl_key] = source
logger.info(f"Loaded {len(self.sources)} external sources from database")
except Exception as e:
logger.error(f"Error loading external sources: {e}", exc_info=True)
def create_source(self, name: str, url: str, auth_header: str, auth_key: str,
json_path: str, refresh_interval: int, creator_id: int) -> Dict[str, Any]:
"""
Create a new external source.
:param name: Display name for the source.
:param url: API endpoint URL.
:param auth_header: Header name for authentication (e.g., "X-CMC_PRO_API_KEY").
:param auth_key: API key value.
:param json_path: JSONPath expression to extract value from response.
:param refresh_interval: Seconds between data fetches.
:param creator_id: User ID of the creator.
:return: Dict with success status and source data.
"""
try:
tbl_key = str(uuid.uuid4())
source = ExternalSource(
name=name,
url=url,
auth_header=auth_header,
auth_key=auth_key,
json_path=json_path,
refresh_interval=refresh_interval,
creator_id=creator_id,
enabled=True,
tbl_key=tbl_key
)
# Test the source by fetching data
fetch_result = self._fetch_value(source)
if not fetch_result['success']:
return {
'success': False,
'message': f"Failed to fetch data: {fetch_result['error']}"
}
source.last_value = fetch_result['value']
source.last_fetch_time = dt.datetime.now().timestamp()
# Save to database
self._save_source_to_db(source)
# Add to memory cache
self.sources[tbl_key] = source
logger.info(f"Created external source: {name} (value: {source.last_value})")
return {
'success': True,
'source': source.to_dict(),
'message': f"Created source '{name}' with initial value: {source.last_value}"
}
except Exception as e:
logger.error(f"Error creating external source: {e}", exc_info=True)
return {'success': False, 'message': str(e)}
def _save_source_to_db(self, source: ExternalSource) -> None:
"""Save an external source to the database (insert or update)."""
# Check if source already exists
existing = self.data_cache.db.execute_sql(
"SELECT id FROM external_sources WHERE tbl_key = ?",
params=(source.tbl_key,),
fetch_one=True
)
if existing:
# Update existing record
update_sql = """
UPDATE external_sources SET
creator_id = ?, name = ?, url = ?, auth_header = ?, auth_key = ?,
json_path = ?, refresh_interval = ?, last_value = ?, last_fetch_time = ?,
enabled = ?
WHERE tbl_key = ?
"""
self.data_cache.db.execute_sql(update_sql, params=(
source.creator_id,
source.name,
source.url,
source.auth_header,
source.auth_key,
source.json_path,
source.refresh_interval,
source.last_value,
source.last_fetch_time,
1 if source.enabled else 0,
source.tbl_key
))
else:
# Insert new record
columns = (
'creator_id', 'name', 'url', 'auth_header', 'auth_key',
'json_path', 'refresh_interval', 'last_value', 'last_fetch_time',
'enabled', 'tbl_key'
)
values = (
source.creator_id,
source.name,
source.url,
source.auth_header,
source.auth_key,
source.json_path,
source.refresh_interval,
source.last_value,
source.last_fetch_time,
1 if source.enabled else 0,
source.tbl_key
)
self.data_cache.insert_row_into_datacache(
cache_name='external_sources',
columns=columns,
values=values
)
def _fetch_value(self, source: ExternalSource) -> Dict[str, Any]:
"""
Fetch the current value from an external API.
:param source: The ExternalSource to fetch from.
:return: Dict with success status, value, and optional error.
"""
try:
headers = {'Accept': 'application/json'}
# Add authentication header if provided
if source.auth_header and source.auth_key:
headers[source.auth_header] = source.auth_key
response = requests.get(source.url, headers=headers, timeout=10)
# Parse JSON first to check for API error messages
try:
data = response.json()
except json.JSONDecodeError:
return {'success': False, 'error': f'Invalid JSON response (HTTP {response.status_code})'}
# Check for API-level errors in response
if not response.ok:
# Try to extract error message from common API error formats
error_msg = None
if isinstance(data, dict):
# CoinMarketCap format: {"status": {"error_message": "..."}}
if 'status' in data and isinstance(data['status'], dict):
error_msg = data['status'].get('error_message')
# Generic formats
elif 'error' in data:
error_msg = data.get('error')
elif 'message' in data:
error_msg = data.get('message')
elif 'error_message' in data:
error_msg = data.get('error_message')
if error_msg:
return {'success': False, 'error': f'API Error: {error_msg}'}
else:
return {'success': False, 'error': f'HTTP {response.status_code}: {response.reason}'}
# Extract value using JSONPath
jsonpath_expr = jsonpath_parse(source.json_path)
matches = jsonpath_expr.find(data)
if not matches:
# Show a snippet of the response to help debug
response_preview = str(data)[:200]
return {
'success': False,
'error': f"JSONPath '{source.json_path}' found no matches. Response: {response_preview}..."
}
value = matches[0].value
# Try to convert to float
try:
value = float(value)
except (TypeError, ValueError):
pass # Keep original value if not numeric
return {'success': True, 'value': value}
except requests.exceptions.Timeout:
return {'success': False, 'error': 'Request timed out (10s)'}
except requests.exceptions.ConnectionError:
return {'success': False, 'error': 'Connection failed - check URL'}
except requests.exceptions.RequestException as e:
return {'success': False, 'error': f'Request failed: {str(e)}'}
except Exception as e:
logger.error(f"Error fetching external source: {e}", exc_info=True)
return {'success': False, 'error': f'Error: {str(e)}'}
def refresh_source(self, tbl_key: str) -> Dict[str, Any]:
"""
Refresh the value for a specific source.
:param tbl_key: The unique key of the source to refresh.
:return: Dict with success status and new value.
"""
source = self.sources.get(tbl_key)
if not source:
return {'success': False, 'error': 'Source not found'}
result = self._fetch_value(source)
if result['success']:
source.last_value = result['value']
source.last_fetch_time = dt.datetime.now().timestamp()
self._save_source_to_db(source)
return {'success': True, 'value': result['value']}
return result
def refresh_all_sources(self) -> Dict[str, Any]:
"""
Refresh all sources that need updating based on their refresh interval.
:return: Dict with results for each source.
"""
results = {}
current_time = dt.datetime.now().timestamp()
for tbl_key, source in self.sources.items():
if not source.enabled:
continue
# Check if refresh is needed
if source.last_fetch_time:
time_since_fetch = current_time - source.last_fetch_time
if time_since_fetch < source.refresh_interval:
continue
result = self.refresh_source(tbl_key)
results[source.name] = result
return results
def get_source(self, tbl_key: str) -> Optional[ExternalSource]:
"""Get a source by its tbl_key."""
return self.sources.get(tbl_key)
def get_source_by_name(self, name: str) -> Optional[ExternalSource]:
"""Get a source by its name."""
for source in self.sources.values():
if source.name == name:
return source
return None
def get_sources_for_user(self, user_id: int) -> List[Dict[str, Any]]:
"""
Get all sources created by a specific user.
:param user_id: The user ID.
:return: List of source dictionaries.
"""
return [
source.to_dict()
for source in self.sources.values()
if source.creator_id == user_id
]
def get_all_sources(self) -> List[Dict[str, Any]]:
"""Get all sources as dictionaries."""
return [source.to_dict() for source in self.sources.values()]
def delete_source(self, tbl_key: str, user_id: int) -> Dict[str, Any]:
"""
Delete an external source.
:param tbl_key: The unique key of the source.
:param user_id: The user requesting deletion (must be creator).
:return: Dict with success status.
"""
source = self.sources.get(tbl_key)
if not source:
return {'success': False, 'message': 'Source not found'}
if source.creator_id != user_id:
return {'success': False, 'message': 'Not authorized to delete this source'}
try:
# Remove from database
self.data_cache.db.execute_sql(
"DELETE FROM external_sources WHERE tbl_key = ?",
params=(tbl_key,)
)
# Remove from memory
del self.sources[tbl_key]
logger.info(f"Deleted external source: {source.name}")
return {'success': True, 'message': f"Deleted source '{source.name}'"}
except Exception as e:
logger.error(f"Error deleting external source: {e}", exc_info=True)
return {'success': False, 'message': str(e)}
def get_source_value(self, name: str) -> Optional[float]:
"""
Get the current value of a source by name.
Used by the signals system.
:param name: The source name.
:return: The current value or None.
"""
source = self.get_source_by_name(name)
if source:
return source.last_value
return None
def get_sources_as_indicators(self, user_id: int) -> Dict[str, Dict[str, Any]]:
"""
Get sources formatted like indicators for use in signals.
:param user_id: The user ID to get sources for.
:return: Dict mapping source names to indicator-like data.
"""
result = {}
for source in self.sources.values():
if source.creator_id == user_id and source.enabled:
result[source.name] = {
'type': 'EXTERNAL',
'value': source.last_value,
'source_type': 'external',
'tbl_key': source.tbl_key,
'last_fetch_time': source.last_fetch_time
}
return result

View File

@ -143,6 +143,8 @@ class PythonGenerator:
# Route indicator_* types to the generic indicator handler
if node_type.startswith('indicator_'):
handler_method = self.handle_indicator
elif node_type.startswith('signal_'):
handler_method = self.handle_signal
else:
handler_method = getattr(self, f'handle_{node_type}', self.handle_default)
handler_code = handler_method(node, indent_level)
@ -191,6 +193,8 @@ class PythonGenerator:
# Route indicator_* types to the generic indicator handler
if node_type.startswith('indicator_'):
handler_method = self.handle_indicator
elif node_type.startswith('signal_'):
handler_method = self.handle_signal
else:
handler_method = getattr(self, f'handle_{node_type}', self.handle_default)
condition_code = handler_method(condition_node, indent_level=indent_level)
@ -239,6 +243,54 @@ class PythonGenerator:
logger.debug(f"Generated indicator condition: {expr}")
return expr
# ==============================
# Signals Handlers
# ==============================
def handle_signal(self, node: Dict[str, Any], indent_level: int) -> str:
"""
Handles signal nodes by generating a function call to check signal state.
Supports both:
- Generic 'signal' type with NAME field
- Custom 'signal_<name>' types where name is extracted from the type
:param node: The signal node.
:param indent_level: Current indentation level.
:return: A string representing the signal check.
"""
fields = node.get('fields', {})
node_type = node.get('type', '')
# Try to get signal name from fields first, then from type
signal_name = fields.get('NAME')
if not signal_name and node_type.startswith('signal_'):
# Extract name from type, e.g., 'signal_my_signal' -> 'my_signal'
# Replace underscores back to spaces for display names
signal_name = node_type[len('signal_'):].replace('_', ' ')
output_field = fields.get('OUTPUT', 'triggered')
if not signal_name:
logger.error(f"signal node missing name. type={node_type}, fields={fields}")
return 'False'
# Collect the signal information
if not hasattr(self, 'signals_used'):
self.signals_used = []
self.signals_used.append({
'name': signal_name,
'output': output_field
})
# Generate code that calls process_signal
if output_field == 'triggered':
expr = f"process_signal('{signal_name}', 'triggered')"
else:
expr = f"process_signal('{signal_name}', 'value')"
logger.debug(f"Generated signal condition: {expr}")
return expr
# ==============================
# Balances Handlers
# ==============================
@ -1192,14 +1244,43 @@ class PythonGenerator:
def handle_notify_user(self, node: Dict[str, Any], indent_level: int) -> List[str]:
"""
Handles the 'notify_user' node type.
Supports both static text messages and dynamic values (variables, calculations, etc.).
:param node: The notify_user node.
:param indent_level: Current indentation level.
:return: List of generated code lines.
"""
indent = ' ' * indent_level
message = node.get('message', 'No message provided.').replace("'", "\\'")
return [f"{indent}notify_user('{message}')"]
message = node.get('message', '').replace("'", "\\'")
value_node = node.get('value')
# Generate the dynamic value expression if present
value_expr = None
if value_node:
if isinstance(value_node, dict):
result = self.generate_code_from_json(value_node, indent_level)
# Handle case where result is a list (from dynamic_value or value_input)
if isinstance(result, list):
# Use first element if single, otherwise join them
value_expr = result[0] if len(result) == 1 else ', '.join(str(v) for v in result)
else:
value_expr = result
else:
value_expr = str(value_node)
# Build the notify_user call based on what's provided
if message and value_expr:
# Both static message and dynamic value: concatenate with str()
return [f"{indent}notify_user('{message}' + str({value_expr}))"]
elif message:
# Only static message
return [f"{indent}notify_user('{message}')"]
elif value_expr:
# Only dynamic value
return [f"{indent}notify_user(str({value_expr}))"]
else:
# Neither provided - fallback
return [f"{indent}notify_user('No message provided.')"]
def handle_value_input(self, node: Dict[str, Any], indent_level: int) -> Union[str, List[str]]:
"""

View File

@ -50,25 +50,33 @@ class Signal:
def compare(self):
if self.value1 is None:
raise ValueError('Signal: Cannot compare: value1 not set')
if self.value2 is None:
raise ValueError('Signal: Cannot compare: value2 not set')
previous_state = self.state
if self.operator == '+/-':
# Handle pattern operators (only need value1)
if self.operator == 'is_bullish':
# Bullish patterns return positive values (typically 100)
self.state = self.value1 > 0
elif self.operator == 'is_bearish':
# Bearish patterns return negative values (typically -100)
self.state = self.value1 < 0
elif self.operator == 'is_detected':
# Any pattern (bullish or bearish) returns non-zero
self.state = self.value1 != 0
elif self.operator == '+/-':
# Range comparison requires both values
if self.value2 is None:
raise ValueError('Signal: Cannot compare: value2 not set')
if self.range is None:
raise ValueError('Signal: Cannot compare: range not set')
if abs(self.value1 - self.value2) < self.range:
self.state = True
else:
self.state = False
self.state = abs(self.value1 - self.value2) < self.range
else:
# Standard comparison operators (>, <, ==)
if self.value2 is None:
raise ValueError('Signal: Cannot compare: value2 not set')
string = str(self.value1) + self.operator + str(self.value2)
logger.debug(f"Signal comparison: {string}")
if eval(string):
self.state = True
else:
self.state = False
self.state = eval(string)
state_change = self.state != previous_state
return state_change
@ -511,11 +519,13 @@ class Signals:
except Exception as e:
logger.error(f"Error updating signal state: {e}", exc_info=True)
def process_all_signals(self, indicators) -> dict:
def process_all_signals(self, indicators, external_sources=None, external_indicators=None) -> dict:
"""
Loop through all signals and process them based on the last indicator results.
:param indicators: The Indicators instance with calculated values.
:param external_sources: Optional ExternalSources instance for custom signal types (real-time only).
:param external_indicators: Optional ExternalIndicatorsManager for historical API indicators.
:return: Dictionary of signals that changed state.
"""
state_changes = {}
@ -535,6 +545,27 @@ class Signals:
name: IndicatorWrapper(data)
for name, data in indicator_list.items()
}
# Add external sources as indicator-like entries (real-time)
if external_sources:
external_data = external_sources.get_sources_as_indicators(user_id)
for name, data in external_data.items():
user_indicator_cache[user_id][name] = IndicatorWrapper(data)
# Add external indicators as indicator-like entries (historical API)
if external_indicators:
ext_ind_data = external_indicators.get_indicators_as_indicator_list(user_id)
for name, data in ext_ind_data.items():
# For live trading, get the most recent value
ext_ind = external_indicators.get_indicator_by_name(name, user_id)
if ext_ind:
# Fetch today's value
from datetime import datetime
today_value = ext_ind.get_value_for_timestamp(datetime.now())
if today_value is not None:
data['value'] = today_value
user_indicator_cache[user_id][name] = IndicatorWrapper(data)
except Exception as e:
logger.debug(f"Could not fetch indicators for user {user_id}: {e}")
user_indicator_cache[user_id] = {}

View File

@ -86,6 +86,7 @@ class StrategyInstance:
'exit_strategy': self.exit_strategy,
'notify_user': self.notify_user,
'process_indicator': self.process_indicator,
'process_signal': self.process_signal,
'get_strategy_profit_loss': self.get_strategy_profit_loss,
'is_in_profit': self.is_in_profit,
'is_in_loss': self.is_in_loss,
@ -774,6 +775,45 @@ class StrategyInstance:
traceback.print_exc()
return None
def process_signal(self, signal_name: str, output_field: str = 'triggered') -> Any:
"""
Checks the state of a user-defined signal.
:param signal_name: Name of the signal.
:param output_field: Either 'triggered' (returns bool) or 'value' (returns numeric).
:return: Boolean for triggered, numeric value for value, or None on error.
"""
try:
# Get the signal from the signals manager
if not hasattr(self, 'signals') or self.signals is None:
logger.warning(f"Signals manager not available in StrategyInstance")
return False if output_field == 'triggered' else None
# Look up the signal by name
signal = self.signals.get_signal_by_name(signal_name)
if signal is None:
logger.warning(f"Signal '{signal_name}' not found")
return False if output_field == 'triggered' else None
if output_field == 'triggered':
# Return whether the signal condition is currently met
# Signal is a dataclass with 'state' attribute
is_triggered = getattr(signal, 'state', False)
logger.debug(f"Signal '{signal_name}' triggered: {is_triggered}")
return is_triggered
else:
# Return the numeric value of the signal (value1)
value = getattr(signal, 'value1', None)
logger.debug(f"Signal '{signal_name}' value: {value}")
return value
except Exception as e:
logger.error(
f"Error processing signal '{signal_name}' in StrategyInstance '{self.strategy_instance_id}': {e}",
exc_info=True)
traceback.print_exc()
return False if output_field == 'triggered' else None
def get_strategy_profit_loss(self, strategy_id: str) -> float:
"""
Retrieves the current profit or loss of the strategy.

View File

@ -1032,6 +1032,186 @@ def admin_credit_user():
return jsonify(result)
# =============================================================================
# External Sources API Routes
# =============================================================================
@app.route('/api/external_sources/test', methods=['POST'])
def test_external_source():
"""Test an external source API connection without saving."""
data = request.get_json() or {}
url = data.get('url')
auth_header = data.get('auth_header', '')
auth_key = data.get('auth_key', '')
json_path = data.get('json_path', '$.data[0].value')
if not url:
return jsonify({'success': False, 'error': 'URL is required'})
from ExternalSources import ExternalSource
source = ExternalSource(
name='test',
url=url,
auth_header=auth_header,
auth_key=auth_key,
json_path=json_path
)
result = brighter_trades.external_sources._fetch_value(source)
return jsonify(result)
@app.route('/api/external_sources/create', methods=['POST'])
def create_external_source():
"""Create a new external source."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'message': 'Not logged in'}), 401
data = request.get_json() or {}
name = data.get('name')
url = data.get('url')
auth_header = data.get('auth_header', '')
auth_key = data.get('auth_key', '')
json_path = data.get('json_path', '$.data[0].value')
refresh_interval = data.get('refresh_interval', 300)
if not name or not url:
return jsonify({'success': False, 'message': 'Name and URL are required'})
result = brighter_trades.external_sources.create_source(
name=name,
url=url,
auth_header=auth_header,
auth_key=auth_key,
json_path=json_path,
refresh_interval=refresh_interval,
creator_id=user_id
)
return jsonify(result)
@app.route('/api/external_sources/list', methods=['GET'])
def list_external_sources():
"""List all external sources for the current user."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'error': 'Not logged in'}), 401
sources = brighter_trades.external_sources.get_sources_for_user(user_id)
return jsonify({'success': True, 'sources': sources})
@app.route('/api/external_sources/refresh/<tbl_key>', methods=['POST'])
def refresh_external_source(tbl_key):
"""Refresh a specific external source."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'error': 'Not logged in'}), 401
result = brighter_trades.external_sources.refresh_source(tbl_key)
return jsonify(result)
@app.route('/api/external_sources/delete/<tbl_key>', methods=['DELETE'])
def delete_external_source(tbl_key):
"""Delete an external source."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'message': 'Not logged in'}), 401
result = brighter_trades.external_sources.delete_source(tbl_key, user_id)
return jsonify(result)
# =============================================================================
# External Indicators API Routes (Historical data for backtesting)
# =============================================================================
@app.route('/api/external_indicators/test', methods=['POST'])
def test_external_indicator():
"""Test an external indicator API configuration without saving."""
data = request.get_json() or {}
historical_url = data.get('historical_url')
auth_header = data.get('auth_header', '')
auth_key = data.get('auth_key', '')
value_jsonpath = data.get('value_jsonpath', '$.data[*].value')
timestamp_jsonpath = data.get('timestamp_jsonpath', '$.data[*].timestamp')
date_format = data.get('date_format', 'ISO')
date_param_format = data.get('date_param_format', 'ISO')
if not historical_url:
return jsonify({'success': False, 'error': 'Historical URL is required'})
result = brighter_trades.external_indicators.test_indicator(
historical_url=historical_url,
auth_header=auth_header,
auth_key=auth_key,
value_jsonpath=value_jsonpath,
timestamp_jsonpath=timestamp_jsonpath,
date_format=date_format,
date_param_format=date_param_format
)
return jsonify(result)
@app.route('/api/external_indicators/create', methods=['POST'])
def create_external_indicator():
"""Create a new external indicator."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'message': 'Not logged in'}), 401
data = request.get_json() or {}
name = data.get('name')
historical_url = data.get('historical_url')
auth_header = data.get('auth_header', '')
auth_key = data.get('auth_key', '')
value_jsonpath = data.get('value_jsonpath', '$.data[*].value')
timestamp_jsonpath = data.get('timestamp_jsonpath', '$.data[*].timestamp')
date_format = data.get('date_format', 'ISO')
date_param_format = data.get('date_param_format', 'ISO')
if not name or not historical_url:
return jsonify({'success': False, 'message': 'Name and Historical URL are required'})
result = brighter_trades.external_indicators.create_indicator(
name=name,
historical_url=historical_url,
auth_header=auth_header,
auth_key=auth_key,
value_jsonpath=value_jsonpath,
timestamp_jsonpath=timestamp_jsonpath,
date_format=date_format,
date_param_format=date_param_format,
creator_id=user_id
)
return jsonify(result)
@app.route('/api/external_indicators/list', methods=['GET'])
def list_external_indicators():
"""List all external indicators for the current user."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'error': 'Not logged in'}), 401
indicators = brighter_trades.external_indicators.get_indicators_for_user(user_id)
return jsonify({'success': True, 'indicators': indicators})
@app.route('/api/external_indicators/delete/<tbl_key>', methods=['DELETE'])
def delete_external_indicator(tbl_key):
"""Delete an external indicator."""
user_id = _get_current_user_id()
if not user_id:
return jsonify({'success': False, 'message': 'Not logged in'}), 401
result = brighter_trades.external_indicators.delete_indicator(tbl_key, user_id)
return jsonify(result)
# =============================================================================
# Health Check Routes
# =============================================================================

View File

@ -156,7 +156,27 @@ class BacktestStrategyInstance(StrategyInstance):
logger.error("Backtrader strategy is not set in StrategyInstance.")
return None
# Try direct lookup first
df = self.backtrader_strategy.precomputed_indicators.get(indicator_name)
# If not found, try alternative name formats
if df is None:
available = list(self.backtrader_strategy.precomputed_indicators.keys())
# Try underscore->space conversion (Blockly sanitizes names with underscores)
alt_name = indicator_name.replace('_', ' ')
if alt_name in available:
df = self.backtrader_strategy.precomputed_indicators.get(alt_name)
logger.debug(f"[BACKTEST] Found indicator '{alt_name}' (converted from '{indicator_name}')")
# Try case-insensitive match
if df is None:
for avail_name in available:
if avail_name.lower() == indicator_name.lower():
df = self.backtrader_strategy.precomputed_indicators.get(avail_name)
logger.debug(f"[BACKTEST] Found indicator '{avail_name}' (case-insensitive match)")
break
if df is None:
logger.error(f"[BACKTEST DEBUG] Indicator '{indicator_name}' not found in precomputed_indicators!")
logger.error(f"[BACKTEST DEBUG] Available indicators: {list(self.backtrader_strategy.precomputed_indicators.keys())}")
@ -219,6 +239,50 @@ class BacktestStrategyInstance(StrategyInstance):
logger.info(f"[BACKTEST] process_indicator returning: {indicator_name}.{output_field} = {value}")
return value
# 2b. Override process_signal for backtesting
def process_signal(self, signal_name: str, output_field: str = 'triggered'):
"""
Evaluates a signal condition during backtesting.
For backtesting, signals can be precomputed or we return a placeholder.
Full signal backtesting support requires precomputed signal states.
"""
logger.debug(f"[BACKTEST] process_signal called: signal='{signal_name}', output='{output_field}'")
if self.backtrader_strategy is None:
logger.error("Backtrader strategy is not set in StrategyInstance.")
return False if output_field == 'triggered' else None
# Get precomputed signals if available
precomputed_signals = getattr(self.backtrader_strategy, 'precomputed_signals', {})
signal_data = precomputed_signals.get(signal_name)
if signal_data is not None and len(signal_data) > 0:
# Use precomputed signal data
signal_pointer = self.backtrader_strategy.signal_pointers.get(signal_name, 0)
if signal_pointer >= len(signal_data):
logger.debug(f"[BACKTEST] Signal '{signal_name}' pointer out of bounds: {signal_pointer}")
return False if output_field == 'triggered' else None
signal_row = signal_data.iloc[signal_pointer]
if output_field == 'triggered':
triggered = bool(signal_row.get('triggered', False))
return triggered
else:
value = signal_row.get('value', None)
return value
else:
# Signal not precomputed - log warning once and return default
if not hasattr(self, '_signal_warnings'):
self._signal_warnings = set()
if signal_name not in self._signal_warnings:
logger.warning(f"[BACKTEST] Signal '{signal_name}' not precomputed. "
"Signal blocks in backtesting require precomputed signal data.")
self._signal_warnings.add(signal_name)
return False if output_field == 'triggered' else None
# 3. Override get_current_price
def get_current_price(self, timeframe: str = '1h', exchange: str = 'binance',
symbol: str = 'BTC/USD') -> float:

View File

@ -42,13 +42,14 @@ class EquityCurveAnalyzer(bt.Analyzer):
# Backtester Class
class Backtester:
def __init__(self, data_cache: DataCache, strategies: Strategies, indicators: Indicators, socketio,
edm_client=None):
edm_client=None, external_indicators=None):
""" Initialize the Backtesting class with a cache for back-tests """
self.data_cache = data_cache
self.strategies = strategies
self.indicators_manager = indicators
self.socketio = socketio
self.edm_client = edm_client
self.external_indicators = external_indicators
# Ensure 'tests' cache exists
self.data_cache.create_cache(
@ -394,6 +395,27 @@ class Backtester:
logger.info(f"[BACKTEST] Computing indicator '{indicator_name}' on backtest data feed")
try:
# First, check if this is an external indicator (historical API)
if self.external_indicators:
ext_indicator = self.external_indicators.get_indicator_by_name(indicator_name, user_id)
if ext_indicator:
logger.info(f"[BACKTEST] '{indicator_name}' is an external indicator - fetching historical data from API")
# Use the external indicator's calculate method
result_series = ext_indicator.calculate(candle_data)
if result_series is not None and len(result_series) > 0:
# Convert series to DataFrame with 'value' column
result_df = pd.DataFrame({
'time': candle_data['time'] if 'time' in candle_data.columns else range(len(result_series)),
'value': result_series.values
})
result_df.reset_index(drop=True, inplace=True)
precomputed_indicators[indicator_name] = result_df
logger.info(f"[BACKTEST] Precomputed external indicator '{indicator_name}' with {len(result_df)} data points.")
else:
logger.warning(f"[BACKTEST] No historical data available for external indicator '{indicator_name}'.")
continue
# Fetch indicator definition from cache
indicators = self.data_cache.get_rows_from_datacache(
cache_name='indicators',
@ -613,7 +635,8 @@ class Backtester:
def run_backtest(self, strategy_class, data_feed: pd.DataFrame, msg_data: dict, user_name: str,
socket_conn_id: str, strategy_instance: BacktestStrategyInstance, backtest_name: str,
user_id: str, backtest_key: str, strategy_name: str, precomputed_indicators: dict):
user_id: str, backtest_key: str, strategy_name: str, precomputed_indicators: dict,
precomputed_signals: dict = None):
"""
Runs a backtest using Backtrader and uses Flask-SocketIO's background tasks.
Sends progress updates to the client via WebSocket.
@ -654,6 +677,7 @@ class Backtester:
strategy_class,
strategy_instance=strategy_instance,
precomputed_indicators=precomputed_indicators, # Pass precomputed indicators
precomputed_signals=precomputed_signals or {}, # Pass precomputed signals
socketio=self.socketio, # Pass SocketIO instance
socket_conn_id=socket_conn_id, # Pass SocketIO connection ID
data_length=len(data_feed), # Pass data length for progress updates

View File

@ -19,6 +19,7 @@ class MappedStrategy(bt.Strategy):
params = (
('strategy_instance', None), # Instance of BacktestStrategyInstance
('precomputed_indicators', None), # Dict of precomputed indicators
('precomputed_signals', None), # Dict of precomputed signal states
('socketio', None), # SocketIO instance for emitting progress
('socket_conn_id', None), # Socket connection ID for emitting progress
('data_length', None), # Total number of data points for progress calculation
@ -41,6 +42,12 @@ class MappedStrategy(bt.Strategy):
self.precomputed_indicators: Dict[str, pd.DataFrame] = self.p.precomputed_indicators or {}
self.indicator_pointers: Dict[str, int] = {name: 0 for name in self.precomputed_indicators.keys()}
self.indicator_names = list(self.precomputed_indicators.keys())
# Initialize signal data
self.precomputed_signals: Dict[str, pd.DataFrame] = self.p.precomputed_signals or {}
self.signal_pointers: Dict[str, int] = {name: 0 for name in self.precomputed_signals.keys()}
self.signal_names = list(self.precomputed_signals.keys())
self.current_step = 0
# Initialize lists to store orders and trades
@ -158,6 +165,11 @@ class MappedStrategy(bt.Strategy):
if name in self.indicator_pointers:
self.indicator_pointers[name] += 1
# Advance signal pointers for the next candle
for name in self.signal_names:
if name in self.signal_pointers:
self.signal_pointers[name] += 1
# Check if we're at the second-to-last bar
if self.current_step == (self.p.data_length - 1):
if self.position:

View File

@ -1050,6 +1050,10 @@ class StratWorkspaceManager {
const indicatorBlocksModule = await import('./blocks/indicator_blocks.js');
indicatorBlocksModule.defineIndicatorBlocks();
// Load and define signal blocks
const signalBlocksModule = await import('./blocks/signal_blocks.js');
signalBlocksModule.defineSignalBlocks();
} catch (error) {
console.error("Error loading Blockly modules: ", error);
return;

View File

@ -19,23 +19,29 @@ export function defineValuesAndFlags() {
* Description:
* This block allows users to send customizable notification messages to the user.
* It can be used to inform about strategy status updates, alerts, or other important events.
* Supports both static text messages and dynamic values (variables, calculations, etc.).
*/
Blockly.defineBlocksWithJsonArray([{
"type": "notify_user",
"message0": "Notify User %1",
"message0": "Notify User %1 %2",
"args0": [
{
"type": "field_input",
"name": "MESSAGE",
"text": "Message",
"check": "String",
"text": "",
"spellcheck": true
},
{
"type": "input_value",
"name": "VALUE",
"check": ["number", "string", "dynamic_value"],
"align": "RIGHT"
}
],
"previousStatement": null,
"nextStatement": null,
"colour": 210,
"tooltip": "Send a customizable notification message to the user. Can be used to inform about strategy status, alerts, or important events.",
"tooltip": "Send a notification message to the user. Enter a static message and/or attach dynamic values (variables, calculations). The values will be appended to the message.",
"helpUrl": "https://example.com/docs/notify_user"
}]);
/**

View File

@ -17,21 +17,35 @@ export function defineVAFGenerators() {
* Generator for 'notify_user' Block
*
* Description:
* Generates a JSON object representing a user notification. The message
* provided by the user is included in the JSON structure.
* Generates a JSON object representing a user notification. Supports both
* static text messages and dynamic values (variables, calculations, etc.).
*/
Blockly.JSON['notify_user'] = function(block) {
// Retrieve the 'MESSAGE' field input
const message = block.getFieldValue('MESSAGE');
// Retrieve the 'MESSAGE' field input (static text)
const message = block.getFieldValue('MESSAGE') || '';
// Validate the message
if (!message || message.trim() === "") {
console.warn("Empty MESSAGE in notify_user block. Defaulting to 'No message provided.'");
// Retrieve the connected VALUE block (dynamic value)
const valueBlock = block.getInputTargetBlock('VALUE');
let dynamicValue = null;
if (valueBlock) {
const generator = Blockly.JSON[valueBlock.type];
if (typeof generator === 'function') {
dynamicValue = generator(valueBlock);
} else {
console.warn(`No generator found for block type "${valueBlock.type}" in notify_user.`);
}
}
// Validate: at least one of message or dynamicValue should be present
if (message.trim() === "" && !dynamicValue) {
console.warn("Empty MESSAGE and no VALUE in notify_user block. Defaulting to 'No message provided.'");
}
const json = {
type: 'notify_user',
message: message && message.trim() !== "" ? message : 'No message provided.'
message: message.trim(),
value: dynamicValue
};
return json;
};

View File

@ -0,0 +1,91 @@
// client/signal_blocks.js
// Define Blockly blocks for user-created signals
export function defineSignalBlocks() {
// Ensure Blockly.JSON is available
if (!Blockly.JSON) {
console.error('Blockly.JSON is not defined. Ensure json_generators.js is loaded before signal_blocks.js.');
return;
}
// Get all user signals
const signals = window.UI?.signals?.dataManager?.getAllSignals?.() || [];
const toolboxCategory = document.querySelector('#toolbox_advanced category[name="Signals"]');
if (!toolboxCategory) {
console.error('Signals category not found in the toolbox.');
return;
}
// Check if there are any signals to display
if (signals.length === 0) {
// Add helpful message when no signals exist
const labelElement = document.createElement('label');
labelElement.setAttribute('text', 'No signals configured yet.');
toolboxCategory.appendChild(labelElement);
const labelElement2 = document.createElement('label');
labelElement2.setAttribute('text', 'Create signals from the Signals panel');
toolboxCategory.appendChild(labelElement2);
const labelElement3 = document.createElement('label');
labelElement3.setAttribute('text', 'on the right side of the screen.');
toolboxCategory.appendChild(labelElement3);
console.log('No signals available - added help message to toolbox.');
return;
}
for (const signal of signals) {
const signalName = signal.name;
// Create a unique block type by replacing spaces with underscores
const sanitizedSignalName = signalName.replace(/\s+/g, '_').replace(/[^a-zA-Z0-9_]/g, '');
const blockType = 'signal_' + sanitizedSignalName;
// Define the block for this signal
Blockly.defineBlocksWithJsonArray([{
"type": blockType,
"message0": `Signal: ${signalName} %1`,
"args0": [
{
"type": "field_dropdown",
"name": "OUTPUT",
"options": [
["is triggered", "triggered"],
["value", "value"]
]
}
],
"output": "dynamic_value",
"colour": 160,
"tooltip": `Check if the '${signalName}' signal is triggered or get its value`,
"helpUrl": ""
}]);
// Define the JSON generator for this block
Blockly.JSON[blockType] = function(block) {
const selectedOutput = block.getFieldValue('OUTPUT');
const json = {
type: 'signal',
fields: {
NAME: signalName,
OUTPUT: selectedOutput
}
};
// Output as dynamic_value
return {
type: 'dynamic_value',
values: [json]
};
};
// Append the newly created block to the Signals category in the toolbox
const blockElement = document.createElement('block');
blockElement.setAttribute('type', blockType);
toolboxCategory.appendChild(blockElement);
}
console.log(`Signal blocks defined: ${signals.length} signals added to toolbox.`);
}

View File

@ -33,6 +33,8 @@ class User_Interface {
this.initializeResizablePopup("exchanges_config_form", null, "exchange_draggable_header", "resize-exchange");
this.initializeResizablePopup("new_ind_form", null, "indicator_draggable_header", "resize-indicator");
this.initializeResizablePopup("new_sig_form", null, "signal_draggable_header", "resize-signal");
this.initializeResizablePopup("new_signal_type_form", null, "signal_type_draggable_header", "resize-signal-type");
this.initializeResizablePopup("external_indicator_form", null, "external_indicator_header", "resize-external-indicator");
this.initializeResizablePopup("new_trade_form", null, "trade_draggable_header", "resize-trade");
this.initializeResizablePopup("ai_strategy_form", null, "ai_strategy_header", "resize-ai-strategy");

View File

@ -747,6 +747,14 @@ class Indicators {
indicatorOutputs[name] = ['value']; // Default output if not defined
}
}
// Include external indicators (historical API data)
const externalIndicators = window.UI?.signals?.externalIndicators || [];
for (const extInd of externalIndicators) {
// External indicators have a single 'value' output
indicatorOutputs[extInd.name] = ['value'];
}
return indicatorOutputs;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,160 @@
<div class="form-popup" id="external_indicator_form" style="display: none; width: 500px;">
<div class="dialog-header" id="external_indicator_header">
<h1>Add External Indicator</h1>
<button class="dialog-close-btn" onclick="UI.signals.closeExternalIndicatorForm()">&times;</button>
</div>
<form class="form-container" onsubmit="return false;">
<p style="color: #888; font-size: 12px; margin-bottom: 15px;">
Configure an external API that provides historical data for backtesting.
</p>
<!-- Name -->
<label for="ext_ind_name"><b>Indicator Name:</b></label>
<input type="text" id="ext_ind_name" placeholder="e.g., Fear & Greed Index" required>
<!-- Historical URL -->
<label for="ext_ind_url"><b>Historical Data URL:</b></label>
<input type="text" id="ext_ind_url"
placeholder="https://api.example.com/data?start={start_date}&limit={limit}" required>
<p style="color: #666; font-size: 11px; margin: 5px 0 15px 0;">
Use placeholders: <code>{start_date}</code>, <code>{end_date}</code>, <code>{limit}</code>
</p>
<!-- Auth Header -->
<label for="ext_ind_auth_header"><b>Auth Header Name:</b> (optional)</label>
<input type="text" id="ext_ind_auth_header" placeholder="e.g., X-CMC_PRO_API_KEY">
<!-- API Key -->
<label for="ext_ind_auth_key"><b>API Key:</b> (optional)</label>
<input type="password" id="ext_ind_auth_key" placeholder="Your API key">
<!-- JSONPath for Values -->
<label for="ext_ind_value_jsonpath"><b>Value JSONPath:</b></label>
<input type="text" id="ext_ind_value_jsonpath" value="$.data[*].value" required>
<p style="color: #666; font-size: 11px; margin: 5px 0 15px 0;">
JSONPath expression to extract array of numeric values
</p>
<!-- JSONPath for Timestamps -->
<label for="ext_ind_timestamp_jsonpath"><b>Timestamp JSONPath:</b></label>
<input type="text" id="ext_ind_timestamp_jsonpath" value="$.data[*].timestamp" required>
<p style="color: #666; font-size: 11px; margin: 5px 0 15px 0;">
JSONPath expression to extract array of timestamps
</p>
<!-- Date Format in Response -->
<label for="ext_ind_date_format"><b>Timestamp Format in Response:</b></label>
<select id="ext_ind_date_format">
<option value="ISO">ISO (2024-01-15 or 2024-01-15T12:00:00Z)</option>
<option value="UNIX">Unix Timestamp (seconds or milliseconds)</option>
</select>
<!-- Date Format for URL Parameters -->
<label for="ext_ind_date_param_format"><b>Date Format for URL Parameters:</b></label>
<select id="ext_ind_date_param_format">
<option value="ISO">ISO (2024-01-15)</option>
<option value="UNIX">Unix Timestamp</option>
</select>
<hr style="margin: 20px 0;">
<!-- Test Result Area -->
<div id="ext_ind_test_result" style="display: none; padding: 10px; border-radius: 5px; margin-bottom: 15px;">
</div>
<!-- Loading indicator -->
<div id="ext_ind_loading" style="display: none; text-align: center; padding: 10px;">
<span>Testing API...</span>
</div>
<!-- Buttons -->
<div style="display: flex; gap: 10px; justify-content: flex-end;">
<button type="button" class="btn" onclick="UI.signals.testExternalIndicator()"
style="background: #17a2b8;">Test Connection</button>
<button type="button" class="btn" onclick="UI.signals.submitExternalIndicator()"
style="background: #28a745;">Save Indicator</button>
<button type="button" class="btn cancel" onclick="UI.signals.closeExternalIndicatorForm()">Cancel</button>
</div>
</form>
</div>
<style>
#external_indicator_form {
position: fixed;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
z-index: 1000;
background: #2a2a2a;
border-radius: 10px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.5);
}
#external_indicator_form .form-container {
padding: 20px;
}
#external_indicator_form label {
color: #e0e0e0;
display: block;
margin-bottom: 5px;
}
#external_indicator_form input[type="text"],
#external_indicator_form input[type="password"] {
width: 100%;
padding: 8px;
margin-bottom: 10px;
border: 1px solid #444;
border-radius: 4px;
background: #1e1e1e;
color: #e0e0e0;
box-sizing: border-box;
}
#external_indicator_form select {
width: 100%;
height: 38px;
padding: 8px;
margin-bottom: 10px;
border: 1px solid #444;
border-radius: 4px;
background: #1e1e1e;
color: #e0e0e0;
box-sizing: border-box;
}
#external_indicator_form code {
background: #1a1a1a;
padding: 2px 5px;
border-radius: 3px;
font-family: monospace;
color: #17a2b8;
}
#external_indicator_form .btn {
padding: 8px 16px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
}
#external_indicator_form .btn.cancel {
background: #6c757d;
color: white;
}
#ext_ind_test_result.success {
background: rgba(40, 167, 69, 0.2);
border: 1px solid #28a745;
color: #28a745;
}
#ext_ind_test_result.error {
background: rgba(220, 53, 69, 0.2);
border: 1px solid #dc3545;
color: #dc3545;
}
</style>

View File

@ -43,6 +43,8 @@
{% include "new_strategy_popup.html" %}
{% include "ai_strategy_dialog.html" %}
{% include "new_signal_popup.html" %}
{% include "signal_type_popup.html" %}
{% include "external_indicator_popup.html" %}
{% include "new_indicator_popup.html" %}
{% include "trade_details_popup.html" %}
{% include "indicator_popup.html" %}

View File

@ -6,12 +6,18 @@
<div class="section" style="grid-column: 1;">
<button class="btn" onclick="UI.indicators.open_form()">New Indicator</button>
</div>
<!-- Button for displaying indicator options -->
<!-- Button for adding an external API indicator -->
<div class="section" style="grid-column: 2;">
<button class="btn">Indicator Options</button>
<button class="btn" onclick="UI.signals.openExternalIndicatorForm()" style="width: 200px;">+ External Indicator</button>
</div>
</div>
<!-- External Indicators Section -->
<div id="external_indicators_section" style="display: none; margin: 15px 0; padding: 10px; border-radius: 8px;">
<h3 style="color: #9f7aea; margin-bottom: 10px;">External Indicators <span style="font-size: 11px; color: #888;">(Historical API - works with backtesting)</span></h3>
<div class="external-indicators-container" id="external_indicators_list" style="display: flex; flex-wrap: wrap; gap: 10px;"></div>
</div>
<!-- Indicator list display section -->
<div class="section" style="margin-top: 15px; overflow-x: auto; display: grid; grid-auto-rows: minmax(80px, auto); grid-template-columns: 75px 200px 100px 150px 150px 75px 100px 100px auto; gap: 10px; padding-left: 10px;">
<div style="background-color: #F7E1C1; grid-column: 1 / span 9; padding: 5px 0; display: grid; grid-template-columns: 75px 200px 100px 150px 150px 75px 100px 100px auto; gap: 10px;">

View File

@ -53,8 +53,8 @@
<!-- Panel 2 of 3 -->
<div id="panel_2" class="form_panels" style="display: none; grid-template-columns: 1fr 1fr; gap: 10px;">
<!-- Signal Type -->
<div style="grid-column: 1 / span 2;">
<!-- Signal Type (hidden for pattern indicators) -->
<div id="signal_type_row" style="grid-column: 1 / span 2;">
<label for="signal_type"><b>Signal Type:</b></label>
<select name="signal_type" id="select_s_type" style="margin-left: 10px;" onchange="UI.signals.hideIfTrue(this.value,'Value','subpanel_1');UI.signals.hideIfTrue(this.value,'Comparison','subpanel_2');">
<option>Value</option>
@ -65,7 +65,7 @@
<!-- Display signal -->
<span id="sig_display" style="grid-column: 1 / span 2; padding: 10px; background: #f8f9fa; border-radius: 5px;">name:{property}(operator)name:{property}</span>
<!-- Operator field -->
<!-- Standard Operator field (for non-pattern indicators) -->
<div id="sig_operator" style="grid-column: 1 / span 2;">
<div style="display: flex; flex-wrap: wrap; gap: 10px; margin-bottom: 10px;">
<label><input type="radio" id="gt" name="Operator" value=">"> Greater than</label>
@ -79,6 +79,28 @@
</div>
</div>
<!-- Pattern Operator field (for candlestick pattern indicators) -->
<div id="pattern_operator" style="grid-column: 1 / span 2; display: none;">
<label style="font-weight: bold; display: block; margin-bottom: 10px;">Pattern Detection:</label>
<div style="display: flex; flex-direction: column; gap: 8px;">
<label style="display: flex; align-items: center; gap: 8px; padding: 8px; background: #f0f8ff; border-radius: 5px; cursor: pointer;">
<input type="radio" name="PatternOperator" value="is_detected" checked="checked">
<span style="flex: 1;"><strong>Any Pattern Detected</strong></span>
<span style="color: #666; font-size: 11px;">Bullish or Bearish</span>
</label>
<label style="display: flex; align-items: center; gap: 8px; padding: 8px; background: #f0fff0; border-radius: 5px; cursor: pointer;">
<input type="radio" name="PatternOperator" value="is_bullish">
<span style="flex: 1;"><strong>Bullish Pattern</strong></span>
<span style="color: #28a745; font-size: 11px;">▲ Buy signal</span>
</label>
<label style="display: flex; align-items: center; gap: 8px; padding: 8px; background: #fff0f0; border-radius: 5px; cursor: pointer;">
<input type="radio" name="PatternOperator" value="is_bearish">
<span style="flex: 1;"><strong>Bearish Pattern</strong></span>
<span style="color: #dc3545; font-size: 11px;">▼ Sell signal</span>
</label>
</div>
</div>
<!-- Value sub-panel -->
<div id="subpanel_2" style="grid-column: 1 / span 2;">
<label for="value"><b>Value:</b></label>
@ -89,19 +111,13 @@
<div id="subpanel_1" style="grid-column: 1 / span 2; display: none;">
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 10px;">
<div>
<label for="sig2_source"><b>Signal Source:</b></label>
<label for="sig2_source"><b style="display: inline-flex; width: 200px;">Signal Source:</b></label>
<select name="sig2_source" id="sig2_source" style="width: 100%; margin-top: 5px;" onchange="UI.signals.fill_prop('sig2_prop', this.value)">
{% set ns = namespace(optonVal=0) %}
{% for each in indicator_list %}
<option>{{each}}</option>
{% if loop.index0 == 0 %}
{% set ns.optonVal = each %}
{% endif %}
{% endfor %}
<!-- Options populated by JavaScript -->
</select>
</div>
<div>
<label for="sig2_prop"><b>Property:</b></label>
<label for="sig2_prop"><b style="display: inline-flex; width: 200px;">Property:</b></label>
<select id="sig2_prop" name="sig2_prop" style="width: 100%; margin-top: 5px;"></select>
</div>
</div>

View File

@ -196,6 +196,11 @@ and you set fee to 50%, you earn $0.50 per profitable trade.
<!-- Indicator blocks will be added here -->
</category>
<!-- Signals Category -->
<category name="Signals" colour="160" tooltip="Use your configured signals in strategy logic">
<!-- Signal blocks will be added here dynamically -->
</category>
<!-- Balances Subcategory -->
<category name="Balances" colour="#E69500" tooltip="Access strategy and account balance information">
<label text="Track your trading capital"></label>

View File

@ -0,0 +1,176 @@
<!-- Signal Type (External Source) Form Popup -->
<div class="form-popup" id="new_signal_type_form" style="display: none; overflow: hidden; position: absolute; width: 480px; height: 520px; border-radius: 10px;">
<!-- Draggable Header Section -->
<div class="dialog-header" id="signal_type_draggable_header">
<h1 id="signal-type-form-header">New Signal Type</h1>
<button class="dialog-close-btn" onclick="UI.signals.closeSignalTypeForm()">&times;</button>
</div>
<!-- Main Content -->
<form class="form-container" style="padding: 15px; overflow-y: auto; height: calc(100% - 50px);">
<!-- Hidden field for tbl_key (used when editing) -->
<input type="hidden" id="signal_type_tbl_key" name="signal_type_tbl_key" value="" />
<div style="display: grid; grid-template-columns: 1fr; gap: 12px;">
<!-- Name Input -->
<div>
<label for="signal_type_name"><b>Name:</b></label>
<input type="text" id="signal_type_name" name="signal_type_name"
placeholder="e.g., Fear & Greed Index"
style="width: 100%; margin-top: 5px; padding: 8px; border-radius: 4px; border: 1px solid #ccc;">
</div>
<!-- API Endpoint URL -->
<div>
<label for="signal_type_url"><b>API Endpoint URL:</b></label>
<input type="text" id="signal_type_url" name="signal_type_url"
placeholder="https://pro-api.coinmarketcap.com/v3/fear-and-greed/latest"
style="width: 100%; margin-top: 5px; padding: 8px; border-radius: 4px; border: 1px solid #ccc;">
</div>
<!-- Authentication Section -->
<div style="background: #f5f5f5; padding: 12px; border-radius: 6px;">
<label style="display: block; margin-bottom: 8px;"><b>Authentication (optional):</b></label>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 10px;">
<div>
<label for="signal_type_auth_header" style="font-size: 12px;">Header Name:</label>
<input type="text" id="signal_type_auth_header" name="signal_type_auth_header"
placeholder="X-CMC_PRO_API_KEY"
style="width: 100%; margin-top: 3px; padding: 6px; border-radius: 4px; border: 1px solid #ccc; font-size: 12px;">
</div>
<div>
<label for="signal_type_auth_key" style="font-size: 12px;">API Key:</label>
<input type="password" id="signal_type_auth_key" name="signal_type_auth_key"
placeholder="Your API key"
style="width: 100%; margin-top: 3px; padding: 6px; border-radius: 4px; border: 1px solid #ccc; font-size: 12px;">
</div>
</div>
</div>
<!-- JSON Path -->
<div>
<label for="signal_type_json_path"><b>JSON Path to Value:</b></label>
<input type="text" id="signal_type_json_path" name="signal_type_json_path"
value="$.data[0].value"
placeholder="$.data[0].value"
style="width: 100%; margin-top: 5px; padding: 8px; border-radius: 4px; border: 1px solid #ccc; font-family: monospace;">
<small style="color: #666; display: block; margin-top: 4px;">
JSONPath expression to extract the value from the API response.
<a href="https://jsonpath.com/" target="_blank" style="color: #3E3AF2;">JSONPath reference</a>
</small>
</div>
<!-- Refresh Interval -->
<div>
<label for="signal_type_refresh"><b>Refresh Interval:</b></label>
<div style="display: flex; align-items: center; gap: 10px; margin-top: 5px;">
<input type="number" id="signal_type_refresh" name="signal_type_refresh"
value="300" min="60" max="86400"
style="width: 100px; padding: 8px; border-radius: 4px; border: 1px solid #ccc;">
<span>seconds</span>
<span style="color: #666; font-size: 12px;">(min: 60s, default: 300s = 5 min)</span>
</div>
</div>
<!-- Test Result Area -->
<div id="signal_type_test_result" style="display: none; padding: 10px; border-radius: 6px; margin-top: 5px;">
</div>
<!-- Loading indicator -->
<div id="signal_type_loading" style="display: none; text-align: center; padding: 10px;">
<div style="border: 3px solid #444; border-top: 3px solid #3E3AF2; border-radius: 50%; width: 24px; height: 24px; animation: spin 1s linear infinite; margin: 0 auto;"></div>
<span style="color: #666; font-size: 12px;">Testing API connection...</span>
</div>
<!-- Buttons -->
<div style="text-align: center; margin-top: 10px; padding-top: 10px; border-top: 1px solid #eee;">
<button type="button" class="btn cancel" onclick="UI.signals.closeSignalTypeForm()" style="margin-right: 10px;">
Cancel
</button>
<button type="button" class="btn" onclick="UI.signals.testSignalType()" style="margin-right: 10px; background: #17a2b8; color: white;">
Test Connection
</button>
<button type="button" class="btn submit" id="submit-signal-type" onclick="UI.signals.submitSignalType()">
Create
</button>
</div>
</div>
</form>
<!-- Resizer -->
<div id="resize-signal-type" class="resize-handle"></div>
</div>
<style>
#new_signal_type_form {
background: #1e1e1e;
border: 1px solid #444;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.5);
color: #e0e0e0;
}
#new_signal_type_form input[type="text"],
#new_signal_type_form input[type="password"],
#new_signal_type_form input[type="number"] {
background: #2a2a2a;
color: #e0e0e0;
border: 1px solid #444;
}
#new_signal_type_form input::placeholder {
color: #888;
}
#new_signal_type_form .btn.cancel {
background: #444;
color: white;
border: none;
padding: 8px 16px;
border-radius: 5px;
cursor: pointer;
}
#new_signal_type_form .btn.cancel:hover {
background: #555;
}
#new_signal_type_form .btn.submit {
background: linear-gradient(135deg, #3E3AF2 0%, #6366f1 100%);
color: white;
border: none;
padding: 8px 16px;
border-radius: 5px;
cursor: pointer;
}
#new_signal_type_form .btn.submit:hover {
opacity: 0.9;
}
#new_signal_type_form label b {
color: #e0e0e0;
}
#new_signal_type_form small {
color: #888 !important;
}
#new_signal_type_form div[style*="background: #f5f5f5"] {
background: #2a2a2a !important;
}
#signal_type_test_result.success {
background: #1e3a1e;
border: 1px solid #28a745;
color: #28a745;
}
#signal_type_test_result.error {
background: #3a1e1e;
border: 1px solid #dc3545;
color: #dc3545;
}
</style>

View File

@ -1,6 +1,15 @@
<div class="content" id="signal_content">
<button class="btn" id="new_signal" onclick="UI.signals.open_signal_Form()">New Signal</button>
<button class="btn" id="new_external_source" onclick="UI.signals.openSignalTypeForm()" style="background: #17a2b8;">+ External Source</button>
<hr>
<!-- External Sources Section (Real-time only) -->
<div id="external_sources_section" style="display: none; margin-bottom: 15px;">
<h3>External Sources <span style="font-size: 11px; color: #888;">(Real-time only - no backtesting)</span></h3>
<div class="external-sources-container" id="external_sources_list"></div>
<hr>
</div>
<h3>Signals</h3>
<div class="signals-container" id="signal_list"></div>
</div>
@ -202,4 +211,146 @@
font-size: 9px;
margin-top: 5px;
}
/* External Sources Styles */
.external-sources-container {
display: flex;
flex-wrap: wrap;
gap: 10px;
padding: 10px;
}
.external-source-item {
position: relative;
padding: 10px 15px;
background: linear-gradient(145deg, #2a2a2a, #1e1e1e);
border: 1px solid #444;
border-radius: 8px;
min-width: 150px;
cursor: pointer;
transition: all 0.2s ease;
}
.external-source-item:hover {
border-color: #17a2b8;
transform: translateY(-2px);
}
.external-source-item .source-name {
font-weight: bold;
color: #e0e0e0;
font-size: 13px;
margin-bottom: 5px;
}
.external-source-item .source-value {
font-size: 20px;
font-weight: bold;
color: #17a2b8;
}
.external-source-item .source-time {
font-size: 10px;
color: #888;
margin-top: 5px;
}
.external-source-item .delete-source-btn {
position: absolute;
top: 5px;
right: 5px;
width: 18px;
height: 18px;
border-radius: 50%;
background: #dc3545;
color: white;
border: none;
font-size: 10px;
cursor: pointer;
display: none;
align-items: center;
justify-content: center;
}
.external-source-item:hover .delete-source-btn {
display: flex;
}
.external-source-item .refresh-btn {
position: absolute;
top: 5px;
right: 28px;
width: 18px;
height: 18px;
border-radius: 50%;
background: #28a745;
color: white;
border: none;
font-size: 10px;
cursor: pointer;
display: none;
align-items: center;
justify-content: center;
}
.external-source-item:hover .refresh-btn {
display: flex;
}
/* External Indicators Styles */
.external-indicators-container {
display: flex;
flex-wrap: wrap;
gap: 10px;
padding: 10px;
}
.external-indicator-item {
position: relative;
padding: 10px 15px;
background: linear-gradient(145deg, #3d2a5a, #2a1e3e);
border: 1px solid #6f42c1;
border-radius: 8px;
min-width: 150px;
cursor: pointer;
transition: all 0.2s ease;
}
.external-indicator-item:hover {
border-color: #9f7aea;
transform: translateY(-2px);
}
.external-indicator-item .indicator-name {
font-weight: bold;
color: #e0e0e0;
font-size: 13px;
margin-bottom: 5px;
}
.external-indicator-item .indicator-type {
font-size: 11px;
color: #9f7aea;
}
.external-indicator-item .delete-indicator-btn {
position: absolute;
top: 5px;
right: 5px;
width: 18px;
height: 18px;
border-radius: 50%;
background: #dc3545;
color: white;
border: none;
font-size: 10px;
cursor: pointer;
display: none;
align-items: center;
justify-content: center;
}
.external-indicator-item:hover .delete-indicator-btn {
display: flex;
}
</style>

View File

@ -1,14 +1,16 @@
"""
Utility functions for BrighterTrading.
"""
import datetime
import math
import numpy as np
import pandas as pd
def sanitize_for_json(value):
"""
Convert a value to a JSON-serializable type.
Handles numpy types, inf, nan, and nested structures.
Handles numpy types, pandas types, datetime, inf, nan, and nested structures.
:param value: Any value that needs to be JSON serialized
:return: JSON-serializable value
@ -16,6 +18,12 @@ def sanitize_for_json(value):
# Handle None
if value is None:
return None
# Handle pandas NA/NaT (safely check for scalar values only)
try:
if pd.isna(value) and not isinstance(value, (dict, list, tuple, set)):
return None
except (TypeError, ValueError):
pass # pd.isna fails on some types, that's fine
# Handle numpy integer types
if isinstance(value, np.integer):
return int(value)
@ -31,20 +39,38 @@ def sanitize_for_json(value):
# Handle numpy arrays
elif isinstance(value, np.ndarray):
return [sanitize_for_json(v) for v in value.tolist()]
# Handle pandas Timestamp
elif isinstance(value, pd.Timestamp):
return value.isoformat()
# Handle datetime objects
elif isinstance(value, (datetime.datetime, datetime.date)):
return value.isoformat()
# Handle pandas Timedelta
elif isinstance(value, pd.Timedelta):
return str(value)
# Handle regular floats with inf/nan
elif isinstance(value, float):
if math.isinf(value) or math.isnan(value):
return None
return value
# Handle bytes
elif isinstance(value, bytes):
try:
return value.decode('utf-8')
except UnicodeDecodeError:
return value.hex()
# Handle nested dicts
elif isinstance(value, dict):
return {k: sanitize_for_json(v) for k, v in value.items()}
return {str(k): sanitize_for_json(v) for k, v in value.items()}
# Handle lists
elif isinstance(value, list):
return [sanitize_for_json(v) for v in value]
# Handle tuples
elif isinstance(value, tuple):
return tuple(sanitize_for_json(v) for v in value)
# Handle sets
elif isinstance(value, set):
return [sanitize_for_json(v) for v in value]
# Return other types as-is (str, int, bool, etc.)
return value