add fixes

This commit is contained in:
harold 2025-02-08 20:24:34 +05:00
parent 9e2dcaee53
commit 143ad8ebac
17 changed files with 121 additions and 65 deletions

View File

@ -1,32 +1,31 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE tgchats ( CREATE TABLE tgchats (
id BIGINT PRIMARY KEY UNIQUE, id BIGINT PRIMARY KEY,
chat_type TEXT NOT NULL, chat_type TEXT NOT NULL,
title TEXT NOT NULL title TEXT NOT NULL
); );
CREATE TABLE users ( CREATE TABLE users (
id BIGINT PRIMARY KEY UNIQUE, id BIGINT PRIMARY KEY,
username TEXT, username TEXT,
first_name TEXT, first_name TEXT,
last_name TEXT last_name TEXT
); );
CREATE TABLE successed (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
reason TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW() NOT NULL,
slice_id UUID NOT NULL UNIQUE
);
CREATE TABLE tg_messages ( CREATE TABLE tg_messages (
id BIGINT PRIMARY KEY UNIQUE, id BIGINT PRIMARY KEY,
text TEXT NOT NULL, text TEXT NOT NULL,
message_time TIMESTAMP NOT NULL, message_time TIMESTAMP NOT NULL,
user_id BIGINT NOT NULL REFERENCES users(id), user_id BIGINT NOT NULL REFERENCES users(id),
chat_id BIGINT NOT NULL REFERENCES tgchats(id), chat_id BIGINT NOT NULL REFERENCES tgchats(id),
slice_id UUID NOT NULL slice_id UUID NOT NULL
); );
CREATE TABLE successed (
id BIGSERIAL PRIMARY KEY UNIQUE,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
reason TEXT NOT NULL,
chat_id UUID NOT NULL REFERENCES tgchats(slice_id) ON DELETE CASCADE,
created_at TIMESTAMP DEFAULT NOW() NOT NULL
);

View File

@ -0,0 +1 @@
import src.core.workers.rmq_worker_handler

View File

@ -3,10 +3,12 @@ __all__ = [
"TgMessage", "TgMessage",
"TgChat", "TgChat",
"User", "User",
"Success"
] ]
from .base import Base from .base import Base
from .tg_messages import TgMessage from .tg_messages import TgMessage
from .chats import TgChat from .chats import TgChat
from .users import User from .users import User
from .successes import Success

View File

@ -13,5 +13,5 @@ class TgChat(Base):
title: Mapped[str] title: Mapped[str]
message_relationship: Mapped[list["TgMessage"]] = relationship( message_relationship: Mapped[list["TgMessage"]] = relationship(
backref="tgchat_relationship", back_populates="chat_relationship",
) )

View File

@ -8,7 +8,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.core.database import Base from src.core.database import Base
if TYPE_CHECKING: if TYPE_CHECKING:
from src.core.database import User, TgChat from src.core.database import User, TgMessage
class Success(Base): class Success(Base):
@ -21,9 +21,7 @@ class Success(Base):
user_id: Mapped[str] = mapped_column( user_id: Mapped[str] = mapped_column(
ForeignKey('users.id', ondelete='CASCADE'), ForeignKey('users.id', ondelete='CASCADE'),
) )
slice_id: Mapped[UUID] = mapped_column( slice_id: Mapped[UUID]
ForeignKey('tgchats.slice_id', ondelete='CASCADE'),
)
reason: Mapped[str] reason: Mapped[str]
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow, default=datetime.utcnow,
@ -32,7 +30,4 @@ class Success(Base):
user_relationship: Mapped["User"] = relationship( user_relationship: Mapped["User"] = relationship(
backref="success_relationship" backref="success_relationship"
)
chat_relationship: Mapped["TgChat"] = relationship(
backref="success_relationship"
) )

View File

@ -1,11 +1,14 @@
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID from uuid import UUID
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.core.database import Base from src.core.database import Base
if TYPE_CHECKING:
from src.core.database import TgChat, User, Success
class TgMessage(Base): class TgMessage(Base):
@ -19,4 +22,12 @@ class TgMessage(Base):
chat_id: Mapped[int] = mapped_column( chat_id: Mapped[int] = mapped_column(
ForeignKey("tgchats.id"), ForeignKey("tgchats.id"),
) )
slice_id: Mapped[UUID] slice_id: Mapped[UUID]
chat_relationship: Mapped["TgChat"] = relationship(
back_populates="message_relationship"
)
user_relationship: Mapped["User"] = relationship(
backref="message_relationship"
)

View File

