formated files with the black formatter

This commit is contained in:
grey-cat-1908 2022-03-25 20:18:42 +03:00
parent 3d66eb79c5
commit 2b2f5a2d85
12 changed files with 152 additions and 199 deletions

View file

@ -87,8 +87,10 @@ class Client:
""" """
inited_shard = Shard(self, 0, 1) inited_shard = Shard(self, 0, 1)
asyncio.ensure_future(inited_shard.launch(activity=self._activity, asyncio.ensure_future(
status=self._status), loop=self._loop) inited_shard.launch(activity=self._activity, status=self._status),
loop=self._loop,
)
self._loop.run_forever() self._loop.run_forever()
def run_shards(self, num_shards: int, *, shard_ids: List[int] = None): def run_shards(self, num_shards: int, *, shard_ids: List[int] = None):
@ -108,8 +110,10 @@ class Client:
for shard_id in shard_ids: for shard_id in shard_ids:
inited_shard = Shard(self, shard_id, num_shards) inited_shard = Shard(self, shard_id, num_shards)
asyncio.ensure_future(inited_shard.launch(activity=self._activity, asyncio.ensure_future(
status=self._status), loop=self._loop) inited_shard.launch(activity=self._activity, status=self._status),
loop=self._loop,
)
self._loop.run_forever() self._loop.run_forever()
def run_autosharded(self): def run_autosharded(self):
@ -122,8 +126,10 @@ class Client:
for shard_id in shard_ids: for shard_id in shard_ids:
inited_shard = Shard(self, shard_id, num_shards) inited_shard = Shard(self, shard_id, num_shards)
asyncio.ensure_future(inited_shard.launch(activity=self._activity, asyncio.ensure_future(
status=self._status), loop=self._loop) inited_shard.launch(activity=self._activity, status=self._status),
loop=self._loop,
)
self._loop.run_forever() self._loop.run_forever()
async def fetch_user(self, user_id: Union[Snowflake, str, int]): async def fetch_user(self, user_id: Union[Snowflake, str, int]):

View file

