initial commit

This commit is contained in:
grey-cat-1908 2022-03-11 20:36:01 +03:00
commit d3472d6c82
20 changed files with 780 additions and 0 deletions

32
.gitignore vendored Normal file
View file

@ -0,0 +1,32 @@
# IDE Configuration files
.idea
.vscode
.vs
# Python Environment
*.py[dcw]
__pycache__
venv
virtualenv
pyvenv
Pipfile.lock
# Docs
docs/_build
# Dev Tools
tools/*
# Dev testing
dev
_testing.py
# Packaging
*.egg-info
bin
build
dist
MANIFEST
# IDE Ext's
.history/

14
README.md Normal file
View file

@ -0,0 +1,14 @@
<p align="center">
<b>
The easiest way to create your own Discord Bot.
</b>
</p>
<hr>
## WARNING
**THIS LIBRARY IS CURRENTLY UNDER DEVELOPMENT!**
## About
<strong>MelisaPy</strong> is a Python library for the [Discord API](https://discord.com/developers/docs/intro).
We are trying to make our library optimized. We are going to create really big Cache configuration, so don't worry about the RAM :)

0
melisa/__init__.py Normal file
View file

42
melisa/client.py Normal file
View file

@ -0,0 +1,42 @@
from .models.app import Shard
from .utils.types import Coro
import asyncio
from typing import Dict
class Client:
def __init__(self, token, intents, **kwargs):
self.shards: Dict[int, Shard] = {}
self._events = {}
self.guilds = []
self.loop = asyncio.get_event_loop()
self.intents = intents
self._token = token
self._activity = kwargs.get("activity")
self._status = kwargs.get("status")
def listen(self, callback: Coro):
"""Method to set the listener.
Args:
callback (:obj:`function`)
Coroutine Callback Function
"""
if not asyncio.iscoroutinefunction(callback):
raise TypeError(f"<{callback.__qualname__}> must be a coroutine function")
self._events[callback.__qualname__] = callback
return self
def run(self) -> None:
"""
Run Bot without shards (only 0 shard)
"""
inited_shard = Shard(self, 0, 1)
asyncio.ensure_future(inited_shard.launch(activity=self._activity, status=self._status), loop=self.loop)
self.loop.run_forever()

1
melisa/core/__init__.py Normal file
View file

@ -0,0 +1 @@
from .gateway import Gateway

177
melisa/core/gateway.py Normal file
View file

@ -0,0 +1,177 @@
import json
import asyncio
import zlib
import time
import websockets
from ..listeners import listeners
from ..models.user import BotActivity
class Gateway:
DISPATCH = 0
HEARTBEAT = 1
IDENTIFY = 2
PRESENCE_UPDATE = 3
VOICE_UPDATE = 4
RESUME = 6
RECONNECT = 7
REQUEST_MEMBERS = 8
INVALID_SESSION = 9
HELLO = 10
HEARTBEAT_ACK = 11
def __init__(self,
client,
shard_id: int = 0,
num_shards: int = 1,
**kwargs):
self.GATEWAY_VERSION = "9"
self.interval = None
self.intents = client.intents
self.sequence = None
self.session_id = None
self.client = client
self.shard_id = shard_id
self.latency = float('inf')
self.listeners = listeners
self._last_send = 0
self.auth = {
"token": self.client._token,
"intents": self.intents,
"properties": {
"$os": "windows",
"$browser": "melisa",
"$device": "melisa"
},
"compress": True,
"shard": [shard_id, num_shards],
"presence": self.generate_presence(kwargs.get("start_activity"), kwargs.get("start_status"))
}
self._zlib: zlib._Decompress = zlib.decompressobj()
self._buffer: bytearray = bytearray()
async def websocket_message(self, msg):
if type(msg) is bytes:
self._buffer.extend(msg)
if len(msg) < 4 or msg[-4:] != b'\x00\x00\xff\xff':
return None
msg = self._zlib.decompress(self._buffer)
msg = msg.decode('utf-8')
self._buffer = bytearray()
return json.loads(msg)
async def start_loop(self):
async with websockets.connect(
f'wss://gateway.discord.gg/?v={self.GATEWAY_VERSION}&encoding=json&compress=zlib-stream') \
as self.websocket:
await self.hello()
if self.interval is None:
return
await asyncio.gather(self.heartbeat(), self.receive())
async def close(self, code: int = 4000):
await self.websocket.close(code=code)
async def resume(self):
resume_data = {
"seq": self.sequence,
"session_id": self.session_id,
"token": self.client._token
}
await self.send(self.RESUME, resume_data)
async def receive(self):
async for msg in self.websocket:
msg = await self.websocket_message(msg)
if msg is None:
return None
if msg["op"] == self.HEARTBEAT_ACK:
self.latency = time.time() - self._last_send
if msg["op"] == self.DISPATCH:
self.sequence = int(msg["s"])
event_type = msg["t"].lower()
event_to_call = self.listeners.get(event_type)
if event_to_call is not None:
await event_to_call(self.client, self, msg["d"])
if msg["op"] != self.DISPATCH:
if msg["op"] == self.RECONNECT:
await self.websocket.close()
await self.resume()
async def send(self, opcode, payload):
data = self.opcode(opcode, payload)
if opcode == self.HEARTBEAT:
self._last_send = time.time()
await self.websocket.send(data)
async def heartbeat(self):
while self.interval is not None:
await self.send(self.HEARTBEAT, self.sequence)
await asyncio.sleep(self.interval)
async def hello(self):
await self.send(self.IDENTIFY, self.auth)
ret = await self.websocket.recv()
data = await self.websocket_message(ret)
opcode = data["op"]
if opcode != 10:
return
self.interval = (data["d"]["heartbeat_interval"] - 2000) / 1000
@staticmethod
def generate_presence(activity: BotActivity = None, status: str = None):
data = {
"since": time.time() * 1000,
"afk": False
}
if activity is not None:
activity_to_set = {
"name": activity.name,
"type": int(activity.type)
}
if int(activity.type) == 1 and activity.url:
activity_to_set["url"] = activity.url
data["activities"] = [activity_to_set]
if status is not None:
data["status"] = str(status)
return data
async def update_presence(self, data: dict):
await self.send(self.PRESENCE_UPDATE, data)
@staticmethod
def opcode(opcode: int, payload) -> str:
data = {
"op": opcode,
"d": payload
}
return json.dumps(data)

View file

@ -0,0 +1,30 @@
from __future__ import annotations
from glob import glob
from importlib import import_module
from os import chdir
from pathlib import Path
def get_listeners():
listeners_list = {}
chdir(Path(__file__).parent.resolve())
for listener_path in glob("*.py"):
if listener_path.startswith("__"):
continue
event = listener_path[:-3]
try:
listeners_list[event] = getattr(
import_module(f".{event}", package=__name__), "export"
)()
except AttributeError:
continue
return listeners_list
listeners = get_listeners()

View file

@ -0,0 +1,20 @@
from __future__ import annotations
from ..utils.types import Coro
async def guild_create_listener(self, gateway, payload: dict):
gateway.session_id = payload.get("session_id")
self.guilds[payload["id"]] = payload
custom_listener = self._events.get("on_guild_create")
if custom_listener is not None:
await custom_listener(payload) # ToDo: Guild Model
return
def export() -> Coro:
return guild_create_listener

22
melisa/listeners/ready.py Normal file
View file

@ -0,0 +1,22 @@
from __future__ import annotations
from ..utils.types import Coro
async def on_ready_listener(self, gateway, payload: dict):
gateway.session_id = payload.get("session_id")
guilds = payload.get("guilds")
self.guilds = dict(map(lambda i: (i["id"], None), guilds))
custom_listener = self._events.get("on_ready")
if custom_listener is not None:
await custom_listener()
return
def export() -> Coro:
return on_ready_listener

View file

@ -0,0 +1 @@
from .app import Intents

View file

@ -0,0 +1,2 @@
from .intents import *
from .shard import *

View file

@ -0,0 +1,26 @@
from __future__ import annotations
from enum import IntFlag
class Intents(IntFlag):
NONE = 0
GUILDS = 1 << 0
GUILD_MEMBERS = 1 << 1
GUILD_BANS = 1 << 2
GUILD_EMOJIS_AND_STICKERS = 1 << 3
GUILD_INTEGRATIONS = 1 << 4
GUILD_WEBHOOKS = 1 << 5
GUILD_INVITES = 1 << 6
GUILD_VOICE_STATES = 1 << 7
GUILD_PRESENCES = 1 << 8
GUILD_MESSAGES = 1 << 9
GUILD_MESSAGE_REACTIONS = 1 << 10
GUILD_MESSAGE_TYPING = 1 << 11
DIRECT_MESSAGES = 1 << 12
DIRECT_MESSAGE_REACTIONS = 1 << 13
DIRECT_MESSAGE_TYPING = 1 << 14
@classmethod
def all(cls) -> Intents:
return cls(sum(cls))

View file

@ -0,0 +1,64 @@
from __future__ import annotations
from asyncio import create_task
from ...core.gateway import Gateway
from ..user import BotActivity
class Shard:
def __init__(self,
client,
shard_id: int,
num_shards: int):
self._client = client
self._shard_id: int = shard_id
self._num_shards: num_shards = num_shards
self._gateway: Gateway
self.disconnected = None
@property
def id(self) -> int:
"""Id of Shard"""
return self._gateway.shard_id
async def launch(self, **kwargs) -> Shard:
"""Launch new shard"""
self._gateway = Gateway(self._client,
self._shard_id,
self._num_shards,
start_activity=kwargs.get("activity"),
start_status=kwargs.get("status"))
self._client.shards[self._shard_id] = self
self.disconnected = False
create_task(self._gateway.start_loop())
return self
async def update_presence(self, activity: BotActivity = None, status: str = None) -> Shard:
"""
Update Presence for the shard
Parameters
----------
activity: :class:`Activity`
Activity to set
status: :class:`str`
Status to set (You can use :class:`~melisa.models.users.StatusType` to generate it)
"""
data = self._gateway.generate_presence(activity, status)
await self._gateway.update_presence(data)
return self
async def disconnect(self) -> Shard:
await self._gateway.close()
self.disconnected = True
return self

View file

@ -0,0 +1 @@
from .guild import *

View file

@ -0,0 +1,6 @@
from ...utils import Snowflake
class Guild:
pass

View file

@ -0,0 +1 @@
from .presence import *

View file

@ -0,0 +1,250 @@
from dataclasses import dataclass
from enum import IntEnum, Enum, Flag
from typing import Optional, Tuple, List, Literal
from ...utils import Snowflake
class BasePresenceData:
"""
All the information about activities here is from the Discord API docs.
Read more here: https://discord.com/developers/docs/topics/gateway#activity-object
Unknown data will be returned as None.
"""
class ActivityType(IntEnum):
"""Represents the enum of the type of activity.
Attributes
----------
GAME:
Playing {name} (Playing Rocket League)
STREAMING:
Streaming {details} (Streaming Rocket League) It supports only YouTube and Twitch
LISTENING:
Listening to {name} (Listening to Spotify)
WATCHING:
Watching {name} (Watching YouTube Together)
CUSTOM:
{emoji} {name} (":smiley: I am cool") (THIS ACTIVITY IS NOT SUPPORTED FOR BOTS)
COMPETING:
Competing in {name} (Competing in Arena World Champions)
"""
GAME = 0
STREAMING = 1
LISTENING = 2
WATCHING = 3
CUSTOM = 4
COMPETING = 5
def __int__(self):
return self.value
@dataclass(repr=False)
class ActivityTimestamp(BasePresenceData):
"""Represents the timestamp of an activity.
Attributes
----------
start: Optional[:class:`int`]
Unix time (in milliseconds) of when the activity started
end: Optional[:class:`int`]
Unix time (in milliseconds) of when the activity ends
"""
start: Optional[int] = None
end: Optional[int] = None
@dataclass(repr=False)
class ActivityEmoji(BasePresenceData):
"""Represents an emoji in an activity.
Attributes
----------
name: :class:`str`
The name of the emoji
id: Optional[:class:`Snowflake`]
The id of the emoji
animated: Optional[:class:`bool`]
Whether this emoji is animated
"""
name: str
id: Optional[Snowflake] = None
animated: Optional[bool] = None
@dataclass(repr=False)
class ActivityParty(BasePresenceData):
"""Represents a party in an activity.
Attributes
----------
id: Optional[:class:`str`]
The id of the party
size: Optional[Tuple[:class:`int`, :class:`int`]]
Array of two integers (current_size, max_size)
"""
id: Optional[str] = None
size: Optional[Tuple[int, int]] = None
@dataclass(repr=False)
class ActivityAssets(BasePresenceData):
"""Represents an asset of an activity.
Attributes
----------
large_image: Optional[:class:`str`]
(Large Image) Activity asset images are arbitrary strings which usually contain snowflake IDs
large_text: Optional[:class:`str`]
text displayed when hovering over the large image of the activity
small_image: Optional[:class:`str`]
(Small Image) Activity asset images are arbitrary strings which usually contain snowflake IDs
small_text: Optional[:class:`str`]
text displayed when hovering over the small image of the activity
"""
large_image: Optional[str] = None
large_text: Optional[str] = None
small_image: Optional[str] = None
small_text: Optional[str] = None
@dataclass(repr=False)
class ActivitySecrets(BasePresenceData):
"""Represents a secret of an activity.
Attributes
----------
join: Optional[:class:`str`]
The secret for joining a party
spectate: Optional[:class:`str`]
The secret for spectating a game
match: Optional[:class:`str`]
The secret for a specific instanced match
"""
join: Optional[str] = None
spectate: Optional[str] = None
match_: Optional[str] = None
class ActivityFlags(BasePresenceData):
"""
Just Activity Flags (From Discord API).
Everything returns :class:`bool` value.
"""
def __init__(self, flags) -> None:
self.INSTANCE = bool(flags >> 0 & 1)
self.JOIN = bool(flags >> 1 & 1)
self.SPECTATE = bool(flags >> 2 & 1)
self.JOIN_REQUEST = bool(flags >> 3 & 1)
self.SYNC = bool(flags >> 4 & 1)
self.PLAY = bool(flags >> 5 & 1)
self.PARTY_PRIVACY_FRIENDS = bool(flags >> 6 & 1)
self.PARTY_PRIVACY_VOICE_CHANNEL = bool(flags >> 7 & 1)
self.EMBEDDED = bool(flags >> 8 & 1)
@dataclass(repr=False)
class ActivityButton(BasePresenceData):
"""When received over the gateway, the buttons field is an array of strings, which are the button labels. Bots
cannot access a user's activity button URLs. When sending, the buttons field must be an array of the below
object:
Attributes
----------
label: :class:`str`
The text shown on the button (1-32 characters)
url: :class:`str`
The url opened when clicking the button (1-512 characters)
"""
label: str
url: str
@dataclass(repr=False)
class Activity(BasePresenceData):
"""Bots are only able to send ``name``, ``type``, and optionally ``url``.
Attributes
----------
name: :class:`str`
The activity's name
type: :class:`~melisa.models.user.activity.ActivityType`
Activity type
url: Optional[:class:`str`]
Stream url, is validated when type is 1
created_at: :class:`int`
Unix timestamp (in milliseconds) of when the activity was added to the user's session
timestamps: Optional[:class:`~melisa.models.user.activity.ActivityTimestamp`]
Unix timestamps for start and/or end of the game
application_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`]
Application id for the game
details: Optional[:class:`str`]
What the player is currently doing
state: Optional[:class:`str`]
The user's current party status
emoji: Optional[:class:`~melisa.models.user.activity.ActivityEmoji`]
The emoji used for a custom status
party: Optional[:class:`~melisa.models.user.activity.ActivityParty`]
Information for the current party of the player
assets: Optional[:class:`~melisa.models.user.activity.ActivityAssets`]
Images for the presence and their hover texts
secrets: Optional[:class:`~melisa.models.user.activity.ActivitySecrets`]
Secrets for Rich Presence joining and spectating
instance: Optional[:class:`bool`]
Whether or not the activity is an instanced game session
flags: Optional[:class:`~melisa.models.user.activity.ActivityFlags`]
Activity flags ORd together, describes what the payload includes
buttons: Optional[List[:class:`~melisa.models.user.activity.ActivityButton`]]
The url button on an activity.
"""
name: str
type: ActivityType
created_at: int
url: Optional[str] = None
timestamps: Optional[ActivityTimestamp] = None
application_id: Optional[Snowflake] = None
details: Optional[str] = None
state: Optional[str] = None
emoji: Optional[ActivityEmoji] = None
party: Optional[ActivityParty] = None
assets: Optional[ActivityAssets] = None
secrets: Optional[ActivitySecrets] = None
instance: Optional[bool] = None
flags: Optional[ActivityFlags] = None
buttons: Optional[List[ActivityButton]] = None
@dataclass(repr=False)
class BotActivity(BasePresenceData):
"""
Attributes
----------
name: :class:`str`
The activity's name
type: :class:`~melisa.models.user.activity.ActivityType`
Activity type
url: Optional[:class:`str`]
Stream url, is validated when type is Streaming"""
name: str
type: ActivityType
url: Optional[str] = None
class StatusType(Enum):
ONLINE = 'online'
OFFLINE = 'offline'
IDLE = 'idle'
DND = 'dnd'
INVISIBLE = 'invisible'
def __str__(self):
return self.value

10
melisa/utils/__init__.py Normal file
View file

@ -0,0 +1,10 @@
from .types import (
Coro
)
from .snowflake import Snowflake
__all__ = (
"Coro",
"Snowflake"
)

69
melisa/utils/snowflake.py Normal file
View file

@ -0,0 +1,69 @@
from __future__ import annotations
class Snowflake(int):
"""
Discord utilizes Twitter's snowflake format for uniquely identifiable descriptors (IDs). These IDs are guaranteed
to be unique across all of Discord, except in some unique scenarios in which child objects share their parent's
ID. Because Snowflake IDs are up to 64 bits in size (e.g. a uint64), they are always returned as strings in the
HTTP API to prevent integer overflows in some languages. See Gateway ETF/JSON for more information regarding
Gateway encoding.
Read more here: https://discord.com/developers/docs/reference#snowflakes
"""
_MAX_VALUE: int = 9223372036854775807
_MIN_VALUE: int = 0
def __init__(self, _):
super().__init__()
if self < self._MIN_VALUE:
raise ValueError(
"snowflake value should be greater than or equal to 0."
)
if self > self._MAX_VALUE:
raise ValueError(
"snowflake value should be less than or equal to 9223372036854775807."
)
@classmethod
def __factory__(cls, string: str) -> Snowflake:
return cls.from_string(string)
@classmethod
def from_string(cls, string: str):
"""Initialize a new Snowflake from a string.
Parameters
----------
string: :class:`str`
The snowflake as a string.
"""
return Snowflake(int(string))
@property
def timestamp(self) -> int:
"""
Milliseconds since Discord Epoch, the first second of 2015 or 1420070400000.
"""
return self >> 22
@property
def worker_id(self) -> int:
"""Internal worker ID"""
return (self >> 17) % 16
@property
def process_id(self) -> int:
"""Internal process ID"""
return (self >> 12) % 16
@property
def increment(self) -> int:
""" For every ID that is generated on that process, this number is incremented"""
return self % 2048
@property
def unix(self) -> int:
return self.timestamp + 1420070400000

12
melisa/utils/types.py Normal file
View file

@ -0,0 +1,12 @@
from __future__ import annotations
from typing import TypeVar, Callable, Coroutine, Any, Union
T = TypeVar("T")
Coro = TypeVar("Coro", bound=Callable[..., Coroutine[Any, Any, Any]])
APINullable = Union[T, None]