From e32ac4689843167844d110dd3386539aaf890b45 Mon Sep 17 00:00:00 2001 From: harold Date: Sat, 8 Feb 2025 10:41:29 +0500 Subject: [PATCH] add slice for chats --- telegram-application/docker-compose.yml | 1 + .../{src/core/ai_services/__init__.py => sql} | 0 .../sql_scripts/create_table.sql | 31 ++++++++ .../src/core/ai_services/base.py | 12 --- .../core/ai_services/gemini/check_limiter.py | 6 -- .../src/core/ai_services/gemini/constants.py | 69 ---------------- .../src/core/ai_services/gemini/schemas.py | 13 ---- .../src/core/ai_services/gemini/service.py | 76 ------------------ .../src/core/ai_services/google.py | 69 ---------------- .../src/core/ai_services/groq/__init__.py | 0 .../src/core/ai_services/groq/constants.py | 57 -------------- .../src/core/ai_services/groq/service.py | 78 ------------------- .../src/core/ai_services/schemas.py | 19 ----- .../src/core/database/successes.py | 25 ++++++ .../src/core/database/tg_messages.py | 2 + .../src/core/rabbitmq/connect.py | 12 +++ .../src/core/redis_helper/__init__.py | 0 .../src/core/tg_service/crud.py | 15 +++- .../src/core/tg_service/messages_handler.py | 2 +- .../src/core/tg_service/schemas.py | 15 +++- .../src/core/tg_service/service.py | 21 +++-- .../gemini => workers}/__init__.py | 0 telegram-application/src/core/workers/crud.py | 21 +++++ .../src/core/workers/rmq_worker_handler.py | 24 ++++++ .../src/core/workers/schemas.py | 11 +++ telegram-application/src/main.py | 1 - 26 files changed, 165 insertions(+), 415 deletions(-) rename telegram-application/{src/core/ai_services/__init__.py => sql} (100%) create mode 100644 telegram-application/sql_scripts/create_table.sql delete mode 100644 telegram-application/src/core/ai_services/base.py delete mode 100644 telegram-application/src/core/ai_services/gemini/check_limiter.py delete mode 100644 telegram-application/src/core/ai_services/gemini/constants.py delete mode 100644 telegram-application/src/core/ai_services/gemini/schemas.py delete mode 100644 telegram-application/src/core/ai_services/gemini/service.py delete mode 100644 telegram-application/src/core/ai_services/google.py delete mode 100644 telegram-application/src/core/ai_services/groq/__init__.py delete mode 100644 telegram-application/src/core/ai_services/groq/constants.py delete mode 100644 telegram-application/src/core/ai_services/groq/service.py delete mode 100644 telegram-application/src/core/ai_services/schemas.py create mode 100644 telegram-application/src/core/database/successes.py delete mode 100644 telegram-application/src/core/redis_helper/__init__.py rename telegram-application/src/core/{ai_services/gemini => workers}/__init__.py (100%) create mode 100644 telegram-application/src/core/workers/crud.py create mode 100644 telegram-application/src/core/workers/rmq_worker_handler.py create mode 100644 telegram-application/src/core/workers/schemas.py diff --git a/telegram-application/docker-compose.yml b/telegram-application/docker-compose.yml index 9ea06dd..091c24b 100644 --- a/telegram-application/docker-compose.yml +++ b/telegram-application/docker-compose.yml @@ -27,6 +27,7 @@ services: - "9151:5432" volumes: - ./docker/postgres/data:/var/lib/postgresql/data + - ./sql_scripts/create_table.sql:/docker-entrypoint-initdb.d/create_table.sql redis: image: redis:latest diff --git a/telegram-application/src/core/ai_services/__init__.py b/telegram-application/sql similarity index 100% rename from telegram-application/src/core/ai_services/__init__.py rename to telegram-application/sql diff --git a/telegram-application/sql_scripts/create_table.sql b/telegram-application/sql_scripts/create_table.sql new file mode 100644 index 0000000..1128e4b --- /dev/null +++ b/telegram-application/sql_scripts/create_table.sql @@ -0,0 +1,31 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + + +CREATE TABLE tgchats ( + id BIGINT PRIMARY KEY UNIQUE, + chat_type TEXT NOT NULL, + title TEXT NOT NULL +); + +CREATE TABLE users ( + id BIGINT PRIMARY KEY UNIQUE, + username TEXT, + first_name TEXT, + last_name TEXT +); + +CREATE TABLE tg_messages ( + id BIGINT PRIMARY KEY UNIQUE, + text TEXT NOT NULL, + message_time TIMESTAMP NOT NULL, + user_id BIGINT NOT NULL REFERENCES users(id), + chat_id BIGINT NOT NULL REFERENCES tgchats(id), + slice_id UUID NOT NULL +); + +CREATE TABLE successed ( + id BIGSERIAL PRIMARY KEY UNIQUE, + user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + chat_id BIGINT NOT NULL REFERENCES tgchats(id) ON DELETE CASCADE, + created_at TIMESTAMP DEFAULT NOW() NOT NULL +); diff --git a/telegram-application/src/core/ai_services/base.py b/telegram-application/src/core/ai_services/base.py deleted file mode 100644 index 339cae0..0000000 --- a/telegram-application/src/core/ai_services/base.py +++ /dev/null @@ -1,12 +0,0 @@ -from abc import ABC, abstractmethod - -from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema - - -class BaseAiService(ABC): - @abstractmethod - async def create_request_ai( - self, - messages: list[MessageFromChatSchema], - ) -> ResponseFromAiSchema: - raise NotImplementedError diff --git a/telegram-application/src/core/ai_services/gemini/check_limiter.py b/telegram-application/src/core/ai_services/gemini/check_limiter.py deleted file mode 100644 index 51dca9c..0000000 --- a/telegram-application/src/core/ai_services/gemini/check_limiter.py +++ /dev/null @@ -1,6 +0,0 @@ - - - - -async def check_limiter(): - diff --git a/telegram-application/src/core/ai_services/gemini/constants.py b/telegram-application/src/core/ai_services/gemini/constants.py deleted file mode 100644 index aff718d..0000000 --- a/telegram-application/src/core/ai_services/gemini/constants.py +++ /dev/null @@ -1,69 +0,0 @@ -ROLE = """ -Ты специалист по поиску клиентов в компании, которая занимается разработкой любого софта -на заказ. -""" - -ANALYTIC_PROMT = """ -Ты получаешь json с такими полями -{ - chats: [ - "chat_id": integer - "messages": [ - { - "user_id: integer, - "message_id": integer, - "text": string, - "date": datetime - } - ] - - ] -} - -chats - это список чатов. -messages - это срез диалога в чате телеграмма. -пользователи могут общаться на абсолютно разные темы. -Твоя задача: -Прочитать эти сообщения, понять тему текущего среза диалога. -И если ты поймешь, что мы можем какому то пользователю предложить свои услуги -например (написать интернет магазин, мобильное приложение или любой другой айти продукт) -то выведи в таком виде ответ. - -ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им -можно предложить, то верни. - -Условно в нескольких чатах может быть несколько потенциальных клиентов, тогда вот так выведи - -{ - success: [ - { - "user_id": integer, - "chat_id": integer, - "reason": string - } - ] -} - -поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент. - -Если ты хотя бы чуть чуть не уверен, то верни вот такую строку -{ - success: null -} - - -ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь. -без ```json ``` просто так без каких либо спецсимволов - -""" - - -GEMINI_BASE_MESSAGE = [ - { - "role": "user", - "parts": [ - {"text": ROLE}, - {"text": ANALYTIC_PROMT}, - ] - } -] \ No newline at end of file diff --git a/telegram-application/src/core/ai_services/gemini/schemas.py b/telegram-application/src/core/ai_services/gemini/schemas.py deleted file mode 100644 index 2056eaf..0000000 --- a/telegram-application/src/core/ai_services/gemini/schemas.py +++ /dev/null @@ -1,13 +0,0 @@ -from pydantic import BaseModel - -from src.core.ai_services.schemas import MessageFromChatSchema - - -class ChatMessageSchema(MessageFromChatSchema): - chat_id: int - messages: list[MessageFromChatSchema] - - -class FullRequestForGeminiSchema(BaseModel): - chats: list[ChatMessageSchema] - diff --git a/telegram-application/src/core/ai_services/gemini/service.py b/telegram-application/src/core/ai_services/gemini/service.py deleted file mode 100644 index 40931da..0000000 --- a/telegram-application/src/core/ai_services/gemini/service.py +++ /dev/null @@ -1,76 +0,0 @@ -import json - -from src.core.ai_services.base import BaseAiService - -import google.generativeai as genai - -from src.core.ai_services.gemini.constants import GEMINI_BASE_MESSAGE -from src.core.ai_services.gemini.schemas import FullRequestForGeminiSchema -from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema -from src.core.settings.base import settings - - -class GoogleHelper(BaseAiService): - def __init__( - self, - api_key: str, - model_name: str, - ) -> None: - self.api_key = api_key - self.model = model_name - - genai.configure(api_key=api_key) - self._model = genai.GenerativeModel(model_name=model_name) - - @staticmethod - def _serialize_messages_to_promt( - chats: FullRequestForGeminiSchema, - ) -> list[dict]: - messages_for_request = GEMINI_BASE_MESSAGE.copy() - - text_for_request = json.dumps(chats.model_dump()) - - extend_message = { - "role": "user", - "parts": [ - { - "text": text_for_request, - } - ], - } - - messages_for_request.append(extend_message) - - return messages_for_request - - @staticmethod - def _serialize_response_to_json( - response_text: str, - ) -> ResponseFromAiSchema: - response = response_text.replace('\n', '') - print(response) - print(len(response)) - print("gemini") - response = response_text.replace('\n', '') - response_as_dict = json.loads(response_text) - - return ResponseFromAiSchema(**response_as_dict) - - async def create_request_ai( - self, - messages: list[MessageFromChatSchema], - ) -> ResponseFromAiSchema: - contents = self._serialize_messages_to_promt(messages) - - response = await self._model.generate_content_async( - contents=contents - ) - - return self._serialize_response_to_json(response.text) - - - -gemini_helper = GoogleHelper( - api_key=settings.GEMINI.API_KEY, - model_name=settings.GEMINI.MODEL_NAME, -) \ No newline at end of file diff --git a/telegram-application/src/core/ai_services/google.py b/telegram-application/src/core/ai_services/google.py deleted file mode 100644 index 0e51c80..0000000 --- a/telegram-application/src/core/ai_services/google.py +++ /dev/null @@ -1,69 +0,0 @@ -import asyncio -import json - -import google.generativeai as genai - -from src.core.common.promt import ANALYTIC_PROMT, ROLE - -model = genai.GenerativeModel("gemini-1.5-flash") -# model.generate_content_async( -# contents="Explain how AI works", -# ) -# response = model.generate_content("Explain how AI works") -data = { - "messages": [ - {'message_id': 116459, 'user_id': 299500769, 'chat_id': -1001412374171, - 'text': 'Это конечно круто, но не надо в питон тащить свои когнитивные искажения из джава мира))', - 'date': '29/01/2025:22:54:08'}, - {'message_id': 116460, 'user_id': 299500769, 'chat_id': -1001412374171, - 'text': 'Не говорю, что там не правильно, просто не везде применимо)', 'date': '29/01/2025:22:54:29'}, - {'message_id': 116461, 'user_id': 6295079014, 'chat_id': -1001412374171, 'text': 'ну так слоистая зато)', - 'date': '29/01/2025:22:54:35'}, - {'message_id': 116462, 'user_id': 6295079014, 'chat_id': -1001412374171, - 'text': 'линтеры, IDE, типы - ради этого если кратко', 'date': '29/01/2025:22:55:26'}, - {'message_id': 116463, 'user_id': 299500769, 'chat_id': -1001412374171, - 'text': 'Не все 1 в 1 переносится от языка к языку)', 'date': '29/01/2025:22:55:33'}, - {'message_id': 116464, 'user_id': 408922819, 'chat_id': -1001412374171, - 'text': 'в каком месте у тебя здесь циклические импорты?', 'date': '29/01/2025:22:55:39'}, - {'message_id': 116465, 'user_id': 6295079014, 'chat_id': -1001412374171, - 'text': 'chat/models.py\nuser/models.py\n\n ссылаются друг на друга ибо в каждом есть импорты друг друга\n\nuser/models.py\nfrom src.chat.models import Chat, ChatMessage\n\nchat/models.py\nfrom src.user.models import User', - 'date': '29/01/2025:22:57:45'}, - {'message_id': 116466, 'user_id': 408922819, 'chat_id': -1001412374171, - 'text': 'ах ты их еще и по аппкам разнес', 'date': '29/01/2025:22:58:34'}, - {'message_id': 116467, 'user_id': 6295079014, 'chat_id': -1001412374171, - 'text': 'если есть грамотный пример структуры приложения fastapi прошу прислать)', - 'date': '29/01/2025:22:58:55'}, - ] -} - -BASE_MESSAGE = [ - { - "role": "user", - "parts": [{"text": ROLE.strip()}] - }, - { - "role": "user", - "parts": [{"text": ANALYTIC_PROMT.strip()}] - }, - { - "role": "user", - "parts": [json.dumps(data)] - }, - -] - -async def print_response(): - res = await model.generate_content_async(BASE_MESSAGE) - print(res.text) - print(res.usage_metadata) - -async def check_limits(): - response = await model.generate_content_async("Как посмотреть сколько лимитов в gemini gemini cloude у меня осталось") - - # Вывод метаданных из ответа - print(response.text) - print("Usage metadata:", response.usage_metadata) - - -asyncio.run(check_limits()) -# asyncio.run(print_response()) diff --git a/telegram-application/src/core/ai_services/groq/__init__.py b/telegram-application/src/core/ai_services/groq/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/telegram-application/src/core/ai_services/groq/constants.py b/telegram-application/src/core/ai_services/groq/constants.py deleted file mode 100644 index 661a2ee..0000000 --- a/telegram-application/src/core/ai_services/groq/constants.py +++ /dev/null @@ -1,57 +0,0 @@ -ROLE = """ -Ты специалист по поиску клиентов в компании, которая занимается разработкой любого софта -на заказ. -""" - -ANALYTIC_PROMT = """ -Ты получаешь json с такими полями -{ - "messages": [ - { - "message_id": integer, - "user_id": integer, - "chat_id": integer, - "text": string, - "date": datetime - } - ] -} -messages - это срез диалога в чате телеграмма. -пользователи могут общаться на абсолютно разные темы. -Твоя задача: -Прочитать эти сообщения, понять тему текущего среза диалога. -И если ты поймешь, что мы можем какому то пользователю предложить свои услуги -например (написать интернет магазин, мобильное приложение или любой другой айти продукт) -то выведи в таком виде ответ. - -ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им -можно предложить, то верни. - -{ - "user_id": integer, - "chat_id": integer, - "reason": string -} - -поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент. - -Если ты хотя бы чуть чуть не уверен, то верни вот такую строку -{"user_id": null, "chat_id": null, "reason": null} - -ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь. -без ```json ``` просто так - - -""" - - -GROQ_BASE_MESSAGE = [ - { - "role": "system", - "content": ROLE.replace("\n", '') - }, - { - "role": "user", - "content": ANALYTIC_PROMT.replace("\n", ''), - } -] diff --git a/telegram-application/src/core/ai_services/groq/service.py b/telegram-application/src/core/ai_services/groq/service.py deleted file mode 100644 index ab25447..0000000 --- a/telegram-application/src/core/ai_services/groq/service.py +++ /dev/null @@ -1,78 +0,0 @@ -import json - -from groq import AsyncGroq - -from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema -from .constants import GROQ_BASE_MESSAGE -from ..base import BaseAiService -from ...settings.base import settings - - -class GroqHelper(BaseAiService): - def __init__( - self, - api_key: str, - model: str, - temperature: int, - ) -> None: - self.api_key = api_key - self.model = model - self.temperature = temperature - - self._client = AsyncGroq( - api_key=self.api_key, - ) - - @staticmethod - def _serialize_messages_to_promt( - messages: list[MessageFromChatSchema], - ) -> dict: - messages_for_request = GROQ_BASE_MESSAGE.copy() - extend_message = { - "role": "user", - "content": json.dumps( - { - "messages": [ - message.model_dump_with_datetime() for message in messages - ] - } - ), - } - - messages_for_request.append(extend_message) - - return messages_for_request - - @staticmethod - def _serialize_response_to_json( - response_text: str, - ) -> ResponseFromAiSchema: - print(response_text) - print(len(response_text)) - print("groq") - response = response_text.replace('\n', '') - response_as_dict = json.loads(response) - - return ResponseFromAiSchema(**response_as_dict) - - async def create_request_ai( - self, - messages: list[MessageFromChatSchema], - ) -> ResponseFromAiSchema: - messages_for_promt = self._serialize_messages_to_promt(messages) - response = await self._client.chat.completions.create( - messages=messages_for_promt, - model=self.model, - temperature=self.temperature, - ) - - return self._serialize_response_to_json( - response.choices[0].message.content, - ) - - -groq_helper = GroqHelper( - api_key=settings.GROQ.API_KEY, - model=settings.GROQ.MODEL, - temperature=settings.GROQ.TEMPERATURE, -) diff --git a/telegram-application/src/core/ai_services/schemas.py b/telegram-application/src/core/ai_services/schemas.py deleted file mode 100644 index 1efec56..0000000 --- a/telegram-application/src/core/ai_services/schemas.py +++ /dev/null @@ -1,19 +0,0 @@ -from datetime import datetime - -from pydantic import BaseModel, PositiveInt, NegativeInt - -from src.core.common.schemas import BaseModelWithSerializeDatetime - - -class MessageFromChatSchema(BaseModelWithSerializeDatetime): - id: PositiveInt - user_id: PositiveInt - chat_id: NegativeInt - text: str - message_time: datetime - - -class ResponseFromAiSchema(BaseModel): - user_id: PositiveInt | None - chat_id: NegativeInt | None - reason: str | None diff --git a/telegram-application/src/core/database/successes.py b/telegram-application/src/core/database/successes.py new file mode 100644 index 0000000..cd57110 --- /dev/null +++ b/telegram-application/src/core/database/successes.py @@ -0,0 +1,25 @@ +from datetime import datetime + +from sqlalchemy import ForeignKey, func +from sqlalchemy.orm import Mapped, mapped_column + +from src.core.database import Base + + +class Success(Base): + __tablename__ = 'successed' + id: Mapped[int] = mapped_column( + primary_key=True, + autoincrement=True, + unique=True, + ) + user_id: Mapped[str] = mapped_column( + ForeignKey('users.id', ondelete='CASCADE'), + ) + chat_id: Mapped[int] = mapped_column( + ForeignKey('tgchats.id', ondelete='CASCADE'), + ) + created_at: Mapped[datetime] = mapped_column( + default=datetime.utcnow, + server_default=func.now() + ) \ No newline at end of file diff --git a/telegram-application/src/core/database/tg_messages.py b/telegram-application/src/core/database/tg_messages.py index b5f96a1..4542d4d 100644 --- a/telegram-application/src/core/database/tg_messages.py +++ b/telegram-application/src/core/database/tg_messages.py @@ -1,4 +1,5 @@ from datetime import datetime +from uuid import UUID from sqlalchemy import ForeignKey from sqlalchemy.orm import Mapped, mapped_column @@ -18,3 +19,4 @@ class TgMessage(Base): chat_id: Mapped[int] = mapped_column( ForeignKey("tgchats.id"), ) + slice_id: Mapped[UUID] diff --git a/telegram-application/src/core/rabbitmq/connect.py b/telegram-application/src/core/rabbitmq/connect.py index 2943548..627c760 100644 --- a/telegram-application/src/core/rabbitmq/connect.py +++ b/telegram-application/src/core/rabbitmq/connect.py @@ -17,6 +17,10 @@ base_exchange = RabbitExchange( base_queue = RabbitQueue( name="base_queue", ) +success_queue = RabbitQueue( + name="success_queue", +) + message_handler_publisher = broker.publisher( @@ -24,7 +28,15 @@ message_handler_publisher = broker.publisher( exchange=base_exchange, ) +success_gemini_subscriber = broker.subscriber( + queue=success_queue, + exchange=base_exchange, +) + + + async def init_queue_and_publisher(): await broker.declare_exchange(base_exchange) await broker.declare_queue(base_queue) + await broker.declare_queue(success_queue) diff --git a/telegram-application/src/core/redis_helper/__init__.py b/telegram-application/src/core/redis_helper/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/telegram-application/src/core/tg_service/crud.py b/telegram-application/src/core/tg_service/crud.py index 326928f..3dff301 100644 --- a/telegram-application/src/core/tg_service/crud.py +++ b/telegram-application/src/core/tg_service/crud.py @@ -1,8 +1,10 @@ from sqlalchemy import insert from sqlalchemy.ext.asyncio import AsyncSession -from src.core.ai_services.schemas import MessageFromChatSchema +from uuid import UUID + from src.core.database import User, TgChat, TgMessage +from src.core.tg_service.schemas import MessageFromChatSchema async def create_new_user( @@ -29,13 +31,18 @@ async def create_new_chat( async def bulk_insert_messages( messages: list[MessageFromChatSchema], + chunk_id: UUID, session: AsyncSession, ) -> None: + dicts_for_insert: list[dict] = [] + for message in messages: + dumped_schema = message.model_dump() + dumped_schema["chunk_id"] = chunk_id + dicts_for_insert.append(dumped_schema) + stmt = ( insert(TgMessage), - [ - *[message.model_dump() for message in messages], - ] + dicts_for_insert, ) await session.execute(*stmt) await session.commit() diff --git a/telegram-application/src/core/tg_service/messages_handler.py b/telegram-application/src/core/tg_service/messages_handler.py index fd0f3ee..8ae1402 100644 --- a/telegram-application/src/core/tg_service/messages_handler.py +++ b/telegram-application/src/core/tg_service/messages_handler.py @@ -1,7 +1,7 @@ from pyrogram import Client from pyrogram.types import Message -from src.core.ai_services.schemas import MessageFromChatSchema +from src.core.tg_service.schemas import MessageFromChatSchema from src.core.tg_service.service import check_user_exists, check_chat_exists, check_chunk_state_and_publish from src.core.tg_service import utils as api_tg_utils diff --git a/telegram-application/src/core/tg_service/schemas.py b/telegram-application/src/core/tg_service/schemas.py index 79d8d2c..53ba18e 100644 --- a/telegram-application/src/core/tg_service/schemas.py +++ b/telegram-application/src/core/tg_service/schemas.py @@ -1,6 +1,16 @@ -from pydantic import BaseModel, Field +from datetime import datetime -from src.core.ai_services.schemas import MessageFromChatSchema +from pydantic import BaseModel, PositiveInt, NegativeInt, UUID4 + +from src.core.common.schemas import BaseModelWithSerializeDatetime + + +class MessageFromChatSchema(BaseModelWithSerializeDatetime): + id: PositiveInt + user_id: PositiveInt + chat_id: NegativeInt + text: str + message_time: datetime class UserFromMessageSchema(BaseModel): @@ -12,4 +22,5 @@ class UserFromMessageSchema(BaseModel): class MessagesForSendToWorkersSchema(BaseModel): + chunk_id: UUID4 messages: list[MessageFromChatSchema] \ No newline at end of file diff --git a/telegram-application/src/core/tg_service/service.py b/telegram-application/src/core/tg_service/service.py index 18620bc..8860ba5 100644 --- a/telegram-application/src/core/tg_service/service.py +++ b/telegram-application/src/core/tg_service/service.py @@ -1,12 +1,13 @@ +import uuid + from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat -from src.core.ai_services.schemas import MessageFromChatSchema from src.core.database import TgChat, User from src.core.rabbitmq.connect import message_handler_publisher -from src.core.tg_service.constants import MESSAGE_CHANG_SIZE -from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema -from src.core.database.connect import db_helper from src.core.redis_helper.redis_connect import redis_client +from src.core.tg_service.constants import MESSAGE_CHANG_SIZE +from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema +from src.core.database.connect import db_helper from src.core.tg_service import crud as tg_crud @@ -68,19 +69,23 @@ async def check_chunk_state_and_publish( data[chat_id] = [message_schema] elif len(messages_chunk) == MESSAGE_CHANG_SIZE: + chunk_id = uuid.uuid4() del data[chat_id] async with db_helper.get_async_session_not_closed() as session: await tg_crud.bulk_insert_messages( messages=messages_chunk, session=session ) + await message_handler_publisher.publish( + message=MessagesForSendToWorkersSchema( + messages=messages_chunk, + chunk_id=chunk_id + ) + ) data[chat_id] = [message_schema] else: data[chat_id].append(message_schema) - if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE: - await message_handler_publisher.publish( - message=MessagesForSendToWorkersSchema(messages=messages_chunk) - ) + diff --git a/telegram-application/src/core/ai_services/gemini/__init__.py b/telegram-application/src/core/workers/__init__.py similarity index 100% rename from telegram-application/src/core/ai_services/gemini/__init__.py rename to telegram-application/src/core/workers/__init__.py diff --git a/telegram-application/src/core/workers/crud.py b/telegram-application/src/core/workers/crud.py new file mode 100644 index 0000000..bd5ccbf --- /dev/null +++ b/telegram-application/src/core/workers/crud.py @@ -0,0 +1,21 @@ +from sqlalchemy import insert +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.workers.schemas import ResponseFromGeminiSchema +from src.core.database.successes import Success + + +async def bulk_create_success_reasons( + session: AsyncSession, + success_schema: ResponseFromGeminiSchema, +) -> None: + stmt = ( + insert( + Success + ) + .values( + [chat.model_dump for chat in success_schema.success] + ) + ) + await session.execute(stmt) + await session.commit() diff --git a/telegram-application/src/core/workers/rmq_worker_handler.py b/telegram-application/src/core/workers/rmq_worker_handler.py new file mode 100644 index 0000000..ad5059a --- /dev/null +++ b/telegram-application/src/core/workers/rmq_worker_handler.py @@ -0,0 +1,24 @@ +from typing import Annotated + +from sqlalchemy.ext.asyncio import AsyncSession + +from faststream import Depends + +from src.core.workers.schemas import ResponseFromGeminiSchema +from src.core.database.connect import db_helper +from src.core.rabbitmq.connect import success_gemini_subscriber +from src.core.workers import crud as workers_crud + + +@success_gemini_subscriber +async def create_success_record( + message: ResponseFromGeminiSchema, + session: Annotated[AsyncSession, Depends(db_helper.get_async_session)], +): + await workers_crud.bulk_create_success_reasons( + success_schema=message, + session=session, + ) + + + diff --git a/telegram-application/src/core/workers/schemas.py b/telegram-application/src/core/workers/schemas.py new file mode 100644 index 0000000..fc56e61 --- /dev/null +++ b/telegram-application/src/core/workers/schemas.py @@ -0,0 +1,11 @@ +from pydantic import BaseModel, PositiveInt, NegativeInt + + +class SuccessChatFromAiSchema(BaseModel): + user_id: PositiveInt | None + chat_id: NegativeInt | None + reason: str | None + + +class ResponseFromGeminiSchema(BaseModel): + success: list[SuccessChatFromAiSchema] | None diff --git a/telegram-application/src/main.py b/telegram-application/src/main.py index e5b2e96..3ee80ad 100644 --- a/telegram-application/src/main.py +++ b/telegram-application/src/main.py @@ -25,5 +25,4 @@ async def main(): await idle() await app.stop() -print("*" * 100) app.run(main()) \ No newline at end of file