Login With Github

ChatterBot Storage Adapters

Storage adapters provide an interface that allows ChatterBot to connect to different storage backends.

The storage adapter that your bot uses can be specified by setting the storage_adapter parameter to the import path of the storage adapter you want to use.

chatbot = ChatBot(
    "My ChatterBot",
    storage_adapter="chatterbot.storage.SQLStorageAdapter"
)

Creating a new storage adapter

You can write your own storage adapters by creating a new class that inherits from StorageAdapterand overrides necessary methods established in the base StorageAdapter class.

import logging


class StorageAdapter(object):
    """
    This is an abstract class that represents the interface
    that all storage adapters should implement.
    """

    def __init__(self, base_query=None, *args, **kwargs):
        """
        Initialize common attributes shared by all storage adapters.
        """
        self.kwargs = kwargs
        self.logger = kwargs.get('logger', logging.getLogger(__name__))
        self.adapter_supports_queries = True
        self.base_query = None

    def get_model(self, model_name):
        """
        Return the model class for a given model name.
        """

        # The string must be lowercase
        model_name = model_name.lower()

        kwarg_model_key = '%s_model' % (model_name, )

        if kwarg_model_key in self.kwargs:
            return self.kwargs.get(kwarg_model_key)

        get_model_method = getattr(self, 'get_%s_model' % (model_name, ))

        return get_model_method()

    def generate_base_query(self, chatterbot, session_id):
        """
        Create a base query for the storage adapter.
        """
        if self.adapter_supports_queries:
            for filter_instance in chatterbot.filters:
                self.base_query = filter_instance.filter_selection(chatterbot, session_id)

    def count(self):
        """
        Return the number of entries in the database.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `count` method is not implemented by this adapter.'
        )

    def find(self, statement_text):
        """
        Returns a object from the database if it exists
        """
        raise self.AdapterMethodNotImplementedError(
            'The `find` method is not implemented by this adapter.'
        )

    def remove(self, statement_text):
        """
        Removes the statement that matches the input text.
        Removes any responses from statements where the response text matches
        the input text.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `remove` method is not implemented by this adapter.'
        )

    def filter(self, **kwargs):
        """
        Returns a list of objects from the database.
        The kwargs parameter can contain any number
        of attributes. Only objects which contain
        all listed attributes and in which all values
        match for all listed attributes will be returned.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `filter` method is not implemented by this adapter.'
        )

    def update(self, statement):
        """
        Modifies an entry in the database.
        Creates an entry if one does not exist.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `update` method is not implemented by this adapter.'
        )

    def get_latest_response(self, conversation_id):
        """
        Returns the latest response in a conversation if it exists.
        Returns None if a matching conversation cannot be found.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `get_latest_response` method is not implemented by this adapter.'
        )

    def create_conversation(self):
        """
        Creates a new conversation.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `create_conversation` method is not implemented by this adapter.'
        )

    def add_to_conversation(self, conversation_id, statement, response):
        """
        Add the statement and response to the conversation.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `add_to_conversation` method is not implemented by this adapter.'
        )

    def get_random(self):
        """
        Returns a random statement from the database.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `get_random` method is not implemented by this adapter.'
        )

    def drop(self):
        """
        Drop the database attached to a given adapter.
        """
        raise self.AdapterMethodNotImplementedError(
            'The `drop` method is not implemented by this adapter.'
        )

    def get_response_statements(self):
        """
        Return only statements that are in response to another statement.
        A statement must exist which lists the closest matching statement in the
        in_response_to field. Otherwise, the logic adapter may find a closest
        matching statement that does not have a known response.

        This method may be overridden by a child class to provide more a
        efficient method to get these results.
        """
        statement_list = self.filter()

        responses = set()
        to_remove = list()
        for statement in statement_list:
            for response in statement.in_response_to:
                responses.add(response.text)
        for statement in statement_list:
            if statement.text not in responses:
                to_remove.append(statement)

        for statement in to_remove:
            statement_list.remove(statement)

        return statement_list

    class EmptyDatabaseException(Exception):

        def __init__(self, value='The database currently contains no entries. At least one entry is expected. You may need to train your chat bot to populate your database.'):
            self.value = value

        def __str__(self):
            return repr(self.value)

    class AdapterMethodNotImplementedError(NotImplementedError):
        """
        An exception to be raised when a storage adapter method has not been implemented.
        Typically this indicates that the method should be implement in a subclass.
        """
        pass

