add slice for chats

This commit is contained in:
harold 2025-02-08 10:41:29 +05:00
parent d3abb1f217
commit e32ac46898
26 changed files with 165 additions and 415 deletions

View File

@ -27,6 +27,7 @@ services:
- "9151:5432" - "9151:5432"
volumes: volumes:
- ./docker/postgres/data:/var/lib/postgresql/data - ./docker/postgres/data:/var/lib/postgresql/data
- ./sql_scripts/create_table.sql:/docker-entrypoint-initdb.d/create_table.sql
redis: redis:
image: redis:latest image: redis:latest

View File

@ -0,0 +1,31 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE tgchats (
id BIGINT PRIMARY KEY UNIQUE,
chat_type TEXT NOT NULL,
title TEXT NOT NULL
);
CREATE TABLE users (
id BIGINT PRIMARY KEY UNIQUE,
username TEXT,
first_name TEXT,
last_name TEXT
);
CREATE TABLE tg_messages (
id BIGINT PRIMARY KEY UNIQUE,
text TEXT NOT NULL,
message_time TIMESTAMP NOT NULL,
user_id BIGINT NOT NULL REFERENCES users(id),
chat_id BIGINT NOT NULL REFERENCES tgchats(id),
slice_id UUID NOT NULL
);
CREATE TABLE successed (
id BIGSERIAL PRIMARY KEY UNIQUE,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
chat_id BIGINT NOT NULL REFERENCES tgchats(id) ON DELETE CASCADE,
created_at TIMESTAMP DEFAULT NOW() NOT NULL
);

View File

@ -1,12 +0,0 @@
from abc import ABC, abstractmethod
from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema
class BaseAiService(ABC):
@abstractmethod
async def create_request_ai(
self,
messages: list[MessageFromChatSchema],
) -> ResponseFromAiSchema:
raise NotImplementedError

View File

@ -1,6 +0,0 @@
async def check_limiter():

View File

@ -1,69 +0,0 @@
ROLE = """
Ты специалист по поиску клиентов в компании, которая занимается разработкой любого софта
на заказ.
"""
ANALYTIC_PROMT = """
Ты получаешь json с такими полями
{
chats: [
"chat_id": integer
"messages": [
{
"user_id: integer,
"message_id": integer,
"text": string,
"date": datetime
}
]
]
}
chats - это список чатов.
messages - это срез диалога в чате телеграмма.
пользователи могут общаться на абсолютно разные темы.
Твоя задача:
Прочитать эти сообщения, понять тему текущего среза диалога.
И если ты поймешь, что мы можем какому то пользователю предложить свои услуги
например (написать интернет магазин, мобильное приложение или любой другой айти продукт)
то выведи в таком виде ответ.
ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им
можно предложить, то верни.
Условно в нескольких чатах может быть несколько потенциальных клиентов, тогда вот так выведи
{
success: [
{
"user_id": integer,
"chat_id": integer,
"reason": string
}
]
}
поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент.
Если ты хотя бы чуть чуть не уверен, то верни вот такую строку
{
success: null
}
ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь.
без ```json ``` просто так без каких либо спецсимволов
"""
GEMINI_BASE_MESSAGE = [
{
"role": "user",
"parts": [
{"text": ROLE},
{"text": ANALYTIC_PROMT},
]
}
]

View File

@ -1,13 +0,0 @@
from pydantic import BaseModel
from src.core.ai_services.schemas import MessageFromChatSchema
class ChatMessageSchema(MessageFromChatSchema):
chat_id: int
messages: list[MessageFromChatSchema]
class FullRequestForGeminiSchema(BaseModel):
chats: list[ChatMessageSchema]

View File

