Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not break if a day only contains a single line. #11

Merged
merged 3 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ Change log
0.3 (unreleased)
================

- Nothing changed yet.
- Do not break if a day only contains a single line.

- Render a better error message if an entry cannot be parsed.


0.2.1 (2023-12-11)
Expand Down
240 changes: 144 additions & 96 deletions gtimelog2tick.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import itertools
import pathlib
import sys
from typing import Iterable
from typing import Iterable, Optional

import requests

Expand All @@ -17,25 +17,51 @@
Entry = collections.namedtuple('Entry', ('start', 'end', 'message'))
JiraWorkLog = collections.namedtuple('JiraWorkLog', ('id', 'start', 'end'))
TickSyncStatus = collections.namedtuple(
'TickSyncStatus', ('entry', 'json', 'action'))
Task = collections.namedtuple('Task', ('name', 'id'))
'TickSyncStatus', ('worklog', 'json', 'action'))


@dataclasses.dataclass
class Task:
name: str
id: int
project: Optional['Project'] = dataclasses.field(default=None, init=False)

@property
def title(self) -> str:
project_name = (
'<unknown project>' if self.project is None else self.project.name)
return f'{project_name}: {self.name}'


@dataclasses.dataclass
class Project:
name: str
id: int
tasks: list[Task] | None
_tasks: tuple[Task] = ()

def __post_init__(self) -> None:
# Set the project for the tasks set during __init__.
self.tasks = self._tasks

@property
def tasks(self) -> tuple[Task]:
return self._tasks

@tasks.setter
def tasks(self, value: tuple[Task]) -> None:
self._tasks = value
for task in value:
task.project = self


@dataclasses.dataclass
class WorkLog:
"""Entry in the work log."""

entry: Entry
text: str
task: str
task_id: int
config: dict
_text: str | None = dataclasses.field(default=None, init=False)
_task: Task | None = dataclasses.field(default=None, init=False)

def __post_init__(self) -> None:
self.start = self.entry.start
Expand All @@ -44,6 +70,82 @@ def __post_init__(self) -> None:
int((self.entry.end - self.entry.start).total_seconds()) / 3600,
2)

@property
def text(self) -> str:
if self._text is None:
self._parse_entry_message()
return self._text

@text.setter
def text(self, value: str) -> None:
self._text = value

@property
def task(self) -> Task:
if self._task is None:
self._parse_entry_message()
return self._task

@task.setter
def task(self, value: Task) -> None:
"""Create an arbitrary task without parsing the entry message."""
self._task = value

def _parse_entry_message(self) -> None:
"""Parse entry message into task and text and store them."""
msg = self.entry.message
try:
project_name, task_name, *text_parts = msg.split(':')
except ValueError:
raise DataError(
f'Error: Unable to split {msg!r}, it needs one colon or more.')

task_name = task_name.strip()
tick_projects = [
(x, x.name == project_name)
for x in self.config['tick_projects']
if x.name.startswith(project_name)]

if not tick_projects:
raise DataError(f'Cannot find a Tick project for {msg!r}.')
if len(tick_projects) > 1:
exact_match = [x for x, match in tick_projects if match]
if not exact_match:
raise DataError(
f'Found multiple Tick projects for {msg!r}, but no'
' exact match.'
f' ({", ".join(x[0].name for x in tick_projects)})')
tick_project = exact_match[0]
else:
tick_project = tick_projects[0][0]
if not tick_project.tasks:
raw_tasks = call(
self.config, 'get', f'/projects/{tick_project.id}/tasks.json')
tick_project.tasks = tuple(
Task(x['name'], x['id']) for x in raw_tasks)

possible_tasks = [
x
for x in tick_project.tasks
if x.name.startswith(task_name)]

if not possible_tasks:
raise DataError(f'Cannot find a Tick task for {msg!r}.')
if len(possible_tasks) > 1:
exact_match = [
task for task in possible_tasks if task.name == task_name]
if not exact_match:
raise DataError(
f'Found multiple Tick tasks for {msg!r}, but no'
' exact match.'
f' ({", ".join(x.name for x in tick_project.tasks)})')
task = exact_match[0]
else:
task = possible_tasks[0]

self._task = task
self._text = ':'.join(text_parts).strip()


