图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

159 lines
5.0 KiB

  1. import contextlib
  2. import itertools
  3. import logging
  4. import sys
  5. import time
  6. from typing import IO, Generator, Optional
  7. from pip._internal.utils.compat import WINDOWS
  8. from pip._internal.utils.logging import get_indentation
  9. logger = logging.getLogger(__name__)
  class SpinnerInterface:
      """Interface for spinner objects; both methods must be overridden."""

      def spin(self) -> None:
          """Advance the spinner by one step.

          Raises:
              NotImplementedError: always, unless a subclass overrides it.
          """
          raise NotImplementedError

      def finish(self, final_status: str) -> None:
          """End the spinner, passing along ``final_status``.

          Raises:
              NotImplementedError: always, unless a subclass overrides it.
          """
          raise NotImplementedError
  class InteractiveSpinner(SpinnerInterface):
      """Spinner that redraws its status in place on a text stream.

      The indented message followed by " ... " is written once at
      construction. Every later status (a spin character or the final status)
      replaces the previous one by backspacing over it.
      """

      def __init__(
          self,
          message: str,
          file: Optional[IO[str]] = None,
          spin_chars: str = "-\\|/",
          # Empirically, 8 updates/second looks nice
          min_update_interval_seconds: float = 0.125,
      ):
          """Write the message header and prepare the spin cycle.

          Args:
              message: Text written before the status.
              file: Stream to write to; ``sys.stdout`` when ``None``.
              spin_chars: Characters cycled through, one per update.
              min_update_interval_seconds: Minimum delay between two redraws.
          """
          self._message = message
          self._file = sys.stdout if file is None else file
          self._rate_limiter = RateLimiter(min_update_interval_seconds)
          self._finished = False
          self._spin_cycle = itertools.cycle(spin_chars)
          # Nothing has been drawn after the header yet.
          self._width = 0
          indent = " " * get_indentation()
          self._file.write(indent + self._message + " ... ")

      def _write(self, status: str) -> None:
          """Replace the previously drawn status with ``status``."""
          assert not self._finished
          # Backspace to where the old status began, blank it out with spaces,
          # then backspace again so the new status starts at the same column.
          rewind = "\b" * self._width
          blanks = " " * self._width
          self._file.write(rewind + blanks + rewind)
          # The slate is clean; draw the new status and remember its width.
          self._file.write(status)
          self._width = len(status)
          self._file.flush()
          self._rate_limiter.reset()

      def spin(self) -> None:
          """Draw the next spin character unless finished or rate-limited."""
          if self._finished or not self._rate_limiter.ready():
              return
          self._write(next(self._spin_cycle))

      def finish(self, final_status: str) -> None:
          """Draw ``final_status``, end the line, and mark the spinner finished.

          Calling it again after the first time does nothing.
          """
          if not self._finished:
              self._write(final_status)
              self._file.write("\n")
              self._file.flush()
              self._finished = True
  class NonInteractiveSpinner(SpinnerInterface):
      """Spinner used for dumb terminals, non-interactive installs (no tty), etc.

      Status updates go through the module logger at INFO level. Updates are
      still emitted occasionally (once every 60 seconds by default) to act as a
      keep-alive for systems like Travis-CI that take lack-of-output as an
      indication that a task has frozen.
      """

      def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
          """Store the message and immediately log a "started" update."""
          self._message = message
          self._finished = False
          self._rate_limiter = RateLimiter(min_update_interval_seconds)
          self._update("started")

      def _update(self, status: str) -> None:
          """Restart the rate limiter and log ``status`` for this message."""
          assert not self._finished
          self._rate_limiter.reset()
          logger.info("%s: %s", self._message, status)

      def spin(self) -> None:
          """Log a keep-alive update unless finished or rate-limited."""
          if self._finished or not self._rate_limiter.ready():
              return
          self._update("still running...")

      def finish(self, final_status: str) -> None:
          """Log the final status once; later calls do nothing."""
          if not self._finished:
              self._update(f"finished with status '{final_status}'")
              self._finished = True
  class RateLimiter:
      """Tell callers whether a minimum interval has passed since the last reset.

      Time is measured with ``time.monotonic()`` rather than the wall clock, so
      a system clock adjustment (for example a backwards NTP correction) cannot
      make ``ready()`` stay false for longer than the configured interval.
      """

      def __init__(self, min_update_interval_seconds: float) -> None:
          """Create a limiter that is ready until ``reset()`` is first called.

          Args:
              min_update_interval_seconds: Minimum number of seconds that must
                  separate a ``reset()`` from the next ``ready()`` returning
                  true.
          """
          self._min_update_interval_seconds = min_update_interval_seconds
          # The monotonic clock has an arbitrary (possibly small) origin, so
          # "never updated" is represented as -inf instead of 0 to guarantee
          # the limiter starts out ready.
          self._last_update: float = float("-inf")

      def ready(self) -> bool:
          """Return True when at least the minimum interval has elapsed."""
          elapsed = time.monotonic() - self._last_update
          return elapsed >= self._min_update_interval_seconds

      def reset(self) -> None:
          """Record the current moment as the time of the last update."""
          self._last_update = time.monotonic()
  @contextlib.contextmanager
  def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
      """Yield a spinner for ``message`` and finish it when the block exits.

      The spinner is finished with "done" on normal exit, "canceled" on
      KeyboardInterrupt, and "error" on any other Exception. In the two failure
      cases the exception is re-raised after the spinner is finished.
      """
      # Interactive spinner goes directly to sys.stdout rather than being routed
      # through the logging system, but it acts like it has level INFO,
      # i.e. it's only displayed if we're at level INFO or better.
      # Non-interactive spinner goes through the logging system, so it is always
      # in sync with logging configuration.
      spinner: SpinnerInterface = (
          InteractiveSpinner(message)
          if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO
          else NonInteractiveSpinner(message)
      )
      try:
          with hidden_cursor(sys.stdout):
              yield spinner
      except (KeyboardInterrupt, Exception) as exc:
          # Report why the block failed, then let the exception propagate.
          spinner.finish("canceled" if isinstance(exc, KeyboardInterrupt) else "error")
          raise
      spinner.finish("done")
  # ANSI escape sequences written by hidden_cursor() around its body.
  HIDE_CURSOR = "\x1b[?25l"
  SHOW_CURSOR = "\x1b[?25h"


  @contextlib.contextmanager
  def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
      """Write HIDE_CURSOR to ``file`` for the duration of the block.

      SHOW_CURSOR is written on exit, even if the block raises. Nothing is
      written at all on Windows, when ``file`` is not a tty, or when the logger's
      effective level is above INFO.
      """
      # The Windows terminal does not support the hide/show cursor ANSI codes,
      # even via colorama. So don't even try.
      # We also don't want to clutter the output with control characters if
      # we're writing to a file, or if the user is running with --quiet.
      # See https://github.com/pypa/pip/issues/3418
      if WINDOWS or not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
          yield
          return

      file.write(HIDE_CURSOR)
      try:
          yield
      finally:
          file.write(SHOW_CURSOR)
  127. else:
  128. file.write(HIDE_CURSOR)
  129. try:
  130. yield
  131. finally:
  132. file.write(SHOW_CURSOR)