@ -20,6 +20,7 @@ from ..utils import APIModelBase, json
@dataclass @dataclass
class GatewayBotInfo(APIModelBase): class GatewayBotInfo(APIModelBase):
"""Gateway info from the `gateway/bot` endpoint""" """Gateway info from the `gateway/bot` endpoint"""
url: str url: str
shards: int shards: int
session_start_limit: dict session_start_limit: dict
@ -39,11 +40,7 @@ class Gateway:
HELLO = 10 HELLO = 10
HEARTBEAT_ACK = 11 HEARTBEAT_ACK = 11
def __init__(self, def __init__(self, client, shard_id: int = 0, num_shards: int = 1, **kwargs):
client,
shard_id: int = 0,
num_shards: int = 1,
**kwargs):
self.GATEWAY_VERSION = "9" self.GATEWAY_VERSION = "9"
self.interval = None self.interval = None
@ -52,7 +49,7 @@ class Gateway:
self.__session = aiohttp.ClientSession() self.__session = aiohttp.ClientSession()
self.session_id = None self.session_id = None
self.client = client self.client = client
self.latency = float('inf') self.latency = float("inf")
self.ws = None self.ws = None
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.shard_id = shard_id self.shard_id = shard_id
@ -63,7 +60,7 @@ class Gateway:
4011: GatewayError("Sharding required"), 4011: GatewayError("Sharding required"),
4012: GatewayError("Invalid API version"), 4012: GatewayError("Invalid API version"),
4013: GatewayError("Invalid intents"), 4013: GatewayError("Invalid intents"),
4014: PrivilegedIntentsRequired("Disallowed intents") 4014: PrivilegedIntentsRequired("Disallowed intents"),
} }
self.listeners = listeners self.listeners = listeners
@ -76,19 +73,21 @@ class Gateway:
"properties": { "properties": {
"$os": sys.platform, "$os": sys.platform,
"$browser": "Melisa Python Library", "$browser": "Melisa Python Library",
"$device": "Melisa Python Library" "$device": "Melisa Python Library",
}, },
"compress": True, "compress": True,
"shard": [shard_id, num_shards], "shard": [shard_id, num_shards],
"presence": self.generate_presence(kwargs.get("start_activity"), "presence": self.generate_presence(
kwargs.get("start_status"))} kwargs.get("start_activity"), kwargs.get("start_status")
),
}
self._zlib: zlib._Decompress = zlib.decompressobj() self._zlib: zlib._Decompress = zlib.decompressobj()
self._buffer: bytearray = bytearray() self._buffer: bytearray = bytearray()
async def connect(self) -> None: async def connect(self) -> None:
self.ws = await self.__session.ws_connect( self.ws = await self.__session.ws_connect(
f'wss://gateway.discord.gg/?v={self.GATEWAY_VERSION}&encoding=json&compress=zlib-stream' f"wss://gateway.discord.gg/?v={self.GATEWAY_VERSION}&encoding=json&compress=zlib-stream"
) )
if self.session_id is None: if self.session_id is None:
@ -114,10 +113,10 @@ class Gateway:
if type(msg) is bytes: if type(msg) is bytes:
self._buffer.extend(msg) self._buffer.extend(msg)
if len(msg) < 4 or msg[-4:] != b'\x00\x00\xff\xff': if len(msg) < 4 or msg[-4:] != b"\x00\x00\xff\xff":
return None return None
msg = self._zlib.decompress(self._buffer) msg = self._zlib.decompress(self._buffer)
msg = msg.decode('utf-8') msg = msg.decode("utf-8")
self._buffer = bytearray() self._buffer = bytearray()
return json.loads(msg) return json.loads(msg)
@ -125,7 +124,7 @@ class Gateway:
return None return None
async def handle_data(self, data): async def handle_data(self, data):
if data['op'] == self.DISPATCH: if data["op"] == self.DISPATCH:
self.sequence = int(data["s"]) self.sequence = int(data["s"])
event_type = data["t"].lower() event_type = data["t"].lower()
@ -134,12 +133,12 @@ class Gateway:
if event_to_call is not None: if event_to_call is not None:
ensure_future(event_to_call(self.client, self, data["d"])) ensure_future(event_to_call(self.client, self, data["d"]))
elif data['op'] == self.INVALID_SESSION: elif data["op"] == self.INVALID_SESSION:
await self.ws.close(code=4000) await self.ws.close(code=4000)
await self.handle_close(4000) await self.handle_close(4000)
elif data['op'] == self.HELLO: elif data["op"] == self.HELLO:
await self.send_hello(data) await self.send_hello(data)
elif data['op'] == self.HEARTBEAT_ACK: elif data["op"] == self.HEARTBEAT_ACK:
self.latency = time.perf_counter() - self._last_send self.latency = time.perf_counter() - self._last_send
async def receive(self) -> None: async def receive(self) -> None:
@ -184,34 +183,28 @@ class Gateway:
self._buffer.clear() self._buffer.clear()
async def send_hello(self, data: Dict) -> None: async def send_hello(self, data: Dict) -> None:
interval = data['d']['heartbeat_interval'] / 1000 interval = data["d"]["heartbeat_interval"] / 1000
await asyncio.sleep((interval - 2000) / 1000) await asyncio.sleep((interval - 2000) / 1000)
self.loop.create_task(self.send_heartbeat(interval)) self.loop.create_task(self.send_heartbeat(interval))
async def send_identify(self) -> None: async def send_identify(self) -> None:
await self.send(self.opcode( await self.send(self.opcode(self.IDENTIFY, self.auth))
self.IDENTIFY,
self.auth
))
async def resume(self) -> None: async def resume(self) -> None:
await self.send( await self.send(
self.opcode( self.opcode(
self.RESUME, self.RESUME,
{ {
'token': self.client._token, "token": self.client._token,
'session_id': self.session_id, "session_id": self.session_id,
'seq': self.sequence, "seq": self.sequence,
} },
) )
) )
@staticmethod @staticmethod
def generate_presence(activity: BotActivity = None, status: str = None): def generate_presence(activity: BotActivity = None, status: str = None):
data = { data = {"since": time.time() * 1000, "afk": False}
"since": time.time() * 1000,
"afk": False
}
if activity is not None: if activity is not None:
data["activities"] = activity.to_dict() data["activities"] = activity.to_dict()
@ -226,8 +219,5 @@ class Gateway:
@staticmethod @staticmethod
def opcode(opcode: int, payload) -> str: def opcode(opcode: int, payload) -> str:
data = { data = {"op": opcode, "d": payload}
"op": opcode,
"d": payload
}
return json.dumps(data) return json.dumps(data)

