add fix for bot

This commit is contained in:
harold 2025-02-06 10:55:41 +05:00
parent ad71ca253d
commit f2117bbeb4
58 changed files with 450 additions and 144 deletions

Binary file not shown.

View File

@ -1,9 +0,0 @@
from sqlalchemy.orm import Mapped
from src.core.database import Base
class TgChat(Base):
telegram_chat_id: Mapped[int]
chat_type: Mapped[str]
title: Mapped[str]

View File

@ -1,90 +0,0 @@
import json
from time import sleep
from pyrogram import Client, filters
from pyrogram.enums import ChatType
from pyrogram.types import Message
from ai_test import create_request_ai
from src.core.ai_services.gemini.service import gemini_helper
from src.core.ai_services.groq.service import groq_helper
from src.core.ai_services.schemas import MessageFromChatSchema
from src.core.common.promt import BASE_MESSAGE
api_id = 17718565
api_hash = "72f93973f4227415572f039d4f847082"
app = Client(
name="advatroniks",
api_id=api_id,
api_hash=api_hash,
)
DATA: dict[int, list[MessageFromChatSchema]] = dict()
@app.on_message(filters.all) # Используем фильтр для сообщений из всех чатов
async def listen_messages(client: Client, message: Message):
# print(message.chat)
chat_title = message.chat.title or message.chat.first_name or message.chat.username
sender = message.from_user.first_name if message.from_user else "Система/Бот"
text = message.text or "[не текстовое сообщение]"
if message.chat.type not in [ChatType.PRIVATE, ChatType.BOT] and message.from_user and message.text:
if DATA.get(message.chat.id):
DATA[message.chat.id].append(
MessageFromChatSchema(
message_id=message.id,
user_id=message.from_user.id,
chat_id=message.chat.id,
text=message.text,
date=message.date
)
)
else:
DATA[message.chat.id] = [
MessageFromChatSchema(
message_id=message.id,
user_id=message.from_user.id,
chat_id=message.chat.id,
text=message.text,
date=message.date
)
]
print(len(DATA[message.chat.id]))
# print(chat_title, '|', len(DATA[message.chat.id]))
# print(len(DATA[message.chat.id]))
counter = 0
for key, value in DATA.items():
if len(value) == 20 and counter == 0:
gemini_response = await gemini_helper.create_request_ai(
messages=value
)
# print(gemini_response)
# print("*" * 100, "GEMINI", "*" * 100)
groq_response = await groq_helper.create_request_ai(
messages=value,
)
sleep(20)
# print(groq_response)
# print("*" * 100, "GROQ", "*" * 100)
for _ in value:
print(_)
counter += 1
# print(f"Сообщение из чата: {chat_title}")
# print(f"Отправитель: {sender}")
# print(f"Текст: {text}")
# print("-" * 40)
# Запуск клиента
if __name__ == "__main__":
print("Слушаю все сообщения из чатов...")
app.run()

View File

@ -0,0 +1 @@
venv

View File

@ -2,7 +2,7 @@
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
.idea ../.idea
# C extensions # C extensions
*.so *.so

View File

@ -0,0 +1,19 @@
FROM python:3.12-slim

# Install Poetry for dependency management
RUN pip install poetry

# Work out of /app
WORKDIR /app

# Copy only the dependency manifests first so the dependency layer is
# cached and rebuilt only when they change
COPY pyproject.toml poetry.lock ./

# BUGFIX: "--no-dev" was removed in Poetry >= 1.5; "--only main" is the
# supported replacement. "--no-root" skips installing the project itself
# (the sources are copied in the next step and pyproject sets
# package-mode = false).
RUN poetry install --only main --no-root --no-interaction

# Copy the application code
COPY . .

# Run the bot
CMD ["poetry", "run", "python", "bot.py"]

View File

