OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python311
/
lib
/
python3.11
/
site-packages
/
loguru
Server IP: 2a02:4780:11:1594:0:ef5:22d7:a
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
01/09/2025 02:18:04 AM
rwxr-xr-x
📄
__init__.py
626 bytes
05/14/2024 03:18:46 PM
rw-r--r--
📄
__init__.pyi
14.1 KB
05/14/2024 03:18:46 PM
rw-r--r--
📁
__pycache__
-
05/14/2024 03:18:46 PM
rwxr-xr-x
📄
_asyncio_loop.py
597 bytes
05/14/2024 03:18:46 PM
rw-r--r--
📄
_better_exceptions.py
19.53 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_colorama.py
1.63 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_colorizer.py
14.26 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_contextvars.py
321 bytes
05/14/2024 03:18:46 PM
rw-r--r--
📄
_ctime_functions.py
1.5 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_datetime.py
3.67 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_defaults.py
2.86 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_error_interceptor.py
1.08 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_file_sink.py
14.19 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_filters.py
618 bytes
05/14/2024 03:18:46 PM
rw-r--r--
📄
_get_frame.py
458 bytes
05/14/2024 03:18:46 PM
rw-r--r--
📄
_handler.py
12.34 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_locks_machinery.py
1.28 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_logger.py
93.72 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_recattrs.py
2.79 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_simple_sinks.py
3.65 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
_string_parsers.py
4.6 KB
05/14/2024 03:18:46 PM
rw-r--r--
📄
py.typed
0 bytes
05/14/2024 03:18:46 PM
rw-r--r--
Editing: _simple_sinks.py
Close
import asyncio import logging import weakref from ._asyncio_loop import get_running_loop, get_task_loop class StreamSink: def __init__(self, stream): self._stream = stream self._flushable = callable(getattr(stream, "flush", None)) self._stoppable = callable(getattr(stream, "stop", None)) self._completable = asyncio.iscoroutinefunction(getattr(stream, "complete", None)) def write(self, message): self._stream.write(message) if self._flushable: self._stream.flush() def stop(self): if self._stoppable: self._stream.stop() def tasks_to_complete(self): if not self._completable: return [] return [self._stream.complete()] class StandardSink: def __init__(self, handler): self._handler = handler def write(self, message): record = message.record message = str(message) exc = record["exception"] record = logging.getLogger().makeRecord( record["name"], record["level"].no, record["file"].path, record["line"], message, (), (exc.type, exc.value, exc.traceback) if exc else None, record["function"], {"extra": record["extra"]}, ) if exc: record.exc_text = "\n" self._handler.handle(record) def stop(self): self._handler.close() def tasks_to_complete(self): return [] class AsyncSink: def __init__(self, function, loop, error_interceptor): self._function = function self._loop = loop self._error_interceptor = error_interceptor self._tasks = weakref.WeakSet() def write(self, message): try: loop = self._loop or get_running_loop() except RuntimeError: return coroutine = self._function(message) task = loop.create_task(coroutine) def check_exception(future): if future.cancelled() or future.exception() is None: return if not self._error_interceptor.should_catch(): raise future.exception() self._error_interceptor.print(message.record, exception=future.exception()) task.add_done_callback(check_exception) self._tasks.add(task) def stop(self): for task in self._tasks: task.cancel() def tasks_to_complete(self): # To avoid errors due to "self._tasks" being mutated while iterated, the # "tasks_to_complete()" method must be protected by the same lock as "write()" (which # happens to be the handler lock). However, the tasks must not be awaited while the lock is # acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks # to complete, then return them so that they can be awaited outside of the lock. return [self._complete_task(task) for task in self._tasks] async def _complete_task(self, task): loop = get_running_loop() if get_task_loop(task) is not loop: return try: await task except Exception: pass # Handled in "check_exception()" def __getstate__(self): state = self.__dict__.copy() state["_tasks"] = None return state def __setstate__(self, state): self.__dict__.update(state) self._tasks = weakref.WeakSet() class CallableSink: def __init__(self, function): self._function = function def write(self, message): self._function(message) def stop(self): pass def tasks_to_complete(self): return []