View file

@ -8,7 +8,8 @@ from typing import Dict, Optional
from aiohttp import ClientSession, ClientResponse from aiohttp import ClientSession, ClientResponse
from melisa.exceptions import (NotModifiedError, from melisa.exceptions import (
NotModifiedError,
BadRequestError, BadRequestError,
ForbiddenError, ForbiddenError,
UnauthorizedError, UnauthorizedError,
@ -16,7 +17,8 @@ from melisa.exceptions import (NotModifiedError,
NotFoundError, NotFoundError,
MethodNotAllowedError, MethodNotAllowedError,
ServerError, ServerError,
RateLimitError) RateLimitError,
)
from .ratelimiter import RateLimiter from .ratelimiter import RateLimiter
@ -31,7 +33,7 @@ class HTTPClient:
headers: Dict[str, str] = { headers: Dict[str, str] = {
"Content-Type": "application/json", "Content-Type": "application/json",
"Authorization": f"Bot {token}", "Authorization": f"Bot {token}",
"User-Agent": "Melisa Python Library" "User-Agent": "Melisa Python Library",
} }
self.__http_exceptions: Dict[int, HTTPException] = { self.__http_exceptions: Dict[int, HTTPException] = {
@ -41,7 +43,7 @@ class HTTPClient:
403: ForbiddenError(), 403: ForbiddenError(),
404: NotFoundError(), 404: NotFoundError(),
405: MethodNotAllowedError(), 405: MethodNotAllowedError(),
429: RateLimitError() 429: RateLimitError(),
} }
self.__aiohttp_session: ClientSession = ClientSession(headers=headers) self.__aiohttp_session: ClientSession = ClientSession(headers=headers)
@ -57,12 +59,7 @@ class HTTPClient:
await self.__aiohttp_session.close() await self.__aiohttp_session.close()
async def __send( async def __send(
self, self, method: str, endpoint: str, *, _ttl: int = None, **kwargs
method: str,
endpoint: str,
*,
_ttl: int = None,
**kwargs
) -> Optional[Dict]: ) -> Optional[Dict]:
"""Send an API request to the Discord API.""" """Send an API request to the Discord API."""
@ -71,15 +68,14 @@ class HTTPClient:
if ttl == 0: if ttl == 0:
raise ServerError(f"Maximum amount of retries for `{endpoint}`.") raise ServerError(f"Maximum amount of retries for `{endpoint}`.")
await self.__rate_limiter.wait_until_not_ratelimited( await self.__rate_limiter.wait_until_not_ratelimited(endpoint, method)
endpoint,
method
)
url = f"{self.url}/{endpoint}" url = f"{self.url}/{endpoint}"
async with self.__aiohttp_session.request(method, url, **kwargs) as response: async with self.__aiohttp_session.request(method, url, **kwargs) as response:
return await self.__handle_response(response, method, endpoint, _ttl=ttl, **kwargs) return await self.__handle_response(
response, method, endpoint, _ttl=ttl, **kwargs
)
async def __handle_response( async def __handle_response(
self, self,
@ -88,13 +84,11 @@ class HTTPClient:
endpoint: str, endpoint: str,
*, *,
_ttl: int = None, _ttl: int = None,
**kwargs **kwargs,
) -> Optional[Dict]: ) -> Optional[Dict]:
"""Handle responses from the Discord API.""" """Handle responses from the Discord API."""
self.__rate_limiter.save_response_bucket( self.__rate_limiter.save_response_bucket(endpoint, method, res.headers)
endpoint, method, res.headers
)
if res.ok: if res.ok:
return await res.json() return await res.json()
@ -106,11 +100,7 @@ class HTTPClient:
timeout = (await res.json()).get("retry_after", 40) timeout = (await res.json()).get("retry_after", 40)
await asyncio.sleep(timeout) await asyncio.sleep(timeout)
return await self.__send( return await self.__send(method, endpoint, **kwargs)
method,
endpoint,
**kwargs
)
exception.__init__(res.reason) exception.__init__(res.reason)
raise exception raise exception
@ -119,18 +109,9 @@ class HTTPClient:
await asyncio.sleep(retry_in) await asyncio.sleep(retry_in)
return await self.__send( return await self.__send(method, endpoint, _ttl=_ttl - 1, **kwargs)
method,
endpoint,
_ttl=_ttl - 1,
**kwargs
)
async def get( async def get(self, route: str, params: Optional[Dict] = None) -> Optional[Dict]:
self,
route: str,
params: Optional[Dict] = None
) -> Optional[Dict]:
"""|coro| """|coro|
Sends a GET request to a Discord REST API endpoint. Sends a GET request to a Discord REST API endpoint.
@ -146,17 +127,9 @@ class HTTPClient:
Optional[:class:`Dict`] Optional[:class:`Dict`]
The response from Discord. The response from Discord.
""" """
return await self.__send( return await self.__send("GET", route, params=params)
"GET",
route,
params=params
)
async def post( async def post(self, route: str, data: Optional[Dict] = None) -> Optional[Dict]:
self,
route: str,
data: Optional[Dict] = None
) -> Optional[Dict]:
"""|coro| """|coro|
Sends a POST request to a Discord REST API endpoint. Sends a POST request to a Discord REST API endpoint.
@ -178,11 +151,7 @@ class HTTPClient:
json=data, json=data,
) )
async def delete( async def delete(self, route: str, headers: dict = None) -> Optional[Dict]:
self,
route: str,
headers: dict = None
) -> Optional[Dict]:
"""|coro| """|coro|
Sends a DELETE request to a Discord REST API endpoint. Sends a DELETE request to a Discord REST API endpoint.
@ -198,8 +167,4 @@ class HTTPClient:
Optional[:class:`Dict`] Optional[:class:`Dict`]
JSON response from the Discord API. JSON response from the Discord API.
""" """
return await self.__send( return await self.__send("DELETE", route, headers=headers)
"DELETE",
route,
headers=headers
)

