add on_channel_create, on_channel_delete and on_channel_update listeners

This commit is contained in:
grey-cat-1908 2022-03-28 20:38:20 +03:00
parent ff0d30664e
commit 8e6cbd711c
3 changed files with 91 additions and 0 deletions

View file

@ -0,0 +1,30 @@
# Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root.
from __future__ import annotations
import asyncio
from ..utils.types import Coro
from ..models.guild import Channel, ChannelType, channel_types_for_converting
async def channel_create_listener(self, gateway, payload: dict):
gateway.session_id = payload.get("session_id")
payload.update({"type": ChannelType(payload.pop("type"))})
channel_cls = channel_types_for_converting.get(payload["type"], Channel)
channel = channel_cls.from_dict(payload)
custom_listener = self._events.get("on_channel_create")
if custom_listener is not None:
asyncio.ensure_future(custom_listener(channel))
return
def export() -> Coro:
return channel_create_listener

View file

@ -0,0 +1,30 @@
# Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root.
from __future__ import annotations
import asyncio
from ..utils.types import Coro
from ..models.guild import Channel, ChannelType, channel_types_for_converting
async def channel_delete_listener(self, gateway, payload: dict):
gateway.session_id = payload.get("session_id")
payload.update({"type": ChannelType(payload.pop("type"))})
channel_cls = channel_types_for_converting.get(payload["type"], Channel)
channel = channel_cls.from_dict(payload)
custom_listener = self._events.get("on_channel_delete")
if custom_listener is not None:
asyncio.ensure_future(custom_listener(channel))
return
def export() -> Coro:
return channel_delete_listener

View file

@ -0,0 +1,31 @@
# Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root.
from __future__ import annotations
import asyncio
from ..utils.types import Coro
from ..models.guild import Channel, ChannelType, channel_types_for_converting
async def channel_update_listener(self, gateway, payload: dict):
# ToDo: Replace None to the old channel object (so it requires cache manager)
gateway.session_id = payload.get("session_id")
payload.update({"type": ChannelType(payload.pop("type"))})
channel_cls = channel_types_for_converting.get(payload["type"], Channel)
channel = channel_cls.from_dict(payload)
custom_listener = self._events.get("on_channel_update")
if custom_listener is not None:
asyncio.ensure_future(custom_listener(None, channel))
return
def export() -> Coro:
return channel_update_listener