@ -1,76 +0,0 @@
import json
from src.core.ai_services.base import BaseAiService
import google.generativeai as genai
from src.core.ai_services.gemini.constants import GEMINI_BASE_MESSAGE
from src.core.ai_services.gemini.schemas import FullRequestForGeminiSchema
from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema
from src.core.settings.base import settings
class GoogleHelper(BaseAiService):
def __init__(
self,
api_key: str,
model_name: str,
) -> None:
self.api_key = api_key
self.model = model_name
genai.configure(api_key=api_key)
self._model = genai.GenerativeModel(model_name=model_name)
@staticmethod
def _serialize_messages_to_promt(
chats: FullRequestForGeminiSchema,
) -> list[dict]:
messages_for_request = GEMINI_BASE_MESSAGE.copy()
text_for_request = json.dumps(chats.model_dump())
extend_message = {
"role": "user",
"parts": [
{
"text": text_for_request,
}
],
}
messages_for_request.append(extend_message)
return messages_for_request
@staticmethod
def _serialize_response_to_json(
response_text: str,
) -> ResponseFromAiSchema:
response = response_text.replace('\n', '')
print(response)
print(len(response))
print("gemini")
response = response_text.replace('\n', '')
response_as_dict = json.loads(response_text)
return ResponseFromAiSchema(**response_as_dict)
async def create_request_ai(
self,
messages: list[MessageFromChatSchema],
) -> ResponseFromAiSchema:
contents = self._serialize_messages_to_promt(messages)
response = await self._model.generate_content_async(
contents=contents
)
return self._serialize_response_to_json(response.text)
gemini_helper = GoogleHelper(
api_key=settings.GEMINI.API_KEY,
model_name=settings.GEMINI.MODEL_NAME,
)

View File

@ -1,69 +0,0 @@
import asyncio
import json
import google.generativeai as genai
from src.core.common.promt import ANALYTIC_PROMT, ROLE
model = genai.GenerativeModel("gemini-1.5-flash")
# model.generate_content_async(
# contents="Explain how AI works",
# )
# response = model.generate_content("Explain how AI works")
data = {
"messages": [
{'message_id': 116459, 'user_id': 299500769, 'chat_id': -1001412374171,
'text': 'Это конечно круто, но не надо в питон тащить свои когнитивные искажения из джава мира))',
'date': '29/01/2025:22:54:08'},
{'message_id': 116460, 'user_id': 299500769, 'chat_id': -1001412374171,
'text': 'Не говорю, что там не правильно, просто не везде применимо)', 'date': '29/01/2025:22:54:29'},
{'message_id': 116461, 'user_id': 6295079014, 'chat_id': -1001412374171, 'text': 'ну так слоистая зато)',
'date': '29/01/2025:22:54:35'},
{'message_id': 116462, 'user_id': 6295079014, 'chat_id': -1001412374171,
'text': 'линтеры, IDE, типы - ради этого если кратко', 'date': '29/01/2025:22:55:26'},
{'message_id': 116463, 'user_id': 299500769, 'chat_id': -1001412374171,
'text': 'Не все 1 в 1 переносится от языка к языку)', 'date': '29/01/2025:22:55:33'},
{'message_id': 116464, 'user_id': 408922819, 'chat_id': -1001412374171,
'text': 'в каком месте у тебя здесь циклические импорты?', 'date': '29/01/2025:22:55:39'},
{'message_id': 116465, 'user_id': 6295079014, 'chat_id': -1001412374171,
'text': 'chat/models.py\nuser/models.py\n\n ссылаются друг на друга ибо в каждом есть импорты друг друга\n\nuser/models.py\nfrom src.chat.models import Chat, ChatMessage\n\nchat/models.py\nfrom src.user.models import User',
'date': '29/01/2025:22:57:45'},
{'message_id': 116466, 'user_id': 408922819, 'chat_id': -1001412374171,
'text': 'ах ты их еще и по аппкам разнес', 'date': '29/01/2025:22:58:34'},
{'message_id': 116467, 'user_id': 6295079014, 'chat_id': -1001412374171,
'text': 'если есть грамотный пример структуры приложения fastapi прошу прислать)',
'date': '29/01/2025:22:58:55'},
]
}
BASE_MESSAGE = [
{
"role": "user",
"parts": [{"text": ROLE.strip()}]
},
{
"role": "user",
"parts": [{"text": ANALYTIC_PROMT.strip()}]
},
{
"role": "user",
"parts": [json.dumps(data)]
},
]
async def print_response():
res = await model.generate_content_async(BASE_MESSAGE)
print(res.text)
print(res.usage_metadata)
async def check_limits():
response = await model.generate_content_async("Как посмотреть сколько лимитов в gemini gemini cloude у меня осталось")
# Вывод метаданных из ответа
print(response.text)
print("Usage metadata:", response.usage_metadata)
asyncio.run(check_limits())
# asyncio.run(print_response())

