CryptoFly.us | CRYPTOCURRENCY BITCOIN SHIRTS CLOTHING APPAREL HATS

CRYPTO NEWS

Singapore Greenlights Aussie Exchange and DBS Vickers to Provide Crypto Services

The digital asset trading venue Independent Reserve received approval from the Monetary Authority of Singapore (MAS) to provide crypto services in the Asian city-state. The regulator also greenlighted the brokerage arm of DBS Bank.

Independent Reserve Invades Singapore

According to a recent report, the MAS enabled the Australian-based cryptocurrency platform – Independent Reserve – to offer digital payment token services to retail and institutional investors in Singapore.

Before giving its official approval, the Monetary Authority of Singapore considered several factors, such as customer protection mechanisms, information technology services, transaction screening, and compliance structures.

Independent Reserve’s CEO – Adrian Przelozny – expressed hope that, with the license in place, Singaporeans would be able to deal with digital assets “quickly and securely.” He added that Singapore’s crypto ecosystem provides local investors with enhanced protection:

“A well-regulated environment will benefit both investors and crypto industry stakeholders. With tailormade rules for the crypto industry, Singapore has the clearest and most detailed licensing requirements of any jurisdiction in Asia. And now, Independent Reserve is one of the first fully licensed crypto exchanges available to Singaporeans, enabling them to quickly and securely use their SGD to get in and out of crypto.”

Przelozny even urged Australia to follow the example of Singapore’s “thorough approach to crypto industry licensing,” noting that there are no custodian requirements for exchanges in Australia:

“We’d like to see, and have been pushing for, appropriate legislation to be introduced locally that will support the industry and provide protection for investors,” he concluded.

Adrian Przelozny, Source: Business Insider

DBS Vickers Also Got Approved

As it did with Independent Reserve, the MAS allowed DBS Vickers (DBSV) – the brokerage arm of DBS Bank – to provide digital asset services inside Singapore’s borders.

Per an announcement, DBSV can now directly support asset managers and companies to trade in digital payment tokens through the DBS Digital Exchange (DDEx). Eng-Kwok Seat Moey – Head of Capital Markets at DBS and Chair of the DBS Digital Exchange – commented:

“Having received formal regulatory approval from the MAS, DBSV is now in a better position to support institutional and corporate investors in tapping into the growing potential of digital assets as an investment class. This marks another significant milestone in our ability to provide integrated solutions across the digital asset value chain, from deal origination to tokenization, listing, trading, and custody.”

DBS’s approval does not come as a surprise since the MAS greenlighted the move in August. Back then, Moey expressed hope that the initiative would benefit the city-state:

“This will contribute to Singapore’s ambitions to be a digital asset hub in Asia.”

"CANCEL read_loop, Task exception was never retrieved" {'e': 'error', 'm': 'Queue overflow. Message not filled'}

I’ve finally got my code working. It does the following: first it gets the historical data for 297 symbols at the moment (only the last 20 data intervals). Once it has that data, it starts fetching real-time data for all these symbols. The code works well for a few seconds, but then I get the error: "CANCEL read_loop, Task exception was never retrieved" {'e': 'error', 'm': 'Queue overflow. Message not filled'}. I’ve done some searching on the web, but there’s not much information about it. Does anyone know how to fix this error?

Code to get the live data:

class GetNewData:

    @staticmethod
    def refactor_old_result():
        historical_list_refactored = []
        for i in range(len(results)):
            single_key_data = results[i]
            single_key_data['high'] = list(single_key_data['high'].values())
            single_key_data['low'] = list(single_key_data['low'].values())
            single_key_data['close'] = list(single_key_data['close'].values())
            single_key_data['interval'] = list(single_key_data['interval'].values())
            single_key_data['symbol'] = list(single_key_data['symbol'].values())
            single_key_data['time'] = list(single_key_data['time'].values())
            single_key_data['volume'] = list(single_key_data['volume'].values())
            historical_list_refactored.append(single_key_data)
        return historical_list_refactored

    @staticmethod
    def start_websocket():
        twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret)
        twm.start()
        streams = stream_list
        twm.start_multiplex_socket(callback=GetNewData.add_new_data_values, streams=streams)
        twm.join()

    @staticmethod
    def add_new_data_values(msg):
        candle = msg['data']['k']
        symbol = candle['s']
        interval = candle['i']
        close_price = candle['c']
        highest_price = candle['h']
        lowest_price = candle['l']
        status = candle['x']
        time = candle['t']
        volume = candle['v']
        daytime = datetime.fromtimestamp(time / 1e3)
        GetNewData.refactor_msg(old_data_refactored, symbol, interval, close_price, highest_price, lowest_price, status, daytime, volume)


    @staticmethod
    def refactor_msg(historical_data_list, symbol, interval, close_price, highest_price, lowest_price, status, daytime, volume):
        if status:
            for dic in historical_data_list:
                if interval == dic['interval'][0] and symbol == dic['symbol'][0]:
                    # print(interval, symbol, close_price)
                    dic['high'].append(float(highest_price))
                    dic['low'].append(float(lowest_price))
                    dic['close'].append(float(close_price))
                    dic['interval'].append(interval)
                    dic['symbol'].append(symbol)
                    dic['time'].append(daytime)
                    dic['volume'].append(volume)
                    print(dic)
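
The update step above scans every series in the list for each closed candle and prints the whole matching dict. One option, offered as a sketch rather than a confirmed fix, is to index the series once by (symbol, interval) so each update becomes a single dictionary lookup. The names build_index and append_closed_candle are hypothetical and not part of the original code or of python-binance. The candle keys ('s', 'i', 'c', 'h', 'l', 'x', 't', 'v') are the ones the code above already reads.

```python
from datetime import datetime


def build_index(historical_data_list):
    """Map (symbol, interval) to its series dict so an update is one lookup."""
    return {(dic['symbol'][0], dic['interval'][0]): dic
            for dic in historical_data_list}


def append_closed_candle(index, candle):
    """Append a closed kline to its series; return True if it was stored."""
    if not candle['x']:  # 'x' is False while the candle is still open
        return False
    dic = index.get((candle['s'], candle['i']))
    if dic is None:  # no historical series for this symbol/interval pair
        return False
    dic['high'].append(float(candle['h']))
    dic['low'].append(float(candle['l']))
    dic['close'].append(float(candle['c']))
    dic['interval'].append(candle['i'])
    dic['symbol'].append(candle['s'])
    dic['time'].append(datetime.fromtimestamp(candle['t'] / 1e3))
    dic['volume'].append(candle['v'])
    return True
```

The index would be built once from the refactored historical list, and the callback would then pass msg['data']['k'] to append_closed_candle. Leaving out the print of the full dict also keeps the per-message work small.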

Entire code:

import asyncio
import pandas as pd
from binance import AsyncClient, Client, ThreadedWebsocketManager
from datetime import datetime
import time
pd.options.mode.chained_assignment = None  # default='warn'

pd.set_option('display.max_columns', None, 'display.max_rows', None)
api_key = 'YOUR_API_KEY'  # placeholder: real credentials should never be posted publicly
api_secret = 'YOUR_API_SECRET'  # placeholder
client = Client(api_key, api_secret)

results = []
symbols = []
with open('DayTradeSymbols.txt') as f:
    for line in f:
        symbol = line.strip('\n') + 'BTC'
        symbols.append(symbol)

stream_list = []
intervals = ['1m','3m']
for symbol in symbols:
    for interval in intervals:
        # Stream names are lowercase: <symbol>@kline_<interval>
        stream_list.append(f'{symbol.lower()}@kline_{interval}')
print(stream_list)

class GetHistoricalData:
    def __init__(self, num_workers: int = 10):
        self.num_workers: int = num_workers
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols(self):
        # symbols = ['BTCUSDT', 'ETHUSDT', 'ADAUSDT','BNBUSDT', 'SOLUSDT', 'DOTUSDT','SHIBUSDT', 'DOGEUSDT', 'LTCUSDT', 'XLMUSDT', 'XMRUSDT', 'AVAXUSDT', 'LINKUSDT']
        for i in symbols:
            await self.task_q.put(i)

        for i in range(self.num_workers):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: AsyncClient):
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            data = await (client.get_historical_klines(symbol, '1m', '11/23/2021'))
            data_df = pd.DataFrame(data)
            data_df.columns = ['time', '1', 'high', 'low', 'close', 'volume', '6', '7', '8', '9', '10', '11']
            data_df.drop(columns=['1', '6', '7', '8', '9', '10', '11'], axis=1, inplace=True)
            data2_df = data_df.head(-1)
            new_df = data2_df.tail(22)
            new_df['interval'] = '1m'
            new_df['symbol'] = symbol
            new_df['time'] = pd.to_datetime(new_df['time'] / 1000, unit='s')
            results.append(new_df.to_dict())

    async def amain(self) -> None:
        client = await AsyncClient.create()
        await asyncio.gather(
            self.get_symbols(),
            *[self.get_historical_klines(client) for _ in range(self.num_workers)])
        await client.close_connection()


class GetNewData:

    @staticmethod
    def refactor_old_result():
        historical_list_refactored = []
        for i in range(len(results)):
            single_key_data = results[i]
            single_key_data['high'] = list(single_key_data['high'].values())
            single_key_data['low'] = list(single_key_data['low'].values())
            single_key_data['close'] = list(single_key_data['close'].values())
            single_key_data['interval'] = list(single_key_data['interval'].values())
            single_key_data['symbol'] = list(single_key_data['symbol'].values())
            single_key_data['time'] = list(single_key_data['time'].values())
            single_key_data['volume'] = list(single_key_data['volume'].values())
            historical_list_refactored.append(single_key_data)
        return historical_list_refactored

    @staticmethod
    def start_websocket():
        twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret)
        twm.start()
        streams = stream_list
        twm.start_multiplex_socket(callback=GetNewData.add_new_data_values, streams=streams)
        twm.join()

    @staticmethod
    def add_new_data_values(msg):
        candle = msg['data']['k']
        symbol = candle['s']
        interval = candle['i']
        close_price = candle['c']
        highest_price = candle['h']
        lowest_price = candle['l']
        status = candle['x']
        time = candle['t']
        volume = candle['v']
        daytime = datetime.fromtimestamp(time / 1e3)
        GetNewData.refactor_msg(old_data_refactored, symbol, interval, close_price, highest_price, lowest_price, status, daytime, volume)


    @staticmethod
    def refactor_msg(historical_data_list, symbol, interval, close_price, highest_price, lowest_price, status, daytime, volume):
        if status:
            for dic in historical_data_list:
                if interval == dic['interval'][0] and symbol == dic['symbol'][0]:
                    # print(interval, symbol, close_price)
                    dic['high'].append(float(highest_price))
                    dic['low'].append(float(lowest_price))
                    dic['close'].append(float(close_price))
                    dic['interval'].append(interval)
                    dic['symbol'].append(symbol)
                    dic['time'].append(daytime)
                    dic['volume'].append(volume)
                    print(dic)


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(GetHistoricalData().amain())
    print(f'time = {time.time() - start_time}')
    old_data_refactored = GetNewData.refactor_old_result()
    GetNewData.start_websocket()
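
A possible cause, stated as an assumption and not a confirmed diagnosis: the "Queue overflow" message appears to show up when messages arrive faster than the callback consumes them. If that is the case here, keeping the callback as cheap as possible may help. The sketch below uses only the standard library. The callback it returns just enqueues each message, and a separate worker thread does the processing. make_handoff is a hypothetical helper, not part of python-binance, and the maxsize default is an arbitrary illustrative value.

```python
import queue
import threading


def make_handoff(process, maxsize=10_000):
    """Return (callback, stop); callback only enqueues, a worker runs process."""
    q = queue.Queue(maxsize=maxsize)
    stop_marker = object()  # unique sentinel that tells the worker to exit

    def worker():
        while True:
            msg = q.get()
            if msg is stop_marker:
                break
            process(msg)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    def callback(msg):
        try:
            q.put_nowait(msg)  # never block the thread that reads the socket
        except queue.Full:
            pass  # drop the message instead of stalling the reader

    def stop():
        q.put(stop_marker)  # processed after everything already queued
        thread.join()

    return callback, stop
```

In the code above, this would mean passing the returned callback to twm.start_multiplex_socket in place of GetNewData.add_new_data_values, with the latter given to make_handoff as process. Separately, the historical data is only fetched for '1m', so messages from the '3m' streams never match any series in refactor_msg. Removing '3m' from intervals would halve the number of subscribed streams.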
