"""Candle (OHLCV) access layer backed by the Exchange Data Manager (EDM)."""
import datetime as dt
import logging as log

import pytz

from shared_utilities import timeframe_to_minutes, ts_of_n_minutes_ago

# log.basicConfig(level=log.ERROR)


class Candles:
    """
    Fetches candle data through the EDM client and reshapes it for charting.

    The instance also registers a 'candles' cache on the supplied data cache
    and remembers the most recent live candle passed to set_new_candle().
    """

    # EDM API has a maximum limit of 1000 candles per request.
    EDM_MAX_CANDLES = 1000

    def __init__(self, exchanges, users, datacache, config, edm_client=None):
        """
        :param exchanges: Exchanges object (stored, not used by the methods in this class).
        :param users: Object exposing get_chart_view(user_name=..., prop=...).
        :param datacache: Cache manager exposing create_cache(...).
        :param config: App configuration exposing get_setting(name).
        :param edm_client: Optional EDM client exposing get_candles_sync(...).
        """
        # A reference to the app configuration
        self.users = users
        self.config = config
        self.exchanges = exchanges

        # This object maintains all the cached data.
        self.data = datacache

        # EDM client for fetching candles from exchange-data-manager
        self.edm = edm_client

        # The most recently received live candle (see set_new_candle()).
        self.cached_last_candle = None

        # size_limit is the max number of lists of candle(ohlc) data allowed.
        self.data.create_cache(name='candles', cache_type='row',
                               default_expiration=dt.timedelta(days=5),
                               size_limit=100, eviction_policy='evict')

        # The maximum amount of candles to load at one time.
        self.max_records = config.get_setting('max_data_loaded')

    def _chart_view_default(self, caller: str, user_name: str, prop: str, label: str):
        """
        Look up a missing market parameter from the user's last viewed chart.

        :param caller: Name of the public method, used as the log prefix.
        :param user_name: The user whose chart view is consulted. Must not be None.
        :param prop: The chart-view property to read ('market', 'timeframe', 'exchange_name').
        :param label: Human-readable name of the parameter for the log message.
        :return: The value returned by users.get_chart_view().
        :raises AssertionError: If user_name is None (kept as an assert so the
                 exception type seen by existing callers does not change).
        """
        assert user_name is not None, f'{caller}(): user_name is required when no {label} is given'
        value = self.users.get_chart_view(user_name=user_name, prop=prop)
        log.info(f'{caller}(): No {label} provided. Using {value}')
        return value

    def get_last_n_candles(self, num_candles: int, asset: str, timeframe: str, exchange: str,
                           user_name: str, session_id: str = None):
        """
        Return the last num_candles candles of a specified timeframe, symbol, and exchange.
        Fetches data exclusively from EDM (Exchange Data Manager).

        :param num_candles: int - The number of records to return (capped at EDM_MAX_CANDLES).
        :param asset: str - The symbol of the trading pair.
        :param timeframe: str - The timespan each candle represents.
        :param exchange: str - The name of the exchange.
        :param user_name: The name of the user. Accepted for interface compatibility;
                          not used by this method.
        :param session_id: str - Optional EDM session ID. Accepted for interface
                           compatibility; deliberately not forwarded (see note below).
        :return: DataFrame with candle data (see convert_candles()).
        :raises RuntimeError: If no EDM client was supplied to the constructor.
        :raises: Whatever the EDM client raises when EDM is unavailable
                 (presumably EdmConnectionError - defined outside this file).
        """
        if num_candles > self.EDM_MAX_CANDLES:
            log.warning(f'Requested {num_candles} candles, capping to EDM limit of {self.EDM_MAX_CANDLES}')
            num_candles = self.EDM_MAX_CANDLES

        log.info(f'[GET CANDLES] {asset} {exchange} {timeframe} limit={num_candles}')

        if self.edm is None:
            raise RuntimeError("EDM client not initialized. Cannot fetch candle data.")

        # Note: We don't pass session_id for candle requests since candle data is public
        # and doesn't require authentication. Using session_id can cause issues if the
        # session has expired or the exchange connector isn't properly initialized.
        candles = self.edm.get_candles_sync(
            exchange=exchange,
            symbol=asset,
            timeframe=timeframe,
            limit=num_candles
        )

        if candles.empty:
            log.warning(f"No candles returned from EDM for {asset}/{timeframe}/{exchange}")
            return self.convert_candles(candles)

        log.info(f"Fetched {len(candles)} candles from EDM for {asset}/{timeframe}/{exchange}")
        # tail() rather than candles[-num_candles:]: the slice returns *every* row
        # when num_candles == 0 (because -0 == 0); tail(0) correctly returns none.
        return self.convert_candles(candles.tail(num_candles))

    def set_new_candle(self, cdata: dict) -> bool:
        """
        Updates the cached last candle with new candle data.
        Called when live price data arrives via WebSocket.

        :param cdata: Dictionary containing candle data with keys like
                      'time', 'open', 'high', 'low', 'close', 'volume', 'symbol', etc.
        :return: True (the candle is always stored).
        """
        # Update the cached last candle
        self.cached_last_candle = cdata
        log.debug(f"Candle updated: {cdata.get('symbol', 'unknown')} @ {cdata.get('close', 0)}")
        return True

    def set_cache(self, symbol=None, interval=None, exchange_name=None, user_name=None):
        """
        This method requests a chart from EDM to ensure the data is initialized.
        The fetched candles are discarded.

        TODO: This method is un-used.

        :param symbol: str - The symbol of the market. Defaults to the user's last viewed market.
        :param interval: str - timeframe of the candles. Defaults to the user's last viewed timeframe.
        :param exchange_name: str - The name of the exchange to fetch from.
                              Defaults to the user's last viewed exchange.
        :param user_name: str - Required whenever any of the above is omitted.
        :return: None
        """
        # By default, initialise data with the last viewed chart.
        if not symbol:
            symbol = self._chart_view_default('set_cache', user_name, 'market', 'symbol')
        if not interval:
            interval = self._chart_view_default('set_cache', user_name, 'timeframe', 'timeframe')
        if not exchange_name:
            exchange_name = self._chart_view_default('set_cache', user_name, 'exchange_name', 'exchange name')

        log.info('set_cache(): Loading candle data...')
        self.get_last_n_candles(num_candles=self.max_records, asset=symbol, timeframe=interval,
                                exchange=exchange_name, user_name=user_name)
        log.info('set_cache(): Candle data Loaded.')

    @staticmethod
    def get_colour_coded_volume(candles):
        """
        Builds a colour-coded volume series from the candlesticks received.

        A bar is green when the close is strictly higher than the previous
        close (and for the very first bar); otherwise it is red.

        The input DataFrame is not modified.

        :param candles: DataFrame with at least 'time', 'close' and 'volume'
                        ('time' may alternatively be the index).
        :return: DataFrame with columns 'time', 'value' (the volume) and 'color'.
        """
        red = 'rgba(255,82,82, 0.8)'
        green = 'rgba(0, 150, 136, 0.8)'

        # Work on a positionally re-indexed copy instead of resetting the index in
        # place: the in-place version added an 'index' column to the caller's frame
        # and raised ValueError when called a second time on the same frame.
        # The old index is only kept as a column when 'time' is not already one.
        frame = candles.reset_index(drop=('time' in candles.columns))

        if frame.empty:
            return frame.reindex(columns=['time', 'value', 'color'])

        closes = frame['close']
        # True where the previous close is strictly below the current close.
        rising = closes.shift(1) < closes
        # The first bar has no predecessor and is always green.
        rising.iloc[0] = True

        volumes = frame.loc[:, ['time', 'volume']].copy()
        volumes['color'] = rising.map({True: green, False: red})
        return volumes.rename(columns={'volume': 'value'})

    def get_latest_values(self, value_name: str, symbol: str, timeframe: str,
                          exchange: str, user_name: str, num_record: int = 1):
        """
        Returns the timestamped values of one candle field for the last
        num_record candles of the specified market.

        :param value_name: str - A candle column such as 'high'|'low'|'close', or
                           'volume' for the colour-coded volume series.
        :param symbol: str - The symbol of the market.
        :param timeframe: str - The timespan each candle represents.
        :param exchange: str - The name of the exchange the market is traded on.
        :param user_name: str - The name of the user who owns this exchange.
        :param num_record: The number of records requested.
        :return: DataFrame with columns 'time' and value_name, or
                 'time', 'value', 'color' when value_name is 'volume'.
        """
        candles = self.get_last_n_candles(asset=symbol, exchange=exchange, timeframe=timeframe,
                                          num_candles=num_record, user_name=user_name)
        if value_name == 'volume':
            return self.get_colour_coded_volume(candles)
        if candles.empty:
            # An empty result may carry no columns at all; selecting them would raise KeyError.
            return candles.reindex(columns=['time', value_name])
        return candles[['time', value_name]]

    @staticmethod
    def convert_candles(candles):
        """
        Converts a dataframe of candlesticks into the format lightweight charts expects.

        :param candles: DataFrame of candles.
        :return: DataFrame with columns time, open, high, low, close, volume.
                 An empty input is returned unchanged.
        """
        if candles.empty:
            return candles

        # EDM sends timestamps in seconds - no conversion needed for lightweight charts
        return candles.loc[:, ['time', 'open', 'high', 'low', 'close', 'volume']].copy()

    def get_candle_history(self, num_records: int, symbol: str = None, interval: str = None,
                           exchange_name: str = None, user_name: str = None):
        """
        Returns a specified number of candle records in the lightweight charts format.

        :param num_records: int - The number of candle records to retrieve.
        :param symbol: str - The symbol of the market. If None, it will be fetched from user configuration.
        :param interval: str - The timeframe of the candles. If None, it will be fetched from user configuration.
        :param exchange_name: str - The name of the exchange. If None, it will be fetched from user configuration.
        :param user_name: str - The name of the user who owns the exchange.
                          Required whenever any of the above is omitted.
        :return: DataFrame of candle records as produced by get_last_n_candles()
                 (no conversion to a list is performed).
        """
        # By default, initialise data with the last viewed chart.
        if not symbol:
            symbol = self._chart_view_default('get_candle_history', user_name, 'market', 'symbol')
        if not interval:
            interval = self._chart_view_default('get_candle_history', user_name, 'timeframe', 'timeframe')
        if not exchange_name:
            exchange_name = self._chart_view_default('get_candle_history', user_name,
                                                     'exchange_name', 'exchange name')

        return self.get_last_n_candles(num_candles=num_records, asset=symbol, timeframe=interval,
                                       exchange=exchange_name, user_name=user_name)