Skip to main content

Своя реализация ConfigSource

В Phase 1 спецификации готовы три источника — YamlFileSource, JsonFileSource, InMemorySource (DictSource в Go). Phase 2+ привезёт официальные адаптеры для etcd, Consul, Vault, HTTP, SQL, Kubernetes ConfigMap. Свой ConfigSource имеет смысл писать только если стандартные адаптеры не покрывают инфраструктурные ограничения — например, нужен бэкенд на ZooKeeper, файловый формат не-YAML, или промежуточный слой для тестов.

Прежде чем писать, проверь: (а) решает ли проблему InMemorySource (DictSource в Go) с программным деревом, (б) можно ли дождаться Phase 2-адаптера, (в) не достаточно ли YamlFileSource с нестандартным путём. Кастомный источник — это ещё один артефакт, который надо поддерживать.

Контракт интерфейса

Любой ConfigSource реализует пять полей:

ConfigSource {
id: string # уникальный идентификатор для диагностики
# e.g., "yaml:app-config.yaml", "zookeeper://zk.example.com/my-app"
load(): ConfigTree # возвращает дерево значений; реализация выбирает sync/async
interpolate: boolean # подсказка загрузчику: применять ${VAR} к строковым листьям
watch?(callback): Subscription # опционально — push-based обновление (Phase 2+)
close?(): void # опционально — освобождение ресурсов
}

Из пяти — три обязательных (id, load, interpolate) и два опциональных (watch, close). Если реализация не поддерживает watch, подписки через config.onSectionChange() получают active = false, inactive_reason = "no watch-capable source registered" (прямой пример из ADR-0001 §7.2) — это не ошибка, а диагностический сигнал.

Шаг 1. Именование id

Рекомендуемая конвенция — URI-форма:

ПримерКомпоненты
yaml:app-config.yamlschema yaml: + путь
etcd://prod/dagstack/my-appschema etcd: + cluster + prefix
consul://consul.example.com/kv/dagstackschema consul: + host + путь
vault://secret/dagstack/productionschema vault: + mount + path
zookeeper://zk.example.com/my-appschema zookeeper: + host + znode

id попадает в логи, сообщения об ошибках (ConfigError.source_id), diagnostic output config.dump(). Делай его человекочитаемым, но компактным — 60-80 символов хватает.

Шаг 2. Реализация load()

load() возвращает ConfigTree — вложенную структуру из maps / sequences / скаляров (string / int / float / bool / null). Это язык-нейтральное представление; типизация делается потом на уровне Config.get*() / getSection().

from dagstack.config import ConfigSource, ConfigTree, Subscription, ConfigError

class ZookeeperSource(ConfigSource):
interpolate = True # применять ${VAR} к значениям, если они строки

def __init__(self, host: str, prefix: str):
self._host = host
self._prefix = prefix
self._client = zk_connect(host)
self.id = f"zookeeper://{host}/{prefix.lstrip('/')}"

def load(self) -> ConfigTree:
try:
raw = self._client.get_recursive(self._prefix)
except ZkError as err:
raise ConfigError(
path="",
reason="source_unavailable",
details=f"ZooKeeper get failed: {err}",
source_id=self.id,
) from err
return ConfigTree.from_dict(raw)

Правила:

  • Ошибку нормализуй в ConfigError с reason: source_unavailable, чтобы загрузчик мог отличить её от parse_error / validation_failed.
  • Не выполняй здесь env-интерполяцию — это ответственность загрузчика (вызывается после load()). Ты только возвращаешь raw-дерево.
  • Если источник пустой (ключ не найден, но это валидный кейс) — возвращай пустой ConfigTree, не null.

Шаг 3. Флаг interpolate

  • interpolate = true — загрузчик обойдёт все строковые листья в ConfigTree и применит ${VAR} / ${VAR:-default}. Подходит для файловых источников с человекочитаемыми значениями.
  • interpolate = false — загрузчик вернёт дерево как есть. Используй для источников, где ${…} — буквальная строка (например, Vault уже подставил плейсхолдеры, или etcd хранит pre-rendered дерево).

Для 95% кастомных адаптеров — true. Сомневаешься — оставь true: неиспользуемый env не даст ошибку, отсутствие интерполяции наоборот поломает конфиг.

Шаг 4. watch(callback) — опционально

Нужен только если источник умеет push-уведомлять об изменениях (ZooKeeper watchers, etcd streaming API, Kubernetes informers). Если умеет — реализуй:

