rabbit_hole

RabbitHole

Manages content ingestion. I'm late... I'm late!

Source code in cat/rabbit_hole.py
@singleton
class RabbitHole:
    """Manages content ingestion. I'm late... I'm late!"""

    def __init__(self, cat) -> None:
        self.__cat = cat

    # each time we access the file handlers, plugins can intervene
    def __reload_file_handlers(self):
        # default file handlers
        self.__file_handlers = {
            "application/pdf": PDFMinerParser(),
            "text/plain": TextParser(),
            "text/markdown": TextParser(),
            "text/html": BS4HTMLParser(),
        }

        # no access to stray
        self.__file_handlers = self.__cat.mad_hatter.execute_hook(
            "rabbithole_instantiates_parsers", self.__file_handlers, cat=self.__cat
        )

    def __reload_text_splitter(self):
        # default text splitter
        self.__text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=256,
            chunk_overlap=64,
            separators=["\\n\\n", "\n\n", ".\\n", ".\n", "\\n", "\n", " ", ""],
            encoding_name="cl100k_base",
            keep_separator=True,
            strip_whitespace=True,
        )

        # no access to stray
        self.__text_splitter = self.__cat.mad_hatter.execute_hook(
            "rabbithole_instantiates_splitter", self.__text_splitter, cat=self.__cat
        )

    def ingest_memory(
            self,
            stray,
            file: UploadFile
        ):
        """Upload memories to the declarative memory from a JSON file.

        Parameters
        ----------
        file : UploadFile
            File object sent via the `rabbithole/memory` endpoint.

        Notes
        -----
        This method allows uploading a JSON file containing vector and text memories directly to the declarative memory.
        When doing this, please, make sure the embedder used to export the memories is the same as the one used
        when uploading.
        The method also performs a check on the dimensionality of the embeddings (i.e. length of each vector).

        """

        # Get file bytes
        file_bytes = file.file.read()

        # Load file bytes into a dict
        memories = json.loads(file_bytes.decode("utf-8"))

        # Check that the embedder used for the uploaded memories is the same one the Cat is using now
        upload_embedder = memories["embedder"]
        cat_embedder = str(stray.embedder.__class__.__name__)

        if upload_embedder != cat_embedder:
            message = f"Embedder mismatch: file embedder {upload_embedder} is different from {cat_embedder}"
            raise Exception(message)

        # Get Declarative memories in file
        declarative_memories = memories["collections"]["declarative"]

        # Store data to upload the memories in batch
        ids = [i["id"] for i in declarative_memories]
        payloads = [
            {"page_content": p["page_content"], "metadata": p["metadata"]}
            for p in declarative_memories
        ]
        vectors = [v["vector"] for v in declarative_memories]

        log.info(f"Preparing to load {len(vectors)} vector memories")

        # Check embedding size is correct
        embedder_size = stray.memory.vectors.declarative.embedder_size
        len_match = [len(v) == embedder_size for v in vectors]

        if not all(len_match):
            message = (
                f"Embedding size mismatch: vectors length should be {embedder_size}"
            )
            raise Exception(message)

        # Upsert memories in batch mode # TODO REFACTOR: use VectorMemoryCollection.add_point
        stray.memory.vectors.vector_db.upsert(
            collection_name="declarative",
            points=models.Batch(ids=ids, payloads=payloads, vectors=vectors),
        )

    def ingest_file(
        self,
        stray,
        file: Union[str, UploadFile],
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
        metadata: dict = {}
    ):
        """Load a file in the Cat's declarative memory.

        The method splits and converts the file into Langchain `Document`s. Then, it stores the `Document`s in the Cat's
        memory.

        Parameters
        ----------
        file : str, UploadFile
            The file can be a path passed as a string or an `UploadFile` object if the document is ingested using the
            `rabbithole` endpoint.
        chunk_size : int
            Number of tokens in each document chunk.
        chunk_overlap : int
            Number of overlapping tokens between consecutive chunks.
        metadata : dict
            Metadata to be stored with each chunk.

        Notes
        -----
        Currently supported formats are `.txt`, `.pdf` and `.md`.
        You can add custom ones or substitute the above via RabbitHole hooks.

        See Also
        --------
        before_rabbithole_stores_documents
        """

        # split file into a list of docs
        docs = self.file_to_docs(
            stray=stray,
            file=file,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

        # store in memory
        if isinstance(file, str):
            filename = file
        else:
            filename = file.filename

        self.store_documents(stray=stray, docs=docs, source=filename, metadata=metadata)

    def file_to_docs(
        self,
        stray,
        file: Union[str, UploadFile],
        chunk_size: int | None = None,
        chunk_overlap: int | None = None
    ) -> List[Document]:
        """Load and convert files to Langchain `Document`.

        This method takes a file coming either from a Python script or from the `/rabbithole/` and `/rabbithole/web` endpoints.
        It then loads the file in memory and splits it into overlapping chunks of text.

        Parameters
        ----------
        file : str, UploadFile
            The file can be either a string path if loaded programmatically, a FastAPI `UploadFile`
            if coming from the `/rabbithole/` endpoint or a URL if coming from the `/rabbithole/web` endpoint.
        chunk_size : int
            Number of tokens in each document chunk.
        chunk_overlap : int
            Number of overlapping tokens between consecutive chunks.

        Returns
        -------
        docs : List[Document]
            List of Langchain `Document` of chunked text.

        Notes
        -----
        This method is used by both `/rabbithole/` and `/rabbithole/web` endpoints.
        Currently supported files are `.txt`, `.pdf`, `.md` and web pages.

        """

        # Check type of incoming file.
        if isinstance(file, UploadFile):
            # Get mime type and source of UploadFile
            content_type = mimetypes.guess_type(file.filename)[0]
            source = file.filename

            # Get file bytes
            file_bytes = file.file.read()
        elif isinstance(file, str):
            # Check if the string is a file path or a URL
            parsed_file = urlparse(file)
            is_url = all([parsed_file.scheme, parsed_file.netloc])

            if is_url:
                # Make a request with a fake browser name
                request = httpx.get(file, headers={"User-Agent": "Magic Browser"})

                # Define mime type and source of url
                content_type = request.headers["Content-Type"].split(";")[0]
                source = file

                try:
                    # Get binary content of url
                    file_bytes = request.content
                except HTTPError as e:
                    log.error(e)
            else:
                # Get mime type from file extension and source
                content_type = mimetypes.guess_type(file)[0]
                source = os.path.basename(file)

                # Get file bytes
                with open(file, "rb") as f:
                    file_bytes = f.read()
        else:
            raise ValueError(f"{type(file)} is not a valid type.")
        return self.string_to_docs(
            stray=stray,
            file_bytes=file_bytes,
            source=source,
            content_type=content_type,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

    def string_to_docs(
        self,
        stray,
        file_bytes: str,
        source: str = None,
        content_type: str = "text/plain",
        chunk_size: int | None = None,
        chunk_overlap: int | None = None
    ) -> List[Document]:
        """Convert string to Langchain `Document`.

        Takes a string, converts it to Langchain `Document`s,
        and splits the text into overlapping chunks.

        Parameters
        ----------
        file_bytes : str
            The string to be converted.
        source : str
            Source filename.
        content_type : str
            Mimetype of the content.
        chunk_size : int
            Number of tokens in each document chunk.
        chunk_overlap : int
            Number of overlapping tokens between consecutive chunks.

        Returns
        -------
        docs : List[Document]
            List of Langchain `Document` of chunked text.
        """

        # Load the bytes in the Blob schema
        blob = Blob(data=file_bytes, mimetype=content_type, source=source).from_data(
            data=file_bytes, mime_type=content_type, path=source
        )
        # Parser based on the mime type
        parser = MimeTypeBasedParser(handlers=self.file_handlers)

        # Parse the text
        stray.send_ws_message(
            "I'm parsing the content. Big content could require some minutes..."
        )
        super_docs = parser.parse(blob)

        # Split
        stray.send_ws_message("Parsing completed. Now let's go with reading process...")
        docs = self.__split_text(
            stray=stray,
            text=super_docs,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        return docs

    def store_documents(
            self,
            stray,
            docs: List[Document],
            source: str, # TODOV2: is this necessary?
            metadata: dict = {}
        ) -> None:
        """Add documents to the Cat's declarative memory.

        This method loops over a list of Langchain `Document`s and adds some metadata, namely the source filename and the
        timestamp of insertion. Once done, the method notifies the client via Websocket connection.

        Parameters
        ----------
        docs : List[Document]
            List of Langchain `Document` to be inserted in the Cat's declarative memory.
        source : str
            Source name to be added as metadata. It can be a file name or a URL.
        metadata : dict
            Metadata to be stored with each chunk.

        Notes
        -----
        At this point, it is possible to customize the Cat's behavior using the `before_rabbithole_insert_memory` hook
        to edit the memories before they are inserted in the vector database.

        See Also
        --------
        before_rabbithole_insert_memory
        """

        log.info(f"Preparing to memorize {len(docs)} vectors")

        # hook the docs before they are stored in the vector memory
        docs = stray.mad_hatter.execute_hook(
            "before_rabbithole_stores_documents", docs, cat=stray
        )

        # classic embed
        time_last_notification = time.time()
        time_interval = 10  # a notification every 10 secs
        stored_points = []
        for d, doc in enumerate(docs):
            if time.time() - time_last_notification > time_interval:
                time_last_notification = time.time()
                perc_read = int(d / len(docs) * 100)
                read_message = f"Read {perc_read}% of {source}"
                stray.send_ws_message(read_message)
                log.warning(read_message)

            # add default metadata
            doc.metadata["source"] = source
            doc.metadata["when"] = time.time()
            # add custom metadata (sent via endpoint)
            for k,v in metadata.items():
                doc.metadata[k] = v

            doc = stray.mad_hatter.execute_hook(
                "before_rabbithole_insert_memory", doc, cat=stray
            )
            inserting_info = f"{d + 1}/{len(docs)}: {doc.page_content}"
            if doc.page_content != "":
                doc_embedding = stray.embedder.embed_documents([doc.page_content])
                stored_point = stray.memory.vectors.declarative.add_point(
                    doc.page_content,
                    doc_embedding[0],
                    doc.metadata,
                )
                stored_points.append(stored_point)

                log.info(f"Inserted into memory ({inserting_info})")
            else:
                log.info(f"Skipped memory insertion of empty doc ({inserting_info})")

            # wait a little to avoid APIs rate limit errors
            time.sleep(0.05)

        # hook the points after they are stored in the vector memory
        stray.mad_hatter.execute_hook(
            "after_rabbithole_stored_documents", source, stored_points, cat=stray
        )

        # notify client
        finished_reading_message = (
            f"Finished reading {source}, I made {len(docs)} thoughts on it."
        )

        stray.send_ws_message(finished_reading_message)

        log.warning(f"Done uploading {source}")

    def __split_text(self, stray, text, chunk_size, chunk_overlap):
        """Split text in overlapped chunks.

        This method executes the `rabbithole_splits_text` hook to split the incoming text into overlapping
        chunks of text. Two other hooks are available to edit the text before and after the split step.

        Parameters
        ----------
        text : str
            Content of the loaded file.
        chunk_size : int
            Number of tokens in each document chunk.
        chunk_overlap : int
            Number of overlapping tokens between consecutive chunks.

        Returns
        -------
        docs : List[Document]
            List of split Langchain `Document`.

        Notes
        -----
        The default behavior only executes the `rabbithole_splits_text` hook. `before_rabbithole_splits_text` and
        `after_rabbithole_splitted_text` hooks return the original input without any modification.

        See Also
        --------
        before_rabbithole_splits_text
        rabbithole_splits_text
        after_rabbithole_splitted_text

        """
        # do something on the text before it is split
        text = stray.mad_hatter.execute_hook(
            "before_rabbithole_splits_text", text, cat=stray
        )

        # hooks decide the text splitter (see @property .text_splitter)
        text_splitter = self.text_splitter

        # override chunk_size and chunk_overlap only if the request has those info
        if chunk_size:
            text_splitter._chunk_size = chunk_size
        if chunk_overlap:
            text_splitter._chunk_overlap = chunk_overlap

        log.info(f"Chunk size: {chunk_size}, chunk overlap: {chunk_overlap}")
        # split text
        docs = text_splitter.split_documents(text)
        # remove short texts (page numbers, isolated words, etc.)
        # TODO: join each short chunk with previous one, instead of deleting them
        docs = list(filter(lambda d: len(d.page_content) > 10, docs))

        # do something on the text after it is split
        docs = stray.mad_hatter.execute_hook(
            "after_rabbithole_splitted_text", docs, cat=stray
        )

        return docs

    # each time we access the file handlers, plugins can intervene
    @property
    def file_handlers(self):
        self.__reload_file_handlers()
        return self.__file_handlers

    # each time we access the text splitter, plugins can intervene
    @property
    def text_splitter(self):
        self.__reload_text_splitter()
        return self.__text_splitter
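
Because the file handlers and the text splitter are rebuilt through hooks every time they are accessed, a plugin can replace or extend them. Below is a minimal sketch of such a plugin (hypothetical plugin code, not part of rabbit_hole.py; it assumes the standard @hook decorator from cat.mad_hatter.decorators):

# Hypothetical plugin sketch: extend the default parsers and tweak the splitter.
from cat.mad_hatter.decorators import hook


@hook
def rabbithole_instantiates_parsers(file_handlers, cat):
    # Reuse the plain-text parser for CSV uploads as well
    file_handlers["text/csv"] = file_handlers["text/plain"]
    return file_handlers


@hook
def rabbithole_instantiates_splitter(text_splitter, cat):
    # Use larger chunks than the 256-token default
    text_splitter._chunk_size = 512
    text_splitter._chunk_overlap = 128
    return text_splitter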

__split_text(stray, text, chunk_size, chunk_overlap)

Split text in overlapped chunks.

This method executes the rabbithole_splits_text hook to split the incoming text into overlapping chunks of text. Two other hooks are available to edit the text before and after the split step.

Parameters:

text : str
    Content of the loaded file. (required)
chunk_size : int
    Number of tokens in each document chunk. (required)
chunk_overlap : int
    Number of overlapping tokens between consecutive chunks. (required)

Returns:

docs : List[Document]
    List of split Langchain Document.

Notes

The default behavior only executes the rabbithole_splits_text hook. before_rabbithole_splits_text and after_rabbithole_splitted_text hooks return the original input without any modification.

See Also

before_rabbithole_splits_text
rabbithole_splits_text
after_rabbithole_splitted_text
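
For instance, a plugin could use after_rabbithole_splitted_text to post-process the chunks produced by the splitter (hypothetical plugin sketch, assuming the standard @hook decorator from cat.mad_hatter.decorators):

# Hypothetical plugin sketch: drop chunks that look like boilerplate headings.
from cat.mad_hatter.decorators import hook


@hook
def after_rabbithole_splitted_text(chunks, cat):
    return [c for c in chunks if not c.page_content.lower().startswith("table of contents")]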

Source code in cat/rabbit_hole.py
def __split_text(self, stray, text, chunk_size, chunk_overlap):
    """Split text in overlapped chunks.

    This method executes the `rabbithole_splits_text` hook to split the incoming text into overlapping
    chunks of text. Two other hooks are available to edit the text before and after the split step.

    Parameters
    ----------
    text : str
        Content of the loaded file.
    chunk_size : int
        Number of tokens in each document chunk.
    chunk_overlap : int
        Number of overlapping tokens between consecutive chunks.

    Returns
    -------
    docs : List[Document]
        List of split Langchain `Document`.

    Notes
    -----
    The default behavior only executes the `rabbithole_splits_text` hook. `before_rabbithole_splits_text` and
    `after_rabbithole_splitted_text` hooks return the original input without any modification.

    See Also
    --------
    before_rabbithole_splits_text
    rabbithole_splits_text
    after_rabbithole_splitted_text

    """
    # do something on the text before it is split
    text = stray.mad_hatter.execute_hook(
        "before_rabbithole_splits_text", text, cat=stray
    )

    # hooks decide the text splitter (see @property .text_splitter)
    text_splitter = self.text_splitter

    # override chunk_size and chunk_overlap only if the request has those info
    if chunk_size:
        text_splitter._chunk_size = chunk_size
    if chunk_overlap:
        text_splitter._chunk_overlap = chunk_overlap

    log.info(f"Chunk size: {chunk_size}, chunk overlap: {chunk_overlap}")
    # split text
    docs = text_splitter.split_documents(text)
    # remove short texts (page numbers, isolated words, etc.)
    # TODO: join each short chunk with previous one, instead of deleting them
    docs = list(filter(lambda d: len(d.page_content) > 10, docs))

    # do something on the text after it is split
    docs = stray.mad_hatter.execute_hook(
        "after_rabbithole_splitted_text", docs, cat=stray
    )

    return docs

file_to_docs(stray, file, chunk_size=None, chunk_overlap=None)

Load and convert files to Langchain Document.

This method takes a file coming either from a Python script or from the /rabbithole/ and /rabbithole/web endpoints. It then loads the file in memory and splits it into overlapping chunks of text.

Parameters:

file : str, UploadFile
    The file can be either a string path if loaded programmatically, a FastAPI UploadFile if coming from the /rabbithole/ endpoint, or a URL if coming from the /rabbithole/web endpoint. (required)
chunk_size : int
    Number of tokens in each document chunk. (default: None)
chunk_overlap : int
    Number of overlapping tokens between consecutive chunks. (default: None)

Returns:

docs : List[Document]
    List of Langchain Document of chunked text.

Notes

This method is used by both /rabbithole/ and /rabbithole/web endpoints. Currently supported files are .txt, .pdf, .md and web pages.
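
A minimal usage sketch from plugin code (it assumes the RabbitHole instance is reachable as cat.rabbit_hole and that cat is the current StrayCat session; the file path is illustrative):

docs = cat.rabbit_hole.file_to_docs(
    stray=cat,                  # current session
    file="./docs/manual.pdf",   # a local path; a URL or an UploadFile also works
    chunk_size=128,
    chunk_overlap=32,
)
print(f"Produced {len(docs)} chunks; first chunk: {docs[0].page_content[:80]}")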

Source code in cat/rabbit_hole.py
def file_to_docs(
    self,
    stray,
    file: Union[str, UploadFile],
    chunk_size: int | None = None,
    chunk_overlap: int | None = None
) -> List[Document]:
    """Load and convert files to Langchain `Document`.

    This method takes a file coming either from a Python script or from the `/rabbithole/` and `/rabbithole/web` endpoints.
    It then loads the file in memory and splits it into overlapping chunks of text.

    Parameters
    ----------
    file : str, UploadFile
        The file can be either a string path if loaded programmatically, a FastAPI `UploadFile`
        if coming from the `/rabbithole/` endpoint or a URL if coming from the `/rabbithole/web` endpoint.
    chunk_size : int
        Number of tokens in each document chunk.
    chunk_overlap : int
        Number of overlapping tokens between consecutive chunks.

    Returns
    -------
    docs : List[Document]
        List of Langchain `Document` of chunked text.

    Notes
    -----
    This method is used by both `/rabbithole/` and `/rabbithole/web` endpoints.
    Currently supported files are `.txt`, `.pdf`, `.md` and web pages.

    """

    # Check type of incoming file.
    if isinstance(file, UploadFile):
        # Get mime type and source of UploadFile
        content_type = mimetypes.guess_type(file.filename)[0]
        source = file.filename

        # Get file bytes
        file_bytes = file.file.read()
    elif isinstance(file, str):
        # Check if the string is a file path or a URL
        parsed_file = urlparse(file)
        is_url = all([parsed_file.scheme, parsed_file.netloc])

        if is_url:
            # Make a request with a fake browser name
            request = httpx.get(file, headers={"User-Agent": "Magic Browser"})

            # Define mime type and source of url
            content_type = request.headers["Content-Type"].split(";")[0]
            source = file

            try:
                # Get binary content of url
                file_bytes = request.content
            except HTTPError as e:
                log.error(e)
        else:
            # Get mime type from file extension and source
            content_type = mimetypes.guess_type(file)[0]
            source = os.path.basename(file)

            # Get file bytes
            with open(file, "rb") as f:
                file_bytes = f.read()
    else:
        raise ValueError(f"{type(file)} is not a valid type.")
    return self.string_to_docs(
        stray=stray,
        file_bytes=file_bytes,
        source=source,
        content_type=content_type,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )

ingest_file(stray, file, chunk_size=None, chunk_overlap=None, metadata={})

Load a file in the Cat's declarative memory.

The method splits and converts the file into Langchain Document objects, then stores them in the Cat's memory.

Parameters:

file : str, UploadFile
    The file can be a path passed as a string or an UploadFile object if the document is ingested using the rabbithole endpoint. (required)
chunk_size : int
    Number of tokens in each document chunk. (default: None)
chunk_overlap : int
    Number of overlapping tokens between consecutive chunks. (default: None)
metadata : dict
    Metadata to be stored with each chunk. (default: {})

Notes

Currently supported formats are .txt, .pdf and .md. You can add custom ones or substitute the above via RabbitHole hooks.

See Also

before_rabbithole_stores_documents
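
A minimal usage sketch from plugin code (it assumes the RabbitHole instance is reachable as cat.rabbit_hole and that cat is the current StrayCat session; path and metadata values are illustrative):

cat.rabbit_hole.ingest_file(
    stray=cat,
    file="./knowledge/faq.md",
    chunk_size=256,
    chunk_overlap=64,
    metadata={"topic": "faq", "lang": "en"},
)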

Source code in cat/rabbit_hole.py
def ingest_file(
    self,
    stray,
    file: Union[str, UploadFile],
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
    metadata: dict = {}
):
    """Load a file in the Cat's declarative memory.

    The method splits and converts the file into Langchain `Document`s. Then, it stores the `Document`s in the Cat's
    memory.

    Parameters
    ----------
    file : str, UploadFile
        The file can be a path passed as a string or an `UploadFile` object if the document is ingested using the
        `rabbithole` endpoint.
    chunk_size : int
        Number of tokens in each document chunk.
    chunk_overlap : int
        Number of overlapping tokens between consecutive chunks.
    metadata : dict
        Metadata to be stored with each chunk.

    Notes
    -----
    Currently supported formats are `.txt`, `.pdf` and `.md`.
    You can add custom ones or substitute the above via RabbitHole hooks.

    See Also
    --------
    before_rabbithole_stores_documents
    """

    # split file into a list of docs
    docs = self.file_to_docs(
        stray=stray,
        file=file,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )

    # store in memory
    if isinstance(file, str):
        filename = file
    else:
        filename = file.filename

    self.store_documents(stray=stray, docs=docs, source=filename, metadata=metadata)

ingest_memory(stray, file)

Upload memories to the declarative memory from a JSON file.

Parameters:

file : UploadFile
    File object sent via the rabbithole/memory endpoint. (required)

Notes

This method allows uploading a JSON file containing vector and text memories directly to the declarative memory. When doing this, please, make sure the embedder used to export the memories is the same as the one used when uploading. The method also performs a check on the dimensionality of the embeddings (i.e. length of each vector).
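
Based on the fields the method reads, an exported memory file is expected to look roughly like the following (shown here as a Python dict; all values are illustrative, and every vector must have the same length as the current embedder size):

exported_memories = {
    "embedder": "OpenAIEmbeddings",   # must match the Cat's current embedder class name
    "collections": {
        "declarative": [
            {
                "id": "7f9c0d1e-0000-0000-0000-000000000000",  # illustrative point id
                "page_content": "We're all mad here.",
                "metadata": {"source": "alice.txt", "when": 1700000000.0},
                "vector": [0.012, -0.034, 0.077],  # truncated; real vectors match the embedder size
            }
        ]
    },
}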

Source code in cat/rabbit_hole.py
def ingest_memory(
        self,
        stray,
        file: UploadFile
    ):
    """Upload memories to the declarative memory from a JSON file.

    Parameters
    ----------
    file : UploadFile
        File object sent via the `rabbithole/memory` endpoint.

    Notes
    -----
    This method allows uploading a JSON file containing vector and text memories directly to the declarative memory.
    When doing this, please, make sure the embedder used to export the memories is the same as the one used
    when uploading.
    The method also performs a check on the dimensionality of the embeddings (i.e. length of each vector).

    """

    # Get file bytes
    file_bytes = file.file.read()

    # Load file bytes into a dict
    memories = json.loads(file_bytes.decode("utf-8"))

    # Check that the embedder used for the uploaded memories is the same one the Cat is using now
    upload_embedder = memories["embedder"]
    cat_embedder = str(stray.embedder.__class__.__name__)

    if upload_embedder != cat_embedder:
        message = f"Embedder mismatch: file embedder {upload_embedder} is different from {cat_embedder}"
        raise Exception(message)

    # Get Declarative memories in file
    declarative_memories = memories["collections"]["declarative"]

    # Store data to upload the memories in batch
    ids = [i["id"] for i in declarative_memories]
    payloads = [
        {"page_content": p["page_content"], "metadata": p["metadata"]}
        for p in declarative_memories
    ]
    vectors = [v["vector"] for v in declarative_memories]

    log.info(f"Preparing to load {len(vectors)} vector memories")

    # Check embedding size is correct
    embedder_size = stray.memory.vectors.declarative.embedder_size
    len_match = [len(v) == embedder_size for v in vectors]

    if not all(len_match):
        message = (
            f"Embedding size mismatch: vectors length should be {embedder_size}"
        )
        raise Exception(message)

    # Upsert memories in batch mode # TODO REFACTOR: use VectorMemoryCollection.add_point
    stray.memory.vectors.vector_db.upsert(
        collection_name="declarative",
        points=models.Batch(ids=ids, payloads=payloads, vectors=vectors),
    )

store_documents(stray, docs, source, metadata={})

Add documents to the Cat's declarative memory.

This method loops over a list of Langchain Document and adds some metadata, namely the source filename and the timestamp of insertion. Once done, the method notifies the client via the Websocket connection.

Parameters:

docs : List[Document]
    List of Langchain Document to be inserted in the Cat's declarative memory. (required)
source : str
    Source name to be added as metadata. It can be a file name or a URL. (required)
metadata : dict
    Metadata to be stored with each chunk. (default: {})

Notes

At this point, it is possible to customize the Cat's behavior using the before_rabbithole_insert_memory hook to edit the memories before they are inserted in the vector database.

See Also

before_rabbithole_insert_memory
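
For instance, a plugin could enrich every chunk before it is embedded and stored (hypothetical plugin sketch, assuming the standard @hook decorator from cat.mad_hatter.decorators):

# Hypothetical plugin sketch: tag every memory point with an extra metadata key.
from cat.mad_hatter.decorators import hook


@hook
def before_rabbithole_insert_memory(doc, cat):
    doc.metadata["ingested_by"] = "my_plugin"  # illustrative key
    return doc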

Source code in cat/rabbit_hole.py
def store_documents(
        self,
        stray,
        docs: List[Document],
        source: str, # TODOV2: is this necessary?
        metadata: dict = {}
    ) -> None:
    """Add documents to the Cat's declarative memory.

    This method loops over a list of Langchain `Document`s and adds some metadata, namely the source filename and the
    timestamp of insertion. Once done, the method notifies the client via Websocket connection.

    Parameters
    ----------
    docs : List[Document]
        List of Langchain `Document` to be inserted in the Cat's declarative memory.
    source : str
        Source name to be added as metadata. It can be a file name or a URL.
    metadata : dict
        Metadata to be stored with each chunk.

    Notes
    -----
    At this point, it is possible to customize the Cat's behavior using the `before_rabbithole_insert_memory` hook
    to edit the memories before they are inserted in the vector database.

    See Also
    --------
    before_rabbithole_insert_memory
    """

    log.info(f"Preparing to memorize {len(docs)} vectors")

    # hook the docs before they are stored in the vector memory
    docs = stray.mad_hatter.execute_hook(
        "before_rabbithole_stores_documents", docs, cat=stray
    )

    # classic embed
    time_last_notification = time.time()
    time_interval = 10  # a notification every 10 secs
    stored_points = []
    for d, doc in enumerate(docs):
        if time.time() - time_last_notification > time_interval:
            time_last_notification = time.time()
            perc_read = int(d / len(docs) * 100)
            read_message = f"Read {perc_read}% of {source}"
            stray.send_ws_message(read_message)
            log.warning(read_message)

        # add default metadata
        doc.metadata["source"] = source
        doc.metadata["when"] = time.time()
        # add custom metadata (sent via endpoint)
        for k,v in metadata.items():
            doc.metadata[k] = v

        doc = stray.mad_hatter.execute_hook(
            "before_rabbithole_insert_memory", doc, cat=stray
        )
        inserting_info = f"{d + 1}/{len(docs)}: {doc.page_content}"
        if doc.page_content != "":
            doc_embedding = stray.embedder.embed_documents([doc.page_content])
            stored_point = stray.memory.vectors.declarative.add_point(
                doc.page_content,
                doc_embedding[0],
                doc.metadata,
            )
            stored_points.append(stored_point)

            log.info(f"Inserted into memory ({inserting_info})")
        else:
            log.info(f"Skipped memory insertion of empty doc ({inserting_info})")

        # wait a little to avoid APIs rate limit errors
        time.sleep(0.05)

    # hook the points after they are stored in the vector memory
    stray.mad_hatter.execute_hook(
        "after_rabbithole_stored_documents", source, stored_points, cat=stray
    )

    # notify client
    finished_reading_message = (
        f"Finished reading {source}, I made {len(docs)} thoughts on it."
    )

    stray.send_ws_message(finished_reading_message)

    log.warning(f"Done uploading {source}")

string_to_docs(stray, file_bytes, source=None, content_type='text/plain', chunk_size=None, chunk_overlap=None)

Convert string to Langchain Document.

Takes a string, converts it to Langchain Document objects, and splits the text into overlapping chunks.

Parameters:

file_bytes : str
    The string to be converted. (required)
source : str
    Source filename. (default: None)
content_type : str
    Mimetype of the content. (default: 'text/plain')
chunk_size : int
    Number of tokens in each document chunk. (default: None)
chunk_overlap : int
    Number of overlapping tokens between consecutive chunks. (default: None)

Returns:

docs : List[Document]
    List of Langchain Document of chunked text.
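
A minimal usage sketch from plugin code (it assumes the RabbitHole instance is reachable as cat.rabbit_hole and that cat is the current StrayCat session; the text is illustrative):

raw_text = "We're all mad here. I'm mad. You're mad."
docs = cat.rabbit_hole.string_to_docs(
    stray=cat,
    file_bytes=raw_text,
    source="mad_tea_party.txt",
    content_type="text/plain",
)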

Source code in cat/rabbit_hole.py
def string_to_docs(
    self,
    stray,
    file_bytes: str,
    source: str = None,
    content_type: str = "text/plain",
    chunk_size: int | None = None,
    chunk_overlap: int | None = None
) -> List[Document]:
    """Convert string to Langchain `Document`.

    Takes a string, converts it to Langchain `Document`s,
    and splits the text into overlapping chunks.

    Parameters
    ----------
    file_bytes : str
        The string to be converted.
    source : str
        Source filename.
    content_type : str
        Mimetype of the content.
    chunk_size : int
        Number of tokens in each document chunk.
    chunk_overlap : int
        Number of overlapping tokens between consecutive chunks.

    Returns
    -------
    docs : List[Document]
        List of Langchain `Document` of chunked text.
    """

    # Load the bytes in the Blob schema
    blob = Blob(data=file_bytes, mimetype=content_type, source=source).from_data(
        data=file_bytes, mime_type=content_type, path=source
    )
    # Parser based on the mime type
    parser = MimeTypeBasedParser(handlers=self.file_handlers)

    # Parse the text
    stray.send_ws_message(
        "I'm parsing the content. Big content could require some minutes..."
    )
    super_docs = parser.parse(blob)

    # Split
    stray.send_ws_message("Parsing completed. Now let's go with reading process...")
    docs = self.__split_text(
        stray=stray,
        text=super_docs,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return docs