View file

@ -12,6 +12,7 @@ from typing import Dict, Tuple, Any
@dataclass @dataclass
class RateLimitBucket: class RateLimitBucket:
"""Represents a rate limit bucket""" """Represents a rate limit bucket"""
limit: int limit: int
remaining: int remaining: int
reset: float reset: float
@ -23,15 +24,12 @@ class RateLimiter:
"""Prevents ``user`` rate limits""" """Prevents ``user`` rate limits"""
def __init__(self) -> None: def __init__(self) -> None:
self.bucket_map: Dict[Tuple[str, str], str] = {} # Dict[Tuple[endpoint, method], bucket_id] self.bucket_map: Dict[
Tuple[str, str], str
] = {} # Dict[Tuple[endpoint, method], bucket_id]
self.buckets: Dict[str, RateLimitBucket] = {} self.buckets: Dict[str, RateLimitBucket] = {}
def save_response_bucket( def save_response_bucket(self, endpoint: str, method: str, header: Any):
self,
endpoint: str,
method: str,
header: Any
):
ratelimit_bucket_id = header.get("X-RateLimit-Bucket") ratelimit_bucket_id = header.get("X-RateLimit-Bucket")
if not ratelimit_bucket_id: if not ratelimit_bucket_id:
@ -44,14 +42,10 @@ class RateLimiter:
remaining=int(header["X-RateLimit-Remaining"]), remaining=int(header["X-RateLimit-Remaining"]),
reset=float(header["X-RateLimit-Reset"]), reset=float(header["X-RateLimit-Reset"]),
reset_after_timestamp=float(header["X-RateLimit-Reset-After"]), reset_after_timestamp=float(header["X-RateLimit-Reset-After"]),
since_timestamp=time() since_timestamp=time(),
) )
async def wait_until_not_ratelimited( async def wait_until_not_ratelimited(self, endpoint: str, method: str):
self,
endpoint: str,
method: str
):
bucket_id = self.bucket_map.get((endpoint, method)) bucket_id = self.bucket_map.get((endpoint, method))
if not bucket_id: if not bucket_id:

