首页 手册 参考 源码 测试 关于中文版Repository
暗色主题亮色主题

es6/observable/ConnectableObservable.js

  1. import { SubjectSubscriber } from '../Subject';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. /**
  6. * @class ConnectableObservable<T>
  7. */
  8. export class ConnectableObservable extends Observable {
  9. constructor(source, subjectFactory) {
  10. super();
  11. this.source = source;
  12. this.subjectFactory = subjectFactory;
  13. this._refCount = 0;
  14. this._isComplete = false;
  15. }
  16. _subscribe(subscriber) {
  17. return this.getSubject().subscribe(subscriber);
  18. }
  19. getSubject() {
  20. const subject = this._subject;
  21. if (!subject || subject.isStopped) {
  22. this._subject = this.subjectFactory();
  23. }
  24. return this._subject;
  25. }
  26. connect() {
  27. let connection = this._connection;
  28. if (!connection) {
  29. this._isComplete = false;
  30. connection = this._connection = new Subscription();
  31. connection.add(this.source
  32. .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
  33. if (connection.closed) {
  34. this._connection = null;
  35. connection = Subscription.EMPTY;
  36. }
  37. else {
  38. this._connection = connection;
  39. }
  40. }
  41. return connection;
  42. }
  43. refCount() {
  44. return this.lift(new RefCountOperator(this));
  45. }
  46. }
  47. const connectableProto = ConnectableObservable.prototype;
  48. export const connectableObservableDescriptor = {
  49. operator: { value: null },
  50. _refCount: { value: 0, writable: true },
  51. _subject: { value: null, writable: true },
  52. _connection: { value: null, writable: true },
  53. _subscribe: { value: connectableProto._subscribe },
  54. _isComplete: { value: connectableProto._isComplete, writable: true },
  55. getSubject: { value: connectableProto.getSubject },
  56. connect: { value: connectableProto.connect },
  57. refCount: { value: connectableProto.refCount }
  58. };
  59. class ConnectableSubscriber extends SubjectSubscriber {
  60. constructor(destination, connectable) {
  61. super(destination);
  62. this.connectable = connectable;
  63. }
  64. _error(err) {
  65. this._unsubscribe();
  66. super._error(err);
  67. }
  68. _complete() {
  69. this.connectable._isComplete = true;
  70. this._unsubscribe();
  71. super._complete();
  72. }
  73. _unsubscribe() {
  74. const connectable = this.connectable;
  75. if (connectable) {
  76. this.connectable = null;
  77. const connection = connectable._connection;
  78. connectable._refCount = 0;
  79. connectable._subject = null;
  80. connectable._connection = null;
  81. if (connection) {
  82. connection.unsubscribe();
  83. }
  84. }
  85. }
  86. }
  87. class RefCountOperator {
  88. constructor(connectable) {
  89. this.connectable = connectable;
  90. }
  91. call(subscriber, source) {
  92. const { connectable } = this;
  93. connectable._refCount++;
  94. const refCounter = new RefCountSubscriber(subscriber, connectable);
  95. const subscription = source.subscribe(refCounter);
  96. if (!refCounter.closed) {
  97. refCounter.connection = connectable.connect();
  98. }
  99. return subscription;
  100. }
  101. }
  102. class RefCountSubscriber extends Subscriber {
  103. constructor(destination, connectable) {
  104. super(destination);
  105. this.connectable = connectable;
  106. }
  107. _unsubscribe() {
  108. const { connectable } = this;
  109. if (!connectable) {
  110. this.connection = null;
  111. return;
  112. }
  113. this.connectable = null;
  114. const refCount = connectable._refCount;
  115. if (refCount <= 0) {
  116. this.connection = null;
  117. return;
  118. }
  119. connectable._refCount = refCount - 1;
  120. if (refCount > 1) {
  121. this.connection = null;
  122. return;
  123. }
  124. ///
  125. // Compare the local RefCountSubscriber's connection Subscription to the
  126. // connection Subscription on the shared ConnectableObservable. In cases
  127. // where the ConnectableObservable source synchronously emits values, and
  128. // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
  129. // execution continues to here before the RefCountOperator has a chance to
  130. // supply the RefCountSubscriber with the shared connection Subscription.
  131. // For example:
  132. // ```
  133. // Observable.range(0, 10)
  134. // .publish()
  135. // .refCount()
  136. // .take(5)
  137. // .subscribe();
  138. // ```
  139. // In order to account for this case, RefCountSubscriber should only dispose
  140. // the ConnectableObservable's shared connection Subscription if the
  141. // connection Subscription exists, *and* either:
  142. // a. RefCountSubscriber doesn't have a reference to the shared connection
  143. // Subscription yet, or,
  144. // b. RefCountSubscriber's connection Subscription reference is identical
  145. // to the shared connection Subscription
  146. ///
  147. const { connection } = this;
  148. const sharedConnection = connectable._connection;
  149. this.connection = null;
  150. if (sharedConnection && (!connection || sharedConnection === connection)) {
  151. sharedConnection.unsubscribe();
  152. }
  153. }
  154. }
  155. //# sourceMappingURL=ConnectableObservable.js.map