@ -1,8 +1,5 @@
import asyncio
from groq import AsyncGroq from groq import AsyncGroq
from src.core.common.promt import ROLE, ANALYTIC_PROMT
from src.core.settings.base import settings from src.core.settings.base import settings
# Убедитесь, что переменная окружения GROQ_API_KEY установлена # Убедитесь, что переменная окружения GROQ_API_KEY установлена

View File

@ -0,0 +1,54 @@
# NOTE(review): the top-level "version" key is obsolete with Compose v2
# and can be dropped.
version: "3.8"

services:
  # Telegram bot built from the local Dockerfile
  bot:
    build: .
    container_name: pyrogram_bot
    restart: unless-stopped
    # NOTE(review): credentials are hard-coded here and below — prefer an
    # env_file / secrets for anything beyond local development.
    environment:
      - DB_HOST=postgres
      - DB_NAME=mydatabase
      - DB_USER=user
      - DB_PASSWORD=password
      - REDIS_HOST=redis
      - RABBITMQ_HOST=rabbitmq
    depends_on:
      - postgres
      - redis
      - rabbitmq

  # PostgreSQL database
  postgres:
    image: postgres:13
    container_name: postgres
    restart: unless-stopped
    environment:
      POSTGRES_DB: mydatabase
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # Redis cache service
  redis:
    image: redis:latest
    container_name: redis
    restart: unless-stopped
    ports:
      - "6379:6379"

  # RabbitMQ broker (management UI exposed on 15672)
  rabbitmq:
    image: "rabbitmq:3-management"
    container_name: rabbitmq
    restart: unless-stopped
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

volumes:
  postgres_data:

View File

