Basic CRUD
Open a store, write and read key-value pairs, check existence, and delete entries.
#define SNKV_IMPLEMENTATION #include "snkv.h" KVStore *pKV = NULL; kvstore_open("hello.db", &pKV, KVSTORE_JOURNAL_WAL); kvstore_put(pKV, "greeting", 8, "Hello, World!", 13); void *pValue = NULL; int nValue = 0; kvstore_get(pKV, "greeting", 8, &pValue, &nValue); printf("Retrieved: %.*s\n", nValue, (char*)pValue); snkv_free(pValue); kvstore_close(pKV);
/* Create */
kvstore_put(pKV, "user:1", 6, "Alice Smith", 11);

/* Update — putting the same key again overwrites the value */
kvstore_put(pKV, "user:1", 6, "Alice Johnson", 13);

/* Delete */
kvstore_delete(pKV, "user:1", 6);

/* Existence check */
int exists = 0;
kvstore_exists(pKV, "item:laptop", 11, &exists);
printf("%s\n", exists ? "EXISTS" : "NOT FOUND");
Transactions
Batch multiple writes into a single atomic commit, or roll back on error.
/* Begin a write transaction (second argument: 1 = write). */
int rc = kvstore_begin(pKV, 1);
kvstore_put(pKV, "account:alice", 13, "800", 3);
kvstore_put(pKV, "account:bob", 11, "700", 3);
if (rc == KVSTORE_OK) {
    kvstore_commit(pKV);
    printf("Transfer committed\n");
} else {
    kvstore_rollback(pKV);  /* both writes discarded */
}
Column Families
Logical namespaces within a single file. Each CF is a separate B-tree — keys never bleed across CFs.
/* Two independent namespaces inside the same store. */
KVColumnFamily *pUsersCF = NULL, *pProductsCF = NULL;
kvstore_cf_create(pKV, "users", &pUsersCF);
kvstore_cf_create(pKV, "products", &pProductsCF);

kvstore_cf_put(pUsersCF, "user:1", 6, "alice@example.com", 17);
kvstore_cf_put(pProductsCF, "prod:100", 8, "Laptop:$999", 11);

/* List all CFs — each name and the array itself must be freed. */
char **azNames;
int nCount;
kvstore_cf_list(pKV, &azNames, &nCount);
for (int i = 0; i < nCount; i++) {
    printf(" CF: %s\n", azNames[i]);
    snkv_free(azNames[i]);
}
snkv_free(azNames);

kvstore_cf_close(pUsersCF);
kvstore_cf_close(pProductsCF);
Iterators
Walk all keys in ascending order or scan only keys matching a prefix using the B-tree directly.
/* Walk every key/value pair in ascending key order. */
KVIterator *pIter = NULL;
kvstore_iterator_create(pKV, &pIter);
for (kvstore_iterator_first(pIter); !kvstore_iterator_eof(pIter);
     kvstore_iterator_next(pIter)) {
    void *pKey, *pValue;
    int nKey, nValue;
    kvstore_iterator_key(pIter, &pKey, &nKey);
    kvstore_iterator_value(pIter, &pValue, &nValue);
    printf("%.*s -> %.*s\n", nKey, (char*)pKey, nValue, (char*)pValue);
}
kvstore_iterator_close(pIter);
/* Scan only keys that start with "user:". */
KVIterator *pIter = NULL;
kvstore_prefix_iterator_create(pKV, "user:", 5, &pIter);
for (kvstore_iterator_first(pIter); !kvstore_iterator_eof(pIter);
     kvstore_iterator_next(pIter)) {
    void *pKey;
    int nKey;
    kvstore_iterator_key(pIter, &pKey, &nKey);
    printf(" %.*s\n", nKey, (char*)pKey);
}
kvstore_iterator_close(pIter);
Reverse Iterators
Walk keys in descending order — pure B-tree traversal, no sort, no full scan.
/* Reverse full scan — largest key first */ KVIterator *pIter = NULL; kvstore_reverse_iterator_create(pKV, &pIter); for (kvstore_iterator_last(pIter); !kvstore_iterator_eof(pIter); kvstore_iterator_prev(pIter)) { void *pKey; int nKey; kvstore_iterator_key(pIter, &pKey, &nKey); printf("%.*s\n", nKey, (char*)pKey); } kvstore_iterator_close(pIter); /* Reverse prefix scan — largest matching key first */ kvstore_reverse_prefix_iterator_create(pKV, "score:", 6, &pIter); for (kvstore_iterator_last(pIter); !kvstore_iterator_eof(pIter); kvstore_iterator_prev(pIter)) { /* ... */ } kvstore_iterator_close(pIter);
TTL — Time-To-Live
Per-key expiry stored in a dedicated index. Expired keys are lazily deleted on access; bulk cleanup via kvstore_purge_expired.
/* Put with TTL — expire_ms is an absolute Unix timestamp in ms */ int64_t expire_ms = kvstore_now_ms() + 5000; /* 5 seconds */ kvstore_put_ttl(pKV, "session", 7, "tok123", 6, expire_ms); /* Inspect remaining lifetime */ int64_t remaining = 0; kvstore_ttl_remaining(pKV, "session", 7, &remaining); printf("TTL: %lld ms remaining\n", (long long)remaining); /* Lazy expiry — expired key returns KVSTORE_NOTFOUND */ void *pVal = NULL; int nVal = 0; int64_t rem = 0; int rc = kvstore_get_ttl(pKV, "session", 7, &pVal, &nVal, &rem); /* Bulk purge all expired keys */ int n = 0; kvstore_purge_expired(pKV, &n); printf("Removed %d expired keys\n", n);
/* TTL inside a column family — a 1-second rate-limit counter. */
KVColumnFamily *pCF = NULL;
kvstore_cf_create(pKV, "rate_limits", &pCF);