View file

@ -1,25 +1,29 @@
# Copyright MelisaDev 2022 - Present # Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root. # Full MIT License can be found in `LICENSE.txt` at the project root.
class MelisaException(Exception): class MelisaException(Exception):
"""Base exception""" """Base exception"""
pass pass
class ClientException(MelisaException): class ClientException(MelisaException):
"""Handling user errors""" """Handling user errors"""
pass pass
class LoginFailure(ClientException): class LoginFailure(ClientException):
"""Fails to log you in from improper credentials or some other misc.""" """Fails to log you in from improper credentials or some other misc."""
pass pass
class ConnectionClosed(ClientException): class ConnectionClosed(ClientException):
"""Exception that's thrown when the gateway connection is closed """Exception that's thrown when the gateway connection is closed
for reasons that could not be handled for reasons that could not be handled
internally. """ internally."""
def __init__(self, socket, *, shard_id, code=None): def __init__(self, socket, *, shard_id, code=None):
message = "Websocket with shard ID {} closed with code {}" message = "Websocket with shard ID {} closed with code {}"
@ -38,9 +42,11 @@ class PrivilegedIntentsRequired(ClientException):
def __init__(self, shard_id): def __init__(self, shard_id):
self.shard_id = shard_id self.shard_id = shard_id
message = "Shard ID {} is requesting privileged intents " \ message = (
"that have not been explicitly enabled in the " \ "Shard ID {} is requesting privileged intents "
"that have not been explicitly enabled in the "
"developer portal. Please visit to https://discord.com/developers/applications/ " "developer portal. Please visit to https://discord.com/developers/applications/ "
)
super().__init__(message.format(self.shard_id)) super().__init__(message.format(self.shard_id))

View file

@ -10,10 +10,7 @@ from ..user import BotActivity
class Shard: class Shard:
def __init__(self, def __init__(self, client, shard_id: int, num_shards: int):
client,
shard_id: int,
num_shards: int):
self._client = client self._client = client
self._shard_id: int = shard_id self._shard_id: int = shard_id
@ -38,11 +35,13 @@ class Shard:
"""|coro| """|coro|
Launches new shard""" Launches new shard"""
self._gateway = Gateway(self._client, self._gateway = Gateway(
self._client,
self._shard_id, self._shard_id,
self._num_shards, self._num_shards,
start_activity=kwargs.get("activity"), start_activity=kwargs.get("activity"),
start_status=kwargs.get("status")) start_status=kwargs.get("status"),
)
self._client.shards[self._shard_id] = self self._client.shards[self._shard_id] = self
@ -58,7 +57,9 @@ class Shard:
""" """
create_task(self._gateway.close()) create_task(self._gateway.close())
async def update_presence(self, activity: BotActivity = None, status: str = None) -> Shard: async def update_presence(
self, activity: BotActivity = None, status: str = None
) -> Shard:
""" """
|coro| |coro|