@ -198,6 +198,56 @@ files = [
{file = "distro-1.9.0.tar.gz", hash = "sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed"}, {file = "distro-1.9.0.tar.gz", hash = "sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed"},
] ]
[[package]]
name = "fast-depends"
version = "2.4.12"
description = "FastDepends - extracted and cleared from HTTP domain logic FastAPI Dependency Injection System. Async and sync are both supported."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "fast_depends-2.4.12-py3-none-any.whl", hash = "sha256:9e5d110ddc962329e46c9b35e5fe65655984247a13ee3ca5a33186db7d2d75c2"},
{file = "fast_depends-2.4.12.tar.gz", hash = "sha256:9393e6de827f7afa0141e54fa9553b737396aaf06bd0040e159d1f790487b16d"},
]
[package.dependencies]
anyio = ">=3.0.0,<5.0.0"
pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<3.0.0"
[[package]]
name = "faststream"
version = "0.5.34"
description = "FastStream: the simplest way to work with a messaging queues"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "faststream-0.5.34-py3-none-any.whl", hash = "sha256:aa7f61d6968a68f13ebf755cce9e8bf11b00717c28b2ef66e896b5d652a6c6a2"},
{file = "faststream-0.5.34.tar.gz", hash = "sha256:84615968c5768ebaa89b72ae66b53e5302c08e7d18b341ef5193e54cb6ba8623"},
]
[package.dependencies]
anyio = ">=3.7.1,<5"
fast-depends = ">=2.4.0b0,<3.0.0"
typing-extensions = ">=4.8.0"
[package.extras]
cli = ["typer (>=0.9,!=0.12,<=0.15.1)", "watchfiles (>=0.15.0,<1.1.0)"]
confluent = ["confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)"]
dev = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "bandit (==1.8.0)", "cairosvg", "codespell (==2.3.0)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "confluent-kafka-stubs", "coverage[toml] (==7.6.1)", "coverage[toml] (==7.6.10)", "detect-secrets (==1.5.0)", "dirty-equals (==0.8.0)", "email-validator (==2.2.0)", "fastapi (==0.115.6)", "httpx (==0.28.1)", "mdx-include (==1.4.2)", "mike (==2.1.3)", "mkdocs-git-revision-date-localized-plugin (==1.3.0)", "mkdocs-glightbox (==0.4.0)", "mkdocs-literate-nav (==0.6.1)", "mkdocs-macros-plugin (==1.3.7)", "mkdocs-material (==9.5.49)", "mkdocs-minify-plugin (==0.8.0)", "mkdocs-static-i18n (==1.2.3)", "mkdocstrings[python] (==0.27.0)", "mypy (==1.14.1)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "pillow", "pre-commit (==3.5.0)", "pre-commit (==4.0.1)", "prometheus-client (>=0.20.0,<0.30.0)", "pydantic-settings (>=2.0.0,<3.0.0)", "pytest (==8.3.4)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.25.1)", "pyyaml (==6.0.2)", "redis (>=5.0.0,<6.0.0)", "requests", "ruff (==0.8.6)", "semgrep (==1.101.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "typing-extensions (>=4.8.0,<4.12.1)", "watchfiles (>=0.15.0,<1.1.0)"]
devdocs = ["cairosvg", "mdx-include (==1.4.2)", "mike (==2.1.3)", "mkdocs-git-revision-date-localized-plugin (==1.3.0)", "mkdocs-glightbox (==0.4.0)", "mkdocs-literate-nav (==0.6.1)", "mkdocs-macros-plugin (==1.3.7)", "mkdocs-material (==9.5.49)", "mkdocs-minify-plugin (==0.8.0)", "mkdocs-static-i18n (==1.2.3)", "mkdocstrings[python] (==0.27.0)", "pillow", "requests"]
kafka = ["aiokafka (>=0.9,<0.13)"]
lint = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "bandit (==1.8.0)", "codespell (==2.3.0)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "confluent-kafka-stubs", "mypy (==1.14.1)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<6.0.0)", "ruff (==0.8.6)", "semgrep (==1.101.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "watchfiles (>=0.15.0,<1.1.0)"]
nats = ["nats-py (>=2.7.0,<=3.0.0)"]
optionals = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<6.0.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "watchfiles (>=0.15.0,<1.1.0)"]
otel = ["opentelemetry-sdk (>=1.24.0,<2.0.0)"]
prometheus = ["prometheus-client (>=0.20.0,<0.30.0)"]
rabbit = ["aio-pika (>=9,<10)"]
redis = ["redis (>=5.0.0,<6.0.0)"]
test-core = ["coverage[toml] (==7.6.1)", "coverage[toml] (==7.6.10)", "dirty-equals (==0.8.0)", "pytest (==8.3.4)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.25.1)", "typing-extensions (>=4.8.0,<4.12.1)"]
testing = ["coverage[toml] (==7.6.1)", "coverage[toml] (==7.6.10)", "dirty-equals (==0.8.0)", "email-validator (==2.2.0)", "fastapi (==0.115.6)", "httpx (==0.28.1)", "pydantic-settings (>=2.0.0,<3.0.0)", "pytest (==8.3.4)", "pytest-asyncio (==0.24.0)", "pytest-asyncio (==0.25.1)", "pyyaml (==6.0.2)", "typing-extensions (>=4.8.0,<4.12.1)"]
types = ["aio-pika (>=9,<10)", "aiokafka (>=0.9,<0.13)", "confluent-kafka (>=2,<3)", "confluent-kafka (>=2.6,<3)", "confluent-kafka-stubs", "mypy (==1.14.1)", "nats-py (>=2.7.0,<=3.0.0)", "opentelemetry-sdk (>=1.24.0,<2.0.0)", "prometheus-client (>=0.20.0,<0.30.0)", "redis (>=5.0.0,<6.0.0)", "typer (>=0.9,!=0.12,<=0.15.1)", "types-aiofiles", "types-deprecated", "types-docutils", "types-pygments", "types-pyyaml", "types-redis", "types-setuptools", "types-ujson", "watchfiles (>=0.15.0,<1.1.0)"]
[[package]] [[package]]
name = "google-ai-generativelanguage" name = "google-ai-generativelanguage"
version = "0.6.15" version = "0.6.15"
@ -1237,4 +1287,4 @@ zstd = ["zstandard (>=0.18.0)"]
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.11, <4.0" python-versions = ">=3.11, <4.0"
content-hash = "dc59c68a2f05dcce182f7343c2fe7979ad2b654211954f9480a263cd9e128232" content-hash = "ae3423d56de9fa2b9ea991f7cfb550df3b945bbe0f489024da70b663dd17221b"