This is an abstract class that represents the interface that all storage adapters should implement.

You will then need to implement the interface established by the StorageAdapter class.

SQL Storage Adapter

SQLStorageAdapter allows ChatterBot to store conversation data semi-structured T-SQL database, virtually, any database that SQL Alchemy supports.

from chatterbot.storage import StorageAdapter


def get_response_table(response):
    from chatterbot.ext.sqlalchemy_app.models import Response
    return Response(text=response.text, occurrence=response.occurrence)


class SQLStorageAdapter(StorageAdapter):
    """
    SQLStorageAdapter allows ChatterBot to store conversation
    data semi-structured T-SQL database, virtually, any database
    that SQL Alchemy supports.

    Notes:
        Tables may change (and will), so, save your training data.
        There is no data migration (yet).
        Performance test not done yet.
        Tests using other databases not finished.

    All parameters are optional, by default a sqlite database is used.

    It will check if tables are present, if they are not, it will attempt
    to create the required tables.

    :keyword database: Used for sqlite database. Ignored if database_uri is specified.
    :type database: str

    :keyword database_uri: eg: sqlite:///database_test.db", use database_uri or database,
        database_uri can be specified to choose database driver (database parameter will be ignored).
    :type database_uri: str

    :keyword read_only: False by default, makes all operations read only, has priority over all DB operations
        so, create, update, delete will NOT be executed
    :type read_only: bool
    """

    def __init__(self, **kwargs):
        super(SQLStorageAdapter, self).__init__(**kwargs)

        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker

        default_uri = "sqlite:///db.sqlite3"

        database_name = self.kwargs.get("database", False)

        # None results in a sqlite in-memory database as the default
        if database_name is None:
            default_uri = "sqlite://"

        self.database_uri = self.kwargs.get(
            "database_uri", default_uri
        )

        # Create a sqlite file if a database name is provided
        if database_name:
            self.database_uri = "sqlite:///" + database_name

        self.engine = create_engine(self.database_uri, convert_unicode=True)

        from re import search

        if search('^sqlite://', self.database_uri):
            from sqlalchemy.engine import Engine
            from sqlalchemy import event

            @event.listens_for(Engine, "connect")
            def set_sqlite_pragma(dbapi_connection, connection_record):
                dbapi_connection.execute('PRAGMA journal_mode=WAL')
                dbapi_connection.execute('PRAGMA synchronous=NORMAL')

        self.read_only = self.kwargs.get(
            "read_only", False
        )

        if not self.engine.dialect.has_table(self.engine, 'Statement'):
            self.create()

        self.Session = sessionmaker(bind=self.engine, expire_on_commit=True)

        # ChatterBot's internal query builder is not yet supported for this adapter
        self.adapter_supports_queries = False

    def get_statement_model(self):
        """
        Return the statement model.
        """
        from chatterbot.ext.sqlalchemy_app.models import Statement
        return Statement


    def get_response_model(self):
        """
        Return the response model.
        """
        from chatterbot.ext.sqlalchemy_app.models import Response
        return Response


    def get_conversation_model(self):
        """
        Return the conversation model.
        """
        from chatterbot.ext.sqlalchemy_app.models import Conversation
        return Conversation


    def get_tag_model(self):
        """
        Return the conversation model.
        """
        from chatterbot.ext.sqlalchemy_app.models import Tag
        return Tag


    def count(self):
        """
        Return the number of entries in the database.
        """
        Statement = self.get_model('statement')

        session = self.Session()
        statement_count = session.query(Statement).count()
        session.close()
        return statement_count


    def find(self, statement_text):
        """
        Returns a statement if it exists otherwise None
        """
        Statement = self.get_model('statement')
        session = self.Session()

        query = session.query(Statement).filter_by(text=statement_text)
        record = query.first()
        if record:
            statement = record.get_statement()
            session.close()
            return statement

        session.close()
        return None


    def remove(self, statement_text):
        """
        Removes the statement that matches the input text.
        Removes any responses from statements where the response text matches
        the input text.
        """
        Statement = self.get_model('statement')
        session = self.Session()

        query = session.query(Statement).filter_by(text=statement_text)
        record = query.first()

        session.delete(record)

        self._session_finish(session)


    def filter(self, **kwargs):
        """
        Returns a list of objects from the database.
        The kwargs parameter can contain any number
        of attributes. Only objects which contain
        all listed attributes and in which all values
        match for all listed attributes will be returned.
        """
        Statement = self.get_model('statement')
        Response = self.get_model('response')

        session = self.Session()

        filter_parameters = kwargs.copy()

        statements = []
        _query = None

        if len(filter_parameters) == 0:
            _response_query = session.query(Statement)
            statements.extend(_response_query.all())
        else:
            for i, fp in enumerate(filter_parameters):
                _filter = filter_parameters[fp]
                if fp in ['in_response_to', 'in_response_to__contains']:
                    _response_query = session.query(Statement)
                    if isinstance(_filter, list):
                        if len(_filter) == 0:
                            _query = _response_query.filter(
                                Statement.in_response_to == None  # NOQA Here must use == instead of is
                            )
                        else:
                            for f in _filter:
                                _query = _response_query.filter(
                                    Statement.in_response_to.contains(get_response_table(f)))
                    else:
                        if fp == 'in_response_to__contains':
                            _query = _response_query.join(Response).filter(Response.text == _filter)
                        else:
                            _query = _response_query.filter(Statement.in_response_to == None)  # NOQA
                else:
                    if _query:
                        _query = _query.filter(Response.statement_text.like('%' + _filter + '%'))
                    else:
                        _response_query = session.query(Response)
                        _query = _response_query.filter(Response.statement_text.like('%' + _filter + '%'))

                if _query is None:
                    return []
                if len(filter_parameters) == i + 1:
                    statements.extend(_query.all())

        results = []

        for statement in statements:
            if isinstance(statement, Response):
                if statement and statement.statement_table:
                    results.append(statement.statement_table.get_statement())
            else:
                if statement:
                    results.append(statement.get_statement())

        session.close()

        return results


    def update(self, statement):
        """
        Modifies an entry in the database.
        Creates an entry if one does not exist.
        """
        Statement = self.get_model('statement')
        Response = self.get_model('response')
        Tag = self.get_model('tag')

        if statement:
            session = self.Session()

            query = session.query(Statement).filter_by(text=statement.text)
            record = query.first()

            # Create a new statement entry if one does not already exist
            if not record:
                record = Statement(text=statement.text)

            record.extra_data = dict(statement.extra_data)

            for _tag in statement.tags:
                tag = session.query(Tag).filter_by(name=_tag).first()

                if not tag:
                    # Create the record
                    tag = Tag(name=_tag)

                record.tags.append(tag)

            # Get or create the response records as needed
            for response in statement.in_response_to:
                _response = session.query(Response).filter_by(
                    text=response.text,
                    statement_text=statement.text
                ).first()

                if _response:
                    _response.occurrence += 1
                else:
                    # Create the record
                    _response = Response(
                        text=response.text,
                        statement_text=statement.text,
                        occurrence=response.occurrence
                    )

                record.in_response_to.append(_response)

            session.add(record)

            self._session_finish(session)


    def create_conversation(self):
        """
        Create a new conversation.
        """
        Conversation = self.get_model('conversation')

        session = self.Session()
        conversation = Conversation()

        session.add(conversation)
        session.flush()

        session.refresh(conversation)
        conversation_id = conversation.id

        session.commit()
        session.close()

        return conversation_id


    def add_to_conversation(self, conversation_id, statement, response):
        """
        Add the statement and response to the conversation.
        """
        Statement = self.get_model('statement')
        Conversation = self.get_model('conversation')

        session = self.Session()
        conversation = session.query(Conversation).get(conversation_id)

        statement_query = session.query(Statement).filter_by(
            text=statement.text
        ).first()
        response_query = session.query(Statement).filter_by(
            text=response.text
        ).first()

        # Make sure the statements exist
        if not statement_query:
            self.update(statement)
            statement_query = session.query(Statement).filter_by(
                text=statement.text
            ).first()

        if not response_query:
            self.update(response)
            response_query = session.query(Statement).filter_by(
                text=response.text
            ).first()

        conversation.statements.append(statement_query)
        conversation.statements.append(response_query)

        session.add(conversation)
        self._session_finish(session)


    def get_latest_response(self, conversation_id):
        """
        Returns the latest response in a conversation if it exists.
        Returns None if a matching conversation cannot be found.
        """
        Statement = self.get_model('statement')

        session = self.Session()
        statement = None

        statement_query = session.query(Statement).filter(
            Statement.conversations.any(id=conversation_id)
        ).order_by(Statement.id)

        if statement_query.count() >= 2:
            statement = statement_query[-2].get_statement()

        # Handle the case of the first statement in the list
        elif statement_query.count() == 1:
            statement = statement_query[0].get_statement()

        session.close()

        return statement


    def get_random(self):
        """
        Returns a random statement from the database
        """
        import random

        Statement = self.get_model('statement')

        session = self.Session()
        count = self.count()
        if count < 1:
            raise self.EmptyDatabaseException()

        rand = random.randrange(0, count)
        stmt = session.query(Statement)[rand]

        statement = stmt.get_statement()

        session.close()
        return statement


    def drop(self):
        """
        Drop the database attached to a given adapter.
        """
        from chatterbot.ext.sqlalchemy_app.models import Base
        Base.metadata.drop_all(self.engine)


    def create(self):
        """
        Populate the database with the tables.
        """
        from chatterbot.ext.sqlalchemy_app.models import Base
        Base.metadata.create_all(self.engine)


    def _session_finish(self, session, statement_text=None):
        from sqlalchemy.exc import InvalidRequestError
        try:
            if not self.read_only:
                session.commit()
            else:
                session.rollback()
        except InvalidRequestError:
            # Log the statement text and the exception
            self.logger.exception(statement_text)
        finally:
            session.close()

