add worker release

This commit is contained in:
harold 2025-02-07 23:24:11 +05:00
parent 05b6060ccf
commit d3abb1f217
36 changed files with 1986 additions and 10 deletions

176
.gitignore vendored Normal file
View File

@ -0,0 +1,176 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
.env_prod
.env

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

8
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="enabledOnReformat" value="true" />
<option name="sdkName" value="Python 3.12 (scrapper-tg-bot)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (scrapper-tg-bot)" project-jdk-type="Python SDK" />
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

11
gemini_worker/.env_prod Normal file
View File

@ -0,0 +1,11 @@
APP__MODE=PROD
APP__POSTGRES__USER=postgres
APP__POSTGRES__DB_NAME=postgres
APP__POSTGRES__HOST=postgres
APP__POSTGRES__PORT=5432
APP__POSTGRES__PASSWORD=postgres
APP__RABBIT__URL=amqp://test:test@rabbitmq:5672/
APP__GEMINI__API_KEY=AIzaSyCxGFlj221gTBSMGoftD4NjMioU88jI2cQ

8
gemini_worker/.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

8
gemini_worker/.idea/gemini_worker.iml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.12 virtualenv at /home/harold/Documents/scrapper-tg-bot/gemini_worker/venv" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
gemini_worker/.idea/misc.xml generated Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 virtualenv at /home/harold/Documents/scrapper-tg-bot/gemini_worker/venv" project-jdk-type="Python SDK" />
</project>

8
gemini_worker/.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/gemini_worker.iml" filepath="$PROJECT_DIR$/.idea/gemini_worker.iml" />
</modules>
</component>
</project>

6
gemini_worker/.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

1313
gemini_worker/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,21 @@
[project]
name = "gemini-worker"
version = "0.1.0"
description = ""
authors = [
{name = "harold",email = "tihon414@gmail.com"}
]
readme = "README.md"
requires-python = ">=3.11, <4.0"
dependencies = [
"faststream (>=0.5.34,<0.6.0)",
"aio-pika (>=9.5.4,<10.0.0)",
"google-genai (>=1.0.0,<2.0.0)",
"google-generativeai (>=0.8.4,<0.9.0)",
"pydantic-settings (>=2.7.1,<3.0.0)"
]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

View File

@ -0,0 +1,23 @@
from enum import Enum
from pathlib import Path
PATH_TO_BASE_FOLDER = Path(__file__).resolve().parent.parent
LOG_DEFAULT_FORMAT = "[%(asctime)s.%(msecs)03d] %(module)10s:%(lineno)-3d %(levelname)-7s - %(message)s"
class EnvironmentEnum(str, Enum):
TEST = 'TEST'
PROD = 'PROD'
DEV = 'DEV'
DOCKER_TEST = 'DOCKER_TEST'
class EnvFileLocation:
TESTING = f'{PATH_TO_BASE_FOLDER}/.env_test'
PRODUCTION = f'{PATH_TO_BASE_FOLDER}/.env_prod'
DOCKER_TESTING = f'{PATH_TO_BASE_FOLDER}/.env_docker_test'
class PydanticEnvPrefixEnum(str, Enum):
APP = "APP__"

View File

@ -0,0 +1,3 @@
GEMINI_TOKENS_LIMIT: int = 10_000

View File

@ -0,0 +1,69 @@
ROLE = """
Ты специалист по поиску клиентов в компании, которая занимается разработкой любого софта
на заказ.
"""
ANALYTIC_PROMT = """
Ты получаешь json с такими полями
{
chats: [
"chat_id": integer
"messages": [
{
"user_id: integer,
"message_id": integer,
"text": string,
"date": datetime
}
]
]
}
chats - это список чатов.
messages - это срез диалога в чате телеграмма.
пользователи могут общаться на абсолютно разные темы.
Твоя задача:
Прочитать эти сообщения, понять тему текущего среза диалога.
И если ты поймешь, что мы можем какому то пользователю предложить свои услуги
например (написать интернет магазин, мобильное приложение или любой другой айти продукт)
то выведи в таком виде ответ.
ВАЖНО: Если ты уверен на 100 процентов, что они заинтересованы в подобных услугах и им
можно предложить, то верни.
Условно в нескольких чатах может быть несколько потенциальных клиентов, тогда вот так выведи
{
success: [
{
"user_id": integer,
"chat_id": integer,
"reason": string
}
]
}
поле reason: Кратко(до 100 симоволов) почему ты решил, что это потенциальный клиент.
Если ты хотя бы чуть чуть не уверен, то верни вот такую строку
{
success: null
}
ВАЖНО: Ты должен вернуть ТОЛЬКО JSON и не словом больше. Иначе я разорюсь.
без ```json ``` просто так без каких либо спецсимволов
"""
GEMINI_BASE_MESSAGE = [
{
"role": "user",
"parts": [
{"text": ROLE},
{"text": ANALYTIC_PROMT},
]
}
]