int64_t expire_ms = kvstore_now_ms() + 1000;  /* 1 second */
kvstore_cf_put_ttl(pCF, "user:42", 7, "5", 1, expire_ms);

int64_t rem = 0;
kvstore_cf_ttl_remaining(pCF, "user:42", 7, &rem);
printf("Remaining: %lld ms\n", (long long)rem);

int n = 0;
kvstore_cf_purge_expired(pCF, &n);
kvstore_cf_close(pCF);
Seek · put_if_absent · clear · count · stats
O(log N) positional seek, atomic conditional insert, bulk truncation, page-level entry count, and extended diagnostics.
KVIterator *pIter = NULL;

/* Forward seek — positions at first key >= target */
kvstore_iterator_create(pKV, &pIter);
kvstore_iterator_seek(pIter, "cherry", 6);
while (!kvstore_iterator_eof(pIter)) {
    void *pKey;
    int nKey;
    kvstore_iterator_key(pIter, &pKey, &nKey);
    printf("%.*s\n", nKey, (char*)pKey);
    kvstore_iterator_next(pIter);
}
kvstore_iterator_close(pIter);

/* Reverse seek — positions at last key <= target */
kvstore_reverse_iterator_create(pKV, &pIter);
kvstore_iterator_last(pIter);
kvstore_iterator_seek(pIter, "bbc", 3);  /* lands on "bbb" */
kvstore_iterator_close(pIter);
>int inserted = 0; /* Key absent → inserted=1 */ kvstore_put_if_absent(pKV, "lock", 4, "owner:alice", 11, 0, &inserted); printf("inserted=%d\n", inserted); /* 1 */ /* Key present → inserted=0, original value unchanged */ kvstore_put_if_absent(pKV, "lock", 4, "owner:bob", 9, 0, &inserted); printf("inserted=%d\n", inserted); /* 0 */ /* With TTL — auto-releasing distributed lock */ int64_t exp = kvstore_now_ms() + 30000; kvstore_put_if_absent(pKV, "job:1", 5, "worker-A", 8, exp, NULL);
>/* count — O(pages) via sqlite3BtreeCount */ int64_t n = 0; kvstore_count(pKV, &n); printf("entries: %lld\n", (long long)n); /* clear — O(pages) bulk truncation; TTL index cleared atomically */ kvstore_clear(pKV); kvstore_count(pKV, &n); printf("after clear: %lld\n", (long long)n); /* 0 */ /* Extended stats — 12 counters */ KVStoreStats st = {0}; kvstore_stats(pKV, &st); printf("puts=%llu gets=%llu bytes_written=%llu db_pages=%llu\n", st.nPuts, st.nGets, st.nBytesWritten, st.nDbPages); kvstore_stats_reset(pKV); /* zero cumulative counters */
Session Store
A practical in-process session store using a dedicated column family with TTL.
>KVColumnFamily *pSessions = NULL; kvstore_cf_create(pKV, "sessions", &pSessions); /* Create session — expires in 30 minutes */ int64_t exp = kvstore_now_ms() + 1800000; kvstore_cf_put_ttl(pSessions, "sess:abc123", 11, "user_id=42", 10, exp); /* Validate — KVSTORE_NOTFOUND means expired or never existed */ void *pVal = NULL; int nVal = 0; int64_t rem = 0; int rc = kvstore_cf_get_ttl(pSessions, "sess:abc123", 11, &pVal, &nVal, &rem); if (rc == KVSTORE_OK) { printf("Valid session: %.*s (%lld ms left)\n", nVal, (char*)pVal, (long long)rem); snkv_free(pVal); } else { printf("Session expired or not found\n"); } kvstore_cf_close(pSessions);
Checkpoint
Flush WAL frames back to the main database file to control file size.
>/* Manual checkpoint — TRUNCATE mode resets WAL to zero bytes */ int nLog = 0, nCkpt = 0; kvstore_checkpoint(pKV, KVSTORE_CHECKPOINT_TRUNCATE, &nLog, &nCkpt); printf("WAL frames: %d total, %d checkpointed\n", nLog, nCkpt); /* Auto-checkpoint: open with wal_size_limit (available via kvstore_open_ex / config struct) */
Basic CRUD
Open a store with a context manager, write and read with a dict-like API, handle errors, and use in-memory databases.
from snkv import KVStore, NotFoundError