Notes:Tables may change (and will), so, save your training data. There is no data migration (yet). Performance test not done yet. Tests using other databases not finished.

All parameters are optional, by default a sqlite database is used.

It will check if tables are present, if they are not, it will attempt to create the required tables.

Parameters:

  • database (str) – Used for sqlite database. Ignored if database_uri is specified.
  • database_uri (str) – eg: sqlite:///database_test.db", use database_uri or database, database_uri can be specified to choose database driver (database parameter will be ignored).
  • read_only (bool) – False by default, makes all operations read only, has priority over all DB operations so, create, update, delete will NOT be executed
Method Name Description
Method Name Description
add_to_conversation(conversation_idstatementresponse) Add the statement and response to the conversation.
count() Return the number of entries in the database.
create() Populate the database with the tables.
create_conversation() Create a new conversation.
drop() Drop the database attached to a given adapter.
filter(**kwargs) Returns a list of objects from the database. The kwargs parameter can contain any number of attributes. Only objects which contain all listed attributes and in which all values match for all listed attributes will be returned.
find(statement_text) Returns a statement if it exists otherwise None
get_conversation_model() Return the conversation model.
get_latest_response(conversation_id) Returns the latest response in a conversation if it exists. Returns None if a matching conversation cannot be found.
get_random() Returns a random statement from the database
get_response_model() Return the response model.
get_statement_model() Return the statement model.
get_tag_model() Return the conversation model.
remove(statement_text) Removes the statement that matches the input text. Removes any responses from statements where the response text matches the input text.
update(statement) Modifies an entry in the database. Creates an entry if one does not exist.

