Data Flow

This document traces the path of a single vendor event from API response to SIEM dashboard.


End-to-end flow

Vendor API
    │
    ▼
Domain module fetches via HTTP (GET or POST)
    │
    ▼
JSON response parsed — events extracted from response body
    │
    ▼
Each event transformed:
  - Wrapped in vendor namespace (e.g., {"op": {...}})
  - Metadata added (integration name, event_type, timestamp)
  - Flattened where needed for SIEM compatibility
    │
    ▼
emit() writes one JSON line to stdout
    │
    ▼
Wazuh wodle manager captures stdout
    │
    ▼
Decoder matches on program name → extracts JSON
    │
    ▼
Rules evaluate extracted fields → assign rule ID, level, description
    │
    ▼
Event indexed in OpenSearch with all fields under data.*
    │
    ▼
Dashboard visualizations query data.{namespace}.*

Stage details

1. API fetch

The domain module constructs and executes the API request. Key behaviors:

2. Event transformation

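Each raw vendor event is transformed into the standard emission format. In the example, vendor_namespace stands for the vendor's short namespace key (written as vn in the rule and error-event examples later in this document):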

{
  "integration": "vendorname",
  "vendor_namespace": {
    "event_type": "signin_attempt",
    "field_a": "value",
    "field_b": {
      "nested_field": "preserved as-is"
    }
  }
}
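The wrapping step can be sketched in a few lines of Python. This is a minimal illustration, not the integration's actual code: the helper name transform_event and its parameters are hypothetical, and "vn" is used as the namespace key only to match the rule examples in this document.

```python
from typing import Any


def transform_event(raw: dict[str, Any], integration: str,
                    namespace: str, event_type: str) -> dict[str, Any]:
    """Wrap a raw vendor event in its namespace and add metadata (hypothetical helper)."""
    # Shallow copy so the caller's dict is not mutated; nested values are preserved as-is.
    body = dict(raw)
    body["event_type"] = event_type
    # The integration name sits at the top level; vendor fields live under the namespace.
    return {"integration": integration, namespace: body}


raw_event = {"field_a": "value", "field_b": {"nested_field": "preserved as-is"}}
event = transform_event(raw_event, "vendorname", "vn", "signin_attempt")
```

Keeping every vendor field under a single namespace key is what lets rules and dashboards address them with one predictable prefix.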

Transformation rules:

3. Emission

The emit() function in utils handles output:

import json
import sys

def emit(event: dict) -> None:
    """Write a single JSON event to stdout as exactly one newline-terminated line."""
    line = json.dumps(event, separators=(",", ":"))
    sys.stdout.write(line + "\n")
    # Flush immediately so the event is not held in a buffer.
    sys.stdout.flush()

Critical constraints:

4. Wazuh decoder

The decoder registers the integration’s program name and activates JSON decoding:

<decoder name="json_decoder">
  <parent>vendorname</parent>
  <plugin_decoder>JSON_Decoder</plugin_decoder>
</decoder>

This tells Wazuh to parse the stdout line as JSON. Rules reference the decoded fields by their JSON path (for example, vn.event_type), and the same fields appear under data.* in the indexed alert.

5. Wazuh rules

Rules match on decoded fields to assign severity, description, and grouping:

<rule id="100801" level="5">
  <decoded_as>vendorname</decoded_as>
  <field name="integration">vendorname</field>
  <field name="vn.event_type">signin_attempt</field>
  <description>Vendor: sign-in attempt by $(vn.actor.email)</description>
  <group>vendorname,authentication,</group>
</rule>

Rules are the primary filtering and classification layer. The wodle emits everything; rules decide what matters.

6. OpenSearch indexing

Once a rule matches, the event is indexed in OpenSearch. Fields appear under data.*:

Dashboard visualizations query these fields directly. The namespace prefix means vendor fields never collide with Wazuh’s own indexed fields.
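To make the field naming concrete, the sketch below computes the dotted paths a nested event would occupy under data.*. It only illustrates the naming convention; the real mapping is done by Wazuh and OpenSearch, not by this code. The function name indexed_paths and the sample email value are hypothetical.

```python
from typing import Any, Iterator


def indexed_paths(event: dict[str, Any],
                  prefix: str = "data") -> Iterator[tuple[str, Any]]:
    """Yield (dotted_path, value) for every leaf field of a nested event."""
    for key, value in event.items():
        path = f"{prefix}.{key}"
        if isinstance(value, dict):
            # Recurse into nested objects, extending the dotted path.
            yield from indexed_paths(value, path)
        else:
            yield path, value


event = {
    "integration": "vendorname",
    "vn": {
        "event_type": "signin_attempt",
        "actor": {"email": "user@example.com"},  # illustrative value
    },
}
paths = dict(indexed_paths(event))
```

Here paths contains the keys data.integration, data.vn.event_type, and data.vn.actor.email, which are the names a dashboard query would use.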


State management flow

Start of run
    │
    ▼
load_state(state_file_path)
  → Returns dict (empty dict if file missing or first run)
    │
    ▼
Pass relevant cursor/timestamp to each domain module
    │
    ▼
Module fetches from cursor position
  → Returns updated cursor after processing
    │
    ▼
Orchestrator collects updated cursors from all modules
    │
    ▼
save_state(state_file_path, updated_state)
  → Writes to temp file first
  → os.replace() atomically swaps temp → state file
    │
    ▼
End of run — state is durable, consistent, and crash-safe

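The atomic write pattern ensures that if the process is killed at any point during the write, the previous state file remains intact. This relies on the temp file living on the same filesystem as the state file, since os.replace() is only atomic within a single filesystem. The next run resumes from the last successful checkpoint.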
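A minimal sketch of the load/save pair described in the flow above, assuming the state is a JSON-serializable dict. The function names come from the diagram; the bodies, the .tmp suffix, and the fsync call are assumptions rather than the project's actual implementation.

```python
import json
import os
import tempfile


def load_state(path: str) -> dict:
    """Return the saved state, or an empty dict if the file is missing (first run)."""
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except FileNotFoundError:
        return {}


def save_state(path: str, state: dict) -> None:
    """Write state to a temp file, then atomically swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file next to the target so os.replace() stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(state, fh)
            fh.flush()
            os.fsync(fh.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_path, path)
    except BaseException:
        # On any failure, remove the temp file; the previous state file is untouched.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

A reader of the state file sees either the old contents or the new contents, never a partially written file.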


Credential flow

run.sh sets env vars (paths, non-secret config)
    │
    ▼
{vendor}_utils.py credential_chain():
    │
    ├─► Check $CREDENTIALS_DIRECTORY/{key}     [systemd — most secure]
    │     Found? → return value
    │
    ├─► Check secrets file (KEY=VALUE format)   [.secrets file — recommended]
    │     Found? → return value
    │
    └─► Check $ENV_VAR                          [environment — least secure]
          Found? → return value
          Not found? → raise error (credential is required)

The chain is evaluated per credential key, not globally. This means one credential can come from systemd while another comes from the secrets file — useful in migration scenarios.
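The lookup order can be sketched as follows. This is an illustration under stated assumptions, not the code in {vendor}_utils.py: the secrets_file parameter, the helper _read_secrets_file, the choice of RuntimeError, and the handling of blank lines and # comments in the secrets file are all hypothetical.

```python
import os
from pathlib import Path
from typing import Optional


def _read_secrets_file(path: Optional[str], key: str) -> Optional[str]:
    """Look up key in a KEY=VALUE file; return None if the file or key is absent."""
    if not path or not Path(path).is_file():
        return None
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        # Assumption: blank lines and '#' comments are ignored.
        if not line or line.startswith("#") or "=" not in line:
            continue
        name, _, value = line.partition("=")
        if name.strip() == key:
            return value.strip()
    return None


def credential_chain(key: str, secrets_file: Optional[str] = None) -> str:
    """Resolve one credential: systemd directory, then secrets file, then environment."""
    # 1. systemd credentials directory
    cred_dir = os.environ.get("CREDENTIALS_DIRECTORY")
    if cred_dir:
        candidate = Path(cred_dir) / key
        if candidate.is_file():
            return candidate.read_text(encoding="utf-8").strip()

    # 2. secrets file in KEY=VALUE format
    value = _read_secrets_file(secrets_file, key)
    if value is not None:
        return value

    # 3. plain environment variable
    value = os.environ.get(key)
    if value:
        return value

    raise RuntimeError(f"required credential {key!r} not found")
```

Because the function takes a single key, each credential walks the chain independently, which is what allows mixed sources during a migration.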


Error event flow

When something goes wrong, the integration emits a structured error event through the same stdout pipeline:

{
  "integration": "vendorname",
  "vn": {
    "event_type": "error",
    "error_source": "siem_api",
    "error_message": "HTTP 401: invalid credentials",
    "error_code": 401
  }
}

This error event flows through the same decoder → rules → OpenSearch pipeline. A dedicated rule matches error events at a higher severity level, enabling SIEM alerts on integration health issues. The operator sees integration failures in the same dashboard they use for vendor events — no need to SSH into the host and check logs.
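A minimal sketch of how such an error event might be built before being handed to emit(). The helper name build_error_event and its parameters are hypothetical; the field names mirror the JSON example above.

```python
import json
from typing import Any, Optional


def build_error_event(integration: str, namespace: str, source: str,
                      message: str, code: Optional[int] = None) -> dict[str, Any]:
    """Build a structured error event in the same shape as a vendor event."""
    body: dict[str, Any] = {
        "event_type": "error",
        "error_source": source,
        "error_message": message,
    }
    # error_code is optional, since not every failure has an HTTP status.
    if code is not None:
        body["error_code"] = code
    return {"integration": integration, namespace: body}


event = build_error_event("vendorname", "vn", "siem_api",
                          "HTTP 401: invalid credentials", 401)
# Serialized the same way emit() does: compact separators, one line.
line = json.dumps(event, separators=(",", ":"))
```

Because the error shares the integration and namespace keys with ordinary events, the existing decoder handles it unchanged and only the rule for event_type "error" needs to differ.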