View File

@ -2,6 +2,7 @@
name = "scrapper-tg-bot" name = "scrapper-tg-bot"
version = "0.1.0" version = "0.1.0"
description = "" description = ""
package-mode = false
authors = [ authors = [
{name = "harold",email = "tihon414@gmail.com"} {name = "harold",email = "tihon414@gmail.com"}
] ]
@ -16,7 +17,8 @@ dependencies = [
"sqlalchemy (>=2.0.37,<3.0.0)", "sqlalchemy (>=2.0.37,<3.0.0)",
"pydantic-settings (>=2.7.1,<3.0.0)", "pydantic-settings (>=2.7.1,<3.0.0)",
"redis (>=5.2.1,<6.0.0)", "redis (>=5.2.1,<6.0.0)",
"google-generativeai (>=0.8.4,<0.9.0)" "google-generativeai (>=0.8.4,<0.9.0)",
"faststream[rabbitmq] (>=0.5.34,<0.6.0)"
] ]

View File

@ -0,0 +1,6 @@
async def check_limiter():

View File

@ -6,16 +6,21 @@ ROLE = """
ANALYTIC_PROMT = """ ANALYTIC_PROMT = """
Ты получаешь json с такими полями Ты получаешь json с такими полями
{ {
"messages": [ chats: [
{ "chat_id": integer
"message_id": integer, "messages": [
"user_id": integer, {
"chat_id": integer, "user_id: integer,
"text": string, "message_id": integer,
"date": datetime "text": string,
} "date": datetime
] }
} ]
]
}
chats - это список чатов.
messages - это срез диалога в чате телеграмма. messages - это срез диалога в чате телеграмма.
пользователи могут общаться на абсолютно разные темы. пользователи могут общаться на абсолютно разные темы.
Твоя задача: Твоя задача:
@ -27,16 +32,24 @@ messages - это срез диалога в чате телеграмма.
ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им
можно предложить, то верни. можно предложить, то верни.
Условно в нескольких чатах может быть несколько потенциальных клиентов, тогда вот так выведи
{ {
"user_id": integer, success: [
"chat_id": integer, {
"reason": string "user_id": integer,
"chat_id": integer,
"reason": string
}
]
} }
поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент. поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент.
Если ты хотя бы чуть чуть не уверен, то верни вот такую строку Если ты хотя бы чуть чуть не уверен, то верни вот такую строку
{"user_id": null, "chat_id": null, "reason": null} {
success: null
}
ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь. ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь.

View File

@ -0,0 +1,13 @@
from pydantic import BaseModel
from src.core.ai_services.schemas import MessageFromChatSchema
class ChatMessageSchema(MessageFromChatSchema):
    """One chat together with its buffered messages, as sent to Gemini.

    NOTE(review): inheriting MessageFromChatSchema also pulls in the
    per-message fields (id, user_id, text, ...) as required fields of this
    container — this likely should extend BaseModel instead; confirm
    against the JSON shape described in the prompt.
    """

    chat_id: int
    messages: list[MessageFromChatSchema]


class FullRequestForGeminiSchema(BaseModel):
    """Top-level request body: every chat whose message chunk is ready."""

    chats: list[ChatMessageSchema]

View File

@ -5,6 +5,7 @@ from src.core.ai_services.base import BaseAiService
import google.generativeai as genai import google.generativeai as genai
from src.core.ai_services.gemini.constants import GEMINI_BASE_MESSAGE from src.core.ai_services.gemini.constants import GEMINI_BASE_MESSAGE
from src.core.ai_services.gemini.schemas import FullRequestForGeminiSchema
from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema from src.core.ai_services.schemas import MessageFromChatSchema, ResponseFromAiSchema
from src.core.settings.base import settings from src.core.settings.base import settings
@ -23,12 +24,11 @@ class GoogleHelper(BaseAiService):
@staticmethod @staticmethod
def _serialize_messages_to_promt( def _serialize_messages_to_promt(
messages: list[MessageFromChatSchema], chats: FullRequestForGeminiSchema,
) -> list[dict]: ) -> list[dict]:
messages_for_request = GEMINI_BASE_MESSAGE.copy() messages_for_request = GEMINI_BASE_MESSAGE.copy()
dumped_messages = [msg.model_dump_with_datetime() for msg in messages]
text_for_request = json.dumps({"messages": dumped_messages}) text_for_request = json.dumps(chats.model_dump())
extend_message = { extend_message = {
"role": "user", "role": "user",

View File

@ -2,19 +2,15 @@ from datetime import datetime
from pydantic import BaseModel, PositiveInt, NegativeInt from pydantic import BaseModel, PositiveInt, NegativeInt
from src.core.common.schemas import BaseModelWithSerializeDatetime
class MessageFromChatSchema(BaseModel):
message_id: PositiveInt class MessageFromChatSchema(BaseModelWithSerializeDatetime):
id: PositiveInt
user_id: PositiveInt user_id: PositiveInt
chat_id: NegativeInt chat_id: NegativeInt
text: str text: str
date: datetime message_time: datetime
def model_dump_with_datetime(self) -> dict:
dumped_model = self.model_dump()
dumped_model["date"] = dumped_model["date"].isoformat()
return dumped_model
class ResponseFromAiSchema(BaseModel): class ResponseFromAiSchema(BaseModel):

View File

@ -0,0 +1,17 @@
from datetime import datetime
from pydantic import BaseModel, field_validator
class BaseModelWithSerializeDatetime(BaseModel):
    """Base model that drops tzinfo from datetimes and fixes their JSON format."""

    @field_validator("*")
    def remove_tzinfo(cls, value):
        # Normalize aware datetime fields to naive (tzinfo dropped, value
        # NOT converted to another zone).
        if isinstance(value, datetime) and value.tzinfo is not None:
            return value.replace(tzinfo=None)
        return value

    class Config:
        # NOTE(review): class-based Config/json_encoders is the pydantic v1
        # style and is deprecated in v2 — confirm the pydantic version used.
        # The [:-5] slice keeps only one digit of the 6-digit microseconds
        # field before appending "Z" — verify this is the intended precision.
        json_encoders = {
            datetime: lambda v: f"{v.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-5]}Z"
        }

View File

@ -2,9 +2,11 @@ __all__ = [
"Base", "Base",
"TgMessage", "TgMessage",
"TgChat", "TgChat",
"User",
] ]
from .base import Base from .base import Base
from .tg_messages import TgMessage from .tg_messages import TgMessage
from .chats import TgChat from .chats import TgChat
from .users import User

View File

@ -1,6 +1,6 @@
from uuid import uuid4, UUID from uuid import uuid4, UUID
from sqlalchemy import Uuid as Alq_uuid, text from sqlalchemy import text, BigInteger
from sqlalchemy.orm import DeclarativeBase, declared_attr, Mapped, mapped_column from sqlalchemy.orm import DeclarativeBase, declared_attr, Mapped, mapped_column
@ -11,9 +11,8 @@ class Base(DeclarativeBase):
def __tablename__(cls) -> str: def __tablename__(cls) -> str:
return f"{cls.__name__.lower()}s" return f"{cls.__name__.lower()}s"
id: Mapped[UUID] = mapped_column( id: Mapped[int] = mapped_column(
Alq_uuid, BigInteger,
default=uuid4,
server_default=text("uuid_generate_v4()"),
primary_key=True, primary_key=True,
unique=True,
) )

View File

@ -0,0 +1,8 @@
from sqlalchemy.orm import Mapped, mapped_column
from src.core.database import Base
class TgChat(Base):
chat_type: Mapped[str]
title: Mapped[str]

View File

@ -11,7 +11,7 @@ class TgMessage(Base):
text: Mapped[str] text: Mapped[str]
message_time: Mapped[datetime] message_time: Mapped[datetime]
user_id: Mapped[int] = mapped_column( user_id: Mapped[int] = mapped_column(
ForeignKey("users.telegram_id"), ForeignKey("users.id"),
) )
chat_id: Mapped[int] = mapped_column( chat_id: Mapped[int] = mapped_column(
ForeignKey("chats.telegram_chat_id"), ForeignKey("chats.telegram_chat_id"),

View File

@ -5,11 +5,6 @@ from src.core.database.base import Base
class User(Base): class User(Base):
telegram_id: Mapped[int] = mapped_column(
BigInteger,
primary_key=True,
unique=True,
)
username: Mapped[str] username: Mapped[str]
first_name: Mapped[str | None] first_name: Mapped[str | None]
last_name: Mapped[str | None] last_name: Mapped[str | None]

View File

@ -0,0 +1,14 @@
from faststream.rabbit import RabbitBroker, RabbitExchange, ExchangeType, QueueType, RabbitQueue

# Shared FastStream broker instance; the connection URL is supplied when
# the broker is started by the application entry point.
broker = RabbitBroker()

# Direct exchange declaration used for routing application messages.
base_exchange = RabbitExchange(
    name="base_exchange",
    type=ExchangeType.DIRECT,
)

# Classic (non-quorum) queue declaration.
base_queue = RabbitQueue(
    name="base_queue",
    queue_type=QueueType.CLASSIC,
)

View File

@ -0,0 +1,11 @@
from redis.asyncio import ConnectionPool, Redis

from src.core.settings.base import settings

# Shared asyncio connection pool; decode_responses=True makes the client
# return str instead of bytes.
redis_pool = ConnectionPool(
    host=settings.REDIS.HOST,
    port=settings.REDIS.PORT,
    decode_responses=True
)

# Module-wide Redis client reusing the pool above.
redis_client = Redis(connection_pool=redis_pool)

View File

@ -5,7 +5,8 @@ from src.core.common.constants import PydanticEnvPrefixEnum, EnvFileLocation, En
from src.core.settings.database import DatabaseSettings from src.core.settings.database import DatabaseSettings
from src.core.settings.gemini import GeminiSettings from src.core.settings.gemini import GeminiSettings
from src.core.settings.groq import GroqSettings from src.core.settings.groq import GroqSettings
from src.core.settings.rabbitmq import RabbitmqSettings
from src.core.settings.redis import RedisSettings
print(EnvFileLocation.PRODUCTION) print(EnvFileLocation.PRODUCTION)
@ -26,5 +27,8 @@ class Settings(BaseSettings):
GROQ: GroqSettings GROQ: GroqSettings
GEMINI: GeminiSettings GEMINI: GeminiSettings
REDIS: RedisSettings
RABBIT: RabbitmqSettings
settings = Settings() # type:ignore settings = Settings() # type:ignore

View File

@ -0,0 +1,7 @@
from typing import Annotated
from pydantic import BaseModel, AmqpDsn, AfterValidator
class RabbitmqSettings(BaseModel):
    """RabbitMQ connection settings, populated from the environment."""

    # Validated as an AMQP DSN, then coerced to str for client libraries.
    URL: Annotated[AmqpDsn, AfterValidator(str)]

View File

@ -0,0 +1,8 @@
from typing import Annotated
from pydantic import BaseModel, RedisDsn, AfterValidator
class RedisSettings(BaseModel):
    """Redis connection settings, populated from the environment."""

    # Hostname of the Redis server
    HOST: str
    # TCP port of the Redis server
    PORT: int

View File

@ -0,0 +1 @@
# Number of buffered messages per chat before a chunk is published.
# NOTE(review): the name looks like a typo of MESSAGE_CHUNK_SIZE — renaming
# would require updating every importer, so it is kept as-is here.
MESSAGE_CHANG_SIZE: int = 20

View File

@ -0,0 +1,42 @@
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.ai_services.schemas import MessageFromChatSchema
from src.core.database import User, TgChat, TgMessage
async def create_new_user(
    session: AsyncSession,
    **kwargs
) -> User:
    """Persist a new ``User`` row built from *kwargs* and return it."""
    user = User(**kwargs)
    session.add(user)
    await session.commit()
    return user
async def create_new_chat(
    session: AsyncSession,
    **kwargs
) -> TgChat:
    """Persist a new ``TgChat`` row built from *kwargs* and return it."""
    chat = TgChat(**kwargs)
    session.add(chat)
    await session.commit()
    return chat
async def bulk_insert_messages(
    messages: list[MessageFromChatSchema],
    session: AsyncSession,
) -> None:
    """Insert a batch of chat messages with one executemany statement.

    A no-op for an empty batch: SQLAlchemy rejects an empty parameter
    list in executemany mode.
    """
    if not messages:
        return
    # Pass the statement and its parameter list directly instead of the
    # original tuple-splat construction; the redundant list re-wrapping
    # is also dropped.
    await session.execute(
        insert(TgMessage),
        [message.model_dump() for message in messages],
    )
    await session.commit()

View File

@ -0,0 +1,44 @@
from pyrogram import filters, Client
from pyrogram.enums import ChatType
from pyrogram.types import Message
from src.core.ai_services.schemas import MessageFromChatSchema
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE
from src.core.tg_service.service import check_user_exists, check_chat_exists
from src.core.tg_service import utils as api_tg_utils
# Per-chat buffer of pending messages, keyed by Telegram chat id.
DATA: dict[int, list[MessageFromChatSchema]] = dict()
async def some_publisher():
    """Publish a completed message chunk to the broker.

    TODO: not implemented yet — currently a no-op placeholder.
    """
    pass
async def message_listener(client: Client, message: Message) -> None:
    """Handle an incoming Telegram update.

    Ignores non-group / non-text / bot traffic, makes sure the sender and
    the chat exist in storage, buffers the message in DATA, and publishes
    a chat's buffer once it reaches MESSAGE_CHANG_SIZE messages.
    """
    if not api_tg_utils.check_message_condition(message):
        return

    await check_user_exists(
        user_pyrogram=message.from_user,
    )
    await check_chat_exists(
        chat_pyrogram=message.chat,
    )

    message_schema = MessageFromChatSchema(
        id=message.id,
        user_id=message.from_user.id,
        chat_id=message.chat.id,
        text=message.text,
        message_time=message.date,
    )

    chunk = DATA.get(message.chat.id)
    if chunk is not None and len(chunk) >= MESSAGE_CHANG_SIZE:
        # BUGFIX: the original deleted the key but kept the stale local
        # list reference truthy, so the next line did
        # DATA[chat_id].append(...) on a missing key -> KeyError.
        await some_publisher()
        del DATA[message.chat.id]
        chunk = None

    if chunk is None:
        DATA[message.chat.id] = [message_schema]
    else:
        chunk.append(message_schema)

View File

@ -0,0 +1,9 @@
from pydantic import BaseModel
class UserFromMessageSchema(BaseModel):
    """Subset of a Telegram user carried over from an incoming message."""

    # Telegram user id
    id: int
    username: str
    # NOTE(review): pyrogram can report these as None and the DB model
    # declares them nullable — consider ``str | None``; confirm with callers.
    first_name: str
    last_name: str

View File

@ -0,0 +1,55 @@
from pyrogram.types.user_and_chats import User, Chat
from src.core.tg_service.schemas import UserFromMessageSchema
from src.core.database.connect import db_helper
from src.core.redis_helper.redis_connect import redis_client
from src.core.tg_service import crud as tg_crud
async def check_user_exists(
    user_pyrogram: User,
) -> None:
    """Ensure the Telegram user is persisted, with Redis as existence cache.

    Redis stores a flag keyed by the Telegram user id; on a cache miss the
    database is consulted and the row is created if absent, then the flag
    is written back to Redis.
    """
    user_schema = UserFromMessageSchema(
        username=user_pyrogram.username,
        first_name=user_pyrogram.first_name,
        last_name=user_pyrogram.last_name,
        id=user_pyrogram.id,
    )
    cached = await redis_client.get(name=str(user_schema.id))
    if cached:
        return

    # Imported locally: ``User`` at module scope is pyrogram's type.
    from src.core.database import User as UserModel

    async with db_helper.get_async_session() as session:
        # BUGFIX: the original called session.get(id=user_model.id) —
        # session.get requires the mapped entity as first argument, and
        # user_model was None on this (cache-miss) path.
        user_model = await session.get(UserModel, user_schema.id)
        if not user_model:
            await tg_crud.create_new_user(
                session=session,
                **user_schema.model_dump()
            )
    # BUGFIX: redis-py rejects bool values; store "1" as the flag.
    await redis_client.set(name=str(user_schema.id), value="1")
async def check_chat_exists(
    chat_pyrogram: Chat
) -> None:
    """Ensure the Telegram chat is persisted, with Redis as existence cache."""
    # BUGFIX: Redis.get takes ``name``; the original passed an unknown
    # ``chat_id`` keyword.
    cached = await redis_client.get(name=str(chat_pyrogram.id))
    if cached:
        return

    # Imported locally to keep the module-level imports unchanged.
    from src.core.database import TgChat

    async with db_helper.get_async_session() as session:
        # BUGFIX: session.get requires the mapped entity as first argument.
        chat = await session.get(TgChat, chat_pyrogram.id)
        if not chat:
            await tg_crud.create_new_chat(
                session=session,
                id=chat_pyrogram.id,
                chat_type=chat_pyrogram.type.value,
                title=chat_pyrogram.title,
            )
    # BUGFIX: redis-py rejects bool values; store "1" as the flag.
    await redis_client.set(name=str(chat_pyrogram.id), value="1")

View File

@ -0,0 +1,13 @@
from pyrogram.types import Message
from pyrogram.enums import ChatType
def check_message_condition(
    message: Message,
) -> bool:
    """Return True for text messages sent by a real user in a group chat."""
    is_group_chat = message.chat.type not in (ChatType.PRIVATE, ChatType.BOT)
    has_sender = bool(message.from_user)
    has_text = bool(message.text)
    return is_group_chat and has_sender and has_text

View File

@ -0,0 +1,25 @@
import asyncio

from pyrogram import Client, filters, idle
from pyrogram.handlers import MessageHandler

from src.core.rabbitmq.connect import broker
from src.core.tg_service.messages_handler import message_listener
# NOTE(review): hard-coded Telegram API credentials — move them to
# settings/env and rotate these values; they are exposed in version control.
api_id = 17718565
api_hash = "72f93973f4227415572f039d4f847082"

# Pyrogram client backing the listener; the session is stored under
# the given name.
app = Client(
    name="advatroniks",
    api_id=api_id,
    api_hash=api_hash,
)
# Route every incoming update through message_listener; filtering of
# irrelevant updates happens inside the handler itself.
app.add_handler(MessageHandler(
    callback=message_listener,
    filters=filters.all
))
# Script entry point: run the broker and the Telegram client in one loop.
if __name__ == "__main__":
    async def _main() -> None:
        """Start the RabbitMQ broker, then run the client until interrupted.

        BUGFIX: the original used ``async with`` at module level (a
        SyntaxError) and called app.run() inside ``async with app.run()``.
        The lifecycles are driven explicitly instead.
        """
        await broker.start()
        try:
            await app.start()
            await idle()  # block until SIGINT/SIGTERM
            await app.stop()
        finally:
            await broker.close()

    asyncio.run(_main())