View File

@ -0,0 +1,23 @@
from pydantic import BaseModel, PositiveInt, NegativeInt
from src.schemas import MessageFromChatSchema, BaseModelWithSerializeDatetime
class ChatMessageSchema(BaseModelWithSerializeDatetime):
chat_id: int
messages: list[MessageFromChatSchema]
class FullRequestForGeminiSchema(BaseModelWithSerializeDatetime):
chats: list[ChatMessageSchema]
class SuccessChatFromAiSchema(BaseModelWithSerializeDatetime):
user_id: PositiveInt | None
chat_id: NegativeInt | None
reason: str | None
class ResponseFromGeminiSchema(BaseModelWithSerializeDatetime):
success: list[SuccessChatFromAiSchema] | None

View File

@ -0,0 +1,70 @@
import json
import google.generativeai as genai
from src.gemini_sdk.promt import GEMINI_BASE_MESSAGE
from src.gemini_sdk.schemas import FullRequestForGeminiSchema, ResponseFromGeminiSchema
from src.schemas import MessagesForSendToWorkersSchema
from src.settings.base import settings
class GoogleHelper:
def __init__(
self,
api_key: str,
model_name: str,
) -> None:
self.api_key = api_key
self.model = model_name
genai.configure(api_key=api_key)
self._model = genai.GenerativeModel(model_name=model_name)
@staticmethod
def _serialize_messages_to_promt(
chats: MessagesForSendToWorkersSchema,
) -> list[dict]:
messages_for_request = GEMINI_BASE_MESSAGE.copy()
text_for_request = json.dumps(chats.model_dump_json())
extend_message = {
"role": "user",
"parts": [
{
"text": text_for_request,
}
],
}
messages_for_request.append(extend_message)
return messages_for_request
@staticmethod
def _serialize_response_to_json(
response_text: str,
) -> ResponseFromGeminiSchema:
response = response_text.replace('\n', '')
response_as_dict = json.loads(response_text)
return ResponseFromGeminiSchema(**response_as_dict)
def create_request_ai(
self,
messages: MessagesForSendToWorkersSchema,
) -> ResponseFromGeminiSchema:
contents = self._serialize_messages_to_promt(messages)
response = self._model.generate_content(
contents=contents
)
return self._serialize_response_to_json(response.text)
gemini_helper = GoogleHelper(
api_key=settings.GEMINI.API_KEY,
model_name=settings.GEMINI.MODEL_NAME,
)

13
gemini_worker/src/main.py Normal file
View File

@ -0,0 +1,13 @@
from faststream import FastStream
from src.rmq.connect import broker, message_subscriber
from src.schemas import MessagesForSendToWorkersSchema
from src.service.rmq_msg_handler import handle_message_service
app = FastStream(broker)
@message_subscriber
async def handle_message(
message_schema: MessagesForSendToWorkersSchema
):
await handle_message_service(message_schema)

View File

View File

@ -0,0 +1,32 @@
from faststream.rabbit import RabbitBroker, RabbitExchange, ExchangeType, RabbitQueue
url = "amqp://test:test@localhost:9072/"
broker = RabbitBroker(
url=url
)
base_exchange = RabbitExchange(
name="base_exchange",
type=ExchangeType.DIRECT,
)
base_queue = RabbitQueue(
name="base_queue",
)
message_subscriber = broker.subscriber(
queue=base_queue,
exchange=base_exchange,
)
async def init_queue_and_publisher():
await broker.declare_exchange(base_exchange)
await broker.declare_queue(base_queue)

View File

@ -0,0 +1,35 @@
from pydantic import UUID4, PositiveInt, NegativeInt
from datetime import datetime
from pydantic import BaseModel, field_validator
class BaseConfig:
json_encoders = {
datetime: lambda v: v.isoformat(), # Преобразует datetime в строку ISO
UUID4: lambda v: str(v), # Преобразует UUID в строку
}
class BaseModelWithSerializeDatetime(BaseModel):
@field_validator("*")
def remove_tzinfo(cls, value):
if isinstance(value, datetime) and value.tzinfo is not None:
return value.replace(tzinfo=None)
return value
class Config(BaseConfig):
pass
class MessageFromChatSchema(BaseModelWithSerializeDatetime):
id: PositiveInt
user_id: PositiveInt
chat_id: NegativeInt
text: str
message_time: datetime
class MessagesForSendToWorkersSchema(BaseModelWithSerializeDatetime):
messages: list[MessageFromChatSchema]

View File

View File

