# SPDX-License-Identifier: GPL-3.0-or-later

"""
Run this in the current working directory to extract words
from comments in code & markup.

Arguments:

"--cache"
    The directory to store cache files.

"--input-paths"
    Input paths separated by the system's path separator (';' on MS-Windows, otherwise ':').
    Files will be scanned, directories will be included recursively.

"--input-paths-match-source"
    File name glob patterns (separated by the system's path separator)
    for files to parse as source code (comments are extracted).

"--input-paths-match-text"
    File name glob patterns (separated by the system's path separator)
    for files to parse as plain text.

"--text"
    Input text, leave blank to generate the cache for all N-grams.

"--partial-text" (optional)
    The partially typed word to complete (may be blank).

"--input-paths-size-limit"
    When scanning paths recursively, skip files exceeding this size
    (in bytes, zero disables the limit).

"--update" (optional, defaults to "check-manifest")
    Method used to check for updates.
    Valid values include: ("check-manifest", "when-missing").

Output text is printed.
"""

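# An example invocation (the script name, paths and glob patterns below are
# illustrative placeholders; values are separated by ':' as on a POSIX system):
#
#   python3 extract_words.py \
#       --cache "/tmp/ngram_cache" \
#       --input-paths "/src/my_project" \
#       --input-paths-match-source "*.py:*.c:*.h" \
#       --input-paths-match-text "*.txt:*.rst" \
#       --input-paths-size-limit 262144 \
#       --text ""
#
# Passing a blank "--text" only (re)generates the cache for "--input-paths".
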
import sys
import os
import re

# To store an `input_path` directory as a file-name.
from urllib.parse import quote


from typing import (
    Callable,
    Dict,
    Generator,
    List,
    Optional,
    Sequence,
    Tuple,
)

ModelType = Dict[str, Dict[str, int]]
ManifestType = Tuple[List[str], List[float]]

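# For example (values illustrative): a single model maps a lower-case key of
# `n` words to the words observed directly after it with their counts, e.g.
# {"is a": {"test": 3, "word": 1}}.
# A manifest pairs the scanned file paths with their modification times.
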
# Gets overwritten by a command line argument.
CACHE_DIRECTORY = ""

# Ignore files over this size (zero disables the limit).
TEXT_EXTRACT_SIZE_LIMIT = 0

MULTI_PROCESS = True

# Avoid over-large models, split into parts.
SPLIT_BUCKET_COUNT = 8192

# Generate N-grams with up to this many words.
# WARNING: each level adds significant overhead,
# 4 may be useful but values such as 10 or more are likely to explode storage...
# although it may be interesting to test.
NGRAM_MAX = 4

FILE_TYPE_SOURCE = 0
FILE_TYPE_TEXT = 1


# ------------------------------------------------------------------------------
# Generic Utilities


# Include all files recursively.
def files_recursive_with_ext(path: str, ext: Optional[Tuple[str, ...]]) -> Generator[Tuple[str, str], None, None]:
    for dirpath, dirnames, filenames in os.walk(path):
        # Skip `.git` and other dot-files.
        dirnames[:] = [d for d in dirnames if not d.startswith(".")]
        for filename in filenames:
            if filename.startswith("."):
                continue
            if not ext or filename.endswith(ext):
                yield (dirpath, filename)


_clear_words_trans = {
    ord("-"): None,
    ord("'"): None,
}


def words_from_file_with_fancy_lexer_support(filepath: str, file_type: int) -> List[List[str]]:
    """
    Return a list of word sequences extracted from comments in ``filepath``.
    """

    try:
        with open(filepath, encoding="utf-8") as fh:
            data = fh.read()
    except (OSError, UnicodeDecodeError):
        # Possibly a missing symbolic link, skip the file.
        # print("Unable to read")
        return []

    if not data:
        return []

    def clean_words(words: List[str]) -> List[List[str]]:
        words_list: List[List[str]] = []
        this_word_list: List[str] = []
        last_split = False
        for i, w in enumerate(words):
            if (
                    # Allow literal digits because they may be used in text,
                    # in particular 1 or 2.
                    (not w.isdigit()) and
                    (not w.isalpha()) and
                    (not w.translate(_clear_words_trans).isalpha())
            ):
                if not words_list:
                    words_list.append(words[0:i])
                last_split = True
                continue

            if last_split:
                this_word_list = []
                words_list.append(this_word_list)

            if words_list:
                this_word_list.append(w)
            last_split = False

        if not words_list:
            words_list.append(words)

        # Ensure word lists have at least 3 words, fewer are not useful for prediction.
        for i in reversed(range(len(words_list))):
            if len(words_list[i]) < 3:
                del words_list[i]
        return words_list

    print(filepath)

    re_word = re.compile(r"[\w]+[\w'-]*")
    re_split = re.compile(r"[.?!;()\[\]{}]")

    # Parse plain-text files directly (without a source-code lexer).
    if file_type == FILE_TYPE_TEXT:
        words_list = []
        for block in re.split(re_split, data):
            if not block:
                continue
            words_list.extend(clean_words(re.findall(re_word, block)))
        return words_list

    # Fancy parsing using `pygments`.
    import pygments
    from pygments.lexers import guess_lexer_for_filename
    try:
        lexer = guess_lexer_for_filename(filepath, data)
    except pygments.util.ClassNotFound:
        return []

    from pygments.token import Token
    extract_tokens = {
        Token.Comment,
        Token.Comment.Single,
        Token.Comment.Multiline,
    }

    bag_of_words: List[List[str]] = []
    is_first = True
    assert hasattr(lexer, "get_tokens")
    for ty, token_text in lexer.get_tokens(data):
        # Skip the first comment as it's very often a license block.
        if is_first:
            is_first = False
            if ty in extract_tokens:
                continue
        if ty in extract_tokens:
            for block in re.split(re_split, token_text):
                if not block:
                    continue
                bag_of_words.extend(clean_words(re.findall(re_word, block)))

    return bag_of_words


def words_from_files(files_and_types: List[Tuple[str, int]]) -> List[List[str]]:
    bag_of_words = []
    if MULTI_PROCESS:
        import multiprocessing
        job_total = multiprocessing.cpu_count()
        # Over-subscribe the pool slightly, as some files take much longer to scan than others.
        job_total = job_total + (job_total // 2)

        with multiprocessing.Pool(processes=job_total) as pool:
            for bag_of_words_for_file in pool.starmap(words_from_file_with_fancy_lexer_support, files_and_types):
                bag_of_words.extend(bag_of_words_for_file)
    else:
        for filepath, ty in files_and_types:
            bag_of_words.extend(words_from_file_with_fancy_lexer_support(filepath, ty))
    return bag_of_words


# ------------------------------------------------------------------------------
# Top Level Model Utilities

from hashlib import sha1


def model_word_to_index(word_key: str) -> int:
    m = sha1()
    m.update(word_key.encode())
    return int.from_bytes(m.digest(), 'little') % SPLIT_BUCKET_COUNT


def directory_from_params(input_path: str) -> str:
    return os.path.join(CACHE_DIRECTORY, quote(input_path.lstrip(os.sep), safe=''))


def model_filepath_from_params(n: int, split_index: int, input_path: str) -> str:
    directory = directory_from_params(input_path)
    return os.path.join(directory, "{:d}_{:04d}.pickle".format(n, split_index))


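# For example (paths illustrative): with CACHE_DIRECTORY set to "/tmp/cache"
# and an input path of "/src/project", bucket 42 of the `n = 3` model is
# written to "/tmp/cache/src%2Fproject/3_0042.pickle".

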
def model_generate_all(n: int, bag_of_words: List[List[str]]) -> List[ModelType]:
    """
    Return a list of models.
    """
    if n < 2:
        raise Exception("`n` must be 2 or more.")

    model_list: List[ModelType] = [{} for _ in range(SPLIT_BUCKET_COUNT)]
    for words in bag_of_words:
        for i in range(n, len(words)):
            word_key = " ".join(words[j].lower() for j in range(i - n, i))
            word_val = words[i]

            model = model_list[model_word_to_index(word_key)]

            # Avoid `defaultdict` because this should be pickled and reused with high efficiency.
            try:
                d = model[word_key]
            except KeyError:
                d = model[word_key] = {}
            try:
                d[word_val] += 1
            except KeyError:
                d[word_val] = 1
    return model_list


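# For example (words illustrative): with `n` = 2, the word list
# ["This", "is", "a", "Test"] adds the entries {"this is": {"a": 1}} and
# {"is a": {"Test": 1}}, keys lower-cased while the predicted words keep their
# case, each stored in the bucket `model_word_to_index` selects for its key.

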
def model_file_read(filepath: str) -> ModelType:
    import pickle
    import lzma
    with lzma.open(filepath, 'rb') as fh:
        model: ModelType = pickle.load(fh)
    return model


def model_file_write(filepath: str, model: ModelType) -> None:
    import pickle
    import lzma
    with lzma.open(filepath, 'wb') as fh:
        pickle.dump(model, fh)


def model_ensure_for_split_index(
        n: int,
        split_index: int,
        input_path: str,
        word_generate_on_demand_fn: Callable[[], List[List[str]]],
) -> ModelType:
    filepath = model_filepath_from_params(n, split_index, input_path)
    if os.path.exists(filepath):
        return model_file_read(filepath)

    # Always generate all models.
    bag_of_words = word_generate_on_demand_fn()
    model_list = model_generate_all(n, bag_of_words)
    for i, model in enumerate(model_list):
        model_file_write(model_filepath_from_params(n, i, input_path), model)

    return model_list[split_index]


def model_files_generate(
        input_path: str,
        input_paths_match_source_re: re.Pattern[str],
        input_paths_match_text_re: re.Pattern[str],
) -> List[Tuple[str, int]]:

    files_and_types = []
    is_file = not os.path.isdir(input_path)

    for dirpath, f in (
            (os.path.split(input_path),) if is_file else
            files_recursive_with_ext(input_path, None)
    ):
        if input_paths_match_source_re.match(f):
            ty = FILE_TYPE_SOURCE
        elif input_paths_match_text_re.match(f):
            ty = FILE_TYPE_TEXT
        elif is_file:
            ty = FILE_TYPE_TEXT
        else:
            continue

        f = os.path.join(dirpath, f)

        # Don't apply limits to individual files passed in
        # because the user explicitly asked for them.
        if not is_file and TEXT_EXTRACT_SIZE_LIMIT > 0:
            try:
                size = os.path.getsize(f)
            except Exception as ex:
                # Ignore files that can't be read.
                print("Unable to access the size of:", f, str(ex))
                continue

            if size >= TEXT_EXTRACT_SIZE_LIMIT:
                continue

        files_and_types.append((f, ty))

    # Quickly test a subset.
    # files_and_types[64:] = []

    # Sort for trivial comparison.
    files_and_types.sort()
    return files_and_types


def manifest_file_read(filepath: str) -> ManifestType:
    import pickle
    with open(filepath, 'rb') as fh:
        manifest: ManifestType = pickle.load(fh)
    return manifest


def manifest_file_write(filepath: str, manifest: ManifestType) -> None:
    import pickle
    with open(filepath, 'wb') as fh:
        pickle.dump(manifest, fh)


def model_ensure_up_to_date_or_clear(
        input_path: str, files_and_types: List[Tuple[str, int]], update_method: str) -> None:
    directory = directory_from_params(input_path)

    # Quick check, for simple case.
    if update_method == "when-missing":
        if os.path.isdir(directory):
            return

    manifest_path = os.path.join(directory, "manifest.pickle")

    do_clear = False
    if not os.path.exists(manifest_path):
        do_clear = True
    else:
        manifest = manifest_file_read(manifest_path)
        files = [ft[0] for ft in files_and_types]
        if manifest[0] != files:
            do_clear = True
        else:
            for filepath, mtime in zip(manifest[0], manifest[1]):
                if mtime != os.path.getmtime(filepath):
                    do_clear = True
                    break

    if do_clear:
        import shutil
        shutil.rmtree(directory)
        os.makedirs(directory)


def manifest_write_from_files(input_path: str, files_and_types: List[Tuple[str, int]]) -> None:
    files = [ft[0] for ft in files_and_types]
    mtime = [0.0] * len(files)
    for i, filepath in enumerate(files):
        mtime[i] = os.path.getmtime(filepath)

    manifest = files, mtime

    directory = directory_from_params(input_path)
    manifest_path = os.path.join(directory, "manifest.pickle")

    manifest_file_write(manifest_path, manifest)


def complete_word_with_root(
        input_path: str,
        words: List[str],
        partial_word: str,
        input_paths_match_source: List[str],
        input_paths_match_text: List[str],
        update_method: str,
        generate_all: bool,
) -> bool:

    bag_of_words = None
    files_and_types = None

    def files_generate_on_demand() -> List[Tuple[str, int]]:
        import fnmatch
        nonlocal files_and_types

        if files_and_types is not None:
            return files_and_types

        input_paths_match_source_value = "({:s})".format("|".join([
            fnmatch.translate(match_glob)
            for match_glob in input_paths_match_source
        ]))
        input_paths_match_text_value = "({:s})".format("|".join([
            fnmatch.translate(match_glob)
            for match_glob in input_paths_match_text
        ]))

        try:
            input_paths_match_source_re = re.compile(input_paths_match_source_value)
        except Exception as ex:
            print(ex)
            sys.exit(0)
        try:
            input_paths_match_text_re = re.compile(input_paths_match_text_value)
        except Exception as ex:
            print(ex)
            sys.exit(0)

        files_and_types = model_files_generate(input_path, input_paths_match_source_re, input_paths_match_text_re)

        return files_and_types

    def word_generator_on_demand() -> List[List[str]]:
        nonlocal bag_of_words
        if bag_of_words is not None:
            return bag_of_words

        files_and_types = files_generate_on_demand()

        bag_of_words = words_from_files(files_and_types)
        manifest_write_from_files(input_path, files_and_types)

        return bag_of_words

    # The size of the N-grams (2 or more).
    if generate_all:
        files_and_types = files_generate_on_demand()
        model_ensure_up_to_date_or_clear(input_path, files_and_types, update_method)

        n_total = NGRAM_MAX
        for n in reversed(range(2, n_total + 1)):
            # Requesting the first split-index ensures all are properly generated.
            model_ensure_for_split_index(n, 0, input_path, word_generator_on_demand)

        return False
    else:
        n_total = min(NGRAM_MAX, len(words))

    # If extending the partial word fails,
    # check if a word can be suggested to follow it (the second pass below).
    for pass_number in range(2 if partial_word else 1):
        # Nothing found in the first pass, run a second pass.
        add_space = False
        if pass_number == 1:
            words.append(partial_word)
            partial_word = ""
            add_space = True

        for n in reversed(range(2, n_total + 1)):
            word_key = " ".join(tuple(words[-n:]))
            split_index = model_word_to_index(word_key)
            model = model_ensure_for_split_index(n, split_index, input_path, word_generator_on_demand)

            # Only generating, not looking for a result.
            if not words:
                continue

            v_best = -1
            k_best = ""
            for k, v in model.get(word_key, {}).items():

                if partial_word:
                    k_lower = k.lower()
                    if not k_lower.startswith(partial_word):
                        continue
                    # Unlikely but possible.
                    # Skip this as it effectively completes to an empty string.
                    # Even if that result has the highest probability, it's not useful to propose nothing.
                    if k_lower == partial_word:
                        continue

                if v_best < v:
                    v_best = v
                    k_best = k
                elif v_best == v:
                    # So the order is not random.
                    if k < k_best:
                        k_best = k
            if k_best:
                if partial_word:
                    k_best = k_best[len(partial_word):]
                elif add_space:
                    # Adding a second word.
                    k_best = " " + k_best
                sys.stdout.write(k_best)
                return True
    return False


# ------------------------------------------------------------------------------
# Main Function

def main() -> None:
    global CACHE_DIRECTORY
    global TEXT_EXTRACT_SIZE_LIMIT

    args = sys.argv[1:]

    def extract_arg_value_pair(
            arg_id: str,
            *,
            default: Optional[str] = None,
            valid_values: Optional[Sequence[str]] = None,
    ) -> str:
        try:
            i = args.index(arg_id)
        except ValueError:
            if default is not None:
                return default
            sys.stderr.write('Argument "{:s}" missing, abort!\n'.format(arg_id))
            sys.exit(1)

        if len(args) == i + 1:
            sys.stderr.write('Argument "{:s}" missing value, abort!\n'.format(arg_id))
            sys.exit(1)
        value = args[i + 1]
        if (valid_values is not None) and (value not in valid_values):
            sys.stderr.write(
                'Argument "{:s}" has unexpected value "{:s}", not found in: {:s}, abort!\n'.format(
                    arg_id, value, ", ".join(['"' + word + '"' for word in valid_values])
                )
            )
            sys.exit(1)

        del args[i: i + 2]
        return value

    CACHE_DIRECTORY = extract_arg_value_pair("--cache")
    TEXT_EXTRACT_SIZE_LIMIT = int(extract_arg_value_pair("--input-paths-size-limit"))

    input_paths = extract_arg_value_pair("--input-paths").split(os.pathsep)
    input_paths_match_source = extract_arg_value_pair("--input-paths-match-source").split(os.pathsep)
    input_paths_match_text = extract_arg_value_pair("--input-paths-match-text").split(os.pathsep)
    words = extract_arg_value_pair("--text").lower().split()
    partial_word = extract_arg_value_pair("--partial-text", default="").lower()
    update_method = extract_arg_value_pair(
        "--update",
        default="check-manifest",
        valid_values=("check-manifest", "when-missing"),
    )

    if args:
        sys.stderr.write("Found unknown arguments: {!r}, abort!\n".format(args))
        sys.exit(1)

    if not input_paths:
        return

    # Blank is for cache generation (only).
    if words and len(words) < (1 if partial_word else 2):
        sys.stderr.write("'--text' must contain at least two words!\n")
        return

    generate_all = not bool(words)
    for input_path in input_paths:

        # Prevent a trailing slash from impacting the name of temporary paths.
        input_path = input_path.rstrip(os.sep)
        directory = directory_from_params(input_path)
        os.makedirs(directory, exist_ok=True)

        if complete_word_with_root(
                input_path,
                words,
                partial_word,
                input_paths_match_source,
                input_paths_match_text,
                update_method,
                generate_all,
        ):
            return


if __name__ == "__main__":
    main()