add prototype working publisher

This commit is contained in:
harold 2025-02-06 16:43:56 +05:00
parent 5ea4487232
commit 05b6060ccf
9 changed files with 80 additions and 31 deletions

View File

@ -15,6 +15,7 @@ RUN apt-get update \
# Устанавливаем рабочую директорию # Устанавливаем рабочую директорию
WORKDIR /app WORKDIR /app
ENV PYTHONPATH=/app ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Копируем файлы Poetry (pyproject.toml и poetry.lock) # Копируем файлы Poetry (pyproject.toml и poetry.lock)

View File

@ -8,6 +8,8 @@ from src.core.database import Base
class TgMessage(Base): class TgMessage(Base):
__tablename__ = 'tg_messages'
text: Mapped[str] text: Mapped[str]
message_time: Mapped[datetime] message_time: Mapped[datetime]
user_id: Mapped[int] = mapped_column( user_id: Mapped[int] = mapped_column(

View File

@ -5,7 +5,7 @@ from src.core.database.base import Base
class User(Base): class User(Base):
username: Mapped[str] username: Mapped[str | None]
first_name: Mapped[str | None] first_name: Mapped[str | None]
last_name: Mapped[str | None] last_name: Mapped[str | None]

View File

@ -1,3 +1,5 @@
from asyncio import FastChildWatcher
from faststream.rabbit import RabbitBroker, RabbitExchange, ExchangeType, QueueType, RabbitQueue from faststream.rabbit import RabbitBroker, RabbitExchange, ExchangeType, QueueType, RabbitQueue
from src.core.settings.base import settings from src.core.settings.base import settings
@ -14,5 +16,15 @@ base_exchange = RabbitExchange(
base_queue = RabbitQueue( base_queue = RabbitQueue(
name="base_queue", name="base_queue",
queue_type=QueueType.CLASSIC,
) )
message_handler_publisher = broker.publisher(
queue=base_queue,
exchange=base_exchange,
)
async def init_queue_and_publisher():
await broker.declare_exchange(base_exchange)
await broker.declare_queue(base_queue)

View File

@ -1 +1 @@
MESSAGE_CHANG_SIZE: int = 20 MESSAGE_CHANG_SIZE: int = 2

View File

@ -1,10 +1,8 @@
from pyrogram import filters, Client from pyrogram import Client
from pyrogram.enums import ChatType
from pyrogram.types import Message from pyrogram.types import Message
from src.core.ai_services.schemas import MessageFromChatSchema from src.core.ai_services.schemas import MessageFromChatSchema
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE from src.core.tg_service.service import check_user_exists, check_chat_exists, check_chunk_state_and_publish
from src.core.tg_service.service import check_user_exists, check_chat_exists
from src.core.tg_service import utils as api_tg_utils from src.core.tg_service import utils as api_tg_utils
@ -24,7 +22,6 @@ async def message_listener(client: Client, message: Message):
chat_pyrogram=message.chat, chat_pyrogram=message.chat,
) )
messages_chunk = DATA.get(message.chat.id)
message_schema = MessageFromChatSchema( message_schema = MessageFromChatSchema(
id=message.id, id=message.id,
user_id=message.from_user.id, user_id=message.from_user.id,
@ -33,12 +30,9 @@ async def message_listener(client: Client, message: Message):
message_time=message.date, message_time=message.date,
) )
await check_chunk_state_and_publish(
data=DATA,
message_schema=message_schema,
chat_id=message.chat.id,
)
if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE: # TODO в констант.
await some_publisher()
del DATA[message.chat.id]
if messages_chunk:
DATA[message.chat.id].append(message_schema)
else:
DATA[message.chat.id] = [message_schema]

View File

@ -1,9 +1,15 @@
from pydantic import BaseModel from pydantic import BaseModel, Field
from src.core.ai_services.schemas import MessageFromChatSchema
class UserFromMessageSchema(BaseModel): class UserFromMessageSchema(BaseModel):
id: int id: int
username: str username: str | None = None
first_name: str | None = None first_name: str | None = None
last_name: str | None = None last_name: str | None = None
class MessagesForSendToWorkersSchema(BaseModel):
messages: list[MessageFromChatSchema]

View File

@ -1,13 +1,17 @@
from pyrogram.types.user_and_chats import User, Chat from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat
from src.core.tg_service.schemas import UserFromMessageSchema from src.core.ai_services.schemas import MessageFromChatSchema
from src.core.database import TgChat, User
from src.core.rabbitmq.connect import message_handler_publisher
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE
from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema
from src.core.database.connect import db_helper from src.core.database.connect import db_helper
from src.core.redis_helper.redis_connect import redis_client from src.core.redis_helper.redis_connect import redis_client
from src.core.tg_service import crud as tg_crud from src.core.tg_service import crud as tg_crud
async def check_user_exists( async def check_user_exists(
user_pyrogram: User, user_pyrogram: PyroUser,
) -> None: ) -> None:
user_schema = UserFromMessageSchema( user_schema = UserFromMessageSchema(
username=user_pyrogram.username, username=user_pyrogram.username,
@ -21,7 +25,7 @@ async def check_user_exists(
return return
async with db_helper.get_async_session_not_closed() as session: async with db_helper.get_async_session_not_closed() as session:
user_model = await session.get(id=user_model.id) user_model = await session.get(User, user_schema.id)
if not user_model: if not user_model:
await tg_crud.create_new_user( await tg_crud.create_new_user(
@ -29,19 +33,19 @@ async def check_user_exists(
**user_schema.model_dump() **user_schema.model_dump()
) )
await redis_client.set(name=str(user_schema.id), value=True) await redis_client.set(name=str(user_schema.id), value=str(True))
async def check_chat_exists( async def check_chat_exists(
chat_pyrogram: Chat chat_pyrogram: PyroChat
) -> None: ) -> None:
chat = await redis_client.get(chat_id=str(chat_pyrogram.id)) chat = await redis_client.get(name=str(chat_pyrogram.id))
if chat: if chat:
return return
async with db_helper.get_async_session_not_closed() as session: async with db_helper.get_async_session_not_closed() as session:
chat = await session.get(id=chat_pyrogram.id) chat = await session.get(TgChat, chat_pyrogram.id)
if not chat: if not chat:
await tg_crud.create_new_chat( await tg_crud.create_new_chat(
@ -50,6 +54,37 @@ async def check_chat_exists(
chat_type=chat_pyrogram.type.value, chat_type=chat_pyrogram.type.value,
title=chat_pyrogram.title, title=chat_pyrogram.title,
) )
await redis_client.set(name=str(chat_pyrogram.id), value=True) await redis_client.set(name=str(chat_pyrogram.id), value=str(True))
async def check_chunk_state_and_publish(
data: dict,
chat_id: int,
message_schema: MessageFromChatSchema,
) -> bool:
messages_chunk = data.get(chat_id)
if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE:
await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema(messages=messages_chunk)
)
if messages_chunk is None:
data[chat_id] = [message_schema]
elif len(messages_chunk) == MESSAGE_CHANG_SIZE:
del data[chat_id]
async with db_helper.get_async_session_not_closed() as session:
await tg_crud.bulk_insert_messages(
messages=messages_chunk,
session=session
)
data[chat_id] = [message_schema]
else:
data[chat_id].append(message_schema)
if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE:
await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema(messages=messages_chunk)
)

View File

@ -1,10 +1,8 @@
import asyncio
from pyrogram import Client, filters, idle from pyrogram import Client, filters, idle
from pyrogram.handlers import MessageHandler from pyrogram.handlers import MessageHandler
from src.core.tg_service.messages_handler import message_listener from src.core.tg_service.messages_handler import message_listener
from src.core.rabbitmq.connect import broker from src.core.rabbitmq.connect import broker, init_queue_and_publisher
api_id = 17718565 api_id = 17718565
api_hash = "72f93973f4227415572f039d4f847082" api_hash = "72f93973f4227415572f039d4f847082"
@ -23,8 +21,9 @@ app.add_handler(MessageHandler(
async def main(): async def main():
await app.start() await app.start()
await broker.start() await broker.start()
await init_queue_and_publisher()
await idle() await idle()
await app.stop() await app.stop()
print("*" * 100)
app.run(main()) app.run(main())