Source code for can.io.sqlite

from can.listener import BufferedReader
from can.message import Message

import sys
import time
import threading
import sqlite3
import logging

log = logging.getLogger('can.io.sql')

if sys.version_info > (3,):
    buffer = memoryview


class SqlReader:
    def __init__(self, filename):
        log.debug("Starting sqlreader with {}".format(filename))
        conn = sqlite3.connect(filename)

        self.c = conn.cursor()


    @staticmethod
    def create_frame_from_db_tuple(frame_data):
        ts, id, is_extended, is_remote, is_error, dlc, data = frame_data
        return Message(
            ts, is_remote, is_extended, is_error, id, dlc, data
        )

    def __iter__(self):
        log.debug("Iterating through messages from sql db")
        for frame_data in self.c.execute("SELECT * FROM messages"):
            yield SqlReader.create_frame_from_db_tuple(frame_data)


[docs]class SqliteWriter(BufferedReader): """Logs received CAN data to a simple SQL database. The sqlite database may already exist, otherwise it will be created when the first message arrives. Messages are internally buffered and written to the SQL file in a background thread. .. note:: When the listener's :meth:`~SqliteWriter.stop` method is called the thread writing to the sql file will continue to receive and internally buffer messages if they continue to arrive before the :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT`. If the :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT` expires before a message is received, the internal buffer is written out to the sql file. However if the bus is still saturated with messages, the Listener will continue receiving until the :attr:`~SqliteWriter.MAX_TIME_BETWEEN_WRITES` timeout is reached. """ insert_msg_template = ''' INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?, ?) ''' GET_MESSAGE_TIMEOUT = 0.25 """Number of seconds to wait for messages from internal queue""" MAX_TIME_BETWEEN_WRITES = 5 """Maximum number of seconds to wait between writes to the database""" def __init__(self, filename): super(SqliteWriter, self).__init__() self.db_fn = filename self.stop_running_event = threading.Event() self.writer_thread = threading.Thread(target=self.db_writer_thread) self.writer_thread.start() def _create_db(self): # Note you can't share sqlite3 connections between threads # hence we setup the db here. log.info("Creating sqlite db") self.conn = sqlite3.connect(self.db_fn) c = self.conn.cursor() # create table structure c.execute(''' CREATE TABLE IF NOT EXISTS messages ( ts REAL, arbitration_id INTEGER, extended INTEGER, remote INTEGER, error INTEGER, dlc INTEGER, data BLOB ) ''') self.conn.commit() self.db_setup = True def db_writer_thread(self): num_frames = 0 last_write = time.time() self._create_db() while not self.stop_running_event.is_set(): messages = [] m = self.get_message(self.GET_MESSAGE_TIMEOUT) while m is not None: log.debug("sqlitewriter buffering message") messages.append(( m.timestamp, m.arbitration_id, m.id_type, m.is_remote_frame, m.is_error_frame, m.dlc, buffer(m.data) )) if time.time() - last_write > self.MAX_TIME_BETWEEN_WRITES: log.debug("Max timeout between writes reached") break m = self.get_message(self.GET_MESSAGE_TIMEOUT) if len(messages) > 0: with self.conn: log.debug("Writing %s frames to db", len(messages)) self.conn.executemany(SqliteWriter.insert_msg_template, messages) num_frames += len(messages) last_write = time.time() self.conn.close() log.info("Stopped sqlite writer after writing %s messages", num_frames) def stop(self): self.stop_running_event.set() log.debug("Stopping sqlite writer") self.writer_thread.join()