1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.google.common.util.concurrent;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.base.Predicates.equalTo;
22 import static com.google.common.base.Predicates.in;
23 import static com.google.common.base.Predicates.instanceOf;
24 import static com.google.common.base.Predicates.not;
25 import static com.google.common.util.concurrent.Service.State.FAILED;
26 import static com.google.common.util.concurrent.Service.State.NEW;
27 import static com.google.common.util.concurrent.Service.State.RUNNING;
28 import static com.google.common.util.concurrent.Service.State.STARTING;
29 import static com.google.common.util.concurrent.Service.State.STOPPING;
30 import static com.google.common.util.concurrent.Service.State.TERMINATED;
31 import static java.util.concurrent.TimeUnit.MILLISECONDS;
32
33 import com.google.common.annotations.Beta;
34 import com.google.common.base.Function;
35 import com.google.common.base.Objects;
36 import com.google.common.base.Stopwatch;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Collections2;
39 import com.google.common.collect.ImmutableCollection;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableMultimap;
43 import com.google.common.collect.ImmutableSet;
44 import com.google.common.collect.ImmutableSetMultimap;
45 import com.google.common.collect.Lists;
46 import com.google.common.collect.Maps;
47 import com.google.common.collect.Multimaps;
48 import com.google.common.collect.Multiset;
49 import com.google.common.collect.Ordering;
50 import com.google.common.collect.SetMultimap;
51 import com.google.common.collect.Sets;
52 import com.google.common.util.concurrent.ListenerCallQueue.Callback;
53 import com.google.common.util.concurrent.Service.State;
54
55 import java.lang.ref.WeakReference;
56 import java.util.ArrayList;
57 import java.util.Collection;
58 import java.util.Collections;
59 import java.util.EnumMap;
60 import java.util.List;
61 import java.util.Map;
62 import java.util.Map.Entry;
63 import java.util.Set;
64 import java.util.concurrent.Executor;
65 import java.util.concurrent.TimeUnit;
66 import java.util.concurrent.TimeoutException;
67 import java.util.logging.Level;
68 import java.util.logging.Logger;
69
70 import javax.annotation.concurrent.GuardedBy;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 @Beta
125 public final class ServiceManager {
126 private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
127 private static final Callback<Listener> HEALTHY_CALLBACK = new Callback<Listener>("healthy()") {
128 @Override void call(Listener listener) {
129 listener.healthy();
130 }
131 };
132 private static final Callback<Listener> STOPPED_CALLBACK = new Callback<Listener>("stopped()") {
133 @Override void call(Listener listener) {
134 listener.stopped();
135 }
136 };
137
138
139
140
141
142
143
144
145
146
147 @Beta
148 public abstract static class Listener {
149
150
151
152
153
154
155
156
157 public void healthy() {}
158
159
160
161
162
163 public void stopped() {}
164
165
166
167
168
169
170 public void failure(Service service) {}
171 }
172
173
174
175
176
177
178
179
/**
 * Aggregate bookkeeping for the managed services. The per-service listeners reach it only
 * through a {@link WeakReference}.
 */
private final ServiceManagerState state;

/** The managed services; a single {@code NoOpService} when the caller supplied none. */
private final ImmutableList<Service> services;

/**
 * Creates a manager for the given services.
 *
 * <p>If {@code services} is empty a warning is logged and a single internal
 * {@code NoOpService} is managed instead.
 *
 * @param services the services to manage; each one must currently be {@code NEW}
 * @throws IllegalArgumentException if a service is not {@code NEW}, or if a transition was
 *     observed before construction finished
 */
public ServiceManager(Iterable<? extends Service> services) {
  ImmutableList<Service> managed = ImmutableList.copyOf(services);
  if (managed.isEmpty()) {
    // The freshly created throwable gives the log record a stack trace of this call site.
    logger.log(Level.WARNING,
        "ServiceManager configured with no services.  Is your application configured properly?",
        new EmptyServiceManagerWarning());
    managed = ImmutableList.<Service>of(new NoOpService());
  }
  this.state = new ServiceManagerState(managed);
  this.services = managed;

  WeakReference<ServiceManagerState> weakState = new WeakReference<ServiceManagerState>(state);
  Executor inlineExecutor = MoreExecutors.sameThreadExecutor();
  for (Service service : managed) {
    // Register first, verify second: seeing NEW *after* registration implies the listener
    // was attached while the service was still NEW (assuming services never return to NEW).
    service.addListener(new ServiceListener(service, weakState), inlineExecutor);
    checkArgument(service.state() == NEW, "Can only manage NEW services, %s", service);
  }

  // From here on transitions are recorded; markReady() throws if one was already observed.
  this.state.markReady();
}
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 public void addListener(Listener listener, Executor executor) {
249 state.addListener(listener, executor);
250 }
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268 public void addListener(Listener listener) {
269 state.addListener(listener, MoreExecutors.sameThreadExecutor());
270 }
271
272
273
274
275
276
277
278
279
280 public ServiceManager startAsync() {
281 for (Service service : services) {
282 State state = service.state();
283 checkState(state == NEW, "Service %s is %s, cannot start it.", service, state);
284 }
285 for (Service service : services) {
286 try {
287 service.startAsync();
288 } catch (IllegalStateException e) {
289
290
291
292
293 logger.log(Level.WARNING, "Unable to start Service " + service, e);
294 }
295 }
296 return this;
297 }
298
299
300
301
302
303
304
305
306
307 public void awaitHealthy() {
308 state.awaitHealthy();
309 }
310
311
312
313
314
315
316
317
318
319
320
321
322 public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
323 state.awaitHealthy(timeout, unit);
324 }
325
326
327
328
329
330
331
332 public ServiceManager stopAsync() {
333 for (Service service : services) {
334 service.stopAsync();
335 }
336 return this;
337 }
338
339
340
341
342
343
344 public void awaitStopped() {
345 state.awaitStopped();
346 }
347
348
349
350
351
352
353
354
355
356
357 public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
358 state.awaitStopped(timeout, unit);
359 }
360
361
362
363
364
365
366
367 public boolean isHealthy() {
368 for (Service service : services) {
369 if (!service.isRunning()) {
370 return false;
371 }
372 }
373 return true;
374 }
375
376
377
378
379
380
381
382 public ImmutableMultimap<State, Service> servicesByState() {
383 return state.servicesByState();
384 }
385
386
387
388
389
390
391
392
393 public ImmutableMap<Service, Long> startupTimes() {
394 return state.startupTimes();
395 }
396
397 @Override public String toString() {
398 return Objects.toStringHelper(ServiceManager.class)
399 .add("services", Collections2.filter(services, not(instanceOf(NoOpService.class))))
400 .toString();
401 }
402
403
404
405
406
407 private static final class ServiceManagerState {
408 final Monitor monitor = new Monitor();
409
410 @GuardedBy("monitor")
411 final SetMultimap<State, Service> servicesByState =
412 Multimaps.newSetMultimap(new EnumMap<State, Collection<Service>>(State.class),
413 new Supplier<Set<Service>>() {
414 @Override public Set<Service> get() {
415 return Sets.newLinkedHashSet();
416 }
417 });
418
419 @GuardedBy("monitor")
420 final Multiset<State> states = servicesByState.keys();
421
422 @GuardedBy("monitor")
423 final Map<Service, Stopwatch> startupTimers = Maps.newIdentityHashMap();
424
425
426
427
428
429
430
431
432
433
434
435
436 @GuardedBy("monitor")
437 boolean ready;
438
439 @GuardedBy("monitor")
440 boolean transitioned;
441
442 final int numberOfServices;
443
444
445
446
447
448 final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
449 @Override public boolean isSatisfied() {
450
451 return states.count(RUNNING) == numberOfServices
452 || states.contains(STOPPING)
453 || states.contains(TERMINATED)
454 || states.contains(FAILED);
455 }
456 };
457
458
459
460
461 final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
462 @Override public boolean isSatisfied() {
463 return states.count(TERMINATED) + states.count(FAILED) == numberOfServices;
464 }
465 };
466
467
468 @GuardedBy("monitor")
469 final List<ListenerCallQueue<Listener>> listeners =
470 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
471
472
473
474
475
476
477
/**
 * Creates the bookkeeping for the given services: all of them are recorded as NEW and each
 * gets an unstarted startup stopwatch.
 *
 * @param services the services to track
 */
