diff --git a/telegram-application/poetry.lock b/telegram-application/poetry.lock index 28a7f40..ac454bd 100644 --- a/telegram-application/poetry.lock +++ b/telegram-application/poetry.lock @@ -379,14 +379,14 @@ standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "htt [[package]] name = "faststream" -version = "0.5.34" +version = "0.5.42" description = "FastStream: the simplest way to work with a messaging queues" optional = false python-versions = ">=3.8" groups = ["main"] files = [ - {file = "faststream-0.5.34-py3-none-any.whl", hash = "sha256:aa7f61d6968a68f13ebf755cce9e8bf11b00717c28b2ef66e896b5d652a6c6a2"}, - {file = "faststream-0.5.34.tar.gz", hash = "sha256:84615968c5768ebaa89b72ae66b53e5302c08e7d18b341ef5193e54cb6ba8623"}, + {file = "faststream-0.5.42-py3-none-any.whl", hash = "sha256:dbbbbca36bc22ac017c0cca2acffb81dc6569c1813055367bf7acd6c46c6cbb9"}, + {file = "faststream-0.5.42.tar.gz", hash = "sha256:7f115dc4c9eb53a76a5ee1194191179f8161ed678ad0cb744ace2743bfd5002e"}, ] [package.dependencies] @@ -395,21 +395,21 @@ fast-depends = ">=2.4.0b0,<3.0.0" typing-extensions = ">=4.8.0" [package.extras] -cli = ["typer (>=0.9,!=0.12,<=0.15.1)", "watchfiles (>=0.15.0,<1.1.0)"] -confluent = ["confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)"] -dev = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "bandit (==1.8.0)", "cairosvg", "codespell (==2.3.0)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "confluent-kafka-stubs", "coverage[toml] (==7.6.1)", "coverage[toml] (==7.6.10)", "detect-secrets (==1.5.0)", "dirty-equals (==0.8.0)", "email-validator (==2.2.0)", "fastapi (==0.115.6)", "httpx (==0.28.1)", "mdx-include (==1.4.2)", "mike (==2.1.3)", "mkdocs-git-revision-date-localized-plugin (==1.3.0)", "mkdocs-glightbox (==0.4.0)", "mkdocs-literate-nav (==0.6.1)", "mkdocs-macros-plugin (==1.3.7)", "mkdocs-material (==9.5.49)", "mkdocs-minify-plugin (==0.8.0)", "mkdocs-static-i18n (==1.2.3)", "mkdocstrings[python] (==0.27.0)", "mypy (==1.14.1)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "pillow", "pre-commit (==3.5.0)", "pre-commit (==4.0.1)", "prometheus-client (>=0.20.0,<0.30.0)", "pydantic-settings (>=2.0.0,<3.0.0)", "pytest (==8.3.4)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.25.1)", "pyyaml (==6.0.2)", "redis (>=5.0.0,<6.0.0)", "requests", "ruff (==0.8.6)", "semgrep (==1.101.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "typing-extensions (>=4.8.0,<4.12.1)", "watchfiles (>=0.15.0,<1.1.0)"] -devdocs = ["cairosvg", "mdx-include (==1.4.2)", "mike (==2.1.3)", "mkdocs-git-revision-date-localized-plugin (==1.3.0)", "mkdocs-glightbox (==0.4.0)", "mkdocs-literate-nav (==0.6.1)", "mkdocs-macros-plugin (==1.3.7)", "mkdocs-material (==9.5.49)", "mkdocs-minify-plugin (==0.8.0)", "mkdocs-static-i18n (==1.2.3)", "mkdocstrings[python] (==0.27.0)", "pillow", "requests"] +cli = ["typer (>=0.9,!=0.12,<=0.15.4)", "watchfiles (>=0.15.0,<1.1.0)"] +confluent = ["confluent-kafka (>=2,!=2.8.1,<3)", "confluent-kafka (>=2.6,!=2.8.1,<3)"] +dev = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "bandit (==1.7.10)", "bandit (==1.8.3)", "cairosvg", "codespell (==2.4.1)", "confluent-kafka (>=2,!=2.8.1,<3)", "confluent-kafka (>=2.6,!=2.8.1,<3)", "confluent-kafka-stubs", "coverage[toml] (==7.6.1)", "coverage[toml] (==7.8.0)", "detect-secrets (==1.5.0)", "dirty-equals (==0.9.0)", "email-validator (==2.2.0)", "fastapi (==0.115.12)", "httpx (==0.28.1)", "mdx-include (==1.4.2)", "mike (==2.1.3)", "mkdocs-git-revision-date-localized-plugin (==1.4.5)", "mkdocs-glightbox (==0.4.0)", "mkdocs-literate-nav (==0.6.2)", "mkdocs-macros-plugin (==1.3.7)", "mkdocs-material (==9.6.14)", "mkdocs-minify-plugin (==0.8.0)", "mkdocs-static-i18n (==1.3.0)", "mkdocstrings[python] (==0.26.1)", "mkdocstrings[python] (==0.29.1)", "mypy (==1.15.0)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "pillow", "pre-commit (==3.5.0)", "pre-commit (==4.2.0)", "prometheus-client (>=0.20.0,<0.30.0)", "pydantic-settings (>=2.0.0,<3.0.0)", "pytest (==8.3.5)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.26.0)", "pyyaml (==6.0.2)", "redis (>=5.0.0,<7.0.0)", "requests", "ruff (==0.11.10)", "semgrep (==1.122.0)", "semgrep (==1.99.0)", "typer (>=0.9,!=0.12,<=0.15.4)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "typing-extensions (>=4.8.0,<4.12.1)", "watchfiles (>=0.15.0,<1.1.0)"] +devdocs = ["cairosvg", "mdx-include (==1.4.2)", "mike (==2.1.3)", "mkdocs-git-revision-date-localized-plugin (==1.4.5)", "mkdocs-glightbox (==0.4.0)", "mkdocs-literate-nav (==0.6.2)", "mkdocs-macros-plugin (==1.3.7)", "mkdocs-material (==9.6.14)", "mkdocs-minify-plugin (==0.8.0)", "mkdocs-static-i18n (==1.3.0)", "mkdocstrings[python] (==0.26.1)", "mkdocstrings[python] (==0.29.1)", "pillow", "requests"] kafka = ["aiokafka (>=0.9,<0.13)"] -lint = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "bandit (==1.8.0)", "codespell (==2.3.0)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "confluent-kafka-stubs", "mypy (==1.14.1)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<6.0.0)", "ruff (==0.8.6)", "semgrep (==1.101.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "watchfiles (>=0.15.0,<1.1.0)"] +lint = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "bandit (==1.7.10)", "bandit (==1.8.3)", "codespell (==2.4.1)", "confluent-kafka (>=2,!=2.8.1,<3)", "confluent-kafka (>=2.6,!=2.8.1,<3)", "confluent-kafka-stubs", "mypy (==1.15.0)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<7.0.0)", "ruff (==0.11.10)", "semgrep (==1.122.0)", "semgrep (==1.99.0)", "typer (>=0.9,!=0.12,<=0.15.4)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "watchfiles (>=0.15.0,<1.1.0)"] nats = ["nats-py (>=2.7.0,<=3.0.0)"] -optionals = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<6.0.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "watchfiles (>=0.15.0,<1.1.0)"] +optionals = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "confluent-kafka (>=2,!=2.8.1,<3)", "confluent-kafka (>=2.6,!=2.8.1,<3)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<7.0.0)", "typer (>=0.9,!=0.12,<=0.15.4)", "watchfiles (>=0.15.0,<1.1.0)"] otel = ["opentelemetry-sdk (>=1.24.0,<2.0.0)"] prometheus = ["prometheus-client (>=0.20.0,<0.30.0)"] rabbit = ["aio-pika (>=9,<10)"] -redis = ["redis (>=5.0.0,<6.0.0)"] -test-core = ["coverage[toml] (==7.6.1)", "coverage[toml] (==7.6.10)", "dirty-equals (==0.8.0)", "pytest (==8.3.4)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.25.1)", "typing-extensions (>=4.8.0,<4.12.1)"] -testing = ["coverage[toml] (==7.6.1)", "coverage[toml] (==7.6.10)", "dirty-equals (==0.8.0)", "email-validator (==2.2.0)", "fastapi (==0.115.6)", "httpx (==0.28.1)", "pydantic-settings (>=2.0.0,<3.0.0)", "pytest (==8.3.4)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.25.1)", "pyyaml (==6.0.2)", "typing-extensions (>=4.8.0,<4.12.1)"] -types = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "confluent-kafka-stubs", "mypy (==1.14.1)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<6.0.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "watchfiles (>=0.15.0,<1.1.0)"] +redis = ["redis (>=5.0.0,<7.0.0)"] +test-core = ["coverage[toml] (==7.6.1)", "coverage[toml] (==7.8.0)", "dirty-equals (==0.9.0)", "pytest (==8.3.5)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.26.0)", "typing-extensions (>=4.8.0,<4.12.1)"] +testing = ["coverage[toml] (==7.6.1)", "coverage[toml] (==7.8.0)", "dirty-equals (==0.9.0)", "email-validator (==2.2.0)", "fastapi (==0.115.12)", "httpx (==0.28.1)", "pydantic-settings (>=2.0.0,<3.0.0)", "pytest (==8.3.5)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.26.0)", "pyyaml (==6.0.2)", "typing-extensions (>=4.8.0,<4.12.1)"] +types = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "confluent-kafka (>=2,!=2.8.1,<3)", "confluent-kafka (>=2.6,!=2.8.1,<3)", "confluent-kafka-stubs", "mypy (==1.15.0)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<7.0.0)", "typer (>=0.9,!=0.12,<=0.15.4)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "watchfiles (>=0.15.0,<1.1.0)"] [[package]] name = "google-ai-generativelanguage" @@ -1668,6 +1668,25 @@ anyio = ">=3.6.2,<5" [package.extras] full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.18)", "pyyaml"] +[[package]] +name = "telethon" +version = "1.40.0" +description = "Full-featured Telegram client library for Python 3" +optional = false +python-versions = ">=3.5" +groups = ["main"] +files = [ + {file = "Telethon-1.40.0-py3-none-any.whl", hash = "sha256:146fd4cb2a7afa66bc67f9c2167756096a37b930f65711a3e7399ec9874dcfa7"}, + {file = "telethon-1.40.0.tar.gz", hash = "sha256:40e83326877a2e68b754d4b6d0d1ca5ac924110045b039e02660f2d67add97db"}, +] + +[package.dependencies] +pyaes = "*" +rsa = "*" + +[package.extras] +cryptg = ["cryptg"] + [[package]] name = "tgcrypto" version = "1.2.5" @@ -1935,4 +1954,4 @@ propcache = ">=0.2.0" [metadata] lock-version = "2.1" python-versions = ">=3.11, <4.0" -content-hash = "8d9d9fd14dae4dfc99b18096b87a59e6bca68ecf03bdc411f4b2c428e556c9a6" +content-hash = "99708cc4fde94883d4501ad03427d12d5b88d4b94426cb5a21940b4e5d6249f4" diff --git a/telegram-application/pyproject.toml b/telegram-application/pyproject.toml index a4298ad..458155e 100644 --- a/telegram-application/pyproject.toml +++ b/telegram-application/pyproject.toml @@ -23,7 +23,8 @@ dependencies = [ "sqladmin (>=0.20.1,<0.21.0)", "fastapi (>=0.115.12,<0.116.0)", "uvicorn (>=0.34.2,<0.35.0)", - "emoji (>=2.14.1,<3.0.0)" + "emoji (>=2.14.1,<3.0.0)", + "telethon (>=1.40.0,<2.0.0)" ] diff --git a/telegram-application/src/core/database/chats.py b/telegram-application/src/core/database/chats.py index 1726038..b5d2acc 100644 --- a/telegram-application/src/core/database/chats.py +++ b/telegram-application/src/core/database/chats.py @@ -11,6 +11,7 @@ if TYPE_CHECKING: class TgChat(Base): chat_type: Mapped[str] title: Mapped[str] + username: Mapped[str | None] message_relationship: Mapped[list["TgMessage"]] = relationship( back_populates="chat_relationship", diff --git a/telegram-application/src/core/tg_service/constants.py b/telegram-application/src/core/tg_service/constants.py index 0dfc37c..ba6b0d6 100644 --- a/telegram-application/src/core/tg_service/constants.py +++ b/telegram-application/src/core/tg_service/constants.py @@ -1 +1,16 @@ -MESSAGE_CHANG_SIZE: int = 5 \ No newline at end of file +from enum import Enum + +from telethon.types import Message, User, Chat, Channel + + +MESSAGE_CHANG_SIZE: int = 5 + +TELETHON_CHAT_TYPES = User | Chat | Channel + + +class CustomChatTypes(Enum): + PRIVATE_GROUP = "private_group" + STANDARD_GROUP = "standard_group" + SUPERGROUP = "supergroup" + BROADCAST_CHANNEL = "broadcast_channel" + BOT = "bot" \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/messages_handler.py b/telegram-application/src/core/tg_service/messages_handler.py index ccff0b5..055c273 100644 --- a/telegram-application/src/core/tg_service/messages_handler.py +++ b/telegram-application/src/core/tg_service/messages_handler.py @@ -1,39 +1,56 @@ import asyncio -from pyrogram import Client -from pyrogram.types import Message - +from src.core.settings.base import settings +from src.core.tg_service.constants import TELETHON_CHAT_TYPES from src.core.tg_service.schemas import MessageFromChatSchema from src.core.tg_service.service import check_user_exists, check_chat_exists, check_chunk_state_and_publish from src.core.tg_service import utils as api_tg_utils +from src.telethon_client import telethon_client +from telethon.events import NewMessage +from telethon.types import Message, User, Chat, Channel DATA: dict[int, list[MessageFromChatSchema]] = dict() - lock = asyncio.Lock() -async def message_listener(client: Client, message: Message): - print("received message") - if api_tg_utils.check_message_condition(message): + +@telethon_client.on(event=NewMessage) +async def message_listener(event: NewMessage.Event) -> None: + message: Message = event.message + sender: User = await event.get_sender() + chat: TELETHON_CHAT_TYPES = await event.get_chat() + + chat_type, chat_username = api_tg_utils.handle_chat_type(chat) + if api_tg_utils.check_message_condition( + message=message, + sender=sender, + chat_type=chat_type, + ): async with lock: await check_user_exists( - user_pyrogram=message.from_user, + username=sender.username, + first_name=sender.first_name, + last_name=sender.last_name, + user_id=sender.id, ) + await check_chat_exists( - chat_pyrogram=message.chat, + chat_id=chat.id, + chat_type=chat_type.value, + chat_title=chat.title, + chat_username=chat_username, ) message_schema = MessageFromChatSchema( id=message.id, - user_id=message.from_user.id, - chat_id=message.chat.id, + user_id=sender.id, + chat_id=chat.id, text=message.text, message_time=message.date, ) await check_chunk_state_and_publish( data=DATA, message_schema=message_schema, - chat_id=message.chat.id, + chat_id=chat.id, ) - diff --git a/telegram-application/src/core/tg_service/notify_sender.py b/telegram-application/src/core/tg_service/notify_sender.py index f181b8e..7d30960 100644 --- a/telegram-application/src/core/tg_service/notify_sender.py +++ b/telegram-application/src/core/tg_service/notify_sender.py @@ -16,7 +16,7 @@ async def send_to_tg_from_bot(text: str): "parse_mode": "Markdown", } async with AsyncClient() as client: - response = await client.post(url, json=payload) + response = await client.post(url, json=payload, timeout=30 ) print(response) if response.status_code != 200: print(response.text) diff --git a/telegram-application/src/core/tg_service/notify_success.py b/telegram-application/src/core/tg_service/notify_success.py index 89036f1..7f83dbe 100644 --- a/telegram-application/src/core/tg_service/notify_success.py +++ b/telegram-application/src/core/tg_service/notify_success.py @@ -1,7 +1,9 @@ from src.core.database import TgMessage, TgChat, User +from src.core.settings.base import settings from src.core.tg_service.notify_sender import send_to_tg_from_bot from src.core.tg_service.utils import create_and_format_message +from src.telethon_client import telethon_client async def notify_for_success( @@ -11,7 +13,6 @@ async def notify_for_success( user_model: User, ) -> None: message = create_and_format_message( - messages=messages, reason=success_reason, user_model=user_model, chat=chat, @@ -19,4 +20,10 @@ async def notify_for_success( await send_to_tg_from_bot( text=message - ) \ No newline at end of file + ) + + await telethon_client.forward_messages( + entity=settings.NOTIFY.CHAT_ID, + messages=[message.id for message in messages], + from_peer=chat.id, + ) diff --git a/telegram-application/src/core/tg_service/service.py b/telegram-application/src/core/tg_service/service.py index f72e4ba..a143841 100644 --- a/telegram-application/src/core/tg_service/service.py +++ b/telegram-application/src/core/tg_service/service.py @@ -16,13 +16,16 @@ lock = asyncio.Lock() async def check_user_exists( - user_pyrogram: PyroUser, + username: str, + first_name: str, + last_name: str, + user_id: int, ) -> None: user_schema = UserFromMessageSchema( - username=user_pyrogram.username, - first_name=user_pyrogram.first_name, - last_name=user_pyrogram.last_name, - id=user_pyrogram.id, + username=username, + first_name=first_name, + last_name=last_name, + id=user_id, ) user_model = await redis_client.get(name=str(user_schema.id)) @@ -42,24 +45,35 @@ async def check_user_exists( async def check_chat_exists( - chat_pyrogram: PyroChat + chat_id: int, + chat_type: str, + chat_title: str, + chat_username: str | None = None, ) -> None: - chat = await redis_client.get(name=str(chat_pyrogram.id)) + chat = await redis_client.get(name=str(chat_id)) - if chat: - return + # if chat: + # return async with db_helper.get_async_session_not_closed() as session: - chat = await session.get(TgChat, chat_pyrogram.id) + chat = await session.get(TgChat, chat_id) if not chat: await tg_crud.create_new_chat( session=session, - id=chat_pyrogram.id, - chat_type=chat_pyrogram.type.value, - title=chat_pyrogram.title, + id=chat_id, + chat_type=chat_type, + title=chat_title, + username=chat_username, ) - await redis_client.set(name=str(chat_pyrogram.id), value=str(True)) + + if not chat.username and chat_username: + chat.username = chat_username + await session.commit() + + + + await redis_client.set(name=str(chat_id), value=str(True)) async def check_chunk_state_and_publish( diff --git a/telegram-application/src/core/tg_service/utils.py b/telegram-application/src/core/tg_service/utils.py index 3ccc14d..2c42376 100644 --- a/telegram-application/src/core/tg_service/utils.py +++ b/telegram-application/src/core/tg_service/utils.py @@ -1,25 +1,28 @@ import emoji -from pyrogram.types import Message -from pyrogram.enums import ChatType +# from telethon.events import NewMessage +from telethon.tl.types import Message, User as UserTelethon, Chat, Channel + + from src.core.database import TgMessage, User, TgChat +from src.core.tg_service.constants import TELETHON_CHAT_TYPES, CustomChatTypes + def check_message_condition( message: Message, + sender: User, + chat_type: CustomChatTypes, ) -> bool: - if message.chat.type in [ChatType.PRIVATE, ChatType.BOT]: - return False - - if not message.from_user: + if chat_type == CustomChatTypes.PRIVATE_GROUP: return False if not message.text: return False - if message.from_user.username: - if 'bot' in message.from_user.username.lower(): + if sender.username: + if 'bot' in sender.username.lower(): return False emoji_count = emoji.emoji_count(str(message.text)) @@ -31,7 +34,6 @@ def check_message_condition( def create_and_format_message( - messages: list[TgMessage], reason: str, chat: TgChat, user_model: User, @@ -57,18 +59,21 @@ def create_and_format_message( # Chat info chat_title = escape_markdown_v2(chat.title) - chat_link = ( - f"https://t.me/c/{str(chat.id)[4:]}" - if str(chat.id).startswith("-100") - else f"https://t.me/{chat.username}" if chat.username - else f"Chat ID: {chat.id}" - ) + + if chat.username: + chat_link = f"https://t.me/{chat.username}" + else: + chat_link = ( + f"https://t.me/c/{str(chat.id)[4:]}" + if str(chat.id).startswith("-100") + else f"Chat ID: {chat.id}" + ) # Escape reason reason_escaped = escape_markdown_v2(reason) # Header - header = ( + full_message = ( f"🔥 *Найдена успешка!*\n" f"👤 *Пользователь:* {user_link}\n" f"🐩 *Чат:* [{chat_title}]({chat_link})\n" @@ -77,57 +82,31 @@ def create_and_format_message( ) MAX_LENGTH = 4096 - header_length = len(header) - available_length = MAX_LENGTH - header_length - 50 # Extra buffer for safety - message_blocks = [] - current_length = 0 - - for msg in messages: - if not msg.text: - continue - - # Clean and escape text - clean_text = msg.text.replace('```', 'ʻʻʻ') # Replace triple backticks - escaped_text = escape_markdown_v2(clean_text) - escaped_text = validate_markdown(escaped_text) - - sender_username = escape_markdown_v2( - msg.user_relationship.username or f"ID:{msg.user_id}" - ) - - # Create message block with proper code block formatting - block = ( - f"**{sender_username}:**\n" - f"```\n" - f"{escaped_text}\n" - f"```\n\n" - ) - block_length = len(block) - - if current_length + block_length > available_length: - remaining_space = available_length - current_length - if remaining_space > 30: # Enough space for a truncated message - truncated_text = escaped_text[:remaining_space - 30].rsplit(' ', 1)[0] + "..." - block = ( - f"**{sender_username}:**\n" - f"```\n" - f"{truncated_text}\n" - f"```\n\n" - f"_... сообщение обрезано ..._\n" - ) - message_blocks.append(block) - break - - message_blocks.append(block) - current_length += block_length - - content = ''.join(message_blocks).strip() - full_message = header + content # Final validation and truncation if needed full_message = validate_markdown(full_message) if len(full_message) > MAX_LENGTH: full_message = full_message[:MAX_LENGTH - 30].rsplit('\n', 2)[0] + "\n```\n...\n```\n_... сообщение обрезано ..._" - return full_message \ No newline at end of file + return full_message + + +def handle_chat_type( + chat: TELETHON_CHAT_TYPES +) -> tuple[CustomChatTypes, str | None]: + chat_username = None + if isinstance(chat, UserTelethon): + chat_type = CustomChatTypes.PRIVATE_GROUP + elif isinstance(chat, Chat): + chat_type = CustomChatTypes.STANDARD_GROUP + elif isinstance(chat, Channel): + chat_username = chat.username + if chat.megagroup: + chat_type = CustomChatTypes.SUPERGROUP + else: + chat_type = CustomChatTypes.BROADCAST_CHANNEL + else: + raise ValueError(f"Invalid chat type check it!{chat}") + + return chat_type, chat_username \ No newline at end of file diff --git a/telegram-application/src/core/workers/crud.py b/telegram-application/src/core/workers/crud.py index 8f5be8b..f02ab64 100644 --- a/telegram-application/src/core/workers/crud.py +++ b/telegram-application/src/core/workers/crud.py @@ -25,15 +25,20 @@ async def bulk_create_success_reasons( await session.commit() -async def get_messages_by_slice_id( +async def get_messages_by_slice_id_and_user_id( session: AsyncSession, - slice_id: UUID + slice_id: UUID, + user_id: int, ) -> list[TgMessage]: stmt = ( select(TgMessage) .options(joinedload(TgMessage.user_relationship)) .where( - TgMessage.slice_id == slice_id + TgMessage.slice_id == slice_id, + TgMessage.user_id == user_id, + ) + .order_by( + TgMessage.message_time.asc() ) ) diff --git a/telegram-application/src/core/workers/rmq_worker_handler.py b/telegram-application/src/core/workers/rmq_worker_handler.py index ca89011..840e6e3 100644 --- a/telegram-application/src/core/workers/rmq_worker_handler.py +++ b/telegram-application/src/core/workers/rmq_worker_handler.py @@ -6,7 +6,7 @@ from faststream import Depends from src.core.database import User, TgChat from src.core.tg_service.notify_success import notify_for_success -from src.core.workers.schemas import ResponseFromGeminiSchema +from src.core.workers.schemas import ResponseFromGeminiSchema, SuccessChatFromAiSchema from src.core.database.connect import db_helper from src.core.rabbitmq.connect import success_gemini_subscriber from src.core.workers import crud as workers_crud @@ -17,27 +17,25 @@ async def create_success_record( message: ResponseFromGeminiSchema, session: Annotated[AsyncSession, Depends(db_helper.get_async_session)], ): - print(message.success) - await workers_crud.bulk_create_success_reasons( - success_schema=message, - session=session, - ) - - for success_message in message.success: - messages = await workers_crud.get_messages_by_slice_id( + await workers_crud.bulk_create_success_reasons( + success_schema=message, session=session, - slice_id=success_message.slice_id, ) - user_model = await session.get(User, success_message.user_id) - chat_model = await session.get(TgChat, messages[0].chat_id) + for success_message in message.success: + messages = await workers_crud.get_messages_by_slice_id_and_user_id( + session=session, + slice_id=success_message.slice_id, + user_id=success_message.user_id + ) + user_model = await session.get(User, success_message.user_id) + chat_model = await session.get(TgChat, messages[0].chat_id) - - await notify_for_success( - messages=messages, - chat=chat_model, # type:ignore - user_model=user_model, # type:ignore - success_reason=success_message.reason, # type:ignore - ) + await notify_for_success( + messages=messages, + chat=chat_model, # type:ignore + user_model=user_model, # type:ignore + success_reason=success_message.reason, # type:ignore + ) diff --git a/telegram-application/src/main.py b/telegram-application/src/main.py index c01e0b6..cc12ca2 100644 --- a/telegram-application/src/main.py +++ b/telegram-application/src/main.py @@ -1,27 +1,44 @@ -from pyrogram import Client, filters, idle +import asyncio + + + +from pyrogram import filters, idle from pyrogram.handlers import MessageHandler -from src.core.settings.base import settings from src.core.tg_service.messages_handler import message_listener from src.core.rabbitmq.connect import broker, init_queue_and_publisher +from src.telethon_client import telethon_client + +# # app.add_handler(MessageHandler( +# # callback=message_listener, +# # filters=filters.all +# # )) + +from telethon import TelegramClient, events, types + +from src.core.settings.base import settings + +# app = TelegramClient( +# settings.ACCOUNT.NAME, +# api_id=settings.ACCOUNT.API_ID, +# api_hash=settings.ACCOUNT.API_HASH, +# ) +# +# +# @app.on(events.NewMessage) +# async def my_event_handler(event): +# print(event.message) -app = Client( - name=settings.ACCOUNT.NAME, - api_id=settings.ACCOUNT.API_ID, - api_hash=settings.ACCOUNT.API_HASH, -) - -app.add_handler(MessageHandler( - callback=message_listener, - filters=filters.all -)) +# telethon_client.start() +# telethon_client.run_until_disconnected() async def main(): - await app.start() await broker.start() await init_queue_and_publisher() - await idle() - await app.stop() -app.run(main()) + await telethon_client.start() + await telethon_client.run_until_disconnected() + + +asyncio.run(main()) \ No newline at end of file diff --git a/telegram-application/src/telethon_client.py b/telegram-application/src/telethon_client.py new file mode 100644 index 0000000..e8a8b62 --- /dev/null +++ b/telegram-application/src/telethon_client.py @@ -0,0 +1,14 @@ +import asyncio + +from telethon import TelegramClient + +from src.core.settings.base import settings + +loop = asyncio.get_event_loop() + +telethon_client = TelegramClient( + settings.ACCOUNT.NAME, + api_id=settings.ACCOUNT.API_ID, + api_hash=settings.ACCOUNT.API_HASH, + loop=loop, +)