diff --git a/telegram-application/Dockerfile b/telegram-application/Dockerfile index f461901..8bd22b5 100644 --- a/telegram-application/Dockerfile +++ b/telegram-application/Dockerfile @@ -15,6 +15,7 @@ RUN apt-get update \ # Устанавливаем рабочую директорию WORKDIR /app ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 # Копируем файлы Poetry (pyproject.toml и poetry.lock) diff --git a/telegram-application/src/core/database/tg_messages.py b/telegram-application/src/core/database/tg_messages.py index c2c0923..b5f96a1 100644 --- a/telegram-application/src/core/database/tg_messages.py +++ b/telegram-application/src/core/database/tg_messages.py @@ -8,6 +8,8 @@ from src.core.database import Base class TgMessage(Base): + __tablename__ = 'tg_messages' + text: Mapped[str] message_time: Mapped[datetime] user_id: Mapped[int] = mapped_column( diff --git a/telegram-application/src/core/database/users.py b/telegram-application/src/core/database/users.py index cf8f23b..2a715ee 100644 --- a/telegram-application/src/core/database/users.py +++ b/telegram-application/src/core/database/users.py @@ -5,7 +5,7 @@ from src.core.database.base import Base class User(Base): - username: Mapped[str] + username: Mapped[str | None] first_name: Mapped[str | None] last_name: Mapped[str | None] diff --git a/telegram-application/src/core/rabbitmq/connect.py b/telegram-application/src/core/rabbitmq/connect.py index 2fe2039..2943548 100644 --- a/telegram-application/src/core/rabbitmq/connect.py +++ b/telegram-application/src/core/rabbitmq/connect.py @@ -1,3 +1,5 @@ +from asyncio import FastChildWatcher + from faststream.rabbit import RabbitBroker, RabbitExchange, ExchangeType, QueueType, RabbitQueue from src.core.settings.base import settings @@ -14,5 +16,15 @@ base_exchange = RabbitExchange( base_queue = RabbitQueue( name="base_queue", - queue_type=QueueType.CLASSIC, ) + + +message_handler_publisher = broker.publisher( + queue=base_queue, + exchange=base_exchange, +) + +async def init_queue_and_publisher(): + await broker.declare_exchange(base_exchange) + await broker.declare_queue(base_queue) + diff --git a/telegram-application/src/core/tg_service/constants.py b/telegram-application/src/core/tg_service/constants.py index 1614011..c69c213 100644 --- a/telegram-application/src/core/tg_service/constants.py +++ b/telegram-application/src/core/tg_service/constants.py @@ -1 +1 @@ -MESSAGE_CHANG_SIZE: int = 20 \ No newline at end of file +MESSAGE_CHANG_SIZE: int = 2 \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/messages_handler.py b/telegram-application/src/core/tg_service/messages_handler.py index 6c073cf..63beec4 100644 --- a/telegram-application/src/core/tg_service/messages_handler.py +++ b/telegram-application/src/core/tg_service/messages_handler.py @@ -1,10 +1,8 @@ -from pyrogram import filters, Client -from pyrogram.enums import ChatType +from pyrogram import Client from pyrogram.types import Message from src.core.ai_services.schemas import MessageFromChatSchema -from src.core.tg_service.constants import MESSAGE_CHANG_SIZE -from src.core.tg_service.service import check_user_exists, check_chat_exists +from src.core.tg_service.service import check_user_exists, check_chat_exists, check_chunk_state_and_publish from src.core.tg_service import utils as api_tg_utils @@ -24,7 +22,6 @@ async def message_listener(client: Client, message: Message): chat_pyrogram=message.chat, ) - messages_chunk = DATA.get(message.chat.id) message_schema = MessageFromChatSchema( id=message.id, user_id=message.from_user.id, @@ -33,12 +30,9 @@ async def message_listener(client: Client, message: Message): message_time=message.date, ) + await check_chunk_state_and_publish( + data=DATA, + message_schema=message_schema, + chat_id=message.chat.id, + ) - if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE: # TODO в констант. - await some_publisher() - del DATA[message.chat.id] - - if messages_chunk: - DATA[message.chat.id].append(message_schema) - else: - DATA[message.chat.id] = [message_schema] diff --git a/telegram-application/src/core/tg_service/schemas.py b/telegram-application/src/core/tg_service/schemas.py index a6ca698..79d8d2c 100644 --- a/telegram-application/src/core/tg_service/schemas.py +++ b/telegram-application/src/core/tg_service/schemas.py @@ -1,9 +1,15 @@ -from pydantic import BaseModel +from pydantic import BaseModel, Field +from src.core.ai_services.schemas import MessageFromChatSchema class UserFromMessageSchema(BaseModel): id: int - username: str + username: str | None = None first_name: str | None = None last_name: str | None = None + + + +class MessagesForSendToWorkersSchema(BaseModel): + messages: list[MessageFromChatSchema] \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/service.py b/telegram-application/src/core/tg_service/service.py index 04818d6..410b7f8 100644 --- a/telegram-application/src/core/tg_service/service.py +++ b/telegram-application/src/core/tg_service/service.py @@ -1,13 +1,17 @@ -from pyrogram.types.user_and_chats import User, Chat +from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat -from src.core.tg_service.schemas import UserFromMessageSchema +from src.core.ai_services.schemas import MessageFromChatSchema +from src.core.database import TgChat, User +from src.core.rabbitmq.connect import message_handler_publisher +from src.core.tg_service.constants import MESSAGE_CHANG_SIZE +from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema from src.core.database.connect import db_helper from src.core.redis_helper.redis_connect import redis_client from src.core.tg_service import crud as tg_crud async def check_user_exists( - user_pyrogram: User, + user_pyrogram: PyroUser, ) -> None: user_schema = UserFromMessageSchema( username=user_pyrogram.username, @@ -21,7 +25,7 @@ async def check_user_exists( return async with db_helper.get_async_session_not_closed() as session: - user_model = await session.get(id=user_model.id) + user_model = await session.get(User, user_schema.id) if not user_model: await tg_crud.create_new_user( @@ -29,19 +33,19 @@ async def check_user_exists( **user_schema.model_dump() ) - await redis_client.set(name=str(user_schema.id), value=True) + await redis_client.set(name=str(user_schema.id), value=str(True)) async def check_chat_exists( - chat_pyrogram: Chat + chat_pyrogram: PyroChat ) -> None: - chat = await redis_client.get(chat_id=str(chat_pyrogram.id)) + chat = await redis_client.get(name=str(chat_pyrogram.id)) if chat: return async with db_helper.get_async_session_not_closed() as session: - chat = await session.get(id=chat_pyrogram.id) + chat = await session.get(TgChat, chat_pyrogram.id) if not chat: await tg_crud.create_new_chat( @@ -50,6 +54,37 @@ async def check_chat_exists( chat_type=chat_pyrogram.type.value, title=chat_pyrogram.title, ) - await redis_client.set(name=str(chat_pyrogram.id), value=True) + await redis_client.set(name=str(chat_pyrogram.id), value=str(True)) +async def check_chunk_state_and_publish( + data: dict, + chat_id: int, + message_schema: MessageFromChatSchema, +) -> bool: + messages_chunk = data.get(chat_id) + if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE: + await message_handler_publisher.publish( + message=MessagesForSendToWorkersSchema(messages=messages_chunk) + ) + + if messages_chunk is None: + data[chat_id] = [message_schema] + + elif len(messages_chunk) == MESSAGE_CHANG_SIZE: + del data[chat_id] + async with db_helper.get_async_session_not_closed() as session: + await tg_crud.bulk_insert_messages( + messages=messages_chunk, + session=session + ) + data[chat_id] = [message_schema] + + else: + data[chat_id].append(message_schema) + + if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE: + await message_handler_publisher.publish( + message=MessagesForSendToWorkersSchema(messages=messages_chunk) + ) + diff --git a/telegram-application/src/main.py b/telegram-application/src/main.py index b2bd9b4..e5b2e96 100644 --- a/telegram-application/src/main.py +++ b/telegram-application/src/main.py @@ -1,10 +1,8 @@ -import asyncio - from pyrogram import Client, filters, idle from pyrogram.handlers import MessageHandler from src.core.tg_service.messages_handler import message_listener -from src.core.rabbitmq.connect import broker +from src.core.rabbitmq.connect import broker, init_queue_and_publisher api_id = 17718565 api_hash = "72f93973f4227415572f039d4f847082" @@ -23,8 +21,9 @@ app.add_handler(MessageHandler( async def main(): await app.start() await broker.start() + await init_queue_and_publisher() await idle() await app.stop() - +print("*" * 100) app.run(main()) \ No newline at end of file