Skip to content

Commit a535512

Browse files
authored
feat: Add StatusMessageWatcher (#407)
### Description Add the option to log status and status messages of another Actor run. If such Actor run is also redirecting logs from another Actor run, then those logs can propagate all the way to the top through the `StreamedLog` - deep redirection. If the Actor run from which we are redirecting status messages is not redirecting logs from its children, then the log and status redirection remains shallow - only from that Actor run. ### Issues - Closes: #404
1 parent e4368e7 commit a535512

File tree

5 files changed

+397
-67
lines changed

5 files changed

+397
-67
lines changed

src/apify_client/_logging.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,5 +164,5 @@ def format(self, record: logging.LogRecord) -> str:
164164
Returns:
165165
Formated log message.
166166
"""
167-
formated_logger_name = f'{Fore.CYAN}[{record.name}]{Style.RESET_ALL} '
168-
return f'{formated_logger_name}-> {record.msg}'
167+
formated_logger_name = f'{Fore.CYAN}[{record.name}]{Style.RESET_ALL}'
168+
return f'{formated_logger_name} -> {record.msg}'

src/apify_client/clients/resource_clients/actor.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ def call(
317317
waits indefinitely.
318318
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
319319
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
320-
will redirect logs to the provided logger.
320+
will redirect logs to the provided logger. The logger is also used to capture status and status message
321+
of the other Actor run.
321322
322323
Returns:
323324
The run object.
@@ -336,12 +337,11 @@ def call(
336337
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
337338

338339
run_client = self.root_client.run(run_id=started_run['id'])
340+
339341
if logger == 'default':
340-
log_context = run_client.get_streamed_log()
341-
else:
342-
log_context = run_client.get_streamed_log(to_logger=logger)
342+
logger = None
343343

344-
with log_context:
344+
with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
345345
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
346346

347347
def build(
@@ -722,7 +722,8 @@ async def call(
722722
waits indefinitely.
723723
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
724724
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
725-
will redirect logs to the provided logger.
725+
will redirect logs to the provided logger. The logger is also used to capture status and status message
726+
of the other Actor run.
726727
727728
Returns:
728729
The run object.
@@ -742,12 +743,14 @@ async def call(
742743
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
743744

744745
run_client = self.root_client.run(run_id=started_run['id'])
746+
745747
if logger == 'default':
746-
log_context = await run_client.get_streamed_log()
747-
else:
748-
log_context = await run_client.get_streamed_log(to_logger=logger)
748+
logger = None
749+
750+
status_redirector = await run_client.get_status_message_watcher(to_logger=logger)
751+
streamed_log = await run_client.get_streamed_log(to_logger=logger)
749752

750-
async with log_context:
753+
async with status_redirector, streamed_log:
751754
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
752755

753756
async def build(

src/apify_client/clients/resource_clients/log.py

Lines changed: 171 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import logging
55
import re
66
import threading
7+
import time
78
from asyncio import Task
89
from contextlib import asynccontextmanager, contextmanager
9-
from datetime import datetime, timezone
10+
from datetime import datetime, timedelta, timezone
1011
from threading import Thread
1112
from typing import TYPE_CHECKING, Any, cast
1213

@@ -23,6 +24,8 @@
2324
import httpx
2425
from typing_extensions import Self
2526

27+
from apify_client.clients import RunClient, RunClientAsync
28+
2629

2730
class LogClient(ResourceClient):
2831
"""Sub-client for manipulating logs."""
@@ -228,9 +231,9 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
228231
logs for long-running actors in stand-by.
229232
230233
"""
231-
self._to_logger = to_logger
232234
if self._force_propagate:
233235
to_logger.propagate = True
236+
self._to_logger = to_logger
234237
self._stream_buffer = list[bytes]()
235238
self._split_marker = re.compile(rb'(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
236239
self._relevancy_time_limit: datetime | None = None if from_start else datetime.now(tz=timezone.utc)
@@ -350,13 +353,16 @@ def start(self) -> Task:
350353
self._streaming_task = asyncio.create_task(self._stream_log())
351354
return self._streaming_task
352355

353-
def stop(self) -> None:
356+
async def stop(self) -> None:
354357
"""Stop the streaming task."""
355358
if not self._streaming_task:
356359
raise RuntimeError('Streaming task is not active')
357360

358361
self._streaming_task.cancel()
359-
self._streaming_task = None
362+
try:
363+
await self._streaming_task
364+
except asyncio.CancelledError:
365+
self._streaming_task = None
360366

361367
async def __aenter__(self) -> Self:
362368
"""Start the streaming task within the context. Exiting the context will cancel the streaming task."""
@@ -367,7 +373,7 @@ async def __aexit__(
367373
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
368374
) -> None:
369375
"""Cancel the streaming task."""
370-
self.stop()
376+
await self.stop()
371377

372378
async def _stream_log(self) -> None:
373379
async with self._log_client.stream(raw=True) as log_stream:
@@ -378,3 +384,163 @@ async def _stream_log(self) -> None:
378384

379385
# If the stream is finished, then the last part will be also processed.
380386
self._log_buffer_content(include_last_part=True)
387+
388+
389+
class StatusMessageWatcher:
390+
"""Utility class for logging status messages from another Actor run.
391+
392+
Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
393+
especially in cases of frequent status message changes.
394+
"""
395+
396+
_force_propagate = False
397+
# This is final sleep time to try to get the last status and status message of finished Actor run.
398+
# The status and status message can get set on the Actor run with a delay. Sleep time does not guarantee that the
399+
# final message will be captured, but increases the chances of that.
400+
_final_sleep_time_s = 6
401+
402+
def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
403+
"""Initialize `StatusMessageWatcher`.
404+
405+
Args:
406+
to_logger: The logger to which the status message will be redirected.
407+
check_period: The period with which the status message will be polled.
408+
"""
409+
if self._force_propagate:
410+
to_logger.propagate = True
411+
self._to_logger = to_logger
412+
self._check_period = check_period.total_seconds()
413+
self._last_status_message = ''
414+
415+
def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
416+
"""Get relevant run data, log them if changed and return `True` if more data is expected.
417+
418+
Args:
419+
run_data: The dictionary that contains the run data.
420+
421+
Returns:
422+
`True` if more data is expected, `False` otherwise.
423+
"""
424+
if run_data is not None:
425+
status = run_data.get('status', 'Unknown status')
426+
status_message = run_data.get('statusMessage', '')
427+
new_status_message = f'Status: {status}, Message: {status_message}'
428+
429+
if new_status_message != self._last_status_message:
430+
self._last_status_message = new_status_message
431+
self._to_logger.info(new_status_message)
432+
433+
return not (run_data.get('isStatusMessageTerminal', False))
434+
return True
435+
436+
437+
class StatusMessageWatcherAsync(StatusMessageWatcher):
438+
"""Async variant of `StatusMessageWatcher` that is logging in task."""
439+
440+
def __init__(
441+
self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
442+
) -> None:
443+
"""Initialize `StatusMessageWatcherAsync`.
444+
445+
Args:
446+
run_client: The client for run that will be used to get a status and message.
447+
to_logger: The logger to which the status message will be redirected.
448+
check_period: The period with which the status message will be polled.
449+
"""
450+
super().__init__(to_logger=to_logger, check_period=check_period)
451+
self._run_client = run_client
452+
self._logging_task: Task | None = None
453+
454+
def start(self) -> Task:
455+
"""Start the logging task. The caller has to handle any cleanup by manually calling the `stop` method."""
456+
if self._logging_task:
457+
raise RuntimeError('Logging task already active')
458+
self._logging_task = asyncio.create_task(self._log_changed_status_message())
459+
return self._logging_task
460+
461+
async def stop(self) -> None:
462+
"""Stop the logging task."""
463+
if not self._logging_task:
464+
raise RuntimeError('Logging task is not active')
465+
466+
self._logging_task.cancel()
467+
try:
468+
await self._logging_task
469+
except asyncio.CancelledError:
470+
self._logging_task = None
471+
472+
async def __aenter__(self) -> Self:
473+
"""Start the logging task within the context. Exiting the context will cancel the logging task."""
474+
self.start()
475+
return self
476+
477+
async def __aexit__(
478+
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
479+
) -> None:
480+
"""Cancel the logging task."""
481+
await asyncio.sleep(self._final_sleep_time_s)
482+
await self.stop()
483+
484+
async def _log_changed_status_message(self) -> None:
485+
while True:
486+
run_data = await self._run_client.get()
487+
if not self._log_run_data(run_data):
488+
break
489+
await asyncio.sleep(self._check_period)
490+
491+
492+
class StatusMessageWatcherSync(StatusMessageWatcher):
493+
"""Sync variant of `StatusMessageWatcher` that is logging in thread."""
494+
495+
def __init__(
496+
self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
497+
) -> None:
498+
"""Initialize `StatusMessageWatcherSync`.
499+
500+
Args:
501+
run_client: The client for run that will be used to get a status and message.
502+
to_logger: The logger to which the status message will be redirected.
503+
check_period: The period with which the status message will be polled.
504+
"""
505+
super().__init__(to_logger=to_logger, check_period=check_period)
506+
self._run_client = run_client
507+
self._logging_thread: Thread | None = None
508+
self._stop_logging = False
509+
510+
def start(self) -> Thread:
511+
"""Start the logging thread. The caller has to handle any cleanup by manually calling the `stop` method."""
512+
if self._logging_thread:
513+
raise RuntimeError('Logging thread already active')
514+
self._stop_logging = False
515+
self._logging_thread = threading.Thread(target=self._log_changed_status_message)
516+
self._logging_thread.start()
517+
return self._logging_thread
518+
519+
def stop(self) -> None:
520+
"""Signal the _logging_thread thread to stop logging and wait for it to finish."""
521+
if not self._logging_thread:
522+
raise RuntimeError('Logging thread is not active')
523+
time.sleep(self._final_sleep_time_s)
524+
self._stop_logging = True
525+
self._logging_thread.join()
526+
self._logging_thread = None
527+
self._stop_logging = False
528+
529+
def __enter__(self) -> Self:
530+
"""Start the logging task within the context. Exiting the context will cancel the logging task."""
531+
self.start()
532+
return self
533+
534+
def __exit__(
535+
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
536+
) -> None:
537+
"""Cancel the logging task."""
538+
self.stop()
539+
540+
def _log_changed_status_message(self) -> None:
541+
while True:
542+
if not self._log_run_data(self._run_client.get()):
543+
break
544+
if self._stop_logging:
545+
break
546+
time.sleep(self._check_period)

0 commit comments

Comments
 (0)