@ -9,3 +9,4 @@ class User(Base):
first_name: Mapped[str | None] first_name: Mapped[str | None]
last_name: Mapped[str | None] last_name: Mapped[str | None]

View File

@ -1,6 +1,7 @@
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from src.core.common.constants import PydanticEnvPrefixEnum, EnvFileLocation, EnvironmentEnum from src.core.common.constants import PydanticEnvPrefixEnum, EnvFileLocation, EnvironmentEnum
from src.core.settings.bot import BotSettings
from src.core.settings.database import DatabaseSettings from src.core.settings.database import DatabaseSettings
from src.core.settings.gemini import GeminiSettings from src.core.settings.gemini import GeminiSettings
@ -25,6 +26,7 @@ class Settings(BaseSettings):
POSTGRES: DatabaseSettings POSTGRES: DatabaseSettings
NOTIFY: NotifySettings = NotifySettings() NOTIFY: NotifySettings = NotifySettings()
BOT: BotSettings
GROQ: GroqSettings GROQ: GroqSettings
GEMINI: GeminiSettings GEMINI: GeminiSettings

View File

@ -0,0 +1,5 @@
from pydantic import BaseModel
class BotSettings(BaseModel):
TOKEN: str

View File

@ -31,13 +31,13 @@ async def create_new_chat(
async def bulk_insert_messages( async def bulk_insert_messages(
messages: list[MessageFromChatSchema], messages: list[MessageFromChatSchema],
chunk_id: UUID, slice_id: UUID,
session: AsyncSession, session: AsyncSession,
) -> None: ) -> None:
dicts_for_insert: list[dict] = [] dicts_for_insert: list[dict] = []
for message in messages: for message in messages:
dumped_schema = message.model_dump() dumped_schema = message.model_dump()
dumped_schema["chunk_id"] = chunk_id dumped_schema["slice_id"] = slice_id
dicts_for_insert.append(dumped_schema) dicts_for_insert.append(dumped_schema)
stmt = ( stmt = (

View File

@ -0,0 +1,21 @@
from httpx import AsyncClient
from src.core.settings.base import settings
async def send_to_tg_from_bot(text: str):
token = settings.BOT.TOKEN
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {
"chat_id": settings.NOTIFY.CHAT_ID,
"text": text,
"parse_mode": "Markdown",
}
async with AsyncClient() as client:
response = await client.post(url, json=payload)
print(response)
return response

View File

@ -0,0 +1,22 @@
from src.core.database import TgMessage, TgChat, User
from src.core.tg_service.notify_sender import send_to_tg_from_bot
from src.core.tg_service.utils import create_and_format_message
async def notify_for_success(
messages: list[TgMessage],
chat: TgChat,
success_reason: str,
user_model: User,
) -> None:
message = create_and_format_message(
messages=messages,
reason=success_reason,
user_model=user_model,
chat=chat,
)
await send_to_tg_from_bot(
text=message
)

View File

@ -22,5 +22,5 @@ class UserFromMessageSchema(BaseModel):
class MessagesForSendToWorkersSchema(BaseModel): class MessagesForSendToWorkersSchema(BaseModel):
chunk_id: UUID4 slice_id: UUID4
messages: list[MessageFromChatSchema] messages: list[MessageFromChatSchema]

View File

@ -2,17 +2,13 @@ import uuid
from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat
from src.core.database import TgChat, User, TgMessage from src.core.database import TgChat, User
from src.core.database.successes import Success
from src.core.rabbitmq.connect import message_handler_publisher from src.core.rabbitmq.connect import message_handler_publisher
from src.core.redis_helper.redis_connect import redis_client from src.core.redis_helper.redis_connect import redis_client
from src.core.settings.base import settings
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE from src.core.tg_service.constants import MESSAGE_CHANG_SIZE
from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema
from src.core.database.connect import db_helper from src.core.database.connect import db_helper
from src.core.tg_service import crud as tg_crud from src.core.tg_service import crud as tg_crud
from src.core.tg_service.utils import create_and_format_message
from src.main import app
async def check_user_exists( async def check_user_exists(
@ -73,17 +69,18 @@ async def check_chunk_state_and_publish(
data[chat_id] = [message_schema] data[chat_id] = [message_schema]
elif len(messages_chunk) == MESSAGE_CHANG_SIZE: elif len(messages_chunk) == MESSAGE_CHANG_SIZE:
chunk_id = uuid.uuid4() slice_id = uuid.uuid4()
del data[chat_id] del data[chat_id]
async with db_helper.get_async_session_not_closed() as session: async with db_helper.get_async_session_not_closed() as session:
await tg_crud.bulk_insert_messages( await tg_crud.bulk_insert_messages(
messages=messages_chunk, messages=messages_chunk,
session=session session=session,
slice_id=slice_id
) )
await message_handler_publisher.publish( await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema( message=MessagesForSendToWorkersSchema(
messages=messages_chunk, messages=messages_chunk,
chunk_id=chunk_id slice_id=slice_id
) )
) )
data[chat_id] = [message_schema] data[chat_id] = [message_schema]
@ -92,21 +89,3 @@ async def check_chunk_state_and_publish(
data[chat_id].append(message_schema) data[chat_id].append(message_schema)
async def notify_for_success(
messages: list[TgMessage],
chat: TgChat,
success_reason: Success,
user_model: User,
) -> None:
message = create_and_format_message(
messages=messages,
reason=success_reason.reason,
user_model=user_model,
chat=chat,
)
await app.send_message(
chat_id=settings.NOTIFY.CHAT_ID,
message=message,
parse_mode="Markdown",
)

View File

@ -1,10 +1,9 @@
from uuid import UUID from uuid import UUID
from sqlalchemy import insert, select from sqlalchemy import insert, select
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from src.core.database import TgMessage, TgChat from src.core.database import TgChat, TgMessage
from src.core.workers.schemas import ResponseFromGeminiSchema from src.core.workers.schemas import ResponseFromGeminiSchema
from src.core.database.successes import Success from src.core.database.successes import Success
@ -18,7 +17,7 @@ async def bulk_create_success_reasons(
Success Success
) )
.values( .values(
[chat.model_dump for chat in success_schema.success] [chat.model_dump() for chat in success_schema.success]
) )
) )
await session.execute(stmt) await session.execute(stmt)
@ -28,16 +27,30 @@ async def bulk_create_success_reasons(
async def get_messages_by_slice_id( async def get_messages_by_slice_id(
session: AsyncSession, session: AsyncSession,
slice_id: UUID slice_id: UUID
) -> list[TgMessage]:
stmt = (
select(TgMessage)
.where(
TgMessage.slice_id == slice_id
)
)
scalars_result = await session.scalars(stmt)
return list(scalars_result.all())
async def success_by_slice_id (
session: AsyncSession,
slice_id: UUID
) -> Success: ) -> Success:
stmt = ( stmt = (
select(Success) select(Success)
.options(
joinedload(Success.user_relationship),
joinedload(Success.chat_relationship)
.selectinload(TgChat.message_relationship)
)
.where(Success.slice_id == slice_id) .where(Success.slice_id == slice_id)
) )
return await session.scalar(stmt) return await session.scalar(stmt)

View File

@ -4,7 +4,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from faststream import Depends from faststream import Depends
from src.core.tg_service.service import notify_for_success from src.core.database import User, TgChat
from src.core.tg_service.notify_success import notify_for_success
from src.core.workers.schemas import ResponseFromGeminiSchema from src.core.workers.schemas import ResponseFromGeminiSchema
from src.core.database.connect import db_helper from src.core.database.connect import db_helper
from src.core.rabbitmq.connect import success_gemini_subscriber from src.core.rabbitmq.connect import success_gemini_subscriber
@ -22,16 +23,19 @@ async def create_success_record(
) )
for success_message in message.success: for success_message in message.success:
success_mod_with_relation = await workers_crud.get_messages_by_slice_id( messages = await workers_crud.get_messages_by_slice_id(
session=session, session=session,
slice_id=success_message.slice_id, slice_id=success_message.slice_id,
) )
user_model = await session.get(User, success_message.user_id)
chat_model = await session.get(TgChat, messages[0].chat_id)
await notify_for_success( await notify_for_success(
messages=success_mod_with_relation.chat_relationship.message_relationship, messages=messages,
chat=success_mod_with_relation.chat_relationship, chat=chat_model, # type:ignore
user_model=success_mod_with_relation.user_relationship, user_model=user_model, # type:ignore
success_reason=success_mod_with_relation,, success_reason=success_message.reason, # type:ignore
) )

View File

@ -1,6 +1,7 @@
from pyrogram import Client, filters, idle from pyrogram import Client, filters, idle
from pyrogram.handlers import MessageHandler from pyrogram.handlers import MessageHandler
from src.core.settings.base import settings
from src.core.tg_service.messages_handler import message_listener from src.core.tg_service.messages_handler import message_listener
from src.core.rabbitmq.connect import broker, init_queue_and_publisher from src.core.rabbitmq.connect import broker, init_queue_and_publisher