class ConfigurationError(Exception):
pass
Expand Down Expand Up @@ -143,7 +245,7 @@ def read_config(config_file: pathlib.Path) -> dict:
if not raw_projects:
break
tick_projects.extend(
[Project(x['name'], x['id'], None) for x in raw_projects])
[Project(x['name'], x['id']) for x in raw_projects])
page += 1
config['tick_projects'] = tick_projects
return config
Expand Down Expand Up @@ -193,60 +295,6 @@ def read_timelog(
yield Entry(last, last, last_note)


def parse_entry_message(
config: dict,
message: str
) -> tuple[str, str, int]:
"""Parse entry message into "project: task", text and task_id."""
project_name, task_name, *text_parts = message.split(':')
task_name = task_name.strip()

tick_projects = [
(x, x.name == project_name)
for x in config['tick_projects']
if x.name.startswith(project_name)]

if not tick_projects:
raise DataError(f'Cannot find a Tick project matching {message}.')
if len(tick_projects) > 1:
exact_match = [x for x, match in tick_projects if match]
if not exact_match:
raise DataError(
f'Found multiple Tick projects matching {message!r}, but no'
' exact match.'
f' ({", ".join(x[0].name for x in tick_projects)})')
tick_project = exact_match[0]
else:
tick_project = tick_projects[0][0]
if tick_project.tasks is None:
raw_tasks = call(
config, 'get', f'/projects/{tick_project.id}/tasks.json')
tick_project.tasks = [Task(x['name'], x['id']) for x in raw_tasks]

possible_tasks = [
x
for x in tick_project.tasks
if x.name.startswith(task_name)]

if not possible_tasks:
raise DataError(f'Cannot find a Tick task matching {message}.')
if len(possible_tasks) > 1:
exact_match = [
task for task in possible_tasks if task.name == task_name]
if not exact_match:
raise DataError(
f'Found multiple Tick tasks matching {message!r}, but no'
' exact match.'
f' ({", ".join(x.name for x in tick_project.tasks)})')
task = exact_match[0]
else:
task = possible_tasks[0]

task_name = f'{tick_project.name}: {task.name}'

return task_name, ':'.join(text_parts).strip(), task.id


def parse_timelog(
config: dict,
entries: Iterable[Entry],
Expand All @@ -262,8 +310,7 @@ def parse_timelog(
for x in config['requested_projects']):
continue

task, text, task_id = parse_entry_message(config, entry.message)
worklog = WorkLog(entry, text, task, task_id)
worklog = WorkLog(entry, config)
if worklog.hours > 0:
yield worklog
elif worklog.hours < 0:
Expand Down Expand Up @@ -341,67 +388,67 @@ def remove_tick_data(
entries = call(config, 'get', get_path)
for entry in entries:
date = datetime.datetime.strptime(entry['date'], '%Y-%m-%d')
sync_entry = WorkLog(
del_worklog = WorkLog(
Entry(date,
date + datetime.timedelta(hours=entry['hours']),
entry["id"]),
entry["notes"], '<unknown task name>', entry["task_id"]
)
entry["id"]), config={})
del_worklog.task = Task('<unknown task name>', entry["task_id"])
del_worklog.text = entry["notes"]
if dry_run:
yield TickSyncStatus(sync_entry, {}, 'delete (dry run)')
yield TickSyncStatus(del_worklog, {}, 'delete (dry run)')
else:
call(config, 'delete', f'/entries/{entry["id"]}.json', {204})
yield TickSyncStatus(sync_entry, {"id": entry["id"]}, 'delete')
yield TickSyncStatus(del_worklog, {"id": entry["id"]}, 'delete')


def add_tick_entry(
config: dict,
entry: WorkLog,
worklog: WorkLog,
dry_run: bool,
) -> Iterable[TickSyncStatus]:
"""Add a new tick entry."""
data = {
"date": entry.start.isoformat(),
"hours": entry.hours,
"notes": entry.text,
"task_id": entry.task_id,
"date": worklog.start.isoformat(),
"hours": worklog.hours,
"notes": worklog.text,
"task_id": worklog.task.id,
"user_id": config["user_id"],
}
if dry_run:
yield TickSyncStatus(entry, data, 'add (dry run)')
yield TickSyncStatus(worklog, data, 'add (dry run)')
else:
response = call(config, 'post', '/entries.json', {201}, data=data)
yield TickSyncStatus(entry, response, 'add')
yield TickSyncStatus(worklog, response, 'add')


def sync_with_tick(
config,
entries: Iterable[WorkLog],
worklogs: Iterable[WorkLog],
dry_run=False) -> Iterable[TickSyncStatus]:
def get_day(entry):
return entry.start.date()
for date, entries in itertools.groupby(entries, key=get_day):
for date, worklogs in itertools.groupby(worklogs, key=get_day):
yield from remove_tick_data(config, date, dry_run)
for entry in entries:
yield from add_tick_entry(config, entry, dry_run)
for worklog in worklogs:
yield from add_tick_entry(config, worklog, dry_run)


def log_tick_sync(
entries: Iterable[TickSyncStatus],
tick_sync_status_items: Iterable[TickSyncStatus],
ticklog) -> Iterable[TickSyncStatus]:
with ticklog.open('a') as f:
for entry, resp, action in entries:
comment = entry.text
for worklog, resp, action in tick_sync_status_items:
comment = worklog.text
f.write(','.join(map(str, [
get_now().isoformat(timespec='seconds'),
entry.start.isoformat(timespec='minutes'),
entry.hours,
worklog.start.isoformat(timespec='minutes'),
worklog.hours,
resp.get('id', ''),
action,
comment,
])) + '\n')

yield TickSyncStatus(entry, resp, action)
yield TickSyncStatus(worklog, resp, action)


class Date:
Expand All @@ -424,29 +471,30 @@ def __call__(self, value):


def show_results(
entries: Iterable[TickSyncStatus],
tick_sync_status_items: Iterable[TickSyncStatus],
stdout):
totals = {
'hours': collections.defaultdict(int),
'entries': collections.defaultdict(int),
}
print(file=stdout)
for entry, resp, action in entries:
for worklog, resp, action in tick_sync_status_items:
action = action.replace(' (dry run)', '')
if action == 'add':
print('ADD: {start} {amount:>8.2f}: {comment}'.format(
start=entry.start.isoformat(timespec='minutes'),
amount=entry.hours,
comment=entry.text,
start=worklog.start.isoformat(timespec='minutes'),
amount=worklog.hours,
comment=worklog.text,
), file=stdout)
totals['hours'][entry.task] += entry.hours
totals['entries'][entry.task] += 1
totals['hours'][worklog.task.title] += worklog.hours
totals['entries'][worklog.task.title] += 1
if totals['hours']:
print(file=stdout)
print('TOTALS:', file=stdout)
for task, hours in sorted(totals['hours'].items()):
entries = totals['entries'][task]
print(f'{task}: {hours:.2f} h in {entries} entries.', file=stdout)
num_entries = totals['entries'][task]
print(f'{task}: {hours:.2f} h in {num_entries} entries.',
file=stdout)


def _main(argv=None, stdout=sys.stdout):
Expand Down
Loading