Razones para integrar Zettana
Para muchas empresas, el verdadero cuello de botella no está en procesar los comprobantes, sino en obtenerlos de forma confiable y constante. Ese problema se vuelve todavía más evidente cuando se intenta integrar esa información con un ERP, un CRM financiero, una plataforma de cuentas por pagar o un software contable propio.
Depender de descargas manuales desde el SRI consume demasiado tiempo y además es un proceso propenso a errores. A pequeña escala ya genera fricción; cuando hay cientos o miles de documentos involucrados, se vuelve claramente inviable como operación diaria.
Zettana resuelve este problema. En lugar de pedirle al usuario que entre al SRI, busque documentos y descargue archivos manualmente, tu software puede consumirlos por API e incorporarlos directamente a sus propios flujos.
Qué capacidades habilita
Con Zettana, tu sistema puede:
- sincronizar documentos sin intervención manual
- tener todos tus documentos en un solo lugar
- manejar una cantidad ilimitada de RUCs
- consumir XMLs para procesos estructurados
- descargar archivos RIDE/PDF cuando el archivo visual también es necesario
- automatizar flujos contables, conciliaciones y archivo digital
Ventajas para software contable
Integrar Zettana en un software contable, ERP o plataforma financiera trae ventajas claras:
- reduce la dependencia de tareas manuales
- elimina procesos repetitivos de descarga local y reenvío de archivos
- disminuye errores operativos en la obtención de documentos
- acelera la ingestión de documentos hacia tu propio modelo de datos
- te permite concentrarte en tu lógica de negocio, no en la descarga de archivos desde el SRI
Para proveedores de software contable, esto no es solo una mejora técnica, también mejora de forma directa la experiencia del cliente final: una vez completada la integración, los comprobantes se cargan automáticamente en el software donde el usuario ya trabaja, eliminando la necesidad de entrar al SRI, descargar archivos manualmente y moverlos entre sistemas, lo que reduce pasos y ahorra tiempo.
Walkthrough técnico
1. Crear una cuenta en Zettana
El primer paso es crear una cuenta en Zettana. Desde ahí, el usuario puede acceder al área de producto, gestionar sus credenciales y comenzar la integración.
Puedes registrarte en Sign up. ¡Es gratuito!
2. Generar una API key
Una vez creada la cuenta,puedes generar una API
key dentro de Zettana. Tu sistema debe guardarla de forma segura y enviarla en
cada request mediante x-api-key. Para generar tu API key dirigete al Panel de desarrolador
3. Registrar los RUCs
Cada RUC se registra una sola vez. Zettana valida las credenciales contra el SRI y, cuando la validación termina correctamente, ese RUC queda listo para otras operaciones.
Este paso puede hacerse de dos formas:
- Desde la interfaz de usuario, dentro del dashboard, cuando se quiere operar manualmente desde Zettana.
- De forma programática vía API, cuando el objetivo es integrarlo dentro del flujo de tu propio software.
Documentación sobre uso de rucs por medio de la API la encuestras en nuestra documentación.
4. Configurar sincronizaciones recurrentes
Una vez que el RUC está listo, puedes configurar sincronizaciones recurrentes para este ruc. Con esto, Zettana se encargue de mantener la información actualizada de manera automatica.
Este paso puede hacerse de dos formas:
- Desde la interfaz de usuario: navega al dashboard y dirigete a la seccion de documentos del RUC para el que deseas activar sincronizaciones recurrentes.
- De forma programática vía API, cuando el objetivo es que tu propio software cree y administre esas configuraciones.
La documentación sobre uso de sincronizaciones por medio de la API la encuentras en nuestra documentación.
5. Consultar documentos de forma incremental
Para detectar nuevos documentos de manera continua, tu sistema puede consultar
GET /v1/documents/incremental. Este endpoint devuelve los documentos ordenados
por retrievedAt, lo que permite recuperar únicamente lo que Zettana obtuvo
desde la última vez que tu integración revisó novedades.
La primera consulta debe hacerse con since, usando una fecha en formato ISO
8601. Después, guarda el nextCursor de la respuesta y úsalo como cursor en
las siguientes consultas para continuar el recorrido incremental sin volver a
procesar lo ya visto.
Puedes además aplicar filtros como documentFormat, documentType o rucId,
según lo que necesite tu integración. No envíes since y cursor al mismo
tiempo.
La documentación sobre consulta incremental de documentos la encuentras en nuestra documentación.
6. Obtener el contenido que necesita tu producto
Después de identificar los documentos que te interesan, el siguiente paso es recuperar su contenido según el formato que tu integración necesite.
- Para trabajar con información estructurada, usa
POST /documents/xml-contentsy envía losdocumentIdsde los documentos XML que quieras obtener. - Para descargar la representación visual (archivo RIDE / PDF) de un comprobante, usa
GET /v1/documents/:documentId/signedLink. La API devuelve un link temporal para acceder a ese documento específico, por lo que conviene descargarlo poco después de solicitarlo.
La documentación sobre obtención de contenidos XML y descarga de PDFs la encuentras en nuestra documentación.
Demo: script para actualizaciones diarias
Recomendamos haber leido las páginas de documentación para poder entender este script de mejor manera.
Este script asume que ya se tienen configuradas programaciones recurrentes en Zettana que mantenga la información actualizada
Un patrón práctico es correr un job diario que:
- consulta los documentos más recientes
- toma los XML y los manda a tu capa de procesamiento interno
- toma los PDFs y los descarga para archivarlos
- guarda estado local para que la siguiente ejecución continúe desde donde quedó la anterior
El siguiente ejemplo usa Python. Para el demo, el estado se persiste en un archivo JSON local. En producción, normalmente guardarías ese estado en tu base de datos o en la tabla de integraciones de tu ERP o CRM.
La idea es simple: Zettana se encarga de generar las sincronizaciones recurrentes y de traer los comprobantes desde el SRI; tu script solo consume de forma incremental lo nuevo que ya fue recuperado.
import json
import os
from pathlib import Path
from typing import Iterable
import requests
BASE_URL = "https://backend.zettana.com/v1"
API_KEY = os.environ["ZETTANA_API_KEY"]
INITIAL_SINCE = os.environ.get(
"ZETTANA_INITIAL_SINCE",
"2026-04-01T00:00:00.000Z",
)
# Demo simple: este archivo guarda el estado de avance de la integración.
# En producción, normalmente lo guardarías en tu base de datos
# o en la tabla de estado de integraciones de tu sistema.
STATE_FILE = Path("zettana_sync_state.json")
PDF_ROOT = Path("zettana_pdfs")
PAGE_SIZE = 100
XML_BATCH_SIZE = 100 # El endpoint /v1/documents/xml-contents acepta hasta 100 IDs.
session = requests.Session()
session.headers.update({"x-api-key": API_KEY})
def load_state() -> dict:
"""Carga el estado persistido de la integración."""
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text(encoding="utf-8"))
return {
"since": INITIAL_SINCE,
"processed_ids_at_latest_retrieved_at": [],
}
def save_state(state: dict) -> None:
"""Guarda el punto de reanudación para la siguiente vez que desees obtener documentos."""
STATE_FILE.write_text(
json.dumps(state, indent=2, ensure_ascii=False),
encoding="utf-8",
)
def chunked(items: list[str], size: int) -> Iterable[list[str]]:
"""Divide una lista en lotes del tamaño indicado."""
for index in range(0, len(items), size):
yield items[index : index + size]
def fetch_incremental_page(*, since: str | None, cursor: str | None) -> dict:
"""Obtiene los documentos de forma incremental. Usada para leer los documentos mas nuevos encontrados por Zettana."""
params = {"limit": PAGE_SIZE}
if cursor:
params["cursor"] = cursor
else:
params["since"] = since
response = session.get(
f"{BASE_URL}/documents/incremental",
params=params,
timeout=60,
)
response.raise_for_status()
return response.json()
def fetch_xml_contents(document_ids: list[str]) -> dict[str, str]:
"""Recupera el contenido XML para un lote de documentos."""
response = session.post(
f"{BASE_URL}/documents/xml-contents",
json={"documentIds": document_ids},
timeout=120,
)
response.raise_for_status()
payload = response.json()
return {
item["id"]: item["xmlContent"]
for item in payload["documents"]
}
def process_xml_document_in_your_system(
document: dict,
xml_content: str,
) -> None:
"""
Punto de integración para tu sistema.
Reemplaza esta función por la lógica real de tu producto. Por ejemplo:
- parsear el XML
- crear o actualizar un registro contable
- asociarlo a un cliente, proveedor o transacción
- etc.
"""
print(
f"[XML] Procesado {document['id']} "
f"({document['documentType']} - {document['numeroDocumento']})"
)
def process_pdf_document_in_your_system(document: dict) -> Path:
"""
Punto de integración para PDFs.
En este demo, la implementación solamente descarga el archivo localmente.
En tu sistema, podrías cambiarlo por cualquier otra acción. Por ejemplo:
- subirlo a Sharepoint
- archivarlo en un file server
- adjuntarlo a un expediente documental
- enviarlo a otro servicio interno
- enviarlo al departamente contable como archivo adjunto de un email
- etc.
"""
signed_link_response = session.get(
f"{BASE_URL}/documents/{document['id']}/signedLink",
timeout=30,
)
signed_link_response.raise_for_status()
signed_link = signed_link_response.json()["signedLink"]
# Esta estructura de carpetas es solo un ejemplo. Puedes cambiarla por
# cualquier destino que tenga sentido para tu sistema.
target_dir = (
PDF_ROOT
/ document["rucId"]
/ str(document["year"])
/ f"{document['month']:02d}"
/ document["documentType"]
)
target_dir.mkdir(parents=True, exist_ok=True)
file_name = f"{document['id']}.pdf"
destination = target_dir / file_name
with requests.get(signed_link, stream=True, timeout=120) as response:
response.raise_for_status()
with destination.open("wb") as output_file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
output_file.write(chunk)
print(f"[PDF] Descargado {destination}")
return destination
def process_xml_documents(xml_documents: list[dict]) -> None:
"""
Agrupa IDs en lotes, recupera el contenido XML desde Zettana y delega
el tratamiento de cada documento a la lógica específica de tu sistema.
"""
if not xml_documents:
return
documents_by_id = {document["id"]: document for document in xml_documents}
for batch in chunked(list(documents_by_id.keys()), XML_BATCH_SIZE):
xml_contents = fetch_xml_contents(batch)
for document_id in batch:
xml_content = xml_contents[document_id]
document = documents_by_id[document_id]
process_xml_document_in_your_system(document, xml_content)
def process_pdf_documents(pdf_documents: list[dict]) -> None:
"""
Recorre cada documento, obtiene un signed link temporal y delega la acción
final a la lógica que tu sistema.
"""
for document in pdf_documents:
process_pdf_document_in_your_system(document)
def run_daily_sync() -> None:
"""
Ejecuta un ciclo completo.
Este flujo asume que ya existe una sincronización recurrente en Zettana
manteniendo los datos al día. El script arranca desde `since`, procesa
todas las páginas disponibles y luego persiste el nuevo punto de
reanudación.
"""
state = load_state()
since = state["since"]
processed_ids_at_latest_retrieved_at = set(
state.get("processed_ids_at_latest_retrieved_at", [])
)
cursor = None
latest_retrieved_at = since
latest_timestamp_ids = set(processed_ids_at_latest_retrieved_at)
while True:
page = fetch_incremental_page(since=since, cursor=cursor)
documents = page["documents"]
fresh_documents = []
for document in documents:
if (
document["retrievedAt"] == since
and document["id"] in processed_ids_at_latest_retrieved_at
):
continue
fresh_documents.append(document)
xml_documents = [
document
for document in fresh_documents
if document["documentFormat"] == "XML"
]
pdf_documents = [
document
for document in fresh_documents
if document["documentFormat"] == "PDF"
]
process_xml_documents(xml_documents)
process_pdf_documents(pdf_documents)
for document in fresh_documents:
document_retrieved_at = document["retrievedAt"]
# El endpoint incremental ya entrega los documentos ordenados por
# retrievedAt e id. Por eso, cuando vemos un retrievedAt mayor,
# sabemos que entramos en un nuevo borde temporal más reciente.
if document_retrieved_at > latest_retrieved_at:
latest_retrieved_at = document_retrieved_at
latest_timestamp_ids = set()
# Solo guardamos IDs del retrievedAt más reciente visto en esta ejecución.
# Eso permite reanudar con el mismo `since` en la próxima
# ejecución y saltarnos únicamente los documentos ya procesados en
# ese borde temporal.
if document_retrieved_at == latest_retrieved_at:
latest_timestamp_ids.add(document["id"])
if not page["hasMore"]:
break
cursor = page["nextCursor"]
state["since"] = latest_retrieved_at
state["processed_ids_at_latest_retrieved_at"] = sorted(latest_timestamp_ids)
save_state(state)
print(
"Sync completado. "
f"Nuevo since: {state['since']}. "
"IDs procesados en el retrievedAt más reciente: "
f"{len(state['processed_ids_at_latest_retrieved_at'])}"
)
if __name__ == "__main__":
run_daily_sync()Qué hace este script exactamente
Ese script implementa un flujo realista:
- Parte de la base de que Zettana ya está ejecutando sincronizaciones recurrentes para mantener los documentos actualizados.
- Lee un
sinceguardado localmente. - Hace polling a
GET /v1/documents/incremental. - Separa la página en documentos XML y PDF.
- Para XML, llama a
POST /v1/documents/xml-contentsen lotes. - Para PDFs, pide un
signedLinkpor documento y descarga el archivo. - Actualiza el estado local para que la siguiente ejecución continúe desde el último timestamp procesado.
Dónde conectarlo con tu ERP o CRM
En el ejemplo, el punto de integración para XMLs está enprocess_xml_document_in_your_system(...).
El punto de integración para PDFs está en process_pdf_document_in_your_system(...).
Ahí es donde viviría tu lógica de negocio.
Cómo correrlo cada día
Una forma simple es ejecutarlo desde cron, un scheduler interno o un worker periódico.
0 6 * * * /usr/bin/python3 /opt/miapp/jobs/zettana_daily_sync.pyEn una arquitectura más robusta, ese mismo flujo puede vivir dentro de:
- un worker de Celery o RQ
- un job periódico en Kubernetes
- un scheduler propio dentro de tu plataforma
- etc