Source code for microngo.microngo

from pymongo import MongoClient
from pymongo.cursor import Cursor
from pymongo.command_cursor import CommandCursor
from microngo.pagination import Pagination

[docs]class Microngo(object): def __init__(self, *args, **kwargs): ''' :param str db: Database name. This param is optional, you can use :func:`~microngo.Microngo.database` to set current database. :type db: str or None ''' self.collection = None self._db_name = None self.db = None # Get database name and remove it database = kwargs.get("db") if database: del kwargs["db"] self._db_name = database # New mongo client self.client = MongoClient(*args, **kwargs) # Database if self._db_name: self.db = self.client[self._db_name]
[docs] def database(self, database): '''Set current database :param str database: Database name :return: :class:`~microngo.Microngo` ''' self.db = self.client[database] return self
def _collection(self, collection): if self.db: self.collection = self.db[collection] return self.collection else: raise Exception("database not found, please set database name.")
[docs] def query(self, collection): '''Create query object :param str collection: Collection name :return: :class:`~microngo.Query` ''' return Query(self._collection(collection))
[docs] def insert(self, collection): '''Insert into collection :param str collection: Collection name :return: :class:`~microngo.Document` ''' return Document(self._collection(collection))
[docs]class Document(object): def __init__(self, collection, data=None): ''' :param obj collection: PyMongo collection object :param str data: Data for document :type data: dict or None ''' # Variables self._microngo_collection = collection self._microngo_payloads = [] self._microngo_document_id = None # Dict data if data: if isinstance(data, dict): # Set documen id if exist if data.get("_id"): self._microngo_document_id = data.get("_id") # Update data self.__dict__.update(data) def __getattr__(self, name): return None def _get_payloads(self): # Get self variable in dictionary payloads = dict(self.__dict__) # Remove internal variables and return data del payloads['_microngo_collection'] del payloads['_microngo_payloads'] del payloads['_microngo_document_id'] return payloads def _clear_payloads(self): # Cache variable cache_collection = self._microngo_collection cache_payloads = self._microngo_payloads cache_document_id = self._microngo_document_id # Clear self variables payloads = self.__dict__ payloads.clear() # Reasign internal variables from cache payloads['_microngo_collection'] = cache_collection payloads['_microngo_payloads'] = cache_payloads payloads['_microngo_document_id'] = cache_document_id return self
[docs] def add(self): ''' Add document to list, then insert to collection with insert_many function. ''' if not self._microngo_document_id: # Get payloads payloads = self._get_payloads() # Append to cache self._microngo_payloads.append(payloads) # Clear previous payload and return self._clear_payloads() return self
[docs] def raw(self): ''' Get raw data or dict from document. :return: dict ''' return self._get_payloads()
[docs] def save(self): ''' Save document to collection. :return: list of OjectId or single ObjectId ''' if self._microngo_document_id: # Update self._microngo_collection.update_one( {'_id': self._microngo_document_id}, {'$set': self._get_payloads()}, upsert=False ) return self._microngo_document_id else: # Insert if len(self._microngo_payloads) > 0: data = self._microngo_payloads return self._microngo_collection.insert_many(data).inserted_ids else: data = self._get_payloads() doc_id = self._microngo_collection.insert_one(data).inserted_id self._microngo_document_id = doc_id return doc_id
[docs] def remove(self): ''' Remove document from collection. ''' if self._microngo_document_id: # Delete self._microngo_collection.delete_one({'_id': self._microngo_document_id})
[docs]class Query(object): def __init__(self, collection, cursor=None): ''' :param obj collection: PyMongo collection object :param obj cursor: PyMongo cursor object :type cursor: obj or None ''' self.collection = collection self.cursor = cursor def __getattr__(self, name): # Handle pymongo collection's method def method(*args, **kwargs): if self._cursor_iterable(): func = getattr(self.cursor, name) else: func = getattr(self.collection, name) self.cursor = func(*args, **kwargs) return self return method def _cursor_iterable(self): # Is cursor iterable return isinstance(self.cursor, Cursor) \ or isinstance(self.cursor, CommandCursor) def _document(self, data): # Create document object return Document(self.collection, data)
[docs] def find_by(self, **kwargs): ''' :return: :class:`~microngo.Query` ''' self.cursor = self.collection.find(kwargs) return self
[docs] def raw(self): ''' Get raw result. :return: raw result from PyMongo ''' return self.cursor
[docs] def one(self): ''' Get single document. :return: :class:`~microngo.Document` or None :raises Exception: if the result type isn't dict ''' if isinstance(self.cursor, dict): return self._document(self.cursor) elif self.cursor == None: return {} else: raise Exception("cursor type is '%s'" % (type(self.cursor)))
[docs] def first(self): ''' Get first document from list or from single document result. :return: :class:`~microngo.Document` or None :raises Exception: if the result type isn't dict and not iterable ''' if self._cursor_iterable(): data = list(self.cursor) if len(data) > 0: return self._document(data[0]) else: return {} elif isinstance(self.cursor, dict): return self.one() elif self.cursor == None: return {} else: raise Exception("cursor type is '%s'" % (type(self.cursor)))
[docs] def all(self): ''' Get all document in list. :return: list of :class:`~microngo.Document` or [] :raises Exception: if the result is not iterable ''' documents = [] if self._cursor_iterable(): for i in self.cursor: documents.append(self._document(i)) elif self.cursor == None: return [] else: raise Exception("cursor type is '%s'" % (type(self.cursor))) return documents
[docs] def paginate(self, page, per_page=20): ''' Crate pagination query :return: :class:`~microngo.Pagination` or None ''' if page < 1: return None total = self.cursor.count() items = self.skip((page - 1) * per_page).limit(per_page).all() if len(items) < 1 and page != 1: return None return Pagination(self, page, per_page, total, items)