View File

@ -1,57 +0,0 @@
ROLE = """
Ты специалист по поиску клиентов в компании, которая занимается разработкой любого софта
на заказ.
"""
ANALYTIC_PROMT = """
Ты получаешь json с такими полями
{
"messages": [
{
"message_id": integer,
"user_id": integer,
"chat_id": integer,
"text": string,
"date": datetime
}
]
}
messages - это срез диалога в чате телеграмма.
пользователи могут общаться на абсолютно разные темы.
Твоя задача:
Прочитать эти сообщения, понять тему текущего среза диалога.
И если ты поймешь, что мы можем какому то пользователю предложить свои услуги
например (написать интернет магазин, мобильное приложение или любой другой айти продукт)
то выведи в таком виде ответ.
ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им
можно предложить, то верни.
{
"user_id": integer,
"chat_id": integer,
"reason": string
}
поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент.
Если ты хотя бы чуть чуть не уверен, то верни вот такую строку
{"user_id": null, "chat_id": null, "reason": null}
ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь.
без ```json ``` просто так
"""
GROQ_BASE_MESSAGE = [
{
"role": "system",
"content": ROLE.replace("\n", '')
},
{
"role": "user",
"content": ANALYTIC_PROMT.replace("\n", ''),
}
]

View File

@ -1,78 +0,0 @@
import json
from groq import AsyncGroq
from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema
from .constants import GROQ_BASE_MESSAGE
from ..base import BaseAiService
from ...settings.base import settings
class GroqHelper(BaseAiService):
def __init__(
self,
api_key: str,
model: str,
temperature: int,
) -> None:
self.api_key = api_key
self.model = model
self.temperature = temperature
self._client = AsyncGroq(
api_key=self.api_key,
)
@staticmethod
def _serialize_messages_to_promt(
messages: list[MessageFromChatSchema],
) -> dict:
messages_for_request = GROQ_BASE_MESSAGE.copy()
extend_message = {
"role": "user",
"content": json.dumps(
{
"messages": [
message.model_dump_with_datetime() for message in messages
]
}
),
}
messages_for_request.append(extend_message)
return messages_for_request
@staticmethod
def _serialize_response_to_json(
response_text: str,
) -> ResponseFromAiSchema:
print(response_text)
print(len(response_text))
print("groq")
response = response_text.replace('\n', '')
response_as_dict = json.loads(response)
return ResponseFromAiSchema(**response_as_dict)
async def create_request_ai(
self,
messages: list[MessageFromChatSchema],
) -> ResponseFromAiSchema:
messages_for_promt = self._serialize_messages_to_promt(messages)
response = await self._client.chat.completions.create(
messages=messages_for_promt,
model=self.model,
temperature=self.temperature,
)
return self._serialize_response_to_json(
response.choices[0].message.content,
)
groq_helper = GroqHelper(
api_key=settings.GROQ.API_KEY,
model=settings.GROQ.MODEL,
temperature=settings.GROQ.TEMPERATURE,
)

View File

@ -1,19 +0,0 @@
from datetime import datetime
from pydantic import BaseModel, PositiveInt, NegativeInt
from src.core.common.schemas import BaseModelWithSerializeDatetime
class MessageFromChatSchema(BaseModelWithSerializeDatetime):
id: PositiveInt
user_id: PositiveInt
chat_id: NegativeInt
text: str
message_time: datetime
class ResponseFromAiSchema(BaseModel):
user_id: PositiveInt | None
chat_id: NegativeInt | None
reason: str | None