View file

@ -39,6 +39,7 @@ class ActivityType(IntEnum):
COMPETING: COMPETING:
Competing in {name} (Competing in Arena World Champions) Competing in {name} (Competing in Arena World Champions)
""" """
GAME = 0 GAME = 0
STREAMING = 1 STREAMING = 1
LISTENING = 2 LISTENING = 2
@ -61,6 +62,7 @@ class ActivityTimestamp(BasePresence, APIModelBase):
end: Optional[:class:`int`] end: Optional[:class:`int`]
Unix time (in milliseconds) of when the activity ends Unix time (in milliseconds) of when the activity ends
""" """
start: APINullable[int] = None start: APINullable[int] = None
end: APINullable[int] = None end: APINullable[int] = None
@ -78,6 +80,7 @@ class ActivityEmoji(BasePresence, APIModelBase):
animated: Optional[:class:`bool`] animated: Optional[:class:`bool`]
Whether this emoji is animated Whether this emoji is animated
""" """
name: str name: str
id: APINullable[Snowflake] = None id: APINullable[Snowflake] = None
animated: APINullable[bool] = None animated: APINullable[bool] = None
@ -94,6 +97,7 @@ class ActivityParty(BasePresence, APIModelBase):
size: Optional[Tuple[:class:`int`, :class:`int`]] size: Optional[Tuple[:class:`int`, :class:`int`]]
Array of two integers (current_size, max_size) Array of two integers (current_size, max_size)
""" """
id: APINullable[str] = None id: APINullable[str] = None
size: APINullable[Tuple[int, int]] = None size: APINullable[Tuple[int, int]] = None
@ -115,6 +119,7 @@ class ActivityAssets(BasePresence, APIModelBase):
small_text: Optional[:class:`str`] small_text: Optional[:class:`str`]
text displayed when hovering over the small image of the activity text displayed when hovering over the small image of the activity
""" """
large_image: APINullable[str] = None large_image: APINullable[str] = None
large_text: APINullable[str] = None large_text: APINullable[str] = None
small_image: APINullable[str] = None small_image: APINullable[str] = None
@ -134,6 +139,7 @@ class ActivitySecrets(BasePresence, APIModelBase):
match: Optional[:class:`str`] match: Optional[:class:`str`]
The secret for a specific instanced match The secret for a specific instanced match
""" """
join: APINullable[str] = None join: APINullable[str] = None
spectate: APINullable[str] = None spectate: APINullable[str] = None
match_: APINullable[str] = None match_: APINullable[str] = None
@ -172,6 +178,7 @@ class ActivityButton(BasePresence, APIModelBase):
url: :class:`str` url: :class:`str`
The url opened when clicking the button (1-512 characters) The url opened when clicking the button (1-512 characters)
""" """
label: str label: str
url: str url: str
@ -251,11 +258,11 @@ class BotActivity(BasePresence, APIModelBase):
class StatusType(Enum): class StatusType(Enum):
ONLINE = 'online' ONLINE = "online"
OFFLINE = 'offline' OFFLINE = "offline"
IDLE = 'idle' IDLE = "idle"
DND = 'dnd' DND = "dnd"
INVISIBLE = 'invisible' INVISIBLE = "invisible"
def __str__(self): def __str__(self):
return self.value return self.value

View file

