Своя реализация ConfigSource
В Phase 1 спецификации готовы три источника — YamlFileSource, JsonFileSource, InMemorySource (DictSource в Go). Phase 2+ привезёт официальные адаптеры для etcd, Consul, Vault, HTTP, SQL, Kubernetes ConfigMap. Свой ConfigSource имеет смысл писать только если стандартные адаптеры не покрывают инфраструктурные ограничения — например, нужен бэкенд на ZooKeeper, файловый формат не-YAML, или промежуточный слой для тестов.
Прежде чем писать, проверь: (а) решает ли проблему InMemorySource (DictSource в Go) с программным деревом, (б) можно ли дождаться Phase 2-адаптера, (в) не достаточно ли YamlFileSource с нестандартным путём. Кастомный источник — это ещё один артефакт, который надо поддерживать.
Контракт интерфейса
Любой ConfigSource реализует пять полей:
ConfigSource {
id: string # уникальный идентификатор для диагностики
# e.g., "yaml:app-config.yaml", "zookeeper://zk.example.com/my-app"
load(): ConfigTree # возвращает дерево значений; реализация выбирает sync/async
interpolate: boolean # подсказка загрузчику: применять ${VAR} к строковым листьям
watch?(callback): Subscription # опционально — push-based обновление (Phase 2+)
close?(): void # опционально — освобождение ресурсов
}
Из пяти — три обязательных (id, load, interpolate) и два опциональных (watch, close). Если реализация не поддерживает watch, подписки через config.onSectionChange() получают active = false, inactive_reason = "no watch-capable source registered" (прямой пример из ADR-0001 §7.2) — это не ошибка, а диагностический сигнал.
Шаг 1. Именование id
Рекомендуемая конвенция — URI-форма:
| Пример | Компоненты |
|---|---|
yaml:app-config.yaml | schema yaml: + путь |
etcd://prod/dagstack/my-app | schema etcd: + cluster + prefix |
consul://consul.example.com/kv/dagstack | schema consul: + host + путь |
vault://secret/dagstack/production | schema vault: + mount + path |
zookeeper://zk.example.com/my-app | schema zookeeper: + host + znode |
id попадает в логи, сообщения об ошибках (ConfigError.source_id), diagnostic output config.dump(). Делай его человекочитаемым, но компактным — 60-80 символов хватает.
Шаг 2. Реализация load()
load() возвращает ConfigTree — вложенную структуру из maps / sequences / скаляров (string / int / float / bool / null). Это язык-нейтральное представление; типизация делается потом на уровне Config.get*() / getSection().
- Python
- TypeScript
- Go
from dagstack.config import ConfigSource, ConfigTree, Subscription, ConfigError
class ZookeeperSource(ConfigSource):
interpolate = True # применять ${VAR} к значениям, если они строки
def __init__(self, host: str, prefix: str):
self._host = host
self._prefix = prefix
self._client = zk_connect(host)
self.id = f"zookeeper://{host}/{prefix.lstrip('/')}"
def load(self) -> ConfigTree:
try:
raw = self._client.get_recursive(self._prefix)
except ZkError as err:
raise ConfigError(
path="",
reason="source_unavailable",
details=f"ZooKeeper get failed: {err}",
source_id=self.id,
) from err
return ConfigTree.from_dict(raw)
import type { ConfigSource, ConfigTree, Subscription } from "@dagstack/config";
import { ConfigError } from "@dagstack/config";
export class ZookeeperSource implements ConfigSource {
readonly id: string;
readonly interpolate = true;
constructor(private host: string, private prefix: string) {
this.id = `zookeeper://${host}/${prefix.replace(/^\//, "")}`;
}
async load(): Promise<ConfigTree> {
try {
const raw = await this.client.getRecursive(this.prefix);
return ConfigTree.fromDict(raw);
} catch (err) {
throw new ConfigError({
path: "",
reason: "source_unavailable",
details: `ZooKeeper get failed: ${(err as Error).message}`,
source_id: this.id,
});
}
}
}
type ZookeeperSource struct {
host string
prefix string
id string
client *zk.Client
}
func NewZookeeperSource(host, prefix string) *ZookeeperSource {
return &ZookeeperSource{
host: host,
prefix: prefix,
id: fmt.Sprintf("zookeeper://%s/%s", host, strings.TrimPrefix(prefix, "/")),
client: zk.Connect(host),
}
}
func (s *ZookeeperSource) ID() string { return s.id }
func (s *ZookeeperSource) Interpolate() bool { return true }
func (s *ZookeeperSource) Load(ctx context.Context) (config.Tree, error) {
raw, err := s.client.GetRecursive(ctx, s.prefix)
if err != nil {
return nil, &config.Error{
Path: "",
Reason: "source_unavailable",
Details: fmt.Sprintf("ZooKeeper get failed: %v", err),
SourceID: s.id,
}
}
return config.TreeFromDict(raw), nil
}
Правила:
- Ошибку нормализуй в
ConfigErrorсreason: source_unavailable, чтобы загрузчик мог отличить её отparse_error/validation_failed. - Не выполняй здесь env-интерполяцию — это ответственность загрузчика (вызывается после
load()). Ты только возвращаешь raw-дерево. - Если источник пустой (ключ не найден, но это валидный кейс) — возвращай пустой
ConfigTree, не null.
Шаг 3. Флаг interpolate
interpolate = true— загрузчик обойдёт все строковые листья вConfigTreeи применит${VAR}/${VAR:-default}. Подходит для файловых источников с человекочитаемыми значениями.interpolate = false— загрузчик вернёт дерево как есть. Используй для источников, где${…}— буквальная строка (например, Vault уже подставил плейсхолдеры, или etcd хранит pre-rendered дерево).
Для 95% кастомных адаптеров — true. Сомневаешься — оставь true: неиспользуемый env не даст ошибку, отсутствие интерполяции наоборот поломает конфиг.
Шаг 4. watch(callback) — опционально
Нужен только если источник умеет push-уведомлять об изменениях (ZooKeeper watchers, etcd streaming API, Kubernetes informers). Если умеет — реализуй:
- Python
- TypeScript
- Go
def watch(self, callback) -> Subscription:
# callback будет вызван загрузчиком при каждом изменении дерева.
handler = self._client.subscribe(
self._prefix,
on_change=lambda: callback(self.load()),
)
return Subscription(
unsubscribe=handler.cancel,
active=True,
path=self._prefix,
)
watch(callback: (tree: ConfigTree) => void): Subscription {
const handler = this.client.subscribe(this.prefix, async () => {
callback(await this.load());
});
return {
unsubscribe: () => handler.cancel(),
active: true,
path: this.prefix,
};
}
func (s *ZookeeperSource) Watch(ctx context.Context, cb func(config.Tree)) (config.Subscription, error) {
handler, err := s.client.Subscribe(ctx, s.prefix, func() {
tree, _ := s.Load(ctx)
cb(tree)
})
if err != nil {
return nil, err
}
return config.Subscription{
Unsubscribe: handler.Cancel,
Active: true,
Path: s.prefix,
}, nil
}
Требования к watch:
- Callback вызывается без ожидания ответа. Не жди завершения его работы перед следующим уведомлением.
- Если
watch()не реализован — не определяй метод вовсе (или возвращайSubscription{active: false}). Загрузчик корректно обработает оба варианта. - Ошибки наблюдателя (разрыв соединения, timeout) — логируй через diagnostic channel реализации (
logging.warningв Python, structured log в Go), но не поднимай исключение — watch не должен убивать приложение. - Не вызывай callback синхронно из
watch()— это invariant: подписка доставляет только последующие изменения, не текущее состояние.
Шаг 5. close() — освобождение ресурсов
Реализуй, если источник держит открытые соединения / файловые дескрипторы / подписки:
- Python
- TypeScript
- Go
def close(self) -> None:
self._client.close()
close(): void {
this.client.disconnect();
}
func (s *ZookeeperSource) Close() error {
return s.client.Close()
}
Загрузчик вызывает close() на всех источниках при config.close() и при программном выходе. Метод должен быть идемпотентным (повторный вызов — no-op).
Шаг 6. Регистрация в загрузчике
- Python
- TypeScript
- Go
from dagstack.config import Config, YamlFileSource
config = Config.loadFrom([
YamlFileSource("app-config.yaml"), # 1. base defaults
ZookeeperSource("zk.example.com:2181", "/dagstack/prod"), # 2. runtime overrides
])
import { Config, YamlFileSource } from "@dagstack/config";
const config = await Config.loadFrom([
new YamlFileSource("app-config.yaml"),
new ZookeeperSource("zk.example.com:2181", "/dagstack/prod"),
]);
cfg, err := config.LoadFrom(ctx, []config.Source{
config.NewYamlFileSource("app-config.yaml"),
NewZookeeperSource("zk.example.com:2181", "/dagstack/prod"),
})
Порядок в списке = порядок приоритета. Последний источник переопределяет предыдущие (deep-merge для maps, атомарная замена для массивов).
Шаг 7. Тесты
Минимальный smoke-test:
- Python
- TypeScript
- Go
def test_zookeeper_source_load():
fake_zk = FakeZkClient({"/dagstack/prod/llm/base_url": "http://fake"})
source = ZookeeperSource.from_client(fake_zk, prefix="/dagstack/prod")
tree = source.load()
assert tree.get("llm.base_url") == "http://fake"
assert source.id == "zookeeper://fake/dagstack/prod"
def test_zookeeper_source_missing_prefix():
fake_zk = FakeZkClient({})
source = ZookeeperSource.from_client(fake_zk, prefix="/not-there")
tree = source.load()
assert tree.is_empty() # пустой source — валидное состояние
def test_zookeeper_source_connection_error():
source = ZookeeperSource("unreachable.example:2181", "/x")
with pytest.raises(ConfigError) as exc:
source.load()
assert exc.value.reason == "source_unavailable"
assert exc.value.source_id == "zookeeper://unreachable.example:2181/x"
import { ConfigError } from "@dagstack/config";
test("ZookeeperSource load", async () => {
const fakeZk = new FakeZkClient({ "/dagstack/prod/llm/base_url": "http://fake" });
const source = ZookeeperSource.fromClient(fakeZk, "/dagstack/prod");
const tree = await source.load();
expect(tree.get("llm.base_url")).toBe("http://fake");
expect(source.id).toBe("zookeeper://fake/dagstack/prod");
});
test("ZookeeperSource connection error", async () => {
const source = new ZookeeperSource("unreachable.example:2181", "/x");
await expect(source.load()).rejects.toMatchObject({
reason: "source_unavailable",
source_id: "zookeeper://unreachable.example:2181/x",
});
});
func TestZookeeperSourceLoad(t *testing.T) {
fakeZk := NewFakeZkClient(map[string]string{
"/dagstack/prod/llm/base_url": "http://fake",
})
source := ZookeeperSourceFromClient(fakeZk, "/dagstack/prod")
tree, err := source.Load(ctx)
require.NoError(t, err)
require.Equal(t, "http://fake", tree.Get("llm.base_url"))
}
Полный conformance — прогон фикстур из dagstack/config-spec/conformance/ через test-runner твоего кастомного источника. Добавь тег custom в manifest.yaml и хуки для загрузки тестовых деревьев в бэкенд (например, предзагрузка ZooKeeper до прогона теста).
Частые ошибки
- Возврат null вместо пустого
ConfigTree. Загрузчик не ожидает null и упадёт с NullPointerException / panic. Пустой source → пустое дерево. - Env-интерполяция внутри
load(). Это делает загрузчик. Если ты это делаешь — двойная интерполяция ломает экранирование$$и вложенные значения. - Потеря
source_idв ошибках. Безsource_idоператор не различит, какой из пяти источников вloadFromупал. Обязательно заполняй при поднятииConfigError. - Watch-callback вызван синхронно в
watch(). Подписка — про последующие изменения, не про текущее. Если нужен текущий снимок — проси его черезconfig.getSection()сразу после регистрации подписки. close()не идемпотентен. Загрузчик может вызватьcloseдважды (при ошибке + при shutdown). Second call должен быть no-op.- Игнор ошибок наблюдателя. Приложение тихо теряет hot-reload. Логируй предупреждение через diagnostic channel, чтобы оператор увидел.
Когда не писать свой
- Нужен файл на диске — бери
YamlFileSource/JsonFileSource. - Нужна программная сборка дерева (тесты) —
InMemorySource(DictSourceв Go). - Нужен etcd / Consul / Vault / HTTP / Kubernetes — дождись Phase 2-адаптера (roadmap в концепте sources).
- Нужна подмена значений в тестах — используй
Config.loadFrom([YamlFileSource, InMemorySource])с in-memory источником в конце списка (он перекрывает значения предыдущих слоёв).
Свой источник оправдан только для нестандартных бэкендов (ZooKeeper, собственный KV-store, специфичный legacy-формат).
См. также
- Источники (ConfigSource) — концептуальный обзор.
- Hot-reload (watch) — как
watch()интегрируется с подписками приложения. - Слои конфигурации — как загрузчик объединяет несколько источников.
- ADR-0001 §8 — нормативный контракт.