Blame Laird
Sum ergo mea culpa est.
Understanding Kubernetes’ tools/cache package: part 3
In part 2 we encountered the Controller concept and explored how its lenient contract nevertheless “expects” that processing ability is added to the capabilities offered by its associated Reflector. You may want to start with part 0 if you haven’t been following along in this series, to understand what it’s all about.
In this post, we’ll look in a little more detail at two things:
- the de facto standard implementation of the Controller type (which, strictly speaking, is only one of many possibilities, but which, unfortunately, colors the Controller concept’s expectations)
- the notional idea of an informer, and the concrete concept of a SharedInformer, specifically a SharedIndexInformer
controller-struct-Backed Controller Implementation
We’ve seen that the Controller type is extraordinarily underspecified. All you need to do to have a Controller is to implement three undocumented functions: Run, HasSynced and LastSyncResourceVersion. Technically speaking, you could implement them any way you want. Pragmatically speaking, though, the controller-struct-backed implementation of this type, its de facto standard reference implementation, also shows that there is an implicit requirement: any Controller implementation will, as part of its Run function implementation, process a Queue that it is also assumed to have, and that Queue is additionally assumed to be the “target” of a Reflector that the Controller is also assumed to have. That’s a lot of assumptions; as we model this in Java we’ll formalize some of them.
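To see just how little the interface itself demands, here is a minimal Go sketch. It redeclares the three-function Controller interface locally so that it compiles on its own. lazyController is a hypothetical name: it type-checks as a Controller while doing none of the queue processing that the reference implementation performs.

```go
package main

import "fmt"

// Controller is redeclared locally with the three functions named in the
// text, so this sketch compiles without importing client-go.
type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

// lazyController is a hypothetical type that satisfies the letter of the
// contract while doing none of the queue processing the reference
// implementation performs: Run simply blocks until stopCh is closed.
type lazyController struct{}

func (lazyController) Run(stopCh <-chan struct{}) { <-stopCh }

func (lazyController) HasSynced() bool { return true }

func (lazyController) LastSyncResourceVersion() string { return "" }

// Compile-time proof that the type checker accepts lazyController as a Controller.
var _ Controller = lazyController{}

func main() {
	var c Controller = lazyController{}
	stop := make(chan struct{})
	close(stop) // closed up front, so Run returns immediately
	c.Run(stop)
	fmt.Println("synced:", c.HasSynced())
}
```

Nothing in the contract rules this out; the expectations discussed below come from the reference implementation, not from the interface.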
Let’s look into the details of queue processing. First, recall that a Queue is just a Store with the ability to Pop. See part 2 for more details.
In controller.go, to keep our mental stack clean, let’s just look at the processLoop function in isolation:
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
Since this is Go and the function starts with a lowercase p, we know it is unexported: it is visible only inside the cache package, which for our purposes makes it private to the controller-struct-backed implementation. For now, we’ll trust that this is called from somewhere useful within that implementation.
In this loop, we can sort of see intuitively that an object is popped off the Queue supplied to this Controller implementation at creation time, and is handed to some sort of processor function that was also supplied at creation time. Then, if there was an error (other than the queue having been closed, which ends the loop) and we’re supposed to retry, we re-enqueue the object.
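More specifically, PopProcessFunc(c.config.Process) turns out to be a Go type conversion that, in this case, converts the ProcessFunc stored in the Config supplied to this Controller implementation at creation time to a PopProcessFunc defined by the fifo.go file. Recall that c.config.Process is of type ProcessFunc. (The two types seem, to this Go rookie at any rate, equivalent, and in effect they are: both have the underlying type func(interface{}) error, which is what makes the conversion legal. The conversion is still required because Go won’t implicitly assign a value of one named type to a variable of another.)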
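The conversion question can be checked directly. In this minimal sketch, ProcessFunc and PopProcessFunc are declared locally with the same shapes as the originals, and pop is a hypothetical stand-in for a Queue’s Pop that accepts only a PopProcessFunc.

```go
package main

import "fmt"

// Two named function types with the same underlying type, declared
// locally to mirror ProcessFunc (controller.go) and PopProcessFunc (fifo.go).
type ProcessFunc func(obj interface{}) error
type PopProcessFunc func(obj interface{}) error

// pop is a hypothetical stand-in for a Queue's Pop: it only accepts a PopProcessFunc.
func pop(obj interface{}, process PopProcessFunc) error {
	return process(obj)
}

func main() {
	var p ProcessFunc = func(obj interface{}) error {
		fmt.Println("processing", obj)
		return nil
	}
	// pop("x", p) would not compile: a ProcessFunc is not assignable to a
	// PopProcessFunc because both are distinct named types.
	// The explicit conversion is legal because the underlying types match.
	_ = pop("x", PopProcessFunc(p))
}
```

A bare function literal could be passed to pop with no conversion at all, since a literal’s type is unnamed; only a value already typed as ProcessFunc needs the conversion.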
From all this, and looking at it from a Java programmer’s perspective, we’re starting to see an abstract class (not an interface) emerge. There are certain things that a Controller implementation is really expected to do, in nearly all cases, whether they’re called out in the contract or not (they’re not). Specifically, its mandated Run function is, clearly, “supposed to” invoke logic equivalent, one way or another, to that outlined in the processLoop above. And it’s expected, in one fashion or another, that the Controller implementation will be working with a Queue that can be drained. And there’s an expectation that Run will spawn parallel computation to accomplish all this. We’ll file this away for now, but we now know in quite some detail exactly what a Controller implementation is expected to do, even if those things aren’t really called out in its contract.
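Those implicit expectations (own a drainable queue, process whatever comes off it, and do the draining in parallel with the caller until told to stop) can be sketched in Go as a concrete struct that other types might embed, which is one way to approximate an abstract class in Go. This is a hypothetical illustration, not client-go’s controller struct: the queue is a plain buffered channel, and processing errors are ignored rather than retried.

```go
package main

import (
	"fmt"
	"sync"
)

// baseController is a hypothetical sketch of the "abstract class" implied
// by the reference implementation: it owns a drainable queue (a channel
// here), a processing function, and a Run that drains the queue on a
// separate goroutine until stopCh is closed. It is not client-go's
// controller struct, and it ignores processing errors (no retry).
type baseController struct {
	queue   chan interface{}
	process func(obj interface{}) error
}

// Run starts the draining goroutine, blocks until stopCh is closed, and
// then waits for that goroutine to exit before returning.
func (c *baseController) Run(stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stopCh:
				return
			case obj := <-c.queue:
				_ = c.process(obj)
			}
		}
	}()
	<-stopCh
	wg.Wait()
}

func main() {
	var seen []interface{}
	stop := make(chan struct{})
	c := &baseController{queue: make(chan interface{}, 3)}
	c.process = func(obj interface{}) error {
		seen = append(seen, obj)
		if len(seen) == 3 {
			close(stop) // stop once everything queued has been processed
		}
		return nil
	}
	for _, name := range []string{"pod-a", "pod-b", "pod-c"} {
		c.queue <- name
	}
	c.Run(stop)
	fmt.Println(seen)
}
```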
One question that might occur to you at this point is: aren’t we done? Don’t we have a mechanism to reflect the Kubernetes API server via lists and watches into a cache and, with what we now know about Controller implementations, the ability to process that cache? Doesn’t that give us a framework to receive and act on Kubernetes resource specification messages, as discussed in part 0?
For now, accept that the answer is no, and so push everything you’ve learned so far onto your mental stack under the umbrella term of Controller. Recall briefly that lurking underneath it are concepts like Reflectors, Stores, Queues, ListerWatchers, ProcessFuncs and the like, but for now you can wrap them all up into Controller and free up some space for what comes next: informers.
Informers
You might have picked up on a filename naming pattern at this point: Controller is defined in controller.go; ListerWatcher is defined in listwatch.go; and so on. But you won’t find an informer.go file. And you won’t find an Informer type. Nevertheless, there is an informer concept, which you can piece together from logical, but not concrete, extensions of it.
First, let’s look at the NewInformer function, defined in controller.go:
// NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only used the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}
As you can see, there’s no such thing as an informer. But you can also see that there’s an implicit construct here: a particular kind of Controller, really, whose associated Queue is something called a DeltaFIFO (which we’ll get to later), that therefore works on deltas, and that has some notion of event handlers that are notified after a given delta is “converted” back into its respective object-verb combination. We see our old friends ListerWatcher and Config in there as well: squinting a bit, you can see that you are supplying the raw materials for a Reflector and its containing Controller, as well as for a custom ProcessFunc that delegates to the event handler machinery by way of an additional transient Store representing the client’s state, which lets this pinball machine determine whether a particular element being processed was an addition, a modification or a deletion.
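The heart of that custom ProcessFunc is the decision it makes for each delta. Here is a simplified, self-contained sketch of just that decision. Delta, DeltaType, recorder and the map-based client state are local stand-ins (objects are plain strings, and each delta carries its own key), and the store errors that the real Process func propagates are left out.

```go
package main

import "fmt"

// Simplified local stand-ins for the delta machinery; objects are plain
// strings and each Delta carries its own store key.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type   DeltaType
	Key    string
	Object string
}

type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// recorder is a hypothetical handler that just logs what it was told.
type recorder struct{ events []string }

func (r *recorder) OnAdd(obj interface{}) {
	r.events = append(r.events, fmt.Sprintf("add %v", obj))
}

func (r *recorder) OnUpdate(oldObj, newObj interface{}) {
	r.events = append(r.events, fmt.Sprintf("update %v->%v", oldObj, newObj))
}

func (r *recorder) OnDelete(obj interface{}) {
	r.events = append(r.events, fmt.Sprintf("delete %v", obj))
}

// process follows the shape of the Process func that NewInformer builds:
// walk the deltas oldest to newest, use the client-side state to decide
// whether a Sync/Added/Updated delta is really an add or an update, apply
// it to that state, and only then notify the handler.
func process(deltas []Delta, clientState map[string]string, h ResourceEventHandler) {
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			if old, exists := clientState[d.Key]; exists {
				clientState[d.Key] = d.Object
				h.OnUpdate(old, d.Object)
			} else {
				clientState[d.Key] = d.Object
				h.OnAdd(d.Object)
			}
		case Deleted:
			delete(clientState, d.Key)
			h.OnDelete(d.Object)
		}
	}
}

func main() {
	state := map[string]string{}
	r := &recorder{}
	process([]Delta{
		{Added, "a", "a@v1"},
		{Updated, "a", "a@v2"},
		{Sync, "b", "b@v1"},
		{Deleted, "a", "a@v2"},
	}, state, r)
	fmt.Println(r.events)
}
```

Because b is not yet in the client-side state, its Sync delta is reported through OnAdd; the full event sequence is add a@v1, update a@v1->a@v2, add b@v1, delete a@v2.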
Physically, the NewInformer function simply leverages Go’s ability to have multiple return values and returns a Controller and its associated Store (not an Informer, since there is no such thing), and adds some unenforced requirements about the usage of the Store it returns (i.e. don’t modify it). A client could “throw away” the Controller once it has started it with Run (nothing populates the Store otherwise) and use just the returned Store, maybe in conjunction with the event handler it supplied, or maybe not, and that Store would serve as a convenient cache of the Kubernetes API server.
Anyway, from all this we can posit that conceptually an informer “is a” Controller together with the ability to distribute its Queue-related operations to an appropriate event handler. This will help us when we model this in Java, and, to Java programmers, should start to look a little bit like good old JavaBeans event listeners. This is a particularly good insight, as ultimately where we’d like to get to is for some Java programmer to write some sort of method that is called when an addition, modification or deletion is found, without that programmer having to worry about all the multithreaded queue manipulation we’ve encountered so far. It also helps because this sort of thing is one of those cases where Java can end up helping us write simpler code.
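The JavaBeans comparison can be made concrete on the Go side too. client-go offers an adapter, ResourceEventHandlerFuncs, that lets a caller supply only the callbacks it cares about, much like extending a Java listener adapter class. The sketch below redeclares a ResourceEventHandler interface and an adapter modeled on that one locally, so it compiles without client-go, using the three callbacks that NewInformer above invokes.

```go
package main

import "fmt"

type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is redeclared locally, modeled on client-go's
// adapter of the same name: callers fill in only the callbacks they care
// about, and the nil ones are silently skipped.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

func (f ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if f.AddFunc != nil {
		f.AddFunc(obj)
	}
}

func (f ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if f.UpdateFunc != nil {
		f.UpdateFunc(oldObj, newObj)
	}
}

func (f ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if f.DeleteFunc != nil {
		f.DeleteFunc(obj)
	}
}

var _ ResourceEventHandler = ResourceEventHandlerFuncs{}

func main() {
	var h ResourceEventHandler = ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { fmt.Println("added:", obj) },
	}
	h.OnAdd("pod-a")
	h.OnDelete("pod-a") // no DeleteFunc was supplied, so this is a no-op
}
```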
As you might expect, there are different concrete kinds of (definitionally notional) informers. We’ll look at one in particular, but know that there are others. The one that we’ll look at is called a SharedIndexInformer, and can be found in the shared_informer.go file.
SharedIndexInformer
A SharedIndexInformer is, itself, a SharedInformer (its interface embeds SharedInformer) that adds the ability to index its contents. I mention this with all of its forward references just to set the stage: you should be asking questions like: What’s shared? What’s being indexed? Why do we need to share things? And so on.
Let’s look first at the SharedInformer contract:
// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer. When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period. Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period. Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}
This tells us what any SharedInformer must do. Recall that conceptually an informer, which has no Go construct to represent it (despite the documentation’s mention of a “standard Informer”, which does not exist), is a combination of a Controller and an event distribution mechanism. A SharedInformer is a kind of informer that can support many event handlers. You may also notice that it happens to have all three of Controller’s functions (Run, HasSynced and LastSyncResourceVersion) with the same signatures, which in Go terms means that any SharedInformer implicitly satisfies the Controller interface, or, for all intents and purposes, effectively is one. Lastly, the Shared part of the term SharedInformer somewhat clumsily refers to the fact that, since this particular informer construct can have many event handlers, the single cache that it is built with (the Queue housed by the Controller it was built with, in most cases) becomes “shared” between those handlers as a result and by default.
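Two of those points, fan-out to several handlers and implicitly being a Controller, fit in one small Go sketch. toySharedInformer is hypothetical and keeps no cache at all; it only distributes a fixed list of notifications. Each handler gets its own goroutine and channel, which mirrors the contract’s promise that events to a single handler arrive sequentially with no coordination between handlers, and the compile-time assertion shows that having Run, HasSynced and LastSyncResourceVersion is enough for the type to count as a Controller.

```go
package main

import (
	"fmt"
	"sync"
)

type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

// toySharedInformer is a hypothetical sketch: one stream of notifications
// fanned out to many handlers. Each handler gets its own channel and
// goroutine, so its events arrive in order, but nothing coordinates one
// handler with another.
type toySharedInformer struct {
	events   []string // notifications to distribute when Run is called
	handlers []func(event string)
	synced   bool
}

func (s *toySharedInformer) AddEventHandler(h func(event string)) {
	s.handlers = append(s.handlers, h)
}

// Run delivers every queued event to every handler, waits for all
// handlers to finish, marks the informer synced, then blocks until
// stopCh is closed.
func (s *toySharedInformer) Run(stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	channels := make([]chan string, len(s.handlers))
	for i, h := range s.handlers {
		channels[i] = make(chan string)
		wg.Add(1)
		go func(ch <-chan string, h func(string)) {
			defer wg.Done()
			for e := range ch {
				h(e)
			}
		}(channels[i], h)
	}
	for _, e := range s.events {
		for _, ch := range channels {
			ch <- e
		}
	}
	for _, ch := range channels {
		close(ch)
	}
	wg.Wait()
	s.synced = true
	<-stopCh
}

func (s *toySharedInformer) HasSynced() bool { return s.synced }

func (s *toySharedInformer) LastSyncResourceVersion() string { return "" }

// Having Run, HasSynced and LastSyncResourceVersion is all it takes: Go
// checks interface satisfaction structurally, so this compiles.
var _ Controller = (*toySharedInformer)(nil)

func main() {
	var first, second []string
	s := &toySharedInformer{events: []string{"add x", "update x", "delete x"}}
	s.AddEventHandler(func(e string) { first = append(first, e) })
	s.AddEventHandler(func(e string) { second = append(second, e) })
	stop := make(chan struct{})
	close(stop) // already closed, so Run returns once delivery is done
	s.Run(stop)
	fmt.Println(first, second, s.HasSynced())
}
```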
All that SharedIndexInformer adds to the picture is the ability to locate items in its cache by various keys:
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}
Here you need to pay very close attention to singular and plural nouns. Note that what you add is an Indexers, plural, and what you get is an Indexer, singular. Let’s look at what those are.
We’ll start with Indexers, the plural, since oddly enough it turns out to be the singular item, and is not, most notably, a bunch of Indexer instances!
This is actually simply a kind of map (singular) and can be found in the index.go file:
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
An IndexFunc, in turn, is just a mapping function:
// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)
So hand it a Kubernetes resource, for example, and it will give you back a set (I’m assuming; the signature itself only promises a []string slice) of string values corresponding to it in some way.
An Indexers, a singular object with a plural name, is therefore a simple map of such functions, each of which is, in turn, registered under its own string key (the name of the index).
So you can add a bunch of these maps to a SharedIndexInformer for its use.
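To make the map-of-functions idea concrete, here is a sketch of an Indexers value holding two IndexFuncs. pod, byNamespace and byLabelValue are hypothetical names, and the two types are redeclared locally with the shapes shown above. The second function shows why an IndexFunc returns a slice: one object can be indexed under several values at once.

```go
package main

import (
	"fmt"
	"sort"
)

// Redeclared locally with the same shapes as in index.go.
type IndexFunc func(obj interface{}) ([]string, error)
type Indexers map[string]IndexFunc

// pod is a hypothetical, stripped-down resource used only for illustration.
type pod struct {
	Name      string
	Namespace string
	Labels    map[string]string
}

// byNamespace indexes a pod under exactly one value: its namespace.
func byNamespace(obj interface{}) ([]string, error) {
	p, ok := obj.(pod)
	if !ok {
		return nil, fmt.Errorf("expected a pod, got %T", obj)
	}
	return []string{p.Namespace}, nil
}

// byLabelValue indexes a pod under every one of its label values, which is
// why an IndexFunc returns a slice rather than a single string.
func byLabelValue(obj interface{}) ([]string, error) {
	p, ok := obj.(pod)
	if !ok {
		return nil, fmt.Errorf("expected a pod, got %T", obj)
	}
	values := make([]string, 0, len(p.Labels))
	for _, v := range p.Labels {
		values = append(values, v)
	}
	sort.Strings(values) // map iteration order is random; sort for stable output
	return values, nil
}

func main() {
	indexers := Indexers{
		"namespace":  byNamespace,
		"labelValue": byLabelValue,
	}
	p := pod{Name: "web-1", Namespace: "prod", Labels: map[string]string{"app": "web", "tier": "frontend"}}
	for _, name := range []string{"namespace", "labelValue"} {
		values, err := indexers[name](p)
		fmt.Println(name, values, err)
	}
}
```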
An Indexer, a singular noun describing an aggregate concept (!), is a collection (!) of such Indexers instances with some additional aggregate behavior layered on top:
// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
	Store
	// Retrieve list of objects that match on the named indexing function
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the set of keys that match on the named indexing function.
	IndexKeys(indexName, indexKey string) ([]string, error)
	// ListIndexFuncValues returns the list of generated values of an Index func
	ListIndexFuncValues(indexName string) []string
	// ByIndex lists object that match on the named indexing function with the exact key
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers
	// AddIndexers adds more indexers to this store. If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}
Clear as mud, right? So from a SharedIndexInformer you can get its one true Indexer, which is logically composed of various Indexers instances added (hopefully) by way of that SharedIndexInformer’s AddIndexers function, although it would also be possible to add them to the Indexer directly.
One other extremely important thing to notice here is that an Indexer is also a Store! But pay very careful attention to how it is used in its capacity as a Store.
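To see how one object can be both a Store and a lookup-by-index structure, here is a deliberately small sketch. toyIndexer, item and the helper functions are hypothetical; the sketch is single-threaded, supports only Add, GetByKey and ByIndex, and never cleans up stale index entries, all of which a real implementation has to handle.

```go
package main

import (
	"fmt"
	"sort"
)

type IndexFunc func(obj interface{}) ([]string, error)
type Indexers map[string]IndexFunc

// item is a hypothetical resource with just a namespace and a name.
type item struct{ Namespace, Name string }

// namespaceNameKey and byNamespace assume every stored object is an item.
func namespaceNameKey(obj interface{}) string {
	i := obj.(item)
	return i.Namespace + "/" + i.Name
}

func byNamespace(obj interface{}) ([]string, error) {
	return []string{obj.(item).Namespace}, nil
}

// toyIndexer: a keyed store (the Store part) plus one Indexers map, with an
// inverted index per IndexFunc kept up to date on Add. Stale entries are
// never removed, unlike in a real implementation.
type toyIndexer struct {
	keyFunc  func(obj interface{}) string
	items    map[string]interface{}
	indexers Indexers
	indices  map[string]map[string]map[string]bool // index name -> value -> store keys
}

func newToyIndexer(keyFunc func(obj interface{}) string, indexers Indexers) *toyIndexer {
	return &toyIndexer{
		keyFunc:  keyFunc,
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]bool{},
	}
}

// Add stores obj and records its key under every value each IndexFunc yields.
func (t *toyIndexer) Add(obj interface{}) error {
	key := t.keyFunc(obj)
	t.items[key] = obj
	for name, indexFunc := range t.indexers {
		values, err := indexFunc(obj)
		if err != nil {
			return err
		}
		if t.indices[name] == nil {
			t.indices[name] = map[string]map[string]bool{}
		}
		for _, v := range values {
			if t.indices[name][v] == nil {
				t.indices[name][v] = map[string]bool{}
			}
			t.indices[name][v][key] = true
		}
	}
	return nil
}

// GetByKey is the plain Store-style lookup.
func (t *toyIndexer) GetByKey(key string) (interface{}, bool, error) {
	obj, ok := t.items[key]
	return obj, ok, nil
}

// ByIndex returns the objects indexed under indexKey, ordered by store key.
func (t *toyIndexer) ByIndex(indexName, indexKey string) ([]interface{}, error) {
	if _, ok := t.indexers[indexName]; !ok {
		return nil, fmt.Errorf("index %q does not exist", indexName)
	}
	var keys []string
	for k := range t.indices[indexName][indexKey] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]interface{}, 0, len(keys))
	for _, k := range keys {
		out = append(out, t.items[k])
	}
	return out, nil
}

func main() {
	idx := newToyIndexer(namespaceNameKey, Indexers{"namespace": byNamespace})
	_ = idx.Add(item{"prod", "web"})
	_ = idx.Add(item{"dev", "web"})
	_ = idx.Add(item{"prod", "db"})
	prod, _ := idx.ByIndex("namespace", "prod")
	fmt.Println(prod)
}
```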
Specifically, let’s first recall that a non-shared informer, which has no Go language reification, is conceptually a combination of a Controller and a Store. We have seen, for example, that the NewInformer function returns both a Controller and a Store that is attached to that Controller; the combination gives you an automatically populating cache.
In SharedIndexInformer, the Store into which Kubernetes API server events are reflected is a DeltaFIFO, not an Indexer. But an Indexer is supplied to that DeltaFIFO, and is the return value of the SharedIndexInformer’s GetStore function! This tells us more about the implied contract of GetStore: clearly the Store it returns must not be used for modification!
Another extremely important thing to note is that any Store, because it declares the ListKeys and GetByKey functions, is also a KeyListerGetter, a KeyLister and a KeyGetter. A KeyListerGetter is a combination of the KeyLister and KeyGetter types. A KeyLister is anything that can list its keys, and a KeyGetter is not something that can get a key, but something that houses other things that can be retrieved by key (like some sort of map).
So, rewinding a bit, when SharedIndexInformer creates a new DeltaFIFO to use as its Store and, in turn, supplies an Indexer to that DeltaFIFO, it is supplying that Indexer only in its capacity as a KeyListerGetter. Similarly, its GetStore method probably should really return something closer to a KeyListerGetter than an actual Store, since calling its modification methods is prohibited.
So, clearly, to understand more here we’re going to have to dive into DeltaFIFO, which we’ve now seen is at the heart of all of this, and which will be the subject of the next post.
Let’s review what we’ve come up with here so far:
- A Controller really kind of has to behave like the reference controller-struct-backed Controller implementation. Specifically, it needs to manage a Queue, which is often a DeltaFIFO, both populating and draining it, and it needs to incorporate an event listener mechanism.
- The idea of an informer exists, which is a combination of a Controller and the Store that it’s hooked up to, but it does not physically exist except in “shared” form.
- A SharedIndexInformer instantiates the concept of an informer with multiple event handlers and, by definition, a single Queue, a DeltaFIFO, that they share.
We might model this visually as follows: