This commit is contained in:
grey-cat-1908 2022-03-27 15:57:14 +03:00
commit b7033e4976
8 changed files with 324 additions and 31 deletions

View file

@ -2,7 +2,7 @@
# Full MIT License can be found in `LICENSE.txt` at the project root.
from .client import *
from .models import Intents
from .models import Intents, Snowflake
from .exceptions import *
__package__ = "melisa"

View file

@ -8,9 +8,10 @@ from .utils.types import Coro
from .core.http import HTTPClient
from .core.gateway import GatewayBotInfo
from .models.guild.channel import Channel, ChannelType, channel_types_for_converting
import asyncio
from typing import Dict, List, Union
from typing import Dict, List, Union, Any
class Client:
@ -163,3 +164,26 @@ class Client:
data = await self.http.get(f"guilds/{guild_id}")
return Guild.from_dict(data)
async def fetch_channel(
self, channel_id: Union[Snowflake, str, int]
) -> Union[Channel, Any]:
"""
Fetch Channel from the Discord API (by id).
If type of channel is unknown:
it will return just :class:`melisa.models.guild.channel.Channel` object.
Parameters
----------
channel_id : :class:`Union[Snowflake, str, int]`
Id of channel to fetch
"""
# ToDo: Update cache if CHANNEL_CACHE enabled.
data = (await self.http.get(f"channels/{channel_id}")) or {}
data.update({"type": ChannelType(data.pop("type"))})
channel_cls = channel_types_for_converting.get(data["type"], Channel)
return channel_cls.from_dict(data)

View file

@ -4,7 +4,7 @@
from __future__ import annotations
import asyncio
from typing import Dict, Optional
from typing import Dict, Optional, Any
from aiohttp import ClientSession, ClientResponse
@ -20,6 +20,7 @@ from melisa.exceptions import (
RateLimitError,
)
from .ratelimiter import RateLimiter
from ..utils import remove_none
class HTTPClient:
@ -59,7 +60,14 @@ class HTTPClient:
await self.__aiohttp_session.close()
async def __send(
self, method: str, endpoint: str, *, _ttl: int = None, **kwargs
self,
method: str,
endpoint: str,
*,
_ttl: int = None,
headers: Optional[Dict[str, Any]] = None,
params: Optional[Dict] = None,
**kwargs,
) -> Optional[Dict]:
"""Send an API request to the Discord API."""
@ -72,7 +80,16 @@ class HTTPClient:
url = f"{self.url}/{endpoint}"
async with self.__aiohttp_session.request(method, url, **kwargs) as response:
async with self.__aiohttp_session.request(
method,
url,
params=remove_none(params),
headers={
"Content-Type": "application/json",
**(remove_none(headers) or {}),
},
**kwargs,
) as response:
return await self.__handle_response(
response, method, endpoint, _ttl=ttl, **kwargs
)
@ -91,6 +108,9 @@ class HTTPClient:
self.__rate_limiter.save_response_bucket(endpoint, method, res.headers)
if res.ok:
if res.status == 204:
return
return await res.json()
exception = self.__http_exceptions.get(res.status)
@ -111,7 +131,7 @@ class HTTPClient:
return await self.__send(method, endpoint, _ttl=_ttl - 1, **kwargs)
async def get(self, route: str, params: Optional[Dict] = None) -> Optional[Dict]:
async def get(self, route: str, *, params: Optional[Dict] = None) -> Optional[Dict]:
"""|coro|
Sends a GET request to a Discord REST API endpoint.
@ -129,7 +149,9 @@ class HTTPClient:
"""
return await self.__send("GET", route, params=params)
async def post(self, route: str, data: Optional[Dict] = None) -> Optional[Dict]:
async def post(
self, route: str, *, headers: dict = None, data: Optional[Dict] = None
) -> Optional[Dict]:
"""|coro|
Sends a POST request to a Discord REST API endpoint.
@ -145,13 +167,9 @@ class HTTPClient:
Optional[:class:`Dict`]
JSON response from the Discord API.
"""
return await self.__send(
"POST",
route,
json=data,
)
return await self.__send("POST", route, json=data, headers=headers)
async def delete(self, route: str, headers: dict = None) -> Optional[Dict]:
async def delete(self, route: str, *, headers: dict = None) -> Optional[Dict]:
"""|coro|
Sends a DELETE request to a Discord REST API endpoint.

View file

@ -57,9 +57,9 @@ class Shard:
"""
create_task(self._gateway.close())
async def update_presence(self,
activity: BotActivity = None,
status: str = None) -> Shard:
async def update_presence(
self, activity: BotActivity = None, status: str = None
) -> Shard:
"""
|coro|

View file

@ -3,9 +3,10 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from enum import IntEnum, Enum
from typing import List, Any
from enum import IntEnum
from typing import List, Any, Optional, AsyncIterator, Union, Dict
from ...utils import Snowflake
from ...utils import APIModelBase
@ -109,3 +110,238 @@ class Channel(APIModelBase):
member: APINullable[List[Any]] = None
default_auto_archive_duration: APINullable[int] = None
permissions: APINullable[str] = None
@property
def mention(self):
return f"<#{self.id}>"
class TextChannel(Channel):
"""A subclass of ``Channel`` representing text channels with all the same attributes."""
async def history(
self,
limit: int = 50,
*,
before: Optional[Union[int, str, Snowflake]] = None,
after: Optional[Union[int, str, Snowflake]] = None,
around: Optional[Union[int, str, Snowflake]] = None,
) -> AsyncIterator[Dict[str, Any]]:
"""|coro|
Returns a list of messages in this channel.
Examples
---------
Flattening messages into a list: ::
messages = [message async for message in channel.history(limit=111)]
All parameters are optional.
Parameters
----------
limit : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Max number of messages to return (1-100).
around : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Get messages around this message ID.
before : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Get messages before this message ID.
after : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Get messages after this message ID.
Raises
-------
NotFoundError
If channel is not found.
ForbiddenError
You do not have proper permissions to do the actions required.
Returns
-------
AsyncIterator[Dict[:class:`str`, Any]]
An iterator of messages.
"""
# ToDo: Add check parameter
if limit is None:
limit = 100
while limit > 0:
search_limit = min(limit, 100)
raw_messages = await self._http.get(
f"/channels/{self.id}/messages",
params={
"limit": search_limit,
"before": before,
"after": after,
"around": around,
},
)
if not raw_messages:
break
for message_data in raw_messages:
yield message_data
before = raw_messages[-1]["id"]
limit -= search_limit
async def fetch_message(
self,
message_id: Optional[Snowflake, int, str],
) -> Dict[str, Any]:
"""|coro|
Returns a specific message in the channel.
Parameters
----------
message_id : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Id of message to fetch.
Raises
-------
NotFoundError
If message is not found.
ForbiddenError
You do not have proper permissions to do the actions required.
Returns
-------
Dict[:class:`str`, Any]
Message object.
"""
message = await self._http.get(
f"/channels/{self.id}/messages/{message_id}",
)
return message
async def bulk_delete_messages(
self, messages: List[Snowflake], *, reason: Optional[str] = None
):
"""|coro|
Delete multiple messages in a single request.
This method will not delete messages older than 2 weeks.
Parameters
----------
messages: List[:class:`~.melisa.Snowflake`]
The list of message IDs to delete (2-100).
reason: Optional[:class:`str`]
The reason of the bulk delete messages operation.
Raises
-------
BadRequestError
if any message provided is older than that or if any duplicate message IDs are provided.
ForbiddenError
You do not have proper permissions to do the actions required.
(You must have `MANAGE_MESSAGES` permission)
"""
await self._http.post(
f"channels/{self.id}/messages/bulk-delete",
headers={"X-Audit-Log-Reason": reason},
data={"messages": messages},
)
async def delete_message(
self, message_id: Optional[Snowflake, str, int], *, reason: Optional[str] = None
):
"""|coro|
Deletes only one specified message.
Parameters
----------
message_id: Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]
Id of message to delete.
reason: Optional[:class:`str`]
The reason of the message delete operation.
Raises
-------
BadRequestError
Something is wrong with request, maybe specified parameters.
NotFoundError
If message is not found.
ForbiddenError
You do not have proper permissions to do the actions required.
(You must have `MANAGE_MESSAGES` permission)
"""
await self._http.delete(
f"channels/{self.id}/messages/{message_id}",
headers={"X-Audit-Log-Reason": reason},
)
async def purge(
self,
limit: int = 50,
*,
before: Optional[Union[int, str, Snowflake]] = None,
after: Optional[Union[int, str, Snowflake]] = None,
around: Optional[Union[int, str, Snowflake]] = None,
reason: Optional[str] = None,
):
"""|coro|
Purges a list of messages that meet the criteria specified in parameters.
This method will not delete messages older than 2 weeks.
Parameters
----------
limit : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Max number of messages to purge.
around : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Get messages around this message ID.
before : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Get messages before this message ID.
after : Optional[Union[:class:`int`, :class:`str`, :class:`~.melisa.Snowflake`]]
Get messages after this message ID.
reason: Optional[:class:`str`]
The reason of the channel purge operation.
Raises
-------
BadRequestError
if any message provided is older than that or if any duplicate message IDs are provided.
ForbiddenError
You do not have proper permissions to do the actions required.
(You must have `MANAGE_MESSAGES` permission)
"""
iterator = self.history(
limit,
around=around,
before=before,
after=after,
)
message_ids = []
count = 0
async for message in iterator:
message_ids.append(message["id"])
count += 1
if count == 100:
print("abobatelecom")
await self.bulk_delete_messages(message_ids, reason=reason)
message_ids = []
count = 0
await asyncio.sleep(1)
await asyncio.sleep(1)
if count > 1:
await self.bulk_delete_messages(message_ids, reason=reason)
return
if count == 0:
await self.delete_message(message_ids[0], reason=reason)
return
# noinspection PyTypeChecker
channel_types_for_converting: Dict[ChannelType, Channel] = {
ChannelType.GUILD_TEXT: TextChannel
}

View file

@ -7,5 +7,6 @@ from .snowflake import Snowflake
from .api_model import APIModelBase
from .conversion import remove_none
__all__ = ("Coro", "Snowflake", "APIModelBase")
__all__ = ("Coro", "Snowflake", "APIModelBase", "remove_none")

View file

@ -18,12 +18,12 @@ from typing import (
T = TypeVar("T")
def _to_dict_without_none(model):
def to_dict_without_none(model):
if _is_dataclass_instance(model):
result = []
for field in fields(model):
value = _to_dict_without_none(getattr(model, field.name))
value = to_dict_without_none(getattr(model, field.name))
if isinstance(value, Enum):
result.append((field.name, value.value))
@ -32,19 +32,18 @@ def _to_dict_without_none(model):
return dict(result)
elif isinstance(model, tuple) and hasattr(model, "_fields"):
return type(model)(*[_to_dict_without_none(v) for v in model])
if isinstance(model, tuple) and hasattr(model, "_fields"):
return type(model)(*[to_dict_without_none(v) for v in model])
elif isinstance(model, (list, tuple)):
return type(model)(_to_dict_without_none(v) for v in model)
if isinstance(model, (list, tuple)):
return type(model)(to_dict_without_none(v) for v in model)
elif isinstance(model, dict):
if isinstance(model, dict):
return type(model)(
(_to_dict_without_none(k), _to_dict_without_none(v))
for k, v in model.items()
(to_dict_without_none(k), to_dict_without_none(v)) for k, v in model.items()
)
else:
return copy.deepcopy(model)
return copy.deepcopy(model)
class APIModelBase:
@ -90,4 +89,4 @@ class APIModelBase:
)
def to_dict(self) -> Dict:
return _to_dict_without_none(self)
return to_dict_without_none(self)

View file

@ -0,0 +1,15 @@
# Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root.
from __future__ import annotations
def remove_none(obj):
if isinstance(obj, list):
return [i for i in obj if i is not None]
elif isinstance(obj, tuple):
return tuple(i for i in obj if i is not None)
elif isinstance(obj, set):
return obj - {None}
elif isinstance(obj, dict):
return {k: v for k, v in obj.items() if None not in (k, v)}