re-Inventing The Wheel
In this post:
- I talk about some somewhat advanced topics (regular expression parsing, arena-based allocation, unicode segmentation), then butcher them with code.
Every once in a while, I implement data structures or algorithms from scratch, as kind of a relaxation thing. Sometimes, the results are pretty useless, but I still enjoy working through it.
This time around, I want to try implementing the regex data structures and algorithms from Regular Expression Matching Can Be Simple And Fast. While it's in Python, it's probably not going to be terribly fast in general, compared to lower-level implementations, but again, working through it.
There are some divergences I'm putting in from the article. One is that I want to look at extended grapheme clusters instead of individual bytes. And, related to that, I want to shape the data in a different way; proper polymorphism, and arena-based access.
Arenas are something I learned about from reading Rust blogs, but I'm pretty sure they're a pre-existing concept. The basic idea is that, in something like a graph data structure, instead of having individual nodes point to each other, have them refer to an index within an underlying structure. I think there are varying degrees of sophistication that can be employed here, but I just used indices. Anyway, doing this solves a few issues. One is that stuff like Python's tuples aren't "supposed" to be able to do self-reference. (I believe there's some kind of hacky thing that can be done with the C API, but I'm pretty sure it's a bad idea.) Another issue is limiting the scope of what a node points to. Arena-based graphs don't make it any more obvious, given a node, just how big a subgraph that node points to, but seeing the arena does place an upper bound, which is something.
The idea I have behind using graphemes is, I want to mess around with unicode-aware regular expressions, and how that works out in practice. Might be an interesting use of this, to, like, parse a unicode-aware programming language.
For now, my major concern is figuring out how to address the glaring edge case in the code.
I might come back to this later, but for now, I'm just going to paste in the full source code, complete with obnoxious unhandled edge case.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 | """Prototype implementation of NFA-based regex engine. See https://swtch.com/~rsc/regexp/regexp1.html """ # My old nemesis # pylint: disable=too-few-public-methods import enum import typing import typing_extensions # I think pylint is getting a little big for its britches when it tells us not # to follow the usage given by the official documentation for the typing module Index = typing.NewType('Index', int) # pylint: disable=invalid-name Grapheme = typing.NewType('Grapheme', str) # pylint: disable=invalid-name class Tail(typing.NamedTuple): """A "pointer" to an out-field of a State in an arena.""" source: Index field: str _T = typing.TypeVar('_T') class State(typing_extensions.Protocol): """States have tails, and the ability to assign to tails.""" # I really hope there's a plugin or something for dealing with Protocols. # pylint: disable=no-self-use, unused-argument def tails(self) -> typing.Iterator[Tail]: """Return the state's tails.""" # I don't know if protocols NEED-need this, I'm just following the doc. ... # pylint: disable=pointless-statement # There's probably a more correct interface to this... def _replace(self: _T, **kwargs) -> _T: """Point the state's tail at an index.""" # I don't know if protocols NEED-need this, I'm just following the doc. ... # pylint: disable=pointless-statement class GraphemeMatch(typing.NamedTuple): """Match a single grapheme.""" # When I extend it, I end up noticing that tuple has an obnoxiously big API # surface. So many common words I can't use because all methods go in the # same namespace... index_: Index grapheme: Grapheme out: typing.Optional[Index] = None def tails(self) -> typing.Iterator[Tail]: """Yield the only followup state.""" yield Tail(self.index_, 'out') class Split(typing.NamedTuple): """Split NFA evaluation between two possible states.""" index_: Index out: typing.Optional[Index] = None out1: typing.Optional[Index] = None def tails(self) -> typing.Iterator[Tail]: """Yield both followup states.""" yield Tail(self.index_, 'out') yield Tail(self.index_, 'out1') class Match(typing.NamedTuple): """Terminate NFA evaluation.""" index_: Index @staticmethod def tails() -> typing.Iterator[Tail]: """Yield nothing, there is no followup state.""" yield from iter(()) class Fragment(typing.NamedTuple): """NFA fragment.""" start: Index tail_list: typing.Tuple[Tail, ...] class NFA(typing.NamedTuple): """Completed NFA of a regex.""" start_index: Index arena: typing.Tuple[State, ...] def _addstate(self, indices: typing.Set[Index], index: Index): state = self.arena[index] if isinstance(state, Split): self._addstate(indices, state.out) self._addstate(indices, state.out1) return indices.add(index) def match(self, stream: typing.Iterable[Grapheme]) -> bool: """Return whether given grapheme stream is accepted by the regex.""" indices: typing.Set[Index] = set() self._addstate(indices, self.start_index) for grapheme in stream: new_indices: typing.Set[Index] = set() for index in indices: state = self.arena[index] if isinstance( state, GraphemeMatch) and state.grapheme == grapheme: self._addstate(new_indices, state.out) indices = new_indices final_index = len(self.arena) - 1 return final_index in indices def patch( arena: typing.List[State], tail_list: typing.Sequence[Tail], index: Index): """Connect all tails to the given index.""" for tail in tail_list: arena[tail.source] = arena[tail.source]._replace(**{tail.field: index}) class Token(typing_extensions.Protocol): """Protocol for postfix tokens.""" # I really hope there's a plugin or something for dealing with Protocols. # pylint: disable=no-self-use, unused-argument def fragment_args( self, stack: typing.List[Fragment]) -> typing.Iterable[Fragment]: """Return the fragment args required by the fragment() call.""" # I don't know if protocols NEED-need this, I'm just following the doc. ... # pylint: disable=pointless-statement def fragment( self, arena: typing.List[State], fragment_args: typing.Iterable[Fragment]) -> Fragment: """Create a new fragment.""" # I don't know if protocols NEED-need this, I'm just following the doc. ... # pylint: disable=pointless-statement class Literal(typing.NamedTuple): """Postfix token for matching a grapheme.""" grapheme: Grapheme # pylint: disable=unused-argument @staticmethod def fragment_args( stack: typing.List[Fragment]) -> typing.Iterable[Fragment]: """No need to manipulate the stack.""" return () def fragment( self, arena: typing.List[State], fragment_args: typing.Iterable[Fragment]) -> Fragment: """Create a completely new Fragment.""" index = push(arena, GraphemeMatch, self.grapheme) return Fragment(index, tuple(arena[index].tails())) class Operator: """Postfix token for operators. This class may not be necessary. """ def fragment_args( self, stack: typing.List[Fragment]) -> typing.Iterable[Fragment]: """Return the fragment_args required by the fragment() call.""" raise NotImplementedError def fragment( self, arena: typing.List[State], fragment_args: typing.Iterable[Fragment]) -> Fragment: """Create a new fragment.""" raise NotImplementedError class Binary(Operator, enum.Enum): """Postfix token for binary operators.""" CONCATENATE = enum.auto() ALTERNATE = enum.auto() def fragment_args( self, stack: typing.List[Fragment]) -> typing.Iterable[Fragment]: fragment_2 = stack.pop() fragment_1 = stack.pop() return fragment_1, fragment_2 def fragment( self, arena: typing.List[State], fragment_args: typing.Iterable[Fragment]) -> Fragment: fragment_1, fragment_2 = fragment_args if self == Binary.CONCATENATE: patch(arena, fragment_1.tail_list, fragment_2.start) return Fragment(fragment_1.start, fragment_2.tail_list) assert self == Binary.ALTERNATE index = push(arena, Split, fragment_1.start, fragment_2.start) return Fragment(index, fragment_1.tail_list + fragment_2.tail_list) class Unary(Operator, enum.Enum): """Postfix token for unary operators.""" OPTIONAL = enum.auto() ANY_AMOUNT = enum.auto() ONE_PLUS = enum.auto() def fragment_args( self, stack: typing.List[Fragment]) -> typing.Iterable[Fragment]: return stack.pop(), def fragment( self, arena: typing.List[State], fragment_args: typing.Iterable[Fragment]): fragment, = fragment_args index = push(arena, Split, fragment.start) if self == Unary.OPTIONAL: return Fragment( index, fragment.tail_list + tuple(arena[index].tails())) elif self == Unary.ANY_AMOUNT: patch(arena, fragment.tail_list, index) return Fragment(index, (Tail(index, 'out1'),)) assert self == Unary.ONE_PLUS patch(arena, fragment.tail_list, index) return Fragment(fragment.start, (Tail(index, 'out1'),)) def push(arena: typing.List[State], factory, *args, **kwargs): """Create a state using given factory, add to arena, return index.""" index = Index(len(arena)) state = factory(index, *args, **kwargs) arena.append(state) return index def post_to_nfa(tokens: typing.Sequence[Token]): """Convert a list of postfix tokens to an NFA.""" stack: typing.List[Fragment] = [] arena: typing.List[State] = [] for token in tokens: stack.append(token.fragment(arena, token.fragment_args(stack))) # pylint pointed out a bug on this next line. It's possible to hit it by # passing an empty string, which raises the question of how the C # implementation handles it. I don't know C well enough, so I'm just # staring at the for-loop and wondering "So, like, does that end if the # current character is null, or something?" fragment, = stack index = push(arena, Match) patch(arena, fragment.tail_list, index) return NFA(fragment.start, tuple(arena)) |
Postscript: I made some changes to the match and post_to_nfa methods that fix the bug, but the resulting invariants are... confusing, which makes me think that this needs more work.
def match(self, stream: typing.Iterable[Grapheme]) -> bool:
"""Return whether given grapheme stream is accepted by the regex."""
indices: typing.Set[Index] = set()
self._addstate(indices, self.start_index)
for grapheme in stream:
new_indices: typing.Set[Index] = set()
for index in indices:
state = self.arena[index]
if isinstance(
state, GraphemeMatch) and state.grapheme == grapheme:
self._addstate(new_indices, state.out)
indices = new_indices
final_index = Index(0)
return final_index in indices
...
def post_to_nfa(tokens: typing.Sequence[Token]):
"""Convert a list of postfix tokens to an NFA."""
stack: typing.List[Fragment] = []
arena: typing.List[State] = []
fragment = Fragment(Index(0), ())
for token in tokens:
fragment = token.fragment(arena, token.fragment_args(stack))
stack.append(fragment)
index = push(arena, Match)
patch(arena, fragment.tail_list, index)
return NFA(fragment.start, tuple(arena))
Postpostscript: Okay, now I'm going to take a break, even though I haven't improved the invariants any.
out: Index = Index(0)
...
out: Index = Index(0)
out1: Index = Index(0)
def post_to_nfa(tokens: typing.Sequence[Token]):
"""Convert a list of postfix tokens to an NFA."""
stack: typing.List[Fragment] = []
arena: typing.List[State] = []
start_index = push(arena, Match)
for token in tokens:
fragment = token.fragment(arena, token.fragment_args(stack))
start_index = fragment.start
stack.append(fragment)
# There should really be a better way to handle this invariant.
if start_index:
assert len(stack) == 1
return NFA(start_index, tuple(arena))