refine Unicode Placeholders explanation and usage in KGP documentation
This commit is contained in:
+89
-448
@@ -473,492 +473,131 @@ KGP 既支持直接传输 PNG 二进制数据, 也支持传输 24bit 与 32bit
|
||||
|
||||
#### 特性
|
||||
|
||||
Unicode Placeholders 是 Kitty 图像协议的一个独特功能, 它允许使用占位符嵌入图像, 这提供了一些有意思的特性:
|
||||
Unicode Placeholders 是 Kitty 图像协议中处理如何放置图像的方法之一, 它允许使用占位符嵌入图像, 这提供了一些有意思的特性:
|
||||
|
||||
- 可在任意支持 Unicode 字符和 `CSI` 前景色控制序列的终端应用中显示图像.
|
||||
|
||||
- 对于不支持该协议的终端模拟器, 对应位置会显示为相同大小的(彩色)不明字符, 避免格式错乱.
|
||||
|
||||
- 对于一张图像, 可以仅传递一次数据, 后续通过相同 ID 的占位符即可重复引用相同图像.
|
||||
|
||||
- 可以通过仅输出部分占位符来实现"裁剪"显示图像的效果.
|
||||
|
||||
- 更改终端中字体大小时已经显示的图像会被同比例缩放, 而不会像传统 KGP 那样保持原来的大小不变.
|
||||
- 更改终端中字体大小时已经显示的图像会被同比例缩放, 而不会像普通放置方法那样保持原来的大小不变.
|
||||
|
||||
- 只需要简单的清屏即可删除已经显示的图像, 不需要发送额外的控制序列.
|
||||
- 只需要简单的清屏即可删除已经显示的图像, 不需要发送额外的控制序列. 例如如果使用普通放置方法, 虽然 `clear` 能清除图像, 但是在同屏进入如 `intel_gpu_top` 这类全屏 TUI 程序时, 之前放置的图像可能并不会被及时清除, 导致内容被覆盖.
|
||||
|
||||
需要注意的是, Unicode Placeholders 仅仅是 KGP 所涉及的一种放置方法, 并不是一种全新的协议或控制序列. 因此, 只有支持 KGP 的终端模拟器才可能支持 Unicode Placeholders, 但反过来说, 支持 KGP 的终端模拟器不一定支持 Unicode Placeholders.
|
||||
|
||||
#### 使用
|
||||
|
||||
该功能可通过 `kitty +kitten icat` 的 `--unicode-placeholders` 参数启用.
|
||||
该特性可通过 `kitty +kitten icat` 的 `--unicode-placeholders` 参数启用.
|
||||
|
||||
如果想自己从头实现控制序列编码, Unicode Placeholders 相较传统办法最大的不同就是在 `APC` 后输出多行由 `U+10EEEE` 和变音符号组成的 Unicode 字符串, 其中 `U+10EEEE` 为占位字符, 变音符号用于标识行号和列号, 文本前景色用于编码图片 ID, 下划线颜色用于编码放置 ID, 背景色用于...充当背景色. 更多细节可以参考 [Kitty 官方文档](https://sw.kovidgoyal.net/kitty/graphics-protocol/#unicode-placeholders).
|
||||
|
||||
虽然这个功能很有趣, 但就目前而言真正实现它的终端模拟器寥寥无几, 在[前面的表格](#各终端支持情况)中只有 Kitty 和 Ghostty 位于此列, 其他终端模拟器即便支持 KGP, 也只会同时显示占位符和正常的图片, 效果非常诡异.
|
||||
虽然这个特性很有趣, 但就目前而言真正实现它的终端模拟器寥寥无几, 在[前面的表格](#各终端支持情况)中只有 Kitty 和 Ghostty 位于此列, 其他终端模拟器即便支持 KGP, 也只会同时显示占位符和正常的图片, 效果非常诡异.
|
||||
|
||||
#### 实现
|
||||
|
||||
指编码端的实现. 如前文所说, 该方法和传统方法在前半部分是几乎完全一致的, 因此实现 Unicode Placeholders 的同时也可以~~顺便~~实现 KGP 的基础部分. 以下摘取[前面](#性能测试)提到的 idog 的 Python 部分实现代码.
|
||||
|
||||
- 工具函数
|
||||
|
||||
```python
|
||||
import base64
|
||||
import random
|
||||
import zlib
|
||||
|
||||
def random_ID(max: int = 0xFFFFFF): return random.randint(0, max)
|
||||
def base64_encode(data: bytes) -> str: return base64.b64encode(data).decode("ascii")
|
||||
def zlib_compress(data: bytes) -> bytes: return zlib.compress(data)
|
||||
```
|
||||
|
||||
以及构造 PNG 数据的函数, 用于 query 和测试.
|
||||
|
||||
```python
|
||||
import struct
|
||||
|
||||
def png_makechunk(type: bytes, data: bytes) -> bytes:
|
||||
return struct.pack(">I", len(data)) + type + data + struct.pack(">I", zlib.crc32(type + data))
|
||||
|
||||
|
||||
def mock_png_data(width: int, height: int) -> bytes:
|
||||
data = b"\x89PNG\r\n\x1a\n"
|
||||
# IHDR
|
||||
ihdr = struct.pack(">IIBBBBB", width, height, 8, 6, 0, 0, 0)
|
||||
data += png_makechunk(b"IHDR", ihdr)
|
||||
# IDAT
|
||||
compressor = zlib.compressobj(level=9, strategy=zlib.Z_DEFAULT_STRATEGY)
|
||||
idat = compressor.compress(b"".join(
|
||||
b"\x00" + b"\xff\xff\xff\x80" * width for _ in range(height)
|
||||
)) + compressor.flush()
|
||||
data += png_makechunk(b"IDAT", idat)
|
||||
# IEND
|
||||
data += png_makechunk(b"IEND", b"")
|
||||
return data
|
||||
```
|
||||
指编码端的实现. 如前文所说, Unicode Placeholders 是 KGP 的一个子功能, 因此实现 Unicode Placeholders 的同时也可以(或者说必须)实现 KGP 的基础部分. 以下摘取[前面](#性能测试)提到的 idog 的部分实现思路. 完整实现可见 [Uyanide/idog](https://github.com/Uyanide/idog).
|
||||
|
||||
- 构造 KGP 检测序列
|
||||
可以大致分为四个部分:
|
||||
- 检测是否支持特定传输介质
|
||||
- `d`: 直接在控制序列里传输像素数据
|
||||
- `s`: 通过共享内存传输像素数据, 传输完成后共享内存**会被终端模拟器删除**.
|
||||
- `t`: 通过临时文件传输像素数据, 传输完成后临时文件**会被终端模拟器删除**.
|
||||
- `f`: 通过文件传输像素数据, 传输完成后不会删除.
|
||||
|
||||
- 检测是否支持特定图片数据格式
|
||||
- `24`: 24bit RGB 原始像素数据
|
||||
- `32`: 32bit RGBA 原始像素数据
|
||||
- `100`: PNG 二进制数据
|
||||
|
||||
可以大致分为三个部分:
|
||||
- 检测是否支持 KGP
|
||||
这可以通过测试最通用的传输媒介 `d` 与最平凡的 payload 格式 `24` 来实现.
|
||||
|
||||
- 检测是否支持 Unicode Placeholders
|
||||
- 检测是否支持 Shared Memory 作为传输介质
|
||||
|
||||
其中有不少重复代码, 可以先进行抽象:
|
||||
|
||||
```python
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import termios
|
||||
from select import select
|
||||
|
||||
def do_query(code: str, expected_response: re.Pattern, fence_response: re.Pattern, timeout: float = -1) -> bool:
|
||||
"""Helper function to send a query and wait for the expected response"""
|
||||
if timeout < 0:
|
||||
timeout = 1 if os.environ.get("SSH_TTY") else 0.1
|
||||
|
||||
fd = sys.stdin.fileno()
|
||||
if not os.isatty(fd):
|
||||
return False
|
||||
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
response = ""
|
||||
|
||||
try:
|
||||
new_settings = termios.tcgetattr(fd)
|
||||
# Disable canonical mode and echo
|
||||
new_settings[3] = new_settings[3] & ~termios.ICANON & ~termios.ECHO
|
||||
termios.tcsetattr(fd, termios.TCSANOW, new_settings)
|
||||
|
||||
sys.stdout.write(code)
|
||||
sys.stdout.flush()
|
||||
|
||||
success = False
|
||||
while True:
|
||||
# Set a timeout to prevent blocking indefinitely
|
||||
r, w, e = select([fd], [], [], timeout)
|
||||
if not r:
|
||||
break
|
||||
|
||||
char = os.read(fd, 1)
|
||||
if not char:
|
||||
break
|
||||
|
||||
response += char.decode('utf-8', errors='ignore')
|
||||
|
||||
if expected_response.search(response):
|
||||
success = True
|
||||
|
||||
if fence_response.search(response):
|
||||
break
|
||||
|
||||
return success
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
|
||||
|
||||
return False
|
||||
```
|
||||
|
||||
检测是否支持 KGP. 该部分原理与[前文](#kgp)所述完全一致:
|
||||
|
||||
```python
|
||||
def query_support() -> bool:
|
||||
query_id = random_ID(0xFFFFFFFF)
|
||||
query_code = f"\033_Gi={query_id},s=1,v=1,a=q,t=d,f=24;AAAA\033\\"
|
||||
expected_response = re.compile(rf"\033_Gi={query_id};OK\033\\")
|
||||
fence_code = "\033[c"
|
||||
fence_response = re.compile(r"\033\[\?[0-9;]*c")
|
||||
|
||||
return do_query(query_code + fence_code, expected_response, fence_response)
|
||||
```
|
||||
|
||||
检测是否支持 Unicode Placeholders. KGP 并未针对此功能提供专门的查询方法, 但是如[前文](#使用)所说, 支持该功能的终端模拟器很少, 因此可以通过在检测是否支持 KGP 的基础上添加对终端模拟器特有的环境变量的检查来实现:
|
||||
|
||||
```python
|
||||
def query_unicode_placeholder_support() -> bool:
|
||||
if os.environ.get("KITTY_PID") or os.environ.get("GHOSTTY_SHELL_FEATURES"):
|
||||
return query_support()
|
||||
return False
|
||||
```
|
||||
|
||||
类似 `KITTY_PID` 和 `GHOSTTY_SHELL_FEATURES` 这样的环境变量(主观上似乎)比 `TERM` 更可靠. 并且需要注意的是即便明确知道所使用的终端模拟器支持该协议, 最好也用控制序列进行一次验证, 因为终端复用器/配置/特殊环境可能导致实际支持情况与预期不符.
|
||||
|
||||
检测是否支持 Shared Memory 作为传输介质:
|
||||
|
||||
```python
|
||||
from pathlib import Path
|
||||
from multiprocessing import shared_memory, resource_tracker
|
||||
|
||||
def query_shared_memory_support(format: str = "32") -> bool:
|
||||
# Mock data
|
||||
size = 0
|
||||
data = b""
|
||||
|
||||
if format == "32":
|
||||
size = 4
|
||||
data = b"\x00\x00\x00\x00"
|
||||
elif format == "24":
|
||||
size = 3
|
||||
data = b"\x00\x00\x00"
|
||||
elif format == "100":
|
||||
data = mock_png_data(1, 1)
|
||||
size = len(data)
|
||||
else:
|
||||
raise ValueError(f"Unsupported format: {format}")
|
||||
|
||||
query_id = random_ID(0xFFFFFFFF)
|
||||
memory_name = f"idog_{query_id}"
|
||||
encoded_memory_name = base64_encode(memory_name.encode("utf-8"))
|
||||
shm: shared_memory.SharedMemory | None = None
|
||||
success = False
|
||||
try:
|
||||
shm = shared_memory.SharedMemory(name=memory_name, create=True, size=size)
|
||||
if shm is None or shm.buf is None:
|
||||
return False
|
||||
shm.buf[:size] = data
|
||||
query_code = f"\033_Gi={query_id},s=1,v=1,a=q,t=s,f={format};{encoded_memory_name}\033\\"
|
||||
expected_response = re.compile(rf"\033_Gi={query_id};OK\033\\")
|
||||
fence_code = "\033[c"
|
||||
fence_response = re.compile(r"\033\[\?[0-9;]*c")
|
||||
success = do_query(query_code + fence_code, expected_response, fence_response)
|
||||
except Exception:
|
||||
success = False
|
||||
finally:
|
||||
try:
|
||||
if shm is not None:
|
||||
shm.close()
|
||||
if Path(f"/dev/shm/{shm.name}").exists():
|
||||
shm.unlink()
|
||||
else:
|
||||
# shm unlinked by terminal
|
||||
resource_tracker.unregister(f"/{shm.name}", "shared_memory")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return success
|
||||
```
|
||||
|
||||
需要注意的是, 如果终端模拟器支持 Shared Memory, 则会在数据传输完成后**主动**删除对应的 Shared Memory, 因此需要在 finally 里先检查 Shared Memory 是否已经被删除, 再决定是否需要调用 `unlink`. 如果确认不需要手动 unlink, 则可以调用 `resource_tracker.unregister` 来避免 Python 进程退出时发出警告.
|
||||
KGP 并未针对此功能提供专门的查询方法, 但是如[前文](#使用)所说, 支持该功能的终端模拟器很少, 因此可以通过在检测是否支持 KGP 的基础上添加对终端模拟器特有的环境变量的检查来实现:
|
||||
|
||||
- 基础序列构造
|
||||
|
||||
序列的格式为 `\033_G{options};{payload}\033\\`, 其中:
|
||||
- `options` 包含了所有必要的元数据, 如:
|
||||
- `a`: 操作类型, 取值为 `q`(查询) / `t`(传输) / `T`(传输并显示) / `d`(删除) 等
|
||||
- `i=<ID>`: 查询图片编号
|
||||
- `s=<width>`: 图片宽度(像素)
|
||||
- `v=<height>`: 图片高度(像素)
|
||||
- `f=<format>`: 图片数据格式, 取值为 `24`(24bit RGB) / `32`(32bit RGBA) / `png`(PNG 二进制数据)
|
||||
- `t=<medium>`: 传输介质, 取值为 `d`(直接在控制序列里传输) / `s`(通过共享内存传输) / `t`(通过临时文件传输) / `f`(通过文件传输)
|
||||
- `m=<more>`: 是否有更多数据块, 取值为 `1`(有) / `0`(没有), 仅在 payload 超过单条控制序列最大长度时使用, 用于指示后续控制序列是否为同一图片数据的后续块.
|
||||
|
||||
完整的选项列表可以参考 [Kitty 官方文档](https://sw.kovidgoyal.net/kitty/graphics-protocol/#control-data-reference).
|
||||
|
||||
- `payload` 包含了图片数据, 格式取决于 `t` 和 `f` 选项的值:
|
||||
|
||||
| Medium (`t`) | Format (`f`) | Payload |
|
||||
| ------------ | ------------ | --------------------------------------------------- |
|
||||
| `d` | `24` / `32` | 经过 Base64 编码和可选的 zlib 压缩的像素数据 |
|
||||
| `d` | `100` | 经过 Base64 编码和可选的 zlib 压缩的 PNG 二进制数据 |
|
||||
| `s` | `24` / `32` | 经过 Base64 编码的共享内存名称, 存储原始像素数据 |
|
||||
| `s` | `100` | 经过 Base64 编码的共享内存名称, 存储 PNG 二进制数据 |
|
||||
| `t` | `24` / `32` | 经过 Base64 编码的临时文件路径, 存储原始像素数据 |
|
||||
| `t` | `100` | 经过 Base64 编码的临时文件路径, 存储 PNG 二进制数据 |
|
||||
| `f` | `24` / `32` | 经过 Base64 编码的文件路径, 存储原始像素数据 |
|
||||
| `f` | `100` | 经过 Base64 编码的文件路径, 存储 PNG 二进制数据 |
|
||||
|
||||
需要注意的是, **共享内存名称**不包含路径, 也不包含前缀的`/`. 例如某共享内存完整路径为 `/dev/shm/idog_12345678`, 则共享内存名称为 `idog_12345678`.
|
||||
|
||||
- 分块传输
|
||||
|
||||
当直接在控制序列中传输数据时, 由于控制序列的最大长度限制, 可能需要将图片数据分为多个块进行传输. 此时可以使用 `m=<more>` 选项来指示是否有更多的数据块需要传输, 以及在后续的控制序列中省略重复的选项以减少冗余. 如:
|
||||
|
||||
```python
|
||||
import fcntl
|
||||
import array
|
||||
import termios
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from PIL import Image
|
||||
from multiprocessing import resource_tracker, shared_memory
|
||||
|
||||
|
||||
class KGPEncoderBase:
|
||||
image_id: int
|
||||
# Original image
|
||||
image: Image.Image
|
||||
# Resized image that fits the terminal dimensions
|
||||
resized_image: Image.Image
|
||||
|
||||
# Displayed image dimensions in terms of character cells
|
||||
displayCols: int
|
||||
displayRows: int
|
||||
|
||||
def __init__(self, path: Path):
|
||||
self._init_id()
|
||||
self._init_image(path)
|
||||
self._init_size()
|
||||
|
||||
def _init_id(self) -> None:
|
||||
"""Initialize a random image ID"""
|
||||
self.image_id = random_ID(0xFFFFFFFF)
|
||||
|
||||
def _init_image(self, path: Path) -> None:
|
||||
"""Load the image and convert it to a supported pixel format"""
|
||||
image = Image.open(path).convert("RGB")
|
||||
if image.mode in ("RGBA", "LA") or (image.mode == "P" and "transparency" in image.info):
|
||||
self.image = image.convert("RGBA")
|
||||
else:
|
||||
self.image = image.convert("RGB")
|
||||
|
||||
def _init_size(self, max_cols=-1, max_rows=-1) -> None:
|
||||
"""Initialize size-related attributes based on the image and terminal dimensions"""
|
||||
# Obtain terminal dimensions via ioctl
|
||||
buf = array.array('H', [0, 0, 0, 0])
|
||||
fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, buf)
|
||||
rows, cols, x_pixels, y_pixels = buf
|
||||
if 0 in (rows, cols, x_pixels, y_pixels):
|
||||
raise RuntimeError("Failed to get terminal dimensions")
|
||||
cell_width = x_pixels / cols
|
||||
cell_height = y_pixels / rows
|
||||
|
||||
# Unicode Placeholder method has a maximum size of 289x289 cells
|
||||
new_cols = cols
|
||||
new_rows = rows
|
||||
if max_cols > 0:
|
||||
new_cols = min(cols, max_cols)
|
||||
if max_rows > 0:
|
||||
new_rows = min(rows, max_rows)
|
||||
new_x_pixels = cell_width * new_cols
|
||||
new_y_pixels = cell_height * new_rows
|
||||
if 0 in (new_cols, new_rows, new_x_pixels, new_y_pixels):
|
||||
raise RuntimeError("Invalid terminal dimensions or maximum size constraints")
|
||||
|
||||
# If the image is small enough to fit without resizing
|
||||
if self.image.width <= new_x_pixels and self.image.height <= new_y_pixels:
|
||||
self.displayCols = int(self.image.width / cell_width)
|
||||
self.displayRows = int(self.image.height / cell_height)
|
||||
self.displayWidth = self.image.width
|
||||
self.displayHeight = self.image.height
|
||||
self.resized_image = self.image.copy()
|
||||
return
|
||||
|
||||
# Resize while maintaining aspect ratio
|
||||
image_aspect = self.image.width / self.image.height
|
||||
display_aspect = new_x_pixels / new_y_pixels
|
||||
if image_aspect > display_aspect:
|
||||
self.displayCols = new_cols
|
||||
self.displayRows = int(new_x_pixels / image_aspect / cell_height)
|
||||
else:
|
||||
self.displayCols = int(new_y_pixels * image_aspect / cell_width)
|
||||
self.displayRows = new_rows
|
||||
|
||||
displayWidth = int(self.displayCols * cell_width)
|
||||
displayHeight = int(self.displayRows * cell_height)
|
||||
|
||||
self.resized_image = self.image.resize((displayWidth, displayHeight), Image.Resampling.LANCZOS)
|
||||
|
||||
def _shm_name(self) -> str:
|
||||
"""Generate a unique shared memory name based on the image ID"""
|
||||
return f"idog_{self.image_id}"
|
||||
|
||||
def _format_KGP(self, payload: str, options: str, chunk_size: int) -> list[str]:
|
||||
"""Format the KGP payload into one or more escape sequences based on the chunk size"""
|
||||
if len(payload) <= chunk_size:
|
||||
return [f"\033_G{options};{payload}\033\\"]
|
||||
else:
|
||||
ret = [f"\033_G{options},m=1;{payload[:chunk_size]}\033\\"]
|
||||
for offset in range(chunk_size, len(payload), chunk_size):
|
||||
chunk = payload[offset:offset + chunk_size]
|
||||
# m=0 for the last chunk, m=1 for all previous
|
||||
m = 1 if offset + chunk_size < len(payload) else 0
|
||||
# The other options only need to be specified in the first chunk, subsequent chunks can omit them
|
||||
ret.append(f"\033_Gm={m};{chunk}\033\\")
|
||||
return ret
|
||||
|
||||
def _construct_payload(self, medium: str, compress: bool) -> str:
|
||||
"""Construct the KGP payload, optionally compressing it"""
|
||||
if medium == "d":
|
||||
if compress:
|
||||
return base64_encode(zlib_compress(self.resized_image.tobytes()))
|
||||
else:
|
||||
return base64_encode(self.resized_image.tobytes())
|
||||
if medium == "s":
|
||||
shm_name = self._shm_name()
|
||||
if not Path(f"/dev/shm/{shm_name}").exists():
|
||||
shm: shared_memory.SharedMemory | None = None
|
||||
data = self.resized_image.tobytes()
|
||||
try:
|
||||
shm = shared_memory.SharedMemory(name=shm_name, create=True, size=len(data))
|
||||
if shm is None or shm.buf is None:
|
||||
raise RuntimeError("Failed to create shared memory segment")
|
||||
shm.buf[:len(data)] = data
|
||||
resource_tracker.unregister(f"/{shm.name}", "shared_memory")
|
||||
except FileExistsError:
|
||||
raise RuntimeError("Shared memory segment already exists")
|
||||
return base64_encode(shm_name.encode("utf-8"))
|
||||
raise ValueError(f"Unsupported transmission medium: {medium}")
|
||||
|
||||
def _gen_options(self, medium: str, compress: bool) -> str:
|
||||
"""Generate the options string for the KGP escape sequence"""
|
||||
if medium not in ("d", "s"):
|
||||
raise ValueError(f"Unsupported transmission medium: {medium}")
|
||||
if medium == "s" and compress:
|
||||
compress = False # Disable compression for shared memory transmission
|
||||
format = "32" if self.image.mode == "RGBA" else "24"
|
||||
# a=T: Action, transmit and display
|
||||
# f=...: Pixel format, 24 for RGB, 32 for RGBA, 100 for PNG
|
||||
# t=...: transmission medium, d for transmitting data directly in control sequence, s for shared memory
|
||||
# c=...,r=...: Specify the image dimensions in terms of character cells
|
||||
# s=...,v=...: Specify the image dimensions in pixels, required when transmitting raw pixel data
|
||||
# o=z: Enable zlib compression (optional)
|
||||
options = f"i={self.image_id},a=T,f={format},t={medium},"\
|
||||
f"c={self.displayCols},r={self.displayRows},"\
|
||||
f"s={self.resized_image.width},v={self.resized_image.height}"
|
||||
if compress:
|
||||
options += ",o=z"
|
||||
return options
|
||||
|
||||
def construct_KGP(self, medium: str = "d", chunk_size: int = 4096, compress: bool = True) -> list[str]:
|
||||
"""Construct the KGP escape sequences for the image"""
|
||||
if chunk_size <= 0:
|
||||
raise ValueError("Chunk size must be a positive integer.")
|
||||
|
||||
options = self._gen_options(medium, compress)
|
||||
payload = self._construct_payload(medium, compress)
|
||||
ret = self._format_KGP(payload, options, chunk_size)
|
||||
def _format_KGP(self, payload: str, options_str: str, chunk_size: int) -> list[str]:
|
||||
"""Format the KGP payload into one or more escape sequences based on the chunk size"""
|
||||
if len(payload) <= chunk_size:
|
||||
return [f"\033_G{options_str};{payload}\033\\"]
|
||||
else:
|
||||
ret = [f"\033_G{options_str},m=1;{payload[:chunk_size]}\033\\"]
|
||||
for offset in range(chunk_size, len(payload), chunk_size):
|
||||
chunk = payload[offset:offset + chunk_size]
|
||||
# m=0 for the last chunk, m=1 for all previous
|
||||
m = 1 if offset + chunk_size < len(payload) else 0
|
||||
# The other options only need to be specified in the first chunk, subsequent chunks can omit them
|
||||
ret.append(f"\033_Gm={m};{chunk}\033\\")
|
||||
return ret
|
||||
|
||||
def delete_image(self) -> str:
|
||||
"""Construct the escape sequence to delete the image from the terminal"""
|
||||
if Path(f"/dev/shm/{self._shm_name()}").exists():
|
||||
try:
|
||||
shm = shared_memory.SharedMemory(name=self._shm_name(), create=False)
|
||||
shm.close()
|
||||
shm.unlink()
|
||||
except FileNotFoundError:
|
||||
pass # Already unlinked by terminal
|
||||
return f"\033_Ga=d,d=i,i={self.image_id}\033\\"
|
||||
```
|
||||
|
||||
该类接受一个图片路径作为实例化参数, 通过 `ioctl` 获取终端尺寸并计算出合适的显示尺寸, 通过 `PIL` 加载和处理图片, 最后通过 `construct_KGP` 方法生成对应的 KGP 控制序列列表. 并包含一些额外特性:
|
||||
- 选择是否启用 zlib 压缩. 仅在传输介质为直接传输数据(`d`)时有效.
|
||||
[Kitty 官方文档](https://sw.kovidgoyal.net/kitty/graphics-protocol/#remote-client)中建议的最大分块大小为 4096 字节.
|
||||
|
||||
- 选择传输介质类型. 目前支持直接在控制序列里传输数据(`d`)和通过 Shared Memory 传输(`s`)两种方式.
|
||||
- 普通放置
|
||||
|
||||
- 清除显示的图片. 同时也会尝试删除对应的 Shared Memory.
|
||||
|
||||
对于 Shared Memory 模式, 需要注意的是当发送控制序列后, 该 Shared Memory 的所有权可以被视为**转移**给了终端模拟器, 因此如果终端模拟器不支持 KGP 或环境不支持 Shared Memory, 则极有可能发生泄漏. 因此在使用 Shared Memory 作为传输介质时, 最好先通过类似 `query_shared_memory_support` 的方法进行一次验证, 确保当前环境和终端模拟器确实支持该功能, 再进行后续操作.
|
||||
较为简单直白, 参见 [Kitty 官方文档](https://sw.kovidgoyal.net/kitty/graphics-protocol/#controlling-displayed-image-layout).
|
||||
|
||||
- Unicode Placeholders
|
||||
|
||||
有了基类, 后续就方便多了. 只需要修改几个方法即可.
|
||||
要想放置图片, 首先需要传输数据. 这部分通过 KGP 的基础功能实现, 需要注意的是在生成选项字符串时需要添加 `U=1` 来启用 Unicode Placeholders 的功能, 以及 `q=2` 来禁止终端模拟器对查询序列的响应, 以避免响应的干扰.
|
||||
|
||||
放置图像的具体做法为输出由占位符组成的多行字符串, 以指定图片在终端中的位置. 这部分需要自己构造字符串, 思路是使用 `U+10EEEE` 作为占位字符, 使用变音符号来编码行号和列号, 使用前景色来编码图片 ID. 由于变音符号的数量有限, 因此 Unicode Placeholders 的**最大显示尺寸**为 289x289 字符单元. 每行都可以重新设置与重置前景色, 以保证最大程度的兼容性. 下面是一个简单的实现示例:
|
||||
|
||||
```python
|
||||
KGP_PLACEHOLDER = "\U0010EEEE"
|
||||
# https://sw.kovidgoyal.net/kitty/_downloads/f0a0de9ec8d9ff4456206db8e0814937/rowcolumn-diacritics.txt
|
||||
KGP_DIACRITICS = (
|
||||
"\u0305\u030D\u030E\u0310\u0312\u033D\u033E\u033F\u0346\u034A"
|
||||
...)
|
||||
def construct_unicode_placeholders(self) -> list[str]:
|
||||
"""Construct the Unicode placeholders for the image"""
|
||||
# Using 24-bit True Color foreground to encode the image ID,
|
||||
# the maximum id is therefore 0xFFFFFF, which is likely enough
|
||||
image_id_str = f"\033[38;2;{(self.image_id >> 16) & 0xFF};{(self.image_id >> 8) & 0xFF};{self.image_id & 0xFF}m"
|
||||
ret = []
|
||||
for i in range(self.displayRows):
|
||||
line = image_id_str
|
||||
|
||||
# Placehoder + Row Diacritic + Column Diacritic
|
||||
line += f"{KGP_PLACEHOLDER}{KGP_DIACRITICS[i]}{KGP_DIACRITICS[0]}"
|
||||
for _ in range(1, self.displayCols):
|
||||
# Col index and row index will be automatically determined
|
||||
line += KGP_PLACEHOLDER
|
||||
|
||||
class KGPEncoderUnicode(KGPEncoderBase):
|
||||
def _init_id(self):
|
||||
"""Initialize a smaller random image ID"""
|
||||
self.image_id = random_ID(0xFFFFFF)
|
||||
line += "\033[39m"
|
||||
ret.append(line)
|
||||
|
||||
def _init_size(self, max_cols=-1, max_rows=-1) -> None:
|
||||
if max_cols > 0:
|
||||
max_cols = min(max_cols, len(KGP_DIACRITICS))
|
||||
else:
|
||||
max_cols = len(KGP_DIACRITICS)
|
||||
if max_rows > 0:
|
||||
max_rows = min(max_rows, len(KGP_DIACRITICS))
|
||||
else:
|
||||
max_rows = len(KGP_DIACRITICS)
|
||||
super()._init_size(max_cols, max_rows)
|
||||
return ret
|
||||
|
||||
def _gen_options(self, medium: str, compress: bool) -> str:
|
||||
"""Generate the options string for the KGP escape sequence"""
|
||||
options = super()._gen_options(medium, compress)
|
||||
# q=2: Suppress response, required when using Unicode Placeholders
|
||||
# U=1: Enable Unicode Placeholders
|
||||
options += ",q=2,U=1"
|
||||
return options
|
||||
|
||||
def construct_unicode_placeholders(self) -> list[str]:
|
||||
"""Construct the Unicode placeholders for the image"""
|
||||
# Using 24-bit True Color foreground to encode the image ID,
|
||||
# the maximum id is therefore 0xFFFFFF, which is likely enough
|
||||
image_id_str = f"\033[38;2;{(self.image_id >> 16) & 0xFF};{(self.image_id >> 8) & 0xFF};{self.image_id & 0xFF}m"
|
||||
ret = []
|
||||
for i in range(self.displayRows):
|
||||
line = image_id_str
|
||||
|
||||
# Placehoder + Row Diacritic + Column Diacritic
|
||||
line += f"{KGP_PLACEHOLDER}{KGP_DIACRITICS[i]}{KGP_DIACRITICS[0]}"
|
||||
for _ in range(1, self.displayCols):
|
||||
# Col index and row index will be automatically determined
|
||||
line += KGP_PLACEHOLDER
|
||||
|
||||
line += "\033[39m"
|
||||
ret.append(line)
|
||||
|
||||
return ret
|
||||
|
||||
```
|
||||
|
||||
唯一新增的方法是 `construct_unicode_placeholders`, 该方法会生成一个字符串列表, 每个字符串代表一行, 所有行的开头是设置前景色的控制序列, 用于编码图片 ID, 后续会通过占位符的前景色来识别该占位符对应的图片. 每个占位符由一个占位字符和两个变音符号组成, 其中一个变音符号用于编码行号, 另一个用于编码列号, 每行的非首个占位符可省去变音符号. 由于变音符号的数量有限, 因此 Unicode Placeholders 的**最大显示尺寸**为 289x289 字符单元. 每行都会重新设置与重置前景色, 以保证最大程度的兼容性.
|
||||
|
||||
- 调用实例
|
||||
|
||||
```python
|
||||
image_path = Path("test.png")
|
||||
|
||||
if not query_support():
|
||||
sys.stderr.write("KGP not supported in this terminal.\n")
|
||||
sys.exit(1)
|
||||
|
||||
# Prefer Shared memory if supported, fallback to direct
|
||||
medium = "s" if query_shared_memory_support() else "d"
|
||||
placeholders = []
|
||||
encoder = None
|
||||
|
||||
sys.stderr.write("Transmission medium: " + ("Shared Memory\n" if medium == "s" else "Direct Data\n"))
|
||||
|
||||
# Prefer Unicode Placeholders if supported, fallback to normal KGP
|
||||
if query_unicode_placeholder_support():
|
||||
sys.stderr.write("Using Unicode Placeholders\n")
|
||||
encoder = KGPEncoderUnicode(image_path)
|
||||
placeholders = encoder.construct_unicode_placeholders()
|
||||
else:
|
||||
sys.stderr.write("Using KGP without Unicode Placeholders\n")
|
||||
encoder = KGPEncoderBase(image_path)
|
||||
|
||||
for seq in encoder.construct_KGP(medium=medium):
|
||||
print(seq, end="")
|
||||
sys.stdout.flush()
|
||||
|
||||
# placeholders will be empty if using normal KGP,
|
||||
# in which case nothing will be printed in this loop
|
||||
for i, line in enumerate(placeholders):
|
||||
print(line, end="" if i == len(placeholders) - 1 else "\n")
|
||||
|
||||
# Delete image on user input
|
||||
input()
|
||||
print(encoder.delete_image(), end="")
|
||||
sys.stdout.flush()
|
||||
```
|
||||
|
||||
## 默认 Shell
|
||||
@@ -1096,6 +735,8 @@ Unicode Placeholders 是 Kitty 图像协议的一个独特功能, 它允许使
|
||||
|
||||
- [Feature Reporting Spec - ITerm2](https://iterm2.com/feature-reporting/)
|
||||
|
||||
- [Uyanide / idog](https://github.com/Uyanide/idog)
|
||||
|
||||
- [Shell Command Language](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/V3_chap02.html)
|
||||
|
||||
- [Fish - ArchWiki](https://wiki.archlinux.org/title/Fish)
|
||||
|
||||
Reference in New Issue
Block a user