mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(subscriptions): retry sending msg when slack api fails to download (#40982)
This commit is contained in:
@@ -6,6 +6,7 @@ from django.conf import settings
|
||||
|
||||
import aiohttp
|
||||
import structlog
|
||||
from slack_sdk.errors import SlackApiError
|
||||
|
||||
from posthog.models.exported_asset import ExportedAsset
|
||||
from posthog.models.integration import Integration, SlackIntegration
|
||||
@@ -196,24 +197,30 @@ async def _send_slack_message_with_retry(client, max_retries: int = 3, **kwargs)
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
return await client.chat_postMessage(**kwargs)
|
||||
except TimeoutError:
|
||||
if attempt < max_retries - 1:
|
||||
wait_time = 2**attempt
|
||||
logger.warning(
|
||||
"_send_slack_message_with_retry.timeout_retrying",
|
||||
attempt=attempt + 1,
|
||||
max_retries=max_retries,
|
||||
wait_time=wait_time,
|
||||
channel=kwargs.get("channel"),
|
||||
is_thread=bool(kwargs.get("thread_ts")),
|
||||
exc_info=True,
|
||||
)
|
||||
await asyncio.sleep(wait_time)
|
||||
continue
|
||||
except (TimeoutError, SlackApiError) as e:
|
||||
if isinstance(e, SlackApiError):
|
||||
slack_error = e.response.get("error", "")
|
||||
if slack_error != "invalid_blocks":
|
||||
raise
|
||||
log_event = "_send_slack_message_with_retry.invalid_blocks_retrying"
|
||||
else:
|
||||
# Final attempt failed, re-raise
|
||||
log_event = "_send_slack_message_with_retry.timeout_retrying"
|
||||
|
||||
if attempt >= max_retries - 1:
|
||||
raise
|
||||
|
||||
logger.warning(
|
||||
log_event,
|
||||
attempt=attempt + 1,
|
||||
max_retries=max_retries,
|
||||
channel=kwargs.get("channel"),
|
||||
is_thread=bool(kwargs.get("thread_ts")),
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
wait_time = 2**attempt
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
|
||||
async def send_slack_message_with_integration_async(
|
||||
integration: Integration,
|
||||
@@ -234,6 +241,7 @@ async def send_slack_message_with_integration_async(
|
||||
blocks=message_data.blocks,
|
||||
text=message_data.title,
|
||||
)
|
||||
logger.info("send_slack_message_with_integration_async.main_message_sent", subscription_id=subscription.id)
|
||||
|
||||
thread_ts = message_res.get("ts")
|
||||
failed_thread_messages = []
|
||||
@@ -247,7 +255,8 @@ async def send_slack_message_with_integration_async(
|
||||
thread_ts=thread_ts,
|
||||
**thread_msg,
|
||||
)
|
||||
except TimeoutError:
|
||||
except Exception as e:
|
||||
# Thread message failed, continue with others
|
||||
logger.error(
|
||||
"send_slack_message_with_integration_async.slack_thread_message_failed_after_retries",
|
||||
subscription_id=subscription.id,
|
||||
@@ -255,6 +264,7 @@ async def send_slack_message_with_integration_async(
|
||||
thread_index=idx,
|
||||
total_thread_messages=len(message_data.thread_messages),
|
||||
thread_ts=thread_ts,
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
failed_thread_messages.append(idx)
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
from freezegun import freeze_time
|
||||
from posthog.test.base import APIBaseTest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from unittest.mock import AsyncMock, MagicMock, call, patch
|
||||
|
||||
from posthog.models.dashboard import Dashboard
|
||||
from posthog.models.exported_asset import ExportedAsset
|
||||
@@ -349,3 +350,83 @@ class TestSlackSubscriptionsAsyncTasks(APIBaseTest):
|
||||
assert mock_async_client.chat_postMessage.call_count == 3
|
||||
assert result.is_complete_success
|
||||
assert result.main_message_sent
|
||||
|
||||
@patch("ee.tasks.subscriptions.slack_subscriptions.asyncio.sleep", new_callable=AsyncMock)
|
||||
def test_async_delivery_retry_on_invalid_blocks_failure(
|
||||
self, mock_sleep: AsyncMock, MockSlackIntegration: MagicMock
|
||||
) -> None:
|
||||
from slack_sdk.errors import SlackApiError
|
||||
from slack_sdk.web.async_slack_response import AsyncSlackResponse
|
||||
|
||||
mock_slack_integration = MagicMock()
|
||||
MockSlackIntegration.return_value = mock_slack_integration
|
||||
|
||||
mock_async_client = AsyncMock()
|
||||
mock_slack_integration.async_client = MagicMock(return_value=mock_async_client)
|
||||
|
||||
mock_response = {
|
||||
"ok": False,
|
||||
"error": "invalid_blocks",
|
||||
"errors": ["downloading image failed [json-pointer:/blocks/0/image_url]"],
|
||||
}
|
||||
slack_error = SlackApiError(
|
||||
"Error",
|
||||
AsyncSlackResponse(
|
||||
client=None, http_verb="POST", api_url="", req_args={}, data=mock_response, headers={}, status_code=200
|
||||
),
|
||||
)
|
||||
|
||||
mock_async_client.chat_postMessage.side_effect = [slack_error, slack_error, slack_error]
|
||||
assets = list(ExportedAsset.objects.filter(id=self.asset.id).select_related("insight"))
|
||||
|
||||
with pytest.raises(SlackApiError):
|
||||
asyncio.run(
|
||||
send_slack_message_with_integration_async(
|
||||
self.integration, self.subscription, assets, self.TOTAL_ASSET_COUNT
|
||||
)
|
||||
)
|
||||
|
||||
assert mock_async_client.chat_postMessage.call_count == 3
|
||||
mock_sleep.assert_has_awaits([call(1), call(2)])
|
||||
|
||||
@patch("ee.tasks.subscriptions.slack_subscriptions.asyncio.sleep", new_callable=AsyncMock)
|
||||
def test_async_delivery_retry_on_invalid_blocks_success(
|
||||
self, mock_sleep: AsyncMock, MockSlackIntegration: MagicMock
|
||||
) -> None:
|
||||
from slack_sdk.errors import SlackApiError
|
||||
from slack_sdk.web.async_slack_response import AsyncSlackResponse
|
||||
|
||||
mock_slack_integration = MagicMock()
|
||||
MockSlackIntegration.return_value = mock_slack_integration
|
||||
|
||||
mock_async_client = AsyncMock()
|
||||
mock_slack_integration.async_client = MagicMock(return_value=mock_async_client)
|
||||
|
||||
mock_response = {
|
||||
"ok": False,
|
||||
"error": "invalid_blocks",
|
||||
"errors": ["downloading image failed [json-pointer:/blocks/0/image_url]"],
|
||||
}
|
||||
slack_error = SlackApiError(
|
||||
"Error",
|
||||
AsyncSlackResponse(
|
||||
client=None, http_verb="POST", api_url="", req_args={}, data=mock_response, headers={}, status_code=200
|
||||
),
|
||||
)
|
||||
|
||||
mock_async_client.chat_postMessage.side_effect = [
|
||||
slack_error,
|
||||
{"ts": "1.234"},
|
||||
{"ts": "2.345"},
|
||||
]
|
||||
assets = list(ExportedAsset.objects.filter(id=self.asset.id).select_related("insight"))
|
||||
|
||||
result = asyncio.run(
|
||||
send_slack_message_with_integration_async(
|
||||
self.integration, self.subscription, assets, self.TOTAL_ASSET_COUNT
|
||||
)
|
||||
)
|
||||
|
||||
assert mock_async_client.chat_postMessage.call_count == 3
|
||||
assert result.is_complete_success
|
||||
mock_sleep.assert_awaited_once_with(1)
|
||||
|
||||
Reference in New Issue
Block a user