forked from benitogf/pivot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstance.go
More file actions
252 lines (229 loc) · 8.39 KB
/
Copy pathinstance.go
File metadata and controls
252 lines (229 loc) · 8.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package pivot
import (
"sync"
"sync/atomic"
"github.com/benitogf/ooo"
"github.com/benitogf/ooo/storage"
"github.com/benitogf/ooo/ui"
)
// Registry of pivot Instances, keyed by the server each one was set up for.
// GetInstance, storeInstance and removeInstance are the accessors.
var (
	// instances maps a server to its pivot Instance for GetInstance lookup.
	instances = make(map[*ooo.Server]*Instance)
	// instancesMu is held around every read and write of instances.
	instancesMu sync.RWMutex
)
// PivotHealthStatus tracks health status for a single pivot connection.
// Instance.PivotHealth holds one entry per pivot URL, and GetPivotInfo
// folds all entries into the overall pivot status shown in the UI.
type PivotHealthStatus struct {
Healthy bool // A single false entry makes the overall PivotHealthy reported by GetPivotInfo false
LastCheck string // Last check time as text; GetPivotInfo compares values as plain strings (format is not visible here - TODO confirm it sorts chronologically)
Protocol string // Protocol version of the pivot ("2.0", "unknown")
Compatible bool // true if pivot protocol matches local version
}
// Instance contains pivot callbacks for use with external storages.
// Use GetInstance(server) to retrieve after Setup.
//
// Locking: PivotHealth is read under healthMu, and ExtraNodeURLs is read and
// appended under extraNodeURLMu (see AddExtraNodeURL and GetExtraNodeURLs).
// The shutdown flag is only touched through sync/atomic. Because the struct
// embeds mutexes by value, an Instance must be shared by pointer, not copied.
type Instance struct {
BeforeRead func(string) // Callback for sync-on-read; Attach installs it as the storage BeforeRead hook
SyncCallback StorageSyncCallback // Callback for storage events (write/delete sync); Attach passes it to storage.WatchWithCallback
ClusterURL string // Config.ClusterURL - empty for pure pivot server
SyncedKeys []string // Keys being synchronized
NodeHealth *NodeHealth // Node health tracker (only for pivot servers); may be nil
GetNodes func() []string // Function to get registered nodes (only for pivot servers); may be nil
PivotHealth map[string]*PivotHealthStatus // Health status per pivot URL (for node servers)
ExtraNodeURLs []string // Additional node URLs (can be modified after Setup); use AddExtraNodeURL / GetExtraNodeURLs for locked access
VVManager *VVManager // Version vector manager (for both pivot and node servers)
syncerPool *syncerPool // Internal syncer pool for node servers (for testing hooks); may be nil
nodesCache *nodesCache // Cache for NodesKey address list, invalidated by storage events
triggers *triggerCoalescer // Per-node trigger coalescer (only set on pivot servers)
healthMu sync.RWMutex // Protects PivotHealth map
extraNodeURLMu sync.RWMutex // Protects ExtraNodeURLs
shutdown int32 // Atomic flag to prevent access during shutdown; set to 1 by Shutdown, read by IsShutdown
}
// AddExtraNodeURL registers one more node URL that should receive sync
// notifications (for cluster leader servers). The URL is appended to
// ExtraNodeURLs while holding the write lock; no de-duplication is done.
func (i *Instance) AddExtraNodeURL(url string) {
	i.extraNodeURLMu.Lock()
	defer i.extraNodeURLMu.Unlock()

	i.ExtraNodeURLs = append(i.ExtraNodeURLs, url)
}
// GetExtraNodeURLs returns a snapshot of the extra node URLs taken under the
// read lock. The result is a fresh, non-nil slice (empty when there are no
// URLs), so callers may modify it without affecting the Instance.
func (i *Instance) GetExtraNodeURLs() []string {
	i.extraNodeURLMu.RLock()
	snapshot := append(make([]string, 0, len(i.ExtraNodeURLs)), i.ExtraNodeURLs...)
	i.extraNodeURLMu.RUnlock()
	return snapshot
}
// Shutdown flags the instance as shutting down so that code checking
// IsShutdown can stop using it during close. The flag is written atomically
// and is never cleared by this method.
func (i *Instance) Shutdown() {
	const shuttingDown int32 = 1
	atomic.StoreInt32(&i.shutdown, shuttingDown)
}
// IsShutdown reports whether the instance has been marked as shutting down.
// The flag is read atomically; any non-zero value counts as shut down.
func (i *Instance) IsShutdown() bool {
	flag := atomic.LoadInt32(&i.shutdown)
	return flag != 0
}
// GetInstance looks up the pivot Instance registered for server.
// The lookup happens under the registry read lock. The result is nil when no
// Instance is stored for that server (for example, Setup was never called).
func GetInstance(server *ooo.Server) *Instance {
	instancesMu.RLock()
	found := instances[server]
	instancesMu.RUnlock()
	return found
}
// GetPivotInfo returns a function that provides pivot status for the ooo UI.
// Pass the returned function to ui.Handler.GetPivotInfo to enable pivot status in the UI.
//
// GetPivotInfo itself always returns a non-nil function. That function looks
// the Instance up again on every call and returns nil when no Instance is
// registered for server at that moment; otherwise it returns a freshly built
// *ui.PivotInfo whose Nodes slice is never nil.
func GetPivotInfo(server *ooo.Server) func() *ui.PivotInfo {
	return func() *ui.PivotInfo {
		instance := GetInstance(server)
		if instance == nil {
			return nil
		}

		role := instance.pivotInfoRole()
		info := &ui.PivotInfo{
			Role:       role,
			PivotIP:    instance.ClusterURL,
			SyncedKeys: instance.SyncedKeys,
			// Kept non-nil even when there is nothing to list.
			Nodes: []ui.PivotNodeStatus{},
		}
		// The node status list is only built for servers acting as a pivot.
		if role == "pivot" || role == "mixed" {
			info.Nodes = instance.pivotInfoNodes()
		}
		instance.pivotInfoHealth(info)
		return info
	}
}