MongoDB Storage Adapter

The MongoDatabaseAdapter is an interface that allows ChatterBot to store statements in a MongoDB database.

from chatterbot.storage import StorageAdapter


class Query(object):

    def __init__(self, query={}):
        self.query = query

    def value(self):
        return self.query.copy()

    def raw(self, data):
        query = self.query.copy()

        query.update(data)

        return Query(query)

    def statement_text_equals(self, statement_text):
        query = self.query.copy()

        query['text'] = statement_text

        return Query(query)

    def statement_text_not_in(self, statements):
        query = self.query.copy()

        if 'text' not in query:
            query['text'] = {}

        if '$nin' not in query['text']:
            query['text']['$nin'] = []

        query['text']['$nin'].extend(statements)

        return Query(query)

    def statement_response_list_contains(self, statement_text):
        query = self.query.copy()

        if 'in_response_to' not in query:
            query['in_response_to'] = {}

        if '$elemMatch' not in query['in_response_to']:
            query['in_response_to']['$elemMatch'] = {}

        query['in_response_to']['$elemMatch']['text'] = statement_text

        return Query(query)

    def statement_response_list_equals(self, response_list):
        query = self.query.copy()

        query['in_response_to'] = response_list

        return Query(query)


class MongoDatabaseAdapter(StorageAdapter):
    """
    The MongoDatabaseAdapter is an interface that allows
    ChatterBot to store statements in a MongoDB database.

    :keyword database: The name of the database you wish to connect to.
    :type database: str

    .. code-block:: python

       database='chatterbot-database'

    :keyword database_uri: The URI of a remote instance of MongoDB.
    :type database_uri: str

    .. code-block:: python

       database_uri='mongodb://example.com:8100/'
    """

    def __init__(self, **kwargs):
        super(MongoDatabaseAdapter, self).__init__(**kwargs)
        from pymongo import MongoClient
        from pymongo.errors import OperationFailure

        self.database_name = self.kwargs.get(
            'database', 'chatterbot-database'
        )
        self.database_uri = self.kwargs.get(
            'database_uri', 'mongodb://localhost:27017/'
        )

        # Use the default host and port
        self.client = MongoClient(self.database_uri)

        # Increase the sort buffer to 42M if possible
        try:
            self.client.admin.command({'setParameter': 1, 'internalQueryExecMaxBlockingSortBytes': 44040192})
        except OperationFailure:
            pass

        # Specify the name of the database
        self.database = self.client[self.database_name]

        # The mongo collection of statement documents
        self.statements = self.database['statements']

        # The mongo collection of conversation documents
        self.conversations = self.database['conversations']

        # Set a requirement for the text attribute to be unique
        self.statements.create_index('text', unique=True)

        self.base_query = Query()

    def get_statement_model(self):
        """
        Return the class for the statement model.
        """
        from chatterbot.conversation import Statement

        # Create a storage-aware statement
        statement = Statement
        statement.storage = self

        return statement


    def get_response_model(self):
        """
        Return the class for the response model.
        """
        from chatterbot.conversation import Response

        # Create a storage-aware response
        response = Response
        response.storage = self

        return response


    def count(self):
        return self.statements.count()


    def find(self, statement_text):
        Statement = self.get_model('statement')
        query = self.base_query.statement_text_equals(statement_text)

        values = self.statements.find_one(query.value())

        if not values:
            return None

        del values['text']

        # Build the objects for the response list
        values['in_response_to'] = self.deserialize_responses(
            values.get('in_response_to', [])
        )

        return Statement(statement_text, **values)


    def deserialize_responses(self, response_list):
        """
        Takes the list of response items and returns
        the list converted to Response objects.
        """
        Statement = self.get_model('statement')
        Response = self.get_model('response')
        proxy_statement = Statement('')

        for response in response_list:
            text = response['text']
            del response['text']

            proxy_statement.add_response(
                Response(text, **response)
            )

        return proxy_statement.in_response_to


    def mongo_to_object(self, statement_data):
        """
        Return Statement object when given data
        returned from Mongo DB.
        """
        Statement = self.get_model('statement')
        statement_text = statement_data['text']
        del statement_data['text']

        statement_data['in_response_to'] = self.deserialize_responses(
            statement_data.get('in_response_to', [])
        )

        return Statement(statement_text, **statement_data)


    def filter(self, **kwargs):
        """
        Returns a list of statements in the database
        that match the parameters specified.
        """
        import pymongo

        query = self.base_query

        order_by = kwargs.pop('order_by', None)

        # Convert Response objects to data
        if 'in_response_to' in kwargs:
            serialized_responses = []
            for response in kwargs['in_response_to']:
                serialized_responses.append({'text': response})

            query = query.statement_response_list_equals(serialized_responses)
            del kwargs['in_response_to']

        if 'in_response_to__contains' in kwargs:
            query = query.statement_response_list_contains(
                kwargs['in_response_to__contains']
            )
            del kwargs['in_response_to__contains']

        query = query.raw(kwargs)

        matches = self.statements.find(query.value())

        if order_by:

            direction = pymongo.ASCENDING

            # Sort so that newer datetimes appear first
            if order_by == 'created_at':
                direction = pymongo.DESCENDING

            matches = matches.sort(order_by, direction)

        results = []

        for match in list(matches):
            results.append(self.mongo_to_object(match))

        return results


    def update(self, statement):
        from pymongo import UpdateOne
        from pymongo.errors import BulkWriteError

        data = statement.serialize()

        operations = []

        update_operation = UpdateOne(
            {'text': statement.text},
            {'$set': data},
            upsert=True
        )
        operations.append(update_operation)

        # Make sure that an entry for each response is saved
        for response_dict in data.get('in_response_to', []):
            response_text = response_dict.get('text')

            # $setOnInsert does nothing if the document is not created
            update_operation = UpdateOne(
                {'text': response_text},
                {'$set': response_dict},
                upsert=True
            )
            operations.append(update_operation)

        try:
            self.statements.bulk_write(operations, ordered=False)
        except BulkWriteError as bwe:
            # Log the details of a bulk write error
            self.logger.error(str(bwe.details))

        return statement


    def create_conversation(self):
        """
        Create a new conversation.
        """
        conversation_id = self.conversations.insert_one({}).inserted_id
        return conversation_id


    def get_latest_response(self, conversation_id):
        """
        Returns the latest response in a conversation if it exists.
        Returns None if a matching conversation cannot be found.
        """
        from pymongo import DESCENDING

        statements = list(self.statements.find({
            'conversations.id': conversation_id
        }).sort('conversations.created_at', DESCENDING))

        if not statements:
            return None

        return self.mongo_to_object(statements[-2])


    def add_to_conversation(self, conversation_id, statement, response):
        """
        Add the statement and response to the conversation.
        """
        from datetime import datetime, timedelta
        self.statements.update_one(
            {
                'text': statement.text
            },
            {
                '$push': {
                    'conversations': {
                        'id': conversation_id,
                        'created_at': datetime.utcnow()
                    }
                }
            }
        )
        self.statements.update_one(
            {
                'text': response.text
            },
            {
                '$push': {
                    'conversations': {
                        'id': conversation_id,
                        # Force the response to be at least one millisecond after the input statement
                        'created_at': datetime.utcnow() + timedelta(milliseconds=1)
                    }
                }
            }
        )


    def get_random(self):
        """
        Returns a random statement from the database
        """
        from random import randint

        count = self.count()

        if count < 1:
            raise self.EmptyDatabaseException()

        random_integer = randint(0, count - 1)

        statements = self.statements.find().limit(1).skip(random_integer)

        return self.mongo_to_object(list(statements)[0])


    def remove(self, statement_text):
        """
        Removes the statement that matches the input text.
        Removes any responses from statements if the response text matches the
        input text.
        """
        for statement in self.filter(in_response_to__contains=statement_text):
            statement.remove_response(statement_text)
            self.update(statement)

        self.statements.delete_one({'text': statement_text})


    def get_response_statements(self):
        """
        Return only statements that are in response to another statement.
        A statement must exist which lists the closest matching statement in the
        in_response_to field. Otherwise, the logic adapter may find a closest
        matching statement that does not have a known response.
        """
        response_query = self.statements.aggregate([{'$group': {'_id': '$in_response_to.text'}}])

        responses = []
        for r in response_query:
            try:
                responses.extend(r['_id'])
            except TypeError:
                pass

        _statement_query = {
            'text': {
                '$in': responses
            }
        }

        _statement_query.update(self.base_query.value())
        statement_query = self.statements.find(_statement_query)
        statement_objects = []
        for statement in list(statement_query):
            statement_objects.append(self.mongo_to_object(statement))
        return statement_objects


    def drop(self):
        """
        Remove the database.
        """
        self.client.drop_database(self.database_name)