# Collapsed original placed code after "#" comments, so everything past the
# first comment was dead text — reconstructed here as runnable code.
with KVStore("mydb.db") as db:
    # Write
    db["user:1"] = "Alice"
    db.put(b"user:2", b"Bob")

    # Read
    print(db["user:1"].decode())      # Alice
    print(db.get("user:2").decode())  # Bob
    print(db.get("missing", b"n/a"))  # b'n/a'

    # Update
    db["user:1"] = "Alice Smith"

    # Delete
    del db["user:1"]

    # Existence
    print("user:2" in db)       # True
    print(db.exists("user:1"))  # False
# Arbitrary binary keys and values
db.put(bytes([0x00, 0x01, 0xFF]), bytes(range(256)))

# In-memory store — no file created, data gone after close
with KVStore(None) as db:
    db["temp"] = "ephemeral"
    print(db["temp"])  # b'ephemeral'
Transactions
Batch writes into a single atomic commit. Without an explicit transaction each put auto-commits.
with KVStore("bank.db") as db:
    # Atomic batch — all writes commit together
    db.begin(write=True)
    try:
        db["account:alice"] = "800"
        db["account:bob"] = "700"
        db.commit()
    except Exception:
        db.rollback()  # both writes discarded
        raise

    # Large batch — 1000 writes in one transaction
    db.begin(write=True)
    for i in range(1000):
        db[f"item:{i:05d}"] = f"value_{i}"
    db.commit()
Column Families
Logical namespaces inside one file. Each CF is a separate B-tree — the same key in two CFs holds independent values.
with KVStore("store.db") as db:
    # Create namespaces
    with db.create_column_family("users") as users:
        users["alice"] = "alice@example.com"
        users["bob"] = "bob@example.com"
    with db.create_column_family("products") as products:
        products["prod:100"] = "Laptop:$999"

    # Same key, different values per CF
    ns_a = db.create_column_family("ns_a")
    ns_b = db.create_column_family("ns_b")
    ns_a["key"] = "from_a"
    ns_b["key"] = "from_b"
    ns_a.close()
    ns_b.close()

    # List and drop
    print(db.list_column_families())  # ['users', 'products', 'ns_a', 'ns_b']
    db.drop_column_family("ns_a")
    db.drop_column_family("ns_b")
Iterators
Iterate all keys in B-tree order, scan a prefix, or drive the cursor manually.
with KVStore("store.db") as db:
    # Full scan — for loop uses iterator() under the hood
    for key, value in db:
        print(key.decode(), "->", value.decode())

    # Prefix scan — only keys starting with "user:"
    for key, value in db.prefix_iterator(b"user:"):
        print(key, value)

    # Manual control
    it = db.iterator()
    it.first()
    while not it.eof:
        print(it.key, it.value)
        it.next()
    it.close()
Reverse Iterators
Walk keys in descending order — pure B-tree traversal, no sort step.
with KVStore("store.db") as db:
    # Reverse full scan — largest key first
    for key, value in db.reverse_iterator():
        print(key, value)

    # Reverse prefix scan — only "score:" keys, descending
    for key, value in db.reverse_prefix_iterator(b"score:"):
        print(key, value)

    # Unified API — reverse=True on iterator()
    for key, value in db.iterator(reverse=True, prefix=b"user:"):
        print(key, value)

    # Seek in reverse — positions at last key <= target
    with db.iterator(reverse=True) as it:
        it.last()
        it.seek(b"user:m")  # jump to last key <= "user:m"
        while not it.eof:
            print(it.key)
            it.prev()
TTL — Time-To-Live
Per-key expiry via ttl=seconds on put(). Expired keys are lazily deleted on read; bulk cleanup via purge_expired().
from snkv import KVStore, NotFoundError, NO_TTL