// pivotInfoRole classifies the instance as "pivot", "node" or "mixed".
// A server has node keys when its syncer pool holds at least one syncer, and
// pivot keys when any synced key has no entry in the pool's keyMap (or there
// is no pool at all). With neither, the role falls back to "node".
func (i *Instance) pivotInfoRole() string {
	hasNodeKeys := i.syncerPool != nil && len(i.syncerPool.syncers) > 0

	hasPivotKeys := false
	for _, keyPath := range i.SyncedKeys {
		if i.syncerPool == nil || i.syncerPool.keyMap[keyPath] == "" {
			hasPivotKeys = true
			break
		}
	}

	switch {
	case hasPivotKeys && hasNodeKeys:
		return "mixed"
	case hasPivotKeys:
		return "pivot"
	default:
		return "node"
	}
}

// pivotInfoNodes builds the node status list shown in the UI. Nodes returned
// by GetNodes come first, in that order, enriched with health data when the
// tracker knows them; tracker entries that GetNodes did not return follow.
// The health tracker is queried once so both passes see the same snapshot.
// The result is never nil.
func (i *Instance) pivotInfoNodes() []ui.PivotNodeStatus {
	nodes := []ui.PivotNodeStatus{}

	var registered []string
	if i.GetNodes != nil {
		registered = i.GetNodes()
	}

	if i.NodeHealth == nil {
		// No tracker: every registered node is reported with unknown health.
		for _, addr := range registered {
			nodes = append(nodes, ui.PivotNodeStatus{
				Address:   addr,
				Healthy:   false,
				LastCheck: "Never",
				Protocol:  "unknown",
			})
		}
		return nodes
	}

	tracked := i.NodeHealth.GetStatus()
	byAddr := make(map[string]NodeStatus, len(tracked))
	for _, st := range tracked {
		byAddr[st.Address] = st
	}

	listed := make(map[string]bool, len(registered))
	for _, addr := range registered {
		entry := ui.PivotNodeStatus{
			Address:   addr,
			Healthy:   false, // Unknown until checked
			LastCheck: "Never",
			Protocol:  "unknown", // Default until version check
		}
		if st, ok := byAddr[addr]; ok {
			entry.Healthy = st.Healthy
			entry.LastCheck = st.LastCheck
			entry.Protocol = st.Protocol
			entry.Compatible = st.Compatible
		}
		nodes = append(nodes, entry)
		listed[addr] = true
	}

	// Also include tracked nodes that GetNodes did not return.
	for _, st := range tracked {
		if listed[st.Address] {
			continue
		}
		nodes = append(nodes, ui.PivotNodeStatus{
			Address:    st.Address,
			Healthy:    st.Healthy,
			LastCheck:  st.LastCheck,
			Protocol:   st.Protocol,
			Compatible: st.Compatible,
		})
	}
	return nodes
}

