From 143ad8ebac84a3663b6d11c8149f5fdb9320ccf8 Mon Sep 17 00:00:00 2001 From: harold Date: Sat, 8 Feb 2025 20:24:34 +0500 Subject: [PATCH] add fixes --- .../sql_scripts/create_table.sql | 23 +++++++------- telegram-application/src/__init__.py | 1 + .../src/core/database/__init__.py | 4 ++- .../src/core/database/chats.py | 2 +- .../src/core/database/successes.py | 9 ++---- .../src/core/database/tg_messages.py | 13 +++++++- .../src/core/database/users.py | 1 + .../src/core/settings/base.py | 2 ++ telegram-application/src/core/settings/bot.py | 5 +++ .../src/core/tg_service/crud.py | 4 +-- .../src/core/tg_service/notify_sender.py | 21 +++++++++++++ .../src/core/tg_service/notify_success.py | 22 +++++++++++++ .../src/core/tg_service/schemas.py | 2 +- .../src/core/tg_service/service.py | 31 +++---------------- telegram-application/src/core/workers/crud.py | 29 ++++++++++++----- .../src/core/workers/rmq_worker_handler.py | 16 ++++++---- telegram-application/src/main.py | 1 + 17 files changed, 121 insertions(+), 65 deletions(-) create mode 100644 telegram-application/src/core/settings/bot.py create mode 100644 telegram-application/src/core/tg_service/notify_sender.py create mode 100644 telegram-application/src/core/tg_service/notify_success.py diff --git a/telegram-application/sql_scripts/create_table.sql b/telegram-application/sql_scripts/create_table.sql index eb321da..dc705c8 100644 --- a/telegram-application/sql_scripts/create_table.sql +++ b/telegram-application/sql_scripts/create_table.sql @@ -1,32 +1,31 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - CREATE TABLE tgchats ( - id BIGINT PRIMARY KEY UNIQUE, + id BIGINT PRIMARY KEY, chat_type TEXT NOT NULL, title TEXT NOT NULL ); CREATE TABLE users ( - id BIGINT PRIMARY KEY UNIQUE, + id BIGINT PRIMARY KEY, username TEXT, first_name TEXT, last_name TEXT ); +CREATE TABLE successed ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + reason TEXT NOT NULL, + created_at TIMESTAMP DEFAULT NOW() NOT NULL, + slice_id UUID NOT NULL UNIQUE +); + CREATE TABLE tg_messages ( - id BIGINT PRIMARY KEY UNIQUE, + id BIGINT PRIMARY KEY, text TEXT NOT NULL, message_time TIMESTAMP NOT NULL, user_id BIGINT NOT NULL REFERENCES users(id), chat_id BIGINT NOT NULL REFERENCES tgchats(id), slice_id UUID NOT NULL ); - -CREATE TABLE successed ( - id BIGSERIAL PRIMARY KEY UNIQUE, - user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, - reason TEXT NOT NULL, - chat_id UUID NOT NULL REFERENCES tgchats(slice_id) ON DELETE CASCADE, - created_at TIMESTAMP DEFAULT NOW() NOT NULL -); diff --git a/telegram-application/src/__init__.py b/telegram-application/src/__init__.py index e69de29..fc22961 100644 --- a/telegram-application/src/__init__.py +++ b/telegram-application/src/__init__.py @@ -0,0 +1 @@ +import src.core.workers.rmq_worker_handler diff --git a/telegram-application/src/core/database/__init__.py b/telegram-application/src/core/database/__init__.py index 54146cc..bf0cc94 100644 --- a/telegram-application/src/core/database/__init__.py +++ b/telegram-application/src/core/database/__init__.py @@ -3,10 +3,12 @@ __all__ = [ "TgMessage", "TgChat", "User", + "Success" ] from .base import Base from .tg_messages import TgMessage from .chats import TgChat -from .users import User \ No newline at end of file +from .users import User +from .successes import Success \ No newline at end of file diff --git a/telegram-application/src/core/database/chats.py b/telegram-application/src/core/database/chats.py index 3eac3f9..1726038 100644 --- a/telegram-application/src/core/database/chats.py +++ b/telegram-application/src/core/database/chats.py @@ -13,5 +13,5 @@ class TgChat(Base): title: Mapped[str] message_relationship: Mapped[list["TgMessage"]] = relationship( - backref="tgchat_relationship", + back_populates="chat_relationship", ) \ No newline at end of file diff --git a/telegram-application/src/core/database/successes.py b/telegram-application/src/core/database/successes.py index 9de3e19..60defb2 100644 --- a/telegram-application/src/core/database/successes.py +++ b/telegram-application/src/core/database/successes.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship from src.core.database import Base if TYPE_CHECKING: - from src.core.database import User, TgChat + from src.core.database import User, TgMessage class Success(Base): @@ -21,9 +21,7 @@ class Success(Base): user_id: Mapped[str] = mapped_column( ForeignKey('users.id', ondelete='CASCADE'), ) - slice_id: Mapped[UUID] = mapped_column( - ForeignKey('tgchats.slice_id', ondelete='CASCADE'), - ) + slice_id: Mapped[UUID] reason: Mapped[str] created_at: Mapped[datetime] = mapped_column( default=datetime.utcnow, @@ -32,7 +30,4 @@ class Success(Base): user_relationship: Mapped["User"] = relationship( backref="success_relationship" - ) - chat_relationship: Mapped["TgChat"] = relationship( - backref="success_relationship" ) \ No newline at end of file diff --git a/telegram-application/src/core/database/tg_messages.py b/telegram-application/src/core/database/tg_messages.py index 4542d4d..1622b7a 100644 --- a/telegram-application/src/core/database/tg_messages.py +++ b/telegram-application/src/core/database/tg_messages.py @@ -1,11 +1,14 @@ from datetime import datetime +from typing import TYPE_CHECKING from uuid import UUID from sqlalchemy import ForeignKey -from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.orm import Mapped, mapped_column, relationship from src.core.database import Base +if TYPE_CHECKING: + from src.core.database import TgChat, User, Success class TgMessage(Base): @@ -19,4 +22,12 @@ class TgMessage(Base): chat_id: Mapped[int] = mapped_column( ForeignKey("tgchats.id"), ) + slice_id: Mapped[UUID] + + chat_relationship: Mapped["TgChat"] = relationship( + back_populates="message_relationship" + ) + user_relationship: Mapped["User"] = relationship( + backref="message_relationship" + ) diff --git a/telegram-application/src/core/database/users.py b/telegram-application/src/core/database/users.py index 2a715ee..8b9e324 100644 --- a/telegram-application/src/core/database/users.py +++ b/telegram-application/src/core/database/users.py @@ -9,3 +9,4 @@ class User(Base): first_name: Mapped[str | None] last_name: Mapped[str | None] + diff --git a/telegram-application/src/core/settings/base.py b/telegram-application/src/core/settings/base.py index ac044f0..8b27df9 100644 --- a/telegram-application/src/core/settings/base.py +++ b/telegram-application/src/core/settings/base.py @@ -1,6 +1,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from src.core.common.constants import PydanticEnvPrefixEnum, EnvFileLocation, EnvironmentEnum +from src.core.settings.bot import BotSettings from src.core.settings.database import DatabaseSettings from src.core.settings.gemini import GeminiSettings @@ -25,6 +26,7 @@ class Settings(BaseSettings): POSTGRES: DatabaseSettings NOTIFY: NotifySettings = NotifySettings() + BOT: BotSettings GROQ: GroqSettings GEMINI: GeminiSettings diff --git a/telegram-application/src/core/settings/bot.py b/telegram-application/src/core/settings/bot.py new file mode 100644 index 0000000..5165059 --- /dev/null +++ b/telegram-application/src/core/settings/bot.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class BotSettings(BaseModel): + TOKEN: str \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/crud.py b/telegram-application/src/core/tg_service/crud.py index 3dff301..752d012 100644 --- a/telegram-application/src/core/tg_service/crud.py +++ b/telegram-application/src/core/tg_service/crud.py @@ -31,13 +31,13 @@ async def create_new_chat( async def bulk_insert_messages( messages: list[MessageFromChatSchema], - chunk_id: UUID, + slice_id: UUID, session: AsyncSession, ) -> None: dicts_for_insert: list[dict] = [] for message in messages: dumped_schema = message.model_dump() - dumped_schema["chunk_id"] = chunk_id + dumped_schema["slice_id"] = slice_id dicts_for_insert.append(dumped_schema) stmt = ( diff --git a/telegram-application/src/core/tg_service/notify_sender.py b/telegram-application/src/core/tg_service/notify_sender.py new file mode 100644 index 0000000..9d602a4 --- /dev/null +++ b/telegram-application/src/core/tg_service/notify_sender.py @@ -0,0 +1,21 @@ +from httpx import AsyncClient + +from src.core.settings.base import settings + + + + +async def send_to_tg_from_bot(text: str): + token = settings.BOT.TOKEN + url = f"https://api.telegram.org/bot{token}/sendMessage" + + + payload = { + "chat_id": settings.NOTIFY.CHAT_ID, + "text": text, + "parse_mode": "Markdown", + } + async with AsyncClient() as client: + response = await client.post(url, json=payload) + print(response) + return response diff --git a/telegram-application/src/core/tg_service/notify_success.py b/telegram-application/src/core/tg_service/notify_success.py new file mode 100644 index 0000000..89036f1 --- /dev/null +++ b/telegram-application/src/core/tg_service/notify_success.py @@ -0,0 +1,22 @@ +from src.core.database import TgMessage, TgChat, User +from src.core.tg_service.notify_sender import send_to_tg_from_bot +from src.core.tg_service.utils import create_and_format_message + + + +async def notify_for_success( + messages: list[TgMessage], + chat: TgChat, + success_reason: str, + user_model: User, +) -> None: + message = create_and_format_message( + messages=messages, + reason=success_reason, + user_model=user_model, + chat=chat, + ) + + await send_to_tg_from_bot( + text=message + ) \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/schemas.py b/telegram-application/src/core/tg_service/schemas.py index 53ba18e..7395db9 100644 --- a/telegram-application/src/core/tg_service/schemas.py +++ b/telegram-application/src/core/tg_service/schemas.py @@ -22,5 +22,5 @@ class UserFromMessageSchema(BaseModel): class MessagesForSendToWorkersSchema(BaseModel): - chunk_id: UUID4 + slice_id: UUID4 messages: list[MessageFromChatSchema] \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/service.py b/telegram-application/src/core/tg_service/service.py index e7a8e4b..13e5c59 100644 --- a/telegram-application/src/core/tg_service/service.py +++ b/telegram-application/src/core/tg_service/service.py @@ -2,17 +2,13 @@ import uuid from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat -from src.core.database import TgChat, User, TgMessage -from src.core.database.successes import Success +from src.core.database import TgChat, User from src.core.rabbitmq.connect import message_handler_publisher from src.core.redis_helper.redis_connect import redis_client -from src.core.settings.base import settings from src.core.tg_service.constants import MESSAGE_CHANG_SIZE from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema from src.core.database.connect import db_helper from src.core.tg_service import crud as tg_crud -from src.core.tg_service.utils import create_and_format_message -from src.main import app async def check_user_exists( @@ -73,17 +69,18 @@ async def check_chunk_state_and_publish( data[chat_id] = [message_schema] elif len(messages_chunk) == MESSAGE_CHANG_SIZE: - chunk_id = uuid.uuid4() + slice_id = uuid.uuid4() del data[chat_id] async with db_helper.get_async_session_not_closed() as session: await tg_crud.bulk_insert_messages( messages=messages_chunk, - session=session + session=session, + slice_id=slice_id ) await message_handler_publisher.publish( message=MessagesForSendToWorkersSchema( messages=messages_chunk, - chunk_id=chunk_id + slice_id=slice_id ) ) data[chat_id] = [message_schema] @@ -92,21 +89,3 @@ async def check_chunk_state_and_publish( data[chat_id].append(message_schema) -async def notify_for_success( - messages: list[TgMessage], - chat: TgChat, - success_reason: Success, - user_model: User, -) -> None: - message = create_and_format_message( - messages=messages, - reason=success_reason.reason, - user_model=user_model, - chat=chat, - ) - - await app.send_message( - chat_id=settings.NOTIFY.CHAT_ID, - message=message, - parse_mode="Markdown", - ) diff --git a/telegram-application/src/core/workers/crud.py b/telegram-application/src/core/workers/crud.py index 2d869a0..04383bb 100644 --- a/telegram-application/src/core/workers/crud.py +++ b/telegram-application/src/core/workers/crud.py @@ -1,10 +1,9 @@ from uuid import UUID from sqlalchemy import insert, select -from sqlalchemy.orm import selectinload, joinedload from sqlalchemy.ext.asyncio import AsyncSession -from src.core.database import TgMessage, TgChat +from src.core.database import TgChat, TgMessage from src.core.workers.schemas import ResponseFromGeminiSchema from src.core.database.successes import Success @@ -18,7 +17,7 @@ async def bulk_create_success_reasons( Success ) .values( - [chat.model_dump for chat in success_schema.success] + [chat.model_dump() for chat in success_schema.success] ) ) await session.execute(stmt) @@ -28,16 +27,30 @@ async def bulk_create_success_reasons( async def get_messages_by_slice_id( session: AsyncSession, slice_id: UUID +) -> list[TgMessage]: + stmt = ( + select(TgMessage) + .where( + TgMessage.slice_id == slice_id + ) + + ) + scalars_result = await session.scalars(stmt) + + return list(scalars_result.all()) + + +async def success_by_slice_id ( + session: AsyncSession, + slice_id: UUID ) -> Success: stmt = ( select(Success) - .options( - joinedload(Success.user_relationship), - joinedload(Success.chat_relationship) - .selectinload(TgChat.message_relationship) - ) .where(Success.slice_id == slice_id) ) + return await session.scalar(stmt) + + diff --git a/telegram-application/src/core/workers/rmq_worker_handler.py b/telegram-application/src/core/workers/rmq_worker_handler.py index 23db79e..4359d50 100644 --- a/telegram-application/src/core/workers/rmq_worker_handler.py +++ b/telegram-application/src/core/workers/rmq_worker_handler.py @@ -4,7 +4,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from faststream import Depends -from src.core.tg_service.service import notify_for_success +from src.core.database import User, TgChat +from src.core.tg_service.notify_success import notify_for_success from src.core.workers.schemas import ResponseFromGeminiSchema from src.core.database.connect import db_helper from src.core.rabbitmq.connect import success_gemini_subscriber @@ -22,16 +23,19 @@ async def create_success_record( ) for success_message in message.success: - success_mod_with_relation = await workers_crud.get_messages_by_slice_id( + messages = await workers_crud.get_messages_by_slice_id( session=session, slice_id=success_message.slice_id, ) + user_model = await session.get(User, success_message.user_id) + chat_model = await session.get(TgChat, messages[0].chat_id) + await notify_for_success( - messages=success_mod_with_relation.chat_relationship.message_relationship, - chat=success_mod_with_relation.chat_relationship, - user_model=success_mod_with_relation.user_relationship, - success_reason=success_mod_with_relation,, + messages=messages, + chat=chat_model, # type:ignore + user_model=user_model, # type:ignore + success_reason=success_message.reason, # type:ignore ) diff --git a/telegram-application/src/main.py b/telegram-application/src/main.py index 3ee80ad..79e1bd0 100644 --- a/telegram-application/src/main.py +++ b/telegram-application/src/main.py @@ -1,6 +1,7 @@ from pyrogram import Client, filters, idle from pyrogram.handlers import MessageHandler +from src.core.settings.base import settings from src.core.tg_service.messages_handler import message_listener from src.core.rabbitmq.connect import broker, init_queue_and_publisher