with KVStore("store.db") as db:
    # Put with TTL — expires in 60 seconds
    db.put(b"session", b"tok123", ttl=60)
    db[b"token", 30] = b"bearer-xyz"  # dict-style shorthand

    # Inspect remaining TTL
    remaining = db.ttl(b"session")  # e.g. 59.98 (float seconds)
    print(f"{remaining:.2f}s remaining")

    # Lazy expiry — get() returns None if expired
    val = db.get(b"session")  # None if expired, bytes if alive

    # Bulk purge — deletes all expired keys, returns count
    n = db.purge_expired()
    print(f"Removed {n} expired keys")
class RateLimiter:
    """Fixed-window rate limiter backed by a TTL column family."""

    def __init__(self, db, limit, window_s):
        self._cf = db.create_column_family("rl")
        self._limit = limit
        self._window = window_s

    def is_allowed(self, user_id):
        """Return True if this request fits within the current window."""
        key = user_id.encode()
        raw = self._cf.get(key)
        if raw is None:
            # First hit in this window — counter expires with the window
            self._cf.put(key, b"1", ttl=self._window)
            return True
        count = int(raw)
        if count >= self._limit:
            return False
        # Re-put with the window's remaining TTL so it isn't extended
        ttl = self._cf.ttl(key) or self._window
        self._cf.put(key, str(count + 1).encode(), ttl=ttl)
        return True


rl = RateLimiter(db, limit=5, window_s=60)
print(rl.is_allowed("alice"))  # True (1/5)
Seek · put_if_absent · clear · count · stats
O(log N) positional seek, atomic conditional insert, bulk truncation, page-level entry count, and extended diagnostics with reset.
with KVStore("store.db") as db:
    # Forward seek — first key >= target
    with db.iterator() as it:
        it.seek(b"cherry")
        while not it.eof:
            print(it.key)
            it.next()

    # Reverse seek — last key <= target
    with db.iterator(reverse=True) as it:
        it.last()
        it.seek(b"bbc")  # lands on b"bbb"
        print(it.key)

    # Prefix iterator + seek — boundary enforced
    with db.iterator(prefix=b"user:") as it:
        it.seek(b"user:m")  # skip to "user:m..."
        while not it.eof:
            print(it.key)
            it.next()

    # Chaining — seek() returns self
    key = db.iterator().seek(b"target").key
with KVStore("store.db") as db:
    # Absent → inserted=True
    inserted = db.put_if_absent(b"lock", b"owner:alice")
    print(inserted)  # True

    # Present → inserted=False, value unchanged
    inserted = db.put_if_absent(b"lock", b"owner:bob")
    print(inserted)  # False
    print(db.get(b"lock"))  # b'owner:alice'

    # With TTL — auto-releasing lock
    db.put_if_absent(b"job:1", b"worker-A", ttl=30)

    # CF variant — message deduplication
    with db.create_column_family("dedup") as cf:
        if cf.put_if_absent(b"msg:001", b"hello"):
            process()  # only first caller
with KVStore("store.db") as db:
    # count — O(pages) via sqlite3BtreeCount
    print(db.count())  # e.g. 10000

    # clear — O(pages) bulk truncation; TTL index cleared atomically
    db.clear()
    print(db.count())  # 0

    # Extended stats — 12 counters
    db.put(b"k", b"v")
    db.get(b"k")
    st = db.stats()
    print(st["puts"], st["gets"], st["bytes_written"], st["db_pages"])

    # Reset cumulative counters; db_pages stays live
    db.stats_reset()
    st = db.stats()
    print(st["puts"])      # 0
    print(st["db_pages"])  # still > 0
Session Store
A practical session store using a column family with TTL and put_if_absent for idempotent job scheduling.
with KVStore("store.db") as db:
    with db.create_column_family("sessions") as cf:
        # Create — expires in 30 minutes
        cf.put(b"sess:abc", b"user_id=42", ttl=1800)

        # Validate
        data = cf.get(b"sess:abc")  # None if expired

        # Idempotent job scheduler via put_if_absent + TTL
        claimed = cf.put_if_absent(b"job:001", b"worker-A", ttl=30)
        if claimed:
            run_job()  # only one worker proceeds
Multiprocess
Multiple processes sharing one WAL-mode database. Each process opens its own KVStore; WAL serialises writers and busy_timeout retries on contention.
import multiprocessing
from snkv import KVStore, JOURNAL_WAL


def worker(db_path, worker_id):
    # Each process opens its own connection
    with KVStore(db_path, journal_mode=JOURNAL_WAL, busy_timeout=5000) as db:
        for i in range(100):
            db.put(f"w{worker_id}:key:{i}".encode(), b"value")


procs = [multiprocessing.Process(target=worker, args=("shared.db", i))
         for i in range(5)]
for p in procs:
    p.start()
for p in procs:
    p.join()
# All 500 keys durable on disk