Phase 2: Building

Build in this order: utils first (foundation), then domain modules (vendor logic), then entry point (orchestration), then shell wrapper, then decoder/rules. Each layer builds on the one below it.


Step 1: Build utils ({vendor}_utils.py)

Start here because every other file depends on it. Implement these functions in order:

log(level, message, *args)

Write diagnostic output to stderr at a configurable verbosity. Never write diagnostics to stdout, which carries the JSON events.

import sys

DEBUG_LEVEL = 0  # verbosity threshold; the entry point sets this from config.debug

def log(level, msg, *args):
    if level <= DEBUG_LEVEL:
        text = msg.format(*args) if args else msg
        sys.stderr.write(f"[vendorname] {text}\n")
        sys.stderr.flush()

load_secrets_file(path)

Parse the KEY=VALUE secrets file. Handle comments, blank lines, optional quotes.
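
A minimal sketch of such a parser, assuming that a missing file should yield an empty dict and that lines without an "=" are skipped; neither behavior is specified above, so adjust as needed.

```python
import os

def load_secrets_file(path):
    """Parse KEY=VALUE lines into a dict; a missing file yields {} (assumption)."""
    secrets = {}
    if not path or not os.path.isfile(path):
        return secrets
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            line = raw.strip()
            # Skip blank lines and full-line comments
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                continue  # not a KEY=VALUE line (assumed: ignore silently)
            value = value.strip()
            # Strip one pair of matching surrounding quotes, if present
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            secrets[key.strip()] = value
    return secrets
```

Splitting with partition("=") keeps any further "=" characters inside the value, which matters for base64-style tokens.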

get_secret(key, env_var, secrets, credentials_dir)

Implement the three-tier credential chain. Log which source was used (at debug level 2), but never log the value.
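
One possible shape for the chain is sketched below. The tier order (environment variable, then a per-key file in credentials_dir, then the parsed secrets file) and the one-file-per-key layout are assumptions made for illustration; follow whatever order your design phase settled on. The log() here is a stand-in for the utils helper.

```python
import os
import sys

DEBUG_LEVEL = 2  # stand-in for the configured verbosity

def log(level, msg, *args):
    # Stand-in for the log() helper from the utils module
    if level <= DEBUG_LEVEL:
        sys.stderr.write("[vendorname] " + (msg.format(*args) if args else msg) + "\n")

def get_secret(key, env_var, secrets, credentials_dir=None):
    """Return the first value found, or None. Tier order is an assumption."""
    # Tier 1 (assumed): environment variable
    value = os.environ.get(env_var) if env_var else None
    if value:
        log(2, "{} loaded from environment variable {}", key, env_var)
        return value
    # Tier 2 (assumed): one file per key inside credentials_dir
    if credentials_dir:
        candidate = os.path.join(credentials_dir, key)
        if os.path.isfile(candidate):
            with open(candidate, "r", encoding="utf-8") as handle:
                value = handle.read().strip()
            if value:
                log(2, "{} loaded from credentials directory", key)
                return value
    # Tier 3 (assumed): parsed secrets file
    value = secrets.get(key)
    if value:
        log(2, "{} loaded from secrets file", key)
        return value
    log(1, "{} not found in any credential source", key)
    return None
```

Every log call names the key and the source only; the secret value itself never reaches a format argument.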

emit(event)

Write one JSON line to stdout. Use separators=(",", ":") for compact output. Always flush().
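
A minimal sketch of emit() following the description above; the sample event is illustrative only.

```python
import json
import sys

def emit(event):
    """Write one event as a single compact JSON line on stdout."""
    line = json.dumps(event, separators=(",", ":"))
    sys.stdout.write(line + "\n")
    sys.stdout.flush()  # do not leave the line sitting in Python's buffer

emit({"integration": "vendorname", "vn": {"event_type": "signin"}})
# prints {"integration":"vendorname","vn":{"event_type":"signin"}}
```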

load_state(path) and save_state(path, state)

Load from a JSON file (return an empty dict if it is missing). Save by writing to a temporary file and calling os.replace(), so a crash mid-write cannot leave a truncated state file.
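
The two helpers might look like the sketch below. Creating the temporary file in the same directory as the target is a choice made here so that os.replace() stays on one filesystem; the ".state-" prefix and the fsync call are assumptions, not requirements from this guide.

```python
import json
import os
import tempfile

def load_state(path):
    """Return the saved state dict, or {} when the file does not exist."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return {}

def save_state(path, state):
    """Write to a temp file next to the target, then swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".state-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    except BaseException:
        # Remove the temp file if anything failed before the swap
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

Note that this sketch lets a corrupt state file raise json.JSONDecodeError rather than silently resetting cursors; whether to recover from that is a separate decision.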

HTTP function

Choose based on vendor auth:

Include timeout handling, HTTP error code inspection, and 429 retry logic with Retry-After header support.
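
As an illustration, a GET helper built on the standard library's urllib could cover those three requirements as follows. The name http_get matches the test command in this step; the timeout, max_retries, and cap parameters, the retry_delay helper, and the choice to raise RuntimeError are assumptions of this sketch.

```python
import json
import time
import urllib.error
import urllib.request

def retry_delay(retry_after, attempt, cap=60):
    """Seconds to wait: honor a numeric Retry-After, else back off exponentially."""
    try:
        return max(0.0, min(float(retry_after), cap))
    except (TypeError, ValueError):
        # Header missing or in HTTP-date form: fall back to 1s, 2s, 4s, ...
        return min(2 ** attempt, cap)

def http_get(url, headers=None, timeout=30, max_retries=3):
    """GET a JSON document, retrying on HTTP 429."""
    for attempt in range(max_retries + 1):
        request = urllib.request.Request(url, headers=headers or {}, method="GET")
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                return json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as err:
            # Inspect the status code: retry only on 429, fail on anything else
            if err.code == 429 and attempt < max_retries:
                time.sleep(retry_delay(err.headers.get("Retry-After"), attempt))
                continue
            raise RuntimeError(f"HTTP {err.code} from {url}") from err
        except OSError as err:
            # URLError and socket timeouts are both OSError subclasses
            raise RuntimeError(f"request to {url} failed: {err}") from err
```

HTTPError is caught before OSError because it is itself a subclass of URLError; reversing the order would hide the status code.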

Test utils independently

Before moving on, test each function:

# Test credential loading
python3 -c "from vendorname_utils import get_secret; print(get_secret(...))"

# Test HTTP (if you have credentials)
python3 -c "from vendorname_utils import http_get; print(http_get(test_url, headers))"

Step 2: Build domain modules

Implement one module at a time. Each module follows this internal structure:

Function signature

def fetch_events(credentials, cursor, config):
    """Fetch events from vendor API. Returns updated cursor."""

Request construction

Build the API request from the cursor/timestamp and config. This is where vendor-specific logic lives — URL construction, query parameters, POST body formatting.

Pagination loop

has_more = True
while has_more:
    # Build url/body from the current cursor so each request fetches the next page
    response = http_function(url, headers, body)
    events = extract_events(response)

    for event in events:
        enriched = transform(event)
        emit(enriched)

    cursor = extract_cursor(response)
    has_more = check_more(response)

return cursor

Event transformation

Map vendor fields into the namespaced output format:

def transform(raw_event):
    return {
        "integration": "vendorname",
        "vn": {
            "event_type": determine_type(raw_event),
            # Map vendor fields explicitly — preserve nesting
            "actor": raw_event.get("actor"),
            "target": raw_event.get("target"),
            "action": raw_event.get("action"),
            "timestamp": raw_event.get("timestamp"),
            # ... map remaining vendor-specific fields
        }
    }

Key decisions during transformation:

Error handling

Wrap the entire fetch in a try/except. On failure, emit an error event and return the unchanged cursor (so the next run retries from the same position).
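
The pattern can be sketched as below. fetch_page is a hypothetical placeholder for the request and pagination logic, emit() is a stand-in for the utils helper, and the error event's field names (event_type, error_message) are chosen to line up with the error rule in Step 5.

```python
import json
import sys

def emit(event):
    # Stand-in for the emit() helper from the utils module
    sys.stdout.write(json.dumps(event, separators=(",", ":")) + "\n")
    sys.stdout.flush()

def fetch_page(credentials, cursor, config):
    # Hypothetical placeholder: returns (events, new_cursor, has_more)
    raise NotImplementedError("replace with vendor-specific fetch")

def fetch_events(credentials, cursor, config):
    """Return the new cursor, or the original cursor if anything fails."""
    original_cursor = cursor
    try:
        has_more = True
        while has_more:
            events, cursor, has_more = fetch_page(credentials, cursor, config)
            for event in events:
                emit({"integration": "vendorname", "vn": event})
        return cursor
    except Exception as exc:
        # Report the failure as an event and keep the old position
        emit({
            "integration": "vendorname",
            "vn": {"event_type": "error", "error_message": str(exc)},
        })
        return original_cursor
```

One consequence worth keeping in mind: if a failure happens after some pages were already emitted, the next run starts from the original cursor and will emit those events again.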


Step 3: Build the entry point ({vendor}.py)

The orchestrator follows a rigid template:

def main():
    args = parse_args()                    # argparse with --source, --all, --debug, --lookback
    config = load_config(args)             # Merge env vars + CLI overrides
    secrets = load_secrets_file(config.secrets_file)  # Load .secrets
    credentials = build_credentials(secrets, config)  # Apply credential chain
    state = load_state(config.state_file)  # Load persisted cursors
    
    # Call each module
    if should_run("module_a", config):
        try:
            state["module_a_cursor"] = fetch_module_a(credentials, state.get("module_a_cursor"), config)
        except Exception as e:
            emit_error("module_a", str(e))
    
    if should_run("module_b", config):
        try:
            state["module_b_cursor"] = fetch_module_b(credentials, state.get("module_b_cursor"), config)
        except Exception as e:
            emit_error("module_b", str(e))
    
    # Save state (skip in --all mode)
    if not config.all_mode:
        save_state(config.state_file, state)
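
The template calls two helpers that are not defined in this section. The sketch below shows one plausible reading: should_run() filters on the --source flag, and emit_error() writes an error event shaped like the one the error rule in Step 5 matches. The config.source attribute and the "module" field are hypothetical.

```python
import json
import sys

def should_run(module_name, config):
    """Run every module unless --source names a single one (assumed meaning)."""
    selected = getattr(config, "source", None)
    return selected is None or selected == module_name

def emit_error(module_name, message):
    """Report a module failure as an ordinary event on stdout."""
    event = {
        "integration": "vendorname",
        "vn": {
            "event_type": "error",
            "module": module_name,  # hypothetical field, for triage
            "error_message": message,
        },
    }
    sys.stdout.write(json.dumps(event, separators=(",", ":")) + "\n")
    sys.stdout.flush()
```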

CLI arguments

Every integration supports these standard flags:
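
A minimal argparse sketch for the four flags named in the template (--source, --all, --debug, --lookback). The types, defaults, and help text are assumptions; in particular, the unit of --lookback is not stated in this section.

```python
import argparse

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="vendorname integration")
    parser.add_argument("--source", default=None,
                        help="run only the named module (assumed meaning)")
    parser.add_argument("--all", action="store_true",
                        help="full fetch; state is not saved afterwards")
    parser.add_argument("--debug", type=int, default=None,
                        help="verbosity level; overrides VN_DEBUG when given")
    parser.add_argument("--lookback", type=int, default=None,
                        help="how far back to fetch (unit is an assumption)")
    return parser.parse_args(argv)
```

Leaving --debug at default=None (rather than 0) lets config loading tell "flag not given" apart from "flag set to 0".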

Config loading

Merge environment variables with CLI overrides. CLI takes precedence:

config.debug = args.debug if args.debug is not None else int(os.environ.get("VN_DEBUG", "0"))
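
Extended to a whole config object, the same precedence rule might look like this. The environment variable names come from run.sh in Step 4; the fallback defaults and the use of SimpleNamespace are assumptions of this sketch.

```python
import argparse
import os
from types import SimpleNamespace

def load_config(args, env=os.environ):
    """Merge environment variables with CLI overrides; CLI wins when set."""
    return SimpleNamespace(
        base_url=env.get("VN_BASE_URL", ""),
        state_file=env.get("VN_STATE_FILE", "state.json"),
        secrets_file=env.get("VN_SECRETS_FILE", ".secrets"),
        # CLI value wins only when the flag was actually given
        debug=args.debug if args.debug is not None else int(env.get("VN_DEBUG", "0")),
        all_mode=bool(getattr(args, "all", False)),
    )

# Example: no --debug flag, so VN_DEBUG from the environment applies
config = load_config(argparse.Namespace(debug=None, all=False), {"VN_DEBUG": "1"})
```

Passing env as a parameter keeps the function easy to test without touching the real process environment.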

Step 4: Build the shell wrapper (run.sh)

#!/usr/bin/env bash
set -euo pipefail

# ── Runtime configuration ──
export VN_BASE_URL="https://api.vendor.com"
export VN_STATE_FILE="/var/ossec/wodles/vendorname/state.json"
export VN_SECRETS_FILE="/var/ossec/wodles/vendorname/.secrets"
export VN_DEBUG="0"

# ── Execute ──
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
exec "$SCRIPT_DIR/vendorname.py" "$@"

Key points:


Step 5: Build decoder and rules

Decoder ({vendor}_decoder.xml)

The decoder’s <program_name> must match the <tag> value from the ossec.conf wodle stanza — Wazuh uses the tag, not the script filename, as the program name.

<decoder name="vendorname">
  <program_name>vendorname</program_name>
</decoder>

<decoder name="vendorname_json">
  <parent>vendorname</parent>
  <plugin_decoder>JSON_Decoder</plugin_decoder>
</decoder>

Rules ({vendor}_rules.xml)

Build rules in this order:

  1. Base rule: matches on the integration field and sets the group
  2. Event type rules: one per event type, each setting severity and description
  3. Conditional rules: elevated severity for specific conditions (failed auth, high-risk actions)
  4. Error rule: matches error events at high severity

<!-- Base rule -->
<rule id="100800" level="0">
  <decoded_as>vendorname</decoded_as>
  <field name="integration">vendorname</field>
  <description>Vendor integration event.</description>
  <group>vendorname,</group>
</rule>

<!-- Event type rule -->
<rule id="100801" level="3">
  <if_sid>100800</if_sid>
  <field name="vn.event_type">signin</field>
  <description>Vendor: sign-in by $(vn.user.email).</description>
  <group>vendorname,authentication,</group>
</rule>

<!-- Error rule -->
<rule id="100890" level="8">
  <if_sid>100800</if_sid>
  <field name="vn.event_type">error</field>
  <description>Vendor integration error: $(vn.error_message).</description>
  <group>vendorname,integration_error,</group>
</rule>

Building checklist