Очередь команд (2026-01-16)
Мы сериализуем входящие запуски автоответов (все каналы) через небольшую внутрипроцессную очередь, чтобы предотвратить столкновения нескольких запусков агентов, при этом сохраняя безопасный параллелизм между сеансами.Зачем
- Запуски автоответов могут быть дорогими (вызовы LLM) и конфликтовать, когда несколько входящих сообщений приходят почти одновременно.
- Сериализация предотвращает конкуренцию за общие ресурсы (файлы сеансов, логи, stdin CLI) и снижает вероятность срабатывания ограничений скорости у апстрима.
Как это работает
- FIFO-очередь с учётом полос (lane-aware) обрабатывает каждую полосу с настраиваемым лимитом параллелизма (по умолчанию 1 для ненастроенных полос; main — 4, subagent — 8).
runEmbeddedPiAgentставит в очередь по ключу сеанса (полосаsession:<key>), гарантируя, что на сеанс приходится только один активный запуск.- Затем каждый запуск сеанса ставится в глобальную полосу (по умолчанию
main), так что общий параллелизм ограничиваетсяagents.defaults.maxConcurrent. - При включённом подробном логировании поставленные в очередь запуски выводят краткое уведомление, если ожидание перед стартом превысило ~2 с.
- Индикаторы набора текста по-прежнему срабатывают сразу при постановке в очередь (когда это поддерживается каналом), поэтому пользовательский опыт не меняется, пока мы ждём своей очереди.
Режимы очереди (по каналам)
Входящие сообщения могут управлять текущим запуском, ждать следующего хода или делать и то и другое:steer: немедленно внедрить в текущий запуск (отменяет ожидающие вызовы инструментов после следующей границы инструмента). Если нет стриминга, откатывается к followup.followup: поставить в очередь на следующий ход агента после завершения текущего запуска.collect: объединить все поставленные в очередь сообщения в один followup-ход (по умолчанию). Если сообщения нацелены на разные каналы/треды, они обрабатываются по отдельности, чтобы сохранить маршрутизацию.steer-backlog(akasteer+backlog): управлять сейчас и сохранить сообщение для followup-хода.interrupt(legacy): прервать активный запуск для этого сеанса, затем выполнить самое новое сообщение.queue(legacy alias): то же, чтоsteer.
collect/steer, если вам нужен
один ответ на каждое входящее сообщение.
Отправьте /queue collect как отдельную команду (для сеанса) или установите messages.queue.byChannel.discord: "collect".
Значения по умолчанию (если не заданы в конфиге):
- Все поверхности →
collect
messages.queue:
Параметры очереди
Параметры применяются кfollowup, collect и steer-backlog (а также к steer, когда он откатывается к followup):
debounceMs: ожидать тишины перед запуском followup-хода (предотвращает «continue, continue»).cap: максимальное число сообщений в очереди на сеанс.drop: политика переполнения (old,new,summarize).
debounceMs: 1000, cap: 20, drop: summarize.
Переопределения для сеанса
- Отправьте
/queue <mode>как отдельную команду, чтобы сохранить режим для текущего сеанса. - Параметры можно комбинировать:
/queue collect debounce:2s cap:25 drop:summarize /queue defaultили/queue resetочищает переопределение сеанса.
Область действия и гарантии
- Применяется к запускам агентов автоответов во всех входящих каналах, использующих конвейер ответов шлюза (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, webchat и т. д.).
- Полоса по умолчанию (
main) является процессной для входящих сообщений и основных сигналов keepalive; установитеagents.defaults.maxConcurrent, чтобы разрешить параллельную обработку нескольких сеансов. - Могут существовать дополнительные полосы (например,
cron,subagent), чтобы фоновые задания выполнялись параллельно, не блокируя входящие ответы. - Полосы на уровне сеанса гарантируют, что только один запуск агента работает с данным сеансом в любой момент времени.
- Нет внешних зависимостей или фоновых потоков-воркеров; чистый TypeScript + promises.
Устранение неполадок
- Если команды выглядят зависшими, включите подробные логи и ищите строки «queued for …ms», чтобы подтвердить, что очередь обрабатывается.
- Если вам нужна глубина очереди, включите подробные логи и наблюдайте за строками таймингов очереди.