@ -0,0 +1,19 @@
import json
from google.generativeai import GenerativeModel
from src.gemini_sdk.constants import GEMINI_TOKENS_LIMIT
from src.service.storage import MESSAGES_STORAGE_SCHEMA
def check_current_token_limit(
model: GenerativeModel
) -> bool:
raw_response = json.dumps(MESSAGES_STORAGE_SCHEMA.model_dump_json())
tokens_response_count = model.count_tokens(raw_response).total_tokens
print("tokens", model.count_tokens(raw_response).total_tokens)
if tokens_response_count >= GEMINI_TOKENS_LIMIT:
return False
return True

View File

@ -0,0 +1,32 @@
from time import sleep
from src.gemini_sdk.schemas import ChatMessageSchema
from src.gemini_sdk.service import gemini_helper
from src.schemas import MessagesForSendToWorkersSchema
from src.service.limter_checker import check_current_token_limit
from src.service.storage import MESSAGES_STORAGE_SCHEMA
async def handle_message_service(
message_from_rmq: MessagesForSendToWorkersSchema,
):
chat_message_schema = ChatMessageSchema(
chat_id=message_from_rmq.messages[0].chat_id,
messages=message_from_rmq.messages
)
MESSAGES_STORAGE_SCHEMA.chats.append(chat_message_schema)
if check_current_token_limit(
model=gemini_helper._model
):
return
else:
result = gemini_helper.create_request_ai(
messages=MESSAGES_STORAGE_SCHEMA
)
print(result)
sleep(60)
MESSAGES_STORAGE_SCHEMA.chats = []

View File

@ -0,0 +1,5 @@
from src.gemini_sdk.schemas import FullRequestForGeminiSchema
MESSAGES_STORAGE_SCHEMA = FullRequestForGeminiSchema(
chats=[]
)

View File

View File

@ -0,0 +1,29 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from src.constants import PydanticEnvPrefixEnum, EnvFileLocation, EnvironmentEnum
from src.settings.database import DatabaseSettings
from src.settings.gemini import GeminiSettings
from src.settings.rabbitmq import RabbitmqSettings
class Settings(BaseSettings):
model_config = SettingsConfigDict(
case_sensitive=True,
env_nested_delimiter='__',
env_prefix=PydanticEnvPrefixEnum.APP.value,
env_file=(
EnvFileLocation.PRODUCTION,
# EnvFileLocation.TESTING,
)
)
MODE: EnvironmentEnum
POSTGRES: DatabaseSettings
GEMINI: GeminiSettings
RABBIT: RabbitmqSettings
settings = Settings() # type:ignore

View File

@ -0,0 +1,25 @@
from typing import Optional
from pydantic import PostgresDsn, BaseModel
class DatabaseSettings(BaseModel):
USER: str
PASSWORD: str
HOST: str
PORT: str
DB_NAME: str
ECHO_LOG: bool = False
@property
def async_connect_url(self) -> Optional[PostgresDsn]:
return PostgresDsn(
f"postgresql+asyncpg://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DB_NAME}" # type:ignore
)
@property
def sync_connect_url(self) -> Optional[PostgresDsn]:
return PostgresDsn(
f"postgresql+psycopg2://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DB_NAME}" # type:ignore
)

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
class GeminiSettings(BaseModel):
API_KEY: str
MODEL_NAME: str = "gemini-1.5-flash"

View File

@ -0,0 +1,7 @@
from typing import Annotated
from pydantic import BaseModel, AmqpDsn, AfterValidator
class RabbitmqSettings(BaseModel):
URL: Annotated[AmqpDsn, AfterValidator(str)]

View File

@ -9,10 +9,6 @@ from src.core.tg_service import utils as api_tg_utils
DATA: dict[int, list[MessageFromChatSchema]] = dict() DATA: dict[int, list[MessageFromChatSchema]] = dict()
async def some_publisher():
pass
async def message_listener(client: Client, message: Message): async def message_listener(client: Client, message: Message):
if api_tg_utils.check_message_condition(message): if api_tg_utils.check_message_condition(message):
await check_user_exists( await check_user_exists(

View File

@ -58,15 +58,11 @@ async def check_chat_exists(
async def check_chunk_state_and_publish( async def check_chunk_state_and_publish(
data: dict, data: dict[int, list[MessageFromChatSchema]],
chat_id: int, chat_id: int,
message_schema: MessageFromChatSchema, message_schema: MessageFromChatSchema,
) -> bool: ):
messages_chunk = data.get(chat_id) messages_chunk = data.get(chat_id)
if messages_chunk and len(messages_chunk) == MESSAGE_CHANG_SIZE:
await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema(messages=messages_chunk)
)
if messages_chunk is None: if messages_chunk is None:
data[chat_id] = [message_schema] data[chat_id] = [message_schema]