Skip to content

Commit 4d03d3a

Browse files
feat: adding ability to create subscriptions at HEAD (#106)
* feat: adding ability to create subscriptions at head * fix: lint errors * fix: remove absl dependency * fix: lint * feat: use default keyword args * fix: rename offset location to backlog location * fix: broken samples
1 parent 62e376c commit 4d03d3a

File tree

5 files changed

+54
-8
lines changed

5 files changed

+54
-8
lines changed

google/cloud/pubsublite/admin_client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
SubscriptionPath,
3131
LocationPath,
3232
TopicPath,
33+
BacklogLocation,
3334
)
3435
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic
3536

@@ -101,8 +102,12 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
101102
return self._impl.list_topic_subscriptions(topic_path)
102103

103104
@overrides
104-
def create_subscription(self, subscription: Subscription) -> Subscription:
105-
return self._impl.create_subscription(subscription)
105+
def create_subscription(
106+
self,
107+
subscription: Subscription,
108+
starting_offset: BacklogLocation = BacklogLocation.END,
109+
) -> Subscription:
110+
return self._impl.create_subscription(subscription, starting_offset)
106111

107112
@overrides
108113
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:

google/cloud/pubsublite/admin_client_interface.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
TopicPath,
2121
LocationPath,
2222
SubscriptionPath,
23+
BacklogLocation,
2324
)
2425
from google.cloud.pubsublite_v1 import Topic, Subscription
2526
from google.protobuf.field_mask_pb2 import FieldMask
@@ -63,8 +64,14 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
6364
"""List the subscriptions that exist for a given topic."""
6465

6566
@abstractmethod
66-
def create_subscription(self, subscription: Subscription) -> Subscription:
67-
"""Create a subscription, returns the created subscription."""
67+
def create_subscription(
68+
self,
69+
subscription: Subscription,
70+
starting_offset: BacklogLocation = BacklogLocation.END,
71+
) -> Subscription:
72+
"""Create a subscription, returns the created subscription. By default
73+
a subscription will only receive messages published after the
74+
subscription was created."""
6875

6976
@abstractmethod
7077
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:

google/cloud/pubsublite/internal/wire/admin_client_impl.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
SubscriptionPath,
2323
LocationPath,
2424
TopicPath,
25+
BacklogLocation,
2526
)
2627
from google.cloud.pubsublite_v1 import (
2728
Subscription,
@@ -72,12 +73,19 @@ def list_topic_subscriptions(self, topic_path: TopicPath):
7273
]
7374
return [SubscriptionPath.parse(x) for x in subscription_strings]
7475

75-
def create_subscription(self, subscription: Subscription) -> Subscription:
76+
def create_subscription(
77+
self,
78+
subscription: Subscription,
79+
starting_offset: BacklogLocation = BacklogLocation.END,
80+
) -> Subscription:
7681
path = SubscriptionPath.parse(subscription.name)
7782
return self._underlying.create_subscription(
78-
parent=str(path.to_location_path()),
79-
subscription=subscription,
80-
subscription_id=path.name,
83+
request={
84+
"parent": str(path.to_location_path()),
85+
"subscription": subscription,
86+
"subscription_id": path.name,
87+
"skip_backlog": (starting_offset == BacklogLocation.END),
88+
}
8189
)
8290

8391
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:

google/cloud/pubsublite/types/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from .paths import LocationPath, TopicPath, SubscriptionPath
1919
from .message_metadata import MessageMetadata
2020
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL
21+
from .backlog_location import BacklogLocation
2122

2223
__all__ = (
2324
"CloudRegion",
@@ -28,4 +29,5 @@
2829
"MessageMetadata",
2930
"SubscriptionPath",
3031
"TopicPath",
32+
"BacklogLocation",
3133
)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import enum
16+
17+
18+
class BacklogLocation(enum.Enum):
19+
"""A location with respect to the message backlog. BEGINNING refers to the
20+
location of the oldest retained message. END refers to the location past
21+
all currently published messages, skipping the entire message backlog."""
22+
23+
BEGINNING = 0
24+
END = 1

0 commit comments

Comments
 (0)