This commit is contained in:
harold 2025-05-05 17:04:50 +05:00
parent 7f59b35d62
commit 7bc7e5b09f
2 changed files with 26 additions and 25 deletions

View File

@ -31,10 +31,9 @@ async def message_listener(client: Client, message: Message):
text=message.text, text=message.text,
message_time=message.date, message_time=message.date,
) )
async with lock: await check_chunk_state_and_publish(
await check_chunk_state_and_publish( data=DATA,
data=DATA, message_schema=message_schema,
message_schema=message_schema, chat_id=message.chat.id,
chat_id=message.chat.id, )
)

View File

@ -6,6 +6,7 @@ from src.core.database import TgChat, User
from src.core.rabbitmq.connect import message_handler_publisher from src.core.rabbitmq.connect import message_handler_publisher
from src.core.redis_helper.redis_connect import redis_client from src.core.redis_helper.redis_connect import redis_client
from src.core.tg_service.constants import MESSAGE_CHANG_SIZE from src.core.tg_service.constants import MESSAGE_CHANG_SIZE
from src.core.tg_service.messages_handler import lock
from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema
from src.core.database.connect import db_helper from src.core.database.connect import db_helper
from src.core.tg_service import crud as tg_crud from src.core.tg_service import crud as tg_crud
@ -65,27 +66,28 @@ async def check_chunk_state_and_publish(
): ):
messages_chunk = data.get(chat_id) messages_chunk = data.get(chat_id)
if messages_chunk is None: async with lock:
data[chat_id] = [message_schema] if messages_chunk is None:
data[chat_id] = [message_schema]
elif len(messages_chunk) == MESSAGE_CHANG_SIZE: elif len(messages_chunk) == MESSAGE_CHANG_SIZE:
slice_id = uuid.uuid4() slice_id = uuid.uuid4()
del data[chat_id] del data[chat_id]
async with db_helper.get_async_session_not_closed() as session: async with db_helper.get_async_session_not_closed() as session:
await tg_crud.bulk_insert_messages( await tg_crud.bulk_insert_messages(
messages=messages_chunk, messages=messages_chunk,
session=session, session=session,
slice_id=slice_id slice_id=slice_id
)
await message_handler_publisher.publish(
message=MessagesForSendToWorkersSchema(
messages=messages_chunk,
slice_id=slice_id
)
) )
await message_handler_publisher.publish( data[chat_id] = [message_schema]
message=MessagesForSendToWorkersSchema(
messages=messages_chunk,
slice_id=slice_id
)
)
data[chat_id] = [message_schema]
else: else:
data[chat_id].append(message_schema) data[chat_id].append(message_schema)