mirror of
https://github.com/MelisaDev/melisa.git
synced 2024-11-11 19:07:28 +03:00
some models parsing rework, timestamps, archive threads method
This commit is contained in:
parent
0fd1be8aab
commit
8194408998
9 changed files with 355 additions and 52 deletions
|
@ -44,8 +44,6 @@ class Gateway:
|
||||||
HEARTBEAT_ACK = 11
|
HEARTBEAT_ACK = 11
|
||||||
|
|
||||||
def __init__(self, client, shard_id: int = 0, num_shards: int = 1, **kwargs):
|
def __init__(self, client, shard_id: int = 0, num_shards: int = 1, **kwargs):
|
||||||
|
|
||||||
self.GATEWAY_VERSION = "9"
|
|
||||||
self.interval = None
|
self.interval = None
|
||||||
self.intents = client.intents
|
self.intents = client.intents
|
||||||
self.sequence = None
|
self.sequence = None
|
||||||
|
@ -93,7 +91,7 @@ class Gateway:
|
||||||
|
|
||||||
async def connect(self) -> None:
|
async def connect(self) -> None:
|
||||||
self.ws = await self.__session.ws_connect(
|
self.ws = await self.__session.ws_connect(
|
||||||
f"wss://gateway.discord.gg/?v={self.GATEWAY_VERSION}&encoding=json&compress=zlib-stream"
|
f"wss://gateway.discord.gg/?v=10&encoding=json&compress=zlib-stream"
|
||||||
)
|
)
|
||||||
_logger.debug("(Shard %s) Starting...", self.shard_id)
|
_logger.debug("(Shard %s) Starting...", self.shard_id)
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ _logger = logging.getLogger("melisa.http")
|
||||||
|
|
||||||
|
|
||||||
class HTTPClient:
|
class HTTPClient:
|
||||||
API_VERSION = 9
|
API_VERSION = 10
|
||||||
|
|
||||||
def __init__(self, token: str, *, ttl: int = 5):
|
def __init__(self, token: str, *, ttl: int = 5):
|
||||||
self.url: str = f"https://discord.com/api/v{self.API_VERSION}"
|
self.url: str = f"https://discord.com/api/v{self.API_VERSION}"
|
||||||
|
|
|
@ -19,7 +19,7 @@ async def guild_create_listener(self, gateway, payload: dict):
|
||||||
if self.guilds.get(guild.id, "empty") != "empty":
|
if self.guilds.get(guild.id, "empty") != "empty":
|
||||||
guild_was_cached_as_none = True
|
guild_was_cached_as_none = True
|
||||||
|
|
||||||
self.guilds[guild.id] = guild
|
self.guilds[str(guild.id)] = guild
|
||||||
|
|
||||||
custom_listener = self._events.get("on_guild_create")
|
custom_listener = self._events.get("on_guild_create")
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ from dataclasses import dataclass
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
from typing import List, Any, Optional, AsyncIterator, Union, Dict, overload
|
from typing import List, Any, Optional, AsyncIterator, Union, Dict, overload
|
||||||
|
|
||||||
from ...utils import Snowflake
|
from ...utils import Snowflake, Timestamp
|
||||||
from ...utils import APIModelBase
|
from ...utils import APIModelBase
|
||||||
from ...utils.types import APINullable
|
from ...utils.types import APINullable
|
||||||
|
|
||||||
|
@ -151,7 +151,7 @@ class Channel(APIModelBase):
|
||||||
type: APINullable[int] = None
|
type: APINullable[int] = None
|
||||||
guild_id: APINullable[Snowflake] = None
|
guild_id: APINullable[Snowflake] = None
|
||||||
position: APINullable[int] = None
|
position: APINullable[int] = None
|
||||||
permission_overwrites: APINullable[List[Any]] = None
|
permission_overwrites: APINullable[List] = None
|
||||||
name: APINullable[str] = None
|
name: APINullable[str] = None
|
||||||
topic: APINullable[str] = None
|
topic: APINullable[str] = None
|
||||||
nsfw: APINullable[bool] = None
|
nsfw: APINullable[bool] = None
|
||||||
|
@ -159,7 +159,7 @@ class Channel(APIModelBase):
|
||||||
bitrate: APINullable[int] = None
|
bitrate: APINullable[int] = None
|
||||||
user_limit: APINullable[int] = None
|
user_limit: APINullable[int] = None
|
||||||
rate_limit_per_user: APINullable[int] = None
|
rate_limit_per_user: APINullable[int] = None
|
||||||
recipients: APINullable[List[Any]] = None
|
recipients: APINullable[List] = None
|
||||||
icon: APINullable[str] = None
|
icon: APINullable[str] = None
|
||||||
owner_id: APINullable[Snowflake] = None
|
owner_id: APINullable[Snowflake] = None
|
||||||
application_id: APINullable[Snowflake] = None
|
application_id: APINullable[Snowflake] = None
|
||||||
|
@ -169,8 +169,8 @@ class Channel(APIModelBase):
|
||||||
video_quality_mode: APINullable[int] = None
|
video_quality_mode: APINullable[int] = None
|
||||||
message_count: APINullable[int] = None
|
message_count: APINullable[int] = None
|
||||||
member_count: APINullable[int] = None
|
member_count: APINullable[int] = None
|
||||||
thread_metadata: APINullable[List[Any]] = None
|
thread_metadata: APINullable[List] = None
|
||||||
member: APINullable[List[Any]] = None
|
member: APINullable[List] = None
|
||||||
default_auto_archive_duration: APINullable[int] = None
|
default_auto_archive_duration: APINullable[int] = None
|
||||||
permissions: APINullable[str] = None
|
permissions: APINullable[str] = None
|
||||||
|
|
||||||
|
@ -242,9 +242,9 @@ class MessageableChannel(Channel):
|
||||||
async def start_thread_without_message(
|
async def start_thread_without_message(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
name: Optional[str] = None,
|
name: str,
|
||||||
|
type: ChannelType,
|
||||||
auto_archive_duration: Optional[int] = None,
|
auto_archive_duration: Optional[int] = None,
|
||||||
type: Optional[ChannelType] = None,
|
|
||||||
invitable: Optional[bool] = None,
|
invitable: Optional[bool] = None,
|
||||||
rate_limit_per_user: Optional[int] = None,
|
rate_limit_per_user: Optional[int] = None,
|
||||||
reason: Optional[str] = None,
|
reason: Optional[str] = None,
|
||||||
|
@ -279,6 +279,13 @@ class MessageableChannel(Channel):
|
||||||
reason: Optional[:class:`str`]
|
reason: Optional[:class:`str`]
|
||||||
The reason of the thread creation.
|
The reason of the thread creation.
|
||||||
|
|
||||||
|
Raises
|
||||||
|
-------
|
||||||
|
ForbiddenError
|
||||||
|
You do not have proper permissions to do the actions required.
|
||||||
|
HTTPException
|
||||||
|
The request to perform the action failed with other http exception.
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
Union[:class:`~melisa.models.guild.channel.PublicThread`,
|
Union[:class:`~melisa.models.guild.channel.PublicThread`,
|
||||||
|
@ -298,10 +305,7 @@ class MessageableChannel(Channel):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
data.update({"type": ChannelType(data.pop("type"))})
|
return Thread.from_dict(data)
|
||||||
|
|
||||||
channel_cls = channel_types_for_converting.get(data["type"], Channel)
|
|
||||||
return channel_cls.from_dict(data)
|
|
||||||
|
|
||||||
async def history(
|
async def history(
|
||||||
self,
|
self,
|
||||||
|
@ -335,8 +339,8 @@ class MessageableChannel(Channel):
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
-------
|
-------
|
||||||
NotFoundError
|
HTTPException
|
||||||
If channel is not found.
|
The request to perform the action failed with other http exception.
|
||||||
ForbiddenError
|
ForbiddenError
|
||||||
You do not have proper permissions to do the actions required.
|
You do not have proper permissions to do the actions required.
|
||||||
|
|
||||||
|
@ -388,8 +392,8 @@ class MessageableChannel(Channel):
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
-------
|
-------
|
||||||
NotFoundError
|
HTTPException
|
||||||
If message is not found.
|
The request to perform the action failed with other http exception.
|
||||||
ForbiddenError
|
ForbiddenError
|
||||||
You do not have proper permissions to do the actions required.
|
You do not have proper permissions to do the actions required.
|
||||||
|
|
||||||
|
@ -422,8 +426,8 @@ class MessageableChannel(Channel):
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
-------
|
-------
|
||||||
BadRequestError
|
HTTPException
|
||||||
if any message provided is older than that or if any duplicate message IDs are provided.
|
The request to perform the action failed with other http exception.
|
||||||
ForbiddenError
|
ForbiddenError
|
||||||
You do not have proper permissions to do the actions required.
|
You do not have proper permissions to do the actions required.
|
||||||
(You must have **MANAGE_MESSAGES** permission)
|
(You must have **MANAGE_MESSAGES** permission)
|
||||||
|
@ -450,10 +454,8 @@ class MessageableChannel(Channel):
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
-------
|
-------
|
||||||
BadRequestError
|
HTTPException
|
||||||
Something is wrong with request, maybe specified parameters.
|
The request to perform the action failed with other http exception.
|
||||||
NotFoundError
|
|
||||||
If message is not found.
|
|
||||||
ForbiddenError
|
ForbiddenError
|
||||||
You do not have proper permissions to do the actions required.
|
You do not have proper permissions to do the actions required.
|
||||||
(You must have **MANAGE_MESSAGES** permission)
|
(You must have **MANAGE_MESSAGES** permission)
|
||||||
|
@ -492,8 +494,8 @@ class MessageableChannel(Channel):
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
-------
|
-------
|
||||||
BadRequestError
|
HTTPException
|
||||||
if any message provided is older than that or if any duplicate message IDs are provided.
|
The request to perform the action failed with other http exception.
|
||||||
ForbiddenError
|
ForbiddenError
|
||||||
You do not have proper permissions to do the actions required.
|
You do not have proper permissions to do the actions required.
|
||||||
(You must have **MANAGE_MESSAGES** permission)
|
(You must have **MANAGE_MESSAGES** permission)
|
||||||
|
@ -528,6 +530,61 @@ class MessageableChannel(Channel):
|
||||||
await self.delete_message(message_ids[0], reason=reason)
|
await self.delete_message(message_ids[0], reason=reason)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
async def archived_threads(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
private: bool = False,
|
||||||
|
joined: bool = False,
|
||||||
|
before: Optional[Union[Snowflake, Timestamp]] = None,
|
||||||
|
limit: Optional[int] = 50
|
||||||
|
) -> ThreadsList:
|
||||||
|
"""|coro|
|
||||||
|
|
||||||
|
Returns archived threads in the channel.
|
||||||
|
|
||||||
|
Requires the ``READ_MESSAGE_HISTORY`` permission.
|
||||||
|
If iterating over private threads then ``MANAGE_THREADS`` permission is also required.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
before: Optional[Union[:class:`~melisa.utils.Snowflake`, :class:`~melisa.utils.Timestamp`]]
|
||||||
|
Retrieve archived channels before the given date or ID.
|
||||||
|
limit: Optional[:class:`int`]
|
||||||
|
The number of threads to retrieve. If None, retrieves every archived thread in the channel.
|
||||||
|
Note, however, that this would make it a slow operation
|
||||||
|
private: :class:`bool`
|
||||||
|
Whether to retrieve private archived threads.
|
||||||
|
joined: :class:`bool`
|
||||||
|
Whether to retrieve private archived threads that you’ve joined.
|
||||||
|
You cannot set ``joined`` to ``True`` and ``private`` to ``False``.
|
||||||
|
|
||||||
|
Raises
|
||||||
|
-------
|
||||||
|
HTTPException
|
||||||
|
The request to perform the action failed with other http exception.
|
||||||
|
ForbiddenError
|
||||||
|
You do not have permissions to get archived threads.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
:class:`~melisa.models.channel.ThreadsList`
|
||||||
|
The threads list object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if joined:
|
||||||
|
url = f"/channels/{self.id}/users/@me/threads/archived/private"
|
||||||
|
elif private:
|
||||||
|
url = f"/channels/{self.id}/threads/archived/private"
|
||||||
|
else:
|
||||||
|
url = f"/channels/{self.id}/threads/archived/public"
|
||||||
|
|
||||||
|
return ThreadsList.from_dict(
|
||||||
|
await self._http.get(
|
||||||
|
url,
|
||||||
|
params={"before": before, "limit": limit},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TextChannel(MessageableChannel):
|
class TextChannel(MessageableChannel):
|
||||||
"""A subclass of ``Channel`` representing text channels with all the same attributes."""
|
"""A subclass of ``Channel`` representing text channels with all the same attributes."""
|
||||||
|
@ -573,12 +630,23 @@ class Thread(MessageableChannel):
|
||||||
"""A subclass of ``Channel`` for threads with all the same attributes."""
|
"""A subclass of ``Channel`` for threads with all the same attributes."""
|
||||||
|
|
||||||
|
|
||||||
class PublicThread(Thread):
|
@dataclass(repr=False)
|
||||||
"""A subclass of ``Thread`` for public threads with all the same attributes."""
|
class ThreadsList(APIModelBase):
|
||||||
|
"""A class representing a list of channel threads from the Discord API.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
threads: List[:class:`~melisa.models.guild.channel.Thread`]
|
||||||
|
Async iterator of threads. To get their type use them `.type` attribute.
|
||||||
|
members: List[:class:`Any`]
|
||||||
|
Async iterator of thread members.
|
||||||
|
has_more: Optional[:class:`bool`]
|
||||||
|
Whether there are potentially additional threads that could be returned on a subsequent cal
|
||||||
|
"""
|
||||||
|
|
||||||
class PrivateThread(Thread):
|
threads: List[Thread]
|
||||||
"""A subclass of ``Thread`` for private threads with all the same attributes."""
|
members: List
|
||||||
|
has_more: Optional[bool]
|
||||||
|
|
||||||
|
|
||||||
# noinspection PyTypeChecker
|
# noinspection PyTypeChecker
|
||||||
|
|
|
@ -8,7 +8,7 @@ from enum import IntEnum, Enum
|
||||||
from typing import List, Any, Optional, overload
|
from typing import List, Any, Optional, overload
|
||||||
|
|
||||||
from .channel import Channel, ChannelType, channel_types_for_converting
|
from .channel import Channel, ChannelType, channel_types_for_converting
|
||||||
from ...utils import Snowflake
|
from ...utils import Snowflake, Timestamp
|
||||||
from ...utils import APIModelBase
|
from ...utils import APIModelBase
|
||||||
from ...utils.types import APINullable
|
from ...utils.types import APINullable
|
||||||
|
|
||||||
|
@ -243,6 +243,7 @@ class GuildFeatures(Enum):
|
||||||
VERIFIED = "VERIFIED"
|
VERIFIED = "VERIFIED"
|
||||||
VIP_REGIONS = "VIP_REGIONS"
|
VIP_REGIONS = "VIP_REGIONS"
|
||||||
WELCOME_SCREEN_ENABLED = "WELCOME_SCREEN_ENABLED"
|
WELCOME_SCREEN_ENABLED = "WELCOME_SCREEN_ENABLED"
|
||||||
|
EXPOSED_TO_ACTIVITIES_WTP_EXPERIMENT = "EXPOSED_TO_ACTIVITIES_WTP_EXPERIMENT"
|
||||||
|
|
||||||
|
|
||||||
@dataclass(repr=False)
|
@dataclass(repr=False)
|
||||||
|
@ -303,7 +304,7 @@ class Guild(APIModelBase):
|
||||||
System channel flags
|
System channel flags
|
||||||
rules_channel_id: APINullable[:class:`~melisa.utils.types.Snowflake`]
|
rules_channel_id: APINullable[:class:`~melisa.utils.types.Snowflake`]
|
||||||
The id of the channel where Community guilds can display rules and/or guidelines
|
The id of the channel where Community guilds can display rules and/or guidelines
|
||||||
joined_at: APINullable[:class:`int`]
|
joined_at: APINullable[:class:`~melisa.utils.Timestamp`]
|
||||||
When this guild was joined at
|
When this guild was joined at
|
||||||
large: APINullable[:class:`bool`]
|
large: APINullable[:class:`bool`]
|
||||||
True if this is considered a large guild
|
True if this is considered a large guild
|
||||||
|
@ -386,8 +387,8 @@ class Guild(APIModelBase):
|
||||||
default_message_notifications: APINullable[int] = None
|
default_message_notifications: APINullable[int] = None
|
||||||
explicit_content_filter: APINullable[int] = None
|
explicit_content_filter: APINullable[int] = None
|
||||||
features: APINullable[List[GuildFeatures]] = None
|
features: APINullable[List[GuildFeatures]] = None
|
||||||
roles: APINullable[List[Any]] = None
|
roles: APINullable[List] = None
|
||||||
emojis: APINullable[List[Any]] = None
|
emojis: APINullable[List] = None
|
||||||
# TODO: Make a structures of emoji and role
|
# TODO: Make a structures of emoji and role
|
||||||
|
|
||||||
mfa_level: APINullable[int] = None
|
mfa_level: APINullable[int] = None
|
||||||
|
@ -395,16 +396,16 @@ class Guild(APIModelBase):
|
||||||
system_channel_id: APINullable[Snowflake] = None
|
system_channel_id: APINullable[Snowflake] = None
|
||||||
system_channel_flags: APINullable[int] = None
|
system_channel_flags: APINullable[int] = None
|
||||||
rules_channel_id: APINullable[Snowflake] = None
|
rules_channel_id: APINullable[Snowflake] = None
|
||||||
joined_at: APINullable[int] = None
|
joined_at: APINullable[Timestamp] = None
|
||||||
# TODO: Deal with joined_at
|
# TODO: Deal with joined_at
|
||||||
|
|
||||||
large: APINullable[bool] = None
|
large: APINullable[bool] = None
|
||||||
unavailable: APINullable[bool] = None
|
unavailable: APINullable[bool] = None
|
||||||
member_count: APINullable[int] = None
|
member_count: APINullable[int] = None
|
||||||
voice_states: APINullable[List[Any]] = None
|
voice_states: APINullable[List] = None
|
||||||
members: APINullable[List[Any]] = None
|
members: APINullable[List] = None
|
||||||
threads: APINullable[List[Any]] = None
|
threads: APINullable[List] = None
|
||||||
presences: APINullable[List[Any]] = None
|
presences: APINullable[List] = None
|
||||||
# TODO: Make a structure for voice_states, members, channels, threads, presences(?)
|
# TODO: Make a structure for voice_states, members, channels, threads, presences(?)
|
||||||
|
|
||||||
max_presences: APINullable[int] = None
|
max_presences: APINullable[int] = None
|
||||||
|
@ -421,10 +422,10 @@ class Guild(APIModelBase):
|
||||||
approximate_presence_count: APINullable[int] = None
|
approximate_presence_count: APINullable[int] = None
|
||||||
nsfw_level: APINullable[int] = None
|
nsfw_level: APINullable[int] = None
|
||||||
premium_progress_bar_enabled: APINullable[bool] = None
|
premium_progress_bar_enabled: APINullable[bool] = None
|
||||||
stage_instances: APINullable[List[Any]] = None
|
stage_instances: APINullable[List] = None
|
||||||
stickers: APINullable[List[Any]] = None
|
stickers: APINullable[List] = None
|
||||||
welcome_screen: APINullable[Any] = None
|
welcome_screen: APINullable = None
|
||||||
guild_scheduled_events: APINullable[List[Any]] = None
|
guild_scheduled_events: APINullable[List] = None
|
||||||
|
|
||||||
# TODO: Make a structure for welcome_screen, stage_instances,
|
# TODO: Make a structure for welcome_screen, stage_instances,
|
||||||
# stickers and guild_scheduled_events
|
# stickers and guild_scheduled_events
|
||||||
|
|
|
@ -2,11 +2,9 @@
|
||||||
# Full MIT License can be found in `LICENSE.txt` at the project root.
|
# Full MIT License can be found in `LICENSE.txt` at the project root.
|
||||||
|
|
||||||
from .types import Coro
|
from .types import Coro
|
||||||
|
from .timestamp import Timestamp
|
||||||
from .snowflake import Snowflake
|
from .snowflake import Snowflake
|
||||||
|
|
||||||
|
|
||||||
from .api_model import APIModelBase
|
from .api_model import APIModelBase
|
||||||
from .conversion import remove_none
|
from .conversion import remove_none
|
||||||
|
|
||||||
__all__ = ("Coro", "Snowflake", "APIModelBase", "remove_none")
|
__all__ = ("Coro", "Snowflake", "APIModelBase", "remove_none", "Timestamp")
|
||||||
|
|
|
@ -1,20 +1,28 @@
|
||||||
# Copyright MelisaDev 2022 - Present
|
# Copyright MelisaDev 2022 - Present
|
||||||
# Full MIT License can be found in `LICENSE.txt` at the project root.
|
# Full MIT License can be found in `LICENSE.txt` at the project root.
|
||||||
|
|
||||||
|
# We found this file in the Pincer Python Module and modified it. Thank you Pincer Devs!
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import datetime
|
||||||
from dataclasses import _is_dataclass_instance, fields
|
from dataclasses import _is_dataclass_instance, fields
|
||||||
from enum import Enum
|
from enum import Enum, EnumMeta
|
||||||
from inspect import getfullargspec
|
from inspect import getfullargspec
|
||||||
|
from itertools import chain
|
||||||
from typing import (
|
from typing import (
|
||||||
Dict,
|
Dict,
|
||||||
Union,
|
Union,
|
||||||
Generic,
|
Generic,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
Any,
|
Any, get_origin, Tuple, get_args,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from typing_extensions import get_type_hints
|
||||||
|
|
||||||
|
from melisa.utils.types import APINullable, TypeCache
|
||||||
|
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
@ -67,6 +75,108 @@ class APIModelBase:
|
||||||
def set_client(cls, client):
|
def set_client(cls, client):
|
||||||
cls._client = client
|
cls._client = client
|
||||||
|
|
||||||
|
def __get_types(self, arg_type: type) -> Tuple[type]:
|
||||||
|
origin = get_origin(arg_type)
|
||||||
|
|
||||||
|
if origin is Union:
|
||||||
|
# noinspection PyTypeChecker
|
||||||
|
args: Tuple[type] = get_args(arg_type)
|
||||||
|
|
||||||
|
if 2 <= len(args) < 4:
|
||||||
|
return args
|
||||||
|
|
||||||
|
raise TypeError
|
||||||
|
|
||||||
|
return (arg_type, )
|
||||||
|
|
||||||
|
def __attr_convert(self, attr_value: Dict, attr_type: T) -> T:
|
||||||
|
factory = attr_type
|
||||||
|
|
||||||
|
# Always use `__factory__` over __init__
|
||||||
|
if getattr(attr_type, "__factory__", None):
|
||||||
|
factory = attr_type.__factory__
|
||||||
|
|
||||||
|
if attr_value is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if attr_type is not None and isinstance(attr_value, attr_type):
|
||||||
|
return attr_value
|
||||||
|
|
||||||
|
if isinstance(attr_value, dict):
|
||||||
|
return factory(attr_value)
|
||||||
|
|
||||||
|
if isinstance(attr_value, datetime.datetime):
|
||||||
|
return attr_value
|
||||||
|
|
||||||
|
return factory(attr_value)
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
TypeCache()
|
||||||
|
|
||||||
|
attributes = chain.from_iterable(
|
||||||
|
get_type_hints(cls, globalns=TypeCache.cache).items()
|
||||||
|
for cls in chain(self.__class__.__bases__, (self,))
|
||||||
|
)
|
||||||
|
|
||||||
|
for attr, attr_type in attributes:
|
||||||
|
# Ignore private attributes.
|
||||||
|
if attr.startswith("_"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
types = self.__get_types(attr_type)
|
||||||
|
|
||||||
|
types = tuple(
|
||||||
|
filter(
|
||||||
|
lambda tpe: tpe is not None and tpe is not None, types
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if not types:
|
||||||
|
raise TypeError
|
||||||
|
|
||||||
|
specific_tp = types[0]
|
||||||
|
|
||||||
|
attr_gotten = getattr(self, attr)
|
||||||
|
|
||||||
|
if tp := get_origin(specific_tp):
|
||||||
|
specific_tp = tp
|
||||||
|
|
||||||
|
if isinstance(specific_tp, EnumMeta) and not attr_gotten:
|
||||||
|
attr_value = None
|
||||||
|
elif tp == list and attr_gotten and (classes := get_args(types[0])):
|
||||||
|
attr_value = [
|
||||||
|
self.__attr_convert(attr_item, classes[0])
|
||||||
|
for attr_item in attr_gotten
|
||||||
|
]
|
||||||
|
elif tp == dict and attr_gotten and (classes := get_args(types[0])):
|
||||||
|
attr_value = {
|
||||||
|
key: self.__attr_convert(value, classes[1])
|
||||||
|
for key, value in attr_gotten.items()
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
attr_value = self.__attr_convert(attr_gotten, specific_tp)
|
||||||
|
|
||||||
|
setattr(self, attr, attr_value)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __factory__(cls: Generic[T], *args, **kwargs) -> T:
|
||||||
|
return cls.from_dict(*args, **kwargs)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
attrs = ", ".join(
|
||||||
|
f"{k}={v!r}"
|
||||||
|
for k, v in self.__dict__.items()
|
||||||
|
if v and not k.startswith("_")
|
||||||
|
)
|
||||||
|
|
||||||
|
return f"{type(self).__name__}({attrs})"
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
if _name := getattr(self, "__name__", None):
|
||||||
|
return f"{_name} {self.__class__.__name__.lower()}"
|
||||||
|
|
||||||
|
return super().__str__()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls: Generic[T], data: Dict[str, Union[str, bool, int, Any]]) -> T:
|
def from_dict(cls: Generic[T], data: Dict[str, Union[str, bool, int, Any]]) -> T:
|
||||||
"""
|
"""
|
||||||
|
@ -81,7 +191,9 @@ class APIModelBase:
|
||||||
map(
|
map(
|
||||||
lambda key: (
|
lambda key: (
|
||||||
key,
|
key,
|
||||||
data[key].value if isinstance(data[key], Enum) else data[key],
|
data[key].value
|
||||||
|
if isinstance(data[key], Enum)
|
||||||
|
else data[key],
|
||||||
),
|
),
|
||||||
filter(
|
filter(
|
||||||
lambda object_argument: data.get(object_argument) is not None,
|
lambda object_argument: data.get(object_argument) is not None,
|
||||||
|
|
98
melisa/utils/timestamp.py
Normal file
98
melisa/utils/timestamp.py
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
# Copyright MelisaDev 2022 - Present
|
||||||
|
# Full MIT License can be found in `LICENSE.txt` at the project root.
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional, TypeVar, Union
|
||||||
|
|
||||||
|
DISCORD_EPOCH = 1420070400
|
||||||
|
TS = TypeVar("TS", str, datetime, float, int)
|
||||||
|
|
||||||
|
|
||||||
|
class Timestamp:
|
||||||
|
"""Contains a lot of useful methods for working with unix timestamps.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
date: :class:`str`
|
||||||
|
The time of the timestamp.
|
||||||
|
time: :class:`str`
|
||||||
|
Alias for date.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
time: Union[:class:`str`, :class:`int`, :class:`float`, :class:`datetime.datetime`]
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, time: Optional[TS] = None):
|
||||||
|
self.__time = Timestamp.parse(time)
|
||||||
|
self.__epoch = int(time.timestamp() * 1000)
|
||||||
|
self.date, self.time = str(self).split()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __factory__(cls, time: Optional[TS] = None) -> datetime:
|
||||||
|
return cls.parse(time)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse(time: Optional[TS] = None) -> datetime:
|
||||||
|
"""Convert a time to datetime object.
|
||||||
|
|
||||||
|
time: Optional[Union[:class:`str`, :class:`int`, :class:`float`, :class:`datetime.datetime`]]
|
||||||
|
The time to be converted to a datetime object.
|
||||||
|
This can be one of these types: datetime, float, int, str
|
||||||
|
If no parameter is passed it will return the current
|
||||||
|
datetime.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
:class:`datetime.datetime`:
|
||||||
|
The converted datetime object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if isinstance(time, datetime):
|
||||||
|
return time
|
||||||
|
|
||||||
|
elif isinstance(time, str):
|
||||||
|
return datetime.fromisoformat(time)
|
||||||
|
|
||||||
|
elif isinstance(time, (int, float)):
|
||||||
|
dt = datetime.utcfromtimestamp(t := int(time))
|
||||||
|
|
||||||
|
if dt.year < 2015:
|
||||||
|
t += DISCORD_EPOCH
|
||||||
|
return datetime.utcfromtimestamp(t)
|
||||||
|
|
||||||
|
return datetime.now()
|
||||||
|
|
||||||
|
def __getattr__(self, key: str) -> int:
|
||||||
|
return getattr(self.__time, key)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
if len(string := str(self.__time)) == 19:
|
||||||
|
return string + ".000"
|
||||||
|
return string[:-3]
|
||||||
|
|
||||||
|
def __int__(self) -> int:
|
||||||
|
return self.__epoch
|
||||||
|
|
||||||
|
def __float__(self) -> float:
|
||||||
|
return self.__epoch / 1000
|
||||||
|
|
||||||
|
def __ge__(self, other: Timestamp) -> bool:
|
||||||
|
return self.__epoch >= other.__epoch
|
||||||
|
|
||||||
|
def __gt__(self, other: Timestamp) -> bool:
|
||||||
|
return self.__epoch > other.__epoch
|
||||||
|
|
||||||
|
def __le__(self, other: Timestamp) -> bool:
|
||||||
|
return self.__epoch <= other.__epoch
|
||||||
|
|
||||||
|
def __lt__(self, other: Timestamp) -> bool:
|
||||||
|
return self.__epoch < other.__epoch
|
||||||
|
|
||||||
|
def __eq__(self, other: Timestamp) -> bool:
|
||||||
|
return self.__epoch == other.__epoch
|
||||||
|
|
||||||
|
def __ne__(self, other: Timestamp) -> bool:
|
||||||
|
return self.__epoch != other.__epoch
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from sys import modules
|
||||||
from typing import TypeVar, Callable, Coroutine, Any, Union
|
from typing import TypeVar, Callable, Coroutine, Any, Union
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,3 +12,30 @@ T = TypeVar("T")
|
||||||
Coro = TypeVar("Coro", bound=Callable[..., Coroutine[Any, Any, Any]])
|
Coro = TypeVar("Coro", bound=Callable[..., Coroutine[Any, Any, Any]])
|
||||||
|
|
||||||
APINullable = Union[T, None]
|
APINullable = Union[T, None]
|
||||||
|
|
||||||
|
|
||||||
|
class Singleton(type):
|
||||||
|
# Thanks to this stackoverflow answer (method 3):
|
||||||
|
# https://stackoverflow.com/q/6760685/12668716
|
||||||
|
_instances = {}
|
||||||
|
|
||||||
|
def __call__(cls, *args, **kwargs):
|
||||||
|
if cls not in cls._instances:
|
||||||
|
cls._instances[cls] = super(
|
||||||
|
Singleton,
|
||||||
|
cls
|
||||||
|
).__call__(*args, **kwargs)
|
||||||
|
return cls._instances[cls]
|
||||||
|
|
||||||
|
|
||||||
|
class TypeCache(metaclass=Singleton):
|
||||||
|
# Thanks Pincer Devs. This class is from the Pincer Library.
|
||||||
|
cache = {}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
lcp = modules.copy()
|
||||||
|
for module in lcp:
|
||||||
|
if not module.startswith("melisa"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
TypeCache.cache.update(lcp[module].__dict__)
|
||||||
|
|
Loading…
Reference in a new issue