4.1.2. Run Lifecycle
Production Run — End-to-End
A production run involves every component of the platform. This section follows the exact sequence of operations.
Phase 1 — Frontend Prepares the Payload
When the user clicks Launch, the frontend performs four operations. The first three run locally, before any network call:
1. Script bundling. The bundler reads main.py and recursively inlines every from X import Y statement where X is a module in the addon/ folder or a module whose filename starts with ctx. The result is a single Python string containing all user code. Imports from the standard library or pip packages are left untouched. See Code Bundling and Execution for the complete bundling rules.
main.py
├── from ctx_agent_context import AgentContext → inlined
├── from ctx_job_context import JobContext → inlined
├── from ctx_script import ctx_script → inlined
├── from addon.my_modifier import modifier → inlined
└── from celery.signals import ... → left as-is (pip package)
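The inlining rule can be illustrated with Python's standard ast module. This is a minimal, hypothetical sketch and not the platform's bundler. It assumes module sources are supplied in a dict keyed by dotted module name, and it inlines each local module only once. It also drops comments between top-level statements and does not handle plain import X statements or relative imports. The names is_inlined and bundle are illustrative.

```python
from __future__ import annotations

import ast


def is_inlined(module: str) -> bool:
    """Rule from the text: modules in addon/ or whose filename starts with 'ctx'."""
    filename = module.rsplit(".", 1)[-1]
    return module.startswith("addon.") or filename.startswith("ctx")


def bundle(entry: str, sources: dict[str, str], seen: set[str] | None = None) -> str:
    """Return entry's code with local `from X import Y` statements replaced by X's code.

    `sources` (hypothetical) maps dotted module names to source text.
    """
    seen = set() if seen is None else seen
    seen.add(entry)
    source = sources[entry]
    lines = source.splitlines()
    parts: list[str] = []
    for node in ast.parse(source).body:
        if isinstance(node, ast.ImportFrom) and node.module and is_inlined(node.module):
            if node.module not in seen:
                # Inline the dependency (and, recursively, its local imports) once.
                parts.append(bundle(node.module, sources, seen))
            continue
        # Copy every other top-level statement verbatim, including decorators,
        # so stdlib and pip imports stay untouched.
        decorators = getattr(node, "decorator_list", [])
        start = min([node.lineno] + [d.lineno for d in decorators])
        parts.append("\n".join(lines[start - 1 : node.end_lineno]))
    return "\n".join(parts)
```

Dependencies are emitted before the code that imports them, so names such as ctx_script are already defined when the bundled string is executed top to bottom.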
2. Environment substitution. Every string in config_template.json that starts with $env_ is replaced by the corresponding value from the config tune file (config_local.json or config_distant.json). See JSON Configuration — Environment Substitution for the full mechanism.
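A recursive walk over the parsed JSON is one way to perform this substitution. In the hypothetical sketch below, the lookup rule is an assumption: the text after the $env_ prefix is used as the key into a flat config tune dict. The function name substitute_env is also illustrative. A missing key raises KeyError, so an unresolved placeholder cannot silently reach the orchestrator.

```python
from __future__ import annotations

from typing import Any

ENV_PREFIX = "$env_"


def substitute_env(node: Any, tune: dict[str, Any]) -> Any:
    """Return a copy of `node` with every "$env_<name>" string replaced by tune[<name>]."""
    if isinstance(node, dict):
        return {key: substitute_env(value, tune) for key, value in node.items()}
    if isinstance(node, list):
        return [substitute_env(item, tune) for item in node]
    if isinstance(node, str) and node.startswith(ENV_PREFIX):
        # Assumption: the tune key is whatever follows the prefix.
        return tune[node[len(ENV_PREFIX):]]
    return node
```

Non-string values and strings without the prefix pass through unchanged, and the input template is never mutated.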
3. UUID and code injection. The frontend generates a run_id (UUID v4) and injects both the run_id and the bundled Python code into the config:
{
"run_message": { "run_id": "a1b2c3d4-..." },
"default_agent_message": { "code": "...bundled python...", "run_id": "a1b2c3d4-..." },
"default_job_message": { "run_id": "a1b2c3d4-..." }
}
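The injection step amounts to a few dictionary writes. The real step runs in the frontend. The Python sketch below is illustrative only, and the function name inject_run is hypothetical. It writes one run_id into all three messages. That shared value is what later ties together the Redis keys, the Celery queue name, and the monitoring records.

```python
from __future__ import annotations

import copy
import uuid


def inject_run(config: dict, bundled_code: str) -> tuple[dict, str]:
    """Return a copy of `config` with a fresh run_id and the bundled code injected."""
    run_id = str(uuid.uuid4())  # UUID v4, as described above
    out = copy.deepcopy(config)
    out.setdefault("run_message", {})["run_id"] = run_id
    agent = out.setdefault("default_agent_message", {})
    agent["code"] = bundled_code
    agent["run_id"] = run_id
    out.setdefault("default_job_message", {})["run_id"] = run_id
    return out, run_id
```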
4. FormData POST. The frontend sends a multipart/form-data request to the orchestrator at POST /bot/start with five parts:
| Part name | Content |
|---|---|
| configtemplate | The fully processed JSON config (env values resolved, UUID and code injected) |
| configtune | The raw config tune JSON |
| data_job | CSV file with per-job overrides |
| data_agent | CSV file with per-agent overrides |
| requirements_txt | Optional: extra pip dependencies |
Phase 2 — Orchestrator Processes the Request
The orchestrator’s POST /bot/start handler executes five steps synchronously:
Step 1 — Parse inputs.
configtemplate_content = json.loads(configtemplate.file.read())
list_of_json_output_job = parse_csv_to_list(data_job)
list_of_json_output_agent = parse_csv_to_list(data_agent)
CSV parsing deserializes each row into a dict, recursively parsing any JSON-shaped cell values. Each row is tagged with its index:
# Row 0 of data_agent.csv → {"proxy_location": "US", "targetted_agent": 0}
# Row 1 of data_agent.csv → {"proxy_location": "FR", "targetted_agent": 1}
Step 2 — Deep merge. For each agent index i in [0, number_of_agents), the orchestrator deep-merges the CSV override row onto a copy of default_agent_message. The same logic applies to job messages. See Configuration Pipeline — CSV Deep Merge for the algorithm details.
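A common shape for such a merge is sketched below. It is an illustration, not the documented algorithm, and the function names are hypothetical. It assumes two rules: nested dicts are merged key by key, and any other override value, including a list, replaces the default outright. CSV rows are matched to agents through their index key. An agent with no row receives a plain copy of default_agent_message.

```python
from __future__ import annotations

import copy
from typing import Any


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict: nested dicts merge key by key, any other override value wins."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def build_agent_messages(default_agent_message: dict[str, Any],
                         rows: list[dict[str, Any]],
                         number_of_agents: int,
                         index_key: str = "targetted_agent") -> list[dict[str, Any]]:
    """One message per agent index in [0, number_of_agents)."""
    rows_by_index = {row[index_key]: row for row in rows}
    return [deep_merge(default_agent_message, rows_by_index.get(i, {}))
            for i in range(number_of_agents)]
```

Deep-copying on every merge keeps the default message untouched, so each agent's message can be mutated independently afterwards.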
Step 3 — Push to Redis. Each consolidated agent message (which now contains the bundled code, the run_id, and all config) is serialized to JSON and stored in Redis:
redis.set(f"{run_id}{agent_index}", json.dumps(agent_message))
# Key example: "a1b2c3d4-...0", "a1b2c3d4-...1", etc.
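On the agent side, load_agent_message() has to rebuild the same key from the pod's RUN_ID and POD_INDEX environment variables. The sketch below shows both halves against any object with redis-py's set(name, value) and get(name) methods. The helper names are hypothetical. In a pod, the store would plausibly be redis.Redis.from_url(...), with os.environ["RUN_ID"] and os.environ["POD_INDEX"] passed in.

```python
from __future__ import annotations

import json
from typing import Any, Optional, Protocol


class KeyValueStore(Protocol):
    """The two redis-py methods used here: Redis.set(name, value) and Redis.get(name)."""
    def set(self, name: str, value: str) -> Any: ...
    def get(self, name: str) -> Optional[bytes]: ...


def agent_key(run_id: str, agent_index: int | str) -> str:
    # run_id and index are concatenated with no separator, as in the text above.
    return f"{run_id}{agent_index}"


def push_agent_messages(store: KeyValueStore, run_id: str, messages: list[dict]) -> None:
    for index, message in enumerate(messages):
        store.set(agent_key(run_id, index), json.dumps(message))


def fetch_agent_message(store: KeyValueStore, run_id: str, pod_index: int | str) -> dict:
    raw = store.get(agent_key(run_id, pod_index))
    if raw is None:
        raise KeyError(f"no agent message for {agent_key(run_id, pod_index)!r}")
    return json.loads(raw)  # json.loads accepts the bytes redis-py returns by default
```

The missing separator is safe here because str(uuid.uuid4()) is always 36 characters long. Everything after the run_id is therefore the agent index.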
Step 4 — Create Kubernetes Job (production mode only). The orchestrator uses the kubernetes-client to create a batch/v1 Job:
apiVersion: batch/v1
kind: Job
metadata:
  name: armada-agent-{run_id}
spec:
  completionMode: Indexed
  completions: N            # one per agent
  parallelism: N            # all at once
  ttlSecondsAfterFinished: 1000000
  template:
    spec:
      containers:
        - name: armada-agent
          image: {hub}/armada-agent:{version}
          env:
            - name: RUN_ID
              value: "{run_id}"
            - name: POD_INDEX
              valueFrom:
                fieldRef:
                  fieldPath: "metadata.annotations['batch.kubernetes.io/job-completion-index']"
            - name: SQL_SERVER_*
              valueFrom:
                secretKeyRef: armada-sql-server-secret
            # ... Redis, RabbitMQ, service URLs
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: ScheduleAnyway
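Key points:
- completionMode: Indexed gives each pod a unique index, exposed through the batch.kubernetes.io/job-completion-index annotation.
- POD_INDEX is derived from this annotation.
- The topology spread constraint distributes pods across nodes. Because whenUnsatisfiable is ScheduleAnyway, this is a scheduling preference, not a hard requirement.
- imagePullPolicy is Never on Minikube and Always in production.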
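The same Job can be expressed as a plain dict. The official Kubernetes Python client accepts such a dict as the body argument of BatchV1Api().create_namespaced_job. The sketch below is hypothetical: build_agent_job, the pod labels, and the labelSelector are illustrative additions. The labels are included because a spread constraint only counts pods that match its labelSelector. restartPolicy: Never is also an assumption. A Job's pod template must use Never or OnFailure, and the manifest above does not show which one the orchestrator sets. Secret-backed variables are omitted.

```python
from __future__ import annotations

from typing import Any

COMPLETION_INDEX_PATH = "metadata.annotations['batch.kubernetes.io/job-completion-index']"


def build_agent_job(run_id: str, n_agents: int, image: str,
                    pull_policy: str = "Always") -> dict[str, Any]:
    """Build an Indexed batch/v1 Job manifest with one pod per agent."""
    labels = {"app": "armada-agent", "run-id": run_id}  # hypothetical labels
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"armada-agent-{run_id}"},
        "spec": {
            "completionMode": "Indexed",
            "completions": n_agents,      # one per agent
            "parallelism": n_agents,      # all at once
            "ttlSecondsAfterFinished": 1_000_000,
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "restartPolicy": "Never",  # assumption: Jobs need Never or OnFailure
                    "containers": [{
                        "name": "armada-agent",
                        "image": image,
                        "imagePullPolicy": pull_policy,  # "Never" on Minikube
                        "env": [
                            {"name": "RUN_ID", "value": run_id},
                            {"name": "POD_INDEX",
                             "valueFrom": {"fieldRef": {"fieldPath": COMPLETION_INDEX_PATH}}},
                            # SQL Server secrets, Redis, RabbitMQ, service URLs omitted
                        ],
                    }],
                    "topologySpreadConstraints": [{
                        "maxSkew": 1,
                        "topologyKey": "kubernetes.io/hostname",
                        "whenUnsatisfiable": "ScheduleAnyway",
                        "labelSelector": {"matchLabels": labels},
                    }],
                },
            },
        },
    }
```

To submit it, call config.load_incluster_config() (or config.load_kube_config() outside the cluster), then call client.BatchV1Api().create_namespaced_job(namespace=..., body=build_agent_job(...)). The namespace is not specified in this section.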
Step 5 — Dispatch jobs via Celery. For each consolidated job message, the orchestrator sends a Celery task to RabbitMQ:
celery_app.send_task(
    "tasks.consume_message",
    args=[job_message],
    queue=run_id,
)
A background thread then monitors the queue every 15 seconds. When the queue is empty and no tasks are active, it shuts down all workers listening on that queue.
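The sketch below shows a minimal version of that loop. The broker and Celery calls are injected as callables so the logic can be exercised on its own. In the orchestrator, the callables could plausibly be wired up as follows:

- active_tasks could be derived from celery_app.control.inspect().active(). That call returns a dict mapping worker names to task lists, or None when no worker replies.
- shutdown_workers could call celery_app.control.shutdown(destination=[...]).
- queue_depth would come from the broker, for example a passive queue declare on RabbitMQ, which reports the ready-message count.

All names here are hypothetical.

```python
from __future__ import annotations

import threading
from typing import Callable, Optional


def monitor_queue(queue_depth: Callable[[], int],
                  active_tasks: Callable[[], int],
                  shutdown_workers: Callable[[], None],
                  interval: float = 15.0,
                  stop: Optional[threading.Event] = None) -> bool:
    """Poll every `interval` seconds and shut workers down once the run has drained.

    Returns True if workers were shut down, False if `stop` was set first.
    """
    if stop is None:
        stop = threading.Event()
    # Event.wait returns False on timeout and True once `stop` is set.
    while not stop.wait(interval):
        # Both checks are needed: the queue can be empty while the
        # last job is still running on a worker.
        if queue_depth() == 0 and active_tasks() == 0:
            shutdown_workers()
            return True
    return False
```

Running it with threading.Thread(target=monitor_queue, args=(...), daemon=True).start() keeps the POST /bot/start handler free to return.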
Phase 3 — Agent Pod Starts
Each agent pod runs the same image (armada-agent). The entrypoint is entrypoint.sh:
# 1. Install extra pip dependencies if provided
if [ -n "$REQUIREMENTS_TXT" ]; then
  echo "$REQUIREMENTS_TXT" | base64 -d > /tmp/requirements.txt
  pip install -r /tmp/requirements.txt
fi
# 2. Start Celery worker
exec python -m celery -A main worker \
  --queues="$RUN_ID" \
  --concurrency=1 \
  -n worker"$POD_INDEX" \
  --prefetch-multiplier=1
celery -A main imports main.py, which does three things:
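# services/agent/main.py
from celery import Celery

# load_agent_message() and RABBITMQ_URL are defined elsewhere (not shown)
agent_message = load_agent_message()  # Fetch this pod's message from Redis
app = Celery('celery_app', broker=RABBITMQ_URL)
# Run the bundled project code with `app` and `agent_message` as its globals
exec(agent_message["code"], {'app': app, 'agent_message': agent_message})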
The exec() call loads the project’s bundled main.py into the agent process. From this point, the project’s code takes over and registers its Celery signal handlers and tasks.
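This works because exec() runs the bundled string with a caller-supplied globals dict. Names placed in that dict (here app and agent_message) are visible to the project code. Everything the code defines is written back into the same dict. The toy below illustrates the mechanism, with a plain string standing in for the Celery app.

```python
bundled_code = '''
label = f"run {agent_message['run_id']}"

def describe():
    # Resolved through this function's globals, i.e. the dict passed to exec().
    return f"{label} on {app}"
'''

namespace = {"app": "<celery app>", "agent_message": {"run_id": "a1b2"}}
exec(bundled_code, namespace)
print(namespace["describe"]())  # run a1b2 on <celery app>
```

Functions defined this way keep a reference to that dict. As a result, a task registered by the project code can still see app and agent_message long after exec() returns.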
Phase 4 — Worker Initialization
The project’s main.py (now running inside exec()) registers three things:
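import asyncio
from celery.signals import worker_process_init, worker_process_shutdown
# `app` and `agent_message` are provided by the exec() globals in services/agent/main.py

event_loop = None
agent_ctx = None

@worker_process_init.connect
def init_worker(sender=None, **kwargs):
    # Runs once per worker process: create the loop and enter AgentContext on it
    global event_loop, agent_ctx
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    agent_ctx = event_loop.run_until_complete(AgentContext(agent_message).__aenter__())
    # → ProxyManager, FingerprintManager, DatabaseConnector
    # → Screen (Xvfb), FantomasNoDriver (Chrome)

@worker_process_shutdown.connect
def shutdown_worker(sender=None, **kwargs):
    # Runs when the worker process terminates; __aexit__ is a coroutine and must be awaited
    event_loop.run_until_complete(agent_ctx.__aexit__(None, None, None))
    # → browser.stop(), screen.stop_screen()
    event_loop.close()

@app.task(name='tasks.consume_message', queue=agent_message["run_id"])
def run_job(job_message):
    # Runs for each job consumed from RabbitMQ, on the same loop and AgentContext
    event_loop.run_until_complete(process_message(job_message, agent_ctx))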
The AgentContext initializes heavy resources once — browser, display server, proxy, database connection. These persist across all jobs processed by this agent. See Python Files for the AgentContext and JobContext class reference.
Phase 5 — Job Execution
For each job message consumed from RabbitMQ:
async def process_message(job_message, agent_ctx):
    async with JobContext(job_message) as job_ctx:
        await ctx_script(job_ctx, agent_ctx)
JobContext.__aenter__ creates:
- A new job_uuid (UUID v4)
- A MonitoringClient that registers the run and the job with the backend API (POST /api/runs/, POST /api/jobs/)
- An Identity object (a fake persona generated via Fantomas)
ctx_script is the user’s automation function (see Python Files — ctx_script.py). It has access to:
- agent_ctx.browser: Chrome browser instance
- agent_ctx.proxy_manager: local mitmproxy with an optional upstream proxy
- agent_ctx.fingerprint_manager: fingerprint forging
- agent_ctx.database: SQL Server connector
- job_ctx.monitoring_client: event reporting
- job_ctx.identity: generated identity
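The division of labor between the two contexts can be seen in the toy below. It replays one agent's lifecycle without Celery, much as the local workbench calls the hooks directly. One long-lived context is entered once on a single event loop. Every message gets a fresh per-job context with its own UUID v4. The long-lived context is exited once at shutdown. ToyAgentContext, ToyJobContext, toy_ctx_script, toy_process_message, and replay are hypothetical stand-ins, not the real classes.

```python
import asyncio
import uuid


class ToyAgentContext:
    """Stand-in for AgentContext: expensive setup happens once per worker."""
    setups = 0
    teardowns = 0

    async def __aenter__(self):
        ToyAgentContext.setups += 1  # browser, display, proxy, DB would start here
        return self

    async def __aexit__(self, exc_type, exc, tb):
        ToyAgentContext.teardowns += 1
        return False


class ToyJobContext:
    """Stand-in for JobContext: cheap per-job state."""

    def __init__(self, job_message: dict):
        self.job_message = job_message
        self.job_uuid = None

    async def __aenter__(self):
        self.job_uuid = str(uuid.uuid4())  # a new job_uuid for every job
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False


async def toy_ctx_script(job_ctx, agent_ctx):
    return id(agent_ctx), job_ctx.job_uuid


async def toy_process_message(job_message, agent_ctx):
    async with ToyJobContext(job_message) as job_ctx:
        return await toy_ctx_script(job_ctx, agent_ctx)


def replay(job_messages):
    loop = asyncio.new_event_loop()  # like init_worker
    agent_ctx = loop.run_until_complete(ToyAgentContext().__aenter__())
    try:
        # like run_job, once per consumed message
        return [loop.run_until_complete(toy_process_message(m, agent_ctx))
                for m in job_messages]
    finally:
        loop.run_until_complete(agent_ctx.__aexit__(None, None, None))  # like shutdown_worker
        loop.close()
```

Replaying three messages enters and exits the agent context once each. All three jobs see the same agent_ctx object but get three distinct job_uuid values.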
Phase 6 — Monitoring and Real-Time Updates
Throughout execution, the agent reports events to the backend:
Agent → POST /api/events/ → Backend → WebSocket broadcast → Frontend
Agent → PATCH /api/jobs/status → Backend → WebSocket broadcast → Frontend
The backend API stores every mutation in SQL Server and immediately broadcasts it to all connected WebSocket clients. The frontend’s monitor panel updates in real time. See the Monitoring Client guide for the user-facing reporting API.
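The store-then-broadcast fan-out can be sketched with one asyncio.Queue per connected client. This is a hypothetical, in-memory illustration of the pattern only. The Broadcaster class, its methods, and the event payloads are invented for the example. A list stands in for the SQL Server write. A real backend would forward each queued item over the corresponding client's WebSocket.

```python
from __future__ import annotations

import asyncio
from typing import Any


class Broadcaster:
    def __init__(self) -> None:
        self._clients: set[asyncio.Queue] = set()
        self.stored: list[dict[str, Any]] = []  # stand-in for the SQL Server write

    def connect(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._clients.add(queue)
        return queue

    def disconnect(self, queue: asyncio.Queue) -> None:
        self._clients.discard(queue)

    async def publish(self, mutation: dict[str, Any]) -> None:
        self.stored.append(mutation)       # 1. persist the mutation
        for queue in list(self._clients):  # 2. fan it out to every connected client
            await queue.put(mutation)
```

Persisting before fanning out means a client that reconnects can reload the current state from the database, and anything it was shown live is already stored.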
Phase 7 — Shutdown
When the orchestrator’s monitoring thread detects an empty queue with no active tasks:
- It calls celery_app.control.shutdown(destination=[worker]) for each worker on the run’s queue.
- Celery triggers the worker_process_shutdown signal.
- AgentContext.__aexit__ stops the browser and the virtual display.
- The worker exits and the pod terminates.
- Once the Job has finished, Kubernetes deletes it, along with its pods, after ttlSecondsAfterFinished (1,000,000 seconds, about 11.6 days).
Local Workbench Run
The workbench (services/project/workbench/run_workbench.py) replicates the same execution flow without Kubernetes or Redis. It synthesizes agent_message and job_message from local config files, then calls init_worker, run_job, and shutdown_worker directly as functions instead of relying on Celery signals and RabbitMQ message delivery.
For the complete workbench synthesis pipeline, see Workbench Mode under the hood.