BotiCord Websocket

This commit is contained in:
grey-cat-1908 2023-06-27 16:07:40 +03:00
parent 429d2b7b40
commit 0ad0180f7e
2 changed files with 125 additions and 0 deletions

View file

@ -14,3 +14,4 @@ __version__ = "3.0.0a"
from .client import BoticordClient from .client import BoticordClient
from .types import * from .types import *
from .websocket import BotiCordWebsocket

124
boticordpy/websocket.py Normal file
View file

@ -0,0 +1,124 @@
# Copyright Marakarka (Viktor K) 2021 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root.
import logging
import json
import asyncio
import typing
import aiohttp
_logger = logging.getLogger("boticord.websocket")
class BotiCordWebsocket:
def __init__(self, token: str):
self.__session = None
self.loop = asyncio.get_event_loop()
self.ws = None
self._listeners = {}
self.not_closed = True
self._token = token
def listener(self):
"""Decorator to set the listener."""
def inner(func):
if not asyncio.iscoroutinefunction(func):
raise TypeError(f"<{func.__qualname__}> must be a coroutine function")
self._listeners[func.__qualname__] = func
return func
return inner
def register_listener(self, notification_type: str, callback: typing.Any):
"""Method to set the listener.
Args:
notify_type (:obj:`str`)
Type of notification (Check reference page)
callback (:obj:`function`)
Coroutine Callback Function
"""
if not asyncio.iscoroutinefunction(callback):
raise TypeError(f"<{callback.__qualname__}> must be a coroutine function")
self._listeners[notification_type] = callback
return self
async def connect(self) -> None:
"""Connect to BotiCord."""
try:
self.__session = aiohttp.ClientSession()
self.ws = await self.__session.ws_connect(
"wss://gateway.arbuz.pro/websocket/",
timeout=30.0,
)
_logger.info("Connected to BotiCord.")
self.not_closed = True
self.loop.create_task(self._receive())
await self._send_identify()
except Exception as exc:
_logger.error("Connecting failed!")
raise exc
async def _send_identify(self) -> None:
await self.ws.send_json({"event": "auth", "data": {"token": self._token}})
async def _receive(self) -> None:
while self.not_closed:
async for msg in self.ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self._handle_data(msg.data)
else:
raise RuntimeError
close_code = self.ws.close_code
if close_code is not None:
await self._handle_close(close_code)
async def _handle_data(self, data):
data = json.loads(data)
if data["event"] == "hello":
_logger.info("Authorized successfully.")
self.loop.create_task(self._send_ping())
elif data["event"] == "notify":
listener = self._listeners.get(data["data"]["type"])
if listener:
self.loop.create_task(listener(data["data"]))
elif data["event"] == "pong":
_logger.info("Received pong-response.")
self.loop.create_task(self._send_ping())
else:
_logger.error("An error has occurred.")
async def _handle_close(self, code: int) -> None:
self.not_closed = False
await self.__session.close()
if code == 4000:
_logger.info("Closed connection successfully.")
return
elif code == 1006:
_logger.error("Token is invalid.")
return
_logger.info("Disconnected from BotiCord. Reconnecting...")
await self.connect()
async def _send_ping(self) -> None:
if not self.ws.closed:
await asyncio.sleep(45)
await self.ws.send_json({"event": "ping"})
async def close(self) -> None:
if self.ws:
self.not_closed = False
await self.ws.close(code=4000)