Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Changelog

# 2.0.5
* Merging upstream forked repo changes.

# 2.0.4
* Recursively call the function if `Retry-After` has the value greater than 0 [#192](https://github.com/singer-io/tap-github/pull/192)

# 2.0.3
* Handles the secondary rate limit - `Retry-After` [#191](https://github.com/singer-io/tap-github/pull/191)

# 2.0.2
* Make the tap sleep for `X-RateLimit-Reset` + `2` seconds, whenever the API rate limit is hit [#190](https://github.com/singer-io/tap-github/pull/190)

# 2.0.1
* Allow `commits` stream sync to continue when we hit an empty repo [#187](https://github.com/singer-io/tap-github/pull/187)

# 2.0.0
* Schema updates [#170](https://github.com/singer-io/tap-github/pull/170) [#169](https://github.com/singer-io/tap-github/pull/169)
* Update data types of fields in `events` and `issue_events` stream
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='tap-github',
version='2.0.0',
version='2.0.5',
description='Singer.io tap for extracting data from the GitHub API',
author='Stitch',
url='http://singer.io',
Expand Down
37 changes: 29 additions & 8 deletions tap_github/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ def raise_for_error(resp, source, stream, client, should_skip_404):
except JSONDecodeError:
response_json = {}

if stream == "commits" and response_json.get("message") == "Git Repository is empty.":
LOGGER.info("Encountered an empty git repository")
return None

if error_code == 404 and should_skip_404:
# Add not accessible stream into list.
client.not_accessible_repos.add(stream)
Expand Down Expand Up @@ -150,6 +154,14 @@ def rate_throttling(response, max_sleep_seconds, min_remain_rate_limit):
"""
For rate limit errors, get the remaining time before retrying and calculate the time to sleep before making a new request.
"""
if "Retry-After" in response.headers:
# handles the secondary rate limit
seconds_to_sleep = int(response.headers['Retry-After'])
if seconds_to_sleep > 0:
LOGGER.info("API rate limit exceeded. Tap will retry the data collection after %s seconds.", seconds_to_sleep)
time.sleep(seconds_to_sleep)
#returns True if tap sleeps
return True
if 'X-RateLimit-Remaining' in response.headers:
if int(response.headers['X-RateLimit-Remaining']) <= min_remain_rate_limit:
seconds_to_sleep = calculate_seconds(int(response.headers['X-RateLimit-Reset']))
Expand All @@ -160,10 +172,13 @@ def rate_throttling(response, max_sleep_seconds, min_remain_rate_limit):

LOGGER.info("API rate limit exceeded. Tap will retry the data collection after %s seconds.", seconds_to_sleep)
time.sleep(seconds_to_sleep)
else:
# Raise an exception if `X-RateLimit-Remaining` is not found in the header.
# API does include this key header if provided base URL is not a valid github custom domain.
raise GithubException("The API call using the specified base url was unsuccessful. Please double-check the provided base URL.")
#returns True if tap sleeps
return True
return False

# Raise an exception if `X-RateLimit-Remaining` is not found in the header.
# API does include this key header if provided base URL is not a valid github custom domain.
raise GithubException("The API call using the specified base url was unsuccessful. Please double-check the provided base URL.")

class GithubClient:
"""
Expand Down Expand Up @@ -211,13 +226,19 @@ def authed_get(self, source, url, headers={}, stream="", should_skip_404 = True)
with metrics.http_request_timer(url) as timer:
self.session.headers.update(headers)
resp = self.session.request(method='get', url=url, timeout=self.get_request_timeout())
if rate_throttling(resp, self.max_sleep_seconds, self.min_remain_rate_limit):
# If the API rate limit is reached, the function will be recursively
self.authed_get(source, url, headers, stream, should_skip_404)
if resp.status_code != 200:
raise_for_error(resp, source, stream, self, should_skip_404)
timer.tags[metrics.Tag.http_status_code] = resp.status_code
rate_throttling(resp, self.max_sleep_seconds, self.min_remain_rate_limit)
if resp.status_code == 404 or resp.status_code == 422:
if resp.status_code in {404, 409, 422}:
# Return an empty response body since we're not raising a NotFoundException
resp._content = b'{}' # pylint: disable=protected-access

# In the 409 case, this is only for `commits` returning an
# error for an empty repository, so we'll treat this as an
# empty list of records to process
resp._content = b'{}' # pylint: disable=protected-access
return resp

def authed_get_all_pages(self, source, url, headers={}, stream="", should_skip_404 = True):
Expand Down Expand Up @@ -245,7 +266,7 @@ def verify_repo_access(self, url_for_repo, repo):
Call rest API to verify that the user has sufficient permissions to access this repository.
"""
try:
self.authed_get("verifying repository access", url_for_repo)
self.authed_get("verifying repository access", url_for_repo, stream="commits")
except NotFoundException:
# Throwing user-friendly error message as it checks token access
message = "HTTP-error-code: 404, Error: Please check the repository name \'{}\' or you do not have sufficient permissions to access this repository.".format(repo)
Expand Down
34 changes: 21 additions & 13 deletions tests/test_github_bookmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ class TestGithubBookmarks(TestGithubBase):
def name():
return "tap_tester_github_bookmarks"

def calculated_states_by_stream(self, current_state, synced_records, replication_keys):
def calculated_states_by_stream(self, current_state, synced_records, replication_keys, start_date):
"""
Look at the bookmarks from a previous sync and set a new bookmark
value based off timedelta expectations. This ensures the subsequent sync will replicate
at least 1 record but, fewer records than the previous sync.
Look at the bookmarks from a previous sync and shift it to a
date to ensure the subsequent sync will replicate at least 1
record but, fewer records than the previous sync.
"""
timedelta_by_stream = {stream: [90,0,0] # {stream_name: [days, hours, minutes], ...}
# {stream_name: [days, hours, minutes], ...}
timedelta_by_stream = {stream: [90,0,0]
for stream in self.expected_streams()}

timedelta_by_stream["commits"] = [7, 0, 0]

repo = self.get_properties().get('repository')

stream_to_calculated_state = {repo: {stream: "" for stream in current_state['bookmarks'][repo].keys()}}
Expand All @@ -31,7 +34,9 @@ def calculated_states_by_stream(self, current_state, synced_records, replication
state_as_datetime = dateutil.parser.parse(state_value)

days, hours, minutes = timedelta_by_stream[stream]
calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes)

start_date_as_datetime = dateutil.parser.parse(start_date)
calculated_state_as_datetime = start_date_as_datetime + datetime.timedelta(days=days, hours=hours, minutes=minutes)

state_format = '%Y-%m-%dT%H:%M:%SZ'
calculated_state_formatted = datetime.datetime.strftime(calculated_state_as_datetime, state_format)
Expand All @@ -49,7 +54,7 @@ def test_run(self):
All data of the second sync is >= the bookmark from the first sync
The number of records in the 2nd sync is less then the first
• Verify that for full table stream, all data replicated in sync 1 is replicated again in sync 2.

PREREQUISITE
For EACH stream that is incrementally replicated there are multiple rows of data with
different values for the replication key
Expand Down Expand Up @@ -83,9 +88,12 @@ def test_run(self):
### Update State Between Syncs
##########################################################################

first_sync_start_date = self.get_properties()['start_date']
new_states = {'bookmarks': dict()}
simulated_states = self.calculated_states_by_stream(first_sync_bookmarks,
first_sync_records, expected_replication_keys)
first_sync_records,
expected_replication_keys,
first_sync_start_date)
for repo, new_state in simulated_states.items():
new_states['bookmarks'][repo] = new_state
menagerie.set_state(conn_id, new_states)
Expand Down Expand Up @@ -126,7 +134,7 @@ def test_run(self):
replication_key = next(iter(expected_replication_keys[stream]))
first_bookmark_value = first_bookmark_key_value.get('since')
second_bookmark_value = second_bookmark_key_value.get('since')

first_bookmark_value_ts = self.dt_to_ts(first_bookmark_value, self.BOOKMARK_FORMAT)
second_bookmark_value_ts = self.dt_to_ts(second_bookmark_value, self.BOOKMARK_FORMAT)

Expand All @@ -147,11 +155,11 @@ def test_run(self):
# For events stream replication key value is coming in different format
if stream == 'events':
replication_key_format = self.EVENTS_RECORD_REPLICATION_KEY_FORMAT

for record in first_sync_messages:
# Verify the first sync bookmark value is the max replication key value for a given stream
replication_key_value = self.dt_to_ts(record.get(replication_key), replication_key_format)

self.assertLessEqual(
replication_key_value, first_bookmark_value_ts,
msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced."
Expand All @@ -160,10 +168,10 @@ def test_run(self):
for record in second_sync_messages:
# Verify the second sync bookmark value is the max replication key value for a given stream
replication_key_value = self.dt_to_ts(record.get(replication_key), replication_key_format)

self.assertGreaterEqual(replication_key_value, simulated_bookmark_value,
msg="Second sync records do not respect the previous bookmark.")

self.assertLessEqual(
replication_key_value, second_bookmark_value_ts,
msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced."
Expand Down
35 changes: 25 additions & 10 deletions tests/test_github_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,45 @@ def get_properties(self, original: bool = True):
return return_value

def test_run(self):

streams_to_test = self.expected_streams()

# Pagination is not supported for "team_memberships" by Github API.
# Skipping "teams" stream as it's RECORD count is <= 30.
untestable_streams = {'team_memberships', 'teams'}
untestable_streams = {
'team_memberships',
'teams',
'team_members',
'collaborators',
'assignees',
}

# For some streams RECORD count were not > 30 in same test-repo.
# For some streams RECORD count were not > 30 in same test-repo.
# So, separated streams on the basis of RECORD count.
self.repository_name = 'singer-io/tap-github'
expected_stream_1 = {'comments', 'stargazers', 'commits', 'pull_requests', 'reviews', 'review_comments', 'pr_commits', 'issues'}
expected_stream_1 = {
'comments',
'stargazers',
'commits',
'pull_requests',
'reviews',
'review_comments',
'pr_commits',
'issues',
}
self.run_test(expected_stream_1)

self.repository_name = 'singer-io/test-repo'
expected_stream_2 = streams_to_test - expected_stream_1 - untestable_streams
self.run_test(expected_stream_2)

def run_test(self, streams):
"""
• Verify that for each stream you can get multiple pages of data.
• Verify that for each stream you can get multiple pages of data.
This requires we ensure more than 1 page of data exists at all times for any given stream.
• Verify by pks that the data replicated matches the data we expect.
"""

# Page size for pagination supported streams
page_size = 30
conn_id = connections.ensure_connection(self)
Expand Down Expand Up @@ -83,7 +98,7 @@ def run_test(self, streams):
# Verify that for each stream you can get multiple pages of data
self.assertGreater(record_count_sync, page_size,
msg="The number of records is not over the stream max limit")

# Chunk the replicated records (just primary keys) into expected pages
pages = []
page_count = ceil(len(primary_keys_list) / page_size)
Expand All @@ -102,4 +117,4 @@ def run_test(self, streams):

self.assertTrue(
current_page.isdisjoint(other_page), msg=f'other_page_primary_keys={other_page}'
)
)
30 changes: 22 additions & 8 deletions tests/test_github_start_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,27 @@ def test_run(self):
self.run_test(date_1, date_2, expected_stream_2)

date_2 = '2022-05-06T00:00:00Z'
expected_stream_3 = {'pull_requests', 'pr_commits', 'review_comments', 'reviews'}
expected_stream_3 = {'pr_commits', 'review_comments', 'reviews'}
self.run_test(date_1, date_2, expected_stream_3)

date_2 = '2022-01-27T00:00:00Z'
expected_stream_4 = self.expected_streams().difference(
expected_stream_1,
expected_stream_2,
expected_stream_3,
{'events', 'issues', 'pull_requests'}
)

# run the test for all the streams excluding 'events' stream
# as for 'events' stream we have to use dynamic dates
self.run_test(date_1, date_2, self.expected_streams() - expected_stream_1 - expected_stream_2 - expected_stream_3 - {'events'})
# as for 'events' stream we have to use dynamic dates.
# `issues` doesn't have enough data in this range, so we skip it too
self.run_test(date_1, date_2, expected_stream_4)

date_3 = '2023-01-27T00:00:00Z'
self.run_test(date_1, date_3, {"issues"})

date_4 = '2023-01-01T00:00:00Z'
self.run_test(date_1, date_4, {'pull_requests'})

# As per the Documentation: https://docs.github.com/en/rest/reference/activity#events
# the 'events' of past 90 days will only be returned
Expand All @@ -60,7 +74,7 @@ def test_run(self):
self.run_test(date_1, date_2, {'events'})

def run_test(self, date_1, date_2, streams):
"""
"""
• Verify that a sync with a later start date has at least one record synced
and less records than the 1st sync with a previous start date
• Verify that each stream has less records than the earlier start date sync
Expand Down Expand Up @@ -89,7 +103,7 @@ def run_test(self, date_1, date_2, streams):

# run check mode
found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)

# table and field selection
test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
if catalog.get('stream_name') in expected_streams]
Expand Down Expand Up @@ -130,7 +144,7 @@ def run_test(self, date_1, date_2, streams):
self.assertGreater(sum(record_count_by_stream_1.values()), sum(record_count_by_stream_2.values()))

for stream in expected_streams:
with self.subTest(stream=stream):
with self.subTest(stream=stream, start_date_1=date_1, start_date_2=date_2):

# expected values
expected_primary_keys = self.expected_primary_keys()[stream]
Expand All @@ -154,7 +168,7 @@ def run_test(self, date_1, date_2, streams):
self.assertGreater(record_count_sync_2, 0)

if expected_metadata.get(self.OBEYS_START_DATE):

# Expected bookmark key is one element in set so directly access it
bookmark_keys_list_1 = [message.get('data').get(next(iter(expected_bookmark_keys))) for message in synced_records_1.get(stream).get('messages')
if message.get('action') == 'upsert']
Expand Down Expand Up @@ -195,7 +209,7 @@ def run_test(self, date_1, date_2, streams):
self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))

else:

# Verify that the 2nd sync with a later start date replicates the same number of
# records as the 1st sync.
self.assertEqual(record_count_sync_2, record_count_sync_1)
Expand Down