def watch(self, callback) -> Subscription:
# callback будет вызван загрузчиком при каждом изменении дерева.
handler = self._client.subscribe(
self._prefix,
on_change=lambda: callback(self.load()),
)
return Subscription(
unsubscribe=handler.cancel,
active=True,
path=self._prefix,
)

Требования к watch:

  • Callback вызывается без ожидания ответа. Не жди завершения его работы перед следующим уведомлением.
  • Если watch() не реализован — не определяй метод вовсе (или возвращай Subscription{active: false}). Загрузчик корректно обработает оба варианта.
  • Ошибки наблюдателя (разрыв соединения, timeout) — логируй через diagnostic channel реализации (logging.warning в Python, structured log в Go), но не поднимай исключение — watch не должен убивать приложение.
  • Не вызывай callback синхронно из watch() — это invariant: подписка доставляет только последующие изменения, не текущее состояние.

Шаг 5. close() — освобождение ресурсов

Реализуй, если источник держит открытые соединения / файловые дескрипторы / подписки:

def close(self) -> None:
self._client.close()

Загрузчик вызывает close() на всех источниках при config.close() и при программном выходе. Метод должен быть идемпотентным (повторный вызов — no-op).

Шаг 6. Регистрация в загрузчике

from dagstack.config import Config, YamlFileSource

config = Config.loadFrom([
YamlFileSource("app-config.yaml"), # 1. base defaults
ZookeeperSource("zk.example.com:2181", "/dagstack/prod"), # 2. runtime overrides
])

Порядок в списке = порядок приоритета. Последний источник переопределяет предыдущие (deep-merge для maps, атомарная замена для массивов).

Шаг 7. Тесты

Минимальный smoke-test:

def test_zookeeper_source_load():
fake_zk = FakeZkClient({"/dagstack/prod/llm/base_url": "http://fake"})
source = ZookeeperSource.from_client(fake_zk, prefix="/dagstack/prod")

tree = source.load()
assert tree.get("llm.base_url") == "http://fake"
assert source.id == "zookeeper://fake/dagstack/prod"

def test_zookeeper_source_missing_prefix():
fake_zk = FakeZkClient({})
source = ZookeeperSource.from_client(fake_zk, prefix="/not-there")

tree = source.load()
assert tree.is_empty() # пустой source — валидное состояние

def test_zookeeper_source_connection_error():
source = ZookeeperSource("unreachable.example:2181", "/x")
with pytest.raises(ConfigError) as exc:
source.load()
assert exc.value.reason == "source_unavailable"
assert exc.value.source_id == "zookeeper://unreachable.example:2181/x"

Полный conformance — прогон фикстур из dagstack/config-spec/conformance/ через test-runner твоего кастомного источника. Добавь тег custom в manifest.yaml и хуки для загрузки тестовых деревьев в бэкенд (например, предзагрузка ZooKeeper до прогона теста).

Частые ошибки

  • Возврат null вместо пустого ConfigTree. Загрузчик не ожидает null и упадёт с NullPointerException / panic. Пустой source → пустое дерево.
  • Env-интерполяция внутри load(). Это делает загрузчик. Если ты это делаешь — двойная интерполяция ломает экранирование $$ и вложенные значения.
  • Потеря source_id в ошибках. Без source_id оператор не различит, какой из пяти источников в loadFrom упал. Обязательно заполняй при поднятии ConfigError.
  • Watch-callback вызван синхронно в watch(). Подписка — про последующие изменения, не про текущее. Если нужен текущий снимок — проси его через config.getSection() сразу после регистрации подписки.
  • close() не идемпотентен. Загрузчик может вызвать close дважды (при ошибке + при shutdown). Second call должен быть no-op.
  • Игнор ошибок наблюдателя. Приложение тихо теряет hot-reload. Логируй предупреждение через diagnostic channel, чтобы оператор увидел.

Когда не писать свой

  • Нужен файл на диске — бери YamlFileSource / JsonFileSource.
  • Нужна программная сборка дерева (тесты) — InMemorySource (DictSource в Go).
  • Нужен etcd / Consul / Vault / HTTP / Kubernetes — дождись Phase 2-адаптера (roadmap в концепте sources).
  • Нужна подмена значений в тестах — используй Config.loadFrom([YamlFileSource, InMemorySource]) с in-memory источником в конце списка (он перекрывает значения предыдущих слоёв).

Свой источник оправдан только для нестандартных бэкендов (ZooKeeper, собственный KV-store, специфичный legacy-формат).

См. также