Skip to content
Snippets Groups Projects

SQLite context manager

Merged Vojtech Myslivec requested to merge sqlite into master
Compare and
3 files
+ 27
22
Preferences
Compare changes
Files
3
+ 19
17
@@ -9,26 +9,28 @@ from .crypto import cert_from_bytes, get_cert_bytes, get_cert_common_name
from .exceptions import CASetupError
def init_db(conf):
@contextlib.contextmanager
def db_connection(conf):
conn = sqlite3.connect(conf.get("db", "path"))
try:
# test table and columns existence
c = conn.cursor()
c.execute("""
SELECT sn, state, common_name, not_before, not_after, cert
FROM certs
LIMIT 1
""")
c.close()
with contextlib.closing(conn.cursor()) as c:
c.execute("""
SELECT sn, state, common_name, not_before, not_after, cert
FROM certs
LIMIT 1
""")
yield conn
except sqlite3.OperationalError:
raise CASetupError("Incorrect DB scheme")
return conn
finally:
conn.close()
def get_certs(db, identity, date):
with contextlib.closing(db.cursor()) as c:
def get_certs(conn, identity, date):
with contextlib.closing(conn.cursor()) as c:
c.execute("""
SELECT cert
FROM certs
@@ -44,24 +46,24 @@ def get_certs(db, identity, date):
yield cert_from_bytes(row[0])
def store_cert(db, cert):
def store_cert(conn, cert):
serial_number = cert.serial_number
identity = get_cert_common_name(cert)
not_before = cert.not_valid_before
not_after = cert.not_valid_after
cert_bytes = get_cert_bytes(cert)
with contextlib.closing(db.cursor()) as c:
with contextlib.closing(conn.cursor()) as c:
c.execute("""
INSERT INTO certs(sn, state, common_name, not_before, not_after, cert)
VALUES (?,?,?,?,?,?)
""",
(str(serial_number), "valid", identity, not_before, not_after, cert_bytes)
)
db.commit()
conn.commit()
def row_with_serial_number(db, serial_number):
with contextlib.closing(db.cursor()) as c:
def row_with_serial_number(conn, serial_number):
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT * FROM certs WHERE sn=?', (str(serial_number),))
return c.fetchone()