Skip to content

Pool - Using @pooled() Decorator

Decorator Overview

The @pooled() decorator wraps functions with automatic pool acquisition. When the pool is full, you can either raise an error or invoke a callback with the original function arguments.

Basic Decorator Usage

import asyncio

from py_cachify import init_cachify, pooled, CachifyPoolFullError


init_cachify()


@pooled(key='api-call-pool', max_size=3, raise_on_full=True)
async def call_external_api(user_id: str) -> dict:
    # Simulate API call
    await asyncio.sleep(1)
    return {'user_id': user_id, 'data': 'response'}


async def main() -> None:
    # This works - within pool limit
    result = await call_external_api('user-1')
    print(f'Result: {result}')


if __name__ == '__main__':
    asyncio.run(main())

The on_full Callback

When raise_on_full=False (the default), you can provide an on_full callback. This receives the exact same *args, **kwargs as the original function call.

This flexibility enables various fallback strategies:

Example 1: Rescheduling (Celery/Taskiq)

from py_cachify import init_cachify, pooled
from celery import shared_task


init_cachify()


def reschedule_task(*args, **kwargs):
    # Reschedule this task to run later
    process_user_task.delay(*args, **kwargs)
    return {'status': 'rescheduled', 'user_id': kwargs.get('user_id')}


@shared_task()
@pooled(key='user-processing-pool-{user_id}', max_size=2, on_full=reschedule_task)
def process_user_task(user_id: str) -> dict:
    # Process user data
    return {'status': 'completed', 'user_id': user_id}

Example 2: Returning Cached Data

import asyncio

from py_cachify import init_cachify, pooled, cached


init_cachify()


# Fallback returns stale cached data
def return_cached(*args, **kwargs):
    user_id = kwargs.get('user_id')
    # Return a sensible default or fetch from cache
    return {'user_id': user_id, 'data': 'stale-cache', 'fresh': False}


@pooled(key='expensive-query-pool', max_size=5, on_full=return_cached)
@cached(key='expensive-query-{user_id}', ttl=60)
async def expensive_query(user_id: str) -> dict:
    await asyncio.sleep(2)  # Simulate slow query
    return {'user_id': user_id, 'data': 'fresh-data', 'fresh': True}


async def main() -> None:
    result = await expensive_query(user_id='123')
    print(f'Result: {result}')


if __name__ == '__main__':
    asyncio.run(main())

Example 3: Logging and Dropping

import asyncio
import logging

from py_cachify import init_cachify, pooled


init_cachify()
logger = logging.getLogger(__name__)


def log_and_skip(*args, **kwargs):
    user_id = kwargs.get('user_id', 'unknown')
    logger.warning(f'Pool full - dropping task for user {user_id}')
    return None


@pooled(key='notification-pool', max_size=10, on_full=log_and_skip)
async def send_notification(user_id: str, message: str) -> dict:
    await asyncio.sleep(0.5)
    return {'sent': True, 'user_id': user_id}

Dynamic Keys with Format Strings

Like @cached() and @lock(), @pooled() supports dynamic keys:

from py_cachify import init_cachify, pooled


init_cachify()


@pooled(key='user-pool-{user_id}', max_size=2, on_full=lambda **kw: None)
async def process_user(user_id: str) -> dict:
    return {'user_id': user_id}


# Each user_id gets its own pool of size 2
async def main() -> None:
    # These use different pools (user-pool-1, user-pool-2)
    await process_user(user_id='1')
    await process_user(user_id='2')

Checking Pool Size on Decorated Functions

The @pooled() decorator attaches a size() method to the wrapped function:

import asyncio

from py_cachify import init_cachify, pooled


init_cachify()


@pooled(key='checkable-pool-{user_id}', max_size=3)
async def do_work(user_id: str) -> str:
    await asyncio.sleep(1)
    return f'work-done-{user_id}'


async def main() -> None:
    # Check pool occupancy for user_id='123'
    occupancy = await do_work.size(user_id='123')
    print(f'Pool occupancy for user 123: {occupancy}')


if __name__ == '__main__':
    asyncio.run(main())

The size(*args, **kwargs) method uses the same key formatting as the decorator, so you check the specific pool instance that would be used for those arguments.

Synchronous Usage

@pooled() works with synchronous functions too:

from py_cachify import init_cachify, pooled


init_cachify()


@pooled(key='sync-pool', max_size=2, raise_on_full=True)
def sync_work(task_id: str) -> str:
    return f'completed-{task_id}'


# Usage
def main():
    try:
        result = sync_work('task-1')
        print(f'Result: {result}')
    except Exception as e:
        print(f'Failed: {e}')


if __name__ == '__main__':
    main()

Conclusion

The @pooled() decorator integrates pool management into your function definitions. The on_full callback receives the same arguments as your function, enabling flexible responses: rescheduling, returning cached data, logging, or any custom logic your application requires.

What's Next

The full API reference for pool() and @pooled() is available here.