Meshtastic off-grid comms experiments

Note: this is a stub post that I plan to come back to and update as I have more to add on these experiments, including scripts, pictures, and experiment notes. For now, I mainly wanted to outline what I’ve done so far.

Background

I recently set up a small experiment with a Team Awareness Kit (TAK) server running on Rocky Linux, along with the Civilian Team Awareness Kit (CivTAK) mobile app. I’ve long had both professional and personal interests in TAK, electronics, and programming.

Professionally, my goal is to better understand systems used in defense environments and how they fit into the test and evaluation (T&E) community that I support. This interest is twofold. First, many experiments we support already use TAK for situational awareness and coordination. Second, there are opportunities for T&E professionals to use TAK more directly as part of their support infrastructure. I won’t dwell on those professional applications here, though—they simply provide context for why I’m exploring TAK in civilian scenarios.

My personal interest in TAK began when former colleagues became active in the TAK ecosystem. I knew TAK enabled off-grid communications, including position tracking and chat, but hadn’t explored it deeply. The system supports situational awareness by sharing positional data, messages, and other information over mesh-based communication links. Initially, I focused on the military-grade systems and commercial products marketed for defense use. Some of those platforms have since evolved or disappeared, but TAK itself has become a robust, open, and well-documented ecosystem.

During the Cameron Peak Fire in Colorado, I learned that firefighting crews were using TAK. Following that event closely, I became intrigued by how public safety teams were leveraging communications technologies, live cameras, and mapping tools. That’s when I first installed CivTAK to experiment with the software and some Arduino-based radio modules for off-grid communication, though most of my time was spent just exploring the TAK applications themselves.

More recently, I’ve been doing a lot of off-road trail exploration in areas with little or no cellular coverage. Satellite messengers have become essential for group safety and coordination, but I also started thinking about alternatives that don’t depend on commercial satellite infrastructure. It’s not hard to imagine a future where nearly everyone has satellite messaging on their phones, but for now, that technology remains relatively new—and potentially vulnerable to service outages. Having a local, self-contained communication system seems like a worthwhile goal.

Exploring technology

For off-grid voice communications, most off-roaders use General Mobile Radio Service (GMRS) radios. GMRS requires a license, but it’s inexpensive, easy to obtain, and offers better range and reliability than Citizens Band (CB) radio. What off-road groups generally lack, though, is a shared off-grid “common operational picture” (COP)—a distributed map showing team locations and messages without relying on the internet.

That gap led me to Meshtastic, an open-source project that uses low-power, long-range (LoRa) radios to create mesh networks for text and data communication. Meshtastic bridges simple radio hardware with smartphone apps, providing chat and map-based features over public unlicensed LoRa frequencies. I learned that TAK could operate over this same network using a Meshtastic bridge, which made it a perfect platform for experimentation.

To test the concept, I set up a TAK server on Rocky Linux 9, installed CivTAK on Android devices, and connected everything through Heltec LoRa radios running Meshtastic firmware. The radios linked to phones over Bluetooth and to the server via USB. While TAK was functional, it was more complex than I needed for my immediate interests. I decided to shift focus to Meshtastic itself, exploring its capabilities for lightweight off-grid communication and mapping.

I ordered four Heltec V3 radio kits (without GPS) and two Heltec V4 kits (with GPS). The kits required basic assembly but were otherwise turnkey: each included a circuit board, antenna, battery, and enclosure. After assembling them, I flashed the Meshtastic firmware using a Rocky Linux 9 system—the same one hosting my TAK server.

Meshtastic’s default configuration worked almost immediately after pairing with the Android app. I experimented with settings like the number of hops, store-and-forward behavior, GPS options, transmission triggers, and the network encryption key. By default, Meshtastic uses a public key so devices can join open networks for hobby use. However, I preferred a private network to avoid unnecessary chatter, so I generated a custom encryption key shared by all six of my test radios.

Automating radio configuration

To simplify setup and configuration, I wrote a Python script to automate flashing and provisioning. I used both ChatGPT and Anthropic’s Claude Code as coding assistants. ChatGPT helped define the workflow and convert initial shell commands into a more structured Bash and then Python script. Claude Code assisted later with debugging and improving robustness, particularly around timing and error handling when radios rebooted.

My intent wasn’t to let generative AI write the code for me, but to accelerate development and learn from the process. The AI tools suggested safeguards and improvements I skipped during quick prototyping. While ChatGPT provided most of the structure and logic, Claude Code’s explanations were especially valuable for understanding the reasoning behind each change.

The current version of my script prompts me through each flashing stage, handles timeouts, and accounts for differences between Heltec V3 and V4 models. The V3 radios must be manually placed into flash mode before connecting, while the V4 units can enter flash mode after connection. The process isn’t fast—there are deliberate delays to accommodate reboots—but it’s reliable for small batches of radios.
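As a rough illustration of the model difference described above, the per-model guidance could live in a small lookup table. This is only a sketch, not an excerpt from the script: the names FLASH_MODE_STEPS and flash_mode_prompt are hypothetical, and the prompt wording is made up for the example.

```python
# Hypothetical lookup table: when each Heltec model should be put into flash mode.
# The timing labels and prompt text are illustrative, not taken from the real script.
FLASH_MODE_STEPS = {
    "v3": ("before_connect",
           "Put the radio into flash mode, then connect it over USB."),
    "v4": ("after_connect",
           "Connect the radio over USB, then put it into flash mode when prompted."),
}


def flash_mode_prompt(model: str) -> str:
    """Return the operator prompt for a model name such as 'V3' or 'v4'."""
    key = model.strip().lower()
    if key not in FLASH_MODE_STEPS:
        raise ValueError(
            f"unknown model {model!r}; expected one of {sorted(FLASH_MODE_STEPS)}"
        )
    timing, text = FLASH_MODE_STEPS[key]
    return f"[{key.upper()} / {timing}] {text}"


if __name__ == "__main__":
    for name in ("V3", "V4"):
        print(flash_mode_prompt(name))
```

Keeping the difference in data rather than in branching code would make it easier to add another board later, if I ever need to.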

Initial testing

After initial configuration, all radios successfully joined the same mesh network. I tested them indoors at first, across three floors of my house, then on a neighborhood walk out to about half a mile from the house. Even the basement radios maintained connectivity through relayed hops. Later I tested around the city and nearby foothills.

It’s worth clarifying that my private channel key encrypts content, but it doesn’t enforce a private network path: nearby nodes using the same LoRa modem preset can still rebroadcast and route my packets unless I change the rebroadcast mode or avoid common presets.

I did a quick, very informal range check. I drove to pick up my son Lee from school, leaving radios scattered around the house. At just over 6 km away, I was still “connected” to the home radios, but the app showed those links were 2–3 hops. That makes sense. Even though my network uses a private channel key, other Meshtastic nodes can relay packets they can’t decrypt when they share the same modem settings, depending on their configuration. In other words, my packets were riding across other people’s nodes as opportunistic relays, which is expected on public meshes.

That made me curious, so I went west into the nearby hills to avoid the city and other nodes. The farthest I managed without any hops—that is, a direct radio link—was about 3 km. I pushed out to roughly 10 km from the house and never lost the ability to exchange packets, but it depended on several hops. Hop limits and node placement matter a lot here; nodes rebroadcast what they hear until the packet’s max-hop value is consumed.
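To make the hop-limit behavior concrete, here is a toy flood simulation. It is a simplified model, not Meshtastic's actual routing code: it assumes every relay decrements a counter by one, that a packet arriving with zero hops left is delivered but not rebroadcast, and that duplicates are ignored. The function name flood_reach and the node names are hypothetical.

```python
from collections import deque


def flood_reach(links, source, hop_limit):
    """Return {node: relays_used} for every node a flooded packet reaches.

    links maps each node to the nodes that can hear it directly.
    A value of 0 means the node heard the source directly (no relay).
    """
    reached = {source: 0}
    # Each queue entry is (transmitting node, hops left on the packet it sends).
    queue = deque([(source, hop_limit)])
    while queue:
        node, hops_left = queue.popleft()
        for neighbor in links.get(node, ()):
            if neighbor in reached:
                continue  # already saw this packet; duplicates are dropped
            reached[neighbor] = hop_limit - hops_left
            if hops_left > 0:
                # The neighbor rebroadcasts with one fewer hop remaining.
                queue.append((neighbor, hops_left - 1))
    return reached


# Hypothetical chain of radios where each node only hears its immediate neighbors.
CHAIN = {
    "n0": ["n1"],
    "n1": ["n0", "n2"],
    "n2": ["n1", "n3"],
    "n3": ["n2", "n4"],
    "n4": ["n3", "n5"],
    "n5": ["n4"],
}

if __name__ == "__main__":
    for limit in (3, 4):
        print(limit, flood_reach(CHAIN, "n0", limit))
```

In this toy chain, a hop limit of 3 gets a packet from n0 as far as n4 (three relays) but not n5, while a hop limit of 4 reaches n5. Real links are lossy and asymmetric, so treat this as intuition only.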

I won’t really know how this performs in mountains and forests until I test it there, but these drives around Fort Collins were useful. They also highlighted how city routing works in practice. Private channels keep content encrypted, but the mesh can still act like a carrier if other nodes share my modem preset. If I truly want isolation, I can switch to less common presets or set rebroadcast to local-only, at the cost of losing opportunistic coverage.
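The two isolation options could be expressed as Meshtastic CLI calls along these lines. This sketch only builds the argument lists and does not run anything. It assumes the same --set syntax my provisioning script uses, and it assumes the setting is named device.rebroadcast_mode with a LOCAL_ONLY value, which I have not verified against the current Meshtastic documentation. The function name isolation_commands and the example preset are hypothetical.

```python
from typing import List, Optional


def isolation_commands(port: str,
                       modem_preset: Optional[str] = None,
                       local_only: bool = True) -> List[List[str]]:
    """Build (but do not run) CLI argument lists for a more isolated mesh."""
    base = ["meshtastic", "--port", port]
    commands: List[List[str]] = []
    if modem_preset:
        # Same key the provisioning script sets; the value is the caller's choice.
        commands.append(base + ["--set", "lora.modem_preset", modem_preset])
    if local_only:
        # ASSUMPTION: setting name and value; check the Meshtastic docs first.
        commands.append(base + ["--set", "device.rebroadcast_mode", "LOCAL_ONLY"])
    return commands


if __name__ == "__main__":
    # "MEDIUM_SLOW" is only an example of a preset other than LONG_FAST.
    for cmd in isolation_commands("/dev/ttyACM0", modem_preset="MEDIUM_SLOW"):
        print(" ".join(cmd))
```

Whatever preset I pick, every radio in my fleet has to use the same one, or they will stop hearing each other.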

Meshtastic app screenshots

I took a few screenshots during my drive. The top node, J001, was in my vehicle while I was driving. All other nodes were at the house in various places, including the top floor, main floor, and basement. The first screenshot shows a good connection to the J005 radio at 2.9 km from the house. In the second screenshot, I still have a direct connection to J005 at 4.1 km, but you can see the reduced link quality. In the third screenshot, I’m 10.8 km from the house and no longer directly connected, but I’m still in contact by hopping over other people’s radios. The final screenshot just gives an idea of the distance between my vehicle (J001) and my house (J003 sits on top of the other nodes, so it’s the only one visible in this map view) in relation to Horsetooth Reservoir.

Next steps

My next step will be to distribute radios among multiple vehicles and houses for wider testing, focusing on real-world terrain effects and practical communication limits.

After I’m comfortable with the baseline Meshtastic setup, I plan two further phases of experimentation. First, I’ll see how useful it might be for our off-road group’s trail rides, especially among others interested in the technology. Second, I’ll revisit my original goal: using this Meshtastic network as the transport layer for other communications experiments, e.g., TAK in an off-grid environment. Either way, this work should help establish a resilient off-grid mesh communications layer for future experiments.

Rocky Linux 9 server

I had Rocky Linux 9 installed on a small form factor x86-64 computer (headless, no desktop) to support the aforementioned TAK server experimentation. I used this same server to develop and test a Python script that automates flashing and provisioning of LoRa radios with Meshtastic firmware.

Here are the RPM and Python dependencies to run the script:

sudo dnf install -y python3.11 python3.11-devel \
  systemd-devel \
  libudev-devel \
  python3-pyserial \
  usbutils \
  psmisc \
  lsof
python3.11 -m venv ~/meshprovision-env
source ~/meshprovision-env/bin/activate
pip install --upgrade pip setuptools wheel
pip install esptool meshtastic pyudev rich humanfriendly pyserial

In subsequent terminal sessions, I can return to the Python environment like this:

source ~/meshprovision-env/bin/activate

I only need to generate one pre-shared key (PSK) file, which is used to configure all the radios on a private net. This PSK file can be generated like this:

KEYFILE=~/juszak4x4-$(date +%Y%m%d).key
openssl rand -base64 16 > "$KEYFILE"
chmod 400 "$KEYFILE"
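For anyone who would rather stay in Python, the same kind of key file could be produced with the standard library. This is a sketch and an assumption on my part, not what I actually ran: it writes 16 random bytes as base64, the format the provisioning script validates, and creates the file owner-read-only. The function name write_psk is hypothetical.

```python
import base64
import os
import secrets
from pathlib import Path


def write_psk(path: Path, num_bytes: int = 16) -> str:
    """Write a random base64-encoded key to path (mode 0400) and return it.

    Fails with FileExistsError rather than overwriting an existing key.
    """
    key_b64 = base64.b64encode(secrets.token_bytes(num_bytes)).decode("ascii")
    # O_EXCL refuses to clobber an existing file; 0o400 makes it read-only for the owner.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o400)
    with os.fdopen(fd, "w") as fh:
        fh.write(key_b64 + "\n")
    return key_b64


# Example (not executed here):
#   write_psk(Path.home() / "juszak4x4.key")
```

Refusing to overwrite is deliberate: regenerating the key by accident would cut off any radios already provisioned with the old one.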

This is the Python script I used to flash and provision the radios, saved as mesh_flash_provision.py. The Heltec V3 radios need to be put into flash mode before connecting to USB, while the V4 radios can be put into flash mode when the script prompts for it (an area I can improve on later if I decide to). Below this somewhat long script, I describe what it actually does:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
mesh_flash_provision.py — Flash + provision Heltec V4/V3 Meshtastic radios reliably.

Design goals:
- Survive bootloader/app re-enumeration under /dev/serial/by-id
- Use esptool via subprocess (API-stable) for erase/flash
- Use Meshtastic CLI via subprocess with guarded retries and backoff
- Batch LoRa edits in one transaction (single app restart)
- Verify readiness after each disruptive action
- Optional pyudev if available; otherwise fall back to polling
- Per-port lockfile so only one orchestrator owns the port at a time

Requires:
  - Python 3.9+
  - esptool in PATH
  - meshtastic CLI in PATH
Tested on Rocky Linux 9 style systems.

Example:
  python3 mesh_flash_provision.py \
    --port /dev/serial/by-id/usb-Espressif_Systems_heltec_wifi_lora_32_v4__...-if00 \
    --fw   ~/meshtastic-firmware/firmware-heltec-v4-2.7.11.ee68575.bin \
    --owner Juszak-00X --short J00X --psk-file ~/juszak4x4.key \
    --gps integrated
"""

from __future__ import annotations

import argparse
import base64
import fcntl
import os
import re
import shutil
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, List

# ==============================================================================
# Fleet defaults & tunables
# ==============================================================================
CHANNEL_NAME = "Juszak4x4"
REGION = "US"                      # US915
MODEM_PRESET = "LONG_FAST"
FREQ_SLOT = "20"
HOP_LIMIT = "4"
TX_POWER = "0"                     # 0 => device default max legal
RX_BOOST = "true"
ROLE = "CLIENT"

POS_BROADCAST_SECS = "20"
POS_SMART_ENABLED = "true"
POS_SMART_MIN_DIST = "40"
POS_SMART_MIN_INTERVAL_SECS = "15"
GPS_MODE = "ENABLED"              # Used when --gps integrated
GPS_UPDATE_INTERVAL = "20"

POSITION_PRECISION = "32"
UPLINK_ENABLED = "false"
DOWNLINK_ENABLED = "false"

# Flash params
CHIP = "esp32s3"
FLASH_BAUD = "460800"
FLASH_SIZE = "detect"   # "detect" (auto) or explicit like "8MB" / "16MB"

# Timeouts / backoff
APP_READY_TIMEOUT = 300  # seconds to wait for app to become responsive
APP_READY_POLL = 2       # seconds between readiness polls
CMD_TIMEOUT = 120        # seconds for Meshtastic CLI calls
RETRY_SLEEP = 3          # seconds between retries
RETRIES = 3
TRIED_WINDOW_SECS = 15   # per-port probe throttle window

# Locking state (per-port lockfiles to prevent concurrent provisioning)
LOCK_DIR = Path.home() / ".cache" / "juszak4x4" / "locks"
LOCK_DIR.mkdir(parents=True, exist_ok=True)

_last_tried_at: dict[str, float] = {}

# ==============================================================================
# Logging & prerequisite checks
# ==============================================================================

def eprint(*a, **kw) -> None:
    """Print to stderr for progress logs."""
    print(*a, file=sys.stderr, **kw)


def check_prereqs() -> None:
    """Verify required executables are on PATH; exit early if missing."""
    for exe in ("esptool", "meshtastic"):
        if shutil.which(exe) is None:
            eprint(f"ERROR: {exe} not found in PATH")
            sys.exit(2)

# ==============================================================================
# Serial/USB identification helpers
# ==============================================================================

def is_bootloader_name(name: str) -> bool:
    """Return True if a /dev/serial/by-id name indicates the ESP32-S3 bootloader."""
    return ("USB_JTAG_serial_debug_unit" in name) or ("Espressif_USB_JTAG" in name)


def list_byid() -> list[str]:
    """List /dev/serial/by-id entries (sorted). Empty list if the dir is absent."""
    try:
        return sorted(os.listdir("/dev/serial/by-id"))
    except FileNotFoundError:
        return []


def full_byid_paths() -> list[str]:
    """Return absolute /dev/serial/by-id/* paths for convenience."""
    return [str(Path("/dev/serial/by-id") / n) for n in list_byid()]


def is_candidate_tty(node: str) -> bool:
    """Accept only USB-class tty nodes that could host the app."""
    base = os.path.basename(node or "")
    return base.startswith(("ttyACM", "ttyUSB"))


def byid_points_to_existing_node(byid_path: str) -> bool:
    """Validate that a by-id symlink resolves to an existing, plausible tty node."""
    try:
        p = Path(byid_path)
        if not p.exists():
            return False
        target = p.resolve()
        return target.exists() and str(target).startswith(("/dev/ttyACM", "/dev/ttyUSB"))
    except Exception:
        return False


def _byid_for_tty(tty_node: str) -> Optional[str]:
    """Return matching /dev/serial/by-id/* symlink for a given /dev/tty*, if any."""
    try:
        byid_dir = Path("/dev/serial/by-id")
        if not byid_dir.exists():
            return None
        tty_path = Path(tty_node).resolve()
        for entry in sorted(byid_dir.iterdir()):
            try:
                if entry.resolve() == tty_path:
                    return str(entry)
            except Exception:
                continue
    except Exception:
        pass
    return None


def _expand_ttys(pattern: str) -> list[str]:
    """Expand simple /dev globs like /dev/ttyACM* to concrete paths."""
    p = Path(pattern)
    if p.parent == Path("/dev") and any(ch in p.name for ch in "*?[]"):
        return [str(x) for x in Path("/dev").glob(p.name)]
    return [str(p)]


def _resolve_if_byid(node: str) -> str:
    """Resolve a /dev/serial/by-id symlink to its real /dev/tty* device."""
    try:
        return str(Path(node).resolve())
    except Exception:
        return node

# ==============================================================================
# Port/process mediation utilities
# ==============================================================================

def _should_probe(node: str) -> bool:
    """Rate-limit probes of the same node to avoid flapping storms."""
    now = time.time()
    t = _last_tried_at.get(node, 0)
    if now - t >= TRIED_WINDOW_SECS:
        _last_tried_at[node] = now
        return True
    return False


def pulse_dtr_rts(node: str) -> None:
    """Briefly toggle DTR/RTS to coax CDC enumeration; silently ignore errors."""
    try:
        import serial  # type: ignore
        real = _resolve_if_byid(node)
        with serial.Serial(real, 115200, timeout=0.2) as s:
            for _ in range(2):
                s.dtr = False; s.rts = False; time.sleep(0.05)
                s.dtr = True;  s.rts = True;  time.sleep(0.05)
            s.dtr = False; s.rts = False
    except Exception:
        pass


def kill_port_holders(port_like: str) -> None:
    """Terminate processes currently holding a matching tty."""
    nodes = _expand_ttys(port_like)
    if not nodes:
        nodes = [port_like]
    for n in nodes:
        target = _resolve_if_byid(n)
        try:
            subprocess.run(
                ["fuser", "-kv", target],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=3
            )
        except Exception:
            pass
        try:
            out = subprocess.run(
                ["lsof", "-t", target],
                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=3, text=True
            )
            pids = [pid for pid in out.stdout.strip().splitlines() if pid.isdigit()]
            if pids:
                subprocess.run(
                    ["kill", "-9", *pids],
                    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2
                )
        except Exception:
            pass
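    # Sweep up stray Meshtastic CLI sessions by command line, but skip our own PID:
    # a --fw path containing "meshtastic" placed before --port would otherwise make
    # this script match the pattern and terminate itself.
    try:
        out = subprocess.run(
            ["pgrep", "-f", r"meshtastic.*--port"],
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=2, text=True
        )
        pids = [p for p in out.stdout.split() if p.isdigit() and int(p) != os.getpid()]
        if pids:
            subprocess.run(
                ["kill", *pids],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2
            )
    except Exception:
        pass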


def acquire_lock(port: str):
    """Create and hold an exclusive lockfile for this port for the process lifetime."""
    safe = re.sub(r"[^A-Za-z0-9._:-]+", "_", port)
    lockfile = LOCK_DIR / f"{safe}.lock"
    fh = open(lockfile, "w")
    try:
        fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        eprint(f"ERROR: Another provisioning process holds {lockfile}; bailing.")
        sys.exit(4)
    fh.write(f"pid={os.getpid()} port={port}\n")
    fh.flush()
    return fh

# ==============================================================================
# Command wrappers (subprocess)
# ==============================================================================

def run(cmd: list[str], timeout: int = CMD_TIMEOUT, quiet: bool = False) -> subprocess.CompletedProcess:
    """Run a command, capturing stdout/stderr; optionally echo the command to stderr."""
    if not quiet:
        eprint(">>", " ".join(cmd))
    return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, text=True)


def meshtastic_cli(port: str, args: list[str], timeout: int = CMD_TIMEOUT, retries: int = RETRIES) -> subprocess.CompletedProcess:
    """Invoke the Meshtastic CLI with guarded retries and pre-emptive port eviction."""
    try:
        kill_port_holders(port)
        time.sleep(0.1)
    except Exception:
        pass

    cmd = ["meshtastic", "--timeout", str(CMD_TIMEOUT), "--wait-to-disconnect", "3", "--port", port] + args

    # Make at most `retries` attempts and return the last result, success or failure.
    cp = subprocess.CompletedProcess(cmd, returncode=1, stdout="", stderr="no attempt made")
    for attempt in range(1, retries + 1):
        try:
            cp = run(cmd, timeout=timeout, quiet=(attempt > 1))
        except subprocess.TimeoutExpired as te:
            # Mirror the shell convention of exit status 124 for a timeout.
            cp = subprocess.CompletedProcess(cmd, returncode=124, stdout="", stderr=str(te))
        if cp.returncode == 0:
            return cp
        if attempt < retries:
            eprint(f"   …meshtastic attempt {attempt} failed; sleeping {RETRY_SLEEP}s and retrying…")
            time.sleep(RETRY_SLEEP)
    return cp

# ==============================================================================
# App readiness and discovery
# ==============================================================================

def wait_for_app_ready(port: str, timeout: int = APP_READY_TIMEOUT) -> bool:
    """Poll the given port with `meshtastic --info` until the app responds or times out."""
    t0 = time.time()
    while time.time() - t0 < timeout:
        cp = meshtastic_cli(port, ["--info"], timeout=CMD_TIMEOUT, retries=1)
        if cp.returncode == 0:
            return True
        time.sleep(APP_READY_POLL)
    return False


def dump_serial_state() -> None:
    """Log the current /dev/serial/by-id map for operator visibility and debugging."""
    try:
        byid = Path("/dev/serial/by-id")
        if not byid.exists():
            eprint("   (no /dev/serial/by-id present)")
            return
        eprint("   by-id map:")
        for entry in sorted(byid.iterdir()):
            try:
                target = entry.resolve()
                eprint(f"     - {entry.name} -> {target}")
            except Exception as ex:
                eprint(f"     - {entry.name} -> <resolve error: {ex}>")
    except Exception as ex:
        eprint(f"   dump_serial_state error: {ex}")


def wait_for_app_any_polling(preferred_keywords: tuple = ("heltec", "meshtastic"),
                             timeout: int = APP_READY_TIMEOUT) -> Optional[str]:
    """Search all plausible ttys until a Meshtastic app responds."""
    t0 = time.time()
    probe_timeout = 15
    first_dump = True

    while time.time() - t0 < timeout:
        if first_dump:
            dump_serial_state()
            first_dump = False
        names = list_byid()
        preferred = [n for n in names if any(k in n.lower() for k in preferred_keywords)]
        others = [n for n in names if n not in preferred]

        for group in (preferred, others):
            for n in group:
                byid = str(Path("/dev/serial/by-id") / n)
                if is_bootloader_name(n):
                    continue
                if not byid_points_to_existing_node(byid):
                    eprint(f"   …skipping stale by-id: {byid}")
                    continue
                if not _should_probe(byid):
                    continue
                kill_port_holders(byid)
                pulse_dtr_rts(byid)
                eprint(f">> meshtastic probe by-id: {byid}")
                cp = meshtastic_cli(byid, ["--info"], timeout=probe_timeout, retries=1)
                if cp.returncode == 0:
                    return byid

        raw_ttys = [str(p) for p in Path("/dev").glob("ttyACM*")] + [str(p) for p in Path("/dev").glob("ttyUSB*")]
        for node in sorted(raw_ttys):
            mapped = _byid_for_tty(node) or node
            if not _should_probe(mapped):
                continue
            kill_port_holders(mapped)
            pulse_dtr_rts(mapped)
            eprint(f">> meshtastic probe raw: {mapped}")
            cp = meshtastic_cli(mapped, ["--info"], timeout=probe_timeout, retries=1)
            if cp.returncode == 0:
                return mapped

        if any("USB_JTAG" in n for n in names):
            eprint("   …still seeing only bootloader; tap RST on the board (do NOT hold BOOT) …")
        time.sleep(6)

    return None


def wait_for_app_any(preferred_keywords: tuple = ("heltec", "meshtastic"),
                     timeout: int = APP_READY_TIMEOUT) -> Optional[str]:
    """Find a responding Meshtastic app port via udev when possible, else poll."""
    try:
        import pyudev
        ctx = pyudev.Context()
        mon = pyudev.Monitor.from_netlink(ctx)
        mon.filter_by("tty")

        def _probe_nodes(nodes: list[str]) -> Optional[str]:
            scored: List[tuple[int, str]] = []
            for node in nodes:
                if not node or not node.startswith("/dev/tty"):
                    continue
                if not is_candidate_tty(node):
                    continue
                label = _byid_for_tty(node) or node
                if "/dev/serial/by-id/" in label and is_bootloader_name(Path(label).name):
                    continue
                score = -1 if any(k in label.lower() for k in preferred_keywords) else 0
                scored.append((score, label))
            for _, label in sorted(scored):
                cp = meshtastic_cli(label, ["--info"], timeout=4, retries=1)
                if cp.returncode == 0:
                    return label
            return None

        present = [d.device_node for d in ctx.list_devices(subsystem="tty") if getattr(d, "device_node", None)]
        found = _probe_nodes(present)
        if found:
            return found

        t0 = time.time()
        while time.time() - t0 < timeout:
            dev = mon.poll(timeout=2)
            if dev is None:
                continue
            if dev.action != "add":
                continue
            node = getattr(dev, "device_node", None)
            if not node:
                continue
            found = _probe_nodes([node])
            if found:
                return found
    except Exception:
        pass

    return wait_for_app_any_polling(preferred_keywords, timeout)

# ==============================================================================
# Esptool (chip detect, erase, flash)
# ==============================================================================

def esptool_chip_id(port: str) -> bool:
    """Lightweight connectivity check using `esptool chip-id` without resets."""
    before_arg = "no-reset"
    after_arg = "no-reset"
    cmd = "chip-id"

    cp = run(["esptool", "--chip", CHIP, "--port", port, "--baud", "115200",
              "--before", before_arg, "--after", after_arg, cmd],
             timeout=30, quiet=True)
    return cp.returncode == 0


def flash(port: str, fw_path: str, skip_erase: bool, baud: str, flash_size: str) -> str:
    """Erase (optional) and flash the combined firmware image, then locate the app."""
    before_arg = "no-reset"
    after_arg = "no-reset"
    erase_cmd = "erase-flash"
    write_cmd = "write-flash"
    size_arg = "--flash-size"
    mode_arg = "--flash-mode"
    freq_arg = "--flash-freq"

    if not esptool_chip_id(port):
        eprint("==> Put board in BOOTLOADER: hold BOOT/PRG, tap RST, release BOOT")
        baseline = set(list_byid())
        bootloader_found = False
        for _ in range(60):
            time.sleep(6)
            cur = set(list_byid())
            if cur != baseline:
                for name in cur:
                    if is_bootloader_name(name):
                        port = str(Path("/dev/serial/by-id") / name)
                        eprint(f"==> Bootloader port: {port}")
                        bootloader_found = True
                        break
                if bootloader_found:
                    break
        if not bootloader_found:
            eprint("ERROR: Bootloader device not detected after 360s (60 polls x 6s). Check USB connection.")
            sys.exit(5)

    if not skip_erase:
        eprint("==> Erasing flash (wipes identity/config)")
        cp = run(["esptool", "--chip", CHIP, "--port", port, "--baud", "115200",
                  "--before", before_arg, "--after", after_arg, erase_cmd],
                 timeout=120)
        if cp.returncode != 0:
            eprint(cp.stdout); eprint(cp.stderr)
            sys.exit(5)
    else:
        eprint("==> Skipping erase")

    effective_size = flash_size if str(flash_size).lower() != "detect" else "auto"
    eprint(f"==> Flashing (baud {baud}, size {effective_size}, mode dio, freq 80m)")
    flash_cmd = [
        "esptool", "--chip", CHIP, "--port", port, "--baud", baud,
        "--before", before_arg, "--after", after_arg,
        write_cmd, "-z",
        *( [size_arg, flash_size] if str(flash_size).lower() != "detect" else [] ),
        mode_arg, "dio", freq_arg, "80m",
        "0x0000", fw_path
    ]

    cp = run(flash_cmd, timeout=180)
    if cp.returncode != 0:
        eprint(cp.stdout); eprint(cp.stderr)
        sys.exit(6)

    try:
        run(["udevadm", "settle", "-t", "5"], timeout=10, quiet=True)
    except Exception:
        pass

    eprint("==> Tap RST once on the board (do not hold BOOT); searching for app…")
    time.sleep(6)
    dump_serial_state()
    kill_port_holders("/dev/ttyACM*")
    time.sleep(0.2)
    kill_port_holders("/dev/ttyUSB*")
    time.sleep(0.2)
    app_port = wait_for_app_any(timeout=240)
    if not app_port:
        eprint("ERROR: Meshtastic app did not come up after flashing.")
        sys.exit(7)
    eprint(f"==> App port: {app_port}")
    return app_port

# ==============================================================================
# Configuration transactions (Meshtastic CLI)
# ==============================================================================

def apply_then_wait(port: str, cli_args: list[str], expect_restart: bool = False) -> str:
    """Run one Meshtastic CLI command, then block until the app is responsive again."""
    cp = meshtastic_cli(port, cli_args, timeout=CMD_TIMEOUT, retries=RETRIES)
    if cp.returncode != 0:
        eprint(cp.stdout); eprint(cp.stderr)

    if expect_restart:
        eprint("==> Waiting for radio to return after commit (may reboot)…")
        new_port = wait_for_app_any(timeout=APP_READY_TIMEOUT)
        if not new_port:
            eprint("ERROR: radio did not return after commit")
            sys.exit(8)
        return new_port

    if wait_for_app_ready(port, timeout=APP_READY_TIMEOUT):
        return port

    eprint("   …device did not respond on the same port; roaming to find it…")
    new_port = wait_for_app_any(timeout=APP_READY_TIMEOUT)
    if not new_port:
        eprint(f"ERROR: device on {port} did not come back after command")
        sys.exit(9)
    return new_port


def validate_base64_psk(path: str) -> str:
    """Read and validate a base64-encoded 16-byte PSK from file."""
    data = Path(path).read_text().strip().replace("\n", "")
    try:
        raw = base64.b64decode(data, validate=True)
    except Exception:
        eprint("ERROR: PSK file must contain valid base64")
        sys.exit(2)
    if len(raw) != 16:
        eprint("ERROR: PSK must be exactly 16 bytes (AES-128).")
        sys.exit(2)
    return data


def ensure_app_responsive(port: str) -> None:
    """Hard gate before configuration to ensure the application is answering."""
    eprint(f"==> Ensuring Meshtastic app is responsive on {port}…")
    if not wait_for_app_ready(port, timeout=APP_READY_TIMEOUT):
        eprint("ERROR: Meshtastic app is not responding; tap RST and re-run with --skip-erase if needed.")
        sys.exit(10)


def configure(port: str, owner: str, short: str, psk_b64: str, gps_choice: str) -> str:
    """Apply staged configuration to the radio with minimal restarts.

    gps_choice:
      - "integrated": device has onboard GPS; set mode ENABLED and GPS update policy.
      - "none": device has no GPS; set mode DISABLED, fixed_position false, and broadcast interval = 20.
    """
    # 1) Owner + role first, minimal disruption
    port = apply_then_wait(port, ["--set-owner", owner, "--set-owner-short", short, "--set", "device.role", ROLE])

    # 2) Batch LoRa settings (single restart)
    try:
        meshtastic_cli(port, ["--begin-edit"])
        meshtastic_cli(port, ["--set", "lora.region", REGION])
        meshtastic_cli(port, ["--set", "lora.modem_preset", MODEM_PRESET])
        meshtastic_cli(port, ["--set", "lora.channel_num", FREQ_SLOT])
        meshtastic_cli(port, ["--set", "lora.hop_limit", HOP_LIMIT])
        meshtastic_cli(port, ["--set", "lora.tx_power", TX_POWER])
        meshtastic_cli(port, ["--set", "lora.sx126x_rx_boosted_gain", RX_BOOST])
        meshtastic_cli(port, ["--set", "lora.tx_enabled", "true"])
        cp = meshtastic_cli(port, ["--commit-edit"])
        if cp.returncode != 0:
            eprint("WARNING: commit-edit failed, attempting to cancel edit session")
            meshtastic_cli(port, ["--cancel-edit"], retries=1)
            raise RuntimeError("LoRa configuration commit failed")
    except Exception as e:
        eprint(f"ERROR during LoRa batch edit: {e}")
        try:
            meshtastic_cli(port, ["--cancel-edit"], retries=1)
        except Exception:
            pass
        sys.exit(11)

    eprint("==> Waiting for radio to return after LoRa commit (may reboot)…")
    new_port = wait_for_app_any(timeout=APP_READY_TIMEOUT)
    if not new_port:
        eprint("ERROR: radio did not return after LoRa commit")
        sys.exit(8)
    port = new_port

    # 3) Re-assert hop_limit explicitly and verify
    port = apply_then_wait(port, ["--set", "lora.hop_limit", HOP_LIMIT])
    cp = meshtastic_cli(port, ["--get", "lora.hop_limit"])
    if cp.returncode == 0:
        eprint(f"Hop limit verification: {cp.stdout.strip()}")

    # 4) Positioning and GPS behavior (batch)
    try:
        meshtastic_cli(port, ["--begin-edit"])
        if gps_choice == "integrated":
            meshtastic_cli(port, ["--set", "position.position_broadcast_secs", POS_BROADCAST_SECS])
            meshtastic_cli(port, ["--set", "position.position_broadcast_smart_enabled", POS_SMART_ENABLED])
            meshtastic_cli(port, ["--set", "position.broadcast_smart_minimum_distance", POS_SMART_MIN_DIST])
            meshtastic_cli(port, ["--set", "position.broadcast_smart_minimum_interval_secs", POS_SMART_MIN_INTERVAL_SECS])
            meshtastic_cli(port, ["--set", "position.gps_mode", "ENABLED"])  # use integrated GPS
            meshtastic_cli(port, ["--set", "position.gps_update_interval", GPS_UPDATE_INTERVAL])
        else:  # gps_choice == "none"
            meshtastic_cli(port, ["--set", "position.gps_mode", "DISABLED"])  # no onboard GPS
            meshtastic_cli(port, ["--set", "position.fixed_position", "false"])  # allow external updates
            meshtastic_cli(port, ["--set", "position.position_broadcast_secs", "20"])  # still broadcast if provided
        cp = meshtastic_cli(port, ["--commit-edit"])
        if cp.returncode != 0:
            eprint("WARNING: commit-edit (position) failed, cancelling edit")
            meshtastic_cli(port, ["--cancel-edit"], retries=1)
            raise RuntimeError("Position/GPS configuration commit failed")
    except Exception as e:
        eprint(f"ERROR during Position/GPS batch edit: {e}")
        try:
            meshtastic_cli(port, ["--cancel-edit"], retries=1)
        except Exception:
            pass
        sys.exit(11)

    eprint("==> Position/GPS committed; waiting for app to return…")
    port_after = wait_for_app_any(timeout=APP_READY_TIMEOUT)
    if not port_after:
        eprint("ERROR: radio did not return after Position/GPS commit")
        sys.exit(8)
    port = port_after
    port = apply_then_wait(port, ["--info"], expect_restart=False)

    # 5) Channel 0 basics (batch)
    try:
        meshtastic_cli(port, ["--begin-edit"])
        meshtastic_cli(port, ["--ch-set", "name", CHANNEL_NAME, "--ch-index", "0"])
        meshtastic_cli(port, ["--ch-set", "psk", f"base64:{psk_b64}", "--ch-index", "0"])
        meshtastic_cli(port, ["--ch-set", "uplink_enabled", UPLINK_ENABLED, "--ch-index", "0"])
        meshtastic_cli(port, ["--ch-set", "downlink_enabled", DOWNLINK_ENABLED, "--ch-index", "0"])
        meshtastic_cli(port, ["--ch-set", "module_settings.position_precision", POSITION_PRECISION, "--ch-index", "0"])
        cp = meshtastic_cli(port, ["--commit-edit"])
        if cp.returncode != 0:
            eprint("WARNING: commit-edit (channel) failed, cancelling edit")
            meshtastic_cli(port, ["--cancel-edit"], retries=1)
            raise RuntimeError("Channel configuration commit failed")
    except Exception as e:
        eprint(f"ERROR during Channel batch edit: {e}")
        try:
            meshtastic_cli(port, ["--cancel-edit"], retries=1)
        except Exception:
            pass
        sys.exit(11)

    eprint("==> Channel settings committed; waiting for app to return…")
    port_after = wait_for_app_any(timeout=APP_READY_TIMEOUT)
    if not port_after:
        eprint("ERROR: radio did not return after Channel commit")
        sys.exit(8)
    port = port_after
    port = apply_then_wait(port, ["--info"], expect_restart=False)

    # 6) Store and Forward (best-effort)
    try:
        eprint("==> Enabling Store and Forward…")
        meshtastic_cli(port, ["--set", "store_forward.enabled", "true"])
        meshtastic_cli(port, ["--set", "store_forward.is_server", "true"])
        meshtastic_cli(port, ["--set", "store_forward.records", "50"])
        meshtastic_cli(port, ["--set", "store_forward.history_return_max", "10"])
        meshtastic_cli(port, ["--set", "store_forward.history_return_window", "1800"])
        port = apply_then_wait(port, ["--info"])
    except Exception as e:
        eprint(f"WARNING: could not enable Store and Forward: {e}")

    return port

# ==============================================================================
# User-facing selection helpers
# ==============================================================================

def ensure_present_or_select(port: str) -> str:
    """Validate the supplied port or pick the only by-id device if singular."""
    if Path(port).exists():
        if port.startswith("/dev/serial/by-id/"):
            if byid_points_to_existing_node(port):
                return port
        else:
            base = os.path.basename(port)
            if base.startswith(("ttyACM", "ttyUSB")):
                return port
    names = list_byid()
    if not names:
        eprint("ERROR: no devices in /dev/serial/by-id")
        sys.exit(3)
    if len(names) == 1:
        only = str(Path("/dev/serial/by-id") / names[0])
        eprint(f"==> Auto-selected present device: {only}")
        return only
    eprint(f"ERROR: {port} not present and multiple devices found:")
    for n in names:
        eprint(" ", str(Path("/dev/serial/by-id") / n))
    sys.exit(3)

# ==============================================================================
# Orchestration (CLI)
# ==============================================================================

def main() -> None:
    check_prereqs()
    ap = argparse.ArgumentParser(description="Flash + provision Meshtastic Heltec V3/V4 radios reliably.")
    ap.add_argument("--port", required=True, help="Initial /dev/serial/by-id/... (bootloader or app)")
    ap.add_argument("--fw", required=True, help="Path to firmware .bin (combined image)")
    ap.add_argument("--owner", required=True, help="Owner long name")
    ap.add_argument("--short", required=True, help="Owner short name")
    ap.add_argument("--psk-file", required=True, help="File containing base64 of 16-byte AES key")
    ap.add_argument("--bt-pin", help="Optional fixed Bluetooth PIN (e.g., 373839)", default=None)
    ap.add_argument("--skip-erase", action="store_true", help="Skip erase_flash step (faster re-provision)")
    ap.add_argument("--baud", default=FLASH_BAUD, help="Flash baud (default 460800)")
    ap.add_argument("--flash-size", default=FLASH_SIZE,
                    help='Flash size for esptool (e.g., "8MB", "16MB", or "detect" to auto)')
    ap.add_argument("--configure-only", action="store_true", help="Skip flashing; only configure")
    ap.add_argument("--gps", required=True, choices=["integrated", "none"],
                    help="Specify whether the radio has an integrated GPS: 'integrated' or 'none'.")
    args = ap.parse_args()

    port = ensure_present_or_select(args.port)
    fw = Path(args.fw)
    if not args.configure_only and not fw.exists():
        eprint(f"ERROR: firmware not found: {fw}")
        sys.exit(2)
    psk_b64 = validate_base64_psk(args.psk_file)

    lockfh = acquire_lock(port)

    for svc in ("ModemManager", "brltty"):
        try:
            run(["sudo", "systemctl", "stop", svc], timeout=10, quiet=True)
        except Exception:
            pass

    eprint(f"==> Target (initial): {port}")
    if not args.configure_only:
        eprint(f"==> Firmware        : {fw}")
    eprint(f"==> Owner/Short     : {args.owner} / {args.short}")
    eprint(f"==> GPS             : {args.gps}")

    if args.configure_only:
        eprint("==> --configure-only set; skipping flash.")
        ensure_app_responsive(port)
        app_port = port
    elif args.skip_erase and meshtastic_cli(port, ["--info"], timeout=15, retries=1).returncode == 0:
        eprint("==> App is already running and --skip-erase set; skipping flash.")
        app_port = port
    else:
        eprint("==> Proceeding with flash (starting state may be app or bootloader)")
        chosen_size = args.flash_size
        byid_name = Path(port).name if "/dev/serial/by-id/" in port else ""
        if args.flash_size == "detect":
            if "16_MB_FLASH" in byid_name:
                chosen_size = "16MB"
            elif "8_MB_FLASH" in byid_name:
                chosen_size = "8MB"
        app_port = flash(port, str(fw), skip_erase=args.skip_erase, baud=args.baud, flash_size=chosen_size)

    ensure_app_responsive(app_port)

    if args.bt_pin:
        app_port = apply_then_wait(app_port, [
            "--set", "bluetooth.enabled", "true",
            "--set", "bluetooth.fixed_pin", str(args.bt_pin)
        ])

    app_port = configure(app_port, args.owner, args.short, psk_b64, args.gps)

    cp = meshtastic_cli(app_port, ["--info"], timeout=60, retries=2)
    sys.stdout.write(cp.stdout)
    eprint(f"==> Flash + provisioning complete on {app_port}")

    # Keep the lock handle referenced so the per-port lock stays held until exit.
    _ = lockfh


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        eprint("\nInterrupted by user.")
        sys.exit(130)

This script automates flashing and first-time provisioning of Heltec V3/V4 Meshtastic radios. It verifies prerequisites, acquires a per-port lock, and pauses common interfering services. It handles USB re-enumeration by preferring /dev/serial/by-id, probing raw ttyACM/ttyUSB when necessary, and rate-limiting probes. As part of recovery, it can “pulse DTR/RTS,” meaning it briefly toggles the serial modem control lines Data Terminal Ready and Request To Send to nudge the USB CDC interface to re-enumerate or wake up; this is a best-effort step and is skipped harmlessly if the host cannot do it.
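
To make the “pulse DTR/RTS” idea concrete, here is a minimal sketch of what such a pulse can look like on Linux using only the standard library. It is not the code from the script above: the function name pulse_dtr_rts, the hold time, and the choice to clear and then re-assert both lines are assumptions for illustration. Any OS-level failure is treated as "the host cannot do it" and reported as False rather than raised.

```python
import fcntl
import os
import struct
import termios
import time


def pulse_dtr_rts(port: str, hold_s: float = 0.1) -> bool:
    """Briefly clear, then re-assert, DTR and RTS on a serial port.

    Returns True if both ioctl calls succeeded, False if the port could not
    be opened or does not support modem control lines (best-effort).
    """
    # Bitmask of the two modem control lines, packed as the ioctl argument.
    bits = struct.pack("I", termios.TIOCM_DTR | termios.TIOCM_RTS)
    try:
        fd = os.open(port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
    except OSError:
        return False
    try:
        fcntl.ioctl(fd, termios.TIOCMBIC, bits)  # clear DTR and RTS
        time.sleep(hold_s)
        fcntl.ioctl(fd, termios.TIOCMBIS, bits)  # set DTR and RTS again
        return True
    except OSError:
        # Not a serial device, or the driver rejects the request: skip quietly.
        return False
    finally:
        os.close(fd)
```

Calling pulse_dtr_rts on a path that is missing or is not a serial device simply returns False, which matches the "skipped harmlessly" behavior described above.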

For flashing, it uses esptool to optionally erase and then write the combined firmware image, then waits for the Meshtastic application to start and locates the correct port. For provisioning, it applies the owner and role, commits LoRa parameters in a single batch, and configures positioning. GPS behavior is selected on the command line with --gps {integrated,none}: integrated enables onboard GPS and smart broadcasting; none disables onboard GPS, sets position.fixed_position to false, and still broadcasts externally provided locations at 20-second intervals. It then sets channel 0 (name, PSK, uplink/downlink options, and position precision) and enables Store-and-Forward as a best-effort step. Each disruptive step includes guarded retries, backoff, and readiness checks, so the run stays reliable end to end.
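
The three batched steps (LoRa, position, channel) all follow the same begin, set, commit pattern, so they could be folded into one helper. The sketch below is one possible shape, not what the script does today. The names batch_set, CliRunner, and dry_run are hypothetical, and the CLI runner is passed in so the logic can be exercised without a radio. The flags are the ones the script above already uses. One deliberate difference from the script is that this version also checks the exit code of every intermediate --set call instead of only the commit.

```python
from typing import Callable, Sequence

# Hypothetical runner type: takes (port, args) and returns a process exit code.
CliRunner = Callable[[str, Sequence[str]], int]


def batch_set(run_cli: CliRunner, port: str, settings: dict[str, str]) -> bool:
    """Apply settings inside one edit session.

    Returns True if every step (begin, each set, commit) exited with 0.
    On the first failure it issues a best-effort cancel and returns False.
    """
    steps = [["--begin-edit"]]
    steps += [["--set", key, value] for key, value in settings.items()]
    steps.append(["--commit-edit"])
    for args in steps:
        if run_cli(port, args) != 0:
            # Mirror the script's cleanup: cancel the edit session once.
            run_cli(port, ["--cancel-edit"])
            return False
    return True


def dry_run(port: str, args: Sequence[str]) -> int:
    """Stand-in runner that only prints the command it would execute."""
    print("meshtastic --port", port, *args)
    return 0
```

With a helper like this, a call such as batch_set(dry_run, port, {"lora.hop_limit": "3"}) prints the commands instead of running them, which is handy for checking a configuration before touching a radio. The caller would still need to wait for the radio to come back after a successful commit, as the script does.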

This is what the script usage looks like (the --help output):

usage: mesh_flash_provision.py [-h] --port PORT --fw FW --owner OWNER --short SHORT
                               --psk-file PSK_FILE [--bt-pin BT_PIN] [--skip-erase]
                               [--baud BAUD] [--flash-size FLASH_SIZE] [--configure-only]
                               --gps {integrated,none}

Flash + provision Meshtastic Heltec V3/V4 radios reliably.

optional arguments:
  -h, --help            show this help message and exit
  --port PORT           Initial /dev/serial/by-id/... (bootloader or app)
  --fw FW               Path to firmware .bin (combined image)
  --owner OWNER         Owner long name
  --short SHORT         Owner short name
  --psk-file PSK_FILE   File containing base64 of 16-byte AES key
  --bt-pin BT_PIN       Optional fixed Bluetooth PIN (e.g., 373839)
  --skip-erase          Skip erase_flash step (faster re-provision)
  --baud BAUD           Flash baud (default 460800)
  --flash-size FLASH_SIZE
                        Flash size for esptool (e.g., "8MB", "16MB", or "detect" to auto)
  --configure-only      Skip flashing; only configure
  --gps {integrated,none}
                        Specify whether the radio has an integrated GPS: 'integrated' or 'none'.

Here’s an example of how I have actually run this script (flashing/provisioning radio 2):

python3 mesh_flash_provision.py \
  --port /dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0 \
  --fw ~/meshtastic-firmware/firmware-heltec-v3-2.7.11.ee68575.bin \
  --owner Juszak-002 \
  --short J002 \
  --psk-file ~/juszak4x4-20251103.key \
  --gps none
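
The --psk-file argument expects a file holding the base64 encoding of exactly 16 random bytes, which is what validate_base64_psk checks. As a rough sketch of one way to produce such a file (the helper name write_psk_file is hypothetical, and this is not necessarily how the key above was made), the standard library is enough:

```python
import base64
import os
import secrets


def write_psk_file(path: str) -> str:
    """Write a fresh base64-encoded 16-byte key to path and return the text.

    The file is created with mode 0600, and creation fails if the path
    already exists so an existing key is never silently overwritten.
    """
    encoded = base64.b64encode(secrets.token_bytes(16)).decode("ascii")
    # O_EXCL raises FileExistsError if the key file is already present.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(encoded + "\n")
    return encoded
```

Every radio that should share the channel needs to be provisioned with the same key file, so this would be run once per mesh rather than once per radio.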

I’ve tested this on Heltec V3 radios without integrated GPS and Heltec V4 radios with integrated GPS. If I add any more radios, I’ll probably just go with integrated GPS: I have seen no downsides and several upsides to having GPS onboard rather than relying on external GPS from a phone.