From 0ad0180f7eeffe75b913e3c7a5881d7aa89b1b84 Mon Sep 17 00:00:00 2001 From: grey-cat-1908 Date: Tue, 27 Jun 2023 16:07:40 +0300 Subject: [PATCH] BotiCord Websocket --- boticordpy/__init__.py | 1 + boticordpy/websocket.py | 124 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 boticordpy/websocket.py diff --git a/boticordpy/__init__.py b/boticordpy/__init__.py index 037ce87..422174b 100644 --- a/boticordpy/__init__.py +++ b/boticordpy/__init__.py @@ -14,3 +14,4 @@ __version__ = "3.0.0a" from .client import BoticordClient from .types import * +from .websocket import BotiCordWebsocket diff --git a/boticordpy/websocket.py b/boticordpy/websocket.py new file mode 100644 index 0000000..5692f41 --- /dev/null +++ b/boticordpy/websocket.py @@ -0,0 +1,124 @@ +# Copyright Marakarka (Viktor K) 2021 - Present +# Full MIT License can be found in `LICENSE.txt` at the project root. + +import logging +import json +import asyncio +import typing + +import aiohttp + +_logger = logging.getLogger("boticord.websocket") + + +class BotiCordWebsocket: + def __init__(self, token: str): + self.__session = None + self.loop = asyncio.get_event_loop() + self.ws = None + self._listeners = {} + self.not_closed = True + + self._token = token + + def listener(self): + """Decorator to set the listener.""" + + def inner(func): + if not asyncio.iscoroutinefunction(func): + raise TypeError(f"<{func.__qualname__}> must be a coroutine function") + self._listeners[func.__qualname__] = func + return func + + return inner + + def register_listener(self, notification_type: str, callback: typing.Any): + """Method to set the listener. + Args: + notify_type (:obj:`str`) + Type of notification (Check reference page) + callback (:obj:`function`) + Coroutine Callback Function + """ + if not asyncio.iscoroutinefunction(callback): + raise TypeError(f"<{callback.__qualname__}> must be a coroutine function") + + self._listeners[notification_type] = callback + return self + + async def connect(self) -> None: + """Connect to BotiCord.""" + try: + self.__session = aiohttp.ClientSession() + self.ws = await self.__session.ws_connect( + "wss://gateway.arbuz.pro/websocket/", + timeout=30.0, + ) + + _logger.info("Connected to BotiCord.") + + self.not_closed = True + + self.loop.create_task(self._receive()) + await self._send_identify() + except Exception as exc: + _logger.error("Connecting failed!") + + raise exc + + async def _send_identify(self) -> None: + await self.ws.send_json({"event": "auth", "data": {"token": self._token}}) + + async def _receive(self) -> None: + while self.not_closed: + async for msg in self.ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self._handle_data(msg.data) + else: + raise RuntimeError + + close_code = self.ws.close_code + + if close_code is not None: + await self._handle_close(close_code) + + async def _handle_data(self, data): + data = json.loads(data) + + if data["event"] == "hello": + _logger.info("Authorized successfully.") + self.loop.create_task(self._send_ping()) + elif data["event"] == "notify": + listener = self._listeners.get(data["data"]["type"]) + if listener: + self.loop.create_task(listener(data["data"])) + elif data["event"] == "pong": + _logger.info("Received pong-response.") + self.loop.create_task(self._send_ping()) + else: + _logger.error("An error has occurred.") + + async def _handle_close(self, code: int) -> None: + self.not_closed = False + await self.__session.close() + + if code == 4000: + _logger.info("Closed connection successfully.") + return + elif code == 1006: + _logger.error("Token is invalid.") + return + + _logger.info("Disconnected from BotiCord. Reconnecting...") + + await self.connect() + + async def _send_ping(self) -> None: + if not self.ws.closed: + await asyncio.sleep(45) + await self.ws.send_json({"event": "ping"}) + + async def close(self) -> None: + if self.ws: + self.not_closed = False + await self.ws.close(code=4000)