View File

@ -0,0 +1,25 @@
from datetime import datetime
from sqlalchemy import ForeignKey, func
from sqlalchemy.orm import Mapped, mapped_column
from src.core.database import Base
class Success(Base):
__tablename__ = 'successed'
id: Mapped[int] = mapped_column(
primary_key=True,
autoincrement=True,
unique=True,
)
user_id: Mapped[str] = mapped_column(
ForeignKey('users.id', ondelete='CASCADE'),
)
chat_id: Mapped[int] = mapped_column(
ForeignKey('tgchats.id', ondelete='CASCADE'),
)
created_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow,
server_default=func.now()
)

View File

@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
from uuid import UUID
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@ -18,3 +19,4 @@ class TgMessage(Base):
chat_id: Mapped[int] = mapped_column( chat_id: Mapped[int] = mapped_column(
ForeignKey("tgchats.id"), ForeignKey("tgchats.id"),
) )
slice_id: Mapped[UUID]

View File

@ -17,6 +17,10 @@ base_exchange = RabbitExchange(
base_queue = RabbitQueue( base_queue = RabbitQueue(
name="base_queue", name="base_queue",
) )
success_queue = RabbitQueue(
name="success_queue",
)
message_handler_publisher = broker.publisher( message_handler_publisher = broker.publisher(
@ -24,7 +28,15 @@ message_handler_publisher = broker.publisher(
exchange=base_exchange, exchange=base_exchange,
) )
success_gemini_subscriber = broker.subscriber(
queue=success_queue,
exchange=base_exchange,
)
async def init_queue_and_publisher(): async def init_queue_and_publisher():
await broker.declare_exchange(base_exchange) await broker.declare_exchange(base_exchange)
await broker.declare_queue(base_queue) await broker.declare_queue(base_queue)
await broker.declare_queue(success_queue)

View File

@ -1,8 +1,10 @@
from sqlalchemy import insert from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from src.core.ai_services.schemas import MessageFromChatSchema from uuid import UUID
from src.core.database import User, TgChat, TgMessage from src.core.database import User, TgChat, TgMessage
from src.core.tg_service.schemas import MessageFromChatSchema
async def create_new_user( async def create_new_user(
@ -29,13 +31,18 @@ async def create_new_chat(
async def bulk_insert_messages( async def bulk_insert_messages(
messages: list[MessageFromChatSchema], messages: list[MessageFromChatSchema],
chunk_id: UUID,
session: AsyncSession, session: AsyncSession,
) -> None: ) -> None:
dicts_for_insert: list[dict] = []
for message in messages:
dumped_schema = message.model_dump()
dumped_schema["chunk_id"] = chunk_id
dicts_for_insert.append(dumped_schema)
stmt = ( stmt = (
insert(TgMessage), insert(TgMessage),
[ dicts_for_insert,
*[message.model_dump() for message in messages],
]
) )
await session.execute(*stmt) await session.execute(*stmt)
await session.commit() await session.commit()

View File

@ -1,7 +1,7 @@
from pyrogram import Client from pyrogram import Client
from pyrogram.types import Message from pyrogram.types import Message
from src.core.ai_services.schemas import MessageFromChatSchema from src.core.tg_service.schemas import MessageFromChatSchema
from src.core.tg_service.service import check_user_exists, check_chat_exists, check_chunk_state_and_publish from src.core.tg_service.service import check_user_exists, check_chat_exists, check_chunk_state_and_publish
from src.core.tg_service import utils as api_tg_utils from src.core.tg_service import utils as api_tg_utils

View File

@ -1,6 +1,16 @@
from pydantic import BaseModel, Field from datetime import datetime
from src.core.ai_services.schemas import MessageFromChatSchema from pydantic import BaseModel, PositiveInt, NegativeInt, UUID4
from src.core.common.schemas import BaseModelWithSerializeDatetime
class MessageFromChatSchema(BaseModelWithSerializeDatetime):
id: PositiveInt
user_id: PositiveInt
chat_id: NegativeInt
text: str
message_time: datetime
class UserFromMessageSchema(BaseModel): class UserFromMessageSchema(BaseModel):
@ -12,4 +22,5 @@ class UserFromMessageSchema(BaseModel):
class MessagesForSendToWorkersSchema(BaseModel): class MessagesForSendToWorkersSchema(BaseModel):
chunk_id: UUID4
messages: list[MessageFromChatSchema] messages: list[MessageFromChatSchema]

View File

@ -1,12 +1,13 @@
import uuid
from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat from pyrogram.types.user_and_chats import User as PyroUser, Chat as PyroChat
from src.core.ai_services.schemas import MessageFromChatSchema
from src.core.database import TgChat, User from src.core.database import TgChat, User
from src.core.rabbitmq.connect import message_handler_publisher from src.core.rabbitmq.connect import message_handler_publisher
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE
from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema
from src.core.database.connect import db_helper
from src.core.redis_helper.redis_connect import redis_client from src.core.redis_helper.redis_connect import redis_client
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE
from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema
from src.core.database.connect import db_helper
from src.core.tg_service import crud as tg_crud from src.core.tg_service import crud as tg_crud
@ -68,19 +69,23 @@ async def check_chunk_state_and_publish(
data[chat_id] = [message_schema] data[chat_id] = [message_schema]
elif len(messages_chunk) == MESSAGE_CHANG_SIZE: elif len(messages_chunk) == MESSAGE_CHANG_SIZE:
chunk_id = uuid.uuid4()
del data[chat_id] del data[chat_id]
async with db_helper.get_async_session_not_closed() as session: async with db_helper.get_async_session_not_closed() as session:
await tg_crud.bulk_insert_messages( await tg_crud.bulk_insert_messages(
messages=messages_chunk, messages=messages_chunk,
session=session session=session
) )
await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema(
messages=messages_chunk,
chunk_id=chunk_id
)
)
data[chat_id] = [message_schema] data[chat_id] = [message_schema]
else: else:
data[chat_id].append(message_schema) data[chat_id].append(message_schema)
if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE:
await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema(messages=messages_chunk)
)

View File

@ -0,0 +1,21 @@
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.workers.schemas import ResponseFromGeminiSchema
from src.core.database.successes import Success
async def bulk_create_success_reasons(
session: AsyncSession,
success_schema: ResponseFromGeminiSchema,
) -> None:
stmt = (
insert(
Success
)
.values(
[chat.model_dump for chat in success_schema.success]
)
)
await session.execute(stmt)
await session.commit()

View File

@ -0,0 +1,24 @@
from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncSession
from faststream import Depends
from src.core.workers.schemas import ResponseFromGeminiSchema
from src.core.database.connect import db_helper
from src.core.rabbitmq.connect import success_gemini_subscriber
from src.core.workers import crud as workers_crud
@success_gemini_subscriber
async def create_success_record(
message: ResponseFromGeminiSchema,
session: Annotated[AsyncSession, Depends(db_helper.get_async_session)],
):
await workers_crud.bulk_create_success_reasons(
success_schema=message,
session=session,
)

View File

@ -0,0 +1,11 @@
from pydantic import BaseModel, PositiveInt, NegativeInt
class SuccessChatFromAiSchema(BaseModel):
user_id: PositiveInt | None
chat_id: NegativeInt | None
reason: str | None
class ResponseFromGeminiSchema(BaseModel):
success: list[SuccessChatFromAiSchema] | None

View File

@ -25,5 +25,4 @@ async def main():
await idle() await idle()
await app.stop() await app.stop()
print("*" * 100)
app.run(main()) app.run(main())