@ -169,20 +169,12 @@ class User(APIModelBase):
"""APINullable[:class:`~melisa.models.user.user.PremiumTypes`]: The """APINullable[:class:`~melisa.models.user.user.PremiumTypes`]: The
user their premium type in a usable enum. user their premium type in a usable enum.
""" """
return ( return None if self.premium_type is None else PremiumTypes(self.premium_type)
None
if self.premium_type is None
else PremiumTypes(self.premium_type)
)
@property @property
def flags(self) -> Optional[UserFlags]: def flags(self) -> Optional[UserFlags]:
"""Flags of user""" """Flags of user"""
return( return None if self.flags is None else UserFlags(self.flags)
None
if self.flags is None
else UserFlags(self.flags)
)
def __str__(self): def __str__(self):
"""String representation of the User object""" """String representation of the User object"""
@ -195,10 +187,13 @@ class User(APIModelBase):
def avatar_url(self) -> str: def avatar_url(self) -> str:
"""Avatar url (from the Discord CDN server)""" """Avatar url (from the Discord CDN server)"""
return "https://cdn.discordapp.com/avatars/{}/{}.png?size=1024".format(self.id, self.avatar) return "https://cdn.discordapp.com/avatars/{}/{}.png?size=1024".format(
self.id, self.avatar
)
async def create_dm_channel(self): async def create_dm_channel(self):
# ToDo: Add docstrings # ToDo: Add docstrings
# ToDo: Add checking this channel in cache # ToDo: Add checking this channel in cache
return await self._http.post( return await self._http.post(
"/users/@me/channels", data={"recipient_id": self.id}) "/users/@me/channels", data={"recipient_id": self.id}
)

View file

@ -1,17 +1,11 @@
# Copyright MelisaDev 2022 - Present # Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root. # Full MIT License can be found in `LICENSE.txt` at the project root.
from .types import ( from .types import Coro
Coro
)
from .snowflake import Snowflake from .snowflake import Snowflake
from .api_model import APIModelBase from .api_model import APIModelBase
__all__ = ( __all__ = ("Coro", "Snowflake", "APIModelBase")
"Coro",
"Snowflake",
"APIModelBase"
)

View file

@ -66,9 +66,7 @@ class APIModelBase:
cls._client = client cls._client = client
@classmethod @classmethod
def from_dict( def from_dict(cls: Generic[T], data: Dict[str, Union[str, bool, int, Any]]) -> T:
cls: Generic[T], data: Dict[str, Union[str, bool, int, Any]]
) -> T:
""" """
Parse an API object from a dictionary. Parse an API object from a dictionary.
""" """
@ -81,13 +79,10 @@ class APIModelBase:
map( map(
lambda key: ( lambda key: (
key, key,
data[key].value data[key].value if isinstance(data[key], Enum) else data[key],
if isinstance(data[key], Enum)
else data[key],
), ),
filter( filter(
lambda object_argument: data.get(object_argument) lambda object_argument: data.get(object_argument) is not None,
is not None,
getfullargspec(cls.__init__).args, getfullargspec(cls.__init__).args,
), ),
) )

View file

@ -15,12 +15,14 @@ else:
HAS_ORJSON = True HAS_ORJSON = True
if HAS_ORJSON: if HAS_ORJSON:
def dumps(obj: Any) -> str: def dumps(obj: Any) -> str:
return orjson.dumps(obj).decode('utf-8') return orjson.dumps(obj).decode("utf-8")
loads = orjson.loads loads = orjson.loads
else: else:
def dumps(obj: Any) -> str: def dumps(obj: Any) -> str:
return json.dumps(obj, separators=(',', ':'), ensure_ascii=True) return json.dumps(obj, separators=(",", ":"), ensure_ascii=True)
loads = json.loads loads = json.loads

View file

@ -24,9 +24,7 @@ class Snowflake(int):
super().__init__() super().__init__()
if self < self._MIN_VALUE: if self < self._MIN_VALUE:
raise ValueError( raise ValueError("snowflake value should be greater than or equal to 0.")
"snowflake value should be greater than or equal to 0."
)
if self > self._MAX_VALUE: if self > self._MAX_VALUE:
raise ValueError( raise ValueError(
@ -66,7 +64,7 @@ class Snowflake(int):
@property @property
def increment(self) -> int: def increment(self) -> int:
""" For every ID that is generated on that process, this number is incremented""" """For every ID that is generated on that process, this number is incremented"""
return self % 2048 return self % 2048
@property @property