diff --git a/telegram-application/src/core/tg_service/messages_handler.py b/telegram-application/src/core/tg_service/messages_handler.py index c689744..ccff0b5 100644 --- a/telegram-application/src/core/tg_service/messages_handler.py +++ b/telegram-application/src/core/tg_service/messages_handler.py @@ -31,10 +31,9 @@ async def message_listener(client: Client, message: Message): text=message.text, message_time=message.date, ) - async with lock: - await check_chunk_state_and_publish( - data=DATA, - message_schema=message_schema, - chat_id=message.chat.id, - ) + await check_chunk_state_and_publish( + data=DATA, + message_schema=message_schema, + chat_id=message.chat.id, + ) diff --git a/telegram-application/src/core/tg_service/service.py b/telegram-application/src/core/tg_service/service.py index 13e5c59..96bb5b5 100644 --- a/telegram-application/src/core/tg_service/service.py +++ b/telegram-application/src/core/tg_service/service.py @@ -6,6 +6,7 @@ from src.core.database import TgChat, User from src.core.rabbitmq.connect import message_handler_publisher from src.core.redis_helper.redis_connect import redis_client from src.core.tg_service.constants import MESSAGE_CHANG_SIZE +from src.core.tg_service.messages_handler import lock from src.core.tg_service.schemas import UserFromMessageSchema, MessagesForSendToWorkersSchema, MessageFromChatSchema from src.core.database.connect import db_helper from src.core.tg_service import crud as tg_crud @@ -65,27 +66,28 @@ async def check_chunk_state_and_publish( ): messages_chunk = data.get(chat_id) - if messages_chunk is None: - data[chat_id] = [message_schema] + async with lock: + if messages_chunk is None: + data[chat_id] = [message_schema] - elif len(messages_chunk) == MESSAGE_CHANG_SIZE: - slice_id = uuid.uuid4() - del data[chat_id] - async with db_helper.get_async_session_not_closed() as session: - await tg_crud.bulk_insert_messages( - messages=messages_chunk, - session=session, - slice_id=slice_id + elif len(messages_chunk) == MESSAGE_CHANG_SIZE: + slice_id = uuid.uuid4() + del data[chat_id] + async with db_helper.get_async_session_not_closed() as session: + await tg_crud.bulk_insert_messages( + messages=messages_chunk, + session=session, + slice_id=slice_id + ) + await message_handler_publisher.publish( + message=MessagesForSendToWorkersSchema( + messages=messages_chunk, + slice_id=slice_id + ) ) - await message_handler_publisher.publish( - message=MessagesForSendToWorkersSchema( - messages=messages_chunk, - slice_id=slice_id - ) - ) - data[chat_id] = [message_schema] + data[chat_id] = [message_schema] - else: - data[chat_id].append(message_schema) + else: + data[chat_id].append(message_schema)