Parameters:

 

database (str) – The name of the database you wish to connect to.

database='chatterbot-database'

Parameters:

database_uri (str) – The URI of a remote instance of MongoDB.

database_uri='mongodb://example.com:8100/'
Method Name Description
add_to_conversation(conversation_idstatementresponse) Add the statement and response to the conversation.
count() Return the number of entries in the database.
create_conversation() Create a new conversation.
deserialize_responses(response_list) Takes the list of response items and returns the list converted to Response objects.
drop() Remove the database.
filter(**kwargs) Returns a list of statements in the database that match the parameters specified.
find(statement_text) Returns a object from the database if it exists
get_latest_response(conversation_id) Returns the latest response in a conversation if it exists. Returns None if a matching conversation cannot be found.
get_random() Returns a random statement from the database
get_response_model() Return the class for the response model.
get_response_statements() Return only statements that are in response to another statement. A statement must exist which lists the closest matching statement in the in_response_to field. Otherwise, the logic adapter may find a closest matching statement that does not have a known response.
get_statement_model() Return the class for the statement model.
mongo_to_object(statement_data) Return Statement object when given data returned from Mongo DB.
remove(statement_text) Removes the statement that matches the input text. Removes any responses from statements if the response text matches the input text.
update(statement) Modifies an entry in the database. Creates an entry if one does not exist.

0 Comment

temp