ServiceManagerState(ImmutableCollection<Service> services) {
  for (Service tracked : services) {
    startupTimers.put(tracked, Stopwatch.createUnstarted());
  }
  servicesByState.putAll(NEW, services);
  this.numberOfServices = services.size();
}
485
486
487
488
489
490 void markReady() {
491 monitor.enter();
492 try {
493 if (!transitioned) {
494
495 ready = true;
496 } else {
497
498 List<Service> servicesInBadStates = Lists.newArrayList();
499 for (Service service : servicesByState().values()) {
500 if (service.state() != NEW) {
501 servicesInBadStates.add(service);
502 }
503 }
504 throw new IllegalArgumentException("Services started transitioning asynchronously before "
505 + "the ServiceManager was constructed: " + servicesInBadStates);
506 }
507 } finally {
508 monitor.leave();
509 }
510 }
511
512 void addListener(Listener listener, Executor executor) {
513 checkNotNull(listener, "listener");
514 checkNotNull(executor, "executor");
515 monitor.enter();
516 try {
517
518 if (!stoppedGuard.isSatisfied()) {
519 listeners.add(new ListenerCallQueue<Listener>(listener, executor));
520 }
521 } finally {
522 monitor.leave();
523 }
524 }
525
526 void awaitHealthy() {
527 monitor.enterWhenUninterruptibly(awaitHealthGuard);
528 try {
529 checkHealthy();
530 } finally {
531 monitor.leave();
532 }
533 }
534
535 void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
536 monitor.enter();
537 try {
538 if (!monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) {
539 throw new TimeoutException("Timeout waiting for the services to become healthy. The "
540 + "following services have not started: "
541 + Multimaps.filterKeys(servicesByState, in(ImmutableSet.of(NEW, STARTING))));
542 }
543 checkHealthy();
544 } finally {
545 monitor.leave();
546 }
547 }
548
549 void awaitStopped() {
550 monitor.enterWhenUninterruptibly(stoppedGuard);
551 monitor.leave();
552 }
553
554 void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
555 monitor.enter();
556 try {
557 if (!monitor.waitForUninterruptibly(stoppedGuard, timeout, unit)) {
558 throw new TimeoutException("Timeout waiting for the services to stop. The following "
559 + "services have not stopped: "
560 + Multimaps.filterKeys(servicesByState,
561 not(in(ImmutableSet.of(TERMINATED, FAILED)))));
562 }
563 } finally {
564 monitor.leave();
565 }
566 }
567
568 ImmutableMultimap<State, Service> servicesByState() {
569 ImmutableSetMultimap.Builder<State, Service> builder = ImmutableSetMultimap.builder();
570 monitor.enter();
571 try {
572 for (Entry<State, Service> entry : servicesByState.entries()) {
573 if (!(entry.getValue() instanceof NoOpService)) {
574 builder.put(entry.getKey(), entry.getValue());
575 }
576 }
577 } finally {
578 monitor.leave();
579 }
580 return builder.build();
581 }
582
583 ImmutableMap<Service, Long> startupTimes() {
584 List<Entry<Service, Long>> loadTimes;
585 monitor.enter();
586 try {
587 loadTimes = Lists.newArrayListWithCapacity(
588 states.size() - states.count(NEW) + states.count(STARTING));
589 for (Entry<Service, Stopwatch> entry : startupTimers.entrySet()) {
590 Service service = entry.getKey();
591 Stopwatch stopWatch = entry.getValue();
592
593
594
595
596 if (!stopWatch.isRunning() && !servicesByState.containsEntry(NEW, service)
597 && !(service instanceof NoOpService)) {
598 loadTimes.add(Maps.immutableEntry(service, stopWatch.elapsed(MILLISECONDS)));
599 }
600 }
601 } finally {
602 monitor.leave();
603 }
604 Collections.sort(loadTimes, Ordering.<Long>natural()
605 .onResultOf(new Function<Entry<Service, Long>, Long>() {
606 @Override public Long apply(Map.Entry<Service, Long> input) {
607 return input.getValue();
608 }
609 }));
610 ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
611 for (Entry<Service, Long> entry : loadTimes) {
612 builder.put(entry);
613 }
614 return builder.build();
615 }
616
617
618
619
620
621
622
623
624
625
626
627
628 void transitionService(final Service service, State from, State to) {
629 checkNotNull(service);
630 checkArgument(from != to);
631 monitor.enter();
632 try {
633 transitioned = true;
634 if (!ready) {
635 return;
636 }
637
638 checkState(servicesByState.remove(from, service),
639 "Service %s not at the expected location in the state map %s", service, from);
640 checkState(servicesByState.put(to, service),
641 "Service %s in the state map unexpectedly at %s", service, to);
642
643 Stopwatch stopwatch = startupTimers.get(service);
644 if (from == NEW) {
645 stopwatch.start();
646 }
647 if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {
648
649 stopwatch.stop();
650 if (!(service instanceof NoOpService)) {
651 logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch});
652 }
653 }
654
655
656
657 if (to == FAILED) {
658 fireFailedListeners(service);
659 }
660
661 if (states.count(RUNNING) == numberOfServices) {
662
663
664 fireHealthyListeners();
665 } else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
666 fireStoppedListeners();
667 }
668 } finally {
669 monitor.leave();
670
671 executeListeners();
672 }
673 }
674
675 @GuardedBy("monitor")
676 void fireStoppedListeners() {
677 STOPPED_CALLBACK.enqueueOn(listeners);
678 }
679
680 @GuardedBy("monitor")
681 void fireHealthyListeners() {
682 HEALTHY_CALLBACK.enqueueOn(listeners);
683 }
684
685 @GuardedBy("monitor")
686 void fireFailedListeners(final Service service) {
687 new Callback<Listener>("failed({service=" + service + "})") {
688 @Override void call(Listener listener) {
689 listener.failure(service);
690 }
691 }.enqueueOn(listeners);
692 }
693
694
695 void executeListeners() {
696 checkState(!monitor.isOccupiedByCurrentThread(),
697 "It is incorrect to execute listeners with the monitor held.");
698
699 for (int i = 0; i < listeners.size(); i++) {
700 listeners.get(i).execute();
701 }
702 }
703
704 @GuardedBy("monitor")
705 void checkHealthy() {
706 if (states.count(RUNNING) != numberOfServices) {
707 throw new IllegalStateException("Expected to be healthy after starting. "
708 + "The following services are not running: " +
709 Multimaps.filterKeys(servicesByState, not(equalTo(RUNNING))));
710 }
711 }
712 }
713
714
715
716
717
718
719 private static final class ServiceListener extends Service.Listener {
720 final Service service;
721
722
723 final WeakReference<ServiceManagerState> state;
724
725 ServiceListener(Service service, WeakReference<ServiceManagerState> state) {
726 this.service = service;
727 this.state = state;
728 }
729
730 @Override public void starting() {
731 ServiceManagerState state = this.state.get();
732 if (state != null) {
733 state.transitionService(service, NEW, STARTING);
734 if (!(service instanceof NoOpService)) {
735 logger.log(Level.FINE, "Starting {0}.", service);
736 }
737 }
738 }
739
740 @Override public void running() {
741 ServiceManagerState state = this.state.get();
742 if (state != null) {
743 state.transitionService(service, STARTING, RUNNING);
744 }
745 }
746
747 @Override public void stopping(State from) {
748 ServiceManagerState state = this.state.get();
749 if (state != null) {
750 state.transitionService(service, from, STOPPING);
751 }
752 }
753
754 @Override public void terminated(State from) {
755 ServiceManagerState state = this.state.get();
756 if (state != null) {
757 if (!(service instanceof NoOpService)) {
758 logger.log(Level.FINE, "Service {0} has terminated. Previous state was: {1}",
759 new Object[] {service, from});
760 }
761 state.transitionService(service, from, TERMINATED);
762 }
763 }
764
765 @Override public void failed(State from, Throwable failure) {
766 ServiceManagerState state = this.state.get();
767 if (state != null) {
768
769
770 if (!(service instanceof NoOpService)) {
771 logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.",
772 failure);
773 }
774 state.transitionService(service, from, FAILED);
775 }
776 }
777 }
778
779
780
781
782
783
784
785
786
787 private static final class NoOpService extends AbstractService {
788 @Override protected void doStart() { notifyStarted(); }
789 @Override protected void doStop() { notifyStopped(); }
790 }
791
792
793 private static final class EmptyServiceManagerWarning extends Throwable {}
794 }