// pivotInfoHealth aggregates PivotHealth into info under the health read
// lock: healthy/compatible only if every pivot is, the greatest LastCheck
// string, and one protocol value. With no entries the result is healthy,
// compatible, an empty last check and protocol "unknown".
func (i *Instance) pivotInfoHealth(info *ui.PivotInfo) {
	info.PivotHealthy = true
	info.PivotCompatible = true
	info.PivotLastCheck = ""
	info.PivotProtocol = ""

	// Map iteration order is randomized, so "first pivot" would differ from
	// call to call. Take the protocol of the smallest pivot URL that reports
	// one, which keeps the UI value stable (typically there is only one pivot).
	protocolURL := ""

	i.healthMu.RLock()
	defer i.healthMu.RUnlock()
	for url, status := range i.PivotHealth {
		if !status.Healthy {
			info.PivotHealthy = false
		}
		if !status.Compatible {
			info.PivotCompatible = false
		}
		if status.LastCheck > info.PivotLastCheck {
			info.PivotLastCheck = status.LastCheck
		}
		if status.Protocol != "" && (info.PivotProtocol == "" || url < protocolURL) {
			info.PivotProtocol = status.Protocol
			protocolURL = url
		}
	}
	if info.PivotProtocol == "" {
		info.PivotProtocol = "unknown"
	}
}
// Attach wires an external storage into pivot synchronization.
// It makes sure the storage runs with the instance's BeforeRead callback and
// then registers SyncCallback for storage events. It replaces this manual setup:
//
//	db.Start(storage.Options{BeforeRead: instance.BeforeRead})
//	storage.WatchWithCallback(db, instance.SyncCallback)
//
// If db is already active it is not restarted: only SetBeforeRead is called
// and storageOpts is ignored. Otherwise db is started; when storageOpts is
// given, only its first element is used and only NoBroadcastKeys, AfterWrite
// and Workers are taken from it (BeforeRead always comes from the instance).
// The error from db.Start is returned as-is, in which case no watcher is set up.
func (i *Instance) Attach(db storage.Database, storageOpts ...storage.Options) error {
	if !db.Active() {
		// Storage not started yet - start it with BeforeRead configured.
		startOpts := storage.Options{BeforeRead: i.BeforeRead}
		if len(storageOpts) > 0 {
			extra := storageOpts[0]
			startOpts.NoBroadcastKeys = extra.NoBroadcastKeys
			startOpts.AfterWrite = extra.AfterWrite
			startOpts.Workers = extra.Workers
		}
		if err := db.Start(startOpts); err != nil {
			return err
		}
	} else {
		// Storage already running - swap the callback in place.
		// This works for both memory-only and embedded storage.
		db.SetBeforeRead(i.BeforeRead)
	}

	storage.WatchWithCallback(db, i.SyncCallback)
	return nil
}
// storeInstance registers instance under server so GetInstance can find it.
// An existing entry for the same server is overwritten.
func storeInstance(server *ooo.Server, instance *Instance) {
	instancesMu.Lock()
	defer instancesMu.Unlock()

	instances[server] = instance
}
// removeInstance deletes the registry entry for server; it is a no-op when
// there is none. It is called on shutdown so that a process which keeps
// recreating servers (tests, supervised in-process restarts) does not leak
// one map entry per server for the rest of its life.
func removeInstance(server *ooo.Server) {
	instancesMu.Lock()
	defer instancesMu